重要
現時点では、Durable Task SDK は JavaScript と PowerShell では使用できません。
重要
現時点では、Durable Task SDK は JavaScript と PowerShell では使用できません。
このクイックスタートでは、次の方法について説明します。
- ローカル開発用の Durable Task Scheduler エミュレーターを設定して実行します。
- ワーカープロジェクトとクライアントプロジェクトを実行します。
- Azure Container Apps のログを確認します。
- Durable Task Scheduler ダッシュボードを使用して、オーケストレーションの状態と履歴を確認します。
[前提条件]
開始する前に、次の手順を実行します。
- .NET 8 SDK 以降があることを確認します。
- エミュレーターを実行するための Docker をインストールします。
- Azure Developer CLI をインストールする
- Durable Task Scheduler GitHub リポジトリを複製して、クイックスタート サンプルを使用します。
- Python 3.9 以降があることを確認します。
- エミュレーターを実行するための Docker をインストールします。
- Azure Developer CLI をインストールする
- Durable Task Scheduler GitHub リポジトリを複製して、クイックスタート サンプルを使用します。
- Java 8 または 11 があることを確認します。
- エミュレーターを実行するための Docker をインストールします。
- Azure Developer CLI をインストールする
- Durable Task Scheduler GitHub リポジトリを複製して、クイックスタート サンプルを使用します。
プロジェクトを準備する
新しいターミナル ウィンドウで、 Azure-Samples/Durable-Task-Scheduler
ディレクトリからサンプル ディレクトリに移動します。
cd /samples/durable-task-sdks/dotnet/FunctionChaining
cd /samples/durable-task-sdks/python/function-chaining
cd /samples/durable-task-sdks/java/function-chaining
Azure Developer CLI を使用してデプロイする
azd up
を実行して、1 つのコマンドでインフラストラクチャをプロビジョニングしてアプリケーションを Azure Container Apps にデプロイします。azd up
ターミナルでメッセージが表示されたら、次のパラメーターを指定します。
パラメーター 説明 環境名 すべての Azure リソースを保持するために作成されたリソース グループのプレフィックス。 Azure の場所 リソースの Azure の場所。 Azure サブスクリプション リソースの Azure サブスクリプション。 この処理は、完了までに時間がかかる場合があります。
azd up
コマンドが完了すると、CLI 出力に、デプロイの進行状況を監視するための 2 つの Azure portal リンクが表示されます。 出力には、azd up
がどのように機能するかも示されています。-
./infra
を使用して、azd provision
ディレクトリ内の指定されている Bicep ファイルを使用して、必要なすべての Azure リソースを作成して構成します。 Azure Developer CLI によってプロビジョニングされると、Azure portal からこれらのリソースにアクセスできます。 Azure リソースをプロビジョニングするファイルは次のとおりです。main.parameters.json
main.bicep
- 機能別に整理された
app
リソース ディレクトリ -
core
テンプレートで使用される Bicep モジュールを含むazd
リファレンス ライブラリ
-
azd deploy
を使用したコードのデプロイ
想定される出力
Packaging services (azd package) (✓) Done: Packaging service client - Image Hash: {IMAGE_HASH} - Target Image: {TARGET_IMAGE} (✓) Done: Packaging service worker - Image Hash: {IMAGE_HASH} - Target Image: {TARGET_IMAGE} Provisioning Azure resources (azd provision) Provisioning Azure resources can take some time. Subscription: SUBSCRIPTION_NAME (SUBSCRIPTION_ID) Location: West US 2 You can view detailed progress in the Azure Portal: https://portal.azure.com/#view/HubsExtension/DeploymentDetailsBlade/~/overview/id/%2Fsubscriptions%SUBSCRIPTION_ID%2Fproviders%2FMicrosoft.Resources%2Fdeployments%2FCONTAINER_APP_ENVIRONMENT (✓) Done: Resource group: GENERATED_RESOURCE_GROUP (1.385s) (✓) Done: Container Apps Environment: GENERATED_CONTAINER_APP_ENVIRONMENT (54.125s) (✓) Done: Container Registry: GENERATED_REGISTRY (1m27.747s) (✓) Done: Container App: SAMPLE_CLIENT_APP (21.39s) (✓) Done: Container App: SAMPLE_WORKER_APP (24.136s) Deploying services (azd deploy) (✓) Done: Deploying service client - Endpoint: https://SAMPLE_CLIENT_APP.westus2.azurecontainerapps.io/ (✓) Done: Deploying service worker - Endpoint: https://SAMPLE_WORKER_APP.westus2.azurecontainerapps.io/ SUCCESS: Your up workflow to provision and deploy to Azure completed in 10 minutes 34 seconds.
-
デプロイが成功したことを確認する
Azure portal で、オーケストレーションが正常に実行されていることを確認します。
ターミナル出力からリソース グループ名をコピーします。
Azure portal にサインインし、そのリソース グループ名を検索します。
リソース グループの概要ページで、クライアント コンテナー アプリ リソースをクリックします。
[ 監視>ログ ストリーム] を選択します。
クライアント コンテナーが関数チェーン タスクをログに記録することを確認します。
リソース グループ ページに戻り、
worker
コンテナーを選択します。[ 監視>ログ ストリーム] を選択します。
ワーカー コンテナーが関数チェーン タスクをログに記録することを確認します。
サンプル コンテナー アプリが関数チェーン タスクをログに記録することを確認します。
コードの概要
クライアント プロジェクト
クライアント プロジェクト:
- ワーカーと同じ接続文字列ロジックを使用します
- 順次オーケストレーション スケジューラを実装します。
- 20 個のオーケストレーション インスタンスを一度に 1 つずつスケジュールします
- 各オーケストレーションのスケジュール設定の間に 5 秒待機します
- リスト内のすべてのオーケストレーション インスタンスを追跡します
- すべてのオーケストレーションが完了するまで待機してから終了します
- 標準ログを使用して進行状況と結果を表示する
// Schedule 20 orchestrations sequentially
for (int i = 0; i < TotalOrchestrations; i++)
{
// Create a unique instance ID
string instanceName = $"{name}_{i+1}";
// Schedule the orchestration
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
"GreetingOrchestration",
instanceName);
// Wait 5 seconds before scheduling the next one
await Task.Delay(TimeSpan.FromSeconds(IntervalSeconds));
}
// Wait for all orchestrations to complete
foreach (string id in allInstanceIds)
{
OrchestrationMetadata instance = await client.WaitForInstanceCompletionAsync(
id, getInputsAndOutputs: false, CancellationToken.None);
}
労働者プロジェクト
Worker プロジェクトには次のものが含まれます。
- GreetingOrchestration.cs: オーケストレーター関数とアクティビティ関数を 1 つのファイルで定義します
- Program.cs: 適切な接続文字列処理を使用してワーカー ホストを設定します
オーケストレーションの実装
オーケストレーションは、標準の CallActivityAsync
メソッドを使用して、各アクティビティを順番に直接呼び出します。
public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
{
// Step 1: Say hello to the person
string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);
// Step 2: Process the greeting
string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);
// Step 3: Finalize the response
string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);
return finalResponse;
}
各アクティビティは、 [DurableTask]
属性で修飾された個別のクラスとして実装されます。
[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
// Implementation details
}
ワーカーは、適切なライフサイクル管理のために Microsoft.Extensions.Hosting
を使用します。
var builder = Host.CreateApplicationBuilder();
builder.Services.AddDurableTaskWorker()
.AddTasks(registry => {
registry.AddAllGeneratedTasks();
})
.UseDurableTaskScheduler(connectionString);
var host = builder.Build();
await host.StartAsync();
顧客
クライアント プロジェクト:
- ワーカーと同じ接続文字列ロジックを使用します
- 順次オーケストレーション スケジューラを実装します。
- 20 個のオーケストレーション インスタンスを一度に 1 つずつスケジュールします
- 各オーケストレーションのスケジュール設定の間に 5 秒待機します
- リスト内のすべてのオーケストレーション インスタンスを追跡します
- すべてのオーケストレーションが完了するまで待機してから終了します
- 標準ログを使用して進行状況と結果を表示する
# Schedule all orchestrations first
instance_ids = []
for i in range(TOTAL_ORCHESTRATIONS):
try:
# Create a unique instance name
instance_name = f"{name}_{i+1}"
logger.info(f"Scheduling orchestration #{i+1} ({instance_name})")
# Schedule the orchestration
instance_id = client.schedule_new_orchestration(
"function_chaining_orchestrator",
input=instance_name
)
instance_ids.append(instance_id)
logger.info(f"Orchestration #{i+1} scheduled with ID: {instance_id}")
# Wait before scheduling next orchestration (except for the last one)
if i < TOTAL_ORCHESTRATIONS - 1:
logger.info(f"Waiting {INTERVAL_SECONDS} seconds before scheduling next orchestration...")
await asyncio.sleep(INTERVAL_SECONDS)
# ...
# Wait for all orchestrations to complete
for idx, instance_id in enumerate(instance_ids):
try:
logger.info(f"Waiting for orchestration {idx+1}/{len(instance_ids)} (ID: {instance_id})...")
result = client.wait_for_orchestration_completion(
instance_id,
timeout=120
)
勤労者
オーケストレーションの実装
オーケストレーションは、標準の call_activity
関数を使用して、各アクティビティを順番に直接呼び出します。
# Orchestrator function
def function_chaining_orchestrator(ctx, name: str) -> str:
"""Orchestrator that demonstrates function chaining pattern."""
logger.info(f"Starting function chaining orchestration for {name}")
# Call first activity - passing input directly without named parameter
greeting = yield ctx.call_activity('say_hello', input=name)
# Call second activity with the result from first activity
processed_greeting = yield ctx.call_activity('process_greeting', input=greeting)
# Call third activity with the result from second activity
final_response = yield ctx.call_activity('finalize_response', input=processed_greeting)
return final_response
各アクティビティは、個別の関数として実装されます。
# Activity functions
def say_hello(ctx, name: str) -> str:
"""First activity that greets the user."""
logger.info(f"Activity say_hello called with name: {name}")
return f"Hello {name}!"
def process_greeting(ctx, greeting: str) -> str:
"""Second activity that processes the greeting."""
logger.info(f"Activity process_greeting called with greeting: {greeting}")
return f"{greeting} How are you today?"
def finalize_response(ctx, response: str) -> str:
"""Third activity that finalizes the response."""
logger.info(f"Activity finalize_response called with response: {response}")
return f"{response} I hope you're doing well!"
ワーカーは、適切なライフサイクル管理のために DurableTaskSchedulerWorker
を使用します。
with DurableTaskSchedulerWorker(
host_address=host_address,
secure_channel=endpoint != "http://localhost:8080",
taskhub=taskhub_name,
token_credential=credential
) as worker:
# Register activities and orchestrators
worker.add_activity(say_hello)
worker.add_activity(process_greeting)
worker.add_activity(finalize_response)
worker.add_orchestrator(function_chaining_orchestrator)
# Start the worker (without awaiting)
worker.start()
サンプル コンテナー アプリには、worker コードとクライアント コードの両方が含まれています。
顧客
クライアント コード:
- ワーカーと同じ接続文字列ロジックを使用します
- 順次オーケストレーション スケジューラを実装します。
- 20 個のオーケストレーション インスタンスを一度に 1 つずつスケジュールします
- 各オーケストレーションのスケジュール設定の間に 5 秒待機します
- リスト内のすべてのオーケストレーション インスタンスを追跡します
- すべてのオーケストレーションが完了するまで待機してから終了します
- 標準ログを使用して進行状況と結果を表示する
// Create client using Azure-managed extensions
DurableTaskClient client = (credential != null
? DurableTaskSchedulerClientExtensions.createClientBuilder(endpoint, taskHubName, credential)
: DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString)).build();
// Start a new instance of the registered "ActivityChaining" orchestration
String instanceId = client.scheduleNewOrchestrationInstance(
"ActivityChaining",
new NewOrchestrationInstanceOptions().setInput("Hello, world!"));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(String.class))
勤労者
オーケストレーションは、標準の callActivity
メソッドを使用して、各アクティビティを順番に直接呼び出します。
DurableTaskGrpcWorker worker = (credential != null
? DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(endpoint, taskHubName, credential)
: DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString))
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "ActivityChaining"; }
@Override
public TaskOrchestration create() {
return ctx -> {
String input = ctx.getInput(String.class);
String x = ctx.callActivity("Reverse", input, String.class).await();
String y = ctx.callActivity("Capitalize", x, String.class).await();
String z = ctx.callActivity("ReplaceWhitespace", y, String.class).await();
ctx.complete(z);
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "Reverse"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringBuilder builder = new StringBuilder(input);
builder.reverse();
return builder.toString();
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "Capitalize"; }
@Override
public TaskActivity create() {
return ctx -> ctx.getInput(String.class).toUpperCase();
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "ReplaceWhitespace"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
return input.trim().replaceAll("\\s", "-");
};
}
})
.build();
// Start the worker
worker.start();