在文件通知模式下配置自动加载程序流

本页介绍如何将自动加载程序流配置为使用文件通知模式以增量方式发现和引入云数据。

在文件通知模式下,自动加载程序可以自动设置从输入目录中订阅文件事件的通知服务和队列服务。 可以使用文件通知来缩放自动加载程序,以每小时引入数百万个文件。 与目录列表模式相比,文件通知模式的性能更高且可缩放。

可以随时在文件通知和目录列表之间切换,并且只要数据处理保证,你仍然可以精确地维护。

注意

Azure 高级存储帐户不支持文件通知模式,因为高级帐户不支持队列存储。

警告

文件通知模式不支持更改自动加载程序的源路径。 如果使用文件通知模式并更改了路径,则可能无法引入目录更新时新目录中已存在的文件。

在外部位置上启用文件通知模式以及不启用文件事件

可通过两种方法将自动加载程序配置为使用文件通知模式:

  • 旧文件通知模式:单独管理每个 Auto Loader 数据流的文件通知队列。 自动加载程序自动设置通知服务和队列服务,该服务订阅输入目录中的文件事件。

    这是旧方法。

  • (推荐)文件事件(公开预览):在处理 Unity 目录中定义的任何外部位置文件的所有流时,使用一个 Azure Databricks 托管的文件通知队列。

    此方法要求为外部文件路径启用事件功能。 它比旧方法具有以下优势:

    • Azure Databricks 可以为你设置云存储帐户中的订阅和文件事件,而无需使用服务凭据或其他特定于云的身份验证选项向自动加载程序提供其他凭据。 请参阅(推荐)为外部位置启用文件事件
    • 您需要在云存储帐户中创建的 Azure 托管标识策略数量减少。
    • 由于不再需要为每个自动加载程序流创建队列,因此更容易避免达到 旧版自动加载程序文件通知模式中使用的云资源中列出的云提供程序通知限制。
    • Azure Databricks 会自动管理资源要求的优化,因此无需优化参数,例如 cloudFiles.fetchParallelism
    • 清理功能意味着您无需过于担心在云中创建的通知的生命周期,特别是在删除流或完全刷新的情况下。

Databricks 建议,如果在当前目录列表模式下使用自动加载程序,则迁移到文件通知模式并显示文件事件,以查看显著的性能改进。

将文件通知模式与文件事件配合使用

本部分介绍如何创建和更新自动加载程序流以使用文件事件。

重要

自动加载器对文件事件的支持目前处于公共预览版。 若要注册预览版,请联系 Azure Databricks 帐户团队。

在您开始之前

设置文件事件需要:

  • 已为 Unity Catalog 启用的 Azure Databricks 工作区。
  • 在 Unity 目录中创建存储凭据和外部位置对象的权限。

具有文件事件的自动加载程序流需要:

  • Databricks Runtime 14.3 LTS 或更高版本上的计算。

创建或更新使用文件事件的自动加载流之前:

配置说明

以下说明适用于是创建新的自动加载程序流还是迁移现有流,以将升级的文件通知模式与文件事件一起使用:

  1. 在 Unity 目录中创建存储凭据和外部位置,以授予对自动加载程序流云存储中源位置的访问权限。 请参阅创建外部位置以将云存储连接到 Azure Databricks

  2. 为外部位置启用文件事件。 请参阅(推荐)为外部位置启用文件事件

  3. 创建新的自动加载程序流或编辑现有的自动加载程序流以使用外部位置时:

    • 如果现有的 基于通知的自动加载程序流 使用来自外部位置的数据,请将其关闭并删除关联的通知资源。
    • 请确保未设置pathRewrites(这不是一个常见选项)。
    • 查看自动加载程序在使用文件事件管理文件通知时不考虑的设置列表。 避免在新的自动加载程序流中使用它们,并从您正在迁移到此模式的现有流中删除它们。
    • 在自动加载程序代码中将选项cloudFiles.useManagedFileEvents设置为true

例如:

autoLoaderStream = (spark.readStream
    .format("cloudFiles")
    ...
    .options("cloudFiles.useManagedFileEvents", True)
    ...)

如果使用的是 DLT,并且已有带流式表的 DLT 管道,请将其更新为包含 useManagedFileEvents 选项:

CREATE OR REFRESH STREAMING LIVE TABLE <table-name>
AS SELECT <select clause expressions>
    FROM STREAM read_files('abfss://path/to/external/___location/or/volume',
                     format => '<format>',
                     useManagedFileEvents => 'True'
                     ...
                    )

不支持的自动加载器设置

当流使用文件事件时,不支持以下自动加载程序设置:

设置 改变
useIncremental 不再需要在文件通知的效率与目录列表的简单性之间做出决定。 带有文件事件的自动加载程序只有一种模式。
useNotifications 每个外部位置只有一个队列和存储事件订阅。
cloudFiles.fetchParallelism 具有文件事件的自动加载程序不提供手动并行优化。
cloudFiles.backfillInterval Azure Databricks 自动处理为文件事件启用的外部位置的回填。
cloudFiles.pathRewrites 仅当将外部数据位置装载到已弃用的 DBFS 时,此选项才适用。
resourceTags 应使用云控制台设置资源标记。

文件事件的自动加载程序的限制

文件事件服务通过缓存最近创建的文件来优化文件发现。 如果自动加载程序不经常运行,则此缓存可能会过期,自动加载程序会回退到目录列表以发现文件和更新缓存。 为了避免这种情况,请至少每七天调用一次自动加载程序。

有关文件事件限制的常规列表,请参阅 文件事件限制

单独管理每个自动加载程序流的文件通知队列(旧版)

重要

需要提升的权限才能自动为文件通知模式配置云基础结构。 请联系云管理员或工作区管理员。详见:

旧版自动加载程序文件通知模式中使用的云资源

如果将 cloudFiles.useNotifications 选项设置为 true 并提供创建云资源所需的权限时,自动加载程序可以自动为你设置文件通知。 此外,还可能需要提供附加选项,以进行创建这些资源所需的自动加载程序授权。

下表列出了由自动加载程序为每个云提供商创建的资源。

云存储 订阅服务 队列服务 前缀 * 限制 **
Amazon S3 AWS SNS AWS SQS databricks-auto-ingest 每个 S3 存储桶 100 个
ADLS Azure 事件网格 Azure 队列存储 Databricks(数据砖) 每存储帐户 500
谷歌云存储 (GCS) Google Pub/Sub Google Pub/Sub databricks-auto-ingest 每个 GCS 存储桶有 100 个
Azure Blob 存储 Azure 事件网格 Azure 队列存储 Databricks(数据砖) 每存储帐户 500

* 自动加载程序用此前缀命名资源。

** 可以启动多少个并发文件通知管道

如果必须运行超过这些限制所允许数量的基于文件通知的自动加载流,可以使用 文件事件 或服务,例如 AWS Lambda、Azure Functions 或 Google Cloud Functions,将来自侦听整个容器或存储桶的单个队列的通知分发到目录专属队列。

旧版文件通知事件

当文件上传到 S3 存储桶时,Amazon S3 都会提供一个 ObjectCreated 事件,无论该文件的上传方式是通过 put 上传还是通过多部分上传。

Azure Data Lake Storage 为存储容器中显示的文件提供不同的事件通知。

  • 自动加载程序会侦听 FlushWithClose 事件,以便处理某个文件。
  • 自动加载程序流支持用于发现文件的 RenameFile 操作。 RenameFile 操作需要向存储系统发送的 API 请求,以便获取重命名的文件的大小。
  • 使用 Databricks Runtime 9.0 及更高版本创建的自动加载程序流支持 RenameDirectory 操作,以便发现文件。 RenameDirectory 操作需要向存储系统发送的 API 请求,以便列出重命名的目录的内容。

Google Cloud Storage 会在上传文件时提供一个 OBJECT_FINALIZE 事件,其中包括覆盖和文件副本。 失败的上传不会生成此事件。

注意

云提供商不保证在极少数情况下 100% 交付所有文件事件,也不对文件事件的延迟提供严格的 SLA。 Databricks 建议你使用自动加载程序触发定期回填,方法是使用 cloudFiles.backfillInterval 选项来保证在给定的 SLA 中发现所有文件(如果需要满足数据完整性的要求)。 触发定期回填不会导致重复。

为 Azure Data Lake Storage 和 Azure Blob 存储配置文件通知所需的权限

你必须具有对输入目录的读取权限。 请参阅 Azure Blob 存储

若要使用文件通知模式,必须提供用于设置和访问事件通知服务的身份验证凭据。

可以使用以下方法之一进行身份验证:

获取身份验证凭据后,请将必要的权限分配给 Databricks 访问连接器(用于服务凭据)或 Microsoft Entra ID 应用(对于服务主体)。

  • 使用 Azure 内置角色

    为访问连接器分配输入路径所在的存储帐户的以下角色:

    • 参与者:此角色用于设置存储帐户中的资源,例如队列和事件订阅。
    • 存储队列数据参与者:此角色用于执行队列操作,例如检索和删除队列中的消息。 仅当在没有连接字符串的情况下提供服务主体时,才需要此角色。

    请为相关资源组中的此访问连接器分配以下角色:

    有关详细信息,请参阅使用 Azure 门户分配 Azure 角色

  • 使用自定义角色

    如果担心上述角色所需的执行权限,则可以创建一个至少具有以下权限的自定义角色,下面以 Azure 角色 JSON 格式列出:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    然后,可以将此自定义角色分配给访问连接器。

    有关详细信息,请参阅使用 Azure 门户分配 Azure 角色

自动加载程序权限设置

Amazon S3 配置文件通知所需的权限

你必须具有对输入目录的读取权限。 有关更多详细信息,请参阅 S3 连接详细信息

若要使用文件通知模式,请将以下 JSON 策略文档附加到 IAM 用户或角色。 需要此标识和访问管理角色,才能为自动加载程序创建用于身份验证的服务凭据。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": ["sns:Unsubscribe", "sns:DeleteTopic", "sqs:DeleteQueue"],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

其中:

  • <bucket-name>:流将在其中读取文件的 S3 存储桶名称,例如,auto-logs。 可以使用 * 作为通配符,例如 databricks-*-logs。 若要找出 DBFS 路径的底层 S3 存储桶,可以通过运行 %fs mounts 列出笔记本中的所有 DBFS 装入点。
  • <region>:S3 存储桶所在的 AWS 区域,例如 us-west-2。 如果你不想要指定区域,请使用 *
  • <account-number>:拥有 S3 存储桶的 AWS 帐号,例如 123456789012。 如果你不想要指定帐号,请使用 *

SQS 和 SNS ARN 规范中的字符串 databricks-auto-ingest-*cloudFiles 源在创建 SQS 和 SNS 服务时使用的名称前缀。 由于 Azure Databricks 会在流的初始运行期间设置通知服务,因此你可以在初始运行后(例如,停止并重启流)使用权限降低的策略。

注意

上述策略仅涉及设置文件通知服务(即 S3 存储桶通知、SNS 和 SQS 服务)所需的权限,并假设你已经拥有对 S3 存储桶的读取访问权限。 如果你需要添加 S3 只读权限,请在 JSON 文档的 Action 语句的 DatabricksAutoLoaderSetup 列表中添加以下内容:

  • s3:ListBucket
  • s3:GetObject

初始设置后权限减少

上文中所述的资源设置权限仅在流的初始运行期间才需要。 首次运行后,可以切换到以下权限降低的 IAM 策略。

重要

权限降低后,无法在出现故障时(例如,SQS 队列被意外删除)启动新的流式处理查询或重新创建资源;也无法使用云资源管理 API 来列出或拆卸资源。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:TagResource",
        "sns:Publish",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:<queue-name>",
        "arn:aws:sns:<region>:<account-number>:<topic-name>",
        "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetBucketLocation", "s3:ListBucket"],
      "Resource": ["arn:aws:s3:::<bucket-name>"]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:PutObjectAcl", "s3:GetObject", "s3:DeleteObject"],
      "Resource": ["arn:aws:s3:::<bucket-name>/*"]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

为 GCS 配置文件通知所需的权限

你必须对你的 GCS Bucket 和所有对象具有 listget 权限。 有关详细信息,请参阅有关 IAM 权限的 Google 文档。

若要使用文件通知模式,需要为 GCS 服务帐户添加权限,以及用于访问 Google Cloud Pub/Sub 资源的服务帐户。

Pub/Sub Publisher 角色添加到 GCS 服务帐户。 这样,该帐户就可以将事件通知消息从 GCS 桶发布到 Google Cloud Pub/Sub。

至于用于 Google Cloud Pub/Sub 资源的服务帐户,需要添加以下权限。 创建 Databricks 服务凭据时,会自动创建此服务帐户。 Databricks Runtime 16.1 及更高版本中提供了服务凭据支持。

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

为此,可以创建拥有这些权限的 IAM 自定义角色,或分配预先存在的 GCP 角色来涵盖这些权限。

查找 GCS 服务帐户

在相应项目的“Google Cloud Console”(Google 云控制台)中,导航到 Cloud Storage > Settings。 “云存储服务帐号”部分包含 GCS 服务帐号的电子邮件。

GCS 服务帐户

创建用于文件通知模式的自定义 Google Cloud IAM 角色

在相应项目的“Google Cloud Console”(Google 云控制台)中,导航到 IAM & Admin > Roles。 然后,在顶部创建一个角色或更新现有角色。 在角色创建或编辑屏幕中,单击 Add Permissions。 此时会弹出一个菜单,你可以在其中向角色添加所需的权限。

GCP IAM 自定义角色

手动配置或管理文件通知资源

特权用户可以手动配置或管理文件通知资源。

  • 通过云提供商手动设置文件通知服务,并手动指定队列标识符。 有关更多详细信息,请参阅文件通知选项
  • 使用 Scala API 创建或管理通知和队列服务,如以下示例所示:

注意

必须要具有适当权限才能配置或修改云基础结构。 请参阅 AzureS3GCS 的权限文档。

Python语言

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .create()

# Using AWS access key and secret key
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("cloudFiles.awsAccessKey", <aws-access-key>) \
  .option("cloudFiles.awsSecretKey", <aws-secret-key>) \
  .option("cloudFiles.roleArn", <role-arn>) \
  .option("cloudFiles.roleExternalId", <role-external-id>) \
  .option("cloudFiles.roleSessionName", <role-session-name>) \
  .option("cloudFiles.stsEndpoint", <sts-endpoint>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

# Using an Azure service principal
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("cloudFiles.projectId", <project-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Using a Google service account
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("cloudFiles.projectId", <project-id>) \
  .option("cloudFiles.client", <client-id>) \
  .option("cloudFiles.clientEmail", <client-email>) \
  .option("cloudFiles.privateKey", <private-key>) \
  .option("cloudFiles.privateKeyId", <private-key-id>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala(编程语言)

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("databricks.serviceCredential", <service-credential-name>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

/**
 * Using AWS access key and secret key
 */
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>)
    .option("cloudFiles.awsAccessKey", <aws-access-key>)
    .option("cloudFiles.awsSecretKey", <aws-secret-key>)
    .option("cloudFiles.roleArn", <role-arn>)
    .option("cloudFiles.roleExternalId", <role-external-id>)
    .option("cloudFiles.roleSessionName", <role-session-name>)
    .option("cloudFiles.stsEndpoint", <sts-endpoint>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("databricks.serviceCredential", <service-credential-name>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

/**
 * Using an Azure service principal
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("cloudFiles.projectId", <project-id>)
    .option("databricks.serviceCredential", <service-credential-name>)
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

/**
 * Using a Google service account
 */
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("cloudFiles.projectId", <project-id>)
    .option("cloudFiles.client", <client-id>)
    .option("cloudFiles.clientEmail", <client-email>)
    .option("cloudFiles.privateKey", <private-key>)
    .option("cloudFiles.privateKeyId", <private-key-id>)
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

使用 setUpNotificationServices(<resource-suffix>) 来创建名称为 <prefix>-<resource-suffix> 的队列和订阅(前缀取决于存储系统,该系统汇总在 旧版自动加载程序文件通知模式下使用的云资源中)。 如果已存在具有相同名称的资源,Azure Databricks 会重用已存在的资源,而不是创建新资源。 此函数返回一个队列标识符,可以使用cloudFiles中的标识符将该标识符传递给 源。 这使得 cloudFiles 源用户拥有的权限少于创建资源的用户的权限。

只有调用 "path" 时才需要提供 newManagersetUpNotificationServices 选项;对于 listNotificationServicestearDownNotificationServices,则不需要提供。 这是你运行流式处理查询时使用的同一 path

以下矩阵指出了每种类型的存储在哪个 Databricks 运行时中支持哪些 API 方法:

云存储 安装程序 API 列出 API 拆解 API
Amazon S3 所有版本 所有版本 所有版本
ADLS 所有版本 所有版本 所有版本
谷歌云存储 (GCS) Databricks Runtime 9.1 及更高版本 Databricks Runtime 9.1 及更高版本 Databricks Runtime 9.1 及更高版本
Azure Blob 存储 所有版本 所有版本 所有版本

清理自动加载程序创建的事件通知资源

自动加载程序不会自动关闭文件通知资源。 若要拆解文件通知资源,必须使用云资源管理器,如上一部分所示。 还可以使用云提供商的 UI 或 API 手动删除这些资源。

排查常见错误

本部分介绍将自动加载程序与文件通知模式配合使用时出现的常见错误,以及如何解决这些问题。

无法创建事件网格订阅

如果在首次运行自动加载程序时出现以下错误消息,则事件网格未在 Azure 订阅中注册为资源提供程序。

java.lang.RuntimeException: Failed to create event grid subscription.

若要将事件网格注册为资源提供程序,请执行以下操作:

  1. 在 Azure 门户中,转到订阅。
  2. 单击“设置”部分的“资源提供程序”。
  3. 注册提供程序 Microsoft.EventGrid

执行事件网格订阅操作所需的授权

如果在首次运行自动加载程序时看到以下错误消息,请确认已为事件网格和存储帐户的服务主体分配“参与者”角色

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

事件网格客户端绕过代理

在 Databricks Runtime 15.2 及更高版本中,自动加载程序中的事件网格连接默认使用系统属性中的代理设置。 在 Databricks Runtime 13.3 LTS、14.3 LTS 和 15.0 到 15.2 中,可以通过设置“Spark 配置”属性 ,手动配置事件网格连接以使用代理spark.databricks.cloudFiles.eventGridClient.useSystemProperties true。 请参阅在 Azure Databricks 上设置 Spark 配置属性