你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
Apache Spark 诊断发出器扩展是一个库,它允许 Spark 应用程序将日志、事件日志和指标发送到 Azure 事件中心、Azure Log Analytics 和 Azure 存储等目标。
本教程介绍如何使用证书和服务主体创建所需的 Azure 资源,并使用证书和服务主体配置 Spark 应用程序,以使用 Apache Spark 诊断发出器扩展向 Azure 事件中心发出日志、事件日志和指标。
先决条件
- 一份 Azure 订阅。 在开始之前,还可以 创建一个免费帐户 。
- Synapse Analytics 工作区。
- Azure 事件中心。
- Azure Key Vault
- 应用注册
注释
若要完成本教程的步骤,需要有权访问为其分配有“所有者”角色的资源组。
步骤 1. 注册应用程序
步骤 2. 在 Key Vault 中生成证书
导航至密钥保管库 (Key Vault)。
展开 对象,然后选择 “证书”。
单击“生成/导入”。
步骤 3. 信任应用程序中的证书
转到在步骤 1 ->Manage ->Manifest 中创建的应用。
将证书详细信息追加到清单文件以建立信任。
"trustedCertificateSubjects": [ { "authorityId": "00000000-0000-0000-0000-000000000001", "subjectName": "Your-Subject-of-Certificate", "revokedCertificateIdentifiers": [] } ]
步骤 4. 分配 Azure 事件中心数据发送者角色
在 Azure 事件中心,导航到访问控制(IAM)。
将 Azure 事件中心数据发送者角色分配给应用程序(服务主体)。
步骤 5. 在 Synapse 中创建链接服务
在 Synapse Analytics 工作区中,转到 管理 - >链接服务。
在 Synapse 中创建新的 链接服务 以连接到 Key Vault。
步骤 6. 对密钥保管库中的链接服务分配读取者角色
获取链接服务的工作区托管标识 ID。 链接服务的托管标识名称和对象 ID 位于“编辑链接服务”下。
在 Key Vault 中,为链接服务分配 一个读取者 角色。
步骤 7. 配置链接服务
收集以下值并添加到 Apache Spark 配置。
- <EMITTER_NAME>:发射器的名称。
- <CERTIFICATE_NAME>:在密钥保管库中生成的证书名称。
- <LINKED_SERVICE_NAME>:Azure Key Vault 链接服务名称。
- <>EVENT_HUB_HOST_NAME:Azure 事件中心主机名,可以在 Azure 事件中心命名空间 -> 概述 -> 主机名中找到它。
- <SERVICE_PRINCIPAL_TENANT_ID>:服务主体租户 ID,可以在“应用注册”-> 你的应用名称 ->“概述”>-“目录(租户) ID”中找到
- <>SERVICE_PRINCIPAL_CLIENT_ID:服务主体客户端 ID,可以在注册中找到它 - 应用名称 ->> 概述 -> 应用程序(客户端) ID
- <>EVENT_HUB_ENTITY_PATH:Azure 事件中心实体路径,可以在 Azure 事件中心命名空间 -> 概述 -> 主机名中找到它。
"spark.synapse.diagnostic.emitters": <EMITTER_NAME>,
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.type": "AzureEventHub",
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.categories": "DriverLog,ExecutorLog,EventLog,Metrics",
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.certificate.keyVault.certificateName": <CERTIFICATE_NAME>",
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.certificate.keyVault.linkedService": <LINKED_SERVICE_NAME>,
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.hostName": <EVENT_HUB_HOST_NAME>,
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.tenantId": <SERVICE_PRINCIPAL_TENANT_ID>,
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.clientId": <SERVICE_PRINCIPAL_CLIENT_ID>,
"spark.synapse.diagnostic.emitter.<EMITTER_NAME>.entityPath": <EVENT_HUB_ENTITY_PATH>
步骤 8。 提交 Apache Spark 应用程序并查看日志和指标
可以使用 Apache Log4j 库编写自定义日志。
Scala 的示例:
%%spark
val logger = org.apache.log4j.LogManager.getLogger("com.contoso.LoggerExample")
logger.info("info message")
logger.warn("warn message")
logger.error("error message")
//log exception
try {
1/0
} catch {
case e:Exception =>logger.warn("Exception", e)
}
// run job for task level metrics
val data = sc.parallelize(Seq(1,2,3,4)).toDF().count()
PySpark 的示例:
%%pyspark
logger = sc._jvm.org.apache.log4j.LogManager.getLogger("com.contoso.PythonLoggerExample")
logger.info("info message")
logger.warn("warn message")
logger.error("error message")