次の方法で共有


チュートリアル: Lakeflow 宣言パイプラインで変更データ キャプチャを使用して ETL パイプラインを構築する

データ オーケストレーションと自動ローダー用の Lakeflow 宣言パイプラインを使用して、変更データ キャプチャ (CDC) を使用して ETL (抽出、変換、読み込み) パイプラインを作成してデプロイする方法について説明します。 ETL パイプラインは、ソース システムからデータを読み取り、データ品質チェックや重複除去の記録などの要件に基づいてデータを変換し、データ ウェアハウスやデータ レイクなどのターゲット システムにデータを書き込む手順を実装します。

このチュートリアルでは、MySQL データベースの customers テーブルのデータを使用して、次の操作を行います。

  • Debezium またはその他のツールを使用してトランザクション データベースから変更を抽出し、クラウド オブジェクト ストレージ (S3 フォルダー、ADLS、GCS) に保存します。 チュートリアルを簡略化するために、外部 CDC システムの設定をスキップします。
  • 自動ローダーを使用して、クラウド オブジェクト ストレージからメッセージを増分読み込みし、生メッセージを customers_cdc テーブルに格納します。 自動ローダーはスキーマを推論し、スキーマの進化を処理します。
  • ビュー customers_cdc_clean を追加して、期待値を使用してデータ品質を確認します。 たとえば、アップサート操作を実行するために使用するので、idnullとしてはいけません。
  • クリーンアップされた CDC データに対して AUTO CDC ... INTO (アップサートを実行) を実行して、最終的な customers テーブルに変更を適用します
  • すべての変更を追跡するために、Lakeflow 宣言型パイプラインで型 2 の緩やかに変化するディメンション (SCD2) を作成する方法を示します。

目標は、生データをほぼリアルタイムで取り込み、データ品質を確保しながらアナリスト チームのテーブルを構築することです。

このチュートリアルでは medallion Lakehouse アーキテクチャを使用します。このアーキテクチャでは、ブロンズ レイヤーを介して生データを取り込み、シルバー レイヤーでデータをクリーンアップおよび検証し、ゴールド レイヤーを使用してディメンション モデリングと集計を適用します。 詳細については、「 medallion lakehouse のアーキテクチャとは」 を参照してください。

実装するフローは次のようになります。

CDC と LDP

Lakeflow 宣言パイプライン、自動ローダー、CDC の詳細については、「Lakeflow 宣言パイプライン」、「自動ローダーとは」、および「変更データ キャプチャ (CDC)」を参照してください。

必要条件

このチュートリアルを完了するには、次の要件を満たす必要があります。

ETL パイプラインでのデータ キャプチャの変更

変更データ キャプチャ (CDC) は、トランザクション データベース (MySQL や PostgreSQL など) またはデータ ウェアハウスに対して行われたレコードの変更をキャプチャするプロセスです。 CDC は、データの削除、追加、更新などの操作をキャプチャします。通常は、外部システムでテーブルを再具体化するためのストリームとしてキャプチャされます。 CDC を使用すると、増分読み込みが可能になり、一括読み込みの更新が不要になります。

このチュートリアルを簡略化するには、外部 CDC システムの設定をスキップします。 BLOB ストレージ (S3、ADLS、GCS) で CDC データを JSON ファイルとして起動して実行し、保存することを検討できます。

CDC のデータ取得

さまざまな CDC ツールを使用できます。 オープンソース リーダー ソリューションの 1 つが Debezium ですが、Fivetran、Qlik Replicate、Streamset、Talend、Oracle GoldenGate、AWS DMS など、データ ソースを簡略化する他の実装が存在します。

このチュートリアルでは、Debezium や DMS などの外部システムの CDC データを使用します。 Debezium は、変更されたすべての行をキャプチャします。 通常、データ変更の履歴を Kafka ログに送信するか、ファイルとして保存します。

CDC 情報を customers テーブル (JSON 形式) から取り込み、それが正しいことを確認してから、Lakehouse で顧客テーブルを具体化する必要があります。

Debezium からの CDC 入力

変更ごとに、更新される行のすべてのフィールド (idfirstnamelastnameemailaddress) を含む JSON メッセージが表示されます。 さらに、次のような追加のメタデータ情報があります。

  • operation: 通常、操作コード (DELETEAPPENDUPDATE)。
  • operation_date: 各操作アクションのレコードの日付とタイムスタンプ。

Debezium などのツールでは、変更前の行の値など、より高度な出力を生成できますが、このチュートリアルではわかりやすくするために省略します。

手順 0: チュートリアル データのセットアップ

まず、新しいノートブックを作成し、このチュートリアルで使用するデモ ファイルをワークスペースにインストールする必要があります。

  1. 左上隅の [ 新規 ] をクリックします。

  2. [ ノートブック] をクリックします。

  3. ノートブックのタイトルを<から>に変更します。

  4. 上部にあるノートブックのタイトルの横にあるノートブックの既定の言語を Python に設定します。

  5. チュートリアルで使用するデータセットを生成するには、最初のセルに次のコードを入力し、「 Shift + Enter 」と入力してコードを実行します。

    # You can change the catalog, schema, dbName, and db. If you do so, you must also
    # change the names in the rest of the tutorial.
    catalog = "main"
    schema = dbName = db = "dbdemos_dlt_cdc"
    volume_name = "raw_data"
    
    spark.sql(f'CREATE CATALOG IF NOT EXISTS `{catalog}`')
    spark.sql(f'USE CATALOG `{catalog}`')
    spark.sql(f'CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`')
    spark.sql(f'USE SCHEMA `{schema}`')
    spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`{volume_name}`')
    volume_folder =  f"/Volumes/{catalog}/{db}/{volume_name}"
    
    try:
      dbutils.fs.ls(volume_folder+"/customers")
    except:
      print(f"folder doesn't exists, generating the data under {volume_folder}...")
      from pyspark.sql import functions as F
      from faker import Faker
      from collections import OrderedDict
      import uuid
      fake = Faker()
      import random
    
      fake_firstname = F.udf(fake.first_name)
      fake_lastname = F.udf(fake.last_name)
      fake_email = F.udf(fake.ascii_company_email)
      fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
      fake_address = F.udf(fake.address)
      operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
      fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
      fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)
    
      df = spark.range(0, 100000).repartition(100)
      df = df.withColumn("id", fake_id())
      df = df.withColumn("firstname", fake_firstname())
      df = df.withColumn("lastname", fake_lastname())
      df = df.withColumn("email", fake_email())
      df = df.withColumn("address", fake_address())
      df = df.withColumn("operation", fake_operation())
      df_customers = df.withColumn("operation_date", fake_date())
      df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
    
  6. このチュートリアルで使用するデータをプレビューするには、次のセルにコードを入力し、「 Shift + Enter 」と入力してコードを実行します。

    display(spark.read.json("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers"))
    

手順 1: パイプラインを作成する

まず、Lakeflow 宣言型パイプラインで ETL パイプラインを作成します。 Lakeflow 宣言パイプラインは、Lakeflow 宣言パイプライン構文を使用して、ノートブックまたはファイル ( ソース コードと呼ばれます) で定義されている依存関係を解決することによってパイプラインを作成します。 各ソース コード ファイルに含めることができる言語は 1 つだけですが、パイプラインに複数の言語固有のノートブックまたはファイルを追加できます。 詳細については、「Lakeflow 宣言型パイプライン」を参照してください。

重要

ソース コードの作成用にノートブックを自動的に作成して構成するには、[ソース コード] フィールドを空白のままにします。

このチュートリアルでは、サーバーレス コンピューティングと Unity カタログを使用します。 指定されていないすべての構成オプションで、既定の設定を使用します。 ワークスペースでサーバーレス コンピューティングが有効になっていないか、サポートされていない場合は、既定のコンピューティング設定を使用して記述されたチュートリアルを完了できます。 既定のコンピューティング設定を使用する場合は、[パイプラインの作成] UI の [宛先] セクションの [ストレージ オプション] で Unity カタログを手動で選択する必要があります。

Lakeflow 宣言パイプラインで新しい ETL パイプラインを作成するには、次の手順に従います。

  1. ワークスペースで、[ワークフロー] アイコンをクリックします。サイドバーのジョブとパイプライン
  2. [ 新規] の [ ETL パイプライン] をクリックします。
  3. [パイプライン名 に、一意のパイプライン名を入力します。
  4. [サーバーレス] チェック ボックスをオンにします。
  5. トリガーパイプライン モードで選択します。 これにより、AvailableNow トリガーを使用してストリーミング フローが実行され、既存のすべてのデータが処理され、ストリームがシャットダウンされます。
  6. [変換先] で、テーブルが発行される Unity カタログの場所を構成するには、既存のカタログを選択し、スキーマに新しい名前を書き込んでカタログに新しいスキーマを作成します。
  7. Create をクリックしてください。

新しいパイプラインのパイプライン UI が表示されます。

空のソース コード ノートブックが自動的に作成され、パイプライン用に構成されます。 ノートブックは、ユーザー ディレクトリ内の新しいディレクトリに作成されます。 新しいディレクトリとファイルの名前は、パイプラインの名前と一致します。 たとえば、/Users/someone@example.com/my_pipeline/my_pipeline のようにします。

  1. このノートブックにアクセスするためのリンクは、Pipeline の詳細 パネルの Source コード フィールドにあります。 リンクをクリックしてノートブックを開き、次の手順に進みます。
  2. 右上の Connect をクリックして、コンピューティング構成メニューを開きます。
  3. 手順 1 で作成したパイプラインの名前にカーソルを合わせます。
  4. [接続] をクリックします。
  5. 上部にあるノートブックのタイトルの横にあるノートブックの既定の言語 (Python または SQL) を選択します。

重要

ノートブックに含めることができるプログラミング言語は 1 つだけです。 パイプライン ソース コード ノートブックで Python と SQL コードを混在させないでください。

Lakeflow 宣言パイプラインを開発するときは、Python または SQL のいずれかを選択できます。 このチュートリアルには、両方の言語の例が含まれています。 言語の選択に基づいて、既定のノートブック言語を選択していることを確認します。

Lakeflow 宣言型パイプラインのコード開発に対するノートブックのサポートの詳細については、「Lakeflow 宣言型パイプライン」の ノートブックを使用した ETL パイプラインの開発とデバッグに関するページを参照してください。

手順 2: 自動ローダーを使用してデータを増分的に取り込む

最初の手順では、クラウド ストレージからブロンズ レイヤーに生データを取り込みます。

これはいくつかの理由で困難になる可能性があり、その理由の一部は以下の通りです:

  • 大規模に動作し、何百万もの小さなファイルを取り込む可能性があります。
  • スキーマと JSON 型を推論します。
  • 不適切な JSON スキーマを使用して不適切なレコードを処理します。
  • スキーマの進化 (たとえば、顧客テーブルの新しい列) に注意してください。

自動ローダーを使用すると、スキーマの推論やスキーマの進化など、このインジェストが簡素化され、数百万もの受信ファイルにスケーリングされます。 自動ローダーは、 cloudFiles を使用する Python と、 SELECT * FROM STREAM read_files(...) を使用した SQL で使用でき、さまざまな形式 (JSON、CSV、Apache Avro など) で使用できます。

テーブルをストリーミング テーブルとして定義すると、新しい受信データのみを使用できます。 ストリーミング テーブルとして定義しない場合は、使用可能なすべてのデータをスキャンして取り込みます。 詳細については、 ストリーミング テーブルを 参照してください。

  1. 自動ローダーを使用して受信データを取り込むには、次のコードをコピーしてノートブックの最初のセルに貼り付けます。 前の手順で選択したノートブックの既定の言語に応じて、Python または SQL を使用できます。

    パイソン

    from dlt import *
    from pyspark.sql.functions import *
    
    # Create the target bronze table
    dlt.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")
    
    # Create an Append Flow to ingest the raw data into the bronze table
    @append_flow(
      target = "customers_cdc_bronze",
      name = "customers_bronze_ingest_flow"
    )
    def customers_bronze_ingest_flow():
      return (
          spark.readStream
              .format("cloudFiles")
              .option("cloudFiles.format", "json")
              .option("cloudFiles.inferColumnTypes", "true")
              .load("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_bronze_ingest_flow AS
    INSERT INTO customers_cdc_bronze BY NAME
      SELECT *
      FROM STREAM read_files(
        "/Volumes/main/dbdemos_dlt_cdc/raw_data/customers",
        format => "json",
        inferColumnTypes => "true"
      )
    
  2. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

手順 3: データ品質を追跡するためのクリーンアップと期待

ブロンズ レイヤーが定義されたら、次の条件を確認して、データ品質を制御するための期待値を追加して、シルバー レイヤーを作成します。

  • ID を nullしないでください。
  • CDC 操作タイプは有効でなければなりません。
  • jsonは、オートローダーによって適切に読み取られた必要があります。

これらの条件のいずれかが考慮されない場合、行は削除されます。

詳細については、 パイプラインの期待に応えたデータ品質の管理に関する ページを参照してください。

  1. 下の [ 編集] と [ 挿入] セル をクリックして、新しい空のセルを挿入します。

  2. クレンジングされたテーブルを含むシルバー レイヤーを作成し、制約を適用するには、次のコードをコピーしてノートブックの新しいセルに貼り付けます。

    パイソン

    dlt.create_streaming_table(
      name = "customers_cdc_clean",
      expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
      )
    
    @append_flow(
      target = "customers_cdc_clean",
      name = "customers_cdc_clean_flow"
    )
    def customers_cdc_clean_flow():
      return (
          dlt.read_stream("customers_cdc_bronze")
              .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_clean (
      CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW
    )
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_cdc_clean_flow AS
    INSERT INTO customers_cdc_clean BY NAME
    SELECT * FROM STREAM customers_cdc_bronze;
    
  3. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

手順 4: AUTO CDC フローを使用して顧客テーブルを具体化する

customers テーブルには、最も up-to-date ビューが含まれており、元のテーブルのレプリカになります。

これは、手動で実装するのは容易ではありません。 最新の行を保持するには、データ重複除去などを考慮する必要があります。

ただし、Lakeflow 宣言パイプラインは、 AUTO CDC 操作を使用してこれらの課題を解決します。

  1. 下の [ 編集] と [ 挿入] セル をクリックして、新しい空のセルを挿入します。

  2. Lakeflow 宣言パイプラインで AUTO CDC を使用して CDC データを処理するには、次のコードをコピーしてノートブックの新しいセルに貼り付けます。

    パイソン

    dlt.create_streaming_table(name="customers", comment="Clean, materialized customers")
    
    dlt.create_auto_cdc_flow(
      target="customers",  # The customer table being materialized
      source="customers_cdc_clean",  # the incoming CDC
      keys=["id"],  # what we'll be using to match the rows to upsert
      sequence_by=col("operation_date"),  # de-duplicate by operation date, getting the most recent value
      ignore_null_updates=False,
      apply_as_deletes=expr("operation = 'DELETE'"),  # DELETE condition
      except_column_list=["operation", "operation_date", "_rescued_data"],
    )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers;
    
    CREATE FLOW customers_cdc_flow
    AS AUTO CDC INTO customers
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 1;
    
  3. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

ステップ 5: タイプ 2 の緩やかに変化するディメンション (SCD2)

多くの場合、 APPENDUPDATE、および DELETEに起因するすべての変更を追跡するテーブルを作成する必要があります。

  • 履歴: テーブルに対するすべての変更の履歴を保持する必要があります。
  • 追跡可能性: 発生した操作を確認します。

SCD2 と Lakeflow 宣言型パイプライン

Delta では変更データ フロー (CDF) がサポートされており、 table_change は SQL と Python でテーブルの変更を照会できます。 ただし、CDF の主なユース ケースは、パイプライン内の変更をキャプチャし、最初からテーブルの変更の完全なビューを作成しないことです。

順序が整っていないイベントがある場合は、実装が特に複雑になります。 変更をタイムスタンプで並べ替え、過去に行った変更を受け取る必要がある場合は、SCD テーブルに新しいエントリを追加し、前のエントリを更新する必要があります。

Lakeflow 宣言型パイプラインでは、この複雑さが排除され、最初からのすべての変更を含む個別のテーブルを作成できます。 このテーブルは、必要に応じて、特定のパーティション/zorder 列と共に大規模に使用できます。 順序が乱れたフィールドは、_sequence_byに基づいて自動的に処理されます。

SCD2 テーブルを作成するには、SQL で STORED AS SCD TYPE 2 するか、Python で stored_as_scd_type="2" するオプションを使用する必要があります。

次のオプションを使用して、フィーチャが追跡する列を制限することもできます。 TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}

  1. 下の [ 編集] と [ 挿入] セル をクリックして、新しい空のセルを挿入します。

  2. 次のコードをコピーして、ノートブックの新しいセルに貼り付けます。

    パイソン

    # create the table
    dlt.create_streaming_table(
        name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
    )
    
    # store all changes as SCD2
    dlt.create_auto_cdc_flow(
        target="customers_history",
        source="customers_cdc_clean",
        keys=["id"],
        sequence_by=col("operation_date"),
        ignore_null_updates=False,
        apply_as_deletes=expr("operation = 'DELETE'"),
        except_column_list=["operation", "operation_date", "_rescued_data"],
        stored_as_scd_type="2",
    )  # Enable SCD2 and store individual updates
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_history;
    
    CREATE FLOW cusotmers_history_cdc
    AS AUTO CDC INTO
      customers_history
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 2;
    
  3. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

手順 6: 情報を最も変更したユーザーを追跡する具体化されたビューを作成する

テーブル customers_history には、ユーザーが自分の情報に加えたすべての履歴変更が含まれています。 これで、情報を最も変更したユーザーを追跡する単純な具体化されたビューがゴールド レイヤーに作成されます。 これは、実際のシナリオで不正行為の検出分析やユーザーの推奨事項に使用できます。 さらに、SCD2 で変更を適用すると重複が既に削除されているため、ユーザー ID ごとに行を直接カウントできます。

  1. 下の [ 編集] と [ 挿入] セル をクリックして、新しい空のセルを挿入します。

  2. 次のコードをコピーして、ノートブックの新しいセルに貼り付けます。

    パイソン

    @dlt.table(
      name = "customers_history_agg",
      comment = "Aggregated customer history"
    )
    def customers_history_agg():
      return (
        dlt.read("customers_history")
          .groupBy("id")
          .agg(
              count("address").alias("address_count"),
              count("email").alias("email_count"),
              count("firstname").alias("firstname_count"),
              count("lastname").alias("lastname_count")
          )
      )
    

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS
    SELECT
      id,
      count("address") as address_count,
      count("email") AS email_count,
      count("firstname") AS firstname_count,
      count("lastname") AS lastname_count
    FROM customers_history
    GROUP BY id
    
  3. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

手順 7: ETL パイプラインを実行するジョブを作成する

次に、Databricks ジョブを使用してデータインジェスト、処理、分析の手順を自動化するワークフローを作成します。

  1. ワークスペースで、[ワークフロー] アイコンをクリックします。サイドバーのジョブとパイプライン
  2. [ 新規] の [ ジョブ] をクリックします。
  3. タスクのタイトル ボックスで、 新しいジョブ <日付と時刻> をジョブ名に置き換えます。 たとえば、CDC customers workflow のようにします。
  4. [タスク名] に、最初のタスクの名前 (例: ETL_customers_data) を入力します。
  5. 種類で、パイプラインを選択します。
  6. [パイプライン] で、手順 1 で作成したパイプラインを選択します。
  7. Create をクリックしてください。
  8. ワークフローを実行するには、[ 今すぐ実行] をクリックします。 実行の詳細を表示するには、[ 実行 ] タブをクリックします。タスクをクリックすると、タスク実行の詳細が表示されます。
  9. ワークフローが完了したときに結果を表示するには、[ Go to the latest successful run]\(成功した最新の実行に移動 \) またはジョブ実行の 開始時刻 をクリックします。 [出力] ページが表示され、クエリの結果が表示されます。

ジョブの実行の詳細については、Lakeflow ジョブの監視と可観測性を参照してください。

手順 8: ジョブをスケジュールする

スケジュールに従って ETL パイプラインを実行するには、次の手順に従います。

  1. [ワークフロー] アイコンをクリックします。サイドバーのジョブとパイプライン
  2. 必要に応じて、ジョブ私が所有 フィルターを選択します。
  3. [名前] 列で、ジョブ名をクリックします。 サイド パネルが ジョブの詳細として表示されます。
  4. [スケジュールとトリガー] パネルで [ トリガーの追加 ] をクリックし、[ トリガーの 種類 ] で [スケジュール済み ] を選択 します
  5. 期間、開始時刻、タイム ゾーンを指定します。
  6. [保存] をクリックします。

その他のリソース