次の方法で共有


クイック スタート: Python を使用してイベント ハブとの間でイベントを送信または受信する

このクイック スタートでは、 azure-eventhub Python パッケージを使用してイベント ハブとの間でイベントを送受信する方法について説明します。

前提条件

Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に Event Hubs の概要を参照してください。

このクイック スタートを完了するには、次の前提条件を満たしていることを確認します。

  • Microsoft Azure サブスクリプション: 無料試用版 がない場合はサインアップします。
  • Python 3.8 以降: pip がインストールおよび更新されていることを確認します。
  • Visual Studio Code (推奨):または任意の他の IDE を使用します。
  • Event Hubs 名前空間とイベント ハブ: Azure portal で作成するには、このガイドに従います。

イベントを送信するためにパッケージをインストールする

Event Hubs 向けの Python パッケージをインストールするには、Python をパス設定した状態でコマンド プロンプトを開きます。 サンプルを保持するフォルダーにディレクトリを変更します。

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Azure に対してアプリを認証する

このクイック スタートでは、Azure Event Hubs に接続する 2 つの方法について説明します。

  • パスワードレス。 Microsoft Entra ID とロールベースのアクセス制御 (RBAC) のセキュリティ プリンシパルを使用して、Event Hubs 名前空間に接続します。 コード、構成ファイル、または Azure Key Vault などのセキュリティで保護されたストレージに、ハードコーディングされた接続文字列を含める心配はありません。
  • 接続文字列。 接続文字列を使用して Event Hubs 名前空間に接続します。 Azure を初めて使用する場合は、接続文字列オプションの方が理解しやすいかもしれません。

実際のアプリケーションと運用環境では、パスワードレス オプションを使用することをお勧めします。 詳細については、Azure サービスの Service Bus の認証と承認とパスワードレス接続に関するページを参照してください。

Microsoft Entra ユーザーにロールを割り当てる

ローカルで開発するときは、Azure Event Hubs に接続するユーザー アカウントに適切なアクセス許可があることを確認します。 メッセージを送受信するには、 Azure Event Hubs データ所有者 ロールが必要です。 このロールを自分に割り当てるには、ユーザー アクセス管理者ロール、または Microsoft.Authorization/roleAssignments/write アクションを含む別のロールが必要です。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 詳細については、「 Azure RBAC のスコープについて」ページを 参照してください。

次の例では、ユーザー アカウントに Azure Event Hubs Data Owner ロールを割り当てます。これにより、Azure Event Hubs リソースにフル アクセスできます。 実際のシナリオでは、より安全な運用環境を実現するため、最小限の特権の原則に従って、必要な最小限のアクセス許可のみをユーザーに付与します。

Azure Event Hubs の Azure の組み込みロール

Azure Event Hubs の場合、Azure portal および Azure リソース管理 API による名前空間とそれに関連するすべてのリソースの管理は、Azure RBAC モデルを使用して既に保護されています。 Azure には、Event Hubs 名前空間へのアクセスを承認するための次の組み込みロールが用意されています。

  • Azure Event Hubs データ所有者: Event Hubs 名前空間とそのエンティティ (キュー、トピック、サブスクリプション、フィルター) へのデータ アクセスを有効にします。
  • Azure Event Hubs データ送信者: このロールを使用して、Event Hubs 名前空間とそのエンティティへのアクセス権を送信者に付与します。
  • Azure Event Hubs データ受信者: このロールを使用して、Event Hubs 名前空間とそのエンティティへのアクセス権を受信者に付与します。

カスタム ロールを作成する場合は、Event Hubs 操作に必要な権限に関するページを参照してください。

重要

ほとんどの場合、ロールの割り当てが Azure に伝達されるまでの時間は 1、2 分です。 まれに、最大 8 分かかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。

  1. Azure portal で、メインの検索バーまたは左側のナビゲーションを使用して Event Hubs 名前空間を見つけます。

  2. 概要ページで、左側のメニューから [アクセス制御 (IAM)] を選択します。

  3. [アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。

  4. 上部のメニューから [ + 追加] を選択します。 次に、[ ロールの割り当ての追加] を選択します。

    ロールを割り当てる方法を示すスクリーンショット。

  5. 検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、Azure Event Hubs Data Owner を検索して一致する結果を選択します。 [次へ] を選びます。

  6. [ アクセスの割り当て] で、[ ユーザー、グループ、またはサービス プリンシパル] を選択します。 [ + メンバーの選択] を選択します

  7. ダイアログで、Microsoft Entra ユーザー名 (通常は user@___domain のメール アドレス) を検索します。 ダイアログの下部にある [選択] を 選択 します。

  8. [ 確認と割り当て] を選択して、最後のページに移動します。 [ 確認と割り当て ] をもう一度選択してプロセスを完了します。

送信イベント

このセクションでは、前に作成したイベント ハブにイベントを送信する Python スクリプトを作成します。

  1. Visual Studio Codeなど、お使いの Python エディターを開きます。

  2. send.py という名前のスクリプトを作成します。 このスクリプトは、前に作成したイベント ハブにイベントのバッチを送信します。

  3. 次のコードを send.py に貼り付けます。

    このコード内の次のプレースホルダーは、実際の値に置き換えてください。

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - 名前空間の [概要 ] ページに完全修飾名が表示されます。 <NAMESPACENAME>>.servicebus.windows.netの形式にする必要があります。
    • EVENT_HUB_NAME - イベント ハブの名前。
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        print("Producer client created successfully.") 
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    接続文字列を使用してイベント ハブに非同期的にイベントを送信するためのその他のオプションの例については、 GitHub send_async.py ページを参照してください。 示されているパターンは、パスワードレスでイベントを送信する場合にも適用できます。

受信イベント

このクイックスタートでは、Azure Blob Storage をチェックポイント ストアとして使用します。 チェックポイント ストアは、チェックポイント (最後の読み取り位置) を保持するために使用されます。

チェックポイント ストアとして Azure Blob Storage を使用する場合は、次の推奨事項に従います。

  • コンシューマー グループごとに個別のコンテナーを使用します。 同じストレージ アカウントを使用できますが、各グループごとに 1 つのコンテナーを使用します。
  • ストレージ アカウントを他の目的で使用しないでください。
  • コンテナーを他の目的で使用しないでください。
  • デプロイされたアプリケーションと同じリージョンにストレージ アカウントを作成します。 アプリケーションがオンプレミスの場合は、可能な中で最も近いリージョンを選択することを試みてください。

Azure portal の [ストレージ アカウント] ページの [Blob service] セクションで、次の設定が無効になっていることを確認してください。

  • 階層型名前空間
  • BLOB の論理的な削除
  • バージョン管理

Azure Storage アカウントと BLOB コンテナーを作成する

次の手順に従って、Azure Storage アカウントと BLOB コンテナーを作成します。

  1. Azure ストレージ アカウントの作成
  2. BLOB コンテナーを作成します
  3. BLOB コンテナーを認証します。

接続文字列とコンテナー名を記録しておいてください。後から受信コードで使用します。

ローカルで開発する場合は、BLOB データにアクセスするユーザー アカウントに適切なアクセス許可があることを確認します。 BLOB データの読み取りと書き込みを行うには、 ストレージ BLOB データ共同作成者 が必要です。 このロールを自分に割り当てるには、 ユーザー アクセス管理者 ロール、または Microsoft.Authorization/roleAssignments/write アクションを含む別のロールを割り当てる必要があります。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 詳しくは、「Azure RBAC のスコープについて」を参照してください。

このシナリオでは、ストレージ アカウントをスコープとするアクセス許可をユーザー アカウントに割り当てて、 最小限の特権の原則に従います。 この方法を使って、ユーザーに必要最小限のアクセス許可のみを与え、より安全な運用環境を作成します。

次の例では、 ストレージ BLOB データ共同作成者 ロールをユーザー アカウントに割り当てます。これによって、ストレージ アカウント内の BLOB データへの読み取りと書き込みの両方のアクセスが提供されます。

重要

ほとんどの場合、ロールの割り当てが Azure に伝達されるまでの時間は 1、2 分です。 まれに、最大 8 分かかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。

  1. Azure portal で、メインの検索バーまたは左側のナビゲーションを使ってストレージ アカウントを見つけます。

  2. ストレージ アカウント ページで、左側のメニューから [アクセス制御 (IAM)] を選択します。

  3. [アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。

  4. 上部のメニューから [ + 追加] を選択します。 次に、[ ロールの割り当ての追加] を選択します。

    ストレージ アカウント ロールを割り当てる方法を示すスクリーンショット。

  5. 検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、 ストレージ BLOB データ共同作成者を検索します。 一致する結果を選択し、次に [次へ]を押します。

  6. [アクセスの割り当て先] で、[ユーザー、グループ、またはサービス プリンシパル] を選び、[+ メンバーの選択] を選びます。

  7. ダイアログで、自分の Microsoft Entra ユーザー名 (通常は user@___domain メール アドレス) を検索し、ダイアログの下部にある [選択] を選びます。

  8. [ 確認と割り当て] を選択して、最後のページに移動します。 [ 確認と割り当て ] をもう一度選択してプロセスを完了します。

イベントを受信するためにパッケージをインストールする

受信側では、追加で 1 つ以上のパッケージをインストールする必要があります。 このクイックスタートでは、Azure Blob Storage を使用してチェックポイントを保持し、プログラムが既に読み取ったイベントを読み取らないようにします。 BLOB 内で定期的に受信したメッセージのメタデータ チェックポイントが実行されます。 この手法によって、後で前回終了したところからメッセージを継続して受信しやすくなります。

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

イベントを受信する Python スクリプトの作成

このセクションでは、イベント ハブからイベントを受信する Python スクリプトを作成します。

  1. Visual Studio Codeなど、お使いの Python エディターを開きます。

  2. recv.py という名前のスクリプトを作成します。

  3. 次のコードを recv.py に貼り付けます。

    このコード内の次のプレースホルダーは、実際の値に置き換えてください。

    • BLOB_STORAGE_ACCOUNT_URL - この値は次の形式にする必要があります。 https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
    • BLOB_CONTAINER_NAME - Azure ストレージ アカウント内の BLOB コンテナーの名前。
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - 名前空間の [概要 ] ページに完全修飾名が表示されます。 <NAMESPACENAME>>.servicebus.windows.netの形式にする必要があります。
    • EVENT_HUB_NAME - イベント ハブの名前。
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    接続文字列を使用してイベント ハブから非同期的にイベントを受信するためのその他のオプションの例については、 GitHub recv_with_checkpoint_store_async.py ページを参照してください。 示されているパターンは、パスワードレスでイベントを受信する場合にも適用できます。

受信側アプリを実行する

  1. コマンド プロンプトを起動します。

  2. 次のコマンドを実行し、 Event Hubs 名前空間の Azure Event Hubs データ所有者 ロールと Azure ストレージ アカウントの ストレージ BLOB データ共同作成者 ロールに追加されたアカウントを使用してサインインします。

    az login
    
  3. receive.py ファイルがあるフォルダーに切り替え、次のコマンドを実行します。

    python recv.py
    

送信側アプリを実行する

  1. コマンド プロンプトを起動します。

  2. 次のコマンドを実行し、 Event Hubs 名前空間の Azure Event Hubs データ所有者 ロールと Azure ストレージ アカウントの ストレージ BLOB データ共同作成者 ロールに追加されたアカウントを使用してサインインします。

    az login
    
  3. send.py があるフォルダーに切り替え、次のコマンドを実行します。

    python send.py
    

イベント ハブに送信されたメッセージが受信側ウィンドウに表示されます。

トラブルシューティング

受信側ウィンドウにイベントが表示されない場合、またはコードでエラーが報告される場合は、次のトラブルシューティングのヒントを試してください。

  • recy.py からの結果が表示されない場合は、send.py 複数回実行します。

  • パスワードなしのコード (資格情報を使用) を使用しているときに "コルーチン" に関するエラーが表示される場合は、azure.identity.aio からのインポートを使用していることを確認してください。

  • パスワードなしのコード (資格情報を含む) で "閉じていないクライアント セッション" が表示される場合は、完了したら資格情報を閉じてください。 詳細については、非同期資格情報に関するページをご覧ください。

  • ストレージにアクセスするときに recv.py の認可エラーが表示される場合は、「Azure ストレージ アカウントと BLOB コンテナーを作成する」の手順に従い、ストレージ BLOB データ共同作成者 ロールをサービス プリンシパルに割り当ててください。

  • パーティション ID が異なるイベントを受け取った場合、この結果が予想されます。 パーティションはデータ編成メカニズムであり、コンシューマー アプリケーションで必要とされるダウンストリーム並列処理に関連します。 イベント ハブでのパーティションの数は、予想される同時接続のリーダー数に直接関連します。 詳細については、パーティションの詳細に関するページを参照してください。

次のステップ

このクイック スタートでは、イベントを非同期的に送受信しました。 イベントを同期的に送受信する方法については、GitHub の sync_samples ページにあるサンプルを参照してください。

Python 用 Azure Event Hubs クライアント ライブラリのサンプルで、さらに多くの例や高度なシナリオを探索してください。