本文介绍如何使用湖屋作为数据源,在 Microsoft Fabric 中设置数据代理。 为了说明该过程,我们首先创建一个湖屋,然后向其添加数据。 然后,我们将创建一个 Fabric 数据代理,并将湖屋配置为其数据源。 如果你已经拥有一个 Power BI 语义模型(具备必要的读/写权限)、一个数据仓库或一个 KQL 数据库,则可以在创建 Fabric 数据代理后,按照相同的步骤添加数据源。 虽然此处显示的步骤侧重于湖屋,但过程与其他数据源类似,你只需根据特定选择进行调整。
重要
此功能目前为预览版。
先决条件
- 付费 F2 或更高版本的 Fabric 容量资源
- Fabric 数据代理租户设置已启用。
- 已启用 Copilot 租户切换。
- 启用了 AI 的跨地理位置处理 。
- AI 的跨地理存储 功能已启用。
- 至少具备其中一项:数据仓库、湖屋、一个或多个 Power BI 语义模型或含数据的 KQL 数据库。
- Power BI 语义模型通过 XMLA 终结点租户交换机 为 Power BI 语义模型数据源启用。
使用 AdventureWorksLH 创建湖屋
首先,创建湖屋并使用必要的数据进行填充。
如果在湖仓(或仓库)中已有 AdventureWorksLH 实例,则可以跳过此步骤。 如果没有,可以利用 Fabric 笔记本中的以下说明来使用数据填充湖屋。
在要创建 Fabric 数据代理的工作区中创建新的笔记本。
在“资源管理器”窗格左侧,选择“+ 数据源”。 使用此选项,可以添加现有的湖屋或创建新湖屋。 为清楚起见,请创建新湖屋并为其分配名称。
在顶部单元格中,添加以下代码片段:
import pandas as pd from tqdm.auto import tqdm base = "https://synapseaisolutionsa.z13.web.core.windows.net/data/AdventureWorks" # load list of tables df_tables = pd.read_csv(f"{base}/adventureworks.csv", names=["table"]) for table in (pbar := tqdm(df_tables['table'].values)): pbar.set_description(f"Uploading {table} to lakehouse") # download df = pd.read_parquet(f"{base}/{table}.parquet") # save as lakehouse table spark.createDataFrame(df).write.mode('overwrite').saveAsTable(table)
选择“全部运行”。
几分钟后,系统将对湖屋填充必要的数据。
创建 Fabric 数据代理
要创建新的 Fabric 数据代理,请导航到工作区,然后选择“+ 新建项”按钮,如以下屏幕截图所示:
在“所有项”选项卡中,搜索“Fabric 数据代理”以找到相应的选项。 选择后,系统会提示你为 Fabric 数据代理提供一个名称,如以下屏幕截图所示:
输入名称后,继续执行以下步骤,使 Fabric 数据代理符合你的特定要求。
选择数据
选择在上一步中创建的湖屋,然后选择“添加”,如以下屏幕截图所示:
将湖屋添加为数据源后,Fabric 数据代理页面左侧的“资源管理器”窗格会显示该湖屋的名称。 选择该湖屋以查看所有可用的表。 使用复选框选择要提供给 AI 的表。 对于此方案,请选择以下表:
dimcustomer
dimdate
dimgeography
dimproduct
dimproductcategory
dimpromotion
dimreseller
dimsalesterritory
factinternetsales
factresellersales
提供指示
要添加 Fabric 数据代理指令,请选择“数据代理指令”按钮,以在右侧打开 Fabric 数据代理指令窗格。 可以添加以下说明。
AdventureWorksLH
数据源包含来自三个表的信息:
-
dimcustomer
,用于获取详细的客户人口统计数据和联系信息 -
dimdate
,用于与日期相关的数据 - 例如日历和会计信息 -
dimgeography
,用于地理详细信息,包括城市名称和国家/地区代码。
使用此数据源进行涉及客户详细信息、基于时间的事件和地理位置的查询和分析。
提供示例
若要添加示例查询,请选择 示例查询 按钮打开右侧的示例查询窗格。 此窗格提供用于为所有受支持的数据源添加或编辑示例查询的选项。 对于每个数据源,可以选择 添加或编辑示例查询 输入相关示例,如以下屏幕截图所示:
在这里,应为创建的 Lakehouse 数据源添加示例查询。
Question: Calculate the average percentage increase in sales amount for repeat purchases for every zipcode. Repeat purchase is a purchase subsequent to the first purchase (the average should always be computed relative to the first purchase)
SELECT AVG((s.SalesAmount - first_purchase.SalesAmount) / first_purchase.SalesAmount * 100) AS AvgPercentageIncrease
FROM factinternetsales s
INNER JOIN dimcustomer c ON s.CustomerKey = c.CustomerKey
INNER JOIN dimgeography g ON c.GeographyKey = g.GeographyKey
INNER JOIN (
SELECT *
FROM (
SELECT
CustomerKey,
SalesAmount,
OrderDate,
ROW_NUMBER() OVER (PARTITION BY CustomerKey ORDER BY OrderDate) AS RowNumber
FROM factinternetsales
) AS t
WHERE RowNumber = 1
) first_purchase ON s.CustomerKey = first_purchase.CustomerKey
WHERE s.OrderDate > first_purchase.OrderDate
GROUP BY g.PostalCode;
Question: Show the monthly total and year-to-date total sales. Order by year and month.
SELECT
Year,
Month,
MonthlySales,
SUM(MonthlySales) OVER (PARTITION BY Year ORDER BY Year, Month ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS CumulativeTotal
FROM (
SELECT
YEAR(OrderDate) AS Year,
MONTH(OrderDate) AS Month,
SUM(SalesAmount) AS MonthlySales
FROM factinternetsales
GROUP BY YEAR(OrderDate), MONTH(OrderDate)
) AS t
注释
Power BI 语义模型数据源目前不支持添加示例查询/问题对。
测试和修订 Fabric 数据代理
现在,你已经配置了 Fabric 数据代理,添加了 Fabric 数据代理指令,并为数据湖屋提供了示例查询,接下来可以通过提问并接收答案来与之交互。 在继续测试的过程中,你可以添加更多示例并完善指令,以进一步提高 Fabric 数据代理的性能。 与同事协作,收集反馈,并根据他们的输入,确保提供的示例查询和说明符合他们想要提问的问题类型。
发布 Fabric 数据代理
验证 Fabric 数据代理的性能后,你可能会决定发布它,以便与想要针对数据进行问答的同事共享。 在这种情况下,请选择“发布”,如以下屏幕截图所示:
“发布数据代理”框将会打开,如以下屏幕截图所示:
在此框中,选择“发布”以发布 Fabric 数据代理。 此时会显示 Fabric 数据代理的已发布 URL,如以下屏幕截图所示:
在 Power BI 中使用 Copilot 中的 Fabric 数据代理
发布后,可以使用 Power BI 中的 Copilot 与 Fabric 数据代理交互。 借助 Power BI 中的 Copilot,可以直接使用数据代理和其他项(例如报表或语义模型),而无需在它们之间切换。
选择左侧导航窗格中的 Copilot 按钮,在 Power BI 中打开 Copilot。 接下来,选择 在底部的文本框中添加项目,以便获得更好的结果,来添加数据代理。 在打开的窗口中选择 数据代理 。 您只能看到自己有权访问的数据管理员。 选择所需的数据代理,然后选择“ 确认”。 此示例演示如何使用单个数据代理,但可以添加更多项,例如其他数据代理、报表或语义模型。 以下屏幕截图演示了单个数据代理的步骤:
将数据代理添加到 Power BI 中的 Copilot 后,可以提出与 Fabric 数据代理相关的任何问题,如以下屏幕截图所示:
以编程方式使用 Fabric 数据代理
可以在 Fabric 笔记本中以编程方式使用 Fabric 数据代理。 若要确定 Fabric 数据代理是否具有已发布的 URL 值,请选择 “设置”,如以下屏幕截图所示:
发布 Fabric 数据代理之前,它没有已发布的 URL 值,如以下屏幕截图所示:
如果以前尚未发布 Fabric 数据代理,可以按照前面的步骤中的说明发布它。 然后,可以复制已发布 URL 并将其用于 Fabric 笔记本。 这样,你就可以在 Fabric 笔记本中通过调用 Fabric 数据代理 API 来查询 Fabric 数据代理。 将复制的 URL 粘贴到此代码片段中。 然后,将问题替换为与你的 Fabric 数据代理相关的任何查询。 此示例使用 \<generic published URL value\>
作为 URL。
%pip install "openai==1.70.0"
%pip install httpx==0.27.2
import requests
import json
import pprint
import typing as t
import time
import uuid
from openai import OpenAI
from openai._exceptions import APIStatusError
from openai._models import FinalRequestOptions
from openai._types import Omit
from openai._utils import is_given
from synapse.ml.mlflow import get_mlflow_env_config
from sempy.fabric._token_provider import SynapseTokenProvider
base_url = "https://<generic published base URL value>"
question = "What datasources do you have access to?"
configs = get_mlflow_env_config()
# Create OpenAI Client
class FabricOpenAI(OpenAI):
def __init__(
self,
api_version: str ="2024-05-01-preview",
**kwargs: t.Any,
) -> None:
self.api_version = api_version
default_query = kwargs.pop("default_query", {})
default_query["api-version"] = self.api_version
super().__init__(
api_key="",
base_url=base_url,
default_query=default_query,
**kwargs,
)
def _prepare_options(self, options: FinalRequestOptions) -> None:
headers: dict[str, str | Omit] = (
{**options.headers} if is_given(options.headers) else {}
)
options.headers = headers
headers["Authorization"] = f"Bearer {configs.driver_aad_token}"
if "Accept" not in headers:
headers["Accept"] = "application/json"
if "ActivityId" not in headers:
correlation_id = str(uuid.uuid4())
headers["ActivityId"] = correlation_id
return super()._prepare_options(options)
# Pretty printing helper
def pretty_print(messages):
print("---Conversation---")
for m in messages:
print(f"{m.role}: {m.content[0].text.value}")
print()
fabric_client = FabricOpenAI()
# Create assistant
assistant = fabric_client.beta.assistants.create(model="not used")
# Create thread
thread = fabric_client.beta.threads.create()
# Create message on thread
message = fabric_client.beta.threads.messages.create(thread_id=thread.id, role="user", content=question)
# Create run
run = fabric_client.beta.threads.runs.create(thread_id=thread.id, assistant_id=assistant.id)
# Wait for run to complete
while run.status == "queued" or run.status == "in_progress":
run = fabric_client.beta.threads.runs.retrieve(
thread_id=thread.id,
run_id=run.id,
)
print(run.status)
time.sleep(2)
# Print messages
response = fabric_client.beta.threads.messages.list(thread_id=thread.id, order="asc")
pretty_print(response)
# Delete thread
fabric_client.beta.threads.delete(thread_id=thread.id)