次の方法で共有


Apache Spark アプリケーションを Azure Event Hubs に接続する

このチュートリアルでは、リアルタイム ストリーミングのために Spark アプリケーションを Event Hubs に接続する手順について説明します。 この統合により、お使いのプロトコル クライアントを変更せずにストリーミングを行ったり、独自の Kafka または Zookeeper クラスターを実行したりすることができます。 このチュートリアルには、Apache Spark v2.4 以上および Apache Kafka v2.0 以上が必要となります。

このサンプルは GitHub で入手できます。

このチュートリアルでは、以下の内容を学習します。

  • Event Hubs 名前空間を作成する
  • サンプル プロジェクトを複製する
  • Spark の実行
  • Kafka の Event Hubs からの読み取り
  • Kafka の Event Hubs への書き込み

前提条件

このチュートリアルを開始する前に、次の内容を確認してください。

Spark-Kafka アダプターは、Spark v2.4 の時点で Kafka v2.0 をサポートするように更新されました。 Spark の以前のリリースでは、アダプターは Kafka v0.10 以降をサポートしていましたが、特に Kafka v0.10 API に依存していました。 Kafka 用 Event Hubs では Kafka v0.10 がサポートされていないため、v2.4 より前のバージョンの Spark の Spark-Kafka アダプターは、Kafka エコシステムの Event Hubs ではサポートされていません。

Event Hubs 名前空間を作成する

Event Hubs サービスとの間で送受信を行うには、イベント ハブの名前空間が必要です。 名前空間とイベント ハブを作成する手順については、イベント ハブの作成に関するページを参照してください。 Event Hubs の接続文字列と完全修飾ドメイン名 (FQDN) を、後で使用するために取得します。 手順については、「Get an Event Hubs connection string (Event Hubs の接続文字列を取得する)」を参照してください。

サンプル プロジェクトを複製する

Azure Event Hubs リポジトリを複製し、 tutorials/spark サブフォルダーに移動します。

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Kafka の Event Hubs からの読み取り

いくつかの構成の変更により、Kafka の Event Hubs から読み取りを開始できます。 名前空間の詳細で BOOTSTRAP_SERVERSEH_SASL を更新し、Kafka と同様に Event Hubs でストリーミングを開始できます。 完全なサンプル コードについては、GitHub の sparkConsumer.scala ファイルを参照してください。

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

次のようなエラーが発生した場合は、spark.readStream呼び出しに.option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")を追加して、もう一度やり直してください。

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Kafka の Event Hubs への書き込み

Kafka に書き込むのと同じ方法で Event Hubs に書き込むこともできます。 BOOTSTRAP_SERVERSEH_SASLをEvent Hubsの名前空間からの情報で変更するために、設定を更新することを忘れないでください。 完全なサンプル コードについては、GitHub の sparkProducer.scala ファイルを参照してください。

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

次のステップ

Event Hubs と Kafka 用 Event Hubs の詳細については、次の記事を参照してください。