本页演示如何使用马赛克 AI 代理框架和常用的代理创作库(如 LangGraph、PyFunc 和 OpenAI)在 Python 中创作 AI 代理。
要求
小窍门
Databricks 建议在开发代理时安装最新版本的 MLflow Python 客户端。
若要使用此页上的方法创作和部署代理,请安装以下内容:
-
databricks-agents
0.16.0 或更高版本 -
mlflow
2.20.2 或更高版本 - Python 3.10 或更高版本。
- 若要满足此要求,可以使用无服务器计算或 Databricks Runtime 13.3 LTS 或更高版本。
%pip install -U -qqqq databricks-agents>=0.16.0 mlflow>=2.20.2
Databricks 还建议在创作代理时安装 Databricks AI Bridge 集成包。 这些集成包(如 databricks-langchain
、databricks-openai
)提供了一个共享的 API 层,用于与 Databricks AI 功能(如 Databricks AI/BI Genie 和矢量搜索)进行交互,这些 API 跨代理创作框架和 SDK。
LangChain/LangGraph
%pip install -U -qqqq databricks-langchain
开放人工智能
%pip install -U -qqqq databricks-openai
完全用 Python 编写的代理
%pip install -U -qqqq databricks-ai-bridge
使用 ChatAgent
来创作代理
Databricks 建议使用 MLflow ChatAgent
接口制作生产级代理。 此聊天架构规范类似于 OpenAI ChatCompletion
架构,但不严格兼容。
ChatAgent
提供以下优势:
高级代理功能
- 多代理支持
- 流式输出:通过将输出分成较小的块进行流式传输,以实现互动式用户体验。
- 全面的工具呼叫消息历史记录:返回多个消息,包括中间工具呼叫消息,以提高质量和聊天管理。
- 工具调用确认支持
简化的开发、部署和监视
-
使用任何框架编写代理:使用
ChatAgent
接口包装任何现有代理,以实现与 AI Playground、代理评估和代理监控的开箱即用兼容性。 - 类型化创作接口:使用类型化的 Python 类编写代理代码,受益于 IDE 和笔记本自动完成。
-
自动签名推理:MLflow 在记录代理、简化注册和部署时自动推断
ChatAgent
签名。 请参阅在日志记录期间推断模型签名。 - AI 网关增强的推理表:自动为已部署的代理启用 AI 网关推理表,从而提供对详细请求日志元数据的访问权限。
-
使用任何框架编写代理:使用
若要了解如何创建 ChatAgent
,请参阅以下部分中的示例,并 MLflow 文档 - 什么是 ChatAgent 接口。
如果我已有代理,该怎么办?
如果已有使用 LangChain、LangGraph 或类似框架构建的代理,则无需重写代理以在 Databricks 上使用它。 只需使用 MLflow ChatAgent
接口包装现有代理即可:
编写继承自
mlflow.pyfunc.ChatAgent
. 的 Python 包装类。在包装类中,将现有代理保留为属性
self.agent = your_existing_agent
。该
ChatAgent
类要求你实现一个处理非流请求的predict
方法。predict
必须接受:messages: list[ChatAgentMessage]
是一个包含多个角色的列表ChatAgentMessage
(例如“用户”或“助手”),每个角色都有一个提示和一个 ID。(可选)
context: Optional[ChatContext]
和custom_inputs: Optional[dict]
用于额外数据。
import uuid # input example [ ChatAgentMessage( id=str(uuid.uuid4()), # Generate a unique ID for each message role="user", content="What's the weather in Paris?" ) ]
predict
必须返回一个ChatAgentResponse
。import uuid # output example ChatAgentResponse( messages=[ ChatAgentMessage( id=str(uuid.uuid4()), # Generate a unique ID for each message role="assistant", content="It's sunny in Paris." ) ] )
在格式之间转换
在
predict
中,将传入消息转换为list[ChatAgentMessage]
代理所需的输入格式。代理生成响应后,将其输出转换为一个或多个
ChatAgentMessage
对象,并将其包装在一个ChatAgentResponse
中。
小窍门
自动转换 LangChain 输出
如果要包装 LangChain 代理,可以使用 mlflow.langchain.output_parsers.ChatAgentOutputParser
它自动将 LangChain 输出转换为 MLflow ChatAgentMessage
和 ChatAgentResponse
架构。
下面是用于转换代理的简化模板:
from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import ChatAgentMessage, ChatAgentResponse, ChatAgentChunk
import uuid
class MyWrappedAgent(ChatAgent):
def __init__(self, agent):
self.agent = agent
def predict(self, messages, context=None, custom_inputs=None):
# Convert messages to your agent's format
agent_input = ... # build from messages
agent_output = self.agent.invoke(agent_input)
# Convert output to ChatAgentMessage
return ChatAgentResponse(
messages=[ChatAgentMessage(role="assistant", content=agent_output, id=str(uuid.uuid4()),)]
)
def predict_stream(self, messages, context=None, custom_inputs=None):
# If your agent supports streaming
for chunk in self.agent.stream(...):
yield ChatAgentChunk(delta=ChatAgentMessage(role="assistant", content=chunk, id=str(uuid.uuid4())))
有关完整示例,请参阅以下部分中的笔记本。
ChatAgent
示例
以下笔记本演示如何使用常用库 OpenAI、LangGraph 和 AutoGen 编写流式和非流式处理的ChatAgents
。
LangGraph
如果要包装 LangChain 代理,可以使用 mlflow.langchain.output_parsers.ChatAgentOutputParser
它自动将 LangChain 输出转换为 MLflow ChatAgentMessage
和 ChatAgentResponse
架构。
LangGraph 工具调用代理
开放人工智能
OpenAI 工具调用代理
OpenAI 响应 API 工具调用代理
仅限聊天的 OpenAI 代理
AutoGen
AutoGen 工具调用代理
DSPy
DSPy 聊天专用代理
若要了解如何通过添加工具扩展这些代理的功能,请参阅 AI 代理工具。
多代理示例
若要了解如何使用 Genie 创建多代理系统,请参阅 在多代理系统中使用 Genie。
流式处理输出代理
流式处理代理以较小增量区块的连续流形式提供响应。 流式处理可减少感知到的延迟并改善会话代理的用户体验。
若要创作流式 ChatAgent
,需要定义一个方法,该方法返回一个生成器,该生成器会生成 predict_stream
对象——每个 ChatAgentChunk
对象都包含响应的一部分。 在 ChatAgent
中详细了解理想的 流式处理行为。
以下代码演示示例 predict_stream
函数,有关流式处理代理的完整示例,请参阅 ChatAgent 示例:
def predict_stream(
self,
messages: list[ChatAgentMessage],
context: Optional[ChatContext] = None,
custom_inputs: Optional[dict[str, Any]] = None,
) -> Generator[ChatAgentChunk, None, None]:
# Convert messages to a format suitable for your agent
request = {"messages": self._convert_messages_to_dict(messages)}
# Stream the response from your agent
for event in self.agent.stream(request, stream_mode="updates"):
for node_data in event.values():
# Yield each chunk of the response
yield from (
ChatAgentChunk(**{"delta": msg}) for msg in node_data["messages"]
)
为 Databricks 模型服务创作随时可部署的 ChatAgent
Databricks 在 Databricks 模型服务上的分布式环境中部署 ChatAgent
,这意味着在多回合对话期间,相同的服务副本可能无法处理所有请求。 请注意管理代理状态的以下影响:
避免本地缓存:在部署时,不要假设同一个实例会处理多轮对话中的所有请求。 使用字典
ChatAgentRequest
架构为每个回合重新构造内部状态。线程安全状态:将代理状态设计为线程安全,防止多线程环境中的冲突。
在
predict
函数中初始化状态:每次调用 函数时(而不是在predict
初始化期间)初始化状态ChatAgent
。 在ChatAgent
级别存储状态可能会泄露对话之间的信息并导致冲突,因为单个ChatAgent
副本可以处理来自多个会话的请求。
自定义输入和输出
某些方案可能需要其他代理输入,例如 client_type
和 session_id
,或者检索源链接等输出,这些源链接不应包含在聊天历史记录中供将来交互使用。
对于这些方案,MLflow ChatAgent
原生支持字段 custom_inputs
和 custom_outputs
。
警告
代理评估评审应用目前不支持为具有其他输入字段的代理呈现跟踪。
请参阅以下笔记本,了解如何设置自定义输入和输出。
OpenAI + PyFunc 自定义架构代理笔记本
LangGraph 自定义架构代理笔记本
在 AI 操场和代理评审应用中提供 custom_inputs
如果代理使用 custom_inputs
字段接受其他输入,则可以在 AI Playground 和 代理评审应用中手动提供这些输入。
在 AI游乐场或代理评估应用中,选择齿轮图标
启用自定义输入。
提供与代理定义的输入架构匹配的 JSON 对象。
指定自定义检索器架构
AI 代理通常使用检索器从矢量搜索索引查找和查询非结构化数据。 有关检索器工具的示例,请参阅 非结构化数据的生成和跟踪检索器工具。
使用 MLflow RETRIEVER 范围在代理中跟踪这些检索器,以启用 Databricks 产品功能,包括:
- 在 AI Playground UI 中自动显示检索到的源文档的链接
- 在代理评估中自动运行检索有据性和相关性判断
注意
Databricks 建议使用 Databricks AI Bridge 包提供的检索器工具,例如 databricks_langchain.VectorSearchRetrieverTool
和 databricks_openai.VectorSearchRetrieverTool
,因为它们已符合 MLflow 检索器架构。 请参阅使用 AI Bridge 在本地开发矢量搜索检索工具。
如果代理包含使用自定义架构的检索器范围,请在代码中定义代理时调用 mlflow.models.set_retriever_schema。 这会将检索器的输出列映射到 MLflow 的预期字段(primary_key
、text_column
、doc_uri
)。
import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
# {
# 'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
# 'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
# 'doc_uri': 'https://mlflow.org/docs/latest/index.html',
# 'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
# },
# {
# 'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
# 'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
# 'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
# 'title': 'Getting Started with MLflow'
# },
# ...
# ]
mlflow.models.set_retriever_schema(
# Specify the name of your retriever span
name="mlflow_docs_vector_search",
# Specify the output column name to treat as the primary key (ID) of each retrieved document
primary_key="document_id",
# Specify the output column name to treat as the text content (page content) of each retrieved document
text_column="chunk_text",
# Specify the output column name to treat as the document URI of each retrieved document
doc_uri="doc_uri",
# Specify any other columns returned by the retriever
other_columns=["title"],
)
注意
在评估检索器的性能时,该 doc_uri
列尤为重要。
doc_uri
是检索器返回的文档的主要标识符,使你可以将它们与地面真相评估集进行比较。 请参阅评估集(旧版)。
参数化代理代码,以便跨环境进行部署
可以参数化代理代码,以在不同的环境中重复使用相同的代理代码。
参数是在 Python 字典或 .yaml
文件中定义的键值对。
若要配置代码,请使用 Python 字典或 ModelConfig
文件创建 .yaml
。
ModelConfig
是一组键值参数,用于灵活配置管理。 例如,可以在开发期间使用字典,然后将其转换为生产部署和 CI/CD .yaml
文件。
有关 ModelConfig
的详细信息,请参阅 MLflow 文档。
示例 ModelConfig
如以下所示:
llm_parameters:
max_tokens: 500
temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
question that indicates your prompt template came from a YAML file. Your response
must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
- question
在代理代码中,可以从 .yaml
文件或字典引用默认(开发)配置:
import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)
# Example of using a dictionary
config_dict = {
"prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
"prompt_template_input_vars": ["question"],
"model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
"llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}
model_config = mlflow.models.ModelConfig(development_config=config_dict)
# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')
然后,在记录代理时,将 model_config
参数指定为 log_model
,以便在加载已记录的代理时使用自定义参数集。 请参阅 MLflow 文档 - ModelConfig。
流式处理错误传播
在使用 databricks_output.error
下的最后一个标记进行流式处理时,Mosaic AI 传播遇到的任何错误。 由调用客户端来正确处理和显示此错误。
{
"delta": …,
"databricks_output": {
"trace": {...},
"error": {
"error_code": BAD_REQUEST,
"message": "TimeoutException: Tool XYZ failed to execute."
}
}
}