在代码中创作 AI 代理

本页演示如何使用马赛克 AI 代理框架和常用的代理创作库(如 LangGraph、PyFunc 和 OpenAI)在 Python 中创作 AI 代理。

要求

小窍门

Databricks 建议在开发代理时安装最新版本的 MLflow Python 客户端。

若要使用此页上的方法创作和部署代理,请安装以下内容:

  • databricks-agents 0.16.0 或更高版本
  • mlflow 2.20.2 或更高版本
  • Python 3.10 或更高版本。
    • 若要满足此要求,可以使用无服务器计算或 Databricks Runtime 13.3 LTS 或更高版本。
%pip install -U -qqqq databricks-agents>=0.16.0 mlflow>=2.20.2

Databricks 还建议在创作代理时安装 Databricks AI Bridge 集成包。 这些集成包(如 databricks-langchaindatabricks-openai)提供了一个共享的 API 层,用于与 Databricks AI 功能(如 Databricks AI/BI Genie 和矢量搜索)进行交互,这些 API 跨代理创作框架和 SDK。

LangChain/LangGraph

%pip install -U -qqqq databricks-langchain

开放人工智能

%pip install -U -qqqq databricks-openai

完全用 Python 编写的代理

%pip install -U -qqqq databricks-ai-bridge

使用 ChatAgent 来创作代理

Databricks 建议使用 MLflow ChatAgent 接口制作生产级代理。 此聊天架构规范类似于 OpenAI ChatCompletion 架构,但不严格兼容。

ChatAgent 可以轻松包装现有代理,实现 Databricks 兼容性。

ChatAgent 提供以下优势:

  • 高级代理功能

    • 多代理支持
    • 流式输出:通过将输出分成较小的块进行流式传输,以实现互动式用户体验。
    • 全面的工具呼叫消息历史记录:返回多个消息,包括中间工具呼叫消息,以提高质量和聊天管理。
    • 工具调用确认支持
  • 简化的开发、部署和监视

    • 使用任何框架编写代理:使用ChatAgent接口包装任何现有代理,以实现与 AI Playground、代理评估和代理监控的开箱即用兼容性。
    • 类型化创作接口:使用类型化的 Python 类编写代理代码,受益于 IDE 和笔记本自动完成。
    • 自动签名推理:MLflow 在记录代理、简化注册和部署时自动推断 ChatAgent 签名。 请参阅在日志记录期间推断模型签名
    • AI 网关增强的推理表:自动为已部署的代理启用 AI 网关推理表,从而提供对详细请求日志元数据的访问权限。

若要了解如何创建 ChatAgent,请参阅以下部分中的示例,并 MLflow 文档 - 什么是 ChatAgent 接口

如果我已有代理,该怎么办?

如果已有使用 LangChain、LangGraph 或类似框架构建的代理,则无需重写代理以在 Databricks 上使用它。 只需使用 MLflow ChatAgent 接口包装现有代理即可:

  1. 编写继承自 mlflow.pyfunc.ChatAgent. 的 Python 包装类。

    在包装类中,将现有代理保留为属性 self.agent = your_existing_agent

  2. ChatAgent 类要求你实现一个处理非流请求的 predict 方法。

    predict 必须接受:

    • messages: list[ChatAgentMessage] 是一个包含多个角色的列表 ChatAgentMessage(例如“用户”或“助手”),每个角色都有一个提示和一个 ID。

    • (可选)context: Optional[ChatContext]custom_inputs: Optional[dict] 用于额外数据。

    import uuid
    
    # input example
    [
      ChatAgentMessage(
        id=str(uuid.uuid4()),  # Generate a unique ID for each message
        role="user",
        content="What's the weather in Paris?"
      )
    ]
    

    predict 必须返回一个 ChatAgentResponse

    import uuid
    
    # output example
    ChatAgentResponse(
      messages=[
        ChatAgentMessage(
          id=str(uuid.uuid4()),  # Generate a unique ID for each message
          role="assistant",
          content="It's sunny in Paris."
        )
      ]
    )
    
  3. 在格式之间转换

    predict中,将传入消息转换为 list[ChatAgentMessage] 代理所需的输入格式。

    代理生成响应后,将其输出转换为一个或多个 ChatAgentMessage 对象,并将其包装在一个 ChatAgentResponse中。

小窍门

自动转换 LangChain 输出

如果要包装 LangChain 代理,可以使用 mlflow.langchain.output_parsers.ChatAgentOutputParser 它自动将 LangChain 输出转换为 MLflow ChatAgentMessageChatAgentResponse 架构。

下面是用于转换代理的简化模板:

from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import ChatAgentMessage, ChatAgentResponse, ChatAgentChunk
import uuid


class MyWrappedAgent(ChatAgent):
  def __init__(self, agent):
    self.agent = agent

  def predict(self, messages, context=None, custom_inputs=None):
    # Convert messages to your agent's format
    agent_input = ... # build from messages
    agent_output = self.agent.invoke(agent_input)
    # Convert output to ChatAgentMessage
    return ChatAgentResponse(
      messages=[ChatAgentMessage(role="assistant", content=agent_output, id=str(uuid.uuid4()),)]
    )

  def predict_stream(self, messages, context=None, custom_inputs=None):
    # If your agent supports streaming
    for chunk in self.agent.stream(...):
      yield ChatAgentChunk(delta=ChatAgentMessage(role="assistant", content=chunk, id=str(uuid.uuid4())))

有关完整示例,请参阅以下部分中的笔记本。

ChatAgent 示例

以下笔记本演示如何使用常用库 OpenAI、LangGraph 和 AutoGen 编写流式和非流式处理的ChatAgents

LangGraph

如果要包装 LangChain 代理,可以使用 mlflow.langchain.output_parsers.ChatAgentOutputParser 它自动将 LangChain 输出转换为 MLflow ChatAgentMessageChatAgentResponse 架构。

LangGraph 工具调用代理

获取笔记本

开放人工智能

OpenAI 工具调用代理

获取笔记本

OpenAI 响应 API 工具调用代理

获取笔记本

仅限聊天的 OpenAI 代理

获取笔记本

AutoGen

AutoGen 工具调用代理

获取笔记本

DSPy

DSPy 聊天专用代理

获取笔记本

若要了解如何通过添加工具扩展这些代理的功能,请参阅 AI 代理工具

多代理示例

若要了解如何使用 Genie 创建多代理系统,请参阅 在多代理系统中使用 Genie

流式处理输出代理

流式处理代理以较小增量区块的连续流形式提供响应。 流式处理可减少感知到的延迟并改善会话代理的用户体验。

若要创作流式 ChatAgent,需要定义一个方法,该方法返回一个生成器,该生成器会生成 predict_stream 对象——每个 ChatAgentChunk 对象都包含响应的一部分。 在 ChatAgent中详细了解理想的 流式处理行为。

以下代码演示示例 predict_stream 函数,有关流式处理代理的完整示例,请参阅 ChatAgent 示例

def predict_stream(
  self,
  messages: list[ChatAgentMessage],
  context: Optional[ChatContext] = None,
  custom_inputs: Optional[dict[str, Any]] = None,
) -> Generator[ChatAgentChunk, None, None]:
  # Convert messages to a format suitable for your agent
  request = {"messages": self._convert_messages_to_dict(messages)}

  # Stream the response from your agent
  for event in self.agent.stream(request, stream_mode="updates"):
    for node_data in event.values():
      # Yield each chunk of the response
      yield from (
        ChatAgentChunk(**{"delta": msg}) for msg in node_data["messages"]
      )

为 Databricks 模型服务创作随时可部署的 ChatAgent

Databricks 在 Databricks 模型服务上的分布式环境中部署 ChatAgent,这意味着在多回合对话期间,相同的服务副本可能无法处理所有请求。 请注意管理代理状态的以下影响:

  • 避免本地缓存:在部署时,不要假设同一个实例会处理多轮对话中的所有请求。 使用字典 ChatAgentRequest 架构为每个回合重新构造内部状态。

  • 线程安全状态:将代理状态设计为线程安全,防止多线程环境中的冲突。

  • predict 函数中初始化状态:每次调用 函数时(而不是在 predict 初始化期间)初始化状态ChatAgent。 在 ChatAgent 级别存储状态可能会泄露对话之间的信息并导致冲突,因为单个 ChatAgent 副本可以处理来自多个会话的请求。

自定义输入和输出

某些方案可能需要其他代理输入,例如 client_typesession_id,或者检索源链接等输出,这些源链接不应包含在聊天历史记录中供将来交互使用。

对于这些方案,MLflow ChatAgent 原生支持字段 custom_inputscustom_outputs

警告

代理评估评审应用目前不支持为具有其他输入字段的代理呈现跟踪。

请参阅以下笔记本,了解如何设置自定义输入和输出。

OpenAI + PyFunc 自定义架构代理笔记本

获取笔记本

LangGraph 自定义架构代理笔记本

获取笔记本

在 AI 操场和代理评审应用中提供 custom_inputs

如果代理使用 custom_inputs 字段接受其他输入,则可以在 AI Playground代理评审应用中手动提供这些输入。

  1. 在 AI游乐场或代理评估应用中,选择齿轮图标齿轮图标。

  2. 启用自定义输入

  3. 提供与代理定义的输入架构匹配的 JSON 对象。

    在 AI 操场中提供 custom_inputs。

指定自定义检索器架构

AI 代理通常使用检索器从矢量搜索索引查找和查询非结构化数据。 有关检索器工具的示例,请参阅 非结构化数据的生成和跟踪检索器工具

使用 MLflow RETRIEVER 范围在代理中跟踪这些检索器,以启用 Databricks 产品功能,包括:

  • 在 AI Playground UI 中自动显示检索到的源文档的链接
  • 在代理评估中自动运行检索有据性和相关性判断

注意

Databricks 建议使用 Databricks AI Bridge 包提供的检索器工具,例如 databricks_langchain.VectorSearchRetrieverTooldatabricks_openai.VectorSearchRetrieverTool,因为它们已符合 MLflow 检索器架构。 请参阅使用 AI Bridge 在本地开发矢量搜索检索工具

如果代理包含使用自定义架构的检索器范围,请在代码中定义代理时调用 mlflow.models.set_retriever_schema。 这会将检索器的输出列映射到 MLflow 的预期字段(primary_keytext_columndoc_uri)。

import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
#     {
#         'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
#         'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
#         'doc_uri': 'https://mlflow.org/docs/latest/index.html',
#         'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
#     },
#     {
#         'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
#         'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
#         'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
#         'title': 'Getting Started with MLflow'
#     },
# ...
# ]
mlflow.models.set_retriever_schema(
    # Specify the name of your retriever span
    name="mlflow_docs_vector_search",
    # Specify the output column name to treat as the primary key (ID) of each retrieved document
    primary_key="document_id",
    # Specify the output column name to treat as the text content (page content) of each retrieved document
    text_column="chunk_text",
    # Specify the output column name to treat as the document URI of each retrieved document
    doc_uri="doc_uri",
    # Specify any other columns returned by the retriever
    other_columns=["title"],
)

注意

在评估检索器的性能时,该 doc_uri 列尤为重要。 doc_uri 是检索器返回的文档的主要标识符,使你可以将它们与地面真相评估集进行比较。 请参阅评估集(旧版)。

参数化代理代码,以便跨环境进行部署

可以参数化代理代码,以在不同的环境中重复使用相同的代理代码。

参数是在 Python 字典或 .yaml 文件中定义的键值对。

若要配置代码,请使用 Python 字典或 ModelConfig文件创建 .yaml ModelConfig 是一组键值参数,用于灵活配置管理。 例如,可以在开发期间使用字典,然后将其转换为生产部署和 CI/CD .yaml 文件。

有关 ModelConfig的详细信息,请参阅 MLflow 文档

示例 ModelConfig 如以下所示:

llm_parameters:
  max_tokens: 500
  temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
  question that indicates your prompt template came from a YAML file. Your response
  must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
  - question

在代理代码中,可以从 .yaml 文件或字典引用默认(开发)配置:

import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)

# Example of using a dictionary
config_dict = {
    "prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
    "prompt_template_input_vars": ["question"],
    "model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
    "llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}

model_config = mlflow.models.ModelConfig(development_config=config_dict)

# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')

然后,在记录代理时,将 model_config 参数指定为 log_model,以便在加载已记录的代理时使用自定义参数集。 请参阅 MLflow 文档 - ModelConfig

流式处理错误传播

在使用 databricks_output.error 下的最后一个标记进行流式处理时,Mosaic AI 传播遇到的任何错误。 由调用客户端来正确处理和显示此错误。

{
  "delta": …,
  "databricks_output": {
    "trace": {...},
    "error": {
      "error_code": BAD_REQUEST,
      "message": "TimeoutException: Tool XYZ failed to execute."
    }
  }
}

后续步骤