次の方法で共有


低レベルのクライアント API (詳細)

MLflow クライアント API は、トレース ライフサイクル管理を直接きめ細かく制御します。 高度な API はほとんどのユース ケースをエレガントに処理しますが、クライアント API は、トレース作成、カスタム トレース ID、または既存の監視システムとの統合を明示的に制御する必要がある高度なシナリオに不可欠です。

Warnung

開始する前に: 高レベル API が要件を満たしていない場合にのみ、クライアント API を使用することをお勧めします。

  • 親子関係の自動検出なし
  • 手動の例外処理が必要
  • 自動トレース統合と互換性がありません
  • トレース ライフサイクルを完全に制御する
  • カスタム トレース ID 管理
  • 既存システムとの統合

核となる概念

トレース ライフサイクル

すべてのトレースは、明示的に管理する必要がある厳密なライフサイクルに従います。

graph LR
    A[Start Trace] --> B[Start Span 1]
    B --> C[Start Span 2]
    C --> D[End Span 2]
    D --> E[End Span 1]
    E --> F[End Trace]

Von Bedeutung

ゴールデン ルール: すべての start_trace または start_span 呼び出しには、対応する end_trace または end_span 呼び出しが必要です。 スパンを閉じなかった場合、トレースが不完全になります。

キー識別子

クライアント API の使用には、次の識別子を理解することが重要です。

識別子 説明 使用方法
request_id 一意のトレース識別子 トレース内のすべてのスパンをリンクする
span_id 一意のスパン識別子 特定の範囲を終点まで識別します
parent_id 親スパンの ID スパン階層を作成します

はじめに

クライアントを初期化する

from mlflow import MlflowClient

# Initialize client with default tracking URI
client = MlflowClient()

# Or specify a custom tracking URI
client = MlflowClient(tracking_uri="databricks")

トレースの開始

高度な API とは異なり、スパンを追加する前にトレースを明示的に開始する必要があります。

# Start a new trace - this creates the root span
root_span = client.start_trace(
    name="my_application_flow",
    inputs={"user_id": "123", "action": "generate_report"},
    attributes={"environment": "production", "version": "1.0.0"}
)

# Extract the request_id for subsequent operations
request_id = root_span.request_id
print(f"Started trace with ID: {request_id}")

子スパンの追加

アプリケーションのワークフローを表すスパンの階層を作成します。

# Create a child span for data retrieval
data_span = client.start_span(
    name="fetch_user_data",
    request_id=request_id,  # Links to the trace
    parent_id=root_span.span_id,  # Creates parent-child relationship
    inputs={"user_id": "123"},
    attributes={"database": "users_db", "query_type": "select"}
)

# Create a sibling span for processing
process_span = client.start_span(
    name="process_data",
    request_id=request_id,
    parent_id=root_span.span_id,  # Same parent as data_span
    inputs={"data_size": "1024KB"},
    attributes={"processor": "gpu", "batch_size": 32}
)

終了スパン

作成の逆の順序でスパンを終了します (LIFO - Last In、First Out):

# End the data retrieval span
client.end_span(
    request_id=data_span.request_id,
    span_id=data_span.span_id,
    outputs={"record_count": 42, "cache_hit": True},
    attributes={"duration_ms": 150}
)

# End the processing span
client.end_span(
    request_id=process_span.request_id,
    span_id=process_span.span_id,
    outputs={"processed_records": 42, "errors": 0},
    status="OK"
)

トレースの終了

ルートスパンを終了して、トレースを完了させましょう。

# End the root span (completes the trace)
client.end_trace(
    request_id=request_id,
    outputs={"report_url": "https://example.com/report/123"},
    attributes={"total_duration_ms": 1250, "status": "success"}
)

実際の例

例 1: エラー処理

適切なエラー処理により、例外が発生した場合でもトレースが完了します。

def traced_operation():
    client = MlflowClient()
    root_span = None

    try:
        # Start trace
        root_span = client.start_trace("risky_operation")

        # Start child span
        child_span = client.start_span(
            name="database_query",
            request_id=root_span.request_id,
            parent_id=root_span.span_id
        )

        try:
            # Risky operation
            result = perform_database_query()

            # End child span on success
            client.end_span(
                request_id=child_span.request_id,
                span_id=child_span.span_id,
                outputs={"result": result},
                status="OK"
            )
        except Exception as e:
            # End child span on error
            client.end_span(
                request_id=child_span.request_id,
                span_id=child_span.span_id,
                status="ERROR",
                attributes={"error": str(e)}
            )
            raise

    except Exception as e:
        # Log error to trace
        if root_span:
            client.end_trace(
                request_id=root_span.request_id,
                status="ERROR",
                attributes={"error_type": type(e).__name__, "error_message": str(e)}
            )
        raise
    else:
        # End trace on success
        client.end_trace(
            request_id=root_span.request_id,
            outputs={"status": "completed"},
            status="OK"
        )

例 2: カスタム トレース管理

既存のシステムと統合するためのカスタム トレース ID の生成と管理を実装します。

import uuid
from datetime import datetime

class CustomTraceManager:
    """Custom trace manager with business-specific trace IDs"""

    def __init__(self):
        self.client = MlflowClient()
        self.active_traces = {}

    def generate_trace_id(self, user_id: str, operation: str) -> str:
        """Generate custom trace ID based on business logic"""
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        return f"{user_id}_{operation}_{timestamp}_{uuid.uuid4().hex[:8]}"

    def start_custom_trace(self, user_id: str, operation: str, **kwargs):
        """Start trace with custom ID format"""
        trace_name = self.generate_trace_id(user_id, operation)

        root_span = self.client.start_trace(
            name=trace_name,
            attributes={
                "user_id": user_id,
                "operation": operation,
                "custom_trace_id": trace_name,
                **kwargs
            }
        )

        self.active_traces[trace_name] = root_span
        return root_span

    def get_active_trace(self, trace_name: str):
        """Retrieve active trace by custom name"""
        return self.active_traces.get(trace_name)

# Usage
manager = CustomTraceManager()
trace = manager.start_custom_trace(
    user_id="user123",
    operation="report_generation",
    report_type="quarterly"
)

入れ子スパンを用いたバッチ処理例 3

複数のレベルの入れ子を使用して複雑なワークフローを追跡します。

def batch_processor(items):
    client = MlflowClient()

    # Start main trace
    root = client.start_trace(
        name="batch_processing",
        inputs={"batch_size": len(items)}
    )

    results = []

    # Process each item
    for i, item in enumerate(items):
        # Create span for each item
        item_span = client.start_span(
            name=f"process_item_{i}",
            request_id=root.request_id,
            parent_id=root.span_id,
            inputs={"item_id": item["id"]}
        )

        try:
            # Validation span
            validation_span = client.start_span(
                name="validate",
                request_id=root.request_id,
                parent_id=item_span.span_id
            )

            is_valid = validate_item(item)

            client.end_span(
                request_id=validation_span.request_id,
                span_id=validation_span.span_id,
                outputs={"is_valid": is_valid}
            )

            if is_valid:
                # Processing span
                process_span = client.start_span(
                    name="transform",
                    request_id=root.request_id,
                    parent_id=item_span.span_id
                )

                result = transform_item(item)
                results.append(result)

                client.end_span(
                    request_id=process_span.request_id,
                    span_id=process_span.span_id,
                    outputs={"transformed": result}
                )

            # End item span
            client.end_span(
                request_id=item_span.request_id,
                span_id=item_span.span_id,
                status="OK"
            )

        except Exception as e:
            # Handle errors gracefully
            client.end_span(
                request_id=item_span.request_id,
                span_id=item_span.span_id,
                status="ERROR",
                attributes={"error": str(e)}
            )

    # End main trace
    client.end_trace(
        request_id=root.request_id,
        outputs={
            "processed_count": len(results),
            "success_rate": len(results) / len(items)
        }
    )

    return results

ベスト プラクティス

1. コンテキストマネージャーを安全に使用する

スパンが常に閉じられるように、カスタム コンテキスト マネージャーを作成します。

from contextlib import contextmanager

@contextmanager
def traced_span(client, name, request_id, parent_id=None, **kwargs):
    """Context manager for safe span management"""
    span = client.start_span(
        name=name,
        request_id=request_id,
        parent_id=parent_id,
        **kwargs
    )
    try:
        yield span
    except Exception as e:
        client.end_span(
            request_id=span.request_id,
            span_id=span.span_id,
            status="ERROR",
            attributes={"error": str(e)}
        )
        raise
    else:
        client.end_span(
            request_id=span.request_id,
            span_id=span.span_id,
            status="OK"
        )

# Usage
with traced_span(client, "my_operation", request_id, parent_id) as span:
    # Your code here
    result = perform_operation()

2. トレース状態管理を実装する

複雑なアプリケーションのトレース状態を管理する:

class TraceStateManager:
    """Manage trace state across application components"""

    def __init__(self):
        self.client = MlflowClient()
        self._trace_stack = []

    @property
    def current_trace(self):
        """Get current active trace"""
        return self._trace_stack[-1] if self._trace_stack else None

    def push_trace(self, name: str, **kwargs):
        """Start a new trace and push to stack"""
        if self.current_trace:
            # Create child span if trace exists
            span = self.client.start_span(
                name=name,
                request_id=self.current_trace.request_id,
                parent_id=self.current_trace.span_id,
                **kwargs
            )
        else:
            # Create new trace
            span = self.client.start_trace(name=name, **kwargs)

        self._trace_stack.append(span)
        return span

    def pop_trace(self, **kwargs):
        """End current trace and pop from stack"""
        if not self._trace_stack:
            return

        span = self._trace_stack.pop()

        if self._trace_stack:
            # End child span
            self.client.end_span(
                request_id=span.request_id,
                span_id=span.span_id,
                **kwargs
            )
        else:
            # End root trace
            self.client.end_trace(
                request_id=span.request_id,
                **kwargs
            )

3. 意味のある属性を追加する

デバッグに役立つコンテキストを使用してトレースを強化します。

# Good: Specific, actionable attributes
client.start_span(
    name="llm_call",
    request_id=request_id,
    parent_id=parent_id,
    attributes={
        "model": "gpt-4",
        "temperature": 0.7,
        "max_tokens": 1000,
        "prompt_template": "rag_v2",
        "user_tier": "premium"
    }
)

# Bad: Generic, unhelpful attributes
client.start_span(
    name="process",
    request_id=request_id,
    parent_id=parent_id,
    attributes={"step": 1, "data": "some data"}
)

よくある落とし穴

Warnung

次の一般的な間違いを避けます。

  1. スパンの終了を忘れずに - 常に try/finally またはコンテキストマネージャーを使用する
  2. 間違った親子関係 - スパン ID をダブルチェックする
  3. 高レベル API と低レベル API の混在 - 相互運用できない
  4. ハードコーディング トレース ID - 常に一意の ID を生成する
  5. スレッド セーフを無視する - クライアント API は既定ではスレッド セーフではありません

クライアント API を使用するタイミング

次の場合にクライアント API を使用します。

  • カスタム トレース ID 生成スキーム
  • 既存のトレース システムとの統合
  • 複雑なトレース ライフサイクル管理
  • 高度なスパン階層
  • カスタム トレース状態管理

次の場合は、クライアント API を使用しないでください。

  • 単純な関数トレース ( @mlflow.traceを使用)
  • ローカル Python アプリケーション (コンテキスト マネージャーを使用)
  • クイック プロトタイプ作成 (高度な API を使用)
  • 自動トレースとの統合

次のステップ

これらの推奨されるアクションとチュートリアルを使用して、体験を続けます。

リファレンス ガイド

このガイドで説明されている概念と機能の詳細なドキュメントを確認します。