你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

快速入门:使用 Durable Task SDK 和 Durable Task Scheduler 创建应用(预览版)

Durable Task SDK 为 Durable Task 计划程序提供轻型客户端库。 本快速入门介绍如何创建使用扇出/扇入应用程序模式执行并行处理的业务流程。

重要

目前,持久任务 SDK 不适用于 JavaScript 和 PowerShell。

重要

目前,持久任务 SDK 不适用于 JavaScript 和 PowerShell。

  • 设置并运行 Durable Task Scheduler 模拟器进行本地开发。
  • 运行辅助角色和客户端项目。
  • 通过 Durable Task Scheduler 仪表板查看业务流程状态和历史记录。

先决条件

开始之前:

设置持久任务计划程序模拟器

应用程序代码查找已部署的计划程序和任务中心资源。 如果找不到任何代码,则代码会回退到模拟器。 模拟器模拟 Docker 容器中的计划程序和任务中心,使其成为本快速入门中所需的本地开发的理想工具。

  1. Azure-Samples/Durable-Task-Scheduler从根目录导航到 .NET SDK 示例目录。

    cd samples/durable-task-sdks/dotnet/FanOutFanIn
    
  2. 拉取 Docker 映像以供模拟器使用。

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. 运行模拟器。 容器可能需要几秒钟才能准备就绪。

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

由于示例代码自动使用默认模拟器设置,因此无需设置任何环境变量。 本快速入门的默认模拟器设置包括:

  • 终结点:http://localhost:8080
  • 任务中心:default
  1. Azure-Samples/Durable-Task-Scheduler从根目录导航到 Python SDK 示例目录。

    cd samples/durable-task-sdks/python/fan-out-fan-in
    
  2. 拉取模拟器的 Docker 映像。

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. 运行模拟器。 容器可能需要几秒钟才能准备就绪。

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

由于示例代码自动使用默认模拟器设置,因此无需设置任何环境变量。 本快速入门的默认模拟器设置包括:

  • 终结点:http://localhost:8080
  • 任务中心:default
  1. Azure-Samples/Durable-Task-Scheduler从根目录导航到 Java SDK 示例目录。

    cd samples/durable-task-sdks/java/fan-out-fan-in
    
  2. 拉取用于模拟器的 Docker 镜像。

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. 运行模拟器。 容器可能需要几秒钟才能准备就绪。

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

由于示例代码自动使用默认模拟器设置,因此无需设置任何环境变量。 本快速入门的默认模拟器设置包括:

  • 终结点:http://localhost:8080
  • 任务中心:default

运行快速入门

  1. FanOutFanIn 目录中导航到要生成并运行辅助角色的 Worker 目录。

    cd Worker
    dotnet build
    dotnet run
    
  2. 在单独的终端中,从 FanOutFanIn 目录导航到要生成并运行客户端的 Client 目录。

    cd Client
    dotnet build
    dotnet run
    

理解输出内容

运行此示例时,你会收到工作进程和客户端进程的输出。 解压缩运行项目时代码中发生的情况。

工人产出

工作器输出显示:

  • 业务流程协调程序和活动的注册
  • 调用每个活动时记录条目
  • 多个工作项的并行处理
  • 结果的最终聚合

客户端输出

客户端输出显示:

  • 从工作项列表开始的业务流程
  • 唯一的业务流程实例 ID
  • 最终聚合结果,显示每个工作项及其相应的结果
  • 已处理项的总计数

示例输出

Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
  1. 激活 Python 虚拟环境。

    python -m venv venv
    /venv/Scripts/activate
    
  2. 安装所需程序包。

    pip install -r requirements.txt
    
  3. 启动辅助角色。

    python worker.py
    

    预期输出

    显示的输出指示辅助角色已启动并正在等待工作项。

    Starting Fan Out/Fan In pattern worker...
    Using taskhub: default
    Using endpoint: http://localhost:8080
    Starting gRPC worker that connects to http://localhost:8080
    Successfully connected to http://localhost:8080. Waiting for work items...
    
  4. 在新终端中,激活虚拟环境并运行客户端。

    venv/Scripts/activate
    python client.py
    

    可以将工作项数作为参数提供。 如果未提供任何参数,则示例默认运行 10 个项目。

    python client.py [number_of_items]
    

理解输出内容

运行此示例时,你会收到工作进程和客户端进程的输出。 解压缩运行项目时代码中发生的情况。

工人产出

工作器输出显示:

  • 业务流程协调程序和活动的注册。
  • 并行处理每个工作项时的状态消息,显示它们正在并发执行。
  • 每个工作项的随机延迟(介于 0.5 到 2 秒之间),以模拟不同的处理时间。
  • 显示结果聚合的最终消息。

客户端输出

客户端输出显示:

  • 以指定的工作项数开始的业务流程。
  • 唯一的业务流程实例 ID。
  • 最终聚合结果,其中包括:
    • 已处理的项总数
    • 所有结果的总和(每个项结果是其值的平方)
    • 所有结果的平均值

示例输出

Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED

fan-out-fan-in从目录中,使用 Gradle 生成并运行应用程序。

./gradlew runFanOutFanInPattern

小窍门

如果收到错误消息 zsh: permission denied: ./gradlew,请尝试在运行应用程序之前运行 chmod +x gradlew

理解输出内容

运行此示例时,您会看到输出显示:

  • 业务流程协调程序和活动的注册。
  • 并行处理每个工作项时的状态消息,显示它们正在并发执行。
  • 每个工作项的随机延迟(介于 0.5 到 2 秒之间),以模拟不同的处理时间。
  • 显示结果聚合的最终消息。

解压缩运行项目时代码中发生的情况。

示例输出

Starting a Gradle Daemon (subsequent builds will be faster)

> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60

在本地运行项目后,现在可以了解如何部署到 Azure 容器应用中托管的 Azure

查看编排状态和历史记录

可以通过 Durable Task Scheduler 仪表板查看业务流程状态和历史记录。 默认情况下,仪表板在端口 8082 上运行。

  1. 在 Web 浏览器中导航到 http://localhost:8082 。
  2. 单击 默认 任务中心。 您创建的协调实例位于列表中。
  3. 单击编排实例 ID 以查看执行详细信息,其中包括:
    • 多个活动任务的并行执行
    • 扇入聚合步骤
    • 每个步骤中的输入和输出
    • 每个步骤所用的时间

显示 .NET 示例协调实例详细信息的屏幕截图。

显示 Python 示例协调实例详细信息的屏幕截图。

显示 Java 示例业务流程实例详细信息的屏幕截图。

了解代码结构

辅助角色项目

为了演示扇出/扇入模式,辅助角色项目业务流程会创建并行活动任务,并等待所有任务完成。 对于业务流程协调程序:

  1. 将工作项列表作为输入。
  2. 通过使用 ProcessWorkItemActivity 为每个工作项创建单独的任务来扇出。
  3. 并行执行所有任务。
  4. 使用 Task.WhenAll 等待所有任务都完成。
  5. 通过使用 AggregateResultsActivity 聚合所有单个结果来扇入。
  6. 将最终聚合结果返回到客户端。

辅助角色项目包含:

  • ParallelProcessingOrchestration.cs:在单个文件中定义业务流程协调程序和活动函数。
  • Program.cs:配置工作主机,并正确处理连接字符串。

ParallelProcessingOrchestration.cs

通过使用扇出/扇入,业务流程会创建并行活动任务,并等待所有任务完成。

public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
    // Step 1: Fan-out by creating a task for each work item in parallel
    List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();

    foreach (string workItem in workItems)
    {
        // Create a task for each work item (fan-out)
        Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
            nameof(ProcessWorkItemActivity), workItem);
        processingTasks.Add(task);
    }

    // Step 2: Wait for all parallel tasks to complete
    Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

    // Step 3: Fan-in by aggregating all results
    Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
        nameof(AggregateResultsActivity), results);

    return aggregatedResults;
}

每个活动都作为单独的类实现,并用 [DurableTask] 特性进行修饰。

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    // Implementation processes a single work item
}

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    // Implementation aggregates individual results
}

Program.cs

工人使用 Microsoft.Extensions.Hosting 进行适当的生命周期管理。

using Microsoft.Extensions.Hosting;
//..

builder.Services.AddDurableTaskWorker()
    .AddTasks(registry =>
    {
        registry.AddOrchestrator<ParallelProcessingOrchestration>();
        registry.AddActivity<ProcessWorkItemActivity>();
        registry.AddActivity<AggregateResultsActivity>();
    })
    .UseDurableTaskScheduler(connectionString);

客户项目

客户端项目:

  • 使用与辅助角色相同的连接字符串逻辑。
  • 创建要并行处理的工作项的列表。
  • 将列表用作输入来计划业务流程实例。
  • 等待编排完成并显示汇总结果。
  • 使用 WaitForInstanceCompletionAsync 以实现高效的轮询。
List<string> workItems = new List<string>
{
    "Task1",
    "Task2",
    "Task3",
    "LongerTask4",
    "VeryLongTask5"
};

// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    "ParallelProcessingOrchestration", 
    workItems);

// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
    instanceId,
    getInputsAndOutputs: true,
    cts.Token);

worker.py

为了演示扇出/扇入模式,辅助角色项目业务流程会创建并行活动任务,并等待所有任务完成。 对于业务流程协调程序:

  1. 接收一个工作项列表作为输入。
  2. 它通过为每个工作项创建并行任务(为每个工作项调用 process_work_item)来“扇出”。
  3. 它使用 task.when_all 等待所有任务完成。
  4. 然后,它通过将结果与 aggregate_results 活动聚合在一起来“扇入”。
  5. 最终聚合结果将返回到客户端。

通过使用扇出/扇入,业务流程会创建并行活动任务,并等待所有任务完成。

# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
    logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")

    # Fan out: Create a task for each work item
    parallel_tasks = []
    for item in work_items:
        parallel_tasks.append(ctx.call_activity("process_work_item", input=item))

    # Wait for all tasks to complete
    logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
    results = yield task.when_all(parallel_tasks)

    # Fan in: Aggregate all the results
    logger.info("All parallel tasks completed, aggregating results")
    final_result = yield ctx.call_activity("aggregate_results", input=results)

    return final_result

client.py

客户端项目:

  • 使用与辅助角色相同的连接字符串逻辑。
  • 创建要并行处理的工作项的列表。
  • 将列表用作输入来计划业务流程实例。
  • 等待编排完成并展示聚合结果。
  • 使用 wait_for_orchestration_completion 以实现高效的轮询。
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))

logger.info(f"Starting new fan out/fan in orchestration with {count} work items")

# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
    "fan_out_fan_in_orchestrator", 
    input=work_items
)

logger.info(f"Started orchestration with ID = {instance_id}")

# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
    instance_id,
    timeout=60
)

为了演示扇出/扇入模式FanOutFanInPattern 项目业务流程会创建并行活动任务,并等待所有任务完成。 对于业务流程协调程序:

  1. 将工作项列表作为输入。
  2. 通过使用 `` 为每个工作项创建单独的任务来扇出。
  3. 并行执行所有任务。
  4. 使用 `` 等待所有任务完成。
  5. 通过使用 `` 聚合所有单个结果来扇入。
  6. 将最终聚合结果返回到客户端。

项目包含:

  • DurableTaskSchedulerWorkerExtensions 辅助角色:定义业务流程协调程序和活动函数。
  • DurableTaskSchedulerClientExtension客户端:设置工作主机以正确处理连接字符串。

工人

通过使用扇出/扇入,业务流程会创建并行活动任务,并等待所有任务完成。

DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "FanOutFanIn_WordCount"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                List<?> inputs = ctx.getInput(List.class);
                List<Task<Integer>> tasks = inputs.stream()
                        .map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
                        .collect(Collectors.toList());
                List<Integer> allWordCountResults = ctx.allOf(tasks).await();
                int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
                ctx.complete(totalWordCount);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "CountWords"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringTokenizer tokenizer = new StringTokenizer(input);
                return tokenizer.countTokens();
            };
        }
    })
    .build();

// Start the worker
worker.start();

客户

客户端项目:

  • 使用与辅助角色相同的连接字符串逻辑。
  • 创建要并行处理的工作项的列表。
  • 将列表用作输入来计划业务流程实例。
  • 等待编排完成并展示聚合结果。
  • 使用 waitForInstanceCompletion 以实现高效的轮询。
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();

// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
        "Hello, world!",
        "The quick brown fox jumps over the lazy dog.",
        "If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
        "The greatest glory in living lies not in never falling, but in rising every time we fall.",
        "Always remember that you are absolutely unique. Just like everyone else.");

// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
        "FanOutFanIn_WordCount",
        new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));

后续步骤

现在,你已使用 Durable Task Scheduler 模拟器在本地运行示例,请尝试创建计划程序和任务中心资源并部署到 Azure 容器应用。