MLflow クライアント API は、トレース ライフサイクル管理を直接きめ細かく制御します。 高度な API はほとんどのユース ケースをエレガントに処理しますが、クライアント API は、トレース作成、カスタム トレース ID、または既存の監視システムとの統合を明示的に制御する必要がある高度なシナリオに不可欠です。
Warnung
開始する前に: 高レベル API が要件を満たしていない場合にのみ、クライアント API を使用することをお勧めします。
- 親子関係の自動検出なし
- 手動の例外処理が必要
- 自動トレース統合と互換性がありません
- トレース ライフサイクルを完全に制御する
- カスタム トレース ID 管理
- 既存システムとの統合
核となる概念
トレース ライフサイクル
すべてのトレースは、明示的に管理する必要がある厳密なライフサイクルに従います。
graph LR
A[Start Trace] --> B[Start Span 1]
B --> C[Start Span 2]
C --> D[End Span 2]
D --> E[End Span 1]
E --> F[End Trace]
Von Bedeutung
ゴールデン ルール: すべての start_trace
または start_span
呼び出しには、対応する end_trace
または end_span
呼び出しが必要です。 スパンを閉じなかった場合、トレースが不完全になります。
キー識別子
クライアント API の使用には、次の識別子を理解することが重要です。
識別子 | 説明 | 使用方法 |
---|---|---|
request_id |
一意のトレース識別子 | トレース内のすべてのスパンをリンクする |
span_id |
一意のスパン識別子 | 特定の範囲を終点まで識別します |
parent_id |
親スパンの ID | スパン階層を作成します |
はじめに
クライアントを初期化する
from mlflow import MlflowClient
# Initialize client with default tracking URI
client = MlflowClient()
# Or specify a custom tracking URI
client = MlflowClient(tracking_uri="databricks")
トレースの開始
高度な API とは異なり、スパンを追加する前にトレースを明示的に開始する必要があります。
# Start a new trace - this creates the root span
root_span = client.start_trace(
name="my_application_flow",
inputs={"user_id": "123", "action": "generate_report"},
attributes={"environment": "production", "version": "1.0.0"}
)
# Extract the request_id for subsequent operations
request_id = root_span.request_id
print(f"Started trace with ID: {request_id}")
子スパンの追加
アプリケーションのワークフローを表すスパンの階層を作成します。
# Create a child span for data retrieval
data_span = client.start_span(
name="fetch_user_data",
request_id=request_id, # Links to the trace
parent_id=root_span.span_id, # Creates parent-child relationship
inputs={"user_id": "123"},
attributes={"database": "users_db", "query_type": "select"}
)
# Create a sibling span for processing
process_span = client.start_span(
name="process_data",
request_id=request_id,
parent_id=root_span.span_id, # Same parent as data_span
inputs={"data_size": "1024KB"},
attributes={"processor": "gpu", "batch_size": 32}
)
終了スパン
作成の逆の順序でスパンを終了します (LIFO - Last In、First Out):
# End the data retrieval span
client.end_span(
request_id=data_span.request_id,
span_id=data_span.span_id,
outputs={"record_count": 42, "cache_hit": True},
attributes={"duration_ms": 150}
)
# End the processing span
client.end_span(
request_id=process_span.request_id,
span_id=process_span.span_id,
outputs={"processed_records": 42, "errors": 0},
status="OK"
)
トレースの終了
ルートスパンを終了して、トレースを完了させましょう。
# End the root span (completes the trace)
client.end_trace(
request_id=request_id,
outputs={"report_url": "https://example.com/report/123"},
attributes={"total_duration_ms": 1250, "status": "success"}
)
実際の例
例 1: エラー処理
適切なエラー処理により、例外が発生した場合でもトレースが完了します。
def traced_operation():
client = MlflowClient()
root_span = None
try:
# Start trace
root_span = client.start_trace("risky_operation")
# Start child span
child_span = client.start_span(
name="database_query",
request_id=root_span.request_id,
parent_id=root_span.span_id
)
try:
# Risky operation
result = perform_database_query()
# End child span on success
client.end_span(
request_id=child_span.request_id,
span_id=child_span.span_id,
outputs={"result": result},
status="OK"
)
except Exception as e:
# End child span on error
client.end_span(
request_id=child_span.request_id,
span_id=child_span.span_id,
status="ERROR",
attributes={"error": str(e)}
)
raise
except Exception as e:
# Log error to trace
if root_span:
client.end_trace(
request_id=root_span.request_id,
status="ERROR",
attributes={"error_type": type(e).__name__, "error_message": str(e)}
)
raise
else:
# End trace on success
client.end_trace(
request_id=root_span.request_id,
outputs={"status": "completed"},
status="OK"
)
例 2: カスタム トレース管理
既存のシステムと統合するためのカスタム トレース ID の生成と管理を実装します。
import uuid
from datetime import datetime
class CustomTraceManager:
"""Custom trace manager with business-specific trace IDs"""
def __init__(self):
self.client = MlflowClient()
self.active_traces = {}
def generate_trace_id(self, user_id: str, operation: str) -> str:
"""Generate custom trace ID based on business logic"""
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
return f"{user_id}_{operation}_{timestamp}_{uuid.uuid4().hex[:8]}"
def start_custom_trace(self, user_id: str, operation: str, **kwargs):
"""Start trace with custom ID format"""
trace_name = self.generate_trace_id(user_id, operation)
root_span = self.client.start_trace(
name=trace_name,
attributes={
"user_id": user_id,
"operation": operation,
"custom_trace_id": trace_name,
**kwargs
}
)
self.active_traces[trace_name] = root_span
return root_span
def get_active_trace(self, trace_name: str):
"""Retrieve active trace by custom name"""
return self.active_traces.get(trace_name)
# Usage
manager = CustomTraceManager()
trace = manager.start_custom_trace(
user_id="user123",
operation="report_generation",
report_type="quarterly"
)
入れ子スパンを用いたバッチ処理例 3
複数のレベルの入れ子を使用して複雑なワークフローを追跡します。
def batch_processor(items):
client = MlflowClient()
# Start main trace
root = client.start_trace(
name="batch_processing",
inputs={"batch_size": len(items)}
)
results = []
# Process each item
for i, item in enumerate(items):
# Create span for each item
item_span = client.start_span(
name=f"process_item_{i}",
request_id=root.request_id,
parent_id=root.span_id,
inputs={"item_id": item["id"]}
)
try:
# Validation span
validation_span = client.start_span(
name="validate",
request_id=root.request_id,
parent_id=item_span.span_id
)
is_valid = validate_item(item)
client.end_span(
request_id=validation_span.request_id,
span_id=validation_span.span_id,
outputs={"is_valid": is_valid}
)
if is_valid:
# Processing span
process_span = client.start_span(
name="transform",
request_id=root.request_id,
parent_id=item_span.span_id
)
result = transform_item(item)
results.append(result)
client.end_span(
request_id=process_span.request_id,
span_id=process_span.span_id,
outputs={"transformed": result}
)
# End item span
client.end_span(
request_id=item_span.request_id,
span_id=item_span.span_id,
status="OK"
)
except Exception as e:
# Handle errors gracefully
client.end_span(
request_id=item_span.request_id,
span_id=item_span.span_id,
status="ERROR",
attributes={"error": str(e)}
)
# End main trace
client.end_trace(
request_id=root.request_id,
outputs={
"processed_count": len(results),
"success_rate": len(results) / len(items)
}
)
return results
ベスト プラクティス
1. コンテキストマネージャーを安全に使用する
スパンが常に閉じられるように、カスタム コンテキスト マネージャーを作成します。
from contextlib import contextmanager
@contextmanager
def traced_span(client, name, request_id, parent_id=None, **kwargs):
"""Context manager for safe span management"""
span = client.start_span(
name=name,
request_id=request_id,
parent_id=parent_id,
**kwargs
)
try:
yield span
except Exception as e:
client.end_span(
request_id=span.request_id,
span_id=span.span_id,
status="ERROR",
attributes={"error": str(e)}
)
raise
else:
client.end_span(
request_id=span.request_id,
span_id=span.span_id,
status="OK"
)
# Usage
with traced_span(client, "my_operation", request_id, parent_id) as span:
# Your code here
result = perform_operation()
2. トレース状態管理を実装する
複雑なアプリケーションのトレース状態を管理する:
class TraceStateManager:
"""Manage trace state across application components"""
def __init__(self):
self.client = MlflowClient()
self._trace_stack = []
@property
def current_trace(self):
"""Get current active trace"""
return self._trace_stack[-1] if self._trace_stack else None
def push_trace(self, name: str, **kwargs):
"""Start a new trace and push to stack"""
if self.current_trace:
# Create child span if trace exists
span = self.client.start_span(
name=name,
request_id=self.current_trace.request_id,
parent_id=self.current_trace.span_id,
**kwargs
)
else:
# Create new trace
span = self.client.start_trace(name=name, **kwargs)
self._trace_stack.append(span)
return span
def pop_trace(self, **kwargs):
"""End current trace and pop from stack"""
if not self._trace_stack:
return
span = self._trace_stack.pop()
if self._trace_stack:
# End child span
self.client.end_span(
request_id=span.request_id,
span_id=span.span_id,
**kwargs
)
else:
# End root trace
self.client.end_trace(
request_id=span.request_id,
**kwargs
)
3. 意味のある属性を追加する
デバッグに役立つコンテキストを使用してトレースを強化します。
# Good: Specific, actionable attributes
client.start_span(
name="llm_call",
request_id=request_id,
parent_id=parent_id,
attributes={
"model": "gpt-4",
"temperature": 0.7,
"max_tokens": 1000,
"prompt_template": "rag_v2",
"user_tier": "premium"
}
)
# Bad: Generic, unhelpful attributes
client.start_span(
name="process",
request_id=request_id,
parent_id=parent_id,
attributes={"step": 1, "data": "some data"}
)
よくある落とし穴
Warnung
次の一般的な間違いを避けます。
- スパンの終了を忘れずに - 常に try/finally またはコンテキストマネージャーを使用する
- 間違った親子関係 - スパン ID をダブルチェックする
- 高レベル API と低レベル API の混在 - 相互運用できない
- ハードコーディング トレース ID - 常に一意の ID を生成する
- スレッド セーフを無視する - クライアント API は既定ではスレッド セーフではありません
クライアント API を使用するタイミング
次の場合にクライアント API を使用します。
- カスタム トレース ID 生成スキーム
- 既存のトレース システムとの統合
- 複雑なトレース ライフサイクル管理
- 高度なスパン階層
- カスタム トレース状態管理
次の場合は、クライアント API を使用しないでください。
- 単純な関数トレース (
@mlflow.trace
を使用) - ローカル Python アプリケーション (コンテキスト マネージャーを使用)
- クイック プロトタイプ作成 (高度な API を使用)
- 自動トレースとの統合
次のステップ
これらの推奨されるアクションとチュートリアルを使用して、体験を続けます。
- アプリをデバッグして観察 する - クライアント API で作成されたトレースを分析する
- SDK を使用してトレースを照会 する - トレースされたデータにプログラムでアクセスする
- High-Level API - ほとんどのユース ケースに代わるシンプルな方法
リファレンス ガイド
このガイドで説明されている概念と機能の詳細なドキュメントを確認します。
- トレース データ モデル - トレースとスパン構造の詳細
- トレースの概念 - 分散トレースの基礎について
- FAQ - 低レベル API に関する一般的な質問