次の方法で共有


一般的なデータ読み込みパターン

自動ローダーを使用すると、一般的なデータ インジェスト タスクが簡単になります。 このクイック リファレンスでは、いくつかの一般的なパターンの例を示します。

glob パターンを使用したディレクトリまたはファイルのフィルター処理

glob パターンは、パスに指定されているときに、ディレクトリとファイルのフィルター処理に使用できます。

パターン 説明
? 任意の 1 文字と一致します
* 0 個以上の文字と一致します
[abc] 文字セット {a, b, c} の 1 文字と一致します。
[a-z] 文字範囲 {a…z} の 1 文字と一致します。
[^a] 文字セットまたは範囲 {a} からのものではない 1 文字と一致します。 ^ 文字は左角かっこのすぐ右側に表示されることに注意してください。
{ab,cd} 文字列セット {ab, cd} の文字列と一致します。
{ab,c{de, fh}} 文字列セット {ab, cde, cfh} の文字列と一致します。

次の例のように、プレフィックス パターンを指定するには path を使用します。

Python(プログラミング言語)

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

スカラ (プログラミング言語)

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

重要

サフィックス パターンを明示的に指定するには、オプション pathGlobFilter を使用する必要があります。 path は、プレフィックス フィルターのみを提供します。

たとえば、さまざまなサフィックスのファイルが含まれているディレクトリ内で png ファイルのみを解析する場合は、次のようにします。

Python(プログラミング言語)

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

スカラ (プログラミング言語)

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

注意

自動ローダーの既定のグロビング動作は、他の Spark ファイル ソースの既定の動作とは異なります。 ファイル ソースに対する既定の Spark 動作に一致するグロビングを使用するには、読み取りに .option("cloudFiles.useStrictGlobber", "true") を追加します。 globbing の詳細については、次の表を参照してください。

パターン [ファイル パス] 既定の globber 厳密な globber
/a/b /a/b/c/file.txt はい はい
/a/b /a/b_dir/c/file.txt いいえ いいえ
/a/b /a/b.txt いいえ いいえ
/a/b/ /a/b.txt いいえ いいえ
/a/*/c/ /a/b/c/file.txt はい はい
/a/*/c/ /a/b/c/d/file.txt はい はい
/a/*/c/ /a/b/x/y/c/file.txt はい いいえ
/a/*/c /a/b/c_file.txt はい いいえ
/a/*/c/ /a/b/c_file.txt はい いいえ
/a/*/c/ /a/*/cookie/file.txt はい いいえ
/a/b* /a/b.txt はい はい
/a/b* /a/b/file.txt はい はい
/a/{0.txt,1.txt} /a/0.txt はい はい
/a/*/{0.txt,1.txt} /a/0.txt いいえ いいえ
/a/b/[cde-h]/i/ /a/b/c/i/file.txt はい はい

簡単な ETL を有効にする

データを失うことなく Delta Lake にデータを取得する簡単な方法は、次のパターンを使用し、自動ローダーでスキーマ推論を有効にすることです。 Databricks では、ソース データのスキーマが変更されたときにストリームを自動的に再起動するために、Azure Databricks ジョブで次のコードを実行することをお勧めします。 既定では、スキーマは文字列型として推論され、解析エラー (すべてが文字列として残っている場合は何もないはずです) は _rescued_dataに移動し、新しい列はすべてストリームに失敗し、スキーマを進化させます。

Python(プログラミング言語)

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-___location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

スカラ (プログラミング言語)

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-___location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

適切に構造化されたデータのデータ損失を防ぐ

スキーマがわかっているが、予期しないデータを受け取るたびに知りたい場合、Databricks では rescuedDataColumnを使用することをお勧めします。

Python(プログラミング言語)

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

スカラ (プログラミング言語)

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

スキーマと一致しない新しいフィールドが導入された場合にストリームの処理を停止する場合は、次を追加できます。

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

柔軟な半構造化データ パイプラインを有効にする

ベンダーから提供される情報に新しい列が追加されたデータを受け取る際、いつそれが行われたのか正確に把握できない場合や、データパイプラインを更新するための余裕がない可能性があります。 スキーマの進化を利用してストリームを再起動し、自動ローダーが推論されたスキーマを自動的に更新できるようになりました。 ベンダーが提供する可能性のある "スキーマレス" フィールドに schemaHints を利用することもできます。

Python(プログラミング言語)

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

スカラ (プログラミング言語)

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

入れ子になった JSON データを変換する

自動ローダーは最上位レベルの JSON 列を文字列として推論するため、さらに変換を必要とする入れ子になった JSON オブジェクトを残すことができます。 半構造化データ アクセス API を使用して、複雑な JSON コンテンツをさらに変換できます。

Python(プログラミング言語)

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema ___location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

スカラ (プログラミング言語)

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema ___location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

入れ子になった JSON データを推論する

入れ子になったデータがある場合は、 cloudFiles.inferColumnTypes オプションを使用して、データとその他の列型の入れ子構造を推測できます。

Python(プログラミング言語)

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema ___location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

スカラ (プログラミング言語)

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema ___location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

ヘッダーのない CSV ファイルを読み込む

Python(プログラミング言語)

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

スカラ (プログラミング言語)

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

ヘッダーを含む CSV ファイルにスキーマを適用する

Python(プログラミング言語)

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

スカラ (プログラミング言語)

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

イメージまたはバイナリ データを ML 用の Delta Lake に取り込む

データが Delta Lake に格納されると、データに対して分散推論を実行できます。 「pandas UDF を使用して分散推論を実行する」を参照してください。

Python(プログラミング言語)

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

スカラ (プログラミング言語)

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Lakeflow 宣言型パイプラインのオートローダー構文

Lakeflow 宣言型パイプラインでは、自動ローダー用に少し変更された Python 構文が提供され、自動ローダーの SQL サポートが追加されます。

次の例では、自動ローダーを使用して CSV と JSON ファイルからデータセットを作成します。

Python(プログラミング言語)

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv"
)

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders/",
  format => "json")

自動ローダーでサポートされている形式オプションを使用できます。 read_filesのオプションは、キーと値のペアです。 サポートされている形式とオプションの詳細については、「 オプション」を参照してください。

例えば次が挙げられます。

CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
  FROM STREAM read_files(
    "/Volumes/my_volume/path/to/files/*",
    option-key => option-value,
    ...
  )

次の例では、ヘッダーのあるタブ区切りの CSV ファイルからデータを読み取ります。

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv",
  delimiter => "\t",
  header => "true"
)

schemaを使用して、形式を手動で指定できます。schemaをサポートしていない形式のを指定する必要があります。

Python(プログラミング言語)

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM STREAM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
  format => "parquet",
  schema => "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING"
)

注意

Lakeflow 宣言パイプラインは、自動ローダーを使用してファイルを読み取るときに、スキーマとチェックポイントのディレクトリを自動的に構成および管理します。 ただし、これらのディレクトリのいずれかを手動で構成した場合、完全更新を実行しても、構成されたディレクトリの内容には影響しません。 Databricks では、処理中の予期しない副作用を回避するために、自動的に構成されたディレクトリを使用することが推奨されています。