Durable Task SDK には、Durable Task Scheduler 用の軽量クライアント ライブラリが用意されています。 このクイック スタートでは、 ファンアウト/ファンイン アプリケーション パターン を使用して並列処理を実行するオーケストレーションを作成する方法について説明します。
重要
現時点では、Durable Task SDK は JavaScript と PowerShell では使用できません。
重要
現時点では、Durable Task SDK は JavaScript と PowerShell では使用できません。
- ローカル開発用の Durable Task Scheduler エミュレーターを設定して実行します。
- ワーカープロジェクトとクライアントプロジェクトを実行します。
- Durable Task Scheduler ダッシュボードを使用して、オーケストレーションの状態と履歴を確認します。
[前提条件]
開始する前に、次の手順を実行します。
- .NET 8 SDK 以降があることを確認します。
- エミュレーターを実行するための Docker をインストールします。
- Durable Task Scheduler GitHub リポジトリを複製して、クイックスタート サンプルを使用します。
- Python 3.9 以降があることを確認します。
- エミュレーターを実行するための Docker をインストールします。
- Durable Task Scheduler GitHub リポジトリを複製して、クイックスタート サンプルを使用します。
- Java 8 または 11 があることを確認します。
- エミュレーターを実行するための Docker をインストールします。
- Durable Task Scheduler GitHub リポジトリを複製して、クイックスタート サンプルを使用します。
Durable Task Scheduler エミュレーターを設定する
アプリケーション コードは、デプロイされたスケジューラとタスク ハブ リソースを探します。 何も見つからない場合、コードはエミュレーターにフォールバックします。 エミュレーターは Docker コンテナー内のスケジューラとタスク ハブをシミュレートするため、このクイックスタートで必要なローカル開発に最適です。
Azure-Samples/Durable-Task-Scheduler
ルート ディレクトリから、.NET SDK サンプル ディレクトリに移動します。cd samples/durable-task-sdks/dotnet/FanOutFanIn
エミュレーターの Docker イメージをプルします。
docker pull mcr.microsoft.com/dts/dts-emulator:latest
エミュレーターを実行します。 コンテナーの準備に数秒かかる場合があります。
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
サンプル コードでは既定のエミュレーター設定が自動的に使用されるため、環境変数を設定する必要はありません。 このクイック スタートの既定のエミュレーター設定は次のとおりです。
- エンドポイント:
http://localhost:8080
- タスク ハブ:
default
Azure-Samples/Durable-Task-Scheduler
ルート ディレクトリから、Python SDK サンプル ディレクトリに移動します。cd samples/durable-task-sdks/python/fan-out-fan-in
エミュレーターの Docker イメージをプルします。
docker pull mcr.microsoft.com/dts/dts-emulator:latest
エミュレーターを実行します。 コンテナーの準備に数秒かかる場合があります。
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
サンプル コードでは既定のエミュレーター設定が自動的に使用されるため、環境変数を設定する必要はありません。 このクイック スタートの既定のエミュレーター設定は次のとおりです。
- エンドポイント:
http://localhost:8080
- タスク ハブ:
default
Azure-Samples/Durable-Task-Scheduler
ルート ディレクトリから、Java SDK サンプル ディレクトリに移動します。cd samples/durable-task-sdks/java/fan-out-fan-in
エミュレーターの Docker イメージをプルします。
docker pull mcr.microsoft.com/dts/dts-emulator:latest
エミュレーターを実行します。 コンテナーの準備に数秒かかる場合があります。
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
サンプル コードでは既定のエミュレーター設定が自動的に使用されるため、環境変数を設定する必要はありません。 このクイック スタートの既定のエミュレーター設定は次のとおりです。
- エンドポイント:
http://localhost:8080
- タスク ハブ:
default
クイック スタートを実行する
FanOutFanIn
ディレクトリから、Worker
ディレクトリに移動してワーカーをビルドして実行します。cd Worker dotnet build dotnet run
別のターミナルで、
FanOutFanIn
ディレクトリからClient
ディレクトリに移動して、クライアントをビルドして実行します。cd Client dotnet build dotnet run
出力を理解すること
このサンプルを実行すると、ワーカー プロセスとクライアント プロセスの両方から出力を受け取ります。 プロジェクトの実行時にコードで何が起こったかをアンパックします。
作業者の成果
ワーカーの出力には、次の情報が表示されます。
- オーケストレーターとアクティビティの登録
- 各アクティビティが呼び出されたときにエントリをログに記録する
- 複数の作業項目の並列処理
- 結果の最終的な集計
クライアント出力
クライアントの出力は次のように表示されます。
- オーケストレーションは作業項目の一覧から始まります。
- 一意のオーケストレーション インスタンス ID
- 各作業項目とそれに対応する結果を示す最終的な集計結果
- 処理済みアイテムの合計数
出力例
Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
Python 仮想環境をアクティブ化します。
必要なパッケージをインストールします。
pip install -r requirements.txt
ワーカーを開始します。
python worker.py
想定される出力
ワーカーが起動され、作業項目を待っていることを示す出力を確認できます。
Starting Fan Out/Fan In pattern worker... Using taskhub: default Using endpoint: http://localhost:8080 Starting gRPC worker that connects to http://localhost:8080 Successfully connected to http://localhost:8080. Waiting for work items...
新しいターミナルで、仮想環境をアクティブ化し、クライアントを実行します。
作業項目の数を引数として指定できます。 引数が指定されていない場合、この例では既定で 10 個の項目が実行されます。
python client.py [number_of_items]
出力を理解すること
このサンプルを実行すると、ワーカー プロセスとクライアント プロセスの両方から出力を受け取ります。 プロジェクトの実行時にコードで何が起こったかをアンパックします。
作業者の成果
ワーカーの出力には、次の情報が表示されます。
- オーケストレーターとアクティビティの登録。
- 各作業項目を並行して処理するときに、同時に実行されていることを示すステータス メッセージ。
- さまざまな処理時間をシミュレートするための各作業項目のランダムな遅延 (0.5 ~ 2 秒)。
- 結果の集計を示す最後のメッセージ。
クライアント出力
クライアントの出力は次のように表示されます。
- オーケストレーションは指定された作業項目の数から始まります。
- 一意のオーケストレーション インスタンス ID。
- 最終的な集計結果には、次のものが含まれます。
- 処理されたアイテムの合計数
- すべての結果の合計 (各項目の結果は値の 2 乗)
- すべての結果の平均
出力例
Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED
fan-out-fan-in
ディレクトリから、Gradle を使用してアプリケーションをビルドして実行します。
./gradlew runFanOutFanInPattern
ヒント
zsh: permission denied: ./gradlew
エラー メッセージが表示された場合は、アプリケーションを実行する前にchmod +x gradlew
を実行してみてください。
出力を理解すること
このサンプルを実行すると、次のような出力が表示されます。
- オーケストレーターとアクティビティの登録。
- 各作業項目を並行して処理するときに、同時に実行されていることを示すステータス メッセージ。
- さまざまな処理時間をシミュレートするための各作業項目のランダムな遅延 (0.5 ~ 2 秒)。
- 結果の集計を示す最後のメッセージ。
プロジェクトの実行時にコードで何が起こったかをアンパックします。
出力例
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60
プロジェクトをローカルで実行したので、 Azure Container Apps でホストされている Azure にデプロイする方法を学習できるようになりました。
オーケストレーションの状態と履歴を表示する
Durable Task Scheduler ダッシュボードを使用して、オーケストレーションの状態と履歴を表示できます。 既定では、ダッシュボードはポート 8082 で実行されます。
- Web ブラウザーで http://localhost:8082 に移動します。
- 既定のタスク ハブをクリックします。 作成したオーケストレーション インスタンスが一覧に表示されます。
- オーケストレーション インスタンス ID をクリックして、次のような実行の詳細を表示します。
- 複数のアクティビティ タスクの並列実行
- ファンイン集計手順
- 各ステップでの入力と出力
- 各ステップにかかった時間
コード構造について
労働者プロジェクト
ファンアウト/ファンイン パターンを示すために、worker プロジェクト オーケストレーションは並列アクティビティ タスクを作成し、すべてが完了するまで待機します。 オーケストレーター:
- 作業項目の一覧を入力として受け取ります。
-
ProcessWorkItemActivity
を使用して作業項目ごとに個別のタスクを作成してファンアウトします。 - すべてのタスクを並列で実行します。
-
Task.WhenAll
を使用して、すべてのタスクが完了するまで待機します。 -
AggregateResultsActivity
を使用して、すべての個々の結果を集計してファンインします。 - 最終的な集計結果をクライアントに返します。
作業員のプロジェクトには次のものが含まれます。
- ParallelProcessingOrchestration.cs: オーケストレーター関数とアクティビティ関数を 1 つのファイルで定義します。
- Program.cs: 適切な接続文字列処理を使用してワーカー ホストを設定します。
ParallelProcessingOrchestration.cs
ファンアウト/ファンインを使用して、オーケストレーションは並列アクティビティ タスクを作成し、すべてが完了するまで待機します。
public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
各アクティビティは、 [DurableTask]
属性で修飾された個別のクラスとして実装されます。
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
// Implementation processes a single work item
}
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
// Implementation aggregates individual results
}
Program.cs
ワーカーは、適切なライフサイクル管理のために Microsoft.Extensions.Hosting
を使用します。
using Microsoft.Extensions.Hosting;
//..
builder.Services.AddDurableTaskWorker()
.AddTasks(registry =>
{
registry.AddOrchestrator<ParallelProcessingOrchestration>();
registry.AddActivity<ProcessWorkItemActivity>();
registry.AddActivity<AggregateResultsActivity>();
})
.UseDurableTaskScheduler(connectionString);
クライアント プロジェクト
クライアント プロジェクト:
- ワーカーと同じ接続文字列ロジックを使用します。
- 並列処理する作業項目の一覧を作成します。
- リストを入力として使用してオーケストレーション インスタンスをスケジュールします。
- オーケストレーションが完了するまで待機し、集計された結果を表示します。
- 効率的なポーリングに
WaitForInstanceCompletionAsync
を使用します。
List<string> workItems = new List<string>
{
"Task1",
"Task2",
"Task3",
"LongerTask4",
"VeryLongTask5"
};
// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
"ParallelProcessingOrchestration",
workItems);
// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
instanceId,
getInputsAndOutputs: true,
cts.Token);
worker.py
ファンアウト/ファンイン パターンを示すために、worker プロジェクト オーケストレーションは並列アクティビティ タスクを作成し、すべてが完了するまで待機します。 オーケストレーター:
- 作業項目の一覧を入力として受け取ります。
- 作業項目ごとに並列タスクを作成して "ファンアウト" します (各作業項目に対して
process_work_item
を呼び出します)。 -
task.when_all
を使用して、すべてのタスクが完了するまで待機します。 - その後、
aggregate_results
アクティビティで結果を集計して "ファンイン" します。 - 最終的な集計結果がクライアントに返されます。
ファンアウト/ファンインを使用して、オーケストレーションは並列アクティビティ タスクを作成し、すべてが完了するまで待機します。
# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")
# Fan out: Create a task for each work item
parallel_tasks = []
for item in work_items:
parallel_tasks.append(ctx.call_activity("process_work_item", input=item))
# Wait for all tasks to complete
logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
results = yield task.when_all(parallel_tasks)
# Fan in: Aggregate all the results
logger.info("All parallel tasks completed, aggregating results")
final_result = yield ctx.call_activity("aggregate_results", input=results)
return final_result
client.py
クライアント プロジェクト:
- ワーカーと同じ接続文字列ロジックを使用します。
- 並列処理する作業項目の一覧を作成します。
- リストを入力として使用してオーケストレーション インスタンスをスケジュールします。
- オーケストレーションが完了するまで待機し、集計された結果を表示します。
- 効率的なポーリングに
wait_for_orchestration_completion
を使用します。
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))
logger.info(f"Starting new fan out/fan in orchestration with {count} work items")
# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
"fan_out_fan_in_orchestrator",
input=work_items
)
logger.info(f"Started orchestration with ID = {instance_id}")
# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
instance_id,
timeout=60
)
ファンアウト/ファンイン パターンを示すために、FanOutFanInPattern
プロジェクト オーケストレーションは並列アクティビティ タスクを作成し、すべてが完了するまで待機します。 オーケストレーター:
- 作業項目の一覧を入力として受け取ります。
- `` を使用して、作業項目ごとに個別のタスクを作成してファンアウトします。
- すべてのタスクを並列で実行します。
- '' を使用して、すべてのタスクが完了するまで待機します。
- `` を使用して、すべての個々の結果を集計してファンインします。
- 最終的な集計結果をクライアントに返します。
プロジェクトには次のものが含まれます。
-
DurableTaskSchedulerWorkerExtensions
worker: オーケストレーター関数とアクティビティ関数を定義します。 -
DurableTaskSchedulerClientExtension
クライアント: 適切な接続文字列処理を使用してワーカー ホストを設定します。
勤労者
ファンアウト/ファンインを使用して、オーケストレーションは並列アクティビティ タスクを作成し、すべてが完了するまで待機します。
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "FanOutFanIn_WordCount"; }
@Override
public TaskOrchestration create() {
return ctx -> {
List<?> inputs = ctx.getInput(List.class);
List<Task<Integer>> tasks = inputs.stream()
.map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
.collect(Collectors.toList());
List<Integer> allWordCountResults = ctx.allOf(tasks).await();
int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
ctx.complete(totalWordCount);
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CountWords"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringTokenizer tokenizer = new StringTokenizer(input);
return tokenizer.countTokens();
};
}
})
.build();
// Start the worker
worker.start();
顧客
クライアント プロジェクト:
- ワーカーと同じ接続文字列ロジックを使用します。
- 並列処理する作業項目の一覧を作成します。
- リストを入力として使用してオーケストレーション インスタンスをスケジュールします。
- オーケストレーションが完了するまで待機し、集計された結果を表示します。
- 効率的なポーリングに
waitForInstanceCompletion
を使用します。
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();
// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
"Hello, world!",
"The quick brown fox jumps over the lazy dog.",
"If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
"The greatest glory in living lies not in never falling, but in rising every time we fall.",
"Always remember that you are absolutely unique. Just like everyone else.");
// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
"FanOutFanIn_WordCount",
new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));
次のステップ
Durable Task Scheduler エミュレーターを使用してサンプルをローカルで実行したので、スケジューラとタスク ハブ リソースを作成し、Azure Container Apps にデプロイしてみてください。