Databricks 建议在生产环境中运行自动加载程序时遵循流式处理最佳做法。
Databricks 建议在 Lakeflow 声明性管道中使用自动加载程序进行增量数据引入。 Lakeflow 声明性管道扩展了 Apache Spark 结构化流式处理中的功能,并允许你只编写几行声明性 Python 或 SQL 来部署生产质量的数据管道,其中包含:
监视自动加载程序
查询自动加载程序发现的文件
注意
此 cloud_files_state
函数在 Databricks Runtime 11.3 LTS 和更高版本中可用。
自动加载程序提供用于检查流状态的 SQL API。 使用 cloud_files_state
函数,可以找到有关自动加载程序流发现的文件的元数据。 只需从 cloud_files_state
查询,提供与自动加载程序流关联的检查点位置。
SELECT * FROM cloud_files_state('path/to/checkpoint');
侦听流更新
若要进一步监视自动加载程序流,Databricks 建议使用 Apache Spark 的流式处理查询侦听器接口。
自动加载程序在每批次中向流式处理查询侦听器报告指标。 你可以在流式处理查询进度仪表板的“原始数据”选项卡下的 numFilesOutstanding
和 numBytesOutstanding
指标中查看积压工作 (backlog) 中存在多少个文件以及积压工作 (backlog) 量有多大:
{
"sources": [
{
"description": "CloudFilesSource[/path/to/source]",
"metrics": {
"numFilesOutstanding": "238",
"numBytesOutstanding": "163939124006"
}
}
]
}
在 Databricks Runtime 10.4 LTS 及更高版本中,对于 AWS 和 Azure,使用文件通知模式时,指标还将包括云队列中文件事件的近似数量(表示为 approximateQueueSize
)。
成本注意事项
运行自动加载程序时,主要成本来源是计算资源和文件发现的成本。
为了降低计算成本,Databricks 建议在没有较低延迟要求的情况下,使用 Lakeflow 作业将 Auto Loader 计划为批处理作业Trigger.AvailableNow
,而不是持续运行。 请参阅配置结构化流式处理触发器间隔。
文件发现成本的形式可以是目录列表模式下存储帐户的 LIST 操作、订阅服务上的 API 请求,以及文件通知模式下的队列服务。 为了降低文件发现成本,Databricks 的建议是:
- 在目录列表模式下连续运行自动加载程序时提供
ProcessingTime
触发器 - 以词汇顺序构建文件到存储帐户的上传,以尽可能利用增量列表(已弃用)
- 在无法增量列出时利用文件通知
- 使用资源标记来标记自动加载程序创建的资源,以跟踪你的成本
使用 Trigger.AvailableNow 和速率限制
注意
在 Databricks Runtime 10.4 LTS 及更高版本中可用。
可以使用 Trigger.AvailableNow
将自动加载程序计划为在 Lakeflow 作业中作为批处理作业运行。
AvailableNow
触发器将指示自动加载程序处理在查询开始时间之前到达的所有文件。 在流开始后上传的新文件将被忽略,直到下一次触发。
使用 Trigger.AvailableNow
,文件发现将与数据处理异步进行,并且可以通过速率限制跨多个微批处理数据。 默认情况下,自动加载程序每个微批处理最多处理 1000 个文件。 可以通过配置 cloudFiles.maxFilesPerTrigger
和 cloudFiles.maxBytesPerTrigger
来配置应在微批处理中处理的文件数或字节数。 文件限制是硬限制,但字节限制是软限制,这意味着可以处理的字节数多于 maxBytesPerTrigger
提供的字节数。 当这两个选项同时提供时,自动加载程序将处理达到其中一个限制所需的文件数量。
检查点位置
检查点位置用于存储流的状态和进度信息。 Databricks 建议将检查点位置设置为没有云对象生命周期策略的位置。 如果根据策略清理检查点位置中的文件,则流状态已损坏。 如果发生这种情况,则必须从头开始重启流。
文件事件跟踪
自动加载程序使用 RocksDB 跟踪检查点位置中的已发现文件,以提供恰好一次的引入保证。 对于大容量或长期引入流,Databricks 建议升级到 Databricks Runtime 15.4 LTS 或更高版本。 在这些版本中,自动加载程序不会等待在流启动之前下载整个 RocksDB 状态,这可以加速流启动时间。
如果要防止文件状态在没有限制的情况下增长,还可以考虑使用 cloudFiles.maxFileAge
选项使超过特定年龄的文件事件过期。 可为 cloudFiles.maxFileAge
设置的最小值为 "14 days"
。 RocksDB 中的删除表现为墓碑条目。 因此,您可能会看到由于事件过期,存储使用量暂时增加,然后才开始趋于稳定。
警告
cloudFiles.maxFileAge
作为针对高容量数据集的成本控制机制提供。 过于激进地调整 cloudFiles.maxFileAge
可能会导致数据质量问题,例如重复引入或缺少文件。 因此,Databricks 建议为 cloudFiles.maxFileAge
使用保守设置,例如 90 天,这与类似数据引入解决方案建议的值相当。
尝试优化 cloudFiles.maxFileAge
选项可能会导致自动加载程序忽略未处理的文件,或导致已处理的文件过期并重新处理,从而导致出现重复数据。 下面是选择 cloudFiles.maxFileAge
时需要注意的几个事项:
- 如果流在很长时间后重启,将忽略从队列中拉取的超过
cloudFiles.maxFileAge
的文件通知事件。 同样,如果使用目录列表,将忽略在停机期间可能出现的超过cloudFiles.maxFileAge
的文件。 - 如果使用目录列表模式并使用
cloudFiles.maxFileAge
(例如将其设置为"1 month"
),然后停止流并在将cloudFiles.maxFileAge
设置为"2 months"
的情况下重启流,那么,系统将重新处理超过 1 个月但未到 2 个月的文件。
如果你在首次启动流时设置此选项,则不会引入超过 cloudFiles.maxFileAge
的数据。因此,如果你需要引入旧数据,则不应在首次启动流时设置此选项, 而是应在后续运行中设置此选项。
使用 cloudFiles.backfillInterval 触发常规回填
在极少数情况下,仅依赖于通知系统(例如达到通知消息保留限制)时,文件可能会丢失或延迟。 如果对数据完整性和 SLA 有严格的要求,请考虑设置 cloudFiles.backfillInterval
以指定间隔触发异步回填。 例如,将其设置为每日回填的持续时间为一天,或每周回填的持续时间为一周。 触发定期回填不会导致重复。