Apache Spark 診断エミッター拡張機能は、Spark アプリケーションがログ、イベント ログ、メトリックを Azure Event Hubs、Azure Log Analytics、Azure Storage などの宛先に送信できるようにするライブラリです。
このチュートリアルでは、必要な Azure リソースを作成し、証明書とサービス プリンシパルを使用して Spark アプリケーションを構成し、Apache Spark 診断エミッタ拡張機能を使用してログ、イベント ログ、メトリックを Azure Event Hubs に出力する方法について説明します。
[前提条件]
- Azure サブスクリプション。 開始する前 に無料アカウントを作成 することもできます。
- Synapse Analytics ワークスペース。
- Azure Event Hubs。
- Azure Key Vault
- アプリの登録
注
このチュートリアルの手順を完了するには、所有者ロールが割り当てられているリソース グループにアクセスできる必要があります。
ステップ 1. アプリケーションを登録する
Azure portal にサインインし、[アプリの登録] に移動します。
Synapse ワークスペースの新しいアプリ登録を作成します。
手順 2. Key Vault で証明書を生成する
Key Vault に移動します。
オブジェクトを展開し、証明書を選択します。
[ 生成/インポート] をクリックします。
手順 3. アプリケーションで証明書を信頼する
手順 1 - >Manage ->Manifest で作成したアプリに移動します。
信頼を確立するために、証明書の詳細をマニフェスト ファイルに追加します。
"trustedCertificateSubjects": [ { "authorityId": "00000000-0000-0000-0000-000000000001", "subjectName": "Your-Subject-of-Certificate", "revokedCertificateIdentifiers": [] } ]
手順 4. Azure Event Hubs のデータ送信者ロールを割り当てる
Azure Event Hubs で、アクセス制御 (IAM) に移動します。
Azure Event Hubs データ送信者ロールをアプリケーション (サービス プリンシパル) に割り当てます。
ステップ 5. Synapse でリンクされたサービスを作成する
Synapse Analytics ワークスペースで、[ 管理 ] ->リンクされたサービスに移動します。
Key Vault に接続するために、Synapse で新しいリンクされたサービスを作成します。
ステップ 6. Key Vault のリンクされたサービスに閲覧者ロールを割り当てる
リンクされたサービスからワークスペースのマネージド ID を取得します。 リンクされたサービスの マネージド ID 名 と オブジェクト ID は、[ リンクされたサービスの編集] にあります。
Key Vault で、リンクされたサービスに閲覧者ロールを割り当てます。
手順 7. リンクされたサービスで構成する
次の値を収集し、Apache Spark 構成に追加します。
- <EMITTER_NAME>: エミッターの名前。
- <CERTIFICATE_NAME>: キー コンテナーで生成した証明書名。
- <LINKED_SERVICE_NAME>: Azure Key Vault のリンクされたサービス名。
- <EVENT_HUB_HOST_NAME>: Azure Event Hubs ホスト名は、Azure Event Hubs 名前空間 -> Overview -> Host name にあります。
- <SERVICE_PRINCIPAL_TENANT_ID>: サービス プリンシパル テナント ID。アプリの登録 -> アプリ名 -> 概要 -> ディレクトリ (テナント) ID で確認できます
- <SERVICE_PRINCIPAL_CLIENT_ID>: サービス プリンシパル クライアント IDは、登録 -> あなたのアプリ名 -> 概要 -> アプリケーション(クライアント) IDで見つけることができます。
- <EVENT_HUB_ENTITY_PATH>: Azure Event Hubs エンティティ パスは、Azure Event Hubs 名前空間 -> Overview -> Host name にあります。
"spark.synapse.diagnostic.emitters": <EMITTER_NAME>,
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.type": "AzureEventHub",
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.categories": "DriverLog,ExecutorLog,EventLog,Metrics",
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.certificate.keyVault.certificateName": <CERTIFICATE_NAME>",
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.certificate.keyVault.linkedService": <LINKED_SERVICE_NAME>,
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.hostName": <EVENT_HUB_HOST_NAME>,
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.tenantId": <SERVICE_PRINCIPAL_TENANT_ID>,
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.clientId": <SERVICE_PRINCIPAL_CLIENT_ID>,
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.entityPath": <EVENT_HUB_ENTITY_PATH>
手順 8. Apache Spark アプリケーションを送信してログとメトリックを表示する
Apache Log4j ライブラリを使用して、カスタム ログを書き込むことができます。
Scala の例:
%%spark
val logger = org.apache.log4j.LogManager.getLogger("com.contoso.LoggerExample")
logger.info("info message")
logger.warn("warn message")
logger.error("error message")
//log exception
try {
1/0
} catch {
case e:Exception =>logger.warn("Exception", e)
}
// run job for task level metrics
val data = sc.parallelize(Seq(1,2,3,4)).toDF().count()
PySpark の例:
%%pyspark
logger = sc._jvm.org.apache.log4j.LogManager.getLogger("com.contoso.PythonLoggerExample")
logger.info("info message")
logger.warn("warn message")
logger.error("error message")