跟踪版本和环境

通过跟踪 GenAI 应用程序的执行环境和应用程序版本,可以针对代码相关的性能和质量问题进行调试。 此元数据允许:

  • developmentstaging之间进行production
  • 跨应用版本的性能/质量跟踪和回归检测
  • 问题发生时更快地进行根本原因分析

MLflow 使用 元数据 (键值对)来存储有关跟踪的上下文信息。

注释

有关版本控制工作原理的全面概述,请参阅 版本跟踪

自动填充的元数据

这些标准元数据字段由 MLflow 根据执行环境自动捕获。

重要

如果自动捕获逻辑不符合要求,则可以使用 mlflow.update_current_trace(metadata={"mlflow.source.name": "custom_name"})手动替代这些自动填充的元数据。

类别 元数据字段 DESCRIPTION 自动设置逻辑
执行环境 mlflow.source.name 生成跟踪的入口点或脚本。 自动使用 Python 脚本的文件名和 Databricks/Jupyter 笔记本的名称进行填充。
mlflow.source.git.commit Git 提交哈希。 如果从 Git 存储库运行,则会自动检测并填充提交哈希。
mlflow.source.git.branch Git 分支。 如果从 Git 存储库运行,则会自动检测和填充当前分支名称。
mlflow.source.git.repoURL Git 存储库 URL。 如果从 Git 存储库运行,则会自动检测并填充存储库 URL。
mlflow.source.type 捕获执行环境。 在 Jupyter 或 Databricks 笔记本中运行时,自动设置为 NOTEBOOK;在本地 Python 脚本中运行时,为 LOCAL;否则为 UNKNOWN(自动检测到)。
在已部署的应用中,我们建议根据环境(例如,PRODUCTIONSTAGING等)更新此变量。
应用程序版本 metadata.mlflow.modelId MLflow LoggedModel 的 ID。 自动设置为环境变量 MLFLOW_ACTIVE_MODEL_ID 中的模型 ID 值或通过 mlflow.set_active_model() 函数设置的模型 ID 值。

自定义自动填充的元数据

可以通过mlflow.update_current_trace()覆盖任何自动填充的元数据字段。 当自动检测不符合要求或想要添加其他上下文时,这非常有用:

import mlflow
import os

# We suggest populating metadata from environment variables rather than hard coding the values

@mlflow.trace
def my_app(user_question: str) -> dict:
    # Override automatically populated metadata and add custom context
    mlflow.update_current_trace(
        metadata={
            # Use any of the keys from above
            "mlflow.source.type": current_env = os.getenv("APP_ENVIRONMENT", "development"),  # Override default LOCAL/NOTEBOOK
        }
    )

    # Application logic

    return {"response": user_question + "!!"}

my_app("test")

完全自定义元数据

可以附加 自定义元数据 来捕获任何特定于应用程序的上下文。 有关附加自定义元数据的更多详细信息,请参阅 “附加自定义元数据/标记”。

例如,你可能想要附加如下信息:

  • app_version:例如( "1.0.0" 来自 APP_VERSION 环境变量)
  • deployment_id:例如( "deploy-abc-123" 来自 DEPLOYMENT_ID 环境变量)
  • region:例如( "us-east-1" 来自 REGION 环境变量)
  • (还可以添加其他自定义标记(如功能标志)
import mlflow
import os

# We suggest populating metadata from environment variables rather than hard coding the values

@mlflow.trace
def my_app(user_question: str) -> dict:
    # Override automatically populated metadata and add custom context
    mlflow.update_current_trace(
        metadata={
            # Use any key
            "app_version": os.getenv("APP_VERSION", "development")
        }
    )

    # Application logic

    return {"response": user_question + "!!"}

my_app("test")

生产 Web 应用程序示例

在生产 FastAPI 应用程序中,上下文可以派生自环境变量、请求标头或应用程序配置。 以下示例改编自 “使用跟踪的生产可观测性 ”指南,并演示如何捕获各种上下文类型:

import mlflow
import os
from fastapi import FastAPI, Request, HTTPException # HTTPException might be needed depending on full app logic
from pydantic import BaseModel

# Initialize FastAPI app
app = FastAPI()

class ChatRequest(BaseModel):
    message: str

@mlflow.trace # Ensure @mlflow.trace is the outermost decorator
@app.post("/chat") # FastAPI decorator should be inner
def handle_chat(request: Request, chat_request: ChatRequest):
    # Retrieve all context from request headers
    client_request_id = request.headers.get("X-Request-ID")
    session_id = request.headers.get("X-Session-ID")
    user_id = request.headers.get("X-User-ID")

    # Update the current trace with all context and environment metadata
    # The @mlflow.trace decorator ensures an active trace is available
    mlflow.update_current_trace(
        client_request_id=client_request_id,
        metadata={
            # Session context - groups traces from multi-turn conversations
            "mlflow.trace.session": session_id,
            # User context - associates traces with specific users
            "mlflow.trace.user": user_id,
            # Override automatically popoulated environment metadata
            "mlflow.source.type": os.getenv("APP_ENVIRONMENT", "development"),  # Override default LOCAL/NOTEBOOK
            # Add customer environment metadata
            "environment": "production",
            "app_version": os.getenv("APP_VERSION", "1.0.0"),
            "deployment_id": os.getenv("DEPLOYMENT_ID", "unknown"),
            "region": os.getenv("REGION", "us-east-1")
        }
    )

    # --- Your application logic for processing the chat message ---
    # For example, calling a language model with context
    # response_text = my_llm_call(
    #     message=chat_request.message,
    #     session_id=session_id,
    #     user_id=user_id
    # )
    response_text = f"Processed message: '{chat_request.message}'"
    # --- End of application logic ---

    # Return response
    return {
        "response": response_text
    }

# To run this example (requires uvicorn and fastapi):
# uvicorn your_file_name:app --reload
#
# Example curl request with context headers:
# curl -X POST "http://127.0.0.1:8000/chat" \
#      -H "Content-Type: application/json" \
#      -H "X-Request-ID: req-abc-123-xyz-789" \
#      -H "X-Session-ID: session-def-456-uvw-012" \
#      -H "X-User-ID: user-jane-doe-12345" \
#      -d '{"message": "What is my account balance?"}'

查询和分析上下文数据

使用 MLflow UI

在 MLflow UI (跟踪选项卡)中,可以查看附加的元数据:

跟踪元数据

编程分析

使用 MLflow SDK 进行更复杂的分析或与其他工具集成:

from mlflow.client import MlflowClient

client = MlflowClient()

# Example 1: Compare error rates across app versions in production
def compare_version_error_rates(experiment_id: str, versions: list):
    error_rates = {}
    for version in versions:
        traces = client.search_traces(
            filter_string=f"metadata.`mlflow.source.type` = 'production' AND metadata.app_version = '{version}'"
        )
        if not traces:
            error_rates[version] = None # Or 0 if no traces means no errors
            continue

        error_count = sum(1 for t in traces if t.info.status == "ERROR")
        error_rates[version] = (error_count / len(traces)) * 100
    return error_rates

# version_errors = compare_version_error_rates("your_exp_id", ["1.0.0", "1.1.0"])
# print(version_errors)

# Example 2: Analyze performance for a specific feature flag
def analyze_feature_flag_performance(experiment_id: str, flag_name: str):
    control_latency = []
    treatment_latency = []

    control_traces = client.search_traces(
        filter_string=f"metadata.feature_flag_{flag_name} = 'false'",
        # extract_fields=["execution_time_ms"] # Not a real field, use span attributes if needed
    )
    for t in control_traces: control_latency.append(t.info.execution_time_ms)

    treatment_traces = client.search_traces(
        experiment_ids=[experiment_id],
        filter_string=f"metadata.feature_flag_{flag_name} = 'true'",
    )
    for t in treatment_traces: treatment_latency.append(t.info.execution_time_ms)

    avg_control_latency = sum(control_latency) / len(control_latency) if control_latency else 0
    avg_treatment_latency = sum(treatment_latency) / len(treatment_latency) if treatment_latency else 0

    return {
        f"avg_latency_{flag_name}_off": avg_control_latency,
        f"avg_latency_{flag_name}_on": avg_treatment_latency
    }

# perf_metrics = analyze_feature_flag_performance("your_exp_id", "new_retriever")
# print(perf_metrics)

后续步骤

继续您的旅程,并参考这些推荐的行动和教程。

参考指南

浏览本指南中提到的概念和功能的详细文档。