Perform batch LLM inference using AI Functions

Important

This feature is in Public Preview.

This article describes how to perform batch inference at scale using AI Functions. The examples in this article are recommended for production scenarios, such as deploying batch inference pipelines as scheduled workflows and using ai_query with a Databricks-hosted foundation model for Structured Streaming.

To get started with AI Functions, Databricks recommends either of the approaches described in this article: task-specific AI functions or the general-purpose ai_query function.

Requirements

  • A workspace in a region where Foundation Model APIs are supported.
  • Databricks Runtime 15.4 LTS or above for batch inference workloads that use AI Functions.
  • Query permission on the Delta table in Unity Catalog that contains the data you want to use.
  • The pipelines.channel table property set to "preview" in order to use ai_query(). See the sketch after this list for an example.
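
The following is a minimal sketch of what setting that table property could look like, using a placeholder materialized view that wraps an ai_query call; the catalog, schema, object, and column names are illustrative assumptions.

-- Placeholder names; set the pipelines.channel property when defining the object.
CREATE OR REPLACE MATERIALIZED VIEW uc_catalog.schema.news_titles
TBLPROPERTIES ('pipelines.channel' = 'PREVIEW')
AS SELECT
  inputs,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Extract a short title for the following news article: " || inputs
  ) AS title
FROM uc_catalog.schema.news_raw;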

Batch LLM inference using task-specific AI functions

You can run batch inference using task-specific AI functions. For guidance on incorporating task-specific AI functions into a pipeline, see Deploy batch inference pipelines.

The following is an example of using the task-specific AI function ai_translate:

SELECT
  writer_summary,
  ai_translate(writer_summary, "cn") AS cn_translation
FROM user.batch.news_summaries
LIMIT 500;
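
Other task-specific AI functions follow the same pattern. For example, a similar query using ai_analyze_sentiment against the same hypothetical table:

SELECT
  writer_summary,
  ai_analyze_sentiment(writer_summary) AS sentiment
FROM user.batch.news_summaries
LIMIT 500;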

Batch LLM inference using ai_query

You can perform batch inference using the general-purpose AI function ai_query. See which model types and associated models ai_query supports.

The examples in this section highlight the flexibility of ai_query and how to use it in batch inference pipelines and workflows.

ai_query and Databricks-hosted foundation models

When you use a Databricks-hosted, pre-provisioned foundation model for batch inference, Databricks configures a provisioned throughput endpoint on your behalf that scales automatically based on the workload.

To use this method for batch inference, specify the following in your request:

  • The pre-provisioned LLM you want to use with ai_query. Choose from the supported pre-provisioned LLMs. These pre-provisioned LLMs are subject to permissive licenses and acceptable-use policies; see the applicable model developer licenses and terms.
  • The Unity Catalog input and output tables.
  • The model prompt and any model parameters.
SELECT text, ai_query(
    "databricks-meta-llama-3-1-8b-instruct",
    "Summarize the given text comprehensively, covering key points and main ideas concisely while retaining relevant details and examples. Ensure clarity and accuracy without unnecessary repetition or omissions: " || text
) AS summary
FROM uc_catalog.schema.table;
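
The example above only reads from an input table. As a hedged sketch of how the same query could also write to a Unity Catalog output table and pass model parameters, assuming an illustrative output table name and parameter values:

-- Placeholder output table; modelParameters values are illustrative.
CREATE OR REPLACE TABLE uc_catalog.schema.summaries AS
SELECT
  text,
  ai_query(
    "databricks-meta-llama-3-1-8b-instruct",
    "Summarize the given text in 2-3 sentences: " || text,
    modelParameters => named_struct('max_tokens', 256, 'temperature', 0.1)
  ) AS summary
FROM uc_catalog.schema.table;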

ai_query and custom or fine-tuned foundation models

The notebook examples in this section demonstrate batch inference workloads that use custom or fine-tuned foundation models to process multiple inputs. The examples require an existing model serving endpoint that uses Foundation Model APIs provisioned throughput.
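
Calling a custom or fine-tuned model follows the same pattern as the earlier examples, except that ai_query references your own serving endpoint. A minimal sketch, assuming a hypothetical provisioned throughput endpoint and table:

SELECT
  text,
  ai_query(
    "my-finetuned-llama-endpoint",  -- hypothetical provisioned throughput endpoint name
    "Answer using only the provided document: " || text
  ) AS answer
FROM uc_catalog.schema.docs;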

Batch LLM inference using an embeddings model

The following example notebook creates a provisioned throughput endpoint and runs batch LLM inference using Python and your choice of the GTE Large (English) or BGE Large (English) embeddings model.

LLM batch inference embeddings using a provisioned throughput endpoint notebook

Get notebook
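
As a rough illustration of the pattern the notebook automates, an embeddings endpoint can also be called through ai_query; the endpoint, table, and column names below are assumptions, not what the notebook uses:

SELECT
  id,
  content,
  ai_query("my-gte-large-en-endpoint", content) AS embedding  -- hypothetical embeddings endpoint
FROM uc_catalog.schema.documents;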

Batch inference and structured data extraction

The following example notebook demonstrates how to perform basic structured data extraction using ai_query to transform raw, unstructured data into organized, usable information through automated extraction techniques. It also shows how to use Mosaic AI Agent Evaluation to assess accuracy against ground-truth data.

Batch inference and structured data extraction notebook

Get notebook

Batch inference using BERT for named entity recognition

The following notebook shows a traditional ML model batch inference example using BERT.

Batch inference using BERT for named entity recognition notebook

Get notebook
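
The notebook covers the full setup; the following is only a minimal sketch of the general pattern it illustrates, a Hugging Face BERT NER pipeline applied to a table through a pandas UDF. The model checkpoint, table, and column names are assumptions, not what the notebook uses.

import pandas as pd
from pyspark.sql.functions import pandas_udf
from transformers import pipeline

@pandas_udf("string")
def extract_entities(texts: pd.Series) -> pd.Series:
    # Load the NER pipeline inside the UDF so each executor batch has its own copy.
    # "dslim/bert-base-NER" is an example checkpoint, not necessarily the notebook's model.
    ner = pipeline("ner", model="dslim/bert-base-NER", aggregation_strategy="simple")
    # Run NER on each row and return the extracted entities as a string.
    return texts.apply(lambda text: str(ner(text)))

df = spark.table("uc_catalog.schema.documents")  # hypothetical input table
display(df.withColumn("entities", extract_entities("text")))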

Deploy batch inference pipelines

This section shows how to integrate AI Functions with other Databricks data and AI products to build complete batch inference pipelines. These pipelines can perform end-to-end workflows that include ingestion, preprocessing, inference, and post-processing. Pipelines can be authored in SQL or Python and deployed as:

  • DLT pipelines
  • Scheduled workflows using Databricks Workflows
  • Streaming inference workflows using Structured Streaming

Perform incremental batch inference in DLT

The following example uses DLT to perform incremental batch inference as data is continuously updated.

Step 1: Ingest raw news data from a volume

SQL

CREATE OR REFRESH STREAMING TABLE news_raw
COMMENT "Raw news articles ingested from volume."
AS SELECT *
FROM STREAM(read_files(
  '/Volumes/databricks_news_summarization_benchmarking_data/v01/csv',
  format => 'csv',
  header => true,
  mode => 'PERMISSIVE',
  multiLine => 'true'
));
Python

Import packages and define the JSON schema of the LLM response as a Python variable.


import dlt
from pyspark.sql.functions import expr, get_json_object, concat

news_extraction_schema = (
    '{"type": "json_schema", "json_schema": {"name": "news_extraction", '
    '"schema": {"type": "object", "properties": {"title": {"type": "string"}, '
    '"category": {"type": "string", "enum": ["Politics", "Sports", "Technology", '
    '"Health", "Entertainment", "Business"]}}}, "strict": true}}'
)

Ingest data from the Unity Catalog volume.

@dlt.table(
  comment="Raw news articles ingested from volume."
)
def news_raw():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", True)
      .option("mode", "PERMISSIVE")
      .option("multiLine", "true")
      .load("/Volumes/databricks_news_summarization_benchmarking_data/v01/csv")
  )

Step 2: Apply LLM inference to extract titles and categories

SQL

CREATE OR REFRESH MATERIALIZED VIEW news_categorized
COMMENT "Extract category and title from news articles using LLM inference."
AS
SELECT
  inputs,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Extract the category of the following news article: " || inputs,
    responseFormat => '{
      "type": "json_schema",
      "json_schema": {
        "name": "news_extraction",
        "schema": {
          "type": "object",
          "properties": {
            "title": { "type": "string" },
            "category": {
              "type": "string",
              "enum": ["Politics", "Sports", "Technology", "Health", "Entertainment", "Business"]
            }
          }
        },
        "strict": true
      }
    }'
  ) AS meta_data
FROM news_raw
LIMIT 2;
Python
@dlt.table(
  comment="Extract category and title from news articles using LLM inference."
)
def news_categorized():
  # Limit the number of rows to 2 as in the SQL version
  df_raw = spark.read.table("news_raw").limit(2)
  # Inject the JSON schema variable into the ai_query call using an f-string.
  return df_raw.withColumn(
    "meta_data",
    expr(
      f"ai_query('databricks-meta-llama-3-3-70b-instruct', "
      f"concat('Extract the category of the following news article: ', inputs), "
      f"responseFormat => '{news_extraction_schema}')"
    )
  )

Step 3: Validate LLM inference output before summarization

SQL
CREATE OR REFRESH MATERIALIZED VIEW news_validated (
  CONSTRAINT valid_title EXPECT (size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3),
  CONSTRAINT valid_category EXPECT (get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business'))
)
COMMENT "Validated news articles ensuring the title has at least 3 words and the category is valid."
AS
SELECT *
FROM news_categorized;
Python
@dlt.table(
  comment="Validated news articles ensuring the title has at least 3 words and the category is valid."
)
@dlt.expect("valid_title", "size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3")
@dlt.expect_or_fail("valid_category", "get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business')")
def news_validated():
  return spark.read.table("news_categorized")

Step 4: Summarize the news articles from validated data

SQL
CREATE OR REFRESH MATERIALIZED VIEW news_summarized
COMMENT "Summarized political news articles after validation."
AS
SELECT
  get_json_object(meta_data, '$.category') as category,
  get_json_object(meta_data, '$.title') as title,
  ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    "Summarize the following political news article in 2-3 sentences: " || inputs
  ) AS summary
FROM news_validated;
Python

@dlt.table(
  comment="Summarized political news articles after validation."
)
def news_summarized():
  df = spark.read.table("news_validated")
  return df.select(
    get_json_object("meta_data", "$.category").alias("category"),
    get_json_object("meta_data", "$.title").alias("title"),
    expr(
      "ai_query('databricks-meta-llama-3-3-70b-instruct', "
      "concat('Summarize the following political news article in 2-3 sentences: ', inputs))"
    ).alias("summary")
  )

Automate batch inference jobs using Databricks Workflows

Schedule batch inference jobs and automate AI pipelines using Databricks Workflows.

SQL

SELECT
   *,
   ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.


AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price


Examples below:


DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does..


RESULT
[
 {'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
 {'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
 {'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
 {'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
 {'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]


DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10

Python


import json
from pyspark.sql.functions import expr

# Define the opinion mining prompt as a multi-line string.
opinion_prompt = """You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.

AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price

Examples below:

DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup.The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that.It made three or four large servings of soup.It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound.The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does.

RESULT
[
 {'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
 {'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
 {'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
 {'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
 {'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]

DOCUMENT
"""

# Escape the prompt so it can be safely embedded in the SQL expression.
escaped_prompt = json.dumps(opinion_prompt)

# Read the source table and limit to 10 rows.
df = spark.table("catalog.schema.product_reviews").limit(10)

# Apply the LLM inference to each row, concatenating the prompt, the review text, and the tail string.
result_df = df.withColumn(
    "result",
    expr(f"ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat({escaped_prompt}, REVIEW_TEXT, '\\n\\nRESULT\\n'))")
)

# Display the result DataFrame.
display(result_df)
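
To run a query like the one above on a schedule, it can be packaged in a notebook and registered as a job. The following is a hedged sketch using the Databricks SDK for Python; the job name, notebook path, and cron expression are placeholders, and depending on your workspace you may also need to specify compute for the task.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()

created = w.jobs.create(
    name="nightly-opinion-mining",  # placeholder job name
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0 2 * * ?",  # every day at 02:00
        timezone_id="UTC",
    ),
    tasks=[
        jobs.Task(
            task_key="run_batch_inference",
            notebook_task=jobs.NotebookTask(
                notebook_path="/Workspace/Users/someone@example.com/opinion_mining"  # placeholder path
            ),
        )
    ],
)
print(f"Created job {created.job_id}")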

AI Functions using Structured Streaming

Use ai_query with Structured Streaming to apply AI inference in near real-time or micro-batch scenarios.

Step 1. Read a static Delta table

Read a static Delta table as if it were a stream.


from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Spark processes all existing rows exactly once in the first micro-batch.
df = spark.table("enterprise.docs")  # Replace with your table name containing enterprise documents
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")

# Define the prompt outside the SQL expression.
prompt = (
    "You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
    "Do not include extra commentary or suggestions. Document: "
)

Step 2. Apply ai_query

Spark processes the static data only once, unless new rows arrive in the table.


df_transformed = df_stream.select(
    "document_text",
    F.expr(f"""
      ai_query(
        'databricks-meta-llama-3-1-8b-instruct',
        CONCAT('{prompt}', document_text)
      )
    """).alias("summary")
)

Step 3: Write the summarized output

Write the summarized output to another Delta table.


# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
    .outputMode("append") \
    .toTable("enterprise.docs_summary")

query.awaitTermination()
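
If you want the stream to process everything currently in the table and then stop, rather than keep running, the same writer can use an availableNow trigger. This variation is a sketch and not part of the original example:

# Process all data available at start time in micro-batches, then stop automatically.
query = df_transformed.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .toTable("enterprise.docs_summary")

query.awaitTermination()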

View the costs of batch inference workloads

The following examples show how to filter batch inference workloads by jobs, compute, SQL warehouses, and DLT pipelines.

For a general example of how to view the costs of batch inference workloads that use AI Functions, see Monitor model serving costs.

Jobs

The following query shows which jobs are being used for batch inference, using the system.workflow.jobs system table. See Monitor job costs and performance with system tables.


SELECT *
FROM system.billing.usage u
  JOIN system.workflow.jobs x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.job_id = x.job_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";

Compute

The following shows which clusters are being used for batch inference, using the system.compute.clusters system table.

SELECT *
FROM system.billing.usage u
  JOIN system.compute.clusters x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.cluster_id = x.cluster_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";

DLT pipelines

The following shows which DLT pipelines are being used for batch inference, using the system.lakeflow.pipelines system table.

SELECT *
FROM system.billing.usage u
  JOIN system.lakeflow.pipelines x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.dlt_pipeline_id = x.pipeline_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";

SQL warehouses

The following shows which SQL warehouses are being used for batch inference, using the system.compute.warehouses system table.

SELECT *
FROM system.billing.usage u
  JOIN system.compute.warehouses x
    ON u.workspace_id = x.workspace_id
    AND u.usage_metadata.warehouse_id = x.warehouse_id
  WHERE u.usage_metadata.workspace_id = <workspace_id>
    AND u.billing_origin_product = "MODEL_SERVING"
    AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";