Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En este inicio rápido, aprenderá a enviar y recibir eventos desde un centro de eventos mediante el paquete de Python azure-eventhub .
Requisitos previos
Si es la primera vez que usa Azure Event Hubs, consulte la información general de Event Hubs antes de continuar con este inicio rápido.
Para completar este inicio rápido, asegúrese de que tiene los siguientes requisitos previos:
- Suscripción a Microsoft Azure: regístrese para obtener una evaluación gratuita si no tiene una.
- Python 3.8 o posterior: asegúrese de que pip está instalado y actualizado.
- Visual Studio Code (recomendado): o use cualquier otro IDE que prefiera.
- Espacio de nombres de Event Hubs y centro de eventos: Siga esta guía para crearlos en el Azure Portal.
Instalación de los paquetes para el envío de eventos
Para instalar los paquetes de Python para Event Hubs, abra un símbolo del sistema que tenga Python en la ruta de acceso. Cambie el directorio a la carpeta donde desea guardar los ejemplos.
pip install azure-eventhub
pip install azure-identity
pip install aiohttp
Autenticación de la aplicación en Azure
En este inicio rápido se muestran dos maneras de conectarse a Azure Event Hubs:
- Sin contraseña. Use la entidad de seguridad en Microsoft Entra ID y el control de acceso basado en rol (RBAC) para conectarse a un espacio de nombres de Event Hubs. No es necesario preocuparse por tener cadenas de conexión codificadas de forma rígida en el código, en un archivo de configuración o en almacenamiento seguro como Azure Key Vault.
- Cadena de conexión. Use una cadena de conexión para conectarse a un espacio de nombres de Event Hubs. Si no está familiarizado con Azure, es posible que encuentre la opción de cadena de conexión más fácil de seguir.
Se recomienda usar la opción sin contraseña en aplicaciones reales y entornos de producción. Para más información, consulte Autenticación y autorización de Service Bus yconexiones sin contraseña para los servicios de Azure.
Asignación de roles al usuario de Microsoft Entra
Al desarrollar localmente, asegúrese de que la cuenta de usuario que se conecta a Azure Event Hubs tiene los permisos correctos. Necesita el rol Propietario de datos de Azure Event Hubs para enviar y recibir mensajes. Para asignarse este rol, necesita el rol Administrador de acceso de usuario u otro rol que incluya la Microsoft.Authorization/roleAssignments/write
acción. Puede asignar roles RBAC de Azure a un usuario mediante Azure Portal, la CLI de Azure o Azure PowerShell. Para más información, consulte Descripción del ámbito de la página RBAC de Azure .
En el ejemplo siguiente se asigna el rol Azure Event Hubs Data Owner
a la cuenta de usuario, que proporciona un acceso completo a los recursos de Azure Event Hubs. En un escenario real, siga el Principio de privilegios mínimos para conceder a los usuarios solo los permisos mínimos necesarios para un entorno de producción más seguro.
Roles integrados de Azure para Azure Event Hubs
En el caso de Azure Event Hubs, la administración de los espacios de nombres y de todos los recursos relacionados mediante Azure Portal y la API de administración de recursos de Azure, ya se ha protegido mediante el modelo de Azure RBAC. Azure proporciona los siguientes roles integrados para autorizar el acceso a un espacio de nombres de Event Hubs:
- Propietario de datos de Azure Event Hubs: Habilita el acceso a datos al espacio de nombres de Event Hubs y a sus entidades (colas, temas, suscripciones y filtros).
- Emisor de datos de Azure Event Hubs: use este rol para proporcionar acceso al emisor a un espacio de nombres de Event Hubs y sus entidades.
- Receptor de datos de Azure Event Hubs: use este rol para proporcionar acceso al receptor a un espacio de nombres de Event Hubs y sus entidades.
Si desea crear un rol personalizado, consulte Derechos necesarios para las operaciones de Event Hubs.
Importante
En la mayoría de los casos, la asignación de roles tardará uno o dos minutos en propagarse en Azure. En raras ocasiones, puede tardar hasta ocho minutos. Si recibe errores de autenticación al ejecutar por primera vez el código, espere unos instantes e inténtelo de nuevo.
En Azure Portal, localice el espacio de nombres de Event Hubs mediante la barra de búsqueda principal o el panel de navegación de la izquierda.
En la página de información general, seleccione Control de acceso (IAM) en el menú de la izquierda.
En la página Control de acceso (IAM), seleccione la pestaña Asignación de roles.
Seleccione + Agregar en el menú superior. A continuación, seleccione Agregar asignación de roles.
Puede usar el cuadro de búsqueda para filtrar los resultados por el rol deseado. En este ejemplo, busque
Azure Event Hubs Data Owner
y seleccione el resultado coincidente. Después, haga clic en Siguiente.En Asignar acceso a, seleccione Usuario, grupo o entidad de servicio. A continuación, elija + Seleccionar miembros.
En el cuadro de diálogo, busque el nombre de usuario de Microsoft Entra (normalmente la dirección de correo electrónico user@___domain ). Elija Seleccionar en la parte inferior del cuadro de diálogo.
Seleccione Revisar y asignar para ir a la página final. Seleccione Revisar y asignar de nuevo para completar el proceso.
Envío de eventos
En esta sección, cree un script de Python para enviar eventos al centro de eventos que creó anteriormente.
Abra el editor de Python que prefiera, como Visual Studio Code.
Cree un script llamado send.py. Este script envía un lote de eventos al centro de eventos que creó anteriormente.
Pegue el siguiente código en send.py:
En el código, use valores reales para reemplazar los siguientes marcadores de posición:
-
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
: verá el nombre completo en la página Información general del espacio de nombres. Debe tener el formato :<NAMESPACENAME>>.servicebus.windows.net
. -
EVENT_HUB_NAME
: nombre del centro de eventos.
import asyncio from azure.eventhub import EventData from azure.eventhub.aio import EventHubProducerClient from azure.identity.aio import DefaultAzureCredential EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE" EVENT_HUB_NAME = "EVENT_HUB_NAME" credential = DefaultAzureCredential() async def run(): # Create a producer client to send messages to the event hub. # Specify a credential that has correct role assigned to access # event hubs namespace and the event hub name. producer = EventHubProducerClient( fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, eventhub_name=EVENT_HUB_NAME, credential=credential, ) print("Producer client created successfully.") async with producer: # Create a batch. event_data_batch = await producer.create_batch() # Add events to the batch. event_data_batch.add(EventData("First event ")) event_data_batch.add(EventData("Second event")) event_data_batch.add(EventData("Third event")) # Send the batch of events to the event hub. await producer.send_batch(event_data_batch) # Close credential when no longer needed. await credential.close() asyncio.run(run())
Nota
Para obtener ejemplos de otras opciones para enviar eventos a un centro de eventos de forma asincrónica mediante una cadena de conexión, consulte la página send_async.py de GitHub. Los patrones que se muestran también son aplicables al envío de eventos sin contraseña.
-
Recepción de eventos
En este inicio rápido se usa Azure Blob Storage como almacén de puntos de control. El almacén de puntos de control se usa para conservar los puntos de control (es decir, las últimas posiciones de lectura).
Siga estas recomendaciones al usar Azure Blob Storage como almacén de puntos de control:
- Use un contenedor independiente para cada grupo de consumidores. Puede usar la misma cuenta de almacenamiento, pero usar un contenedor por cada grupo.
- No use la cuenta de almacenamiento para nada más.
- No use el contenedor para nada más.
- Cree la cuenta de almacenamiento en la misma región que la aplicación implementada. Si la aplicación es local, intente elegir la región más cercana posible.
En la página Cuenta de almacenamiento de Azure Portal, en la sección Blob service, asegúrese de que la siguiente configuración está deshabilitada.
- Espacio de nombres jerárquico
- Eliminación temporal de blobs
- Control de versiones
Creación de una cuenta de Azure Storage y un contenedor de blobs
Cree una cuenta de Azure Storage y un contenedor de blobs en ella, para lo que debe seguir estos pasos:
- Creación de una cuenta de Azure Storage
- Cree un contenedor de blobs.
- Autenticación en el contenedor de blobs.
Asegúrese de registrar la cadena de conexión y el nombre del contenedor para usarlo posteriormente en el código de recepción.
Al desarrollar localmente, asegúrese de que la cuenta de usuario que tiene acceso a los datos de blobs tiene los permisos correctos. Necesita Colaborador de datos de Storage Blob para leer y escribir datos de blobs. Para asignarse a sí mismo este rol, debe tener asignado el rol de Administrador de acceso de usuario u otro rol que incluya la acción Microsoft.Authorization/roleAssignments/write. Puede asignar roles RBAC de Azure a un usuario mediante Azure Portal, la CLI de Azure o Azure PowerShell. Para obtener más información, vea Descripción del ámbito de Azure RBAC.
En este escenario, asignas permisos a tu cuenta de usuario, con alcance a la cuenta de almacenamiento, para seguir el principio de privilegios mínimos. Esta práctica solo proporciona a los usuarios los permisos mínimos necesarios y crea entornos de producción más seguros.
En el ejemplo siguiente, se asigna el rol Storage Blob Data Contributor a tu cuenta de usuario, lo que proporciona acceso de lectura y escritura a los datos de blob en tu cuenta de almacenamiento.
Importante
En la mayoría de los casos, la asignación de roles tardará uno o dos minutos en propagarse en Azure. En raras ocasiones, puede tardar hasta ocho minutos. Si recibe errores de autenticación al ejecutar por primera vez el código, espere unos instantes e inténtelo de nuevo.
En Azure Portal, busque la cuenta de almacenamiento mediante la barra de búsqueda principal o el panel de navegación de la izquierda.
En la página de la cuenta de almacenamiento, seleccione Control de acceso (IAM) en el menú izquierdo.
En la página Control de acceso (IAM), seleccione la pestaña Asignación de roles.
Seleccione + Agregar en el menú superior. A continuación, seleccione Agregar asignación de roles.
Puede usar el cuadro de búsqueda para filtrar los resultados por el rol deseado. Para este ejemplo, busque Colaborador de datos de Storage Blob. Seleccione el resultado coincidente y, a continuación, elija Siguiente.
En la pestaña Asignar acceso a, seleccione Usuario, grupo o entidad de servicio y, a continuación, elija + Seleccionar miembros.
En el cuadro de diálogo, busque el nombre de usuario de Microsoft Entra (normalmente su dirección de correo electrónico de user@___domain) y, a continuación, elija Seleccionar en la parte inferior del cuadro de diálogo.
Seleccione Revisar y asignar para ir a la página final. Seleccione Revisar y asignar de nuevo para completar el proceso.
Instalación de los paquetes para la recepción de eventos
Para la recepción, debe instalar uno o varios paquetes más. En este inicio rápido, usará Azure Blob Storage para conservar los puntos de control para que el programa no lea los eventos que ya leyó. Ejecuta los puntos de comprobación de metadatos en los mensajes recibidos a intervalos regulares en un blob. Este enfoque facilita la tarea de seguir recibiendo posteriormente mensajes desde donde lo dejó.
pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity
Creación de un script de Python para recibir eventos
En esta sección, creará un script de Python para recibir eventos del centro de eventos:
Abra el editor de Python que prefiera, como Visual Studio Code.
Cree un script llamado recv.py.
Pegue el siguiente código en recv.py:
En el código, use valores reales para reemplazar los siguientes marcadores de posición:
-
BLOB_STORAGE_ACCOUNT_URL
- Este valor debe tener el formato :https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
-
BLOB_CONTAINER_NAME
: nombre del contenedor de blobs en la cuenta de Azure Storage. -
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
: verá el nombre completo en la página Información general del espacio de nombres. Debe tener el formato :<NAMESPACENAME>>.servicebus.windows.net
. -
EVENT_HUB_NAME
: nombre del centro de eventos.
import asyncio from azure.eventhub.aio import EventHubConsumerClient from azure.eventhub.extensions.checkpointstoreblobaio import ( BlobCheckpointStore, ) from azure.identity.aio import DefaultAzureCredential BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL" BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME" EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE" EVENT_HUB_NAME = "EVENT_HUB_NAME" credential = DefaultAzureCredential() async def on_event(partition_context, event): # Print the event data. print( 'Received the event: "{}" from the partition with ID: "{}"'.format( event.body_as_str(encoding="UTF-8"), partition_context.partition_id ) ) # Update the checkpoint so that the program doesn't read the events # that it has already read when you run it next time. await partition_context.update_checkpoint(event) async def main(): # Create an Azure blob checkpoint store to store the checkpoints. checkpoint_store = BlobCheckpointStore( blob_account_url=BLOB_STORAGE_ACCOUNT_URL, container_name=BLOB_CONTAINER_NAME, credential=credential, ) # Create a consumer client for the event hub. client = EventHubConsumerClient( fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, eventhub_name=EVENT_HUB_NAME, consumer_group="$Default", checkpoint_store=checkpoint_store, credential=credential, ) async with client: # Call the receive method. Read from the beginning of the partition # (starting_position: "-1") await client.receive(on_event=on_event, starting_position="-1") # Close credential when no longer needed. await credential.close() if __name__ == "__main__": # Run the main method. asyncio.run(main())
Nota
Para obtener ejemplos de otras opciones para recibir eventos de un centro de eventos de forma asincrónica mediante una cadena de conexión, consulte la página recv_with_checkpoint_store_async.py de GitHub. Los patrones que se muestran también son aplicables a la recepción de eventos sin contraseña.
-
Ejecución de la aplicación del receptor
Inicie un símbolo del sistema.
Ejecute el comando siguiente e inicie sesión con la cuenta que se agregó al rol Propietario de datos de Azure Event Hubs en el espacio de nombres de Event Hubs y el rol Colaborador de datos de Storage Blob en la cuenta de Azure Storage.
az login
Cambie a la carpeta que tiene el archivo receive.py y ejecute el siguiente comando:
python recv.py
Ejecución de la aplicación del remitente
Inicie un símbolo del sistema.
Ejecute el comando siguiente e inicie sesión con la cuenta que se agregó al rol Propietario de datos de Azure Event Hubs en el espacio de nombres de Event Hubs y el rol Colaborador de datos de Storage Blob en la cuenta de Azure Storage.
az login
Cambie a la carpeta que tiene el send.py y ejecute este comando:
python send.py
La ventana del destinatario debería mostrar los mensajes que se enviaron al centro de eventos.
Solución de problemas
Si no ve eventos en la ventana del receptor o el código notifica un error, pruebe las siguientes sugerencias de solución de problemas:
Si no ve resultados de recy.py, ejecute send.py varias veces.
Si obtiene errores de "corrutina" al usar el código sin contraseña (con credenciales), asegúrese de que está realizando la importación desde
azure.identity.aio
.Si obtiene el mensaje "Sesión de cliente no cerrada" con el código sin contraseña (con credenciales), asegúrese de cerrar la credencial cuando termine. Para obtener más información, consulte Credenciales asincrónicas.
Si obtiene errores de autorización con recv.py al acceder al almacenamiento, asegúrese de seguir los pasos descritos en Creación de una cuenta de Azure Storage y un contenedor de blobs y de asignar el rol Colaborador de datos de Storage Blob a la entidad de servicio.
Si recibe eventos con identificadores de partición diferentes, es un resultado normal. Las particiones son un mecanismo de organización de datos relacionado con el paralelismo de bajada necesario para consumir las aplicaciones. El número de particiones de un centro de eventos está directamente relacionado con el número de lectores simultáneos que espera tener. Para obtener más información, consulte Particiones.
Pasos siguientes
En este inicio rápido, ha enviado y recibido eventos de forma asincrónica. Para aprender a enviar y recibir eventos de forma sincrónica, vaya a la página sync_samples de GitHub.
Explore más ejemplos y escenarios avanzados en la biblioteca cliente de Azure Event Hubs para ejemplos de Python.