Share via


Track versions & environments

Tracking the execution environment and application version of your GenAI application allows you to debug performance and quality issues relative to the code. This metadata enables:

  • Environment-specific analysis across development, staging, and production
  • Performance/quality tracking and regression detection across app versions
  • Faster root cause analysis when issues occur

MLflow uses metadata (key-value pairs) to store contextual information on traces.

Note

For a comprehensive overview of how versioning works, see Version Tracking.

Automatically Populated Metadata

These standard metadata fields are automatically captured by MLflow based on your execution environment.

Important

If the automatic capture logic does not meet your requirements, you can override these automatically populated metadata manually using mlflow.update_current_trace(metadata={"mlflow.source.name": "custom_name"}).

Category Metadata Field Description Automatic Setting Logic
Execution environment mlflow.source.name The entry point or script that generated the trace. Automatically populated with the filename for Python scripts, notebook name for Databricks/Jupyter notebooks.
mlflow.source.git.commit Git commit hash. If run from a Git repository, the commit hash is automatically detected and populated.
mlflow.source.git.branch Git branch. If run from a Git repository, the current branch name is automatically detected and populated.
mlflow.source.git.repoURL Git repo URL. If run from a Git repository, the repository URL is automatically detected and populated.
mlflow.source.type Captures the execution environment. Automatically set to NOTEBOOK if running in Jupyter or Databricks notebook, LOCAL if running a local Python script, else UNKNOWN (automatically detected).
In your deployed app, we suggest updating this variable based on the environment e.g., PRODUCTION, STAGING, etc.
Application version metadata.mlflow.modelId MLflow LoggedModel ID. Automatically set to the model ID value in the environment variable MLFLOW_ACTIVE_MODEL_ID or the model ID set via mlflow.set_active_model() function.

Customizing automatically populated metadata

You can override any of the automatically populated metadata fields using mlflow.update_current_trace(). This is useful when the automatic detection doesn't meet your requirements or when you want to add additional context:

import mlflow
import os

# We suggest populating metadata from environment variables rather than hard coding the values

@mlflow.trace
def my_app(user_question: str) -> dict:
    # Override automatically populated metadata and add custom context
    mlflow.update_current_trace(
        metadata={
            # Use any of the keys from above
            "mlflow.source.type": current_env = os.getenv("APP_ENVIRONMENT", "development"),  # Override default LOCAL/NOTEBOOK
        }
    )

    # Application logic

    return {"response": user_question + "!!"}

my_app("test")

Fully custom metadata

You can attach custom metadata to capture any application-specific context. For more details on attaching custom metadata, see Attach custom metadata / tags.

For example, you might want to attach information such as:

  • app_version: e.g., "1.0.0" (from APP_VERSION environment variable)
  • deployment_id: e.g., "deploy-abc-123" (from DEPLOYMENT_ID environment variable)
  • region: e.g., "us-east-1" (from REGION environment variable)
  • (Other custom tags like feature flags can also be added)
import mlflow
import os

# We suggest populating metadata from environment variables rather than hard coding the values

@mlflow.trace
def my_app(user_question: str) -> dict:
    # Override automatically populated metadata and add custom context
    mlflow.update_current_trace(
        metadata={
            # Use any key
            "app_version": os.getenv("APP_VERSION", "development")
        }
    )

    # Application logic

    return {"response": user_question + "!!"}

my_app("test")

Production web application example

In a production FastAPI application, context can be derived from environment variables, request headers, or application configuration. The following example is adapted from the Production Observability with Tracing guide and demonstrates how to capture various context types:

import mlflow
import os
from fastapi import FastAPI, Request, HTTPException # HTTPException might be needed depending on full app logic
from pydantic import BaseModel

# Initialize FastAPI app
app = FastAPI()

class ChatRequest(BaseModel):
    message: str

@mlflow.trace # Ensure @mlflow.trace is the outermost decorator
@app.post("/chat") # FastAPI decorator should be inner
def handle_chat(request: Request, chat_request: ChatRequest):
    # Retrieve all context from request headers
    client_request_id = request.headers.get("X-Request-ID")
    session_id = request.headers.get("X-Session-ID")
    user_id = request.headers.get("X-User-ID")

    # Update the current trace with all context and environment metadata
    # The @mlflow.trace decorator ensures an active trace is available
    mlflow.update_current_trace(
        client_request_id=client_request_id,
        metadata={
            # Session context - groups traces from multi-turn conversations
            "mlflow.trace.session": session_id,
            # User context - associates traces with specific users
            "mlflow.trace.user": user_id,
            # Override automatically popoulated environment metadata
            "mlflow.source.type": os.getenv("APP_ENVIRONMENT", "development"),  # Override default LOCAL/NOTEBOOK
            # Add customer environment metadata
            "environment": "production",
            "app_version": os.getenv("APP_VERSION", "1.0.0"),
            "deployment_id": os.getenv("DEPLOYMENT_ID", "unknown"),
            "region": os.getenv("REGION", "us-east-1")
        }
    )

    # --- Your application logic for processing the chat message ---
    # For example, calling a language model with context
    # response_text = my_llm_call(
    #     message=chat_request.message,
    #     session_id=session_id,
    #     user_id=user_id
    # )
    response_text = f"Processed message: '{chat_request.message}'"
    # --- End of application logic ---

    # Return response
    return {
        "response": response_text
    }

# To run this example (requires uvicorn and fastapi):
# uvicorn your_file_name:app --reload
#
# Example curl request with context headers:
# curl -X POST "http://127.0.0.1:8000/chat" \
#      -H "Content-Type: application/json" \
#      -H "X-Request-ID: req-abc-123-xyz-789" \
#      -H "X-Session-ID: session-def-456-uvw-012" \
#      -H "X-User-ID: user-jane-doe-12345" \
#      -d '{"message": "What is my account balance?"}'

Querying and Analyzing Context Data

Using the MLflow UI

In the MLflow UI (Traces tab), you can view the attached metadata:

trace metadata

Programmatic Analysis

Use the MLflow SDK for more complex analysis or to integrate with other tools:

from mlflow.client import MlflowClient

client = MlflowClient()

# Example 1: Compare error rates across app versions in production
def compare_version_error_rates(experiment_id: str, versions: list):
    error_rates = {}
    for version in versions:
        traces = client.search_traces(
            filter_string=f"metadata.`mlflow.source.type` = 'production' AND metadata.app_version = '{version}'"
        )
        if not traces:
            error_rates[version] = None # Or 0 if no traces means no errors
            continue

        error_count = sum(1 for t in traces if t.info.status == "ERROR")
        error_rates[version] = (error_count / len(traces)) * 100
    return error_rates

# version_errors = compare_version_error_rates("your_exp_id", ["1.0.0", "1.1.0"])
# print(version_errors)

# Example 2: Analyze performance for a specific feature flag
def analyze_feature_flag_performance(experiment_id: str, flag_name: str):
    control_latency = []
    treatment_latency = []

    control_traces = client.search_traces(
        filter_string=f"metadata.feature_flag_{flag_name} = 'false'",
        # extract_fields=["execution_time_ms"] # Not a real field, use span attributes if needed
    )
    for t in control_traces: control_latency.append(t.info.execution_time_ms)

    treatment_traces = client.search_traces(
        experiment_ids=[experiment_id],
        filter_string=f"metadata.feature_flag_{flag_name} = 'true'",
    )
    for t in treatment_traces: treatment_latency.append(t.info.execution_time_ms)

    avg_control_latency = sum(control_latency) / len(control_latency) if control_latency else 0
    avg_treatment_latency = sum(treatment_latency) / len(treatment_latency) if treatment_latency else 0

    return {
        f"avg_latency_{flag_name}_off": avg_control_latency,
        f"avg_latency_{flag_name}_on": avg_treatment_latency
    }

# perf_metrics = analyze_feature_flag_performance("your_exp_id", "new_retriever")
# print(perf_metrics)

Next steps

Continue your journey with these recommended actions and tutorials.

Reference guides

Explore detailed documentation for concepts and features mentioned in this guide.