データ オーケストレーションと自動ローダー用の Lakeflow 宣言パイプラインを使用して、変更データ キャプチャ (CDC) を使用して ETL (抽出、変換、読み込み) パイプラインを作成してデプロイする方法について説明します。 ETL パイプラインは、ソース システムからデータを読み取り、データ品質チェックや重複除去の記録などの要件に基づいてデータを変換し、データ ウェアハウスやデータ レイクなどのターゲット システムにデータを書き込む手順を実装します。
このチュートリアルでは、MySQL データベースの customers
テーブルのデータを使用して、次の操作を行います。
- Debezium またはその他のツールを使用してトランザクション データベースから変更を抽出し、クラウド オブジェクト ストレージ (S3 フォルダー、ADLS、GCS) に保存します。 チュートリアルを簡略化するために、外部 CDC システムの設定をスキップします。
- 自動ローダーを使用して、クラウド オブジェクト ストレージからメッセージを増分読み込みし、生メッセージを
customers_cdc
テーブルに格納します。 自動ローダーはスキーマを推論し、スキーマの進化を処理します。 - ビュー
customers_cdc_clean
を追加して、期待値を使用してデータ品質を確認します。 たとえば、アップサート操作を実行するために使用するので、id
をnull
としてはいけません。 - クリーンアップされた CDC データに対して
AUTO CDC ... INTO
(アップサートを実行) を実行して、最終的なcustomers
テーブルに変更を適用します - すべての変更を追跡するために、Lakeflow 宣言型パイプラインで型 2 の緩やかに変化するディメンション (SCD2) を作成する方法を示します。
目標は、生データをほぼリアルタイムで取り込み、データ品質を確保しながらアナリスト チームのテーブルを構築することです。
このチュートリアルでは medallion Lakehouse アーキテクチャを使用します。このアーキテクチャでは、ブロンズ レイヤーを介して生データを取り込み、シルバー レイヤーでデータをクリーンアップおよび検証し、ゴールド レイヤーを使用してディメンション モデリングと集計を適用します。 詳細については、「 medallion lakehouse のアーキテクチャとは」 を参照してください。
実装するフローは次のようになります。
Lakeflow 宣言パイプライン、自動ローダー、CDC の詳細については、「Lakeflow 宣言パイプライン」、「自動ローダーとは」、および「変更データ キャプチャ (CDC)」を参照してください。
必要条件
このチュートリアルを完了するには、次の要件を満たす必要があります。
- Azure Databricks ワークスペースにログインします。
- ワークスペースに対して Unity カタログ を有効にします。
- アカウントでサーバーレス コンピューティングを有効にする。 サーバーレス Lakeflow 宣言型パイプラインは、すべてのワークスペース リージョンで使用できるわけではありません。 利用可能な地域は地域限定機能をご覧ください。
- コンピューティング リソースを作成したり、コンピューティング リソースにアクセスしたりするためのアクセス許可を持っている。
-
カタログに新しいスキーマを作成するためのアクセス許可を持っている。 必要なアクセス許可は、
ALL PRIVILEGES
またはUSE CATALOG
とCREATE SCHEMA
です。 -
既存のスキーマに新しいボリュームを作成するためのアクセス許可を持っている。 必要なアクセス許可は、
ALL PRIVILEGES
またはUSE SCHEMA
とCREATE VOLUME
です。
ETL パイプラインでのデータ キャプチャの変更
変更データ キャプチャ (CDC) は、トランザクション データベース (MySQL や PostgreSQL など) またはデータ ウェアハウスに対して行われたレコードの変更をキャプチャするプロセスです。 CDC は、データの削除、追加、更新などの操作をキャプチャします。通常は、外部システムでテーブルを再具体化するためのストリームとしてキャプチャされます。 CDC を使用すると、増分読み込みが可能になり、一括読み込みの更新が不要になります。
注
このチュートリアルを簡略化するには、外部 CDC システムの設定をスキップします。 BLOB ストレージ (S3、ADLS、GCS) で CDC データを JSON ファイルとして起動して実行し、保存することを検討できます。
CDC のデータ取得
さまざまな CDC ツールを使用できます。 オープンソース リーダー ソリューションの 1 つが Debezium ですが、Fivetran、Qlik Replicate、Streamset、Talend、Oracle GoldenGate、AWS DMS など、データ ソースを簡略化する他の実装が存在します。
このチュートリアルでは、Debezium や DMS などの外部システムの CDC データを使用します。 Debezium は、変更されたすべての行をキャプチャします。 通常、データ変更の履歴を Kafka ログに送信するか、ファイルとして保存します。
CDC 情報を customers
テーブル (JSON 形式) から取り込み、それが正しいことを確認してから、Lakehouse で顧客テーブルを具体化する必要があります。
Debezium からの CDC 入力
変更ごとに、更新される行のすべてのフィールド (id
、 firstname
、 lastname
、 email
、 address
) を含む JSON メッセージが表示されます。 さらに、次のような追加のメタデータ情報があります。
-
operation
: 通常、操作コード (DELETE
、APPEND
、UPDATE
)。 -
operation_date
: 各操作アクションのレコードの日付とタイムスタンプ。
Debezium などのツールでは、変更前の行の値など、より高度な出力を生成できますが、このチュートリアルではわかりやすくするために省略します。
手順 0: チュートリアル データのセットアップ
まず、新しいノートブックを作成し、このチュートリアルで使用するデモ ファイルをワークスペースにインストールする必要があります。
左上隅の [ 新規 ] をクリックします。
[ ノートブック] をクリックします。
ノートブックのタイトルを<から>に変更します。
上部にあるノートブックのタイトルの横にあるノートブックの既定の言語を Python に設定します。
チュートリアルで使用するデータセットを生成するには、最初のセルに次のコードを入力し、「 Shift + Enter 」と入力してコードを実行します。
# You can change the catalog, schema, dbName, and db. If you do so, you must also # change the names in the rest of the tutorial. catalog = "main" schema = dbName = db = "dbdemos_dlt_cdc" volume_name = "raw_data" spark.sql(f'CREATE CATALOG IF NOT EXISTS `{catalog}`') spark.sql(f'USE CATALOG `{catalog}`') spark.sql(f'CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`') spark.sql(f'USE SCHEMA `{schema}`') spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`{volume_name}`') volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}" try: dbutils.fs.ls(volume_folder+"/customers") except: print(f"folder doesn't exists, generating the data under {volume_folder}...") from pyspark.sql import functions as F from faker import Faker from collections import OrderedDict import uuid fake = Faker() import random fake_firstname = F.udf(fake.first_name) fake_lastname = F.udf(fake.last_name) fake_email = F.udf(fake.ascii_company_email) fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S")) fake_address = F.udf(fake.address) operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)]) fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0]) fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None) df = spark.range(0, 100000).repartition(100) df = df.withColumn("id", fake_id()) df = df.withColumn("firstname", fake_firstname()) df = df.withColumn("lastname", fake_lastname()) df = df.withColumn("email", fake_email()) df = df.withColumn("address", fake_address()) df = df.withColumn("operation", fake_operation()) df_customers = df.withColumn("operation_date", fake_date()) df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
このチュートリアルで使用するデータをプレビューするには、次のセルにコードを入力し、「 Shift + Enter 」と入力してコードを実行します。
display(spark.read.json("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers"))
手順 1: パイプラインを作成する
まず、Lakeflow 宣言型パイプラインで ETL パイプラインを作成します。 Lakeflow 宣言パイプラインは、Lakeflow 宣言パイプライン構文を使用して、ノートブックまたはファイル ( ソース コードと呼ばれます) で定義されている依存関係を解決することによってパイプラインを作成します。 各ソース コード ファイルに含めることができる言語は 1 つだけですが、パイプラインに複数の言語固有のノートブックまたはファイルを追加できます。 詳細については、「Lakeflow 宣言型パイプライン」を参照してください。
重要
ソース コードの作成用にノートブックを自動的に作成して構成するには、[ソース コード] フィールドを空白のままにします。
このチュートリアルでは、サーバーレス コンピューティングと Unity カタログを使用します。 指定されていないすべての構成オプションで、既定の設定を使用します。 ワークスペースでサーバーレス コンピューティングが有効になっていないか、サポートされていない場合は、既定のコンピューティング設定を使用して記述されたチュートリアルを完了できます。 既定のコンピューティング設定を使用する場合は、[パイプラインの作成] UI の [宛先] セクションの [ストレージ オプション] で Unity カタログを手動で選択する必要があります。
Lakeflow 宣言パイプラインで新しい ETL パイプラインを作成するには、次の手順に従います。
- ワークスペースで、[ワークフロー] アイコンをクリック
サイドバーのジョブとパイプライン。
- [ 新規] の [ ETL パイプライン] をクリックします。
- [パイプライン名 に、一意のパイプライン名を入力します。
- [サーバーレス] チェック ボックスをオンにします。
- トリガー をパイプライン モードで選択します。 これにより、AvailableNow トリガーを使用してストリーミング フローが実行され、既存のすべてのデータが処理され、ストリームがシャットダウンされます。
- [変換先] で、テーブルが発行される Unity カタログの場所を構成するには、既存のカタログを選択し、スキーマに新しい名前を書き込んでカタログに新しいスキーマを作成します。
- Create をクリックしてください。
新しいパイプラインのパイプライン UI が表示されます。
空のソース コード ノートブックが自動的に作成され、パイプライン用に構成されます。 ノートブックは、ユーザー ディレクトリ内の新しいディレクトリに作成されます。 新しいディレクトリとファイルの名前は、パイプラインの名前と一致します。 たとえば、/Users/someone@example.com/my_pipeline/my_pipeline
のようにします。
- このノートブックにアクセスするためのリンクは、Pipeline の詳細 パネルの Source コード フィールドにあります。 リンクをクリックしてノートブックを開き、次の手順に進みます。
- 右上の Connect をクリックして、コンピューティング構成メニューを開きます。
- 手順 1 で作成したパイプラインの名前にカーソルを合わせます。
- [接続] をクリックします。
- 上部にあるノートブックのタイトルの横にあるノートブックの既定の言語 (Python または SQL) を選択します。
重要
ノートブックに含めることができるプログラミング言語は 1 つだけです。 パイプライン ソース コード ノートブックで Python と SQL コードを混在させないでください。
Lakeflow 宣言パイプラインを開発するときは、Python または SQL のいずれかを選択できます。 このチュートリアルには、両方の言語の例が含まれています。 言語の選択に基づいて、既定のノートブック言語を選択していることを確認します。
Lakeflow 宣言型パイプラインのコード開発に対するノートブックのサポートの詳細については、「Lakeflow 宣言型パイプライン」の ノートブックを使用した ETL パイプラインの開発とデバッグに関するページを参照してください。
手順 2: 自動ローダーを使用してデータを増分的に取り込む
最初の手順では、クラウド ストレージからブロンズ レイヤーに生データを取り込みます。
これはいくつかの理由で困難になる可能性があり、その理由の一部は以下の通りです:
- 大規模に動作し、何百万もの小さなファイルを取り込む可能性があります。
- スキーマと JSON 型を推論します。
- 不適切な JSON スキーマを使用して不適切なレコードを処理します。
- スキーマの進化 (たとえば、顧客テーブルの新しい列) に注意してください。
自動ローダーを使用すると、スキーマの推論やスキーマの進化など、このインジェストが簡素化され、数百万もの受信ファイルにスケーリングされます。 自動ローダーは、 cloudFiles
を使用する Python と、 SELECT * FROM STREAM read_files(...)
を使用した SQL で使用でき、さまざまな形式 (JSON、CSV、Apache Avro など) で使用できます。
テーブルをストリーミング テーブルとして定義すると、新しい受信データのみを使用できます。 ストリーミング テーブルとして定義しない場合は、使用可能なすべてのデータをスキャンして取り込みます。 詳細については、 ストリーミング テーブルを 参照してください。
自動ローダーを使用して受信データを取り込むには、次のコードをコピーしてノートブックの最初のセルに貼り付けます。 前の手順で選択したノートブックの既定の言語に応じて、Python または SQL を使用できます。
パイソン
from dlt import * from pyspark.sql.functions import * # Create the target bronze table dlt.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone") # Create an Append Flow to ingest the raw data into the bronze table @append_flow( target = "customers_cdc_bronze", name = "customers_bronze_ingest_flow" ) def customers_bronze_ingest_flow(): return ( spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers") )
SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_bronze_ingest_flow AS INSERT INTO customers_cdc_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/main/dbdemos_dlt_cdc/raw_data/customers", format => "json", inferColumnTypes => "true" )
[ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。
手順 3: データ品質を追跡するためのクリーンアップと期待
ブロンズ レイヤーが定義されたら、次の条件を確認して、データ品質を制御するための期待値を追加して、シルバー レイヤーを作成します。
- ID を
null
しないでください。 - CDC 操作タイプは有効でなければなりません。
-
json
は、オートローダーによって適切に読み取られた必要があります。
これらの条件のいずれかが考慮されない場合、行は削除されます。
詳細については、 パイプラインの期待に応えたデータ品質の管理に関する ページを参照してください。
下の [ 編集] と [ 挿入] セル をクリックして、新しい空のセルを挿入します。
クレンジングされたテーブルを含むシルバー レイヤーを作成し、制約を適用するには、次のコードをコピーしてノートブックの新しいセルに貼り付けます。
パイソン
dlt.create_streaming_table( name = "customers_cdc_clean", expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"} ) @append_flow( target = "customers_cdc_clean", name = "customers_cdc_clean_flow" ) def customers_cdc_clean_flow(): return ( dlt.read_stream("customers_cdc_bronze") .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data") )
SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_clean ( CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW ) COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_cdc_clean_flow AS INSERT INTO customers_cdc_clean BY NAME SELECT * FROM STREAM customers_cdc_bronze;
[ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。
手順 4: AUTO CDC フローを使用して顧客テーブルを具体化する
customers
テーブルには、最も up-to-date ビューが含まれており、元のテーブルのレプリカになります。
これは、手動で実装するのは容易ではありません。 最新の行を保持するには、データ重複除去などを考慮する必要があります。
ただし、Lakeflow 宣言パイプラインは、 AUTO CDC
操作を使用してこれらの課題を解決します。
下の [ 編集] と [ 挿入] セル をクリックして、新しい空のセルを挿入します。
Lakeflow 宣言パイプラインで
AUTO CDC
を使用して CDC データを処理するには、次のコードをコピーしてノートブックの新しいセルに貼り付けます。パイソン
dlt.create_streaming_table(name="customers", comment="Clean, materialized customers") dlt.create_auto_cdc_flow( target="customers", # The customer table being materialized source="customers_cdc_clean", # the incoming CDC keys=["id"], # what we'll be using to match the rows to upsert sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition except_column_list=["operation", "operation_date", "_rescued_data"], )
SQL
CREATE OR REFRESH STREAMING TABLE customers; CREATE FLOW customers_cdc_flow AS AUTO CDC INTO customers FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 1;
[ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。
ステップ 5: タイプ 2 の緩やかに変化するディメンション (SCD2)
多くの場合、 APPEND
、 UPDATE
、および DELETE
に起因するすべての変更を追跡するテーブルを作成する必要があります。
- 履歴: テーブルに対するすべての変更の履歴を保持する必要があります。
- 追跡可能性: 発生した操作を確認します。
SCD2 と Lakeflow 宣言型パイプライン
Delta では変更データ フロー (CDF) がサポートされており、 table_change
は SQL と Python でテーブルの変更を照会できます。 ただし、CDF の主なユース ケースは、パイプライン内の変更をキャプチャし、最初からテーブルの変更の完全なビューを作成しないことです。
順序が整っていないイベントがある場合は、実装が特に複雑になります。 変更をタイムスタンプで並べ替え、過去に行った変更を受け取る必要がある場合は、SCD テーブルに新しいエントリを追加し、前のエントリを更新する必要があります。
Lakeflow 宣言型パイプラインでは、この複雑さが排除され、最初からのすべての変更を含む個別のテーブルを作成できます。 このテーブルは、必要に応じて、特定のパーティション/zorder 列と共に大規模に使用できます。 順序が乱れたフィールドは、_sequence_byに基づいて自動的に処理されます。
SCD2 テーブルを作成するには、SQL で STORED AS SCD TYPE 2
するか、Python で stored_as_scd_type="2"
するオプションを使用する必要があります。
注
次のオプションを使用して、フィーチャが追跡する列を制限することもできます。 TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}
下の [ 編集] と [ 挿入] セル をクリックして、新しい空のセルを挿入します。
次のコードをコピーして、ノートブックの新しいセルに貼り付けます。
パイソン
# create the table dlt.create_streaming_table( name="customers_history", comment="Slowly Changing Dimension Type 2 for customers" ) # store all changes as SCD2 dlt.create_auto_cdc_flow( target="customers_history", source="customers_cdc_clean", keys=["id"], sequence_by=col("operation_date"), ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), except_column_list=["operation", "operation_date", "_rescued_data"], stored_as_scd_type="2", ) # Enable SCD2 and store individual updates
SQL
CREATE OR REFRESH STREAMING TABLE customers_history; CREATE FLOW cusotmers_history_cdc AS AUTO CDC INTO customers_history FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 2;
[ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。
手順 6: 情報を最も変更したユーザーを追跡する具体化されたビューを作成する
テーブル customers_history
には、ユーザーが自分の情報に加えたすべての履歴変更が含まれています。 これで、情報を最も変更したユーザーを追跡する単純な具体化されたビューがゴールド レイヤーに作成されます。 これは、実際のシナリオで不正行為の検出分析やユーザーの推奨事項に使用できます。 さらに、SCD2 で変更を適用すると重複が既に削除されているため、ユーザー ID ごとに行を直接カウントできます。
下の [ 編集] と [ 挿入] セル をクリックして、新しい空のセルを挿入します。
次のコードをコピーして、ノートブックの新しいセルに貼り付けます。
パイソン
@dlt.table( name = "customers_history_agg", comment = "Aggregated customer history" ) def customers_history_agg(): return ( dlt.read("customers_history") .groupBy("id") .agg( count("address").alias("address_count"), count("email").alias("email_count"), count("firstname").alias("firstname_count"), count("lastname").alias("lastname_count") ) )
SQL
CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS SELECT id, count("address") as address_count, count("email") AS email_count, count("firstname") AS firstname_count, count("lastname") AS lastname_count FROM customers_history GROUP BY id
[ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。
手順 7: ETL パイプラインを実行するジョブを作成する
次に、Databricks ジョブを使用してデータインジェスト、処理、分析の手順を自動化するワークフローを作成します。
- ワークスペースで、[ワークフロー] アイコンをクリック
サイドバーのジョブとパイプライン。
- [ 新規] の [ ジョブ] をクリックします。
- タスクのタイトル ボックスで、 新しいジョブ <日付と時刻> をジョブ名に置き換えます。 たとえば、
CDC customers workflow
のようにします。 -
[タスク名] に、最初のタスクの名前 (例:
ETL_customers_data
) を入力します。 - 種類で、パイプラインを選択します。
- [パイプライン] で、手順 1 で作成したパイプラインを選択します。
- Create をクリックしてください。
- ワークフローを実行するには、[ 今すぐ実行] をクリックします。 実行の詳細を表示するには、[ 実行 ] タブをクリックします。タスクをクリックすると、タスク実行の詳細が表示されます。
- ワークフローが完了したときに結果を表示するには、[ Go to the latest successful run]\(成功した最新の実行に移動 \) またはジョブ実行の 開始時刻 をクリックします。 [出力] ページが表示され、クエリの結果が表示されます。
ジョブの実行の詳細については、Lakeflow ジョブの監視と可観測性を参照してください。
手順 8: ジョブをスケジュールする
スケジュールに従って ETL パイプラインを実行するには、次の手順に従います。
- [
サイドバーのジョブとパイプライン。
- 必要に応じて、ジョブ と 私が所有 フィルターを選択します。
- [名前] 列で、ジョブ名をクリックします。 サイド パネルが ジョブの詳細として表示されます。
- [スケジュールとトリガー] パネルで [ トリガーの追加 ] をクリックし、[ トリガーの 種類 ] で [スケジュール済み ] を選択 します。
- 期間、開始時刻、タイム ゾーンを指定します。
- [保存] をクリックします。