次の方法で共有


チュートリアル: Lakeflow 宣言パイプラインを使用して ETL パイプラインを構築する

Lakeflow 宣言パイプラインと自動ローダーを使用して、データ オーケストレーション用の ETL (抽出、変換、読み込み) パイプラインを作成してデプロイする方法について説明します。 ETL パイプラインは、ソース システムからデータを読み取り、データ品質チェックや重複除去の記録などの要件に基づいてデータを変換し、データ ウェアハウスやデータ レイクなどのターゲット システムにデータを書き込む手順を実装します。

このチュートリアルでは、Lakeflow 宣言パイプラインと自動ローダーを使用して、次の操作を行います。

  • ターゲット テーブルに生のソース データを取り込みます。
  • 生のソース データを変換し、変換されたデータを 2 つのターゲットマテリアライズド ビューに書き込みます。
  • 変換されたデータに対してクエリを実行します。
  • Databricks ジョブを使用して ETL パイプラインを自動化します。

Lakeflow 宣言パイプラインと自動ローダーの詳細については、「Lakeflow 宣言パイプライン自動ローダーとは」を参照してください。

必要条件

このチュートリアルを完了するには、次の要件を満たす必要があります。

データセットについて

この例で使用するデータセットは、現代音楽トラックの特徴とメタデータのコレクションである Million Song Dataset のサブセットです。 このデータセットは、Azure Databricks ワークスペースに含まれているサンプル データセット内にあります。

手順 1: パイプラインを作成する

まず、Lakeflow 宣言型パイプラインで ETL パイプラインを作成します。 Lakeflow 宣言パイプラインは、Lakeflow 宣言パイプライン構文を使用して、ノートブックまたはファイル ( ソース コードと呼ばれます) で定義されている依存関係を解決することによってパイプラインを作成します。 各ソース コード ファイルに含めることができる言語は 1 つだけですが、パイプラインに複数の言語固有のノートブックまたはファイルを追加できます。 詳細については、「Lakeflow 宣言型パイプライン」を参照してください。

重要

ソース コードを自動的に作成するためのノートブックを作成および構成するには、[ソース コード] フィールドを空白のままにします。

このチュートリアルでは、サーバーレス コンピューティングと Unity カタログを使用します。 指定されていないすべての構成オプションで、既定の設定を使用します。 ワークスペースでサーバーレス コンピューティングが有効になっていないか、サポートされていない場合は、既定のコンピューティング設定を使用して記述されたチュートリアルを完了できます。 既定のコンピューティング設定を使用する場合は、[パイプラインの作成] UI の [宛先] セクションの [ストレージ オプション] で Unity カタログを手動で選択する必要があります。

Lakeflow 宣言パイプラインで新しい ETL パイプラインを作成するには、次の手順に従います。

  1. ワークスペースで、[ワークフロー] アイコンをクリックします。サイドバーのジョブとパイプライン
  2. [ 新規] の [ ETL パイプライン] をクリックします。
  3. [パイプライン名 に、一意のパイプライン名を入力します。
  4. [サーバーレス] チェック ボックスをオンにします。
  5. [変換先] で、テーブルが発行される Unity カタログの場所を構成するには、既存のカタログを選択し、スキーマに新しい名前を書き込んでカタログに新しいスキーマを作成します。
  6. Create をクリックしてください。

新しいパイプラインのパイプライン UI が表示されます。

手順 2: パイプラインを開発する

重要

ノートブックに含めることができるプログラミング言語は 1 つだけです。 パイプライン ソース コード ノートブックで Python と SQL コードを混在させないでください。

この手順では、Databricks Notebooks を使用して、Lakeflow 宣言型パイプラインのソース コードを対話的に開発および検証します。

このコードでは、増分データ インジェストに自動ローダーを使用します。 自動ローダーは、クラウド オブジェクト ストレージに到着した新しいファイルを自動的に検出して処理します。 詳細については、「自動ローダーとは」を参照してください。

空のソース コード ノートブックが自動的に作成され、パイプライン用に構成されます。 ノートブックは、ユーザー ディレクトリ内の新しいディレクトリに作成されます。 新しいディレクトリとファイルの名前は、パイプラインの名前と一致します。 たとえば、「 /Users/someone@example.com/my_pipeline/my_pipeline 」のように入力します。

パイプラインを開発するときは、Python または SQL のいずれかを選択できます。 両方の言語の例が含まれています。 言語の選択に基づいて、既定のノートブック言語を選択していることを確認します。 Lakeflow 宣言型パイプラインのコード開発に対するノートブックのサポートの詳細については、「Lakeflow 宣言型パイプライン」の ノートブックを使用した ETL パイプラインの開発とデバッグに関するページを参照してください。

  1. このノートブックにアクセスするためのリンクは、Pipeline の詳細 パネルの Source コード フィールドにあります。 リンクをクリックしてノートブックを開き、次の手順に進みます。

  2. 右上の Connect をクリックして、コンピューティング構成メニューを開きます。

  3. 手順 1 で作成したパイプラインの名前にカーソルを合わせます。

  4. [接続] をクリックします。

  5. 上部にあるノートブックのタイトルの横にあるノートブックの既定の言語 (Python または SQL) を選択します。

  6. 次のコードをコピーしてノートブックのセルに貼り付けます。

    Python(プログラミング言語)

    # Import modules
    import dlt
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dlt.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .option("inferSchema", True)
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dlt.table(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dlt.expect("valid_title", "song_title IS NOT NULL")
    @dlt.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dlt.table(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    (
     artist_id STRING,
     artist_lat DOUBLE,
     artist_long DOUBLE,
     artist_location STRING,
     artist_name STRING,
     duration DOUBLE,
     end_of_fade_in DOUBLE,
     key INT,
     key_confidence DOUBLE,
     loudness DOUBLE,
     release STRING,
     song_hotnes DOUBLE,
     song_id STRING,
     start_of_fade_out DOUBLE,
     tempo DOUBLE,
     time_signature INT,
     time_signature_confidence DOUBLE,
     title STRING,
     year INT,
     partial_sequence STRING,
     value STRING
    )
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
    '/databricks-datasets/songs/data-001/');
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
     artist_name,
     year,
     COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC
    
  7. [ 開始 ] をクリックして、接続されたパイプラインの更新を開始します。

手順 3: 変換されたデータに対してクエリを実行する

この手順では、ETL パイプラインで処理されたデータに対してクエリを実行して、曲データを分析します。 これらのクエリでは、前の手順で作成した準備済みレコードを使用します。

まず、1990 年以降、毎年最も多くの曲をリリースしているアーティストを検索するクエリを実行します。

  1. サイドバーで、[ SQL エディター] アイコン[SQL エディター] をクリックします。

  2. [ 追加] または [プラス] アイコン の [新しいタブ] アイコンをクリックし、メニューから [ 新しいクエリの作成 ] を選択します。

  3. 次のように入力します。

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC
    

    <catalog><schema>を、テーブルのカタログとスキーマの名前に置き換えます。 たとえば、「 data_pipelines.songs_data.top_artists_by_year 」のように入力します。

  4. [ 選択したファイルを実行] をクリックします。

次に、4/4ビートと踊り可能なテンポの曲を見つける別のクエリを実行します。

  1. [ 追加またはプラス] アイコン の新しいタップ アイコンをクリックし、メニューから [ 新しいクエリの作成 ] を選択します。

  2. 次のコードを入力します。

     -- Find songs with a 4/4 beat and danceable tempo
     SELECT artist_name, song_title, tempo
     FROM <catalog>.<schema>.songs_prepared
     WHERE time_signature = 4 AND tempo between 100 and 140;
    

    <catalog><schema>を、テーブルのカタログとスキーマの名前に置き換えます。 たとえば、「 data_pipelines.songs_data.songs_prepared 」のように入力します。

  3. [ 選択したファイルを実行] をクリックします。

手順 4: パイプラインを実行するジョブを作成する

次に、Databricks ジョブを使用してデータインジェスト、処理、分析の手順を自動化するワークフローを作成します。

  1. ワークスペースで、[ワークフロー] アイコンをクリックします。サイドバーのジョブとパイプライン
  2. [ 新規] の [ ジョブ] をクリックします。
  3. タスクのタイトル ボックスで、 新しいジョブ <日付と時刻> をジョブ名に置き換えます。 たとえば、「 Songs workflow 」のように入力します。
  4. [タスク名] に、最初のタスクの名前 (例: ETL_songs_data) を入力します。
  5. 種類で、パイプラインを選択します。
  6. [パイプライン] で、手順 1 で作成したパイプラインを選択します。
  7. Create をクリックしてください。
  8. ワークフローを実行するには、[ 今すぐ実行] をクリックします。 実行の詳細を表示するには、[ 実行 ] タブをクリックします。タスクをクリックすると、タスク実行の詳細が表示されます。
  9. ワークフローが完了したときに結果を表示するには、[ Go to the latest successful run]\(成功した最新の実行に移動 \) またはジョブ実行の 開始時刻 をクリックします。 [出力] ページが表示され、クエリの結果が表示されます。

ジョブの実行の詳細については、Lakeflow ジョブの監視と可観測性を参照してください。

手順 5: パイプライン ジョブをスケジュールする

スケジュールに従って ETL パイプラインを実行するには、次の手順に従います。

  1. ジョブと同じ Azure Databricks ワークスペースの [ジョブ] & [パイプライン] UI に移動します。
  2. 必要に応じて、ジョブ私が所有 フィルターを選択します。
  3. [名前] 列で、ジョブ名をクリックします。 サイド パネルにジョブの詳細が表示されます。
  4. [スケジュールとトリガー] パネルで [ トリガーの追加 ] をクリックし、[ トリガーの 種類 ] で [スケジュール済み ] を選択 します
  5. 期間、開始時刻、タイム ゾーンを指定します。
  6. [保存] をクリックします。

詳細情報