将 Delta Lake 与流式处理数据配合使用

已完成

我们浏览到此点的所有数据都是文件中的静态数据。 但是,许多数据分析方案涉及必须准实时处理的流式处理数据。 例如,可能需要捕获物联网 (IoT) 设备发出的读数,并在它们发生时将其存储在表中。

Spark 结构化流

典型的流处理解决方案涉及从源中不断读取数据流、选择性地处理它以选择特定字段、聚合值和对值进行分组,或以其他方式处理数据,并将结果写入接收器。

Spark 包括通过 Spark 结构化流式处理对流式处理数据的本机支持,这是一种基于无限数据帧的 API,在该数据帧中捕获流式处理数据进行处理。 Spark 结构化流式处理数据帧可以从多种不同类型的流式处理源中读取数据,包括网络端口、实时消息代理服务(如 Azure 事件中心或 Kafka)或文件系统位置。

小提示

有关 Spark 结构化流式处理的详细信息,请参阅 Spark 文档中的结构化流式处理编程指南

使用 Delta Lake 表进行流式处理

可以将 Delta Lake 表用作 Spark 结构化流式处理源或接收器。 例如,可以从 IoT 设备捕获实时数据流,并将流直接写入 Delta Lake 表作为接收器 - 使你能够查询表以查看最新的流式处理数据。 或者,可以将 Delta 表读取为流式处理源,这样就可以在将新数据添加到表中时不断报告新数据。

将 Delta Lake 表用作流式处理源

在以下 PySpark 示例中,Delta Lake 表用于存储 Internet 销售订单的详细信息。 创建一个流,用于在追加新数据时从 Delta Lake 表文件夹读取数据。

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .load("/delta/internetorders")

# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

注释

将 Delta Lake 表用作流式处理源时,只能将追加操作包含在流中。 除非指定 ignoreChangesignoreDeletes 选项,否则数据修改将导致错误。

将数据从 Delta Lake 表读取到流式处理数据帧后,可以使用 Spark 结构化流式处理 API 来处理这些数据。 在上面的示例中,只是显示了数据帧;但可以使用 Spark 结构化流式处理通过临时窗口聚合数据(例如计算每分钟下达的订单数量),并将聚合结果发送到下游进程以实现准实时可视化效果。

将 Delta Lake 表用作流式处理接收器

在以下 PySpark 示例中,从文件夹中的 JSON 文件中读取数据流。 每个文件中的 JSON 数据都包含 IoT 设备的状态,格式为 {"device":"Dev1","status":"ok"}。每当有文件添加到文件夹时,新数据就会添加到流中。 输入流是一个无限的数据帧,然后以增量格式将其写入 Delta Lake 表的文件夹位置。

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
inputPath = '/streamingdata/'
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)

注释

checkpointLocation 选项用于写入跟踪流处理状态的检查点文件。 此文件使你能够在流处理中断的地方从故障中恢复。

流式处理过程启动后,可以查询要写入流式处理输出的 Delta Lake 表,以查看最新数据。 例如,以下代码为 Delta Lake 表文件夹创建目录表并对其进行查询:

%%sql

CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';

SELECT device, status
FROM DeviceTable;

若要停止写入 Delta Lake 表的数据流,可以使用流式处理查询的 stop 方法:

delta_stream.stop()

小提示

有关使用 Delta Lake 表进行流式处理数据的详细信息,请参阅 Delta Lake 文档中的表流式处理读取和写入