次の方法で共有


自動ローダーでスキーマの推論と進化を構成する

読み込まれたデータのスキーマを自動的に検出するように自動ローダーを構成できます。これにより、データ スキーマを明示的に宣言せずにテーブルを初期化し、新しい列が導入されたときにテーブル スキーマを進化させることができます。 これにより、スキーマの変更を常時手動で追跡して適用する必要がなくなります。

自動ローダーでは、JSON BLOB 列で予期しない (データ型が異なるなど) データを "復旧" することもできます。これは、 半構造化データ アクセス API を使用して後でアクセスすることを選択できます。

スキーマの推論と進化では、次の形式がサポートされています。

ファイル形式 サポートされているバージョン
JSON すべてのバージョン
CSV すべてのバージョン
XML Databricks Runtime 14.3 LTS 以降
Avro Databricks Runtime 10.4 LTS 以降
Parquet Databricks Runtime 11.3 LTS 以降
ORC サポートされていない
Text 適用できません (固定スキーマ)
Binaryfile 適用できません (固定スキーマ)

スキーマ推論と進化の構文

オプション cloudFiles.schemaLocation にターゲット ディレクトリを指定すると、スキーマの推論と進化が可能になります。 checkpointLocation に指定したのと同じディレクトリを使用することを選択できます。 Lakeflow 宣言パイプラインを使用する場合、Azure Databricks はスキーマの場所とその他のチェックポイント情報を自動的に管理します。

注意

ターゲット テーブルに複数のソース データの場所が読み込まれている場合、各自動ローダー インジェスト ワークロードには個別のストリーミング チェックポイントが必要です。

次の例では、parquetcloudFiles.format を使用します。 他のファイル ソースには、csvavro、または json を使用します。 読み取りと書き込みの他のすべての設定の既定の動作は、各形式で同じままです。

Python(プログラミング言語)

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema ___location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

スカラ (プログラミング言語)

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema ___location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

自動ローダー スキーマ推論のしくみ

最初にデータを読み取るときにスキーマを推測するために、自動ローダーは最初に検出した最初の 50 GB または 1000 個のファイルをサンプリングします。いずれか早い方の制限を超えています。 自動ローダーは、構成された_schemascloudFiles.schemaLocationディレクトリにスキーマ情報を格納し、入力データへのスキーマの変更を経時的に追跡します。

注意

使用するサンプルのサイズを変更するには、SQL 構成を設定します。

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(バイト文字列 (例: 10gb))

そして

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(整数)

既定では、自動ローダー スキーマ推論では、型の不一致によるスキーマの進化に関する問題を回避します。 データ型 (JSON、CSV、XML) をエンコードしない形式の場合、自動ローダーはすべての列を文字列 (JSON ファイル内の入れ子になったフィールドを含む) として推論します。 型指定されたスキーマ (Parquet および Avro) の形式の場合、自動ローダーはファイルのサブセットをサンプリングし、個々のファイルのスキーマをマージします。 この動作を次の表にまとめます。

ファイル形式 既定の推論されたデータ型
JSON
CSV
XML
Avro Avro スキーマでエンコードされた型
Parquet 型が Parquet スキーマでエンコードされる

Apache Spark DataFrameReader では、スキーマ推論に異なる動作が使用され、サンプル データに基づいて JSON、CSV、および XML ソースの列のデータ型が選択されます。 自動ローダーでこの動作を有効にするには、オプション cloudFiles.inferColumnTypestrue に設定します。

注意

CSV データのスキーマを推論する場合、自動ローダーではファイルにヘッダーが含まれていると見なされます。 CSV ファイルにヘッダーが含まれていない場合は、オプション .option("header", "false") を指定します。 さらに、自動ローダーによって、サンプル内のすべてのファイルのスキーマがマージされ、グローバル スキーマが作成されます。 自動ローダーは、ヘッダーに従って各ファイルを読み取り、CSV を正しく解析できます。

注意

2 つの Parquet ファイル内の列のデータ型が異なる場合、自動ローダーは最も広い種類を選択します。 schemaHints を使用して、この選択をオーバーライドできます。 スキーマ ヒントを指定すると、自動ローダーは列を指定された型にキャストするのではなく、Parquet リーダーに指定した型として列を読み取るように指示します。 不一致が発生した場合、その列は、 復旧されたデータ列で復旧されます

自動ローダー スキーマの進化のしくみ

自動ローダーはデータ処理中に新しい列が追加されたことを検出します。 自動ローダーが新しい列を検出すると、ストリームは UnknownFieldExceptionで停止します。 ストリームからこのエラーがスローされる前に、自動ローダーによって、データの最新のマイクロバッチに対してスキーマ推論が実行され、新しい列をスキーマの末尾にマージすることによって、スキーマの場所が最新のスキーマで更新されます。 既存の列のデータ型は変更されません。

Databricks では、このようなスキーマの変更後に自動的に再起動するように 、Lakeflow ジョブ を使用して自動ローダー ストリームを構成することをお勧めします。

自動ローダーでは、オプション cloudFiles.schemaEvolutionModeで設定したスキーマの進化に対して、次のモードがサポートされます。

モード 新しい列を読み取る場合の動作
addNewColumns (既定値) ストリームが失敗します。 新しい列がスキーマに追加されます。 既存の列ではデータ型は進化しません。
rescue スキーマは進化せず、スキーマの変更によりストリームが失敗することはありません。 すべての新しい列が、復旧されたデータ列に記録されます。
failOnNewColumns ストリームが失敗します。 指定されたスキーマが更新されるか、問題のあるデータ ファイルが削除されない限り、ストリームは再起動しません。
none スキーマは進化せず、新しい列は無視されます。また、rescuedDataColumn オプションが設定されない限り、データは復旧されません。 スキーマの変更によりストリームが失敗することはありません。

注意

addNewColumns モードはスキーマが指定されていない場合の既定値ですが、スキーマを指定する場合は none が既定値です。 addNewColumns は、ストリームのスキーマが指定されている場合は許可されませんが、 スキーマをスキーマ ヒントとして指定した場合は機能します。

自動ローダー使用時のパーティションの動作

データが Hive スタイルのパーティション分割でレイアウトされている場合、自動ローダーはデータの基になるディレクトリ構造からパーティション列を推論しようとします。 たとえば、ファイル パス base_path/event=click/date=2021-04-01/f0.json では、パーティション列として dateevent が推論されます。 基になるディレクトリ構造に競合する Hive パーティションが含まれている場合、または Hive スタイルのパーティション分割が含まれていない場合、パーティション列は無視されます。

バイナリ ファイル (binaryFile) 形式と text ファイル形式には固定データ スキーマがありますが、パーティション列の推論はサポートされています。 Databricks では、これらのファイル形式に対して cloudFiles.schemaLocation 設定をお勧めします。 これにより、潜在的なエラーや情報の損失が回避され、自動ローダーが開始されるたびにパーティション列が推論されるのを防ぐことができます。

パーティション列はスキーマの進化とは見なされません。 base_path/event=click/date=2021-04-01/f0.jsonなどの初期ディレクトリ構造があり、base_path/event=click/date=2021-04-01/hour=01/f1.jsonとして新しいファイルの受信を開始した場合、自動ローダーは時間列を無視します。 新しいパーティション列の情報をキャプチャするには、 cloudFiles.partitionColumnsevent,date,hour に設定します。

注意

cloudFiles.partitionColumnsオプションは、列名のコンマ区切りのリストを受け取ります。 ディレクトリ構造に key=value ペアとして存在する列のみが解析されます。

救助されたデータ列は何ですか?

自動ローダーがスキーマを推論すると、復旧されたデータ列が _rescued_dataとしてスキーマに自動的に追加されます。 オプション rescuedDataColumnを設定することで、列の名前を変更したり、スキーマを指定する場合に含めたりすることができます。

復旧されたデータ列により、スキーマと一致しない列が削除されるのではなく、確実に復旧されます。 復旧されたデータ列には、次の理由で解析されていないデータが含まれています。

  • スキーマに列がありません。
  • 型が一致しない。
  • 大文字と小文字が一致しない。

復旧されたデータ列には、復旧された列とレコードのソース ファイル パスを含む JSON が含まれています。

注意

JSON および CSV パーサーでは、レコードを解析するときに、PERMISSIVEDROPMALFORMEDFAILFAST の 3 つのモードがサポートされます。 rescuedDataColumn と共に使用すると、データ型の不一致によって、DROPMALFORMED モードでレコードが削除されたり、FAILFAST モードでエラーがスローされたりすることはありません。 不完全であるか形式に誤りがある JSON や CSV などの、破損したレコードのみが削除されたり、エラーをスローしたりします。 JSON または CSV を解析するときに badRecordsPath を使用する場合、rescuedDataColumn を使用すると、データ型の不一致は無効なレコードとは見なされません。 badRecordsPath には、不完全で形式に誤りがある JSON または CSV レコードだけが格納されます。

大文字と小文字が区別される動作を変更する

大文字と小文字の区別が有効になっていない限り、 abcAbc、および ABC の列は、スキーマ推論の目的で同じ列と見なされます。 大文字または小文字の選択は任意であり、サンプリングされたデータによって異なります。 スキーマ ヒントを使用して、どのケースを使用するかを強制できます。 選択が行われ、スキーマが推論されると、自動ローダーでは、選択された大文字と小文字のバリエーションがスキーマと一致しないと見なされません。

復旧されたデータ列が有効になっている場合、スキーマ以外のケースで名前が付けられたフィールドが_rescued_data列に読み込まれます。 この動作を変更するには、オプション readerCaseSensitive を false に設定します。これにより、自動ローダーは大文字と小文字を区別せずにデータを読み取ります。

スキーマ ヒントを使用してスキーマ推論をオーバーライドする

スキーマ ヒントを使用して、推論されたスキーマに対して知っているスキーマ情報を適用できます。 列が特定のデータ型であることがわかっている場合、またはより一般的なデータ型 (たとえば、doubleではなくinteger) を選択する場合は、次のような SQL スキーマ仕様構文を使用して、列データ型の任意の数のヒントを文字列として指定できます。

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

サポートされているデータ型の一覧については、 データ型 に関するドキュメントを参照してください。

ストリームの先頭に列がない場合は、スキーマ ヒントを使用して、推論されたスキーマにその列を追加することもできます。

スキーマ ヒントを使用して動作を確認するために推論されたスキーマの例を次に示します。

推定スキーマ:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

次に示すスキーマヒントを指定してください。

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

あなたは次を得ます:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

注意

配列とマップのスキーマ ヒントのサポートは、 Databricks Runtime 9.1 LTS 以降で使用できます。

スキーマ ヒントを使用して動作を確認するために、複雑なデータ型を持つ推論されたスキーマの例を次に示します。

推定スキーマ:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

次に示すスキーマヒントを指定してください。

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

あなたは次を得ます:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

注意

スキーマ ヒントは、自動ローダーにスキーマを指定 しない 場合にのみ使用されます。 スキーマ ヒントは、 cloudFiles.inferColumnTypes が有効か無効かに関係なく使用できます。