次の方法で共有


デコレーターと Fluent API (推奨)

自動トレース統合に加えて、MLflow Tracing SDK を使用して Python コードをインストルメント化できます。 これは、カスタム Python コードをインストルメント化する必要がある場合に特に便利です。

デコレーター

@mlflow.traceデコレーターを使用すると、任意の関数のスパンを作成できます。 この方法では、最小限の労力でコードにトレースを追加する簡単で効果的な方法が提供されます。

  • MLflow は関数間の 親子関係 を検出し、自動トレース統合と互換性があります。
  • 関数の実行中に 例外 をキャプチャし、それらをスパン イベントとして記録します。
  • 関数の 名前、入力、出力、実行時間を自動的にログに記録します
  • などのmlflow.openai.autolog機能と共に使用できます。

@mlflow\.trace デコレーターは現在、次の種類の関数をサポートしています。

関数型 サポートされています
同期 イエス
非同期 はい (>= 2.16.0)
ジェネレータ はい (>= 2.20.2)
非同期ジェネレーター はい (>= 2.20.2)

次のコードは、Python 関数をトレースするためにデコレーターを使用する最小の例です。

ヒント

完全な可観測性を確保するために、複数のデコレーターを使用する場合は、通常、 @mlflow.trace デコレーターが 最も外側 にある必要があります。 詳細な説明と例については、 Using @mlflow.trace with Other Decorators を参照してください。

import mlflow


@mlflow.trace(span_type="func", attributes={"key": "value"})
def add_1(x):
    return x + 1


@mlflow.trace(span_type="func", attributes={"key1": "value1"})
def minus_1(x):
    return x - 1


@mlflow.trace(name="Trace Test")
def trace_test(x):
    step1 = add_1(x)
    return minus_1(step1)


trace_test(4)

トレーシングデコレーター

トレースに同じ名前の複数のスパンが含まれている場合、MLflow は自動的にインクリメントするサフィックス ( _1_2など) を追加します。

スパンのカスタマイズ

@mlflow.traceデコレーターは、作成するスパンをカスタマイズするために、次の引数を受け入れます。

  • name パラメーターを使用して、既定のスパン名 (修飾関数の名前) をオーバーライドします。
  • span_type パラメーターを使用してスパンの種類を設定します。 組み込みの スパン型 または文字列のいずれかを設定します。
  • attributes パラメーターを使用して、スパンにカスタム属性を追加します。

ヒント

@mlflow.traceを他のデコレーター (Web フレームワークなど) と組み合わせる場合は、最も外側であることが重要です。 正しい順序付けの明確な例と正しくない順序については、 Using @mlflow.trace with Other Decoratorsを参照してください。

@mlflow.trace(
    name="call-local-llm", span_type=SpanType.LLM, attributes={"model": "gpt-4o-mini"}
)
def invoke(prompt: str):
    return client.invoke(
        messages=[{"role": "user", "content": prompt}], model="gpt-4o-mini"
    )

または、 mlflow.get_current_active_span API を使用して、関数内のスパンを動的に更新することもできます。

@mlflow.trace(span_type=SpanType.LLM)
def invoke(prompt: str):
    model_id = "gpt-4o-mini"
    # Get the current span (created by the @mlflow.trace decorator)
    span = mlflow.get_current_active_span()
    # Set the attribute to the span
    span.set_attributes({"model": model_id})
    return client.invoke(messages=[{"role": "user", "content": prompt}], model=model_id)

他のデコレーターでの @mlflow.trace の使用

1 つの関数に複数のデコレーターを適用する場合は、@mlflow.traceのデコレーター (一番上のデコレーター) としてを配置することが重要です。 これにより、MLflow は、内部デコレーターの動作を含め、関数の実行全体をキャプチャできます。

@mlflow.traceが最も外側のデコレーターでない場合、関数の実行に対する可視性が制限されているか、正しくない可能性があり、不完全なトレースや、関数の入力、出力、実行時間が誤って表示される可能性があります。

次の概念例を考えてみましょう。

import mlflow
import functools
import time

# A hypothetical additional decorator
def simple_timing_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{func.__name__} executed in {end_time - start_time:.4f} seconds by simple_timing_decorator.")
        return result
    return wrapper

# Correct order: @mlflow.trace is outermost
@mlflow.trace(name="my_decorated_function_correct_order")
@simple_timing_decorator
# @another_framework_decorator # e.g., @app.route("/mypath") from Flask
def my_complex_function(x, y):
    # Function logic here
    time.sleep(0.1) # Simulate work
    return x + y

# Incorrect order: @mlflow.trace is NOT outermost
@simple_timing_decorator
@mlflow.trace(name="my_decorated_function_incorrect_order")
# @another_framework_decorator
def my_other_complex_function(x, y):
    time.sleep(0.1)
    return x * y

# Example calls
if __name__ == "__main__":
    print("Calling function with correct decorator order:")
    my_complex_function(5, 3)

    print("\nCalling function with incorrect decorator order:")
    my_other_complex_function(5, 3)

my_complex_functionの例 (正しい順序) では、@mlflow.traceは、simple_timing_decoratorによって追加された時間を含む、完全な実行をキャプチャします。 my_other_complex_function (正しくない順序) では、MLflow によってキャプチャされたトレースは、総実行時間を正確に反映しなかったり、simple_timing_decoratorが確認する前に@mlflow.traceによって行われた入力/出力の変更を見逃す可能性があります。

トレース タグの追加

トレースにタグを追加して、トレース レベルで追加のメタデータを提供できます。 トレースにタグを設定するには、いくつかの方法があります。 その他の方法については、 how-to guide を参照してください。

@mlflow.trace
def my_func(x):
    mlflow.update_current_trace(tags={"fruit": "apple"})
    return x + 1

UI での要求と応答のプレビューのカスタマイズ

MLflow UI の [トレース] タブにはトレースの一覧が表示され、 Request 列と Response 列には、各トレースのエンド ツー エンドの入力と出力のプレビューが表示されます。 これにより、各トレースが表す内容をすばやく理解できます。

既定では、これらのプレビューは固定の文字数に切り詰められます。 ただし、request_preview関数内のresponse_previewパラメーターとmlflow.update_current_trace()パラメーターを使用して、これらの列に表示される内容をカスタマイズできます。 これは、既定の切り捨てが最も関連性の高い情報を表示しない可能性がある複雑な入力または出力に特に便利です。

次に示すのは、長いドキュメントとユーザーの指示を処理するトレースのカスタム要求プレビューを設定し、UI の Request 列に最も関連性の高い情報を表示する例です。

import mlflow

@mlflow.trace(name="Summarization Pipeline")
def summarize_document(document_content: str, user_instructions: str):
    # Construct a custom preview for the request column
    # For example, show beginning of document and user instructions
    request_p = f"Doc: {document_content[:30]}... Instr: {user_instructions[:30]}..."
    mlflow.update_current_trace(request_preview=request_p)

    # Simulate LLM call
    # messages = [
    #     {"role": "system", "content": "Summarize the following document based on user instructions."},
    #     {"role": "user", "content": f"Document: {document_content}\nInstructions: {user_instructions}"}
    # ]
    # completion = client.chat.completions.create(model="gpt-4o-mini", messages=messages)
    # summary = completion.choices[0].message.content
    summary = f"Summary of document starting with '{document_content[:20]}...' based on '{user_instructions}'"

    # Customize the response preview
    response_p = f"Summary: {summary[:50]}..."
    mlflow.update_current_trace(response_preview=response_p)

    return summary

# Example Call
long_document = "This is a very long document that contains many details about various topics..." * 10
instructions = "Focus on the key takeaways regarding topic X."
summary_result = summarize_document(long_document, instructions)
# print(summary_result)

トレースに request_previewresponse_preview を設定することで (通常はルート スパン)、メイン トレース リスト ビューでの全体的な相互作用の概要を制御し、トレースを一目で識別して理解しやすくします。

例外の自動処理

トレース インストルメント化された操作の処理中に Exception が発生した場合、呼び出しが成功しなかったことを示すメッセージが UI 内に表示され、デバッグに役立つデータの部分的なキャプチャが使用できるようになります。 さらに、発生した例外に関する詳細は、部分的に完了したスパンの Events に含まれ、コード内で問題が発生している場所の特定に役立ちます。

トレース エラー

自動トレースとの組み合わせ

@mlflow.traceデコレーターは、自動トレースと組み合わせて使用し、統一された統合トレース内で自動トレースと手動で定義されたスパンを組み合わせることができます。 詳細については 、こちらをご覧ください

ストリーミング

MLflow 2.20.2 以降、 @mlflow.trace デコレーターを使用して、ジェネレーターまたは反復子を返す関数をトレースできます。

@mlflow.trace
def stream_data():
    for i in range(5):
        yield i

上記の例では、 stream_data 関数の 1 つのスパンを持つトレースを生成します。 既定では、MLflow は、ジェネレーターによって生成されたすべての要素をスパンの出力のリストとしてキャプチャします。 上の例では、スパンの出力は [0, 1, 2, 3, 4]されます。

ストリーム関数のスパンは、返された反復子の 使用が開始されたときに開始され、反復子が使い果たされたとき、または反復処理中に例外が発生したときに終了します。

出力リジューサーの使用

要素を 1 つのスパン出力として集計する場合は、 output_reducer パラメーターを使用して、要素を集計するカスタム関数を指定できます。 カスタム関数は、生成された要素の一覧を入力として受け取る必要があります。

from typing import List, Any

@mlflow.trace(output_reducer=lambda x: ",".join(x))
def stream_data():
    for c in "hello":
        yield c

上の例では、スパンの出力は "h,e,l,l,o"されます。 未加工のチャンクは、MLflow トレース UI のスパンの [ Events ] タブに引き続き表示されるので、デバッグ時に個々の生成された値を調べることができます。

一般的な出力リデューサパターン

出力リジューサーを実装するための一般的なパターンを次に示します。

トークン集計
from typing import List, Dict, Any

def aggregate_tokens(chunks: List[str]) -> str:
    """Concatenate streaming tokens into complete text"""
    return "".join(chunks)

@mlflow.trace(output_reducer=aggregate_tokens)
def stream_text():
    for word in ["Hello", " ", "World", "!"]:
        yield word
メトリックの集計
def aggregate_metrics(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate streaming metrics into summary statistics"""
    values = [c["value"] for c in chunks if "value" in c]
    return {
        "count": len(values),
        "sum": sum(values),
        "average": sum(values) / len(values) if values else 0,
        "max": max(values) if values else None,
        "min": min(values) if values else None
    }

@mlflow.trace(output_reducer=aggregate_metrics)
def stream_metrics():
    for i in range(10):
        yield {"value": i * 2, "timestamp": time.time()}
エラー収集
def collect_results_and_errors(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Separate successful results from errors"""
    results = []
    errors = []

    for chunk in chunks:
        if chunk.get("error"):
            errors.append(chunk["error"])
        else:
            results.append(chunk.get("data"))

    return {
        "results": results,
        "errors": errors,
        "success_rate": len(results) / len(chunks) if chunks else 0,
        "has_errors": len(errors) > 0
    }

高度な例: OpenAI ストリーミング

次に、 output_reducer を使用して OpenAI LLM からの ChatCompletionChunk 出力を 1 つのメッセージ オブジェクトに統合する高度な例を示します。

ヒント

運用環境のユース ケースでは OpenAI の自動トレース を使用することをお勧めします。これは自動的に処理されます。 次の例はデモ用です。

import mlflow
import openai
from openai.types.chat import *
from typing import Optional


def aggregate_chunks(outputs: list[ChatCompletionChunk]) -> Optional[ChatCompletion]:
    """Consolidate ChatCompletionChunks to a single ChatCompletion"""
    if not outputs:
        return None

    first_chunk = outputs[0]
    delta = first_chunk.choices[0].delta
    message = ChatCompletionMessage(
        role=delta.role, content=delta.content, tool_calls=delta.tool_calls or []
    )
    finish_reason = first_chunk.choices[0].finish_reason
    for chunk in outputs[1:]:
        delta = chunk.choices[0].delta
        message.content += delta.content or ""
        message.tool_calls += delta.tool_calls or []
        finish_reason = finish_reason or chunk.choices[0].finish_reason

    base = ChatCompletion(
        id=first_chunk.id,
        choices=[Choice(index=0, message=message, finish_reason=finish_reason)],
        created=first_chunk.created,
        model=first_chunk.model,
        object="chat.completion",
    )
    return base


@mlflow.trace(output_reducer=aggregate_chunks)
def predict(messages: list[dict]):
    stream = openai.OpenAI().chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True,
    )
    for chunk in stream:
        yield chunk


for chunk in predict([{"role": "user", "content": "Hello"}]):
    print(chunk)

上記の例では、生成された predict スパンには、カスタム レジューサー関数によって集計された 1 つのチャット完了メッセージが出力として含まれます。

Real-World ユース ケース

一般的な GenAI シナリオの出力リジューサーのその他の例を次に示します。

JSON 解析を使用した LLM 応答
from typing import List, Dict, Any
import json

def parse_json_from_llm(content: str) -> str:
    """Extract and clean JSON from LLM responses that may include markdown"""
    # Remove common markdown code block wrappers
    if content.startswith("```json") and content.endswith("```"):
        content = content[7:-3]  # Remove ```json prefix and ``` suffix
    elif content.startswith("```") and content.endswith("```"):
        content = content[3:-3]  # Remove generic ``` wrappers
    return content.strip()

def json_stream_reducer(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate LLM streaming output and parse JSON response"""
    full_content = ""
    metadata = {}
    errors = []

    # Process different chunk types
    for chunk in chunks:
        chunk_type = chunk.get("type", "content")

        if chunk_type == "content" or chunk_type == "token":
            full_content += chunk.get("content", "")
        elif chunk_type == "metadata":
            metadata.update(chunk.get("data", {}))
        elif chunk_type == "error":
            errors.append(chunk.get("error"))

    # Return early if errors occurred
    if errors:
        return {
            "status": "error",
            "errors": errors,
            "raw_content": full_content,
            **metadata
        }

    # Try to parse accumulated content as JSON
    try:
        cleaned_content = parse_json_from_llm(full_content)
        parsed_data = json.loads(cleaned_content)

        return {
            "status": "success",
            "data": parsed_data,
            "raw_content": full_content,
            **metadata
        }
    except json.JSONDecodeError as e:
        return {
            "status": "parse_error",
            "error": f"Failed to parse JSON: {str(e)}",
            "raw_content": full_content,
            **metadata
        }

@mlflow.trace(output_reducer=json_stream_reducer)
def generate_structured_output(prompt: str, schema: dict):
    """Generate structured JSON output from an LLM"""
    # Simulate streaming JSON generation
    yield {"type": "content", "content": '{"name": "John", '}
    yield {"type": "content", "content": '"email": "john@example.com", '}
    yield {"type": "content", "content": '"age": 30}'}

    # Add metadata
    trace_id = mlflow.get_current_active_span().request_id if mlflow.get_current_active_span() else None
    yield {"type": "metadata", "data": {"trace_id": trace_id, "model": "gpt-4"}}
OpenAI を使用した構造化出力の生成

OpenAI で出力リジューサーを使用して、構造化された JSON 応答を生成して解析する完全な例を次に示します。

import json
import mlflow
import openai
from typing import List, Dict, Any, Optional

def structured_output_reducer(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Aggregate streaming chunks into structured output with comprehensive error handling.
    Handles token streaming, metadata collection, and JSON parsing.
    """
    content_parts = []
    trace_id = None
    model_info = None
    errors = []

    for chunk in chunks:
        chunk_type = chunk.get("type", "token")

        if chunk_type == "token":
            content_parts.append(chunk.get("content", ""))
        elif chunk_type == "trace_info":
            trace_id = chunk.get("trace_id")
            model_info = chunk.get("model")
        elif chunk_type == "error":
            errors.append(chunk.get("message"))

    # Join all content parts
    full_content = "".join(content_parts)

    # Base response
    response = {
        "trace_id": trace_id,
        "model": model_info,
        "raw_content": full_content
    }

    # Handle errors
    if errors:
        response["status"] = "error"
        response["errors"] = errors
        return response

    # Try to extract and parse JSON
    try:
        # Clean markdown wrappers if present
        json_content = full_content.strip()
        if json_content.startswith("```json") and json_content.endswith("```"):
            json_content = json_content[7:-3].strip()
        elif json_content.startswith("```") and json_content.endswith("```"):
            json_content = json_content[3:-3].strip()

        parsed_data = json.loads(json_content)
        response["status"] = "success"
        response["data"] = parsed_data

    except json.JSONDecodeError as e:
        response["status"] = "parse_error"
        response["error"] = f"JSON parsing failed: {str(e)}"
        response["error_position"] = e.pos if hasattr(e, 'pos') else None

    return response

@mlflow.trace(output_reducer=structured_output_reducer)
async def generate_customer_email(
    customer_name: str,
    issue: str,
    sentiment: str = "professional"
) -> None:
    """
    Generate a structured customer service email response.
    Demonstrates real-world streaming with OpenAI and structured output parsing.
    """
    client = openai.AsyncOpenAI()

    system_prompt = """You are a customer service assistant. Generate a professional email response in JSON format:
    {
        "subject": "email subject line",
        "greeting": "personalized greeting",
        "body": "main email content addressing the issue",
        "closing": "professional closing",
        "priority": "high|medium|low"
    }"""

    user_prompt = f"Customer: {customer_name}\nIssue: {issue}\nTone: {sentiment}"

    try:
        # Stream the response
        stream = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            stream=True,
            temperature=0.7
        )

        # Yield streaming tokens
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield {
                    "type": "token",
                    "content": chunk.choices[0].delta.content
                }

        # Add trace metadata
        if current_span := mlflow.get_current_active_span():
            yield {
                "type": "trace_info",
                "trace_id": current_span.request_id,
                "model": "gpt-4o-mini"
            }

    except Exception as e:
        yield {
            "type": "error",
            "message": f"OpenAI API error: {str(e)}"
        }

# Example usage
async def main():
    # This will automatically aggregate the streamed output into structured JSON
    async for chunk in generate_customer_email(
        customer_name="John Doe",
        issue="Product arrived damaged",
        sentiment="empathetic"
    ):
        # In practice, you might send these chunks to a frontend
        print(chunk.get("content", ""), end="", flush=True)

この例では、いくつかの実際のパターンを紹介します。

  • ストリーミング UI の更新: トークンは到着時に表示できます
  • 構造化された出力検証: JSON 解析によって応答形式が保証される
  • エラーの回復性: API エラーと解析エラーを適切に処理する
  • トレースの関連付け: ストリーミング出力をデバッグ用の MLflow トレースにリンクします
マルチモデル応答集計
def multi_model_reducer(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate responses from multiple models"""
    responses = {}
    latencies = {}

    for chunk in chunks:
        model = chunk.get("model")
        if model:
            responses[model] = chunk.get("response", "")
            latencies[model] = chunk.get("latency", 0)

    return {
        "responses": responses,
        "latencies": latencies,
        "fastest_model": min(latencies, key=latencies.get) if latencies else None,
        "consensus": len(set(responses.values())) == 1
    }

出力リジューサーのテスト

出力リジューサーはトレース フレームワークとは別にテストできるため、エッジ ケースを正しく処理することが容易になります。

import unittest
from typing import List, Dict, Any

def my_reducer(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Example reducer to be tested"""
    if not chunks:
        return {"status": "empty", "total": 0}

    total = sum(c.get("value", 0) for c in chunks)
    errors = [c for c in chunks if c.get("error")]

    return {
        "status": "error" if errors else "success",
        "total": total,
        "count": len(chunks),
        "average": total / len(chunks) if chunks else 0,
        "error_count": len(errors)
    }

class TestOutputReducer(unittest.TestCase):
    def test_normal_case(self):
        chunks = [
            {"value": 10},
            {"value": 20},
            {"value": 30}
        ]
        result = my_reducer(chunks)
        self.assertEqual(result["status"], "success")
        self.assertEqual(result["total"], 60)
        self.assertEqual(result["average"], 20.0)

    def test_empty_input(self):
        result = my_reducer([])
        self.assertEqual(result["status"], "empty")
        self.assertEqual(result["total"], 0)

    def test_error_handling(self):
        chunks = [
            {"value": 10},
            {"error": "Network timeout"},
            {"value": 20}
        ]
        result = my_reducer(chunks)
        self.assertEqual(result["status"], "error")
        self.assertEqual(result["total"], 30)
        self.assertEqual(result["error_count"], 1)

    def test_missing_values(self):
        chunks = [
            {"value": 10},
            {"metadata": "some info"},  # No value field
            {"value": 20}
        ]
        result = my_reducer(chunks)
        self.assertEqual(result["total"], 30)
        self.assertEqual(result["count"], 3)

ヒント

  • 出力レジューサーは、メモリ内のすべてのチャンクを一度に受信します。 非常に大きなストリームの場合は、ストリーミングの代替手段またはチャンク戦略の実装を検討してください。
  • ジェネレーターが完全に使用されるまでスパンは開いたままであり、待機時間メトリックに影響します。
  • レジューサーはステートレスであり、予測可能な動作を確保するために副作用を生じないようにする必要があります。

関数ラッピング

関数ラップは、定義を変更せずに既存の関数にトレースを追加する柔軟な方法を提供します。 これは、コントロールの外部で定義されているサードパーティの関数または関数にトレースを追加する場合に特に便利です。 外部関数を @mlflow.trace でラップすることで、その入力、出力、実行コンテキストをキャプチャできます。

関数を動的にラップする場合、"最も外側" という概念が引き続き適用されます。 関数全体の呼び出しをキャプチャするために、ラップされた関数の特定のポイントでトレースラッパーを適用する必要があります。

import math
import mlflow


def invocation(x, y, exp=2):
    # Wrap an external function from the math library
    traced_pow = mlflow.trace(math.pow)
    raised = traced_pow(x, exp)

    traced_factorial = mlflow.trace(math.factorial)
    factorial = traced_factorial(int(raised))
    return response


invocation(4)

コンテキスト マネージャー

デコレーターに加えて、MLflow では、 mlflow.start_span コンテキスト マネージャーを使用して、カプセル化された任意のコード ブロック内でアクセスできるスパンを作成できます。 コード内の複雑な相互作用を、1つの関数の境界をキャプチャするだけでは得られないような細かい詳細で捉えることができるため、役立ちます。

デコレーターと同様に、コンテキスト マネージャーは親子関係、例外、実行時間を自動的にキャプチャし、自動トレースで動作します。 ただし、スパンの名前、入力、出力は手動で指定する必要があります。 コンテキスト マネージャーから返される mlflow.entities.Span オブジェクトを使用して設定できます。

with mlflow.start_span(name="my_span") as span:
    span.set_inputs({"x": 1, "y": 2})
    z = x + y
    span.set_outputs(z)

OpenAI のデコレーターと自動トレースの両方と組み合わせて mlflow.start_span コンテキスト マネージャーを使用する、もう少し複雑な例を次に示します。

import mlflow
from mlflow.entities import SpanType


@mlflow.trace(span_type=SpanType.CHAIN)
def start_session():
    messages = [{"role": "system", "content": "You are a friendly chat bot"}]
    while True:
        with mlflow.start_span(name="User") as span:
            span.set_inputs(messages)
            user_input = input(">> ")
            span.set_outputs(user_input)

        if user_input == "BYE":
            break

        messages.append({"role": "user", "content": user_input})

        response = openai.OpenAI().chat.completions.create(
            model="gpt-4o-mini",
            max_tokens=100,
            messages=messages,
        )
        answer = response.choices[0].message.content
        print(f"🤖: {answer}")

        messages.append({"role": "assistant", "content": answer})


mlflow.openai.autolog()
start_session()

マルチスレッド

MLflow トレースはスレッド セーフであり、トレースはスレッドごとに既定で分離されます。 ただし、いくつかの追加の手順で複数のスレッドにまたがるトレースを作成することもできます。

MLflow では、Python の組み込みの ContextVar メカニズムを使用してスレッド セーフを確保します。これは、既定ではスレッド間で伝達されません。 そのため、次の例に示すように、コンテキストをメイン スレッドからワーカー スレッドに手動でコピーする必要があります。

import contextvars
from concurrent.futures import ThreadPoolExecutor, as_completed
import mlflow
from mlflow.entities import SpanType
import openai

client = openai.OpenAI()

# Enable MLflow Tracing for OpenAI
mlflow.openai.autolog()


@mlflow.trace
def worker(question: str) -> str:
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": question},
    ]
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        temperature=0.1,
        max_tokens=100,
    )
    return response.choices[0].message.content


@mlflow.trace
def main(questions: list[str]) -> list[str]:
    results = []
    # Almost same as how you would use ThreadPoolExecutor, but two additional steps
    #  1. Copy the context in the main thread using copy_context()
    #  2. Use ctx.run() to run the worker in the copied context
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = []
        for question in questions:
            ctx = contextvars.copy_context()
            futures.append(executor.submit(ctx.run, worker, question))
        for future in as_completed(futures):
            results.append(future.result())
    return results


questions = [
    "What is the capital of France?",
    "What is the capital of Germany?",
]

main(questions)

マルチスレッド トレース

ヒント

これに対し、 ContextVar は既定で 非同期 タスクにコピーされます。 そのため、 asyncioを使用するときにコンテキストを手動でコピーする必要はありません。これは、MLflow トレースを使用して Python で同時 I/O バインド タスクを処理する簡単な方法です。

(高度な)低レベルのクライアント API

デコレーターまたはコンテキスト マネージャーが要件を満たしていない場合は、低レベルのクライアント API を使用できます。 たとえば、異なる関数からスパンを開始および終了する必要がある場合もあります。 クライアント API は、MLflow REST API のシン ラッパーとして設計されており、トレース ライフサイクルをより細かく制御できます。 詳細については、ガイドを参照してください。

Warnung

クライアント API を使用する場合は、次の制限事項に注意してください。

  • 親子関係は自動的にはキャプチャされません。 親スパンの ID を手動で渡す必要があります。
  • クライアント API を使用して作成されたスパンは、自動トレーススパンと組み合わせません。
  • 試験段階としてマークされている低レベルの API は、バックエンド実装の更新に基づいて変更される可能性があります。

## Next steps
Continue your journey with these recommended actions and tutorials.

- [Low-level Client APIs](/mlflow3/genai/tracing/app-instrumentation/manual-tracing/low-level-api.md) - Learn advanced tracing control for complex scenarios
- [Debug & observe your app](/mlflow3/genai/tracing/observe-with-traces/index.md) - Use your traced app for debugging and analysis
- [Combine with automatic tracing](/mlflow3/genai/tracing/app-instrumentation/automatic.md) - Mix manual and automatic tracing

## Reference guides
Explore detailed documentation for concepts and features mentioned in this guide.

- [Tracing data model](/mlflow3/genai/tracing/data-model.md) - Understand the structure of traces and spans
- [Tracing concepts](/mlflow3/genai/tracing/tracing-101.md) - Learn fundamentals of distributed tracing
- [FAQ](/mlflow3/genai/tracing/faq.md) - Common questions about tracing implementation