通过跟踪 GenAI 应用程序的执行环境和应用程序版本,可以针对代码相关的性能和质量问题进行调试。 此元数据允许:
- 在、
development
和staging
之间进行production
- 跨应用版本的性能/质量跟踪和回归检测
- 问题发生时更快地进行根本原因分析
MLflow 使用 元数据 (键值对)来存储有关跟踪的上下文信息。
注释
有关版本控制工作原理的全面概述,请参阅 版本跟踪。
自动填充的元数据
这些标准元数据字段由 MLflow 根据执行环境自动捕获。
重要
如果自动捕获逻辑不符合要求,则可以使用 mlflow.update_current_trace(metadata={"mlflow.source.name": "custom_name"})
手动替代这些自动填充的元数据。
类别 | 元数据字段 | DESCRIPTION | 自动设置逻辑 |
---|---|---|---|
执行环境 | mlflow.source.name |
生成跟踪的入口点或脚本。 | 自动使用 Python 脚本的文件名和 Databricks/Jupyter 笔记本的名称进行填充。 |
mlflow.source.git.commit |
Git 提交哈希。 | 如果从 Git 存储库运行,则会自动检测并填充提交哈希。 | |
mlflow.source.git.branch |
Git 分支。 | 如果从 Git 存储库运行,则会自动检测和填充当前分支名称。 | |
mlflow.source.git.repoURL |
Git 存储库 URL。 | 如果从 Git 存储库运行,则会自动检测并填充存储库 URL。 | |
mlflow.source.type |
捕获执行环境。 | 在 Jupyter 或 Databricks 笔记本中运行时,自动设置为 NOTEBOOK ;在本地 Python 脚本中运行时,为 LOCAL ;否则为 UNKNOWN (自动检测到)。在已部署的应用中,我们建议根据环境(例如, PRODUCTION STAGING 等)更新此变量。 |
|
应用程序版本 | metadata.mlflow.modelId |
MLflow LoggedModel 的 ID。 | 自动设置为环境变量 MLFLOW_ACTIVE_MODEL_ID 中的模型 ID 值或通过 mlflow.set_active_model() 函数设置的模型 ID 值。 |
自定义自动填充的元数据
可以通过mlflow.update_current_trace()
覆盖任何自动填充的元数据字段。 当自动检测不符合要求或想要添加其他上下文时,这非常有用:
import mlflow
import os
# We suggest populating metadata from environment variables rather than hard coding the values
@mlflow.trace
def my_app(user_question: str) -> dict:
# Override automatically populated metadata and add custom context
mlflow.update_current_trace(
metadata={
# Use any of the keys from above
"mlflow.source.type": current_env = os.getenv("APP_ENVIRONMENT", "development"), # Override default LOCAL/NOTEBOOK
}
)
# Application logic
return {"response": user_question + "!!"}
my_app("test")
完全自定义元数据
可以附加 自定义元数据 来捕获任何特定于应用程序的上下文。 有关附加自定义元数据的更多详细信息,请参阅 “附加自定义元数据/标记”。
例如,你可能想要附加如下信息:
-
app_version
:例如("1.0.0"
来自APP_VERSION
环境变量) -
deployment_id
:例如("deploy-abc-123"
来自DEPLOYMENT_ID
环境变量) -
region
:例如("us-east-1"
来自REGION
环境变量) - (还可以添加其他自定义标记(如功能标志)
import mlflow
import os
# We suggest populating metadata from environment variables rather than hard coding the values
@mlflow.trace
def my_app(user_question: str) -> dict:
# Override automatically populated metadata and add custom context
mlflow.update_current_trace(
metadata={
# Use any key
"app_version": os.getenv("APP_VERSION", "development")
}
)
# Application logic
return {"response": user_question + "!!"}
my_app("test")
生产 Web 应用程序示例
在生产 FastAPI 应用程序中,上下文可以派生自环境变量、请求标头或应用程序配置。 以下示例改编自 “使用跟踪的生产可观测性 ”指南,并演示如何捕获各种上下文类型:
import mlflow
import os
from fastapi import FastAPI, Request, HTTPException # HTTPException might be needed depending on full app logic
from pydantic import BaseModel
# Initialize FastAPI app
app = FastAPI()
class ChatRequest(BaseModel):
message: str
@mlflow.trace # Ensure @mlflow.trace is the outermost decorator
@app.post("/chat") # FastAPI decorator should be inner
def handle_chat(request: Request, chat_request: ChatRequest):
# Retrieve all context from request headers
client_request_id = request.headers.get("X-Request-ID")
session_id = request.headers.get("X-Session-ID")
user_id = request.headers.get("X-User-ID")
# Update the current trace with all context and environment metadata
# The @mlflow.trace decorator ensures an active trace is available
mlflow.update_current_trace(
client_request_id=client_request_id,
metadata={
# Session context - groups traces from multi-turn conversations
"mlflow.trace.session": session_id,
# User context - associates traces with specific users
"mlflow.trace.user": user_id,
# Override automatically popoulated environment metadata
"mlflow.source.type": os.getenv("APP_ENVIRONMENT", "development"), # Override default LOCAL/NOTEBOOK
# Add customer environment metadata
"environment": "production",
"app_version": os.getenv("APP_VERSION", "1.0.0"),
"deployment_id": os.getenv("DEPLOYMENT_ID", "unknown"),
"region": os.getenv("REGION", "us-east-1")
}
)
# --- Your application logic for processing the chat message ---
# For example, calling a language model with context
# response_text = my_llm_call(
# message=chat_request.message,
# session_id=session_id,
# user_id=user_id
# )
response_text = f"Processed message: '{chat_request.message}'"
# --- End of application logic ---
# Return response
return {
"response": response_text
}
# To run this example (requires uvicorn and fastapi):
# uvicorn your_file_name:app --reload
#
# Example curl request with context headers:
# curl -X POST "http://127.0.0.1:8000/chat" \
# -H "Content-Type: application/json" \
# -H "X-Request-ID: req-abc-123-xyz-789" \
# -H "X-Session-ID: session-def-456-uvw-012" \
# -H "X-User-ID: user-jane-doe-12345" \
# -d '{"message": "What is my account balance?"}'
查询和分析上下文数据
使用 MLflow UI
在 MLflow UI (跟踪选项卡)中,可以查看附加的元数据:
编程分析
使用 MLflow SDK 进行更复杂的分析或与其他工具集成:
from mlflow.client import MlflowClient
client = MlflowClient()
# Example 1: Compare error rates across app versions in production
def compare_version_error_rates(experiment_id: str, versions: list):
error_rates = {}
for version in versions:
traces = client.search_traces(
filter_string=f"metadata.`mlflow.source.type` = 'production' AND metadata.app_version = '{version}'"
)
if not traces:
error_rates[version] = None # Or 0 if no traces means no errors
continue
error_count = sum(1 for t in traces if t.info.status == "ERROR")
error_rates[version] = (error_count / len(traces)) * 100
return error_rates
# version_errors = compare_version_error_rates("your_exp_id", ["1.0.0", "1.1.0"])
# print(version_errors)
# Example 2: Analyze performance for a specific feature flag
def analyze_feature_flag_performance(experiment_id: str, flag_name: str):
control_latency = []
treatment_latency = []
control_traces = client.search_traces(
filter_string=f"metadata.feature_flag_{flag_name} = 'false'",
# extract_fields=["execution_time_ms"] # Not a real field, use span attributes if needed
)
for t in control_traces: control_latency.append(t.info.execution_time_ms)
treatment_traces = client.search_traces(
experiment_ids=[experiment_id],
filter_string=f"metadata.feature_flag_{flag_name} = 'true'",
)
for t in treatment_traces: treatment_latency.append(t.info.execution_time_ms)
avg_control_latency = sum(control_latency) / len(control_latency) if control_latency else 0
avg_treatment_latency = sum(treatment_latency) / len(treatment_latency) if treatment_latency else 0
return {
f"avg_latency_{flag_name}_off": avg_control_latency,
f"avg_latency_{flag_name}_on": avg_treatment_latency
}
# perf_metrics = analyze_feature_flag_performance("your_exp_id", "new_retriever")
# print(perf_metrics)
后续步骤
继续您的旅程,并参考这些推荐的行动和教程。
- 跟踪用户和会话 - 向跟踪添加以用户为中心的可观测性
- 附加自定义标记/元数据 - 了解有关使用上下文扩充跟踪的更多方法
- 生产可观测性与跟踪 - 在生产环境中部署全面的跟踪
参考指南
浏览本指南中提到的概念和功能的详细文档。
- 跟踪数据模型 - 了解元数据及其存储在跟踪中的方式
- 应用版本跟踪概念 - 了解版本控制策略
- 通过 SDK 查询跟踪 - 使用元数据筛选器进行高级查询