Important
This feature is in Public Preview.
This article describes how to perform batch inference at scale using AI Functions. The examples in this article are recommended for production scenarios, such as deploying a batch inference pipeline as a scheduled workflow and using ai_query with a Databricks-hosted foundation model for structured streaming.
To get started with AI Functions, Databricks recommends either of the approaches described in this article: task-specific AI functions or the general-purpose function ai_query.
Requirements
- A workspace in a region where Foundation Model APIs are supported.
- Databricks Runtime 15.4 LTS or above for batch inference workloads that use AI Functions.
- Permission to query the Delta table in Unity Catalog that contains the data you want to use.
- The pipelines.channel table property set to "preview" in order to use ai_query(). See the example query below.
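For example, the following is a minimal sketch of setting this table property; the table name catalog.schema.target_table is a hypothetical placeholder for your own table:
ALTER TABLE catalog.schema.target_table
SET TBLPROPERTIES ('pipelines.channel' = 'preview');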
Batch LLM inference using task-specific AI functions
You can run batch inference using task-specific AI functions. For guidance on incorporating task-specific AI functions into a pipeline, see Deploy batch inference pipelines.
The following is an example of using the task-specific AI function ai_translate:
SELECT
  writer_summary,
  ai_translate(writer_summary, "cn") AS cn_translation
FROM user.batch.news_summaries
LIMIT 500;
Batch LLM inference using ai_query
You can use the general-purpose AI function ai_query to perform batch inference. See which model types and associated models ai_query supports.
The examples in this section focus on the flexibility of ai_query and how to use it in batch inference pipelines and workflows.
ai_query and Databricks-hosted foundation models
When you use a Databricks-hosted, pre-provisioned foundation model for batch inference, Databricks configures a provisioned throughput endpoint on your behalf that scales automatically based on the workload.
To use this method for batch inference, specify the following in your request:
- The pre-provisioned LLM you want to use in ai_query. Choose from the supported pre-provisioned LLMs. These pre-provisioned LLMs are subject to permissive licenses and use policies; see Applicable model developer licenses and terms.
- The Unity Catalog input and output tables.
- The model prompt and any model parameters.
SELECT text, ai_query(
"databricks-meta-llama-3-1-8b-instruct",
"Summarize the given text comprehensively, covering key points and main ideas concisely while retaining relevant details and examples. Ensure clarity and accuracy without unnecessary repetition or omissions: " || text
) AS summary
FROM uc_catalog.schema.table;
ai_query and custom or fine-tuned foundation models
The notebook examples in this section demonstrate batch inference workloads that use custom or fine-tuned foundation models to process multiple inputs. The examples require an existing model serving endpoint that uses Foundation Model APIs provisioned throughput.
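For orientation, the following is a minimal sketch of calling ai_query against such an endpoint; the endpoint name my-finetuned-llm and the table catalog.schema.support_tickets are hypothetical placeholders:
SELECT
  ticket_text,
  ai_query(
    "my-finetuned-llm",  -- hypothetical provisioned throughput endpoint serving a custom or fine-tuned model
    "Classify the following support ticket into a product area: " || ticket_text
  ) AS product_area
FROM catalog.schema.support_tickets;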
LLM batch inference using an embeddings model
The following example notebook creates a provisioned throughput endpoint and runs batch LLM inference using Python and your choice of either the GTE Large (English) or BGE Large (English) embedding models.
LLM batch inference embeddings with a provisioned throughput endpoint notebook
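As a sketch of the same idea outside the notebook, ai_query can call an embedding endpoint directly from SQL; the endpoint name gte-large-embeddings-endpoint and the table catalog.schema.documents are hypothetical placeholders:
SELECT
  text,
  ai_query("gte-large-embeddings-endpoint", text) AS embedding  -- hypothetical provisioned throughput embedding endpoint
FROM catalog.schema.documents;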
Batch inference and structured data extraction
The following example notebook demonstrates how to perform basic structured data extraction using ai_query with automated extraction techniques to transform raw, unstructured data into organized, usable information. The notebook also shows how to use Mosaic AI Agent Evaluation to evaluate accuracy using ground-truth data.
Batch inference and structured data extraction notebook
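Outside the notebook, a minimal sketch of the same technique uses the responseFormat argument of ai_query to enforce a JSON schema on the extraction; the table catalog.schema.raw_invoices and the schema fields are hypothetical placeholders:
SELECT
  raw_text,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Extract the vendor name and the total amount from the following invoice text: " || raw_text,
    responseFormat => '{
      "type": "json_schema",
      "json_schema": {
        "name": "invoice_extraction",
        "schema": {
          "type": "object",
          "properties": {
            "vendor": { "type": "string" },
            "total_amount": { "type": "number" }
          }
        },
        "strict": true
      }
    }'
  ) AS extracted
FROM catalog.schema.raw_invoices;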
Batch inference using BERT for named entity recognition
The following notebook shows a traditional ML model batch inference example using BERT.
Batch inference using BERT for named entity recognition notebook
Deploy batch inference pipelines
This section shows how you can integrate AI Functions with other Databricks data and AI products to build complete batch inference pipelines. These pipelines can perform end-to-end workflows that include ingestion, preprocessing, inference, and post-processing. Pipelines can be authored in SQL or Python and deployed as:
- DLT pipelines
- Scheduled workflows using Databricks workflows
- Streaming inference workflows using Structured Streaming
Perform incremental batch inference in DLT
The following example uses DLT to perform incremental batch inference as data is continuously updated.
Step 1: Ingest raw news data from a volume
SQL
CREATE OR REFRESH STREAMING TABLE news_raw
COMMENT "Raw news articles ingested from volume."
AS SELECT *
FROM STREAM(read_files(
'/Volumes/databricks_news_summarization_benchmarking_data/v01/csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE',
multiLine => 'true'
));
Python
Import packages and define the JSON schema of the LLM response as a Python variable.
import dlt
from pyspark.sql.functions import expr, get_json_object, concat
news_extraction_schema = (
'{"type": "json_schema", "json_schema": {"name": "news_extraction", '
'"schema": {"type": "object", "properties": {"title": {"type": "string"}, '
'"category": {"type": "string", "enum": ["Politics", "Sports", "Technology", '
'"Health", "Entertainment", "Business"]}}}, "strict": true}}'
)
Ingest data from a Unity Catalog volume.
@dlt.table(
comment="Raw news articles ingested from volume."
)
def news_raw():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.option("mode", "PERMISSIVE")
.option("multiLine", "true")
.load("/Volumes/databricks_news_summarization_benchmarking_data/v01/csv")
)
Step 2: Apply LLM inference to extract the title and category
SQL
CREATE OR REFRESH MATERIALIZED VIEW news_categorized
COMMENT "Extract category and title from news articles using LLM inference."
AS
SELECT
inputs,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Extract the category of the following news article: " || inputs,
responseFormat => '{
"type": "json_schema",
"json_schema": {
"name": "news_extraction",
"schema": {
"type": "object",
"properties": {
"title": { "type": "string" },
"category": {
"type": "string",
"enum": ["Politics", "Sports", "Technology", "Health", "Entertainment", "Business"]
}
}
},
"strict": true
}
}'
) AS meta_data
FROM news_raw
LIMIT 2;
Python
@dlt.table(
comment="Extract category and title from news articles using LLM inference."
)
def news_categorized():
# Limit the number of rows to 2 as in the SQL version
df_raw = spark.read.table("news_raw").limit(2)
# Inject the JSON schema variable into the ai_query call using an f-string.
return df_raw.withColumn(
"meta_data",
expr(
f"ai_query('databricks-meta-llama-3-3-70b-instruct', "
f"concat('Extract the category of the following news article: ', inputs), "
f"responseFormat => '{news_extraction_schema}')"
)
)
Step 3: Validate the LLM inference output before summarization
SQL
CREATE OR REFRESH MATERIALIZED VIEW news_validated (
CONSTRAINT valid_title EXPECT (size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3),
CONSTRAINT valid_category EXPECT (get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business'))
)
COMMENT "Validated news articles ensuring the title has at least 3 words and the category is valid."
AS
SELECT *
FROM news_categorized;
Python
@dlt.table(
comment="Validated news articles ensuring the title has at least 3 words and the category is valid."
)
@dlt.expect("valid_title", "size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3")
@dlt.expect_or_fail("valid_category", "get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business')")
def news_validated():
return spark.read.table("news_categorized")
Step 4: Summarize the news articles from the validated data
SQL
CREATE OR REFRESH MATERIALIZED VIEW news_summarized
COMMENT "Summarized political news articles after validation."
AS
SELECT
get_json_object(meta_data, '$.category') as category,
get_json_object(meta_data, '$.title') as title,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Summarize the following political news article in 2-3 sentences: " || inputs
) AS summary
FROM news_validated;
Python
@dlt.table(
comment="Summarized political news articles after validation."
)
def news_summarized():
df = spark.read.table("news_validated")
return df.select(
get_json_object("meta_data", "$.category").alias("category"),
get_json_object("meta_data", "$.title").alias("title"),
expr(
"ai_query('databricks-meta-llama-3-3-70b-instruct', "
"concat('Summarize the following political news article in 2-3 sentences: ', inputs))"
).alias("summary")
)
Automate batch inference jobs using Databricks workflows
Schedule batch inference jobs and automate AI pipelines.
SQL
SELECT
*,
ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does..
RESULT
[
{'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
{'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
{'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
{'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
{'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10
Python
import json
from pyspark.sql.functions import expr
# Define the opinion mining prompt as a multi-line string.
opinion_prompt = """You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup.The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that.It made three or four large servings of soup.It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound.The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does.
RESULT
[
{'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
{'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
{'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
{'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
{'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
"""
# Escape the prompt so it can be safely embedded in the SQL expression.
escaped_prompt = json.dumps(opinion_prompt)
# Read the source table and limit to 10 rows.
df = spark.table("catalog.schema.product_reviews").limit(10)
# Apply the LLM inference to each row, concatenating the prompt, the review text, and the tail string.
result_df = df.withColumn(
"result",
expr(f"ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat({escaped_prompt}, REVIEW_TEXT, '\\n\\nRESULT\\n'))")
)
# Display the result DataFrame.
display(result_df)
AI Functions using Structured Streaming
Apply AI inference in near real-time or micro-batch scenarios using ai_query and Structured Streaming.
Step 1. Read a static Delta table
Read a static Delta table as if it were a stream.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
# Spark processes all existing rows exactly once in the first micro-batch.
df = spark.table("enterprise.docs") # Replace with your table name containing enterprise documents
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")
# Define the prompt outside the SQL expression.
prompt = (
"You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
"Do not include extra commentary or suggestions. Document: "
)
Step 2. Apply ai_query
Spark processes the static data only once, unless new rows arrive in the table.
df_transformed = df_stream.select(
"document_text",
F.expr(f"""
ai_query(
'databricks-meta-llama-3-1-8b-instruct',
CONCAT('{prompt}', document_text)
)
""").alias("summary")
)
Step 3: Write the summarized output
Write the summarized output to another Delta table.
# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
.format("delta") \
.option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
.outputMode("append") \
.toTable("enterprise.docs_summary")
query.awaitTermination()
View costs of batch inference workloads
The following examples show how to filter batch inference workloads by jobs, compute, SQL warehouses, and DLT pipelines.
For a general example of how to view the costs of batch inference workloads that use AI Functions, see Monitor model serving costs.
Jobs
The following query shows which job tasks are being used for batch inference, using the system.workflow.jobs system table. See Monitor job costs and performance with system tables.
SELECT *
FROM system.billing.usage u
JOIN system.workflow.jobs x
ON u.workspace_id = x.workspace_id
AND u.usage_metadata.job_id = x.job_id
WHERE u.usage_metadata.workspace_id = <workspace_id>
AND u.billing_origin_product = "MODEL_SERVING"
AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";
Compute
The following shows which clusters are being used for batch inference, using the system.compute.clusters system table.
SELECT *
FROM system.billing.usage u
JOIN system.compute.clusters x
ON u.workspace_id = x.workspace_id
AND u.usage_metadata.cluster_id = x.cluster_id
WHERE u.usage_metadata.workspace_id = <workspace_id>
AND u.billing_origin_product = "MODEL_SERVING"
AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";
DLT pipelines
The following shows which DLT pipelines are being used for batch inference, using the system.lakeflow.pipelines system table.
SELECT *
FROM system.billing.usage u
JOIN system.lakeflow.pipelines x
ON u.workspace_id = x.workspace_id
AND u.usage_metadata.dlt_pipeline_id = x.pipeline_id
WHERE u.usage_metadata.workspace_id = <workspace_id>
AND u.billing_origin_product = "MODEL_SERVING"
AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";
SQL warehouses
The following shows which SQL warehouses are being used for batch inference, using the system.compute.warehouses system table.
SELECT *
FROM system.billing.usage u
JOIN system.compute.warehouses x
ON u.workspace_id = x.workspace_id
AND u.usage_metadata.warehouse_id = x.warehouse_id
WHERE u.usage_metadata.workspace_id = <workspace_id>
AND u.billing_origin_product = "MODEL_SERVING"
AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";