Search and analyze traces

Learn how to create searchable traces, query them effectively, and analyze the results to gain insights into your GenAI application's behavior.

Quick reference

Essential search syntax

# Search by status
mlflow.search_traces("attributes.status = 'OK'")
mlflow.search_traces("attributes.status = 'ERROR'")

# Search by time (milliseconds since epoch)
mlflow.search_traces("attributes.timestamp_ms > 1749006880539")
mlflow.search_traces("attributes.execution_time_ms > 5000")

# Search by tags
mlflow.search_traces("tags.environment = 'production'")
mlflow.search_traces("tags.`mlflow.traceName` = 'my_function'")

# Search by metadata
mlflow.search_traces("metadata.`mlflow.user` = 'alice@company.com'")

# Combined filters (AND only)
mlflow.search_traces(
    "attributes.status = 'OK' AND tags.environment = 'production'"
)

Key rules

  • Always use prefixes: attributes., tags., or metadata.
  • Backticks if tag or attribute names have dots: tags.`mlflow.traceName`
  • Single quotes only: 'value' not "value"
  • Milliseconds for time: 1749006880539 not dates
  • AND only: No OR support
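The milliseconds rule trips people up most often. A small helper like the following converts a calendar date into the epoch-millisecond value a filter needs (`to_epoch_ms` is a sketch for illustration, not part of MLflow):

```python
from datetime import datetime, timezone

def to_epoch_ms(dt: datetime) -> int:
    """Convert a datetime to milliseconds since the Unix epoch."""
    return int(dt.timestamp() * 1000)

# Build a timestamp filter from a calendar date (use UTC to avoid
# local-timezone surprises when comparing against server-side timestamps)
cutoff = to_epoch_ms(datetime(2024, 1, 1, tzinfo=timezone.utc))
filter_string = f"attributes.timestamp_ms > {cutoff}"
print(filter_string)
# attributes.timestamp_ms > 1704067200000
```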

Searchable fields

| Field | Path | Operators |
|---|---|---|
| Status | attributes.status | =, != |
| Timestamp | attributes.timestamp_ms | =, <, <=, >, >= |
| Duration | attributes.execution_time_ms | =, <, <=, >, >= |
| Tags | tags.* | =, != |
| Metadata | metadata.* | =, != |

End-to-end example

:::note Prerequisites

  1. Install MLflow and required packages

    pip install --upgrade "mlflow[databricks]>=3.1.0" openai "databricks-connect>=16.1"
    
  2. Create an MLflow experiment by following the Set up your environment quickstart.

:::

Create sample traces to demonstrate search functionality:

import time
import mlflow

# Define methods to be traced
@mlflow.trace()
def morning_greeting(name: str):
    time.sleep(1)
    # Add tag and metadata for better categorization
    mlflow.update_current_trace(
        tags={"person": name},
    )
    return f"Good morning {name}."


@mlflow.trace()
def evening_greeting(name: str):
    time.sleep(1)
    # Add tag with different values for comparison
    mlflow.update_current_trace(
        tags={"person": name},
    )
    return f"Good evening {name}."

@mlflow.trace()
def goodbye():
    # Add tag even for functions that might fail
    mlflow.update_current_trace(
        tags={"greeting_type": "goodbye"},
    )
    raise Exception("Cannot say goodbye")


# Execute the methods
morning_greeting("Tom")

# Get the timestamp in milliseconds
morning_time = int(time.time() * 1000)

evening_greeting("Mary")

# Execute goodbye, catching the exception
try:
    goodbye()
except Exception as e:
    print(f"Caught expected exception: {e}")

The code above creates three traces: two successful greetings and one failed goodbye call.

Search these traces using the correct field prefixes:

# Search successful traces
traces = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
)
print(traces)
# 2 results

# Search failed traces
traces = mlflow.search_traces(
    filter_string="attributes.status = 'ERROR'",
)
print(traces)
# 1 result

# Search all traces in experiment
traces = mlflow.search_traces()
print(traces)
# 3 results

# Search by single tag
traces = mlflow.search_traces(filter_string="tags.person = 'Tom'")
print(traces)
# 1 result

# Complex search combining tags and status
traces = mlflow.search_traces(
    filter_string="tags.person = 'Tom' AND attributes.status = 'OK'"
)
print(traces)
# 1 result

# Search by timestamp
traces = mlflow.search_traces(filter_string=f"attributes.timestamp_ms > {morning_time}")
print(traces)
# 2 results (evening_greeting and goodbye both ran after morning_time)

API reference

Search API

Use mlflow.search_traces() to search and analyze traces in your experiments:

mlflow.search_traces(
    experiment_ids: Optional[List[str]] = None,          # Uses active experiment if not specified
    filter_string: Optional[str] = None,
    max_results: Optional[int] = None,
    order_by: Optional[List[str]] = None,
    extract_fields: Optional[List[str]] = None,          # DataFrame column extraction (pandas only)
    run_id: Optional[str] = None,                        # Filter traces by run ID
    return_type: Optional[Literal["pandas", "list"]] = None,  # Return type (default: pandas if available)
    model_id: Optional[str] = None,                      # Search traces by model ID
    sql_warehouse_id: Optional[str] = None               # Databricks SQL warehouse ID
) -> Union[pandas.DataFrame, List[Trace]]

Parameter details:

| Parameter | Description |
|---|---|
| experiment_ids | List of experiment IDs to scope the search. If not provided, the search is performed across the current active experiment. |
| filter_string | A search filter string. |
| max_results | Maximum number of traces desired. If None, all traces matching the search expressions are returned. |
| order_by | List of order-by clauses. |
| extract_fields | Fields to extract from traces, using the format "span_name.[inputs\|outputs].field_name" or "span_name.[inputs\|outputs]". |
| run_id | A run ID to scope the search. When a trace is created under an active run, it is associated with that run, and you can filter on the run ID to retrieve the trace. See the example below for how to filter traces by run ID. |
| return_type | The type of the return value. "pandas" returns a Pandas DataFrame where each row represents a single trace and each column represents a field of the trace, e.g. trace_id, spans; "list" returns a list of Trace (mlflow.entities.Trace) objects. If the pandas library is installed, the default is "pandas"; otherwise it is "list". |
| model_id | If specified, search traces associated with the given model ID. |

Note

MLflow also provides MlflowClient.search_traces(). However, we recommend mlflow.search_traces(): aside from pagination support, it offers a superset of that functionality, with more convenient defaults and additional features such as DataFrame output and field extraction.

Searchable fields reference

Important

For a complete reference on these fields, refer to the trace data model.

| Field | Search Path | Operators | Values | Notes |
|---|---|---|---|---|
| Metadata | metadata.* | =, != | See details below | String equality only |
| Tags | tags.* | =, != | See details below | String equality only |
| Status | attributes.status | =, != | OK, ERROR, IN_PROGRESS | String equality only |
| Name | attributes.name | =, != | Trace name | String equality only |
| Timestamp | attributes.timestamp_ms | =, <, <=, >, >= | Creation time (ms since epoch) | Numeric comparisons |
| Execution Time | attributes.execution_time_ms | =, <, <=, >, >= | Duration in milliseconds | Numeric comparisons |

Metadata details

The following metadata fields are available for filtering:

  • metadata.mlflow.traceInputs: Request content
  • metadata.mlflow.traceOutputs: Response content
  • metadata.mlflow.sourceRun: Source run ID
  • metadata.mlflow.modelId: Model ID
  • metadata.mlflow.trace.sizeBytes: Trace size in bytes
  • metadata.mlflow.trace.tokenUsage: Aggregated token usage information (JSON string)
  • metadata.mlflow.trace.user: User ID/name of the application request
  • metadata.mlflow.trace.session: Session ID of the application request
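Note that metadata.mlflow.trace.tokenUsage is stored as a JSON string, so it cannot be compared numerically in a filter; decode it after retrieving the traces instead. A minimal sketch (the key names in the sample string below are an assumed shape, not a documented contract):

```python
import json

# Hypothetical raw value as stored in metadata.`mlflow.trace.tokenUsage`;
# the exact keys are an assumption for illustration.
token_usage_json = '{"input_tokens": 150, "output_tokens": 80, "total_tokens": 230}'

usage = json.loads(token_usage_json)
print(f"total tokens: {usage['total_tokens']}")
```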

Tags details

In addition to user-defined tags, system-defined tags are available, such as those used elsewhere on this page (both need backticks in filters because their names contain dots):

  • tags.`mlflow.traceName`: Name of the trace
  • tags.`mlflow.artifactLocation`: Storage location of trace artifacts

Filter syntax rules

  1. Table prefixes required: Always use attributes., tags., or metadata.
  2. Backticks for dots: Fields with dots need backticks: tags.`mlflow.traceName`
  3. Single quotes only: String values must use single quotes: 'value'
  4. Case sensitive: All field names and values are case sensitive
  5. AND only: OR operators are not supported
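The rules above can be enforced mechanically when building filters from user input. The helper below is a hypothetical sketch (`make_condition` is not part of MLflow): it validates the prefix, adds backticks to dotted field names, and single-quotes string values:

```python
VALID_PREFIXES = ("attributes.", "tags.", "metadata.")

def make_condition(field: str, op: str, value) -> str:
    """Build one filter condition that follows the syntax rules above.

    Hypothetical helper for illustration; does not escape quotes in values.
    """
    prefix, _, name = field.partition(".")
    if f"{prefix}." not in VALID_PREFIXES:
        raise ValueError(f"field must start with one of {VALID_PREFIXES}")
    # Rule 2: field names containing dots need backticks
    if "." in name and not name.startswith("`"):
        field = f"{prefix}.`{name}`"
    # Rule 3: string values take single quotes; numbers stay bare
    rendered = value if isinstance(value, (int, float)) else f"'{value}'"
    return f"{field} {op} {rendered}"

# Rule 5: combine conditions with AND only
print(" AND ".join([
    make_condition("tags.mlflow.traceName", "=", "my_function"),
    make_condition("attributes.execution_time_ms", ">", 1000),
]))
# tags.`mlflow.traceName` = 'my_function' AND attributes.execution_time_ms > 1000
```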

Order by syntax

# Single field ordering
order_by=["attributes.timestamp_ms DESC"]
order_by=["attributes.execution_time_ms ASC"]

# Multiple field ordering (applied in sequence)
order_by=[
    "attributes.timestamp_ms DESC",
    "attributes.execution_time_ms ASC"
]

# Supported fields for ordering
# - attributes.timestamp_ms (and aliases)
# - attributes.execution_time_ms (and aliases)
# - attributes.status
# - attributes.name

Common patterns

# Status filtering
"attributes.status = 'OK'"
"attributes.status = 'ERROR'"

# Time-based queries
"attributes.timestamp_ms > 1749006880539"
"attributes.execution_time_ms > 5000"

# Tag searches
"tags.user_id = 'U001'"
"tags.`mlflow.traceName` = 'my_function'"

# Metadata queries
"metadata.`mlflow.user` = 'alice@company.com'"
"metadata.`mlflow.traceOutputs` != ''"

# Combined filters
"attributes.status = 'OK' AND tags.environment = 'production'"
"attributes.timestamp_ms > 1749006880539 AND attributes.execution_time_ms > 1000"

Common pitfalls

| ❌ Incorrect | ✅ Correct | Issue |
|---|---|---|
| status = 'OK' | attributes.status = 'OK' | Missing prefix |
| mlflow.user = 'alice' | metadata.\`mlflow.user\` = 'alice' | Missing prefix and backticks |
| timestamp > '2024-01-01' | attributes.timestamp_ms > 1704067200000 | Use milliseconds, not date strings |
| tags.env = "prod" | tags.env = 'prod' | Use single quotes |
| status = 'OK' OR status = 'ERROR' | Use separate queries | OR not supported |

Detailed search examples

Search by run ID

# Find all traces associated with a specific MLflow run
with mlflow.start_run() as run:
    # Your traced code here
    traced_result = my_traced_function()

# Search for traces from this run
run_traces = mlflow.search_traces(
    run_id=run.info.run_id,
    return_type="list"  # Get list of Trace objects
)

Control return type

# Get results as pandas DataFrame (default if pandas is installed)
traces_df = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    return_type="pandas"
)

# Get results as list of Trace objects
traces_list = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    return_type="list"
)

# Access trace details from list
for trace in traces_list:
    print(f"Trace ID: {trace.info.trace_id}")
    print(f"Status: {trace.info.state}")
    print(f"Duration: {trace.info.execution_duration}")

Search by model ID

# Find traces associated with a specific MLflow model
model_traces = mlflow.search_traces(
    model_id="my-model-123",
    filter_string="attributes.status = 'OK'"
)

# Analyze model performance
print(f"Found {len(model_traces)} successful traces for model")
print(f"Average latency: {model_traces['execution_time_ms'].mean():.2f}ms")

Search by status

# Find successful traces
traces = mlflow.search_traces(filter_string="attributes.status = 'OK'")

# Find failed traces
traces = mlflow.search_traces(filter_string="attributes.status = 'ERROR'")

# Find in-progress traces
traces = mlflow.search_traces(filter_string="attributes.status = 'IN_PROGRESS'")

# Exclude errors
traces = mlflow.search_traces(filter_string="attributes.status != 'ERROR'")

Search by trace name

# Find traces with specific name (rarely used - legacy field)
traces = mlflow.search_traces(filter_string="attributes.name = 'foo'")

# Find traces excluding a specific name
traces = mlflow.search_traces(filter_string="attributes.name != 'test_trace'")

# Note: Most users should use tags.`mlflow.traceName` instead
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_request'"
)

Search by timestamp

import time
from datetime import datetime

# Current time in milliseconds
current_time_ms = int(time.time() * 1000)

# Last 5 minutes
five_minutes_ago = current_time_ms - (5 * 60 * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.timestamp_ms > {five_minutes_ago}"
)

# Specific date range
start_date = int(datetime(2024, 1, 1).timestamp() * 1000)
end_date = int(datetime(2024, 1, 31).timestamp() * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.timestamp_ms > {start_date} AND attributes.timestamp_ms < {end_date}"
)

# Using timestamp aliases
traces = mlflow.search_traces(filter_string=f"attributes.timestamp > {five_minutes_ago}")

Search by execution time

# Find slow traces (>5 seconds)
traces = mlflow.search_traces(filter_string="attributes.execution_time_ms > 5000")

# Find fast traces (<100ms)
traces = mlflow.search_traces(filter_string="attributes.execution_time_ms < 100")

# Performance range
traces = mlflow.search_traces(
    filter_string="attributes.execution_time_ms > 100 AND attributes.execution_time_ms < 1000"
)

# Using execution time aliases
traces = mlflow.search_traces(filter_string="attributes.latency > 1000")

Search by tags

# Custom tags (set via mlflow.update_current_trace)
traces = mlflow.search_traces(filter_string="tags.customer_id = 'C001'")
traces = mlflow.search_traces(filter_string="tags.environment = 'production'")
traces = mlflow.search_traces(filter_string="tags.version = 'v2.1.0'")

# MLflow system tags (require backticks)
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_chat_request'"
)
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.artifactLocation` != ''"
)

Search by metadata

# Search by response content (exact match)
traces = mlflow.search_traces(
    filter_string="metadata.`mlflow.traceOutputs` = 'exact response text'"
)

# Find traces with any output
traces = mlflow.search_traces(
    filter_string="metadata.`mlflow.traceOutputs` != ''"
)

# Search by user
traces = mlflow.search_traces(
    filter_string="metadata.`mlflow.user` = 'alice@company.com'"
)

# Search by source file
traces = mlflow.search_traces(
    filter_string="metadata.`mlflow.source.name` = 'app.py'"
)

# Search by git information
traces = mlflow.search_traces(
    filter_string="metadata.`mlflow.source.git.branch` = 'main'"
)

Complex filters with AND

# Recent successful production traces
current_time_ms = int(time.time() * 1000)
one_hour_ago = current_time_ms - (60 * 60 * 1000)

traces = mlflow.search_traces(
    filter_string=f"attributes.status = 'OK' AND "
                 f"attributes.timestamp_ms > {one_hour_ago} AND "
                 f"tags.environment = 'production'"
)

# Fast traces from specific user
traces = mlflow.search_traces(
    filter_string="attributes.execution_time_ms < 100 AND "
                 "metadata.`mlflow.user` = 'alice@company.com'"
)

# Specific function with performance threshold
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_payment' AND "
                 "attributes.execution_time_ms > 1000"
)

Ordering results

# Most recent first
traces = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    order_by=["attributes.timestamp_ms DESC"]
)

# Fastest first
traces = mlflow.search_traces(
    order_by=["attributes.execution_time_ms ASC"]
)

# Multiple sort criteria
traces = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    order_by=[
        "attributes.timestamp_ms DESC",
        "attributes.execution_time_ms ASC"
    ]
)

DataFrame operations

The DataFrame returned by mlflow.search_traces contains these columns:

traces_df = mlflow.search_traces()

# Default columns
print(traces_df.columns)
# ['request_id', 'trace', 'timestamp_ms', 'status', 'execution_time_ms',
#  'request', 'response', 'request_metadata', 'spans', 'tags']
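Once in a DataFrame, these columns support ordinary pandas analysis. A sketch using a hand-built frame with the same column names, so it runs without a tracking server (the values are made up for illustration):

```python
import pandas as pd

# Stand-in for mlflow.search_traces() output, using two of the default columns
traces_df = pd.DataFrame({
    "status": ["OK", "OK", "ERROR"],
    "execution_time_ms": [120, 450, 3000],
})

# Error rate: fraction of traces whose status is ERROR
error_rate = (traces_df["status"] == "ERROR").mean()
# Tail latency via the 95th percentile of durations
p95_latency = traces_df["execution_time_ms"].quantile(0.95)
print(f"error rate: {error_rate:.1%}, p95 latency: {p95_latency:.0f}ms")
```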

Extract span fields

# Extract specific span fields into DataFrame columns
traces = mlflow.search_traces(
    extract_fields=[
        "process_request.inputs.customer_id",
        "process_request.outputs",
        "validate_input.inputs",
        "generate_response.outputs.message"
    ]
)

# Use extracted fields for evaluation dataset
eval_data = traces.rename(columns={
    "process_request.inputs.customer_id": "customer",
    "generate_response.outputs.message": "ground_truth"
})

Building dynamic queries

def build_trace_filter(status=None, user=None, min_duration=None,
                      max_duration=None, tags=None, after_timestamp=None):
    """Build dynamic filter string from parameters"""
    conditions = []

    if status:
        conditions.append(f"attributes.status = '{status}'")

    if user:
        conditions.append(f"metadata.`mlflow.user` = '{user}'")

    if min_duration:
        conditions.append(f"attributes.execution_time_ms > {min_duration}")

    if max_duration:
        conditions.append(f"attributes.execution_time_ms < {max_duration}")

    if after_timestamp:
        conditions.append(f"attributes.timestamp_ms > {after_timestamp}")

    if tags:
        for key, value in tags.items():
            # Handle dotted tag names
            if '.' in key:
                conditions.append(f"tags.`{key}` = '{value}'")
            else:
                conditions.append(f"tags.{key} = '{value}'")

    return " AND ".join(conditions) if conditions else None

# Usage
filter_string = build_trace_filter(
    status="OK",
    user="alice@company.com",
    min_duration=100,
    tags={"environment": "production", "mlflow.traceName": "process_order"}
)

traces = mlflow.search_traces(filter_string=filter_string)

Practical examples reference

Error monitoring

Monitor and analyze errors in your production environment:

import mlflow
import time
import pandas as pd

def monitor_errors(experiment_name: str, hours: int = 1):
    """Monitor errors in the last N hours."""

    # Calculate time window
    current_time_ms = int(time.time() * 1000)
    cutoff_time_ms = current_time_ms - (hours * 60 * 60 * 1000)

    # Find all errors
    failed_traces = mlflow.search_traces(
        filter_string=f"attributes.status = 'ERROR' AND "
                     f"attributes.timestamp_ms > {cutoff_time_ms}",
        order_by=["attributes.timestamp_ms DESC"]
    )

    if len(failed_traces) == 0:
        print(f"No errors found in the last {hours} hour(s)")
        return

    # Analyze error patterns
    print(f"Found {len(failed_traces)} errors in the last {hours} hour(s)\n")

    # Group by function name
    error_by_function = failed_traces.groupby('tags.mlflow.traceName').size()
    print("Errors by function:")
    print(error_by_function.to_string())

    # Show recent error samples
    print("\nRecent error samples:")
    for _, trace in failed_traces.head(5).iterrows():
        print(f"- {trace['request_preview'][:60]}...")
        print(f"  Function: {trace.get('tags.mlflow.traceName', 'unknown')}")
        print(f"  Time: {pd.to_datetime(trace['timestamp_ms'], unit='ms')}")
        print()

    return failed_traces

Performance profiling

Analyze performance characteristics and identify bottlenecks:

def profile_performance(function_name: str = None, percentiles: list = [50, 95, 99]):
    """Profile performance metrics for traces."""

    # Build filter
    filter_parts = []
    if function_name:
        filter_parts.append(f"tags.`mlflow.traceName` = '{function_name}'")

    filter_string = " AND ".join(filter_parts) if filter_parts else None

    # Get traces
    traces = mlflow.search_traces(filter_string=filter_string)

    if len(traces) == 0:
        print("No traces found")
        return

    # Calculate percentiles
    perf_stats = traces['execution_time_ms'].describe(percentiles=[p/100 for p in percentiles])

    print(f"Performance Analysis ({len(traces)} traces)")
    print("=" * 40)
    for p in percentiles:
        print(f"P{p}: {perf_stats[f'{p}%']:.1f}ms")
    print(f"Mean: {perf_stats['mean']:.1f}ms")
    print(f"Max: {perf_stats['max']:.1f}ms")

    # Find outliers (>P99)
    if 99 in percentiles:
        p99_threshold = perf_stats['99%']
        outliers = traces[traces['execution_time_ms'] > p99_threshold]

        if len(outliers) > 0:
            print(f"\nOutliers (>{p99_threshold:.0f}ms): {len(outliers)} traces")
            for _, trace in outliers.head(3).iterrows():
                print(f"- {trace['execution_time_ms']:.0f}ms: {trace['request_preview'][:50]}...")

    return traces

User activity analysis

Track and analyze user behavior patterns:

def analyze_user_activity(user_id: str, days: int = 7):
    """Analyze activity patterns for a specific user."""

    cutoff_ms = int((time.time() - days * 86400) * 1000)

    traces = mlflow.search_traces(
        filter_string=f"metadata.`mlflow.user` = '{user_id}' AND "
                     f"attributes.timestamp_ms > {cutoff_ms}",
        order_by=["attributes.timestamp_ms DESC"]
    )

    if len(traces) == 0:
        print(f"No activity found for user {user_id}")
        return

    print(f"User {user_id} Activity Report ({days} days)")
    print("=" * 50)
    print(f"Total requests: {len(traces)}")

    # Daily activity
    traces['date'] = pd.to_datetime(traces['timestamp_ms'], unit='ms').dt.date
    daily_activity = traces.groupby('date').size()
    print(f"\nDaily activity:")
    print(daily_activity.to_string())

    # Query categories
    if 'tags.query_category' in traces.columns:
        categories = traces['tags.query_category'].value_counts()
        print(f"\nQuery categories:")
        print(categories.to_string())

    # Performance stats
    print(f"\nPerformance:")
    print(f"Average response time: {traces['execution_time_ms'].mean():.1f}ms")
    print(f"Error rate: {(traces['status'] == 'ERROR').mean() * 100:.1f}%")

    return traces

Best practices

1. Design a consistent tagging strategy

Create a tagging taxonomy for your organization:

from datetime import datetime

import mlflow


class TraceTagging:
    """Standardized tagging strategy for traces."""

    # Required tags for all traces
    REQUIRED_TAGS = ["environment", "version", "service_name"]

    # Category mappings
    CATEGORIES = {
        "user_management": ["login", "logout", "profile_update"],
        "content_generation": ["summarize", "translate", "rewrite"],
        "data_retrieval": ["search", "fetch", "query"]
    }

    @staticmethod
    def tag_trace(operation: str, **kwargs):
        """Apply standardized tags to current trace."""
        tags = {
            "operation": operation,
            "timestamp": datetime.now().isoformat(),
            "service_name": "genai-platform"
        }

        # Add category based on operation
        for category, operations in TraceTagging.CATEGORIES.items():
            if operation in operations:
                tags["category"] = category
                break

        # Add custom tags
        tags.update(kwargs)

        # Validate required tags
        for required in TraceTagging.REQUIRED_TAGS:
            if required not in tags:
                tags[required] = "unknown"

        mlflow.update_current_trace(tags=tags)
        return tags

2. Build reusable search utilities

import time

import mlflow
import pandas as pd


class TraceSearcher:
    """Reusable trace search utilities."""

    def __init__(self, experiment_ids: list = None):
        self.experiment_ids = experiment_ids

    def recent_errors(self, hours: int = 1) -> pd.DataFrame:
        """Get recent error traces."""
        cutoff = int((time.time() - hours * 3600) * 1000)
        return mlflow.search_traces(
            experiment_ids=self.experiment_ids,
            filter_string=f"attributes.status = 'ERROR' AND "
                         f"attributes.timestamp_ms > {cutoff}",
            order_by=["attributes.timestamp_ms DESC"]
        )

    def slow_operations(self, threshold_ms: int = 5000) -> pd.DataFrame:
        """Find operations slower than threshold."""
        return mlflow.search_traces(
            experiment_ids=self.experiment_ids,
            filter_string=f"attributes.execution_time_ms > {threshold_ms}",
            order_by=["attributes.execution_time_ms DESC"]
        )

    def by_user(self, user_id: str, days: int = 7) -> pd.DataFrame:
        """Get traces for a specific user."""
        cutoff = int((time.time() - days * 86400) * 1000)
        return mlflow.search_traces(
            experiment_ids=self.experiment_ids,
            filter_string=f"tags.user_id = '{user_id}' AND "
                         f"attributes.timestamp_ms > {cutoff}",
            order_by=["attributes.timestamp_ms DESC"]
        )

    def by_category(self, category: str, status: str = None) -> pd.DataFrame:
        """Get traces by category with optional status filter."""
        filters = [f"tags.category = '{category}'"]
        if status:
            filters.append(f"attributes.status = '{status}'")

        return mlflow.search_traces(
            experiment_ids=self.experiment_ids,
            filter_string=" AND ".join(filters)
        )

    def performance_report(self, function_name: str = None) -> dict:
        """Generate performance report."""
        filter_parts = []
        if function_name:
            filter_parts.append(f"tags.`mlflow.traceName` = '{function_name}'")

        filter_string = " AND ".join(filter_parts) if filter_parts else None
        traces = mlflow.search_traces(
            experiment_ids=self.experiment_ids,
            filter_string=filter_string
        )

        if len(traces) == 0:
            return {"error": "No traces found"}

        return {
            "total_traces": len(traces),
            "error_rate": (traces['status'] == 'ERROR').mean(),
            "avg_duration_ms": traces['execution_time_ms'].mean(),
            "p50_duration_ms": traces['execution_time_ms'].quantile(0.5),
            "p95_duration_ms": traces['execution_time_ms'].quantile(0.95),
            "p99_duration_ms": traces['execution_time_ms'].quantile(0.99)
        }

# Usage example
searcher = TraceSearcher()
errors = searcher.recent_errors(hours=24)
slow_ops = searcher.slow_operations(threshold_ms=10000)
user_traces = searcher.by_user("U001", days=30)
report = searcher.performance_report("process_request")

Next Steps