重要
目前,持久任务 SDK 不适用于 JavaScript 和 PowerShell。
重要
目前,持久任务 SDK 不适用于 JavaScript 和 PowerShell。
在此快速入门中,您将学习如何:
- 设置并运行 Durable Task Scheduler 模拟器进行本地开发。
- 运行辅助角色和客户端项目。
- 检查 Azure 容器应用日志。
- 通过 Durable Task Scheduler 仪表板查看业务流程状态和历史记录。
先决条件
开始之前:
- 请确保具有 .NET 8 SDK 或更高版本。
- 安装 Docker 来运行模拟器。
- 安装 Azure Developer CLI
- 克隆 Durable Task Scheduler GitHub 存储库 以使用快速入门示例。
- 请确保具有 Python 3.9+ 或更高版本。
- 安装 Docker 来运行模拟器。
- 安装 Azure Developer CLI
- 克隆 Durable Task Scheduler GitHub 存储库 以使用快速入门示例。
- 请确保具有 Java 8 或 11。
- 安装 Docker 来运行模拟器。
- 安装 Azure Developer CLI
- 克隆 Durable Task Scheduler GitHub 存储库 以使用快速入门示例。
准备项目
在新的终端窗口中,从 Azure-Samples/Durable-Task-Scheduler
目录中导航到示例目录。
cd /samples/durable-task-sdks/dotnet/FunctionChaining
cd /samples/durable-task-sdks/python/function-chaining
cd /samples/durable-task-sdks/java/function-chaining
使用 Azure 开发人员 CLI 进行部署
运行
azd up
以预配基础结构,并通过单个命令将应用程序部署到 Azure 容器应用。azd up
当终端中出现提示时,请提供以下参数。
参数 DESCRIPTION 环境名称 为保存所有 Azure 资源而创建的资源组的前缀。 Azure 位置 资源的 Azure 位置。 Azure 订阅 资源的 Azure 订阅。 此过程可能需要一段时间才能完成。 完成
azd up
命令后,CLI 输出将显示两个用于监视部署进度的 Azure 门户链接。 输出还演示了如何运行azd up
:- 使用
./infra
通过azd provision
目录中提供的 Bicep 文件创建和配置所有必要的 Azure 资源。 Azure Developer CLI 预配这些资源后,你可以通过 Azure 门户访问这些资源。 用于预配 Azure 资源的文件包括:main.parameters.json
main.bicep
- 按功能组织的
app
资源目录 - 一个
core
参考库,其中包含azd
模板使用的 Bicep 模块
- 使用
azd deploy
部署代码
预期输出
Packaging services (azd package) (✓) Done: Packaging service client - Image Hash: {IMAGE_HASH} - Target Image: {TARGET_IMAGE} (✓) Done: Packaging service worker - Image Hash: {IMAGE_HASH} - Target Image: {TARGET_IMAGE} Provisioning Azure resources (azd provision) Provisioning Azure resources can take some time. Subscription: SUBSCRIPTION_NAME (SUBSCRIPTION_ID) Location: West US 2 You can view detailed progress in the Azure Portal: https://portal.azure.com/#view/HubsExtension/DeploymentDetailsBlade/~/overview/id/%2Fsubscriptions%SUBSCRIPTION_ID%2Fproviders%2FMicrosoft.Resources%2Fdeployments%2FCONTAINER_APP_ENVIRONMENT (✓) Done: Resource group: GENERATED_RESOURCE_GROUP (1.385s) (✓) Done: Container Apps Environment: GENERATED_CONTAINER_APP_ENVIRONMENT (54.125s) (✓) Done: Container Registry: GENERATED_REGISTRY (1m27.747s) (✓) Done: Container App: SAMPLE_CLIENT_APP (21.39s) (✓) Done: Container App: SAMPLE_WORKER_APP (24.136s) Deploying services (azd deploy) (✓) Done: Deploying service client - Endpoint: https://SAMPLE_CLIENT_APP.westus2.azurecontainerapps.io/ (✓) Done: Deploying service worker - Endpoint: https://SAMPLE_WORKER_APP.westus2.azurecontainerapps.io/ SUCCESS: Your up workflow to provision and deploy to Azure completed in 10 minutes 34 seconds.
- 使用
确认部署是否成功
在 Azure 门户中,验证业务流程是否成功运行。
从终端输出复制资源组名称。
登录到 Azure 门户 并搜索该资源组名称。
在资源组概述页中,单击客户端容器应用资源。
选择“ 监视>日志流”。
确认客户端容器正在记录函数链任务。
导航回资源组页以选择
worker
容器。选择“ 监视>日志流”。
确认辅助角色容器正在记录函数链任务。
确认示例容器应用正在记录函数链接任务。
了解代码
客户项目
客户项目:
- 使用与辅助角色相同的连接字符串逻辑
- 实现一个顺序编排调度器,该调度器:
- 计划 20 个业务流程实例,一次一个
- 在计划每个业务流程之间等待 5 秒
- 跟踪列表中的所有编排实例
- 在退出前等待所有业务流程完成
- 使用标准日志记录显示进度和结果
// Schedule 20 orchestrations sequentially
for (int i = 0; i < TotalOrchestrations; i++)
{
// Create a unique instance ID
string instanceName = $"{name}_{i+1}";
// Schedule the orchestration
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
"GreetingOrchestration",
instanceName);
// Wait 5 seconds before scheduling the next one
await Task.Delay(TimeSpan.FromSeconds(IntervalSeconds));
}
// Wait for all orchestrations to complete
foreach (string id in allInstanceIds)
{
OrchestrationMetadata instance = await client.WaitForInstanceCompletionAsync(
id, getInputsAndOutputs: false, CancellationToken.None);
}
辅助角色项目
辅助角色项目包含:
- GreetingOrchestration.cs:在单个文件中定义协调程序和活动函数
- Program.cs:设置工作主机并正确管理连接字符串
业务流程实现
业务流程使用标准 CallActivityAsync
方法按顺序直接调用每个活动:
public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
{
// Step 1: Say hello to the person
string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);
// Step 2: Process the greeting
string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);
// Step 3: Finalize the response
string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);
return finalResponse;
}
每个活动都实现为一个用 [DurableTask]
属性修饰的独立类。
[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
// Implementation details
}
工作人员使用Microsoft.Extensions.Hosting
进行适当的生命周期管理。
var builder = Host.CreateApplicationBuilder();
builder.Services.AddDurableTaskWorker()
.AddTasks(registry => {
registry.AddAllGeneratedTasks();
})
.UseDurableTaskScheduler(connectionString);
var host = builder.Build();
await host.StartAsync();
客户
客户项目:
- 使用与辅助角色相同的连接字符串逻辑
- 实现一个顺序编排调度器,该调度器:
- 计划 20 个业务流程实例,一次一个
- 在计划每个业务流程之间等待 5 秒
- 跟踪列表中的所有编排实例
- 在退出前等待所有业务流程完成
- 使用标准日志记录显示进度和结果
# Schedule all orchestrations first
instance_ids = []
for i in range(TOTAL_ORCHESTRATIONS):
try:
# Create a unique instance name
instance_name = f"{name}_{i+1}"
logger.info(f"Scheduling orchestration #{i+1} ({instance_name})")
# Schedule the orchestration
instance_id = client.schedule_new_orchestration(
"function_chaining_orchestrator",
input=instance_name
)
instance_ids.append(instance_id)
logger.info(f"Orchestration #{i+1} scheduled with ID: {instance_id}")
# Wait before scheduling next orchestration (except for the last one)
if i < TOTAL_ORCHESTRATIONS - 1:
logger.info(f"Waiting {INTERVAL_SECONDS} seconds before scheduling next orchestration...")
await asyncio.sleep(INTERVAL_SECONDS)
# ...
# Wait for all orchestrations to complete
for idx, instance_id in enumerate(instance_ids):
try:
logger.info(f"Waiting for orchestration {idx+1}/{len(instance_ids)} (ID: {instance_id})...")
result = client.wait_for_orchestration_completion(
instance_id,
timeout=120
)
工人
业务流程实现
业务流程使用标准 call_activity
函数按顺序直接调用每个活动:
# Orchestrator function
def function_chaining_orchestrator(ctx, name: str) -> str:
"""Orchestrator that demonstrates function chaining pattern."""
logger.info(f"Starting function chaining orchestration for {name}")
# Call first activity - passing input directly without named parameter
greeting = yield ctx.call_activity('say_hello', input=name)
# Call second activity with the result from first activity
processed_greeting = yield ctx.call_activity('process_greeting', input=greeting)
# Call third activity with the result from second activity
final_response = yield ctx.call_activity('finalize_response', input=processed_greeting)
return final_response
每个活动都作为单独的函数实现:
# Activity functions
def say_hello(ctx, name: str) -> str:
"""First activity that greets the user."""
logger.info(f"Activity say_hello called with name: {name}")
return f"Hello {name}!"
def process_greeting(ctx, greeting: str) -> str:
"""Second activity that processes the greeting."""
logger.info(f"Activity process_greeting called with greeting: {greeting}")
return f"{greeting} How are you today?"
def finalize_response(ctx, response: str) -> str:
"""Third activity that finalizes the response."""
logger.info(f"Activity finalize_response called with response: {response}")
return f"{response} I hope you're doing well!"
工作人员使用DurableTaskSchedulerWorker
进行适当的生命周期管理。
with DurableTaskSchedulerWorker(
host_address=host_address,
secure_channel=endpoint != "http://localhost:8080",
taskhub=taskhub_name,
token_credential=credential
) as worker:
# Register activities and orchestrators
worker.add_activity(say_hello)
worker.add_activity(process_greeting)
worker.add_activity(finalize_response)
worker.add_orchestrator(function_chaining_orchestrator)
# Start the worker (without awaiting)
worker.start()
示例容器应用包含工作器和客户端代码。
客户
客户端代码:
- 使用与辅助角色相同的连接字符串逻辑
- 实现一个顺序编排调度器,该调度器:
- 计划 20 个业务流程实例,一次一个
- 在计划每个业务流程之间等待 5 秒
- 跟踪列表中的所有编排实例
- 在退出前等待所有业务流程完成
- 使用标准日志记录显示进度和结果
// Create client using Azure-managed extensions
DurableTaskClient client = (credential != null
? DurableTaskSchedulerClientExtensions.createClientBuilder(endpoint, taskHubName, credential)
: DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString)).build();
// Start a new instance of the registered "ActivityChaining" orchestration
String instanceId = client.scheduleNewOrchestrationInstance(
"ActivityChaining",
new NewOrchestrationInstanceOptions().setInput("Hello, world!"));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(String.class))
工人
业务流程使用标准 callActivity
方法按顺序直接调用每个活动:
DurableTaskGrpcWorker worker = (credential != null
? DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(endpoint, taskHubName, credential)
: DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString))
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "ActivityChaining"; }
@Override
public TaskOrchestration create() {
return ctx -> {
String input = ctx.getInput(String.class);
String x = ctx.callActivity("Reverse", input, String.class).await();
String y = ctx.callActivity("Capitalize", x, String.class).await();
String z = ctx.callActivity("ReplaceWhitespace", y, String.class).await();
ctx.complete(z);
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "Reverse"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringBuilder builder = new StringBuilder(input);
builder.reverse();
return builder.toString();
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "Capitalize"; }
@Override
public TaskActivity create() {
return ctx -> ctx.getInput(String.class).toUpperCase();
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "ReplaceWhitespace"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
return input.trim().replaceAll("\\s", "-");
};
}
})
.build();
// Start the worker
worker.start();