你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

如何使用证书和服务主体向 Azure 事件中心发出日志

Apache Spark 诊断发出器扩展是一个库,它允许 Spark 应用程序将日志、事件日志和指标发送到 Azure 事件中心、Azure Log Analytics 和 Azure 存储等目标。

本教程介绍如何使用证书和服务主体创建所需的 Azure 资源,并使用证书和服务主体配置 Spark 应用程序,以使用 Apache Spark 诊断发出器扩展向 Azure 事件中心发出日志、事件日志和指标。

先决条件

注释

若要完成本教程的步骤,需要有权访问为其分配有“所有者”角色的资源组。

步骤 1. 注册应用程序

  1. 登录到 Azure 门户 并转到 应用注册

  2. 为 Synapse 工作区创建新的应用注册。

    显示创建新应用注册的屏幕截图。

步骤 2. 在 Key Vault 中生成证书

  1. 导航至密钥保管库 (Key Vault)。

  2. 展开 对象,然后选择 “证书”。

  3. 单击“生成/导入”。

    显示为应用生成新证书的屏幕截图。

步骤 3. 信任应用程序中的证书

  1. 转到在步骤 1 ->Manage ->Manifest 中创建的应用。

  2. 将证书详细信息追加到清单文件以建立信任。

         "trustedCertificateSubjects": [ 
              { 
              "authorityId": "00000000-0000-0000-0000-000000000001", 
              "subjectName": "Your-Subject-of-Certificate", 
              "revokedCertificateIdentifiers": [] 
              } 
         ] 
    

    屏幕截图显示了在应用程序中信任证书。

步骤 4. 分配 Azure 事件中心数据发送者角色

  1. 在 Azure 事件中心,导航到访问控制(IAM)。

  2. 将 Azure 事件中心数据发送者角色分配给应用程序(服务主体)。

    显示分配 Azure 事件中心数据发送者角色的屏幕截图。

步骤 5. 在 Synapse 中创建链接服务

  1. 在 Synapse Analytics 工作区中,转到 管理 - >链接服务

  2. 在 Synapse 中创建新的 链接服务 以连接到 Key Vault

    显示在 synapse 中创建链接服务的屏幕截图。

步骤 6. 对密钥保管库中的链接服务分配读取者角色

  1. 获取链接服务的工作区托管标识 ID。 链接服务的托管标识名称和对象 ID 位于“编辑链接服务”下

    屏幕截图显示了托管标识名称和对象 ID 位于“编辑链接服务”中。

  2. Key Vault 中,为链接服务分配 一个读取者 角色。

步骤 7. 配置链接服务

收集以下值并添加到 Apache Spark 配置。

  • <EMITTER_NAME>:发射器的名称。
  • <CERTIFICATE_NAME>:在密钥保管库中生成的证书名称。
  • <LINKED_SERVICE_NAME>:Azure Key Vault 链接服务名称。
  • <>EVENT_HUB_HOST_NAME:Azure 事件中心主机名,可以在 Azure 事件中心命名空间 -> 概述 -> 主机名中找到它。
  • <SERVICE_PRINCIPAL_TENANT_ID>:服务主体租户 ID,可以在“应用注册”-> 你的应用名称 ->“概述”>-“目录(租户) ID”中找到
  • <>SERVICE_PRINCIPAL_CLIENT_ID:服务主体客户端 ID,可以在注册中找到它 - 应用名称 ->> 概述 -> 应用程序(客户端) ID
  • <>EVENT_HUB_ENTITY_PATH:Azure 事件中心实体路径,可以在 Azure 事件中心命名空间 -> 概述 -> 主机名中找到它。
     "spark.synapse.diagnostic.emitters": <EMITTER_NAME>,
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.type": "AzureEventHub",
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.categories": "DriverLog,ExecutorLog,EventLog,Metrics",
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.certificate.keyVault.certificateName": <CERTIFICATE_NAME>",
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.certificate.keyVault.linkedService": <LINKED_SERVICE_NAME>,
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.hostName": <EVENT_HUB_HOST_NAME>,
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.tenantId": <SERVICE_PRINCIPAL_TENANT_ID>,
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.clientId": <SERVICE_PRINCIPAL_CLIENT_ID>,
     "spark.synapse.diagnostic.emitter.<EMITTER_NAME>.entityPath": <EVENT_HUB_ENTITY_PATH>

步骤 8。 提交 Apache Spark 应用程序并查看日志和指标

可以使用 Apache Log4j 库编写自定义日志。

Scala 的示例:

%%spark
val logger = org.apache.log4j.LogManager.getLogger("com.contoso.LoggerExample")
logger.info("info message")
logger.warn("warn message")
logger.error("error message")
//log exception
try {
      1/0
 } catch {
      case e:Exception =>logger.warn("Exception", e)
}
// run job for task level metrics
val data = sc.parallelize(Seq(1,2,3,4)).toDF().count()

PySpark 的示例:

%%pyspark
logger = sc._jvm.org.apache.log4j.LogManager.getLogger("com.contoso.PythonLoggerExample")
logger.info("info message")
logger.warn("warn message")
logger.error("error message")