Compartir a través de


Inicio rápido: Hospedaje de una aplicación del SDK de Durable Task en Azure Container Apps (versión preliminar)

Importante

Actualmente, los SDK de Durable Task no están disponibles para JavaScript y PowerShell.

Importante

Actualmente, los SDK de Durable Task no están disponibles para JavaScript y PowerShell.

En esta guía de inicio rápido, ha aprendido a hacer lo siguiente:

  • Configura y ejecuta el emulador del Durable Task Scheduler para el desarrollo local.
  • Ejecute los proyectos de trabajo y cliente.
  • Compruebe los registros de Azure Container Apps.
  • Revise el estado y el historial de orquestación a través del panel del Programador de tareas duraderas.

Prerrequisitos

Antes de comenzar:

Preparación del proyecto

En una nueva ventana de terminal, desde el Azure-Samples/Durable-Task-Scheduler directorio, vaya al directorio de ejemplo.

cd /samples/durable-task-sdks/dotnet/FunctionChaining
cd /samples/durable-task-sdks/python/function-chaining
cd /samples/durable-task-sdks/java/function-chaining

Implementación mediante la CLI para desarrolladores de Azure

  1. Ejecute azd up para aprovisionar la infraestructura e implementar la aplicación en Azure Container Apps con un solo comando.

    azd up
    
  2. Cuando se le solicite en el terminal, proporcione los parámetros siguientes.

    Parámetro Descripción
    Nombre del entorno Prefijo del grupo de recursos creado para contener todos los recursos de Azure.
    Ubicación de Azure Ubicación de Azure para los recursos.
    Suscripción a Azure La suscripción de Azure para los recursos.

    Este proceso puede tardar un tiempo en finalizar. A medida que se completa el comando azd up, la salida de la CLI muestra dos vínculos de Azure Portal para supervisar el progreso de la implementación. La salida también muestra cómo azd up hace lo siguiente:

    • Crea y configura todos los recursos necesarios de Azure a través de los archivos de Bicep proporcionados en el directorio ./infra mediante azd provision. Una vez aprovisionados por Azure Developer CLI, puede acceder a estos recursos a través de Azure Portal. Los archivos que aprovisionan los recursos de Azure incluyen:
      • main.parameters.json
      • main.bicep
      • Un directorio de recursos de app organizado por funcionalidad
      • Biblioteca de referencia de core que contiene los módulos de Bicep usados por la plantilla de azd
    • Implementa el código mediante azd deploy

    Salida prevista

    Packaging services (azd package)
    
    (✓) Done: Packaging service client
    - Image Hash: {IMAGE_HASH}
    - Target Image: {TARGET_IMAGE}
    
    
    (✓) Done: Packaging service worker
    - Image Hash: {IMAGE_HASH}
    - Target Image: {TARGET_IMAGE}
    
    
    Provisioning Azure resources (azd provision)
    Provisioning Azure resources can take some time.
    
    Subscription: SUBSCRIPTION_NAME (SUBSCRIPTION_ID)
    Location: West US 2
    
     You can view detailed progress in the Azure Portal:
     https://portal.azure.com/#view/HubsExtension/DeploymentDetailsBlade/~/overview/id/%2Fsubscriptions%SUBSCRIPTION_ID%2Fproviders%2FMicrosoft.Resources%2Fdeployments%2FCONTAINER_APP_ENVIRONMENT
    
     (✓) Done: Resource group: GENERATED_RESOURCE_GROUP (1.385s)
     (✓) Done: Container Apps Environment: GENERATED_CONTAINER_APP_ENVIRONMENT (54.125s)
     (✓) Done: Container Registry: GENERATED_REGISTRY (1m27.747s)
     (✓) Done: Container App: SAMPLE_CLIENT_APP (21.39s)
     (✓) Done: Container App: SAMPLE_WORKER_APP (24.136s)   
    
    Deploying services (azd deploy)
    
     (✓) Done: Deploying service client
     - Endpoint: https://SAMPLE_CLIENT_APP.westus2.azurecontainerapps.io/
    
     (✓) Done: Deploying service worker
     - Endpoint: https://SAMPLE_WORKER_APP.westus2.azurecontainerapps.io/
    
    
    SUCCESS: Your up workflow to provision and deploy to Azure completed in 10 minutes 34 seconds.   
    

Confirmar la implementación exitosa

En Azure Portal, compruebe que las orquestaciones se ejecutan correctamente.

  1. Copie el nombre del grupo de recursos de la salida del terminal.

  2. Inicie sesión en Azure Portal y busque ese nombre del grupo de recursos.

  3. En la página de información general del grupo de recursos, haga clic en el recurso de aplicación contenedora cliente.

  4. Seleccione Supervisión>flujo de registros.

  1. Confirme que el contenedor del cliente registra las tareas de encadenamiento de funciones.

    Captura de pantalla del flujo de registro del contenedor de cliente en Azure Portal.

  2. Vuelva a la página del grupo de recursos para seleccionar el worker contenedor.

  3. Seleccione Supervisión>flujo de registros.

  4. Confirme que el contenedor del trabajo registra las tareas de encadenamiento de funciones.

    Captura de pantalla del flujo de registro del contenedor de trabajo en Azure Portal.

  1. Confirme que el contenedor del trabajo registra las tareas de encadenamiento de funciones.

    Captura de pantalla del flujo de registro de la aplicación de ejemplo de Java en Azure Portal.

Descripción del código

Proyecto de cliente

El proyecto cliente:

  • Usa la misma lógica de cadena de conexión que el trabajador
  • Implementa un programador de orquestación secuencial que:
    • Programa 20 instancias de orquestación, una cada vez
    • Espera 5 segundos entre programar cada orquestación
    • Realiza un seguimiento de todas las instancias de orquestación en una lista
    • Espera a que se completen todas las orquestaciones antes de salir
  • Usa el registro estándar para mostrar el progreso y los resultados
// Schedule 20 orchestrations sequentially
for (int i = 0; i < TotalOrchestrations; i++)
{
    // Create a unique instance ID
    string instanceName = $"{name}_{i+1}";

    // Schedule the orchestration
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
        "GreetingOrchestration", 
        instanceName);

    // Wait 5 seconds before scheduling the next one
    await Task.Delay(TimeSpan.FromSeconds(IntervalSeconds));
}

// Wait for all orchestrations to complete
foreach (string id in allInstanceIds)
{
    OrchestrationMetadata instance = await client.WaitForInstanceCompletionAsync(
        id, getInputsAndOutputs: false, CancellationToken.None);
}

Proyecto Obrero

El proyecto Worker contiene:

  • GreetingOrchestration.cs: define las funciones de orquestador y actividad en un único archivo
  • Program.cs: configura el host de trabajo con un control adecuado de cadenas de conexión

Implementación de orquestación

La orquestación llama directamente a cada actividad en secuencia mediante el método estándar CallActivityAsync :

public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
{
    // Step 1: Say hello to the person
    string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);

    // Step 2: Process the greeting
    string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);

    // Step 3: Finalize the response
    string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);

    return finalResponse;
}

Cada actividad se implementa como una clase independiente decorada con el [DurableTask] atributo :

[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
    // Implementation details
}

El trabajador usa Microsoft.Extensions.Hosting para la adecuada administración del ciclo de vida.

var builder = Host.CreateApplicationBuilder();
builder.Services.AddDurableTaskWorker()
    .AddTasks(registry => {
        registry.AddAllGeneratedTasks();
    })
    .UseDurableTaskScheduler(connectionString);
var host = builder.Build();
await host.StartAsync();

Cliente

El proyecto cliente:

  • Usa la misma lógica de cadena de conexión que el trabajador
  • Implementa un programador de orquestación secuencial que:
    • Programa 20 instancias de orquestación, una cada vez
    • Espera 5 segundos entre programar cada orquestación
    • Realiza un seguimiento de todas las instancias de orquestación en una lista
    • Espera a que se completen todas las orquestaciones antes de salir
  • Usa el registro estándar para mostrar el progreso y los resultados
# Schedule all orchestrations first
instance_ids = []
for i in range(TOTAL_ORCHESTRATIONS):
    try:
        # Create a unique instance name
        instance_name = f"{name}_{i+1}"
        logger.info(f"Scheduling orchestration #{i+1} ({instance_name})")

        # Schedule the orchestration
        instance_id = client.schedule_new_orchestration(
            "function_chaining_orchestrator",
            input=instance_name
        )

        instance_ids.append(instance_id)
        logger.info(f"Orchestration #{i+1} scheduled with ID: {instance_id}")

        # Wait before scheduling next orchestration (except for the last one)
        if i < TOTAL_ORCHESTRATIONS - 1:
            logger.info(f"Waiting {INTERVAL_SECONDS} seconds before scheduling next orchestration...")
        await asyncio.sleep(INTERVAL_SECONDS)
# ...
# Wait for all orchestrations to complete
for idx, instance_id in enumerate(instance_ids):
    try:
        logger.info(f"Waiting for orchestration {idx+1}/{len(instance_ids)} (ID: {instance_id})...")
        result = client.wait_for_orchestration_completion(
            instance_id,
            timeout=120
        )

Trabajador

Implementación de orquestación

La orquestación llama directamente a cada actividad en secuencia mediante la función estándar call_activity :

# Orchestrator function
def function_chaining_orchestrator(ctx, name: str) -> str:
    """Orchestrator that demonstrates function chaining pattern."""
    logger.info(f"Starting function chaining orchestration for {name}")

    # Call first activity - passing input directly without named parameter
    greeting = yield ctx.call_activity('say_hello', input=name)

    # Call second activity with the result from first activity
    processed_greeting = yield ctx.call_activity('process_greeting', input=greeting)

    # Call third activity with the result from second activity
    final_response = yield ctx.call_activity('finalize_response', input=processed_greeting)

    return final_response

Cada actividad se implementa como una función independiente:

# Activity functions
def say_hello(ctx, name: str) -> str:
    """First activity that greets the user."""
    logger.info(f"Activity say_hello called with name: {name}")
    return f"Hello {name}!"

def process_greeting(ctx, greeting: str) -> str:
    """Second activity that processes the greeting."""
    logger.info(f"Activity process_greeting called with greeting: {greeting}")
    return f"{greeting} How are you today?"

def finalize_response(ctx, response: str) -> str:
    """Third activity that finalizes the response."""
    logger.info(f"Activity finalize_response called with response: {response}")
    return f"{response} I hope you're doing well!"

El trabajador usa DurableTaskSchedulerWorker para la adecuada administración del ciclo de vida.

with DurableTaskSchedulerWorker(
    host_address=host_address, 
    secure_channel=endpoint != "http://localhost:8080",
    taskhub=taskhub_name, 
    token_credential=credential
) as worker:

    # Register activities and orchestrators
    worker.add_activity(say_hello)
    worker.add_activity(process_greeting)
    worker.add_activity(finalize_response)
    worker.add_orchestrator(function_chaining_orchestrator)

    # Start the worker (without awaiting)
    worker.start()

La aplicación contenedora de ejemplo contiene tanto el código de trabajador como el de cliente.

Cliente

El código de cliente:

  • Usa la misma lógica de cadena de conexión que el trabajador
  • Implementa un programador de orquestación secuencial que:
    • Programa 20 instancias de orquestación, una cada vez
    • Espera 5 segundos entre programar cada orquestación
    • Realiza un seguimiento de todas las instancias de orquestación en una lista
    • Espera a que se completen todas las orquestaciones antes de salir
  • Usa el registro estándar para mostrar el progreso y los resultados
// Create client using Azure-managed extensions
DurableTaskClient client = (credential != null 
    ? DurableTaskSchedulerClientExtensions.createClientBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString)).build();

// Start a new instance of the registered "ActivityChaining" orchestration
String instanceId = client.scheduleNewOrchestrationInstance(
        "ActivityChaining",
        new NewOrchestrationInstanceOptions().setInput("Hello, world!"));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(String.class))

Trabajador

La orquestación llama directamente a cada actividad en secuencia mediante el método estándar callActivity :

DurableTaskGrpcWorker worker = (credential != null 
    ? DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString))
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "ActivityChaining"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                String x = ctx.callActivity("Reverse", input, String.class).await();
                String y = ctx.callActivity("Capitalize", x, String.class).await();
                String z = ctx.callActivity("ReplaceWhitespace", y, String.class).await();
                ctx.complete(z);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Reverse"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringBuilder builder = new StringBuilder(input);
                builder.reverse();
                return builder.toString();
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Capitalize"; }

        @Override
        public TaskActivity create() {
            return ctx -> ctx.getInput(String.class).toUpperCase();
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "ReplaceWhitespace"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                return input.trim().replaceAll("\\s", "-");
            };
        }
    })
    .build();

// Start the worker
worker.start();

Pasos siguientes