为非结构化数据生成和跟踪检索器工具

使用马赛克 AI 代理框架生成工具,让 AI 代理查询非结构化数据,例如文档集合。 本页演示如何:

若要详细了解代理工具,请参阅 AI 代理工具

使用 AI Bridge 在本地开发矢量搜索检索器工具

开始构建 Databricks 矢量搜索检索器工具的最快方法是使用 Databricks AI Bridge 包(例如 databricks-langchaindatabricks-openai)在本地开发和测试它。

LangChain/LangGraph

安装最新版本的 databricks-langchain,其中包括 Databricks AI Bridge。

%pip install --upgrade databricks-langchain

以下代码原型是一个检索器工具,该工具查询假设矢量搜索索引并将其绑定到本地 LLM,以便可以测试其工具调用行为。

提供描述性tool_description以帮助代理理解工具并确定何时调用它。

from databricks_langchain import VectorSearchRetrieverTool, ChatDatabricks

# Initialize the retriever tool.
vs_tool = VectorSearchRetrieverTool(
  index_name="catalog.schema.my_databricks_docs_index",
  tool_name="databricks_docs_retriever",
  tool_description="Retrieves information about Databricks products from official Databricks documentation."
)

# Run a query against the vector search index locally for testing
vs_tool.invoke("Databricks Agent Framework?")

# Bind the retriever tool to your Langchain LLM of choice
llm = ChatDatabricks(endpoint="databricks-claude-3-7-sonnet")
llm_with_tools = llm.bind_tools([vs_tool])

# Chat with your LLM to test the tool calling functionality
llm_with_tools.invoke("Based on the Databricks documentation, what is Databricks Agent Framework?")

对于使用直接访问索引或使用自托管嵌入的Delta同步索引的场景,必须配置VectorSearchRetrieverTool并指定自定义嵌入模型和文本列。 请参阅用于提供嵌入的选项

以下示例演示如何使用 VectorSearchRetrieverToolcolumns 密钥配置 embedding

from databricks_langchain import VectorSearchRetrieverTool
from databricks_langchain import DatabricksEmbeddings

embedding_model = DatabricksEmbeddings(
    endpoint="databricks-bge-large-en",
)

vs_tool = VectorSearchRetrieverTool(
  index_name="catalog.schema.index_name", # Index name in the format 'catalog.schema.index'
  num_results=5, # Max number of documents to return
  columns=["primary_key", "text_column"], # List of columns to include in the search
  filters={"text_column LIKE": "Databricks"}, # Filters to apply to the query
  query_type="ANN", # Query type ("ANN" or "HYBRID").
  tool_name="name of the tool", # Used by the LLM to understand the purpose of the tool
  tool_description="Purpose of the tool", # Used by the LLM to understand the purpose of the tool
  text_column="text_column", # Specify text column for embeddings. Required for direct-access index or delta-sync index with self-managed embeddings.
  embedding=embedding_model # The embedding model. Required for direct-access index or delta-sync index with self-managed embeddings.
)

有关更多详细信息,请参阅 API 文档VectorSearchRetrieverTool

开放人工智能

安装最新版本的 databricks-openai,其中包括 Databricks AI Bridge。

%pip install --upgrade databricks-openai

下面的代码原型是查询假设矢量搜索索引并将其与 OpenAI 的 GPT 模型集成的检索器。

提供描述性tool_description以帮助代理理解工具并确定何时调用它。

有关 OpenAI 对工具的建议的详细信息,请参阅 OpenAI 函数调用文档

from databricks_openai import VectorSearchRetrieverTool
from openai import OpenAI
import json

# Initialize OpenAI client
client = OpenAI(api_key=<your_API_key>)

# Initialize the retriever tool
dbvs_tool = VectorSearchRetrieverTool(
  index_name="catalog.schema.my_databricks_docs_index",
  tool_name="databricks_docs_retriever",
  tool_description="Retrieves information about Databricks products from official Databricks documentation"
)

messages = [
  {"role": "system", "content": "You are a helpful assistant."},
  {
    "role": "user",
    "content": "Using the Databricks documentation, answer what is Spark?"
  }
]
first_response = client.chat.completions.create(
  model="gpt-4o",
  messages=messages,
  tools=[dbvs_tool.tool]
)

# Execute function code and parse the model's response and handle function calls.
tool_call = first_response.choices[0].message.tool_calls[0]
args = json.loads(tool_call.function.arguments)
result = dbvs_tool.execute(query=args["query"])  # For self-managed embeddings, optionally pass in openai_client=client

# Supply model with results – so it can incorporate them into its final response.
messages.append(first_response.choices[0].message)
messages.append({
  "role": "tool",
  "tool_call_id": tool_call.id,
  "content": json.dumps(result)
})
second_response = client.chat.completions.create(
  model="gpt-4o",
  messages=messages,
  tools=[dbvs_tool.tool]
)

对于使用直接访问索引或使用自托管嵌入的Delta同步索引的场景,必须配置VectorSearchRetrieverTool并指定自定义嵌入模型和文本列。 请参阅用于提供嵌入的选项

以下示例演示如何使用 VectorSearchRetrieverToolcolumns 密钥配置 embedding

from databricks_openai import VectorSearchRetrieverTool

vs_tool = VectorSearchRetrieverTool(
    index_name="catalog.schema.index_name", # Index name in the format 'catalog.schema.index'
    num_results=5, # Max number of documents to return
    columns=["primary_key", "text_column"], # List of columns to include in the search
    filters={"text_column LIKE": "Databricks"}, # Filters to apply to the query
    query_type="ANN", # Query type ("ANN" or "HYBRID").
    tool_name="name of the tool", # Used by the LLM to understand the purpose of the tool
    tool_description="Purpose of the tool", # Used by the LLM to understand the purpose of the tool
    text_column="text_column", # Specify text column for embeddings. Required for direct-access index or delta-sync index with self-managed embeddings.
    embedding_model_name="databricks-bge-large-en" # The embedding model. Required for direct-access index or delta-sync index with self-managed embeddings.
)

有关更多详细信息,请参阅 API 文档VectorSearchRetrieverTool

本地工具准备就绪后,可以直接将其生产为代理代码的一部分,或将其迁移到 Unity 目录函数,该函数可提供更好的可发现性和治理性,但存在某些限制。

以下部分介绍如何将检索器迁移到 Unity 目录函数。

使用 Unity Catalog 功能的 矢量搜索检索工具

可以创建封装 马赛克 AI 矢量搜索索引 查询的 Unity Catalog 函数。 这种方法:

  • 支持具有治理和可发现性的生产用例
  • 在后台使用 vector_search() SQL 函数
  • 支持自动 MLflow 跟踪
    • 必须使用page_content别名将函数的输出与metadata对齐。
    • 必须使用 metadata(而不是顶级输出键)将任何其他元数据列添加到该列。

在笔记本或 SQL 编辑器中运行以下代码以创建函数:

CREATE OR REPLACE FUNCTION main.default.databricks_docs_vector_search (
  -- The agent uses this comment to determine how to generate the query string parameter.
  query STRING
  COMMENT 'The query string for searching Databricks documentation.'
) RETURNS TABLE
-- The agent uses this comment to determine when to call this tool. It describes the types of documents and information contained within the index.
COMMENT 'Executes a search on Databricks documentation to retrieve text documents most relevant to the input query.' RETURN
SELECT
  chunked_text as page_content,
  map('doc_uri', url, 'chunk_id', chunk_id) as metadata
FROM
  vector_search(
    -- Specify your Vector Search index name here
    index => 'catalog.schema.databricks_docs_index',
    query => query,
    num_results => 5
  )

要在 AI 代理中使用此检索工具,请用 UCFunctionToolkit将其封装。 这使得可以在 MLflow 日志中自动生成 RETRIEVER 追踪跨度,从而通过 MLflow 实现自动跟踪。

from unitycatalog.ai.langchain.toolkit import UCFunctionToolkit

toolkit = UCFunctionToolkit(
    function_names=[
        "main.default.databricks_docs_vector_search"
    ]
)
tools = toolkit.tools

Unity 目录检索器工具具有以下注意事项:

  • SQL 客户端可能会限制返回的最大行数或字节数。 若要防止数据截断,应截断 UDF 返回的列值。 例如,可以使用 substring(chunked_text, 0, 8192) 来减小大型内容列的大小,并避免在执行过程中行截断。
  • 由于此工具是 vector_search() 函数的包装器,因此它受到与 vector_search() 函数相同的限制。 请参阅限制

有关 UCFunctionToolkit 的详细信息,请参阅 Unity 目录文档

查询 Databricks 外部托管的向量索引的检索器

如果矢量索引托管在 Azure Databricks 外部,则可以创建 Unity 目录连接以连接到外部服务并使用代理代码中的连接。 请参阅 将 AI 代理工具连接到外部服务

以下示例创建一个检索器,该检索器调用一个不在 Databricks 上托管的向量索引,用于 PyFunc 样式的代理。

  1. 在本例中,创建一个Unity Catalog连接以连接到外部服务Azure。

    CREATE CONNECTION ${connection_name}
    TYPE HTTP
    OPTIONS (
      host 'https://example.search.windows.net',
      base_path '/',
      bearer_token secret ('<secret-scope>','<secret-key>')
    );
    
  2. 使用 Unity 目录连接在代理代码中定义检索器工具。 此示例使用 MLflow 修饰器启用代理跟踪。

    注释

    为了符合 MLflow 检索器架构,检索器函数应返回一个 List[Document] 对象,并使用 metadata Document 类中的字段向返回的文档添加其他属性,例如 doc_urisimilarity_score。 请参阅 MLflow 文档

    import mlflow
    import json
    
    from mlflow.entities import Document
    from typing import List, Dict, Any
    from dataclasses import asdict
    
    class VectorSearchRetriever:
      """
      Class using Databricks Vector Search to retrieve relevant documents.
      """
    
      def __init__(self):
        self.azure_search_index = "hotels_vector_index"
    
      @mlflow.trace(span_type="RETRIEVER", name="vector_search")
      def __call__(self, query_vector: List[Any], score_threshold=None) -> List[Document]:
        """
        Performs vector search to retrieve relevant chunks.
        Args:
          query: Search query.
          score_threshold: Score threshold to use for the query.
    
        Returns:
          List of retrieved Documents.
        """
        from databricks.sdk import WorkspaceClient
        from databricks.sdk.service.serving import ExternalFunctionRequestHttpMethod
    
        json = {
          "count": true,
          "select": "HotelId, HotelName, Description, Category",
          "vectorQueries": [
            {
              "vector": query_vector,
              "k": 7,
              "fields": "DescriptionVector",
              "kind": "vector",
              "exhaustive": true,
            }
          ],
        }
    
        response = (
          WorkspaceClient()
          .serving_endpoints.http_request(
            conn=connection_name,
            method=ExternalFunctionRequestHttpMethod.POST,
            path=f"indexes/{self.azure_search_index}/docs/search?api-version=2023-07-01-Preview",
            json=json,
          )
          .text
        )
    
        documents = self.convert_vector_search_to_documents(response, score_threshold)
        return [asdict(doc) for doc in documents]
    
      @mlflow.trace(span_type="PARSER")
      def convert_vector_search_to_documents(
        self, vs_results, score_threshold
      ) -> List[Document]:
        docs = []
    
        for item in vs_results.get("value", []):
          score = item.get("@search.score", 0)
    
          if score >= score_threshold:
            metadata = {
              "score": score,
              "HotelName": item.get("HotelName"),
              "Category": item.get("Category"),
            }
    
            doc = Document(
              page_content=item.get("Description", ""),
              metadata=metadata,
              id=item.get("HotelId"),
            )
            docs.append(doc)
    
        return docs
    
  3. 若要运行检索器,请运行以下 Python 代码。 可以选择在请求中包含 矢量搜索筛选器 来筛选结果。

    retriever = VectorSearchRetriever()
    query = [0.01944167, 0.0040178085 . . .  TRIMMED FOR BREVITY 010858015, -0.017496133]
    results = retriever(query, score_threshold=0.1)
    

向检索器添加追踪功能

添加 MLflow 跟踪以监视和调试检索器。 通过跟踪可以查看每个执行步骤的输入、输出和元数据。

上一个示例将 @mlflow.trace 修饰器 添加到 __call__ 和分析方法。 修饰器创建一个 范围,该范围在调用函数时开始,并在函数返回时结束。 MLflow 会自动记录函数的输入和输出以及引发的任何异常。

注释

LangChain、LlamaIndex 和 OpenAI 库的用户除了使用修饰器手动定义跟踪之外,还可以使用 MLflow 的自动记录功能。 请参阅 监测您的应用

import mlflow
from mlflow.entities import Document

## This code snippet has been truncated for brevity, see the full retriever example above
class VectorSearchRetriever:
  ...

  # Create a RETRIEVER span. The span name must match the retriever schema name.
  @mlflow.trace(span_type="RETRIEVER", name="vector_search")
  def __call__(...) -> List[Document]:
    ...

  # Create a PARSER span.
  @mlflow.trace(span_type="PARSER")
  def parse_results(...) -> List[Document]:
    ...

若要确保代理评估和 AI 操场等下游应用程序正确呈现检索器跟踪,请确保修饰器满足以下要求:

设置检索器架构以确保 MLflow 兼容性

如果从检索器返回的跟踪或 span_type="RETRIEVER" 不符合 MLflow 的标准检索器架构,则必须手动将返回的架构映射到 MLflow 的预期字段。 这可确保 MLflow 能够正确追踪您的检索器,并在下游应用中呈现跟踪信息。

手动设置检索器架构:

  1. 定义代理时调用 mlflow.models.set_retriever_schema 。 用于 set_retriever_schema 将返回表中的列名映射到 MLflow 的预期字段,例如 primary_keytext_columndoc_uri

    # Define the retriever's schema by providing your column names
    mlflow.models.set_retriever_schema(
      name="vector_search",
      primary_key="chunk_id",
      text_column="text_column",
      doc_uri="doc_uri"
      # other_columns=["column1", "column2"],
    )
    
  2. 通过提供 other_columns 字段的列名称列表,在检索器的架构中指定额外的列。

  3. 如果有多个检索器,则可以为每个检索器架构使用唯一名称来定义多个架构。

代理创建期间设置的检索器架构会影响下游应用程序和工作流,例如评审应用和评估集。 具体而言,doc_uri 列充当检索器返回的文档的主要标识符。

  • 审阅应用 显示 doc_uri,以帮助审阅者评估响应和跟踪文档来源。 请参阅评审应用 UI
  • 评估集 用于 doc_uri 将检索器结果与预定义评估数据集进行比较,以确定检索器的召回率和精度。 请参阅评估集(旧版)。

后续步骤

生成检索器后,最后一步是将其集成到 AI 代理定义中。 了解如何向代理添加工具,请参阅 向代理添加 Unity 目录工具