Learn how to create searchable traces, query them effectively, and analyze the results to gain insights into your GenAI application's behavior.
Quick reference
Essential search syntax
# Search by status
mlflow.search_traces("attributes.status = 'OK'")
mlflow.search_traces("attributes.status = 'ERROR'")
# Search by time (milliseconds since epoch)
mlflow.search_traces("attributes.timestamp_ms > 1749006880539")
mlflow.search_traces("attributes.execution_time_ms > 5000")
# Search by tags
mlflow.search_traces("tags.environment = 'production'")
mlflow.search_traces("tags.`mlflow.traceName` = 'my_function'")
# Search by metadata
mlflow.search_traces("metadata.`mlflow.user` = 'alice@company.com'")
# Combined filters (AND only)
mlflow.search_traces(
    "attributes.status = 'OK' AND tags.environment = 'production'"
)
Key rules
- Always use prefixes: attributes., tags., or metadata.
- Backticks if tag or attribute names contain dots: tags.`mlflow.traceName`
- Single quotes only: 'value', not "value"
- Milliseconds for time: 1749006880539, not date strings
- AND only: no OR support
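The rules above can be captured in a small helper when building filters programmatically; a minimal sketch (the helper name is illustrative, not part of the MLflow API):

```python
def tag_clause(key: str, value: str) -> str:
    """Build a tags filter clause following the rules above:
    table prefix, backticks for dotted names, single-quoted values."""
    field = f"tags.`{key}`" if "." in key else f"tags.{key}"
    return f"{field} = '{value}'"

print(tag_clause("environment", "production"))
# tags.environment = 'production'
print(tag_clause("mlflow.traceName", "my_function"))
# tags.`mlflow.traceName` = 'my_function'
```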
Searchable fields
Field | Path | Operators |
---|---|---|
Status | attributes.status | `=`, `!=` |
Timestamp | attributes.timestamp_ms | `=`, `<`, `<=`, `>`, `>=` |
Duration | attributes.execution_time_ms | `=`, `<`, `<=`, `>`, `>=` |
Tags | tags.* | `=`, `!=` |
Metadata | metadata.* | `=`, `!=` |
End-to-end example
:::note Prerequisites

Install MLflow and required packages:

pip install --upgrade "mlflow[databricks]>=3.1.0" openai "databricks-connect>=16.1"

Create an MLflow experiment by following the setup your environment quickstart.
:::
Create sample traces to demonstrate search functionality:
import time
import mlflow

# Define methods to be traced
@mlflow.trace()
def morning_greeting(name: str):
    time.sleep(1)
    # Add tag and metadata for better categorization
    mlflow.update_current_trace(
        tags={"person": name},
    )
    return f"Good morning {name}."

@mlflow.trace()
def evening_greeting(name: str):
    time.sleep(1)
    # Add tag with different values for comparison
    mlflow.update_current_trace(
        tags={"person": name},
    )
    return f"Good evening {name}."

@mlflow.trace()
def goodbye():
    # Add tag even for functions that might fail
    mlflow.update_current_trace(
        tags={"greeting_type": "goodbye"},
    )
    raise Exception("Cannot say goodbye")

# Execute the methods
morning_greeting("Tom")

# Get the timestamp in milliseconds
morning_time = int(time.time() * 1000)

evening_greeting("Mary")

# Execute goodbye, catching the exception
try:
    goodbye()
except Exception as e:
    print(f"Caught expected exception: {e}")
The code above creates the following traces:
Search these traces using the correct field prefixes:
# Search successful traces
traces = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
)
print(traces)
# 2 results

# Search failed traces
traces = mlflow.search_traces(
    filter_string="attributes.status = 'ERROR'",
)
print(traces)
# 1 result

# Search all traces in experiment
traces = mlflow.search_traces()
print(traces)
# 3 results

# Search by single tag
traces = mlflow.search_traces(filter_string="tags.person = 'Tom'")
print(traces)
# 1 result

# Complex search combining tags and status
traces = mlflow.search_traces(
    filter_string="tags.person = 'Tom' AND attributes.status = 'OK'"
)
print(traces)
# 1 result
# Search by timestamp
traces = mlflow.search_traces(filter_string=f"attributes.timestamp > {morning_time}")
print(traces)
# 2 results (evening_greeting and goodbye both started after morning_time)
API reference
Search API
Use mlflow.search_traces()
to search and analyze traces in your experiments:
mlflow.search_traces(
    experiment_ids: Optional[List[str]] = None,  # Uses active experiment if not specified
    filter_string: Optional[str] = None,
    max_results: Optional[int] = None,
    order_by: Optional[List[str]] = None,
    extract_fields: Optional[List[str]] = None,  # DataFrame column extraction (pandas only)
    run_id: Optional[str] = None,  # Filter traces by run ID
    return_type: Optional[Literal["pandas", "list"]] = None,  # Return type (default: pandas if available)
    model_id: Optional[str] = None,  # Search traces by model ID
    sql_warehouse_id: Optional[str] = None  # Databricks SQL warehouse ID
) -> Union[pandas.DataFrame, List[Trace]]
Parameter details:
Parameter | Description |
---|---|
experiment_ids | List of experiment IDs to scope the search. If not provided, the search is performed across the current active experiment. |
filter_string | A search filter string. |
max_results | Maximum number of traces desired. If None, all traces matching the search expressions are returned. |
order_by | List of order-by clauses. |
extract_fields | Fields to extract from traces, in the format "span_name.[inputs\|outputs].field_name" or "span_name.[inputs\|outputs]". |
run_id | A run ID to scope the search. When a trace is created under an active run, it is associated with that run, and you can filter on the run ID to retrieve the trace. See the example below for how to filter traces by run ID. |
return_type | The type of the return value. If the pandas library is installed, the default is "pandas"; otherwise it is "list". "pandas" returns a pandas DataFrame where each row represents a single trace and each column represents a field of the trace (e.g. trace_id, spans). "list" returns a list of Trace objects. |
model_id | If specified, search traces associated with the given model ID. |
Note
MLflow also provides MlflowClient.search_traces(). However, we recommend using mlflow.search_traces(): with the exception of pagination support, it provides a superset of that functionality, with more convenient defaults and additional features like DataFrame output and field extraction.
Searchable fields reference
Important
For a complete reference on these fields, refer to the trace data model.
Field Type | Search Path | Operators | Values | Notes |
---|---|---|---|---|
Metadata | metadata.* | `=`, `!=` | See details below | String equality only |
Tags | tags.* | `=`, `!=` | See details below | String equality only |
Status | attributes.status | `=`, `!=` | OK, ERROR, IN_PROGRESS | String equality only |
Name | attributes.name | `=`, `!=` | Trace name | String equality only |
Timestamp | attributes.timestamp_ms | `=`, `<`, `<=`, `>`, `>=` | Creation time (ms since epoch) | Numeric comparisons |
Execution Time | attributes.execution_time_ms | `=`, `<`, `<=`, `>`, `>=` | Duration in milliseconds | Numeric comparisons |
Metadata details
The following metadata fields are available for filtering:
- metadata.mlflow.traceInputs: Request content
- metadata.mlflow.traceOutputs: Response content
- metadata.mlflow.sourceRun: Source run ID
- metadata.mlflow.modelId: Model ID
- metadata.mlflow.trace.sizeBytes: Trace size in bytes
- metadata.mlflow.trace.tokenUsage: Aggregated token usage information (JSON string)
- metadata.mlflow.trace.user: User ID/name of the application request
- metadata.mlflow.trace.session: Session ID of the application request
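Because mlflow.trace.tokenUsage is stored as a JSON string, it needs to be parsed before analysis. A minimal sketch (the key names in the sample are illustrative; inspect your own traces for the exact schema):

```python
import json

def parse_token_usage(raw: str) -> dict:
    """Parse the mlflow.trace.tokenUsage metadata value (a JSON string).
    Returns an empty dict for missing or empty values."""
    return json.loads(raw) if raw else {}

# Illustrative sample value; field names are assumptions, not a documented schema
sample = '{"input_tokens": 150, "output_tokens": 80, "total_tokens": 230}'
usage = parse_token_usage(sample)
print(usage["total_tokens"])  # 230
```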
Tags details
In addition to user defined tags, the following system defined tags are available:
- mlflow.traceName: The name of the trace
- eval.requestId: Evaluation request ID, set by mlflow.genai.evaluate()
Filter syntax rules
- Table prefixes required: always use attributes., tags., or metadata.
- Backticks for dots: field names containing dots need backticks: tags.`mlflow.traceName`
- Single quotes only: string values must use single quotes: 'value'
- Case sensitive: all field names and values are case sensitive
- AND only: OR operators are not supported
Order by syntax
# Single field ordering
order_by=["attributes.timestamp_ms DESC"]
order_by=["attributes.execution_time_ms ASC"]
# Multiple field ordering (applied in sequence)
order_by=[
    "attributes.timestamp_ms DESC",
    "attributes.execution_time_ms ASC"
]
# Supported fields for ordering
# - attributes.timestamp_ms (and aliases)
# - attributes.execution_time_ms (and aliases)
# - attributes.status
# - attributes.name
Common patterns
# Status filtering
"attributes.status = 'OK'"
"attributes.status = 'ERROR'"
# Time-based queries
"attributes.timestamp_ms > 1749006880539"
"attributes.execution_time_ms > 5000"
# Tag searches
"tags.user_id = 'U001'"
"tags.`mlflow.traceName` = 'my_function'"
# Metadata queries
"metadata.`mlflow.user` = 'alice@company.com'"
"metadata.`mlflow.traceOutputs` != ''"
# Combined filters
"attributes.status = 'OK' AND tags.environment = 'production'"
"attributes.timestamp_ms > 1749006880539 AND attributes.execution_time_ms > 1000"
Common pitfalls
❌ Incorrect | ✅ Correct | Issue |
---|---|---|
status = 'OK' | attributes.status = 'OK' | Missing prefix |
mlflow.user = 'alice' | metadata.`mlflow.user` = 'alice' | Missing prefix and backticks |
timestamp > '2024-01-01' | attributes.timestamp_ms > 1704067200000 | Use milliseconds, not date strings |
tags.env = "prod" | tags.env = 'prod' | Use single quotes |
status = 'OK' OR status = 'ERROR' | Use separate queries | OR not supported |
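Since OR is not supported, a common workaround is to run one query per condition and merge the results. The merge step can be sketched with pandas (assuming the default DataFrame output, which includes a request_id column):

```python
import pandas as pd

def merge_trace_results(*frames: pd.DataFrame) -> pd.DataFrame:
    """Concatenate results from separate searches and drop duplicate traces."""
    merged = pd.concat(frames, ignore_index=True)
    return merged.drop_duplicates(subset="request_id").reset_index(drop=True)

# Usage (two queries emulate `status = 'OK' OR status = 'ERROR'`):
# ok = mlflow.search_traces(filter_string="attributes.status = 'OK'")
# err = mlflow.search_traces(filter_string="attributes.status = 'ERROR'")
# combined = merge_trace_results(ok, err)
```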
Detailed search examples
Search by run ID
# Find all traces associated with a specific MLflow run
with mlflow.start_run() as run:
    # Your traced code here
    traced_result = my_traced_function()

# Search for traces from this run
run_traces = mlflow.search_traces(
    run_id=run.info.run_id,
    return_type="list"  # Get list of Trace objects
)
Control return type
# Get results as pandas DataFrame (default if pandas is installed)
traces_df = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    return_type="pandas"
)

# Get results as list of Trace objects
traces_list = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    return_type="list"
)

# Access trace details from the list
for trace in traces_list:
    print(f"Trace ID: {trace.info.trace_id}")
    print(f"Status: {trace.info.state}")
    print(f"Duration: {trace.info.execution_duration}")
Search by model ID
# Find traces associated with a specific MLflow model
model_traces = mlflow.search_traces(
    model_id="my-model-123",
    filter_string="attributes.status = 'OK'"
)

# Analyze model performance
print(f"Found {len(model_traces)} successful traces for model")
print(f"Average latency: {model_traces['execution_time_ms'].mean():.2f}ms")
Search by status
# Find successful traces
traces = mlflow.search_traces(filter_string="attributes.status = 'OK'")
# Find failed traces
traces = mlflow.search_traces(filter_string="attributes.status = 'ERROR'")
# Find in-progress traces
traces = mlflow.search_traces(filter_string="attributes.status = 'IN_PROGRESS'")
# Exclude errors
traces = mlflow.search_traces(filter_string="attributes.status != 'ERROR'")
Search by trace name
# Find traces with specific name (rarely used - legacy field)
traces = mlflow.search_traces(filter_string="attributes.name = 'foo'")
# Find traces excluding a specific name
traces = mlflow.search_traces(filter_string="attributes.name != 'test_trace'")
# Note: Most users should use tags.`mlflow.traceName` instead
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_request'"
)
Search by timestamp
import time
from datetime import datetime
# Current time in milliseconds
current_time_ms = int(time.time() * 1000)
# Last 5 minutes
five_minutes_ago = current_time_ms - (5 * 60 * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.timestamp_ms > {five_minutes_ago}"
)

# Specific date range
start_date = int(datetime(2024, 1, 1).timestamp() * 1000)
end_date = int(datetime(2024, 1, 31).timestamp() * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.timestamp_ms > {start_date} AND attributes.timestamp_ms < {end_date}"
)
# Using timestamp aliases
traces = mlflow.search_traces(filter_string=f"attributes.timestamp > {five_minutes_ago}")
Search by execution time
# Find slow traces (>5 seconds)
traces = mlflow.search_traces(filter_string="attributes.execution_time_ms > 5000")
# Find fast traces (<100ms)
traces = mlflow.search_traces(filter_string="attributes.execution_time_ms < 100")
# Performance range
traces = mlflow.search_traces(
    filter_string="attributes.execution_time_ms > 100 AND attributes.execution_time_ms < 1000"
)
# Using execution time aliases
traces = mlflow.search_traces(filter_string="attributes.latency > 1000")
Search by tags
# Custom tags (set via mlflow.update_current_trace)
traces = mlflow.search_traces(filter_string="tags.customer_id = 'C001'")
traces = mlflow.search_traces(filter_string="tags.environment = 'production'")
traces = mlflow.search_traces(filter_string="tags.version = 'v2.1.0'")
# MLflow system tags (require backticks)
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_chat_request'"
)
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.artifactLocation` != ''"
)
Search by metadata
# Search by response content (exact match)
traces = mlflow.search_traces(
    filter_string="metadata.`mlflow.traceOutputs` = 'exact response text'"
)

# Find traces with any output
traces = mlflow.search_traces(
    filter_string="metadata.`mlflow.traceOutputs` != ''"
)

# Search by user
traces = mlflow.search_traces(
    filter_string="metadata.`mlflow.user` = 'alice@company.com'"
)

# Search by source file
traces = mlflow.search_traces(
    filter_string="metadata.`mlflow.source.name` = 'app.py'"
)

# Search by git information
traces = mlflow.search_traces(
    filter_string="metadata.`mlflow.source.git.branch` = 'main'"
)
Complex filters with AND
# Recent successful production traces
current_time_ms = int(time.time() * 1000)
one_hour_ago = current_time_ms - (60 * 60 * 1000)
traces = mlflow.search_traces(
    filter_string=f"attributes.status = 'OK' AND "
                  f"attributes.timestamp_ms > {one_hour_ago} AND "
                  f"tags.environment = 'production'"
)

# Fast traces from a specific user
traces = mlflow.search_traces(
    filter_string="attributes.execution_time_ms < 100 AND "
                  "metadata.`mlflow.user` = 'alice@company.com'"
)

# Specific function with performance threshold
traces = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'process_payment' AND "
                  "attributes.execution_time_ms > 1000"
)
Ordering results
# Most recent first
traces = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    order_by=["attributes.timestamp_ms DESC"]
)

# Fastest first
traces = mlflow.search_traces(
    order_by=["attributes.execution_time_ms ASC"]
)

# Multiple sort criteria
traces = mlflow.search_traces(
    filter_string="attributes.status = 'OK'",
    order_by=[
        "attributes.timestamp_ms DESC",
        "attributes.execution_time_ms ASC"
    ]
)
DataFrame operations
The DataFrame returned by mlflow.search_traces
contains these columns:
traces_df = mlflow.search_traces()
# Default columns
print(traces_df.columns)
# ['request_id', 'trace', 'timestamp_ms', 'status', 'execution_time_ms',
# 'request', 'response', 'request_metadata', 'spans', 'tags']
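Because the result is a plain DataFrame, standard pandas operations apply directly. A quick sketch on a synthetic frame with the same column names (real data comes from mlflow.search_traces()):

```python
import pandas as pd

# Synthetic stand-in for the search_traces() result
traces_df = pd.DataFrame({
    "status": ["OK", "OK", "ERROR"],
    "execution_time_ms": [120, 450, 3000],
})

# Aggregate over the default columns
error_rate = (traces_df["status"] == "ERROR").mean()
avg_latency = traces_df["execution_time_ms"].mean()
print(f"Error rate: {error_rate:.1%}, avg latency: {avg_latency:.0f}ms")
# Error rate: 33.3%, avg latency: 1190ms
```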
Extract span fields
# Extract specific span fields into DataFrame columns
traces = mlflow.search_traces(
    extract_fields=[
        "process_request.inputs.customer_id",
        "process_request.outputs",
        "validate_input.inputs",
        "generate_response.outputs.message"
    ]
)

# Use extracted fields for an evaluation dataset
eval_data = traces.rename(columns={
    "process_request.inputs.customer_id": "customer",
    "generate_response.outputs.message": "ground_truth"
})
Building dynamic queries
def build_trace_filter(status=None, user=None, min_duration=None,
                       max_duration=None, tags=None, after_timestamp=None):
    """Build a dynamic filter string from parameters."""
    conditions = []
    if status:
        conditions.append(f"attributes.status = '{status}'")
    if user:
        conditions.append(f"metadata.`mlflow.user` = '{user}'")
    if min_duration:
        conditions.append(f"attributes.execution_time_ms > {min_duration}")
    if max_duration:
        conditions.append(f"attributes.execution_time_ms < {max_duration}")
    if after_timestamp:
        conditions.append(f"attributes.timestamp_ms > {after_timestamp}")
    if tags:
        for key, value in tags.items():
            # Backtick tag names that contain dots
            if '.' in key:
                conditions.append(f"tags.`{key}` = '{value}'")
            else:
                conditions.append(f"tags.{key} = '{value}'")
    return " AND ".join(conditions) if conditions else None

# Usage
filter_string = build_trace_filter(
    status="OK",
    user="alice@company.com",
    min_duration=100,
    tags={"environment": "production", "mlflow.traceName": "process_order"}
)
traces = mlflow.search_traces(filter_string=filter_string)
Practical examples reference
Error monitoring
Monitor and analyze errors in your production environment:
import mlflow
import time
import pandas as pd

def monitor_errors(experiment_name: str, hours: int = 1):
    """Monitor errors in the last N hours."""
    # Calculate time window
    current_time_ms = int(time.time() * 1000)
    cutoff_time_ms = current_time_ms - (hours * 60 * 60 * 1000)

    # Find all errors
    failed_traces = mlflow.search_traces(
        filter_string=f"attributes.status = 'ERROR' AND "
                      f"attributes.timestamp_ms > {cutoff_time_ms}",
        order_by=["attributes.timestamp_ms DESC"]
    )

    if len(failed_traces) == 0:
        print(f"No errors found in the last {hours} hour(s)")
        return

    # Analyze error patterns
    print(f"Found {len(failed_traces)} errors in the last {hours} hour(s)\n")

    # Group by function name
    error_by_function = failed_traces.groupby('tags.mlflow.traceName').size()
    print("Errors by function:")
    print(error_by_function.to_string())

    # Show recent error samples
    print("\nRecent error samples:")
    for _, trace in failed_traces.head(5).iterrows():
        print(f"- {trace['request_preview'][:60]}...")
        print(f"  Function: {trace.get('tags.mlflow.traceName', 'unknown')}")
        print(f"  Time: {pd.to_datetime(trace['timestamp_ms'], unit='ms')}")
        print()

    return failed_traces
Performance profiling
Analyze performance characteristics and identify bottlenecks:
def profile_performance(function_name: str = None, percentiles: list = [50, 95, 99]):
    """Profile performance metrics for traces."""
    # Build filter
    filter_parts = []
    if function_name:
        filter_parts.append(f"tags.`mlflow.traceName` = '{function_name}'")
    filter_string = " AND ".join(filter_parts) if filter_parts else None

    # Get traces
    traces = mlflow.search_traces(filter_string=filter_string)
    if len(traces) == 0:
        print("No traces found")
        return

    # Calculate percentiles
    perf_stats = traces['execution_time_ms'].describe(percentiles=[p / 100 for p in percentiles])

    print(f"Performance Analysis ({len(traces)} traces)")
    print("=" * 40)
    for p in percentiles:
        print(f"P{p}: {perf_stats[f'{p}%']:.1f}ms")
    print(f"Mean: {perf_stats['mean']:.1f}ms")
    print(f"Max: {perf_stats['max']:.1f}ms")

    # Find outliers (>P99)
    if 99 in percentiles:
        p99_threshold = perf_stats['99%']
        outliers = traces[traces['execution_time_ms'] > p99_threshold]
        if len(outliers) > 0:
            print(f"\nOutliers (>{p99_threshold:.0f}ms): {len(outliers)} traces")
            for _, trace in outliers.head(3).iterrows():
                print(f"- {trace['execution_time_ms']:.0f}ms: {trace['request_preview'][:50]}...")

    return traces
User activity analysis
Track and analyze user behavior patterns:
def analyze_user_activity(user_id: str, days: int = 7):
    """Analyze activity patterns for a specific user."""
    cutoff_ms = int((time.time() - days * 86400) * 1000)

    traces = mlflow.search_traces(
        filter_string=f"metadata.`mlflow.user` = '{user_id}' AND "
                      f"attributes.timestamp_ms > {cutoff_ms}",
        order_by=["attributes.timestamp_ms DESC"]
    )

    if len(traces) == 0:
        print(f"No activity found for user {user_id}")
        return

    print(f"User {user_id} Activity Report ({days} days)")
    print("=" * 50)
    print(f"Total requests: {len(traces)}")

    # Daily activity
    traces['date'] = pd.to_datetime(traces['timestamp_ms'], unit='ms').dt.date
    daily_activity = traces.groupby('date').size()
    print("\nDaily activity:")
    print(daily_activity.to_string())

    # Query categories
    if 'tags.query_category' in traces.columns:
        categories = traces['tags.query_category'].value_counts()
        print("\nQuery categories:")
        print(categories.to_string())

    # Performance stats
    print("\nPerformance:")
    print(f"Average response time: {traces['execution_time_ms'].mean():.1f}ms")
    print(f"Error rate: {(traces['status'] == 'ERROR').mean() * 100:.1f}%")

    return traces
Best practices
1. Design a consistent tagging strategy
Create a tagging taxonomy for your organization:
from datetime import datetime

class TraceTagging:
    """Standardized tagging strategy for traces."""

    # Required tags for all traces
    REQUIRED_TAGS = ["environment", "version", "service_name"]

    # Category mappings
    CATEGORIES = {
        "user_management": ["login", "logout", "profile_update"],
        "content_generation": ["summarize", "translate", "rewrite"],
        "data_retrieval": ["search", "fetch", "query"]
    }

    @staticmethod
    def tag_trace(operation: str, **kwargs):
        """Apply standardized tags to the current trace."""
        tags = {
            "operation": operation,
            "timestamp": datetime.now().isoformat(),
            "service_name": "genai-platform"
        }

        # Add category based on operation
        for category, operations in TraceTagging.CATEGORIES.items():
            if operation in operations:
                tags["category"] = category
                break

        # Add custom tags
        tags.update(kwargs)

        # Validate required tags
        for required in TraceTagging.REQUIRED_TAGS:
            if required not in tags:
                tags[required] = "unknown"

        mlflow.update_current_trace(tags=tags)
        return tags
2. Build reusable search utilities
class TraceSearcher:
    """Reusable trace search utilities."""

    def __init__(self, experiment_ids: list = None):
        self.experiment_ids = experiment_ids

    def recent_errors(self, hours: int = 1) -> pd.DataFrame:
        """Get recent error traces."""
        cutoff = int((time.time() - hours * 3600) * 1000)
        return mlflow.search_traces(
            experiment_ids=self.experiment_ids,
            filter_string=f"attributes.status = 'ERROR' AND "
                          f"attributes.timestamp_ms > {cutoff}",
            order_by=["attributes.timestamp_ms DESC"]
        )

    def slow_operations(self, threshold_ms: int = 5000) -> pd.DataFrame:
        """Find operations slower than threshold."""
        return mlflow.search_traces(
            experiment_ids=self.experiment_ids,
            filter_string=f"attributes.execution_time_ms > {threshold_ms}",
            order_by=["attributes.execution_time_ms DESC"]
        )

    def by_user(self, user_id: str, days: int = 7) -> pd.DataFrame:
        """Get traces for a specific user."""
        cutoff = int((time.time() - days * 86400) * 1000)
        return mlflow.search_traces(
            experiment_ids=self.experiment_ids,
            filter_string=f"tags.user_id = '{user_id}' AND "
                          f"attributes.timestamp_ms > {cutoff}",
            order_by=["attributes.timestamp_ms DESC"]
        )

    def by_category(self, category: str, status: str = None) -> pd.DataFrame:
        """Get traces by category with an optional status filter."""
        filters = [f"tags.category = '{category}'"]
        if status:
            filters.append(f"attributes.status = '{status}'")
        return mlflow.search_traces(
            experiment_ids=self.experiment_ids,
            filter_string=" AND ".join(filters)
        )

    def performance_report(self, function_name: str = None) -> dict:
        """Generate a performance report."""
        filter_parts = []
        if function_name:
            filter_parts.append(f"tags.`mlflow.traceName` = '{function_name}'")
        filter_string = " AND ".join(filter_parts) if filter_parts else None

        traces = mlflow.search_traces(
            experiment_ids=self.experiment_ids,
            filter_string=filter_string
        )
        if len(traces) == 0:
            return {"error": "No traces found"}

        return {
            "total_traces": len(traces),
            "error_rate": (traces['status'] == 'ERROR').mean(),
            "avg_duration_ms": traces['execution_time_ms'].mean(),
            "p50_duration_ms": traces['execution_time_ms'].quantile(0.5),
            "p95_duration_ms": traces['execution_time_ms'].quantile(0.95),
            "p99_duration_ms": traces['execution_time_ms'].quantile(0.99)
        }

# Usage example
searcher = TraceSearcher()
errors = searcher.recent_errors(hours=24)
slow_ops = searcher.slow_operations(threshold_ms=10000)
user_traces = searcher.by_user("U001", days=30)
report = searcher.performance_report("process_request")
Next Steps
- Build evaluation datasets - Convert queried traces into test datasets
- Delete traces - Remove traces based on search criteria for data management