ストリーミング データで Delta Lake を使用する
ここまで調べてきたすべてのデータは、ファイル内の静的データでした。 しかし、多くのデータ分析シナリオでは、準リアルタイムで処理する必要がある "ストリーミング" データを使用します。 たとえば、モノのインターネット (IoT) デバイスによって生成された読み取り値をキャプチャし、それらが発生したときにテーブルに格納する必要がある場合があります。
Spark 構造化ストリーミング
一般的なストリーム処理ソリューションでは、"ソース" から絶えずデータのストリームを読み取り、必要に応じてそれを処理して特定のフィールドを選択したり、値を集計およびグループ化したり、データを操作したり、または結果を "シンク" に書き込んだりします。
Spark には、 Spark Structured Streaming を介したストリーミング データのネイティブ サポートが含まれています。これは、ストリーミング データが処理のためにキャプチャされる無制限のデータフレームに基づく API です。 Spark Structured Streaming データフレームでは、ネットワーク ポート、Azure Event Hubs や Kafka などのリアルタイム メッセージ ブローカー サービス、ファイル システムの場所など、さまざまな種類のストリーミング ソースからデータを読み取ることができます。
ヒント
Spark 構造化ストリーミングの詳細については、Spark ドキュメントの 「構造化ストリーミング プログラミング ガイド 」を参照してください。
Delta Lake テーブルを使用したストリーミング
Delta Lake テーブルは、Spark Structured Streaming のソースまたはシンクとして使用できます。 たとえば、IoT デバイスからリアルタイム データのストリームをキャプチャし、ストリームをシンクとしての Delta Lake テーブルに直接書き込めば、テーブルにクエリを実行して最新のストリーミング データを表示できます。 または、Delta テーブルをストリーミング ソースとして読み取ると、テーブルに追加された新しいデータを絶えずレポートできます。
Delta Lake テーブルをストリーミング ソースとして使用する
次の PySpark の例では、Delta Lake テーブルを使ってインターネット販売注文の詳細を格納します。 新しいデータが追加されると、Delta Lake テーブル フォルダーからデータを読み取るストリームが作成されます。
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.load("/delta/internetorders")
# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.writeStream \
.outputMode("append") \
.format("console") \
.start()
注
Delta Lake テーブルをストリーミング ソースとして使用する場合、"追加" 操作のみをストリームに含めることができます。
ignoreChanges
または ignoreDeletes
オプションを指定しない限り、データを変更するとエラーが発生します。
Delta Lake テーブルからストリーミング データフレームにデータを読み取った後は、Spark Structured Streaming API を使って処理できます。 上記の例では、データフレームはシンプルに表示されます。しかし、Spark Structured Streaming を使用して、テンポラル ウィンドウにわたるデータを集計し (たとえば、1 分ごとに発注された注文数をカウントするなど)、集計された結果をダウンストリーム プロセスに送信して、準リアルタイムで視覚化することもできます。
Delta Lake テーブルをストリーミング シンクとして使用する
次の PySpark の例では、フォルダー内の JSON ファイルからデータのストリームが読み取られます。 各ファイルの JSON データには、IoT デバイスの状態が {"device":"Dev1","status":"ok"}
という形式で含まれます。フォルダーにファイルが追加されるたびに、ストリームに新しいデータが追加されます。 入力ストリームは境界のないデータフレームであり、Delta Lake テーブルのフォルダーの場所にデルタ形式で書き込まれます。
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Create a stream that reads JSON data from a folder
inputPath = '/streamingdata/'
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)
注
checkpointLocation
オプションは、ストリーム処理の状態を追跡するチェックポイント ファイルを書き込むために使用されます。 このファイルを使用すると、ストリーム処理が中断された時点で障害から復旧できます。
ストリーミング処理が開始されたら、ストリーミングの出力が書き込まれている Delta Lake テーブルに対してクエリを実行し、最新のデータを確認できます。 たとえば、次のコードでは、Delta Lake テーブル フォルダーのカタログ テーブルを作成し、クエリを実行します。
%%sql
CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';
SELECT device, status
FROM DeviceTable;
Delta Lake テーブルに書き込まれているデータのストリームを停止するには、ストリーミング クエリの stop
メソッドを使用します。
delta_stream.stop()
ヒント
ストリーミング データに Delta Lake テーブルを使用する方法の詳細については、Delta Lake ドキュメントの テーブル ストリーミングの読み取りと書き込みを 参照してください。