Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Los SDKs de Durable Task proporcionan una biblioteca cliente ligera para el Durable Task Scheduler. En esta guía rápida, aprenderá a crear orquestaciones que usan el patrón de aplicación de fan-out/fan-in para llevar a cabo el procesamiento paralelo.
Importante
Actualmente, los SDK de Durable Task no están disponibles para JavaScript y PowerShell.
Importante
Actualmente, los SDK de Durable Task no están disponibles para JavaScript y PowerShell.
- Configura y ejecuta el emulador del Durable Task Scheduler para el desarrollo local.
- Ejecute los proyectos de trabajo y cliente.
- Revisar el estado y el historial de orquestación a través del panel del Programador de tareas duraderas.
Prerrequisitos
Antes de comenzar:
- Asegúrese de que tiene el SDK de .NET 8 o posterior.
- Instale Docker para ejecutar el emulador.
- Clone el repositorio de GitHub del Programador de tareas duraderas para usar el ejemplo de inicio rápido.
- Asegúrese de que tiene Python 3.9+ o posterior.
- Instale Docker para ejecutar el emulador.
- Clone el repositorio de GitHub del Programador de tareas duraderas para usar el ejemplo de inicio rápido.
- Asegúrese de que tiene Java 8 o 11.
- Instale Docker para ejecutar el emulador.
- Clone el repositorio de GitHub del Programador de tareas duraderas para usar el ejemplo de inicio rápido.
Configuración del emulador del Programador de tareas duraderas
El código de aplicación busca un planificador desplegado y un recurso central de tareas. Si no se encuentra ninguno, el código vuelve al emulador. El emulador simula un programador y un centro de tareas en un contenedor de Docker, lo que lo convierte en ideal para el desarrollo local necesario en este inicio rápido.
Desde el
Azure-Samples/Durable-Task-Scheduler
directorio raíz, navegue al directorio de ejemplo del SDK de .NET.cd samples/durable-task-sdks/dotnet/FanOutFanIn
Extraiga la imagen de Docker para el emulador.
docker pull mcr.microsoft.com/dts/dts-emulator:latest
Ejecutar el emulador. El contenedor puede tardar unos segundos en estar listo.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Dado que el código de ejemplo usa automáticamente la configuración predeterminada del emulador, no es necesario establecer ninguna variable de entorno. La configuración predeterminada del emulador para este inicio rápido es:
- Punto de conexión:
http://localhost:8080
- Centro de tareas:
default
En el
Azure-Samples/Durable-Task-Scheduler
directorio raíz, navegue al directorio de ejemplos del SDK de Python.cd samples/durable-task-sdks/python/fan-out-fan-in
Extraiga la imagen de Docker para el emulador.
docker pull mcr.microsoft.com/dts/dts-emulator:latest
Ejecutar el emulador. El contenedor puede tardar unos segundos en estar listo.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Dado que el código de ejemplo usa automáticamente la configuración predeterminada del emulador, no es necesario establecer ninguna variable de entorno. La configuración predeterminada del emulador para este inicio rápido es:
- Punto de conexión:
http://localhost:8080
- Centro de tareas:
default
En el
Azure-Samples/Durable-Task-Scheduler
directorio raíz, dirígete al directorio de muestras del SDK de Java.cd samples/durable-task-sdks/java/fan-out-fan-in
Extraiga la imagen de Docker para el emulador.
docker pull mcr.microsoft.com/dts/dts-emulator:latest
Ejecutar el emulador. El contenedor puede tardar unos segundos en estar listo.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Dado que el código de ejemplo usa automáticamente la configuración predeterminada del emulador, no es necesario establecer ninguna variable de entorno. La configuración predeterminada del emulador para este inicio rápido es:
- Punto de conexión:
http://localhost:8080
- Centro de tareas:
default
Ejecución del inicio rápido
Desde el directorio
FanOutFanIn
, navegue hasta el directorioWorker
para compilar y ejecutar el worker.cd Worker dotnet build dotnet run
En un terminal independiente, desde el
FanOutFanIn
directorio, vaya alClient
directorio para compilar y ejecutar el cliente.cd Client dotnet build dotnet run
Descripción de la salida
Al ejecutar este ejemplo, recibirá la salida de los procesos de trabajo y cliente. Analiza lo que sucedió en el código cuando ejecutaste el proyecto.
Productividad laboral
La producción del trabajador muestra:
- Registro del orquestador y las actividades
- Entradas de registro cuando se llama a cada actividad
- Procesamiento paralelo de varios elementos de trabajo
- Agregación final de resultados
Salida del cliente
La salida del cliente muestra:
- La orquestación comienza con una lista de elementos de trabajo
- Identificador de instancia de orquestación único
- Resultados agregados finales, que muestran cada elemento de trabajo y su resultado correspondiente
- Recuento total de elementos procesados
Salida de ejemplo
Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
Active un entorno virtual de Python.
Instale los paquetes requeridos.
pip install -r requirements.txt
Inicie el trabajo.
python worker.py
Salida esperada
Puede ver la salida que indica que el trabajo se inició y está esperando elementos de trabajo.
Starting Fan Out/Fan In pattern worker... Using taskhub: default Using endpoint: http://localhost:8080 Starting gRPC worker that connects to http://localhost:8080 Successfully connected to http://localhost:8080. Waiting for work items...
En un nuevo terminal, active el entorno virtual y ejecute el cliente.
Puede proporcionar el número de elementos de trabajo como argumento. Si no se proporciona ningún argumento, el ejemplo ejecuta 10 elementos de forma predeterminada.
python client.py [number_of_items]
Descripción de la salida
Al ejecutar este ejemplo, recibirá la salida de los procesos de trabajo y cliente. Analiza lo que sucedió en el código cuando ejecutaste el proyecto.
Productividad laboral
La producción del trabajador muestra:
- Registro del orquestador y de las actividades.
- Mensajes de estado al procesar cada elemento de trabajo en paralelo, lo que muestra que se están ejecutando simultáneamente.
- Retrasos aleatorios para cada elemento de trabajo (entre 0,5 y 2 segundos) para simular tiempos de procesamiento variables.
- Mensaje final que muestra la agregación de resultados.
Salida del cliente
La salida del cliente muestra:
- La orquestación comienza con un número especificado de elementos de trabajo.
- Identificador de instancia de orquestación único.
- Resultado agregado final, que incluye:
- Número total de elementos procesados
- La suma de todos los resultados (el resultado de cada elemento es el cuadrado de su valor)
- Promedio de todos los resultados
Salida de ejemplo
Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED
Desde el fan-out-fan-in
directorio, compile y ejecute la aplicación mediante Gradle.
./gradlew runFanOutFanInPattern
Sugerencia
Si recibe el mensaje zsh: permission denied: ./gradlew
de error , intente ejecutar chmod +x gradlew
antes de ejecutar la aplicación.
Descripción de la salida
Al ejecutar este ejemplo, recibirá la salida que muestra lo siguiente:
- Registro del orquestador y de las actividades.
- Mensajes de estado al procesar cada elemento de trabajo en paralelo, lo que muestra que se están ejecutando simultáneamente.
- Retrasos aleatorios para cada elemento de trabajo (entre 0,5 y 2 segundos) para simular tiempos de procesamiento variables.
- Mensaje final que muestra la agregación de resultados.
Analiza lo que sucedió en el código cuando ejecutaste el proyecto.
Salida de ejemplo
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60
Ahora que ejecutó el proyecto localmente, puede aprender a implementar en Azure mediante Azure Container Apps.
Ver estado y historial de orquestación
Puede ver el estado de orquestación y el historial a través del panel del Programador de tareas durables. De forma predeterminada, el panel se ejecuta en el puerto 8082.
- Vaya a http://localhost:8082 en el explorador web.
- Haga clic en el centro de tareas predeterminado . La instancia de orquestación que creó se encuentra en la lista.
- Haga clic en el identificador de instancia de orquestación para ver los detalles de ejecución, que incluyen:
- Ejecución paralela de varias tareas de actividad
- Paso de agregación fan-in
- Entrada y salida en cada paso
- El tiempo necesario para cada paso
Descripción de la estructura de código
El proyecto de trabajo
Para demostrar el patrón fan-out/fan-in, la orquestación del proyecto de trabajo crea tareas de actividad paralelas y espera a que todas se completen. El orquestador:
- Toma una lista de elementos de trabajo como entrada.
- Para los aficionados, cree una tarea independiente para cada elemento de trabajo mediante
ProcessWorkItemActivity
. - Ejecuta todas las tareas en paralelo.
- Espera a que todas las tareas se completen mediante
Task.WhenAll
. - La operación de fan-in agrega todos los resultados individuales mediante
AggregateResultsActivity
. - Devuelve el resultado agregado final al cliente.
El proyecto de trabajo contiene:
- ParallelProcessingOrchestration.cs: define las funciones de orquestador y actividad en un único archivo.
- Program.cs: configura el host de trabajo con un control adecuado de cadenas de conexión.
ParallelProcessingOrchestration.cs
Con el fan-out/fan-in, la orquestación crea tareas de actividad paralelas y espera a que todo se complete.
public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
Cada actividad se implementa como una clase independiente decorada con el [DurableTask]
atributo .
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
// Implementation processes a single work item
}
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
// Implementation aggregates individual results
}
Program.cs
El trabajador usa Microsoft.Extensions.Hosting
para la gestión adecuada del ciclo de vida.
using Microsoft.Extensions.Hosting;
//..
builder.Services.AddDurableTaskWorker()
.AddTasks(registry =>
{
registry.AddOrchestrator<ParallelProcessingOrchestration>();
registry.AddActivity<ProcessWorkItemActivity>();
registry.AddActivity<AggregateResultsActivity>();
})
.UseDurableTaskScheduler(connectionString);
El proyecto de cliente
El proyecto de cliente:
- Usa la misma lógica de cadena de conexión que el trabajador.
- Crea una lista de elementos de trabajo que se van a procesar en paralelo.
- Programa una instancia de orquestación con la lista como entrada.
- Espera a que la orquestación se complete y muestre los resultados agregados.
- Usa
WaitForInstanceCompletionAsync
para un sondeo eficaz.
List<string> workItems = new List<string>
{
"Task1",
"Task2",
"Task3",
"LongerTask4",
"VeryLongTask5"
};
// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
"ParallelProcessingOrchestration",
workItems);
// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
instanceId,
getInputsAndOutputs: true,
cts.Token);
worker.py
Para demostrar el patrón fan-out/fan-in, la orquestación del proyecto de trabajo crea tareas de actividad paralelas y espera a que todas se completen. El orquestador:
- Recibe una lista de elementos de trabajo como entrada.
- Se expande creando tareas paralelas para cada elemento de trabajo (llamando
process_work_item
para cada uno). - Espera a que todas las tareas se completen mediante
task.when_all
. - A continuación, se produce un "fan-in" al agregar los resultados con la actividad
aggregate_results
. - El resultado agregado final se devuelve al cliente.
Con el fan-out/fan-in, la orquestación crea tareas de actividad paralelas y espera a que todo se complete.
# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")
# Fan out: Create a task for each work item
parallel_tasks = []
for item in work_items:
parallel_tasks.append(ctx.call_activity("process_work_item", input=item))
# Wait for all tasks to complete
logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
results = yield task.when_all(parallel_tasks)
# Fan in: Aggregate all the results
logger.info("All parallel tasks completed, aggregating results")
final_result = yield ctx.call_activity("aggregate_results", input=results)
return final_result
client.py
El proyecto de cliente:
- Usa la misma lógica de cadena de conexión que el trabajador.
- Crea una lista de elementos de trabajo que se van a procesar en paralelo.
- Programa una instancia de orquestación con la lista como entrada.
- Espera a que la orquestación se complete y muestre los resultados agregados.
- Usa
wait_for_orchestration_completion
para un sondeo eficaz.
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))
logger.info(f"Starting new fan out/fan in orchestration with {count} work items")
# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
"fan_out_fan_in_orchestrator",
input=work_items
)
logger.info(f"Started orchestration with ID = {instance_id}")
# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
instance_id,
timeout=60
)
Para demostrar el patrón fan-out/fan-in, la orquestación del proyecto FanOutFanInPattern
crea tareas de actividad paralelas y espera a que todas se completen. El orquestador:
- Toma una lista de elementos de trabajo como entrada.
- El patrón fan-out crea una tarea independiente para cada elemento de trabajo mediante ``.
- Ejecuta todas las tareas en paralelo.
- Espera a que todas las tareas se completen mediante ``.
- La operación de fan-in agrega todos los resultados individuales mediante ``.
- Devuelve el resultado agregado final al cliente.
El proyecto contiene:
-
El trabajo
DurableTaskSchedulerWorkerExtensions
: define las funciones de orquestador y actividad. -
DurableTaskSchedulerClientExtension
client: configura el host de trabajo con un control adecuado de cadenas de conexión.
Trabajador
Con el fan-out/fan-in, la orquestación crea tareas de actividad paralelas y espera a que todo se complete.
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "FanOutFanIn_WordCount"; }
@Override
public TaskOrchestration create() {
return ctx -> {
List<?> inputs = ctx.getInput(List.class);
List<Task<Integer>> tasks = inputs.stream()
.map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
.collect(Collectors.toList());
List<Integer> allWordCountResults = ctx.allOf(tasks).await();
int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
ctx.complete(totalWordCount);
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CountWords"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringTokenizer tokenizer = new StringTokenizer(input);
return tokenizer.countTokens();
};
}
})
.build();
// Start the worker
worker.start();
Cliente
El proyecto de cliente:
- Usa la misma lógica de cadena de conexión que el trabajador.
- Crea una lista de elementos de trabajo que se van a procesar en paralelo.
- Programa una instancia de orquestación con la lista como entrada.
- Espera a que la orquestación se complete y muestre los resultados agregados.
- Usa
waitForInstanceCompletion
para un sondeo eficaz.
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();
// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
"Hello, world!",
"The quick brown fox jumps over the lazy dog.",
"If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
"The greatest glory in living lies not in never falling, but in rising every time we fall.",
"Always remember that you are absolutely unique. Just like everyone else.");
// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
"FanOutFanIn_WordCount",
new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));
Pasos siguientes
Ahora que ha ejecutado el ejemplo localmente utilizando el emulador del programador de tareas durables, intente crear un programador de tareas y un recurso de centro de tareas y luego desplegar en Azure Container Apps.