手动跟踪

虽然 MLflow 的 自动跟踪 为支持的框架提供即时可观测性,但手动跟踪可让你完全控制 GenAI 应用程序的检测方式。 这种灵活性对于生成需要详细监视和调试功能的生产就绪应用程序至关重要。

先决条件

MLflow 3

本指南需要以下包:

  • mlflow[databricks]>=3.1:具有 GenAI 功能和 Databricks 连接的核心 MLflow 功能。
  • openai>=1.0.0:(可选)仅当自定义代码与 OpenAI 交互时;根据需要替换为其他 SDK。

安装基本要求:

%pip install --upgrade "mlflow[databricks]>=3.1"
# %pip install --upgrade openai>=1.0.0 # Install if needed

MLflow 2.x

本指南需要以下包:

  • mlflow[databricks]>=2.15.0,3.0.0<:具有 Databricks 连接的核心 MLflow 功能。
  • openai>=1.0.0:(可选)仅当自定义代码与 OpenAI 交互时。

安装基本要求:

%pip install --upgrade "mlflow[databricks]>=2.15.0,<3.0.0"
# pip install --upgrade openai>=1.0.0 # Install if needed

注释

虽然 MLflow 2.15.0+ 中提供了手动跟踪功能,但强烈建议安装 MLflow 3(特别是 3.1 或更高版本(如果使用mlflow[databricks])以获取最新的 GenAI 功能,包括扩展跟踪功能、优化的范围类型、改进的上下文传播和可靠的支持。

小窍门

在 Databricks Notebook 中运行? MLflow 已预安装。 仅当手动跟踪的代码使用它们时,才需要安装其他 SDK。

在本地运行? 必须安装mlflow[databricks]以及代码调用的任何其他SDK。

何时使用手动跟踪

如果需要,手动跟踪是正确的选择:

Fine-Grained 控件

自定义跟踪结构

  • 准确定义要跟踪的代码部分
  • 创建自定义跨度层次结构
  • 控制范围边界和关系

示例用例:跟踪要单独测量检索与生成延迟的 RAG 管道中的特定业务逻辑。

自定义框架

不支持的库

  • 检测专有框架或内部框架
  • 将跟踪功能添加到自定义 LLM 封装器
  • 在正式集成之前支持新库

示例用例:将跟踪添加到公司内部 LLM 网关或自定义代理框架。

高级场景

复杂工作流

  • 多线程或异步操作
  • 使用自定义聚合进行流式处理响应
  • 复杂的嵌套操作
  • 自定义跟踪元数据和属性

示例用例:跟踪多代理系统,其中代理使用自定义业务逻辑执行复杂的工作流。

手动跟踪方法

MLflow 为手动跟踪提供了三个级别的抽象,每个抽象级别都适合不同的用例:

高级 API 提供了一种直观的方式来添加跟踪,只需更改最少的代码即可。 这些系统会自动处理追踪生命周期、异常跟踪和父子关系。

装饰者

最适合:使用最少的代码更改进行函数级跟踪

import mlflow
from mlflow.entities import SpanType

@mlflow.trace(span_type=SpanType.CHAIN)
def process_request(query: str) -> str:
    # Your code here - automatically traced!
    result = generate_response(query)
    return result

@mlflow.trace(span_type=SpanType.LLM)
def generate_response(query: str) -> str:
    # Nested function - parent-child relationship handled automatically
    return llm.invoke(query)

主要优势

  • 单行检测
  • 自动异常处理
  • 适用于异步/生成器函数
  • 与自动跟踪兼容

了解更多关于修饰器的内容→

上下文管理器

最适合:跟踪代码块和复杂工作流

import mlflow

with mlflow.start_span(name="data_processing") as span:
    # Set inputs at the start
    span.set_inputs({"query": query, "filters": filters})

    # Your processing logic
    data = fetch_data(query, filters)
    processed = transform_data(data)

    # Set outputs before exiting
    span.set_outputs({"count": len(processed), "status": "success"})

主要优势

  • 灵活的跨距界限
  • 动态输入/输出设置
  • 对跨度生命周期进行精细控制
  • 适用于非函数代码块

详细了解上下文管理器→

2. Low-Level 客户端 API (高级)

对于需要完全控制跟踪生命周期的方案,客户端 API 提供对 MLflow 跟踪后端的直接访问。

from mlflow import MlflowClient

client = MlflowClient()

# Start a trace
root_span = client.start_trace("complex_workflow")

# Create child spans with explicit parent relationships
child_span = client.start_span(
    name="data_retrieval",
    request_id=root_span.request_id,
    parent_id=root_span.span_id,
    inputs={"query": query}
)

# End spans explicitly
client.end_span(
    request_id=child_span.request_id,
    span_id=child_span.span_id,
    outputs={"documents": documents}
)

# End the trace
client.end_trace(request_id=root_span.request_id)

何时使用客户端 API

  • 自定义追踪 ID 管理
  • 与现有可观测性系统集成
  • 复杂的跟踪生命周期要求
  • 非标准跟踪模式

警告

客户端 API 需要手动管理:

  • 父子关系
  • 跨生命周期(开始/结束)
  • 异常处理
  • 线程安全性

详细了解客户端 API →

API 比较

功能 / 特点 装饰者 上下文管理器 客户端 API
自动父子级 是的 是的 否 - 手动管理
异常处理 自动 自动 手动
适用于自动跟踪 是的 是的
线程安全性 自动 自动 手动
自定义跟踪 ID 是的
最适合 函数跟踪 代码块跟踪 高级控制

常见模式

与自动跟踪相结合

手动跟踪与 MLflow 的自动跟踪功能无缝集成:

import mlflow
import openai

# Enable auto-tracing for OpenAI
mlflow.openai.autolog()

@mlflow.trace(span_type="CHAIN")
def rag_pipeline(query: str):
    # Manual span for retrieval
    with mlflow.start_span(name="retrieval") as span:
        docs = retrieve_documents(query)
        span.set_outputs({"doc_count": len(docs)})

    # Auto-traced OpenAI call
    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Answer based on: {docs}\n\nQuery: {query}"}]
    )

    return response.choices[0].message.content

复杂的工作流跟踪

对于具有多个步骤的复杂工作流,请使用嵌套跨度来捕获详细的执行流:

@mlflow.trace(name="data_pipeline")
def process_data_pipeline(data_source: str):
    # Extract phase
    with mlflow.start_span(name="extract") as extract_span:
        raw_data = extract_from_source(data_source)
        extract_span.set_outputs({"record_count": len(raw_data)})

    # Transform phase
    with mlflow.start_span(name="transform") as transform_span:
        transformed = apply_transformations(raw_data)
        transform_span.set_outputs({"transformed_count": len(transformed)})

    # Load phase
    with mlflow.start_span(name="load") as load_span:
        result = load_to_destination(transformed)
        load_span.set_outputs({"status": "success"})

    return result

在 UI 中自定义请求和响应预览

MLflow UI 在“追踪”选项卡中提供了 RequestResponse 列,显示总体追踪输入和输出的预览。 默认情况下,这些内容将被截断。 使用手动跟踪时,可以使用 @mlflow.trace 自定义这些预览,尤其是在使用 mlflow.update_current_trace() 创建跟踪根范围的修饰器或上下文管理器时。

这对于复杂数据结构非常有用,其中默认预览可能没有信息。

import mlflow
import openai # Assuming openai is used, replace if not

# This example assumes you have an OpenAI client initialized and API key set up.
# client = openai.OpenAI()

@mlflow.trace
def predict(messages: list[dict]) -> str:
    # Customize the request preview to show the first and last messages
    custom_preview = f'{messages[0]["content"][:10]} ... {messages[-1]["content"][:10]}'
    mlflow.update_current_trace(request_preview=custom_preview)

    # Call the model
    # response = openai.chat.completions.create(
    #     model="gpt-4o-mini",
    #     messages=messages,
    # )
    # return response.choices[0].message.content
    return f"Response based on {len(messages)} messages."

messages = [
    {"role": "user", "content": "Hi, how are you?"},
    {"role": "assistant", "content": "I'm good, thank you!"},
    {"role": "user", "content": "What's your name?"},
    # ... (long message history)
    {"role": "assistant", "content": "Bye!"},
]
predict(messages)

这样,你可以定制预览,以便为特定数据结构提供更丰富的信息。

后续步骤

继续您的旅程,并参考这些推荐的行动和教程。

参考指南

浏览本指南中提到的概念和功能的详细文档。

小窍门

大多数用户都应从高级 API(修饰器和上下文管理器)开始。 它们提供易于使用和功能的最佳平衡,同时保持与 MLflow 生态系统的兼容性。