読み込まれたデータのスキーマを自動的に検出するように自動ローダーを構成できます。これにより、データ スキーマを明示的に宣言せずにテーブルを初期化し、新しい列が導入されたときにテーブル スキーマを進化させることができます。 これにより、スキーマの変更を常時手動で追跡して適用する必要がなくなります。
自動ローダーでは、JSON BLOB 列で予期しない (データ型が異なるなど) データを "復旧" することもできます。これは、 半構造化データ アクセス API を使用して後でアクセスすることを選択できます。
スキーマの推論と進化では、次の形式がサポートされています。
ファイル形式 | サポートされているバージョン |
---|---|
JSON |
すべてのバージョン |
CSV |
すべてのバージョン |
XML |
Databricks Runtime 14.3 LTS 以降 |
Avro |
Databricks Runtime 10.4 LTS 以降 |
Parquet |
Databricks Runtime 11.3 LTS 以降 |
ORC |
サポートされていない |
Text |
適用できません (固定スキーマ) |
Binaryfile |
適用できません (固定スキーマ) |
スキーマ推論と進化の構文
オプション cloudFiles.schemaLocation
にターゲット ディレクトリを指定すると、スキーマの推論と進化が可能になります。
checkpointLocation
に指定したのと同じディレクトリを使用することを選択できます。
Lakeflow 宣言パイプラインを使用する場合、Azure Databricks はスキーマの場所とその他のチェックポイント情報を自動的に管理します。
注意
ターゲット テーブルに複数のソース データの場所が読み込まれている場合、各自動ローダー インジェスト ワークロードには個別のストリーミング チェックポイントが必要です。
次の例では、parquet
に cloudFiles.format
を使用します。 他のファイル ソースには、csv
、avro
、または json
を使用します。 読み取りと書き込みの他のすべての設定の既定の動作は、各形式で同じままです。
Python(プログラミング言語)
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema ___location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
スカラ (プログラミング言語)
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema ___location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
自動ローダー スキーマ推論のしくみ
最初にデータを読み取るときにスキーマを推測するために、自動ローダーは最初に検出した最初の 50 GB または 1000 個のファイルをサンプリングします。いずれか早い方の制限を超えています。 自動ローダーは、構成された_schemas
でcloudFiles.schemaLocation
ディレクトリにスキーマ情報を格納し、入力データへのスキーマの変更を経時的に追跡します。
注意
使用するサンプルのサイズを変更するには、SQL 構成を設定します。
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(バイト文字列 (例: 10gb
))
そして
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(整数)
既定では、自動ローダー スキーマ推論では、型の不一致によるスキーマの進化に関する問題を回避します。 データ型 (JSON、CSV、XML) をエンコードしない形式の場合、自動ローダーはすべての列を文字列 (JSON ファイル内の入れ子になったフィールドを含む) として推論します。 型指定されたスキーマ (Parquet および Avro) の形式の場合、自動ローダーはファイルのサブセットをサンプリングし、個々のファイルのスキーマをマージします。 この動作を次の表にまとめます。
ファイル形式 | 既定の推論されたデータ型 |
---|---|
JSON |
糸 |
CSV |
糸 |
XML |
糸 |
Avro |
Avro スキーマでエンコードされた型 |
Parquet |
型が Parquet スキーマでエンコードされる |
Apache Spark DataFrameReader では、スキーマ推論に異なる動作が使用され、サンプル データに基づいて JSON、CSV、および XML ソースの列のデータ型が選択されます。 自動ローダーでこの動作を有効にするには、オプション cloudFiles.inferColumnTypes
を true
に設定します。
注意
CSV データのスキーマを推論する場合、自動ローダーではファイルにヘッダーが含まれていると見なされます。 CSV ファイルにヘッダーが含まれていない場合は、オプション .option("header", "false")
を指定します。 さらに、自動ローダーによって、サンプル内のすべてのファイルのスキーマがマージされ、グローバル スキーマが作成されます。 自動ローダーは、ヘッダーに従って各ファイルを読み取り、CSV を正しく解析できます。
注意
2 つの Parquet ファイル内の列のデータ型が異なる場合、自動ローダーは最も広い種類を選択します。 schemaHints を使用して、この選択をオーバーライドできます。 スキーマ ヒントを指定すると、自動ローダーは列を指定された型にキャストするのではなく、Parquet リーダーに指定した型として列を読み取るように指示します。 不一致が発生した場合、その列は、 復旧されたデータ列で復旧されます。
自動ローダー スキーマの進化のしくみ
自動ローダーはデータ処理中に新しい列が追加されたことを検出します。 自動ローダーが新しい列を検出すると、ストリームは UnknownFieldException
で停止します。 ストリームからこのエラーがスローされる前に、自動ローダーによって、データの最新のマイクロバッチに対してスキーマ推論が実行され、新しい列をスキーマの末尾にマージすることによって、スキーマの場所が最新のスキーマで更新されます。 既存の列のデータ型は変更されません。
Databricks では、このようなスキーマの変更後に自動的に再起動するように 、Lakeflow ジョブ を使用して自動ローダー ストリームを構成することをお勧めします。
自動ローダーでは、オプション cloudFiles.schemaEvolutionMode
で設定したスキーマの進化に対して、次のモードがサポートされます。
モード | 新しい列を読み取る場合の動作 |
---|---|
addNewColumns (既定値) |
ストリームが失敗します。 新しい列がスキーマに追加されます。 既存の列ではデータ型は進化しません。 |
rescue |
スキーマは進化せず、スキーマの変更によりストリームが失敗することはありません。 すべての新しい列が、復旧されたデータ列に記録されます。 |
failOnNewColumns |
ストリームが失敗します。 指定されたスキーマが更新されるか、問題のあるデータ ファイルが削除されない限り、ストリームは再起動しません。 |
none |
スキーマは進化せず、新しい列は無視されます。また、rescuedDataColumn オプションが設定されない限り、データは復旧されません。 スキーマの変更によりストリームが失敗することはありません。 |
注意
addNewColumns
モードはスキーマが指定されていない場合の既定値ですが、スキーマを指定する場合は none
が既定値です。
addNewColumns
は、ストリームのスキーマが指定されている場合は許可されませんが、 スキーマをスキーマ ヒントとして指定した場合は機能します。
自動ローダー使用時のパーティションの動作
データが Hive スタイルのパーティション分割でレイアウトされている場合、自動ローダーはデータの基になるディレクトリ構造からパーティション列を推論しようとします。 たとえば、ファイル パス base_path/event=click/date=2021-04-01/f0.json
では、パーティション列として date
と event
が推論されます。 基になるディレクトリ構造に競合する Hive パーティションが含まれている場合、または Hive スタイルのパーティション分割が含まれていない場合、パーティション列は無視されます。
バイナリ ファイル (binaryFile
) 形式と text
ファイル形式には固定データ スキーマがありますが、パーティション列の推論はサポートされています。 Databricks では、これらのファイル形式に対して cloudFiles.schemaLocation
設定をお勧めします。 これにより、潜在的なエラーや情報の損失が回避され、自動ローダーが開始されるたびにパーティション列が推論されるのを防ぐことができます。
パーティション列はスキーマの進化とは見なされません。
base_path/event=click/date=2021-04-01/f0.json
などの初期ディレクトリ構造があり、base_path/event=click/date=2021-04-01/hour=01/f1.json
として新しいファイルの受信を開始した場合、自動ローダーは時間列を無視します。 新しいパーティション列の情報をキャプチャするには、 cloudFiles.partitionColumns
を event,date,hour
に設定します。
注意
cloudFiles.partitionColumns
オプションは、列名のコンマ区切りのリストを受け取ります。 ディレクトリ構造に key=value
ペアとして存在する列のみが解析されます。
救助されたデータ列は何ですか?
自動ローダーがスキーマを推論すると、復旧されたデータ列が _rescued_data
としてスキーマに自動的に追加されます。 オプション rescuedDataColumn
を設定することで、列の名前を変更したり、スキーマを指定する場合に含めたりすることができます。
復旧されたデータ列により、スキーマと一致しない列が削除されるのではなく、確実に復旧されます。 復旧されたデータ列には、次の理由で解析されていないデータが含まれています。
- スキーマに列がありません。
- 型が一致しない。
- 大文字と小文字が一致しない。
復旧されたデータ列には、復旧された列とレコードのソース ファイル パスを含む JSON が含まれています。
注意
JSON および CSV パーサーでは、レコードを解析するときに、PERMISSIVE
、DROPMALFORMED
、FAILFAST
の 3 つのモードがサポートされます。
rescuedDataColumn
と共に使用すると、データ型の不一致によって、DROPMALFORMED
モードでレコードが削除されたり、FAILFAST
モードでエラーがスローされたりすることはありません。 不完全であるか形式に誤りがある JSON や CSV などの、破損したレコードのみが削除されたり、エラーをスローしたりします。 JSON または CSV を解析するときに badRecordsPath
を使用する場合、rescuedDataColumn
を使用すると、データ型の不一致は無効なレコードとは見なされません。
badRecordsPath
には、不完全で形式に誤りがある JSON または CSV レコードだけが格納されます。
大文字と小文字が区別される動作を変更する
大文字と小文字の区別が有効になっていない限り、 abc
、 Abc
、および ABC
の列は、スキーマ推論の目的で同じ列と見なされます。 大文字または小文字の選択は任意であり、サンプリングされたデータによって異なります。
スキーマ ヒントを使用して、どのケースを使用するかを強制できます。 選択が行われ、スキーマが推論されると、自動ローダーでは、選択された大文字と小文字のバリエーションがスキーマと一致しないと見なされません。
復旧されたデータ列が有効になっている場合、スキーマ以外のケースで名前が付けられたフィールドが_rescued_data
列に読み込まれます。 この動作を変更するには、オプション readerCaseSensitive
を false に設定します。これにより、自動ローダーは大文字と小文字を区別せずにデータを読み取ります。
スキーマ ヒントを使用してスキーマ推論をオーバーライドする
スキーマ ヒントを使用して、推論されたスキーマに対して知っているスキーマ情報を適用できます。 列が特定のデータ型であることがわかっている場合、またはより一般的なデータ型 (たとえば、double
ではなくinteger
) を選択する場合は、次のような SQL スキーマ仕様構文を使用して、列データ型の任意の数のヒントを文字列として指定できます。
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
サポートされているデータ型の一覧については、 データ型 に関するドキュメントを参照してください。
ストリームの先頭に列がない場合は、スキーマ ヒントを使用して、推論されたスキーマにその列を追加することもできます。
スキーマ ヒントを使用して動作を確認するために推論されたスキーマの例を次に示します。
推定スキーマ:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
次に示すスキーマヒントを指定してください。
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
あなたは次を得ます:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
注意
配列とマップのスキーマ ヒントのサポートは、 Databricks Runtime 9.1 LTS 以降で使用できます。
スキーマ ヒントを使用して動作を確認するために、複雑なデータ型を持つ推論されたスキーマの例を次に示します。
推定スキーマ:
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
次に示すスキーマヒントを指定してください。
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
あなたは次を得ます:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
注意
スキーマ ヒントは、自動ローダーにスキーマを指定 しない 場合にのみ使用されます。 スキーマ ヒントは、 cloudFiles.inferColumnTypes
が有効か無効かに関係なく使用できます。