重要
この機能はパブリック プレビュー段階にあります。
この記事では、 大規模な AI Functions を使用してバッチ推論を実行する方法について説明します。 この記事の例は、スケジュールされたワークフローとしてバッチ推論パイプラインをデプロイしたり、構造化ストリーミングに ai_query
と Databricks でホストされる基盤モデルを使用するなど、運用環境のシナリオに推奨されます。
AI Functions の使用を開始するために、Databricks では次のいずれかを使用することをお勧めします。
必要条件
- Foundation Model API でサポートされているリージョン内のワークスペース。
- AI Functions を使用したバッチ推論ワークロードには、Databricks Runtime 15.4 LTS 以上が必要です。
- 使用するデータを含む Unity カタログの Delta テーブルに対するクエリアクセス許可。
-
pipelines.channel
を使用するには、テーブルのプロパティのai_query()
を "プレビュー" として設定します。 クエリの例については、「 要件」 を参照してください。
タスク固有の AI 関数を使用したバッチ LLM 推論
タスク固有の AI 関数を使用してバッチ推論を実行できます。 タスク固有の AI 関数をパイプラインに組み込む方法のガイダンスについては、 バッチ推論 パイプラインのデプロイを参照してください。
タスク固有の AI 関数 ( ai_translate
) の使用例を次に示します。
SELECT
writer_summary,
ai_translate(writer_summary, "cn") as cn_translation
from user.batch.news_summaries
limit 500
;
ai_query
を使用したバッチ LLM 推論
汎用 AI 関数 ai_query
使用して、バッチ推論を実行できます。 サポートされているモデルの種類と関連するモデルai_query
確認します。
このセクションの例では、 ai_query
の柔軟性と、それをバッチ推論パイプラインとワークフローで使用する方法について説明します。
ai_query
および Databricks でホストされる基盤モデル
バッチ推論に Databricks ホスト型および事前プロビジョニング済みの基盤モデルを使用する場合、Databricks は、ワークロードに基づいて自動的にスケーリングされるプロビジョニング済みスループット エンドポイントをユーザーに代わって構成します。
バッチ推論にこのメソッドを使用するには、要求で次を指定します。
-
ai_query
での使用をご希望の、事前プロビジョニング済みの LLM。 サポートされている 事前プロビジョニングされた LLM から選択します。 これらの事前プロビジョニングされた LLM は、制限のないライセンスと使用ポリシーの対象となります。 該当するモデル開発者のライセンスと使用条件を参照してください。 - Unity カタログの入力テーブルと出力テーブル。
- モデル プロンプトと任意のモデル パラメーター。
SELECT text, ai_query(
"databricks-meta-llama-3-1-8b-instruct",
"Summarize the given text comprehensively, covering key points and main ideas concisely while retaining relevant details and examples. Ensure clarity and accuracy without unnecessary repetition or omissions: " || text
) AS summary
FROM uc_catalog.schema.table;
ai_query
およびカスタムまたは微調整された基礎モデル
このセクションのノートブックの例では、カスタムまたは微調整された基盤モデルを使用して複数の入力を処理するバッチ推論ワークロードを示します。 この例では、 Foundation Model API によってプロビジョニングされたスループットを使用する既存のモデル サービス エンドポイントが必要です。
埋め込みモデルを使用した LLM バッチ推論
次のサンプル ノートブックでは、プロビジョニングされたスループット エンドポイントを作成し、Python と、GTE Large (English) または BGE Large (English) 埋め込みモデルのいずれかを使用してバッチ LLM 推論を実行します。
プロビジョニングされたスループット エンドポイント ノートブック使用する、LLM バッチ推論の埋め込み
ノートブック を取得する
バッチ推論と構造化データ抽出
次のノートブック例では、 ai_query
を使用して基本的な構造化データ抽出を実行し、自動抽出手法を使用して、未加工の非構造化データを整理された使用可能な情報に変換する方法を示します。 このノートブックでは、Mosaic AI エージェント評価を活用し、典拠とする真偽データを使用して精度を評価する方法も示します。
バッチ推論と構造化データ抽出ノートブック
ノートブック を取得する
名前付きエンティティ認識に BERT を使用したバッチ推論
次のノートブックは、BERT を使用した従来の ML モデルバッチ推論の例を示しています。
名前付きエンティティ認識ノートブックに BERT を使用したバッチ推論
ノートブック を取得する
バッチ推論パイプラインをデプロイする
このセクションでは、AI Functions を他の Databricks データおよび AI 製品に統合して、完全なバッチ推論パイプラインを構築する方法について説明します。 これらのパイプラインは、インジェスト、前処理、推論、後処理を含むエンドツーエンドのワークフローを実行できます。 パイプラインは、SQL または Python で作成し、次のようにデプロイできます。
- Lakeflow 宣言型パイプライン
- Databricks ワークフローを使用したスケジュールされたワークフロー
- 構造化ストリーミングを使用したストリーミング推論ワークフロー
Lakeflow 宣言パイプラインで増分バッチ推論を実行する
次の例では、データが継続的に更新される場合に、Lakeflow 宣言パイプラインを使用して増分バッチ推論を実行します。
手順 1: ボリュームから未加工のニュースデータを取り込む
SQL
CREATE OR REFRESH STREAMING TABLE news_raw
COMMENT "Raw news articles ingested from volume."
AS SELECT *
FROM STREAM(read_files(
'/Volumes/databricks_news_summarization_benchmarking_data/v01/csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE',
multiLine => 'true'
));
Python(プログラミング言語)
パッケージをインポートし、LLM 応答の JSON スキーマを Python 変数として定義する
import dlt
from pyspark.sql.functions import expr, get_json_object, concat
news_extraction_schema = (
'{"type": "json_schema", "json_schema": {"name": "news_extraction", '
'"schema": {"type": "object", "properties": {"title": {"type": "string"}, '
'"category": {"type": "string", "enum": ["Politics", "Sports", "Technology", '
'"Health", "Entertainment", "Business"]}}}, "strict": true}}'
)
Unity カタログ ボリュームからデータを取り込みます。
@dlt.table(
comment="Raw news articles ingested from volume."
)
def news_raw():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.option("mode", "PERMISSIVE")
.option("multiLine", "true")
.load("/Volumes/databricks_news_summarization_benchmarking_data/v01/csv")
)
手順 2: LLM 推論を適用してタイトルとカテゴリを抽出する
SQL
CREATE OR REFRESH MATERIALIZED VIEW news_categorized
COMMENT "Extract category and title from news articles using LLM inference."
AS
SELECT
inputs,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Extract the category of the following news article: " || inputs,
responseFormat => '{
"type": "json_schema",
"json_schema": {
"name": "news_extraction",
"schema": {
"type": "object",
"properties": {
"title": { "type": "string" },
"category": {
"type": "string",
"enum": ["Politics", "Sports", "Technology", "Health", "Entertainment", "Business"]
}
}
},
"strict": true
}
}'
) AS meta_data
FROM news_raw
LIMIT 2;
Python(プログラミング言語)
@dlt.table(
comment="Extract category and title from news articles using LLM inference."
)
def news_categorized():
# Limit the number of rows to 2 as in the SQL version
df_raw = spark.read.table("news_raw").limit(2)
# Inject the JSON schema variable into the ai_query call using an f-string.
return df_raw.withColumn(
"meta_data",
expr(
f"ai_query('databricks-meta-llama-3-3-70b-instruct', "
f"concat('Extract the category of the following news article: ', inputs), "
f"responseFormat => '{news_extraction_schema}')"
)
)
手順 3: 要約の前に LLM 推論出力を検証する
SQL
CREATE OR REFRESH MATERIALIZED VIEW news_validated (
CONSTRAINT valid_title EXPECT (size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3),
CONSTRAINT valid_category EXPECT (get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business'))
)
COMMENT "Validated news articles ensuring the title has at least 3 words and the category is valid."
AS
SELECT *
FROM news_categorized;
Python(プログラミング言語)
@dlt.table(
comment="Validated news articles ensuring the title has at least 3 words and the category is valid."
)
@dlt.expect("valid_title", "size(split(get_json_object(meta_data, '$.title'), ' ')) >= 3")
@dlt.expect_or_fail("valid_category", "get_json_object(meta_data, '$.category') IN ('Politics', 'Sports', 'Technology', 'Health', 'Entertainment', 'Business')")
def news_validated():
return spark.read.table("news_categorized")
手順 4: 検証済みデータからニュース記事を要約する
SQL
CREATE OR REFRESH MATERIALIZED VIEW news_summarized
COMMENT "Summarized political news articles after validation."
AS
SELECT
get_json_object(meta_data, '$.category') as category,
get_json_object(meta_data, '$.title') as title,
ai_query(
"databricks-meta-llama-3-3-70b-instruct",
"Summarize the following political news article in 2-3 sentences: " || inputs
) AS summary
FROM news_validated;
Python(プログラミング言語)
@dlt.table(
comment="Summarized political news articles after validation."
)
def news_summarized():
df = spark.read.table("news_validated")
return df.select(
get_json_object("meta_data", "$.category").alias("category"),
get_json_object("meta_data", "$.title").alias("title"),
expr(
"ai_query('databricks-meta-llama-3-3-70b-instruct', "
"concat('Summarize the following political news article in 2-3 sentences: ', inputs))"
).alias("summary")
)
Databricks ワークフローを使用してバッチ推論ジョブを自動化する
バッチ推論ジョブをスケジュールし、AI パイプラインを自動化します。
SQL
SELECT
*,
ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat("You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup. The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that. It made three or four large servings of soup. It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound. The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does..
RESULT
[
{'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
{'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
{'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
{'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
{'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
", REVIEW_TEXT, '\n\nRESULT\n')) as result
FROM catalog.schema.product_reviews
LIMIT 10
Python(プログラミング言語)
import json
from pyspark.sql.functions import expr
# Define the opinion mining prompt as a multi-line string.
opinion_prompt = """You are an opinion mining service. Given a piece of text, output an array of json results that extracts key user opinions, a classification, and a Positive, Negative, Neutral, or Mixed sentiment about that subject.
AVAILABLE CLASSIFICATIONS
Quality, Service, Design, Safety, Efficiency, Usability, Price
Examples below:
DOCUMENT
I got soup. It really did take only 20 minutes to make some pretty good soup.The noises it makes when it's blending are somewhat terrifying, but it gives a little beep to warn you before it does that.It made three or four large servings of soup.It's a single layer of steel, so the outside gets pretty hot. It can be hard to unplug the lid without knocking the blender against the side, which is not a nice sound.The soup was good and the recipes it comes with look delicious, but I'm not sure I'll use it often. 20 minutes of scary noises from the kitchen when I already need comfort food is not ideal for me. But if you aren't sensitive to loud sounds it does exactly what it says it does.
RESULT
[
{'Classification': 'Efficiency', 'Comment': 'only 20 minutes','Sentiment': 'Positive'},
{'Classification': 'Quality','Comment': 'pretty good soup','Sentiment': 'Positive'},
{'Classification': 'Usability', 'Comment': 'noises it makes when it's blending are somewhat terrifying', 'Sentiment': 'Negative'},
{'Classification': 'Safety','Comment': 'outside gets pretty hot','Sentiment': 'Negative'},
{'Classification': 'Design','Comment': 'Hard to unplug the lid without knocking the blender against the side, which is not a nice sound', 'Sentiment': 'Negative'}
]
DOCUMENT
"""
# Escape the prompt so it can be safely embedded in the SQL expression.
escaped_prompt = json.dumps(opinion_prompt)
# Read the source table and limit to 10 rows.
df = spark.table("catalog.schema.product_reviews").limit(10)
# Apply the LLM inference to each row, concatenating the prompt, the review text, and the tail string.
result_df = df.withColumn(
"result",
expr(f"ai_query('databricks-meta-llama-3-3-70b-instruct', request => concat({escaped_prompt}, REVIEW_TEXT, '\\n\\nRESULT\\n'))")
)
# Display the result DataFrame.
display(result_df)
構造化ストリーミングを使用した AI 関数
ai_query
と構造化ストリーミングを使用して、ほぼリアルタイムまたはマイクロバッチのシナリオで AI 推論を適用します。
ステップ 1. 静的な Delta テーブルを読み取る
静的な Delta テーブルをストリームであるかのように読み取ります。
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
# Spark processes all existing rows exactly once in the first micro-batch.
df = spark.table("enterprise.docs") # Replace with your table name containing enterprise documents
df.repartition(50).write.format("delta").mode("overwrite").saveAsTable("enterprise.docs")
df_stream = spark.readStream.format("delta").option("maxBytesPerTrigger", "50K").table("enterprise.docs")
# Define the prompt outside the SQL expression.
prompt = (
"You are provided with an enterprise document. Summarize the key points in a concise paragraph. "
"Do not include extra commentary or suggestions. Document: "
)
手順 2. 申し込む ai_query
Spark は、テーブルに新しい行が到着しない限り、静的データに対してこれを 1 回だけ処理します。
df_transformed = df_stream.select(
"document_text",
F.expr(f"""
ai_query(
'databricks-meta-llama-3-1-8b-instruct',
CONCAT('{prompt}', document_text)
)
""").alias("summary")
)
手順 3: 要約された出力を書き込む
集計された出力を別の Delta テーブルに書き込む
# Time-based triggers apply, but only the first trigger processes all existing static data.
query = df_transformed.writeStream \
.format("delta") \
.option("checkpointLocation", "/tmp/checkpoints/_docs_summary") \
.outputMode("append") \
.toTable("enterprise.docs_summary")
query.awaitTermination()
バッチ推論ワークロードのコストを表示する
次の例は、ジョブ、コンピューティング、SQL ウェアハウス、および Lakeflow 宣言パイプラインに基づいてバッチ推論ワークロードをフィルター処理する方法を示しています。
AI Functions を使用するバッチ推論ワークロードのコストを表示する方法の一般的な例については、「 モデル サービス コストの監視」 を参照してください。
仕事
次のクエリは、 system.workflow.jobs
システム テーブルを使用したバッチ推論に使用されているジョブを示しています。 「システム テーブルを使用してジョブのコストとパフォーマンスを監視する」を参照してください。
SELECT *
FROM system.billing.usage u
JOIN system.workflow.jobs x
ON u.workspace_id = x.workspace_id
AND u.usage_metadata.job_id = x.job_id
WHERE u.usage_metadata.workspace_id = <workspace_id>
AND u.billing_origin_product = "MODEL_SERVING"
AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";
コンピューティング
次に、 system.compute.clusters
システム テーブルを使用したバッチ推論に使用されているクラスターを示します。
SELECT *
FROM system.billing.usage u
JOIN system.compute.clusters x
ON u.workspace_id = x.workspace_id
AND u.usage_metadata.cluster_id = x.cluster_id
WHERE u.usage_metadata.workspace_id = <workspace_id>
AND u.billing_origin_product = "MODEL_SERVING"
AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";
Lakeflow 宣言型パイプライン
次に、 system.lakeflow.pipelines
システム テーブルを使用したバッチ推論に使用されている Lakeflow 宣言パイプラインを示します。
SELECT *
FROM system.billing.usage u
JOIN system.lakeflow.pipelines x
ON u.workspace_id = x.workspace_id
AND u.usage_metadata.dlt_pipeline_id = x.pipeline_id
WHERE u.usage_metadata.workspace_id = <workspace_id>
AND u.billing_origin_product = "MODEL_SERVING"
AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";
SQL Warehouse
次に、 system.compute.warehouses
システム テーブルを使用したバッチ推論に使用されている Lakeflow 宣言パイプラインを示します。
SELECT *
FROM system.billing.usage u
JOIN system.compute.warehouses x
ON u.workspace_id = x.workspace_id
AND u.usage_metadata.warehouse_id = x.warehouse_id
WHERE u.usage_metadata.workspace_id = <workspace_id>
AND u.billing_origin_product = "MODEL_SERVING"
AND u.product_features.model_serving.offering_type = "BATCH_INFERENCE";