Compartir a través de


Inicio rápido: Envío o recepción de eventos desde Azure Event Hubs

En este inicio rápido, aprenderá a enviar y recibir eventos desde un centro de eventos de Azure mediante el paquete java azure-messaging-eventhubs .

Sugerencia

Si trabaja con recursos de Azure Event Hubs en una aplicación de Spring, se recomienda considerar Spring Cloud Azure como alternativa. Spring Cloud Azure es un proyecto de código abierto que proporciona una integración perfecta de Spring con los servicios de Azure. Para más información sobre Spring Cloud Azure y para ver un ejemplo con Event Hubs, consulte Spring Cloud Stream con Azure Event Hubs.

Prerrequisitos

Si es la primera vez que usa Azure Event Hubs, consulte la información general de Event Hubs antes de continuar con este inicio rápido.

Para completar este inicio rápido, necesita los siguientes requisitos previos:

  • Una suscripción a Microsoft Azure. Para usar los servicios de Azure, entre los que se incluye Azure Event Hubs, se necesita una suscripción. Si no se dispone de una cuenta de Azure, es posible registrarse para obtener una evaluación gratuita, o bien usar las ventajas que disfrutan los suscriptores MSDN al crear una cuenta.
  • Un entorno de desarrollo de Java. En este inicio rápido se usa Eclipse. Se requiere el Kit de desarrollo de Java (JDK) con la versión 8 o posterior.
  • Creación de un espacio de nombres de Event Hubs y un centro de eventos. El primer paso consiste en usar Azure Portal para crear un espacio de nombres de tipo Event Hubs y obtener las credenciales de administración que la aplicación necesita para comunicarse con el centro de eventos. Para crear un espacio de nombres y un centro de eventos, siga el procedimiento que se indica en este artículo. A continuación, obtenga la cadena de conexión para el espacio de nombres de Event Hubs siguiendo las instrucciones del artículo: Obtención de la cadena de conexión. La utilizará más adelante en este inicio rápido.

Envío de eventos

En esta sección se muestra cómo crear una aplicación Java para enviar eventos a un centro de eventos.

Adición de referencia a la biblioteca de Azure Event Hubs

En primer lugar, cree un nuevo proyecto de Maven para una aplicación de consola o shell en su entorno de desarrollo de Java favorito. Actualice el archivo pom.xml como sigue. La biblioteca cliente de Java para Event Hubs está disponible en el repositorio central de Maven.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.20.2</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.16.1</version>
		    <scope>compile</scope>
		</dependency>

Nota:

Actualice la versión a la versión más reciente publicada en el repositorio de Maven.

Autenticación de la aplicación en Azure

En este inicio rápido se muestran dos maneras de conectarse a Azure Event Hubs:

  • Sin contraseña. Use la entidad de seguridad en Microsoft Entra ID y el control de acceso basado en rol (RBAC) para conectarse a un espacio de nombres de Event Hubs. No es necesario preocuparse por tener cadenas de conexión codificadas de forma rígida en el código, en un archivo de configuración o en almacenamiento seguro como Azure Key Vault.
  • Cadena de conexión. Use una cadena de conexión para conectarse a un espacio de nombres de Event Hubs. Si no está familiarizado con Azure, es posible que encuentre la opción de cadena de conexión más fácil de seguir.

Se recomienda usar la opción sin contraseña en aplicaciones reales y entornos de producción. Para más información, consulte Autenticación y autorización de Service Bus yconexiones sin contraseña para los servicios de Azure.

Asignación de roles al usuario de Microsoft Entra

Al desarrollar localmente, asegúrese de que la cuenta de usuario que se conecta a Azure Event Hubs tiene los permisos correctos. Necesita el rol Propietario de datos de Azure Event Hubs para enviar y recibir mensajes. Para asignarse este rol, necesita el rol Administrador de acceso de usuario u otro rol que incluya la Microsoft.Authorization/roleAssignments/write acción. Puede asignar roles RBAC de Azure a un usuario mediante Azure Portal, la CLI de Azure o Azure PowerShell. Para más información, consulte Descripción del ámbito de la página RBAC de Azure .

En el ejemplo siguiente se asigna el rol Azure Event Hubs Data Owner a la cuenta de usuario, que proporciona un acceso completo a los recursos de Azure Event Hubs. En un escenario real, siga el Principio de privilegios mínimos para conceder a los usuarios solo los permisos mínimos necesarios para un entorno de producción más seguro.

Roles integrados de Azure para Azure Event Hubs

En el caso de Azure Event Hubs, la administración de los espacios de nombres y de todos los recursos relacionados mediante Azure Portal y la API de administración de recursos de Azure, ya se ha protegido mediante el modelo de Azure RBAC. Azure proporciona los siguientes roles integrados para autorizar el acceso a un espacio de nombres de Event Hubs:

Si desea crear un rol personalizado, consulte Derechos necesarios para las operaciones de Event Hubs.

Importante

En la mayoría de los casos, la asignación de roles tardará uno o dos minutos en propagarse en Azure. En raras ocasiones, puede tardar hasta ocho minutos. Si recibe errores de autenticación al ejecutar por primera vez el código, espere unos instantes e inténtelo de nuevo.

  1. En Azure Portal, localice el espacio de nombres de Event Hubs mediante la barra de búsqueda principal o el panel de navegación de la izquierda.

  2. En la página de información general, seleccione Control de acceso (IAM) en el menú de la izquierda.

  3. En la página Control de acceso (IAM), seleccione la pestaña Asignación de roles.

  4. Seleccione + Agregar en el menú superior. A continuación, seleccione Agregar asignación de roles.

    Captura de pantalla que muestra cómo asignar un rol.

  5. Puede usar el cuadro de búsqueda para filtrar los resultados por el rol deseado. En este ejemplo, busque Azure Event Hubs Data Owner y seleccione el resultado coincidente. Después, haga clic en Siguiente.

  6. En Asignar acceso a, seleccione Usuario, grupo o entidad de servicio. A continuación, elija + Seleccionar miembros.

  7. En el cuadro de diálogo, busque el nombre de usuario de Microsoft Entra (normalmente la dirección de correo electrónico user@___domain ). Elija Seleccionar en la parte inferior del cuadro de diálogo.

  8. Seleccione Revisar y asignar para ir a la página final. Seleccione Revisar y asignar de nuevo para completar el proceso.

Escribir código para enviar mensajes al centro de eventos

Agregue una clase denominada Sendery agregue el código siguiente a la clase :

Importante

  • Actualice <NAMESPACE NAME> con el nombre del espacio de nombres de Event Hubs.
  • Actualice <EVENT HUB NAME> con el nombre del centro de eventos.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

Compile el programa y asegúrese de que no haya errores. Ejecutará este programa después de ejecutar el programa receptor.

Recepción de eventos

El código de este tutorial se basa en el ejemplo EventProcessorClient en GitHub, que puede examinar para ver la aplicación de trabajo completa.

Siga estas recomendaciones al usar Azure Blob Storage como almacén de puntos de control:

  • Use un contenedor independiente para cada grupo de consumidores. Puede usar la misma cuenta de almacenamiento, pero use un contenedor por cada grupo.
  • No use la cuenta de almacenamiento para nada más.
  • No use el contenedor para nada más.
  • Cree la cuenta de almacenamiento en la misma región que la aplicación implementada. Si la aplicación fuera local, intente elegir la región más cercana posible.

En la página Cuenta de almacenamiento de Azure Portal, en la sección Blob service, asegúrese de que la siguiente configuración está deshabilitada.

  • Espacio de nombres jerárquico
  • Eliminación temporal de blobs
  • Versionamiento

Creación de una instancia de Azure Storage y un contenedor de blobs

En este inicio rápido, se usa Azure Storage (concretamente Blob Storage) como almacén de puntos de control. El punto de comprobación es un proceso por el que un procesador de eventos marca o confirma la posición del último evento procesado correctamente dentro de una partición. Normalmente, el marcado de un punto de control se realiza dentro de la función que procesa los eventos. Para más información sobre los puntos de control, consulte Procesador de eventos.

Siga estos pasos para crear una cuenta de Azure Storage.

  1. Crear una cuenta de almacenamiento de Azure
  2. Creación de un contenedor de blobs
  3. Autenticación en el contenedor de blobs

Al desarrollar localmente, asegúrese de que la cuenta de usuario que tiene acceso a los datos de blobs tiene los permisos correctos. Necesita colaborador de datos de Storage Blob para leer y escribir datos de blobs. Para asignarse este rol usted mismo, debe tener asignado el rol Administrador de acceso de usuario, u otro rol que incluya la acción Microsoft.Authorization/roleAssignments/write. Puede asignar roles RBAC de Azure a un usuario mediante Azure Portal, la CLI de Azure o Azure PowerShell. Para obtener más información, vea Descripción del ámbito de Azure RBAC.

En este escenario, asignará permisos a la cuenta de usuario, en el ámbito de la cuenta de almacenamiento, para seguir el principio de privilegios mínimos. Esta práctica solo proporciona a los usuarios los permisos mínimos necesarios y crea entornos de producción más seguros.

En el ejemplo siguiente se asigna el rol Colaborador de datos de Storage Blob a la cuenta de usuario, que proporciona acceso de lectura y escritura a los datos de blobs de la cuenta de almacenamiento.

Importante

En la mayoría de los casos, la asignación de roles tardará uno o dos minutos en propagarse en Azure. En raras ocasiones, puede tardar hasta ocho minutos. Si recibe errores de autenticación al ejecutar por primera vez el código, espere unos instantes e inténtelo de nuevo.

  1. En Azure Portal, busque la cuenta de almacenamiento mediante la barra de búsqueda principal o el panel de navegación de la izquierda.

  2. En la página de la cuenta de almacenamiento, seleccione Control de acceso (IAM) en el menú izquierdo.

  3. En la página Control de acceso (IAM), seleccione la pestaña Asignación de roles.

  4. Seleccione + Agregar en el menú superior. A continuación, seleccione Agregar asignación de roles.

    Captura de pantalla que muestra cómo asignar un rol de cuenta de almacenamiento.

  5. Puede usar el cuadro de búsqueda para filtrar los resultados por el rol deseado. Para este ejemplo, busque Colaborador de datos de Storage Blob. Seleccione el resultado coincidente y, a continuación, elija Siguiente.

  6. En la pestaña Asignar acceso a, seleccione Usuario, grupo o entidad de servicio y, a continuación, elija + Seleccionar miembros.

  7. En el cuadro de diálogo, busque el nombre de usuario de Microsoft Entra (normalmente su dirección de correo electrónico de user@___domain) y, a continuación, elija Seleccionar en la parte inferior del cuadro de diálogo.

  8. Seleccione Revisar y asignar para ir a la página final. Seleccione Revisar y asignar de nuevo para completar el proceso.

Adición de bibliotecas de Event Hubs al proyecto de Java

Agregue las siguientes dependencias en el archivo pom.xml.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.20.2</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.20.6</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.16.1</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Agregue las siguientes import instrucciones en la parte superior del archivo Java.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. Cree una clase denominada Receivery agregue las siguientes variables de cadena a la clase . Reemplace los marcadores de posición por los valores correctos.

    Importante

    Reemplace los marcadores de posición por los valores correctos.

    • <NAMESPACE NAME> por el nombre del espacio de nombres de Event Hubs.
    • <EVENT HUB NAME> con el nombre del centro de eventos en el espacio de nombres.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. Agregue el método siguiente main a la clase .

    Importante

    Reemplace los marcadores de posición por los valores correctos.

    • <STORAGE ACCOUNT NAME> con el nombre de la cuenta de Azure Storage.
    • <CONTAINER NAME> con el nombre del contenedor de blobs de la cuenta de almacenamiento.
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. Agregue los dos métodos auxiliares (PARTITION_PROCESSOR y ERROR_HANDLER) que procesan eventos y errores a la Receiver clase .

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. Compile el programa y asegúrese de que no haya errores.

Ejecución de las aplicaciones

  1. Ejecute primero la aplicación Receiver .

  2. A continuación, ejecute la aplicación Sender .

  3. En la ventana Aplicación receptora , confirme que ve los eventos publicados por la aplicación Remitente.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Presione ENTRAR en la ventana de la aplicación receptora para detener la aplicación.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Consulte los siguientes ejemplos en GitHub: