你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
Durable Task SDK 为 Durable Task 计划程序提供轻型客户端库。 本快速入门介绍如何创建使用扇出/扇入应用程序模式执行并行处理的业务流程。
重要
目前,持久任务 SDK 不适用于 JavaScript 和 PowerShell。
重要
目前,持久任务 SDK 不适用于 JavaScript 和 PowerShell。
- 设置并运行 Durable Task Scheduler 模拟器进行本地开发。
- 运行辅助角色和客户端项目。
- 通过 Durable Task Scheduler 仪表板查看业务流程状态和历史记录。
先决条件
开始之前:
- 请确保具有 .NET 8 SDK 或更高版本。
- 安装 Docker 以运行模拟器。
- 克隆 Durable Task Scheduler GitHub 存储库 以使用快速入门示例。
- 请确保具有 Python 3.9+ 或更高版本。
- 安装 Docker 以运行模拟器。
- 克隆 Durable Task Scheduler GitHub 存储库 以使用快速入门示例。
- 请确保具有 Java 8 或 11。
- 安装 Docker 来运行模拟器。
- 克隆 Durable Task Scheduler GitHub 存储库 以使用快速入门示例。
设置持久任务计划程序模拟器
应用程序代码查找已部署的计划程序和任务中心资源。 如果找不到任何代码,则代码会回退到模拟器。 模拟器模拟 Docker 容器中的计划程序和任务中心,使其成为本快速入门中所需的本地开发的理想工具。
Azure-Samples/Durable-Task-Scheduler
从根目录导航到 .NET SDK 示例目录。cd samples/durable-task-sdks/dotnet/FanOutFanIn
拉取 Docker 映像以供模拟器使用。
docker pull mcr.microsoft.com/dts/dts-emulator:latest
运行模拟器。 容器可能需要几秒钟才能准备就绪。
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
由于示例代码自动使用默认模拟器设置,因此无需设置任何环境变量。 本快速入门的默认模拟器设置包括:
- 终结点:
http://localhost:8080
- 任务中心:
default
Azure-Samples/Durable-Task-Scheduler
从根目录导航到 Python SDK 示例目录。cd samples/durable-task-sdks/python/fan-out-fan-in
拉取模拟器的 Docker 映像。
docker pull mcr.microsoft.com/dts/dts-emulator:latest
运行模拟器。 容器可能需要几秒钟才能准备就绪。
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
由于示例代码自动使用默认模拟器设置,因此无需设置任何环境变量。 本快速入门的默认模拟器设置包括:
- 终结点:
http://localhost:8080
- 任务中心:
default
Azure-Samples/Durable-Task-Scheduler
从根目录导航到 Java SDK 示例目录。cd samples/durable-task-sdks/java/fan-out-fan-in
拉取用于模拟器的 Docker 镜像。
docker pull mcr.microsoft.com/dts/dts-emulator:latest
运行模拟器。 容器可能需要几秒钟才能准备就绪。
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
由于示例代码自动使用默认模拟器设置,因此无需设置任何环境变量。 本快速入门的默认模拟器设置包括:
- 终结点:
http://localhost:8080
- 任务中心:
default
运行快速入门
从
FanOutFanIn
目录中导航到要生成并运行辅助角色的Worker
目录。cd Worker dotnet build dotnet run
在单独的终端中,从
FanOutFanIn
目录导航到要生成并运行客户端的Client
目录。cd Client dotnet build dotnet run
理解输出内容
运行此示例时,你会收到工作进程和客户端进程的输出。 解压缩运行项目时代码中发生的情况。
工人产出
工作器输出显示:
- 业务流程协调程序和活动的注册
- 调用每个活动时记录条目
- 多个工作项的并行处理
- 结果的最终聚合
客户端输出
客户端输出显示:
- 从工作项列表开始的业务流程
- 唯一的业务流程实例 ID
- 最终聚合结果,显示每个工作项及其相应的结果
- 已处理项的总计数
示例输出
Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
激活 Python 虚拟环境。
python -m venv venv /venv/Scripts/activate
安装所需程序包。
pip install -r requirements.txt
启动辅助角色。
python worker.py
预期输出
显示的输出指示辅助角色已启动并正在等待工作项。
Starting Fan Out/Fan In pattern worker... Using taskhub: default Using endpoint: http://localhost:8080 Starting gRPC worker that connects to http://localhost:8080 Successfully connected to http://localhost:8080. Waiting for work items...
在新终端中,激活虚拟环境并运行客户端。
venv/Scripts/activate python client.py
可以将工作项数作为参数提供。 如果未提供任何参数,则示例默认运行 10 个项目。
python client.py [number_of_items]
理解输出内容
运行此示例时,你会收到工作进程和客户端进程的输出。 解压缩运行项目时代码中发生的情况。
工人产出
工作器输出显示:
- 业务流程协调程序和活动的注册。
- 并行处理每个工作项时的状态消息,显示它们正在并发执行。
- 每个工作项的随机延迟(介于 0.5 到 2 秒之间),以模拟不同的处理时间。
- 显示结果聚合的最终消息。
客户端输出
客户端输出显示:
- 以指定的工作项数开始的业务流程。
- 唯一的业务流程实例 ID。
- 最终聚合结果,其中包括:
- 已处理的项总数
- 所有结果的总和(每个项结果是其值的平方)
- 所有结果的平均值
示例输出
Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED
fan-out-fan-in
从目录中,使用 Gradle 生成并运行应用程序。
./gradlew runFanOutFanInPattern
小窍门
如果收到错误消息 zsh: permission denied: ./gradlew
,请尝试在运行应用程序之前运行 chmod +x gradlew
。
理解输出内容
运行此示例时,您会看到输出显示:
- 业务流程协调程序和活动的注册。
- 并行处理每个工作项时的状态消息,显示它们正在并发执行。
- 每个工作项的随机延迟(介于 0.5 到 2 秒之间),以模拟不同的处理时间。
- 显示结果聚合的最终消息。
示例输出
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60
在本地运行项目后,现在可以了解如何部署到 Azure 容器应用中托管的 Azure。
查看编排状态和历史记录
可以通过 Durable Task Scheduler 仪表板查看业务流程状态和历史记录。 默认情况下,仪表板在端口 8082 上运行。
- 在 Web 浏览器中导航到 http://localhost:8082 。
- 单击 默认 任务中心。 您创建的协调实例位于列表中。
- 单击编排实例 ID 以查看执行详细信息,其中包括:
- 多个活动任务的并行执行
- 扇入聚合步骤
- 每个步骤中的输入和输出
- 每个步骤所用的时间
了解代码结构
辅助角色项目
为了演示扇出/扇入模式,辅助角色项目业务流程会创建并行活动任务,并等待所有任务完成。 对于业务流程协调程序:
- 将工作项列表作为输入。
- 通过使用
ProcessWorkItemActivity
为每个工作项创建单独的任务来扇出。 - 并行执行所有任务。
- 使用
Task.WhenAll
等待所有任务都完成。 - 通过使用
AggregateResultsActivity
聚合所有单个结果来扇入。 - 将最终聚合结果返回到客户端。
辅助角色项目包含:
- ParallelProcessingOrchestration.cs:在单个文件中定义业务流程协调程序和活动函数。
- Program.cs:配置工作主机,并正确处理连接字符串。
ParallelProcessingOrchestration.cs
通过使用扇出/扇入,业务流程会创建并行活动任务,并等待所有任务完成。
public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
每个活动都作为单独的类实现,并用 [DurableTask]
特性进行修饰。
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
// Implementation processes a single work item
}
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
// Implementation aggregates individual results
}
Program.cs
工人使用 Microsoft.Extensions.Hosting
进行适当的生命周期管理。
using Microsoft.Extensions.Hosting;
//..
builder.Services.AddDurableTaskWorker()
.AddTasks(registry =>
{
registry.AddOrchestrator<ParallelProcessingOrchestration>();
registry.AddActivity<ProcessWorkItemActivity>();
registry.AddActivity<AggregateResultsActivity>();
})
.UseDurableTaskScheduler(connectionString);
客户项目
客户端项目:
- 使用与辅助角色相同的连接字符串逻辑。
- 创建要并行处理的工作项的列表。
- 将列表用作输入来计划业务流程实例。
- 等待编排完成并显示汇总结果。
- 使用
WaitForInstanceCompletionAsync
以实现高效的轮询。
List<string> workItems = new List<string>
{
"Task1",
"Task2",
"Task3",
"LongerTask4",
"VeryLongTask5"
};
// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
"ParallelProcessingOrchestration",
workItems);
// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
instanceId,
getInputsAndOutputs: true,
cts.Token);
worker.py
为了演示扇出/扇入模式,辅助角色项目业务流程会创建并行活动任务,并等待所有任务完成。 对于业务流程协调程序:
- 接收一个工作项列表作为输入。
- 它通过为每个工作项创建并行任务(为每个工作项调用
process_work_item
)来“扇出”。 - 它使用
task.when_all
等待所有任务完成。 - 然后,它通过将结果与
aggregate_results
活动聚合在一起来“扇入”。 - 最终聚合结果将返回到客户端。
通过使用扇出/扇入,业务流程会创建并行活动任务,并等待所有任务完成。
# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")
# Fan out: Create a task for each work item
parallel_tasks = []
for item in work_items:
parallel_tasks.append(ctx.call_activity("process_work_item", input=item))
# Wait for all tasks to complete
logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
results = yield task.when_all(parallel_tasks)
# Fan in: Aggregate all the results
logger.info("All parallel tasks completed, aggregating results")
final_result = yield ctx.call_activity("aggregate_results", input=results)
return final_result
client.py
客户端项目:
- 使用与辅助角色相同的连接字符串逻辑。
- 创建要并行处理的工作项的列表。
- 将列表用作输入来计划业务流程实例。
- 等待编排完成并展示聚合结果。
- 使用
wait_for_orchestration_completion
以实现高效的轮询。
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))
logger.info(f"Starting new fan out/fan in orchestration with {count} work items")
# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
"fan_out_fan_in_orchestrator",
input=work_items
)
logger.info(f"Started orchestration with ID = {instance_id}")
# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
instance_id,
timeout=60
)
为了演示扇出/扇入模式,FanOutFanInPattern
项目业务流程会创建并行活动任务,并等待所有任务完成。 对于业务流程协调程序:
- 将工作项列表作为输入。
- 通过使用 `` 为每个工作项创建单独的任务来扇出。
- 并行执行所有任务。
- 使用 `` 等待所有任务完成。
- 通过使用 `` 聚合所有单个结果来扇入。
- 将最终聚合结果返回到客户端。
项目包含:
DurableTaskSchedulerWorkerExtensions
辅助角色:定义业务流程协调程序和活动函数。DurableTaskSchedulerClientExtension
客户端:设置工作主机以正确处理连接字符串。
工人
通过使用扇出/扇入,业务流程会创建并行活动任务,并等待所有任务完成。
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "FanOutFanIn_WordCount"; }
@Override
public TaskOrchestration create() {
return ctx -> {
List<?> inputs = ctx.getInput(List.class);
List<Task<Integer>> tasks = inputs.stream()
.map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
.collect(Collectors.toList());
List<Integer> allWordCountResults = ctx.allOf(tasks).await();
int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
ctx.complete(totalWordCount);
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CountWords"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringTokenizer tokenizer = new StringTokenizer(input);
return tokenizer.countTokens();
};
}
})
.build();
// Start the worker
worker.start();
客户
客户端项目:
- 使用与辅助角色相同的连接字符串逻辑。
- 创建要并行处理的工作项的列表。
- 将列表用作输入来计划业务流程实例。
- 等待编排完成并展示聚合结果。
- 使用
waitForInstanceCompletion
以实现高效的轮询。
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();
// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
"Hello, world!",
"The quick brown fox jumps over the lazy dog.",
"If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
"The greatest glory in living lies not in never falling, but in rising every time we fall.",
"Always remember that you are absolutely unique. Just like everyone else.");
// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
"FanOutFanIn_WordCount",
new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));
后续步骤
现在,你已使用 Durable Task Scheduler 模拟器在本地运行示例,请尝试创建计划程序和任务中心资源并部署到 Azure 容器应用。