Compartir a través de


Inicio rápido: Creación de una aplicación con SDK de Durable Task y Programador de tareas duraderas (versión preliminar)

Los SDKs de Durable Task proporcionan una biblioteca cliente ligera para el Durable Task Scheduler. En esta guía rápida, aprenderá a crear orquestaciones que usan el patrón de aplicación de fan-out/fan-in para llevar a cabo el procesamiento paralelo.

Importante

Actualmente, los SDK de Durable Task no están disponibles para JavaScript y PowerShell.

Importante

Actualmente, los SDK de Durable Task no están disponibles para JavaScript y PowerShell.

  • Configura y ejecuta el emulador del Durable Task Scheduler para el desarrollo local.
  • Ejecute los proyectos de trabajo y cliente.
  • Revisar el estado y el historial de orquestación a través del panel del Programador de tareas duraderas.

Prerrequisitos

Antes de comenzar:

Configuración del emulador del Programador de tareas duraderas

El código de aplicación busca un planificador desplegado y un recurso central de tareas. Si no se encuentra ninguno, el código vuelve al emulador. El emulador simula un programador y un centro de tareas en un contenedor de Docker, lo que lo convierte en ideal para el desarrollo local necesario en este inicio rápido.

  1. Desde el Azure-Samples/Durable-Task-Scheduler directorio raíz, navegue al directorio de ejemplo del SDK de .NET.

    cd samples/durable-task-sdks/dotnet/FanOutFanIn
    
  2. Extraiga la imagen de Docker para el emulador.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Ejecutar el emulador. El contenedor puede tardar unos segundos en estar listo.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Dado que el código de ejemplo usa automáticamente la configuración predeterminada del emulador, no es necesario establecer ninguna variable de entorno. La configuración predeterminada del emulador para este inicio rápido es:

  • Punto de conexión: http://localhost:8080
  • Centro de tareas: default
  1. En el Azure-Samples/Durable-Task-Scheduler directorio raíz, navegue al directorio de ejemplos del SDK de Python.

    cd samples/durable-task-sdks/python/fan-out-fan-in
    
  2. Extraiga la imagen de Docker para el emulador.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Ejecutar el emulador. El contenedor puede tardar unos segundos en estar listo.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Dado que el código de ejemplo usa automáticamente la configuración predeterminada del emulador, no es necesario establecer ninguna variable de entorno. La configuración predeterminada del emulador para este inicio rápido es:

  • Punto de conexión: http://localhost:8080
  • Centro de tareas: default
  1. En el Azure-Samples/Durable-Task-Scheduler directorio raíz, dirígete al directorio de muestras del SDK de Java.

    cd samples/durable-task-sdks/java/fan-out-fan-in
    
  2. Extraiga la imagen de Docker para el emulador.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Ejecutar el emulador. El contenedor puede tardar unos segundos en estar listo.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Dado que el código de ejemplo usa automáticamente la configuración predeterminada del emulador, no es necesario establecer ninguna variable de entorno. La configuración predeterminada del emulador para este inicio rápido es:

  • Punto de conexión: http://localhost:8080
  • Centro de tareas: default

Ejecución del inicio rápido

  1. Desde el directorio FanOutFanIn, navegue hasta el directorio Worker para compilar y ejecutar el worker.

    cd Worker
    dotnet build
    dotnet run
    
  2. En un terminal independiente, desde el FanOutFanIn directorio, vaya al Client directorio para compilar y ejecutar el cliente.

    cd Client
    dotnet build
    dotnet run
    

Descripción de la salida

Al ejecutar este ejemplo, recibirá la salida de los procesos de trabajo y cliente. Analiza lo que sucedió en el código cuando ejecutaste el proyecto.

Productividad laboral

La producción del trabajador muestra:

  • Registro del orquestador y las actividades
  • Entradas de registro cuando se llama a cada actividad
  • Procesamiento paralelo de varios elementos de trabajo
  • Agregación final de resultados

Salida del cliente

La salida del cliente muestra:

  • La orquestación comienza con una lista de elementos de trabajo
  • Identificador de instancia de orquestación único
  • Resultados agregados finales, que muestran cada elemento de trabajo y su resultado correspondiente
  • Recuento total de elementos procesados

Salida de ejemplo

Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
  1. Active un entorno virtual de Python.

    python -m venv venv
    /venv/Scripts/activate
    
  2. Instale los paquetes requeridos.

    pip install -r requirements.txt
    
  3. Inicie el trabajo.

    python worker.py
    

    Salida esperada

    Puede ver la salida que indica que el trabajo se inició y está esperando elementos de trabajo.

    Starting Fan Out/Fan In pattern worker...
    Using taskhub: default
    Using endpoint: http://localhost:8080
    Starting gRPC worker that connects to http://localhost:8080
    Successfully connected to http://localhost:8080. Waiting for work items...
    
  4. En un nuevo terminal, active el entorno virtual y ejecute el cliente.

    venv/Scripts/activate
    python client.py
    

    Puede proporcionar el número de elementos de trabajo como argumento. Si no se proporciona ningún argumento, el ejemplo ejecuta 10 elementos de forma predeterminada.

    python client.py [number_of_items]
    

Descripción de la salida

Al ejecutar este ejemplo, recibirá la salida de los procesos de trabajo y cliente. Analiza lo que sucedió en el código cuando ejecutaste el proyecto.

Productividad laboral

La producción del trabajador muestra:

  • Registro del orquestador y de las actividades.
  • Mensajes de estado al procesar cada elemento de trabajo en paralelo, lo que muestra que se están ejecutando simultáneamente.
  • Retrasos aleatorios para cada elemento de trabajo (entre 0,5 y 2 segundos) para simular tiempos de procesamiento variables.
  • Mensaje final que muestra la agregación de resultados.

Salida del cliente

La salida del cliente muestra:

  • La orquestación comienza con un número especificado de elementos de trabajo.
  • Identificador de instancia de orquestación único.
  • Resultado agregado final, que incluye:
    • Número total de elementos procesados
    • La suma de todos los resultados (el resultado de cada elemento es el cuadrado de su valor)
    • Promedio de todos los resultados

Salida de ejemplo

Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED

Desde el fan-out-fan-in directorio, compile y ejecute la aplicación mediante Gradle.

./gradlew runFanOutFanInPattern

Sugerencia

Si recibe el mensaje zsh: permission denied: ./gradlewde error , intente ejecutar chmod +x gradlew antes de ejecutar la aplicación.

Descripción de la salida

Al ejecutar este ejemplo, recibirá la salida que muestra lo siguiente:

  • Registro del orquestador y de las actividades.
  • Mensajes de estado al procesar cada elemento de trabajo en paralelo, lo que muestra que se están ejecutando simultáneamente.
  • Retrasos aleatorios para cada elemento de trabajo (entre 0,5 y 2 segundos) para simular tiempos de procesamiento variables.
  • Mensaje final que muestra la agregación de resultados.

Analiza lo que sucedió en el código cuando ejecutaste el proyecto.

Salida de ejemplo

Starting a Gradle Daemon (subsequent builds will be faster)

> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60

Ahora que ejecutó el proyecto localmente, puede aprender a implementar en Azure mediante Azure Container Apps.

Ver estado y historial de orquestación

Puede ver el estado de orquestación y el historial a través del panel del Programador de tareas durables. De forma predeterminada, el panel se ejecuta en el puerto 8082.

  1. Vaya a http://localhost:8082 en el explorador web.
  2. Haga clic en el centro de tareas predeterminado . La instancia de orquestación que creó se encuentra en la lista.
  3. Haga clic en el identificador de instancia de orquestación para ver los detalles de ejecución, que incluyen:
    • Ejecución paralela de varias tareas de actividad
    • Paso de agregación fan-in
    • Entrada y salida en cada paso
    • El tiempo necesario para cada paso

Captura de pantalla que muestra los detalles de la instancia de orquestación para el ejemplo de .NET.

Captura de pantalla que muestra los detalles de la instancia de orquestación para el ejemplo de Python.

Captura de pantalla que muestra los detalles de la instancia de orquestación para el ejemplo de Java.

Descripción de la estructura de código

El proyecto de trabajo

Para demostrar el patrón fan-out/fan-in, la orquestación del proyecto de trabajo crea tareas de actividad paralelas y espera a que todas se completen. El orquestador:

  1. Toma una lista de elementos de trabajo como entrada.
  2. Para los aficionados, cree una tarea independiente para cada elemento de trabajo mediante ProcessWorkItemActivity.
  3. Ejecuta todas las tareas en paralelo.
  4. Espera a que todas las tareas se completen mediante Task.WhenAll.
  5. La operación de fan-in agrega todos los resultados individuales mediante AggregateResultsActivity.
  6. Devuelve el resultado agregado final al cliente.

El proyecto de trabajo contiene:

  • ParallelProcessingOrchestration.cs: define las funciones de orquestador y actividad en un único archivo.
  • Program.cs: configura el host de trabajo con un control adecuado de cadenas de conexión.

ParallelProcessingOrchestration.cs

Con el fan-out/fan-in, la orquestación crea tareas de actividad paralelas y espera a que todo se complete.

public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
    // Step 1: Fan-out by creating a task for each work item in parallel
    List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();

    foreach (string workItem in workItems)
    {
        // Create a task for each work item (fan-out)
        Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
            nameof(ProcessWorkItemActivity), workItem);
        processingTasks.Add(task);
    }

    // Step 2: Wait for all parallel tasks to complete
    Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

    // Step 3: Fan-in by aggregating all results
    Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
        nameof(AggregateResultsActivity), results);

    return aggregatedResults;
}

Cada actividad se implementa como una clase independiente decorada con el [DurableTask] atributo .

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    // Implementation processes a single work item
}

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    // Implementation aggregates individual results
}

Program.cs

El trabajador usa Microsoft.Extensions.Hosting para la gestión adecuada del ciclo de vida.

using Microsoft.Extensions.Hosting;
//..

builder.Services.AddDurableTaskWorker()
    .AddTasks(registry =>
    {
        registry.AddOrchestrator<ParallelProcessingOrchestration>();
        registry.AddActivity<ProcessWorkItemActivity>();
        registry.AddActivity<AggregateResultsActivity>();
    })
    .UseDurableTaskScheduler(connectionString);

El proyecto de cliente

El proyecto de cliente:

  • Usa la misma lógica de cadena de conexión que el trabajador.
  • Crea una lista de elementos de trabajo que se van a procesar en paralelo.
  • Programa una instancia de orquestación con la lista como entrada.
  • Espera a que la orquestación se complete y muestre los resultados agregados.
  • Usa WaitForInstanceCompletionAsync para un sondeo eficaz.
List<string> workItems = new List<string>
{
    "Task1",
    "Task2",
    "Task3",
    "LongerTask4",
    "VeryLongTask5"
};

// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    "ParallelProcessingOrchestration", 
    workItems);

// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
    instanceId,
    getInputsAndOutputs: true,
    cts.Token);

worker.py

Para demostrar el patrón fan-out/fan-in, la orquestación del proyecto de trabajo crea tareas de actividad paralelas y espera a que todas se completen. El orquestador:

  1. Recibe una lista de elementos de trabajo como entrada.
  2. Se expande creando tareas paralelas para cada elemento de trabajo (llamando process_work_item para cada uno).
  3. Espera a que todas las tareas se completen mediante task.when_all.
  4. A continuación, se produce un "fan-in" al agregar los resultados con la actividad aggregate_results.
  5. El resultado agregado final se devuelve al cliente.

Con el fan-out/fan-in, la orquestación crea tareas de actividad paralelas y espera a que todo se complete.

# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
    logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")

    # Fan out: Create a task for each work item
    parallel_tasks = []
    for item in work_items:
        parallel_tasks.append(ctx.call_activity("process_work_item", input=item))

    # Wait for all tasks to complete
    logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
    results = yield task.when_all(parallel_tasks)

    # Fan in: Aggregate all the results
    logger.info("All parallel tasks completed, aggregating results")
    final_result = yield ctx.call_activity("aggregate_results", input=results)

    return final_result

client.py

El proyecto de cliente:

  • Usa la misma lógica de cadena de conexión que el trabajador.
  • Crea una lista de elementos de trabajo que se van a procesar en paralelo.
  • Programa una instancia de orquestación con la lista como entrada.
  • Espera a que la orquestación se complete y muestre los resultados agregados.
  • Usa wait_for_orchestration_completion para un sondeo eficaz.
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))

logger.info(f"Starting new fan out/fan in orchestration with {count} work items")

# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
    "fan_out_fan_in_orchestrator", 
    input=work_items
)

logger.info(f"Started orchestration with ID = {instance_id}")

# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
    instance_id,
    timeout=60
)

Para demostrar el patrón fan-out/fan-in, la orquestación del proyecto FanOutFanInPattern crea tareas de actividad paralelas y espera a que todas se completen. El orquestador:

  1. Toma una lista de elementos de trabajo como entrada.
  2. El patrón fan-out crea una tarea independiente para cada elemento de trabajo mediante ``.
  3. Ejecuta todas las tareas en paralelo.
  4. Espera a que todas las tareas se completen mediante ``.
  5. La operación de fan-in agrega todos los resultados individuales mediante ``.
  6. Devuelve el resultado agregado final al cliente.

El proyecto contiene:

  • El trabajo DurableTaskSchedulerWorkerExtensions: define las funciones de orquestador y actividad.
  • DurableTaskSchedulerClientExtension client: configura el host de trabajo con un control adecuado de cadenas de conexión.

Trabajador

Con el fan-out/fan-in, la orquestación crea tareas de actividad paralelas y espera a que todo se complete.

DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "FanOutFanIn_WordCount"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                List<?> inputs = ctx.getInput(List.class);
                List<Task<Integer>> tasks = inputs.stream()
                        .map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
                        .collect(Collectors.toList());
                List<Integer> allWordCountResults = ctx.allOf(tasks).await();
                int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
                ctx.complete(totalWordCount);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "CountWords"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringTokenizer tokenizer = new StringTokenizer(input);
                return tokenizer.countTokens();
            };
        }
    })
    .build();

// Start the worker
worker.start();

Cliente

El proyecto de cliente:

  • Usa la misma lógica de cadena de conexión que el trabajador.
  • Crea una lista de elementos de trabajo que se van a procesar en paralelo.
  • Programa una instancia de orquestación con la lista como entrada.
  • Espera a que la orquestación se complete y muestre los resultados agregados.
  • Usa waitForInstanceCompletion para un sondeo eficaz.
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();

// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
        "Hello, world!",
        "The quick brown fox jumps over the lazy dog.",
        "If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
        "The greatest glory in living lies not in never falling, but in rising every time we fall.",
        "Always remember that you are absolutely unique. Just like everyone else.");

// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
        "FanOutFanIn_WordCount",
        new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));

Pasos siguientes

Ahora que ha ejecutado el ejemplo localmente utilizando el emulador del programador de tareas durables, intente crear un programador de tareas y un recurso de centro de tareas y luego desplegar en Azure Container Apps.