查询流式处理数据

可通过 Azure Databricks 使用结构化流式处理来查询流式处理数据源。 Azure Databricks 为 Python 和 Scala 中的流式处理工作负载提供广泛的支持,并支持使用 SQL 的大多数结构化流式处理功能。

以下示例演示如何在笔记本中交互式开发期间使用内存接收器手动检查流式处理数据。 由于笔记本 UI 中的行输出限制,可能无法观察流式处理查询读取的所有数据。 在生产工作负荷中,应仅通过将流式处理查询写入目标表或外部系统来触发流式处理查询。

注意

对流式处理数据的交互式查询的 SQL 支持仅限于在通用计算上运行的笔记本。 也可以在 Databricks SQL 或 Lakeflow 声明性管道中声明流式处理表时使用 SQL。 请参阅 流式处理表Lakeflow 声明式管道

向流式处理系统查询数据

Azure Databricks 为以下流式处理系统提供流数据读取器:

  • 卡 夫 卡
  • 动觉 (Kinesis)
  • PubSub
  • 脉冲星 (Pulsar)

针对这些系统初始化查询时,必须提供配置详细信息,具体取决于配置的环境和你选择从中读取的系统。 请参阅 Lakeflow Connect 中的标准连接器

涉及流式系统的常见工作负载包括将数据引入到湖屋中,并进行流处理以将数据传输到外部系统。 有关流式处理工作负载的详细信息,请参阅 结构化流概念

以下示例演示对 Kafka 进行的交互式流式读取:

Python语言

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

以流式读取方式查询表

默认情况下,Azure Databricks 使用 Delta Lake 创建所有表。 在对 Delta 表执行流式处理查询时,在表的版本提交后,查询会自动获取新记录。 默认情况下,流式处理查询要求源表仅包含追加的记录。 如果需要处理包含更新和删除的流数据,Databricks 建议使用 Lakeflow 声明性管道和 AUTO CDC ... INTO。 请参阅 AUTO CDC API:使用 Lakeflow 声明性管道简化变更数据捕获

以下示例演示如何从表中进行交互式流读取:

Python语言

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

使用自动加载程序查询云对象存储中的数据

可以使用 Azure Databricks 的自动加载器,通过云数据连接器从云对象存储流式传输数据。 可以将连接器与存储在 Unity Catalog 卷或其他云对象存储位置中的文件配合使用。 Databricks 建议使用卷来管理对云对象存储中的数据的访问权限。 请参阅 “连接到数据源和外部服务”。

Azure Databricks 会优化此连接器,以便在云对象存储中流式引入数据,这些数据存储在常用的结构化、半结构化和非结构化格式中。 Databricks 建议以近乎原始的格式存储引入的数据,以最大程度地提高吞吐量,并最大程度地减少由于记录或架构更改而导致的潜在数据丢失。

有关从云对象存储引入数据的详细信息,请参阅 Lakeflow Connect 中的标准连接器

以下示例演示如何从卷内 JSON 文件目录中进行交互式流处理读取:

Python语言

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')