適用対象: Databricks SQL
Databricks Runtime 11.3 LTS 以上
自動ローダーまたは read_files
ストリームのファイル レベルの状態を返します。
構文
cloud_files_state( { TABLE ( table_name ) | checkpoint } )
引数
-
table_name: によって書き込まれる
read_files
の識別子。 この名前には、テンポラル仕様を含めることはできません。 Databricks Runtime 13.3 LTS 以降で使用できます。 -
checkpoint
:STRING
リテラルです。 自動ローダー ソースを使用するストリームのチェックポイント ディレクトリ。 「自動ローダー」を参照してください。
戻り値
次のスキーマを持つテーブルを返します。
path STRING NOT NULL PRIMARY KEY
ファイルのパス。
size BIGINT NOT NULL
ファイルのサイズ (バイト単位)。
create_time TIMESTAMP NOT NULL
ファイルが作成された時刻。
discovery_time TIMESTAMP NOT NULL
適用対象:
Databricks SQL
Databricks Runtime 16.4 以降
ファイルが検出された時刻。
processed_time TIMESTAMP NOT NULL
適用対象:
Databricks SQL と Databricks Runtime 16.4 以降、
が有効化されている場合。 自動ローダーオプションを参照してください。
ファイルが処理された時刻。 バッチでエラーが発生し、再試行されると、ファイルが複数回処理される可能性があります。 再試行が行われると、このフィールドには最新の処理時間が含まれます。
commit_time TIMESTAMP
適用対象:
Databricks SQL と Databricks Runtime 16.4 以降、
が有効化されている場合。 自動ローダーオプションを参照してください。
処理後にファイルがチェックポイントにコミットされた時刻。 ファイルがまだ処理されていない場合は
NULL
。 ファイルをコミット済みとしてマークする待機時間は保証されません。ファイルは処理される可能性がありますが、後で任意にコミット済みとしてマークされます。 ファイルをコミット済みとしてマークすることは、自動ローダーがそのファイルを再度処理するために要求しないということを意味します。archive_time TIMESTAMP
適用対象:
Databricks SQL と Databricks Runtime 16.4 以降、
が有効化されている場合。 自動ローダーオプションを参照してください。
ファイルがアーカイブされた時刻。 ファイルがアーカイブされていない場合は
NULL
。archive_mode STRING
適用対象:
Databricks SQL と Databricks Runtime 16.4 以降、
が有効化されている場合。 自動ローダーオプションを参照してください。
MOVE
ファイルのアーカイブ時にcloudFiles.cleanSource
がMOVE
に設定されている場合は。DELETE
ファイルのアーカイブ時にcloudFiles.cleanSource
がDELETE
に設定されている場合は。NULL
cloudFiles.cleanSource
がOFF
に設定されている場合 (既定値)。move_location STRING
適用対象:
Databricks SQL と Databricks Runtime 16.4 以降、
が有効化されている場合。 自動ローダーオプションを参照してください。
cloudFiles.cleanSource
がMOVE
に設定されているアーカイブ操作中にファイルが移動された場所の完全パス。NULL
ファイルがアーカイブされていない場合、またはcloudFiles.cleanSource
がDELETE
またはOFF
のいずれかである場合。source_id STRING
ストリーミング クエリの自動ローダー ソースの ID。 1 つのクラウド オブジェクト ストアの場所から取り込みを行うストリームに対しては、この値は
'0'
です。flow_name STRING
適用対象:
Databricks SQL
Databricks Runtime 13.3 以降
1 つ以上のクラウド ファイル ソースを含む Lakeflow 宣言パイプラインの特定のストリーミング フローを表します。 table_nameが指定されていない場合は NULL。
ingestion_state STRING
適用対象:
Databricks SQL と Databricks Runtime 16.4 以降、
が有効化されている場合。 自動ローダーオプションを参照してください。
ファイルが取り込まれたかどうか 。次のいずれかの状態で示されます。
-
NULL
: ファイルがまだ処理されていないか、自動ローダーでファイルの状態を判断できません。 -
PROCESSING
: ファイルが処理中です。 -
SKIPPED_CORRUPTED
: ファイルが破損しているため、取り込まれませんでした。 -
SKIPPED_MISSING
: 処理中に見つからなかったため、ファイルは取り込まれませんでした。 -
INGESTED
: ファイルはシンクによって少なくとも 1 回処理されています。 ストリームで障害が発生した場合、foreachBatch
のような非冪等のシンクによって再処理されることがあります。 null 以外のcommit_time
フィールドを持つファイルのみがINGESTED
状態で処理を完了しています。 -
NOT_RECOGNIZED_BY_DBR
: バージョンの互換性のために確保されています。 この状態は、以前のバージョンの Databricks Runtime によって認識されない、以降の Databricks Runtime バージョンで導入された状態に対して表示されます。
-
アクセス許可
次のものが必要です。
例
-- Simple example from checkpoint
> SELECT path FROM CLOUD_FILES_STATE('/some/checkpoint');
/some/input/path
/other/input/path
-- Simple example from source subdir
> SELECT path FROM CLOUD_FILES_STATE('/some/checkpoint/sources/0');
/some/input/path
/other/input/path
-- Simple example from streaming table
> SELECT path FROM CLOUD_FILES_STATE(TABLE(my_streaming_table));
/some/input/path
/other/input/path
制限事項
- ETL パイプラインでテーブル値関数を使用して、別のパイプラインによって生成されたストリーミング テーブルのファイル レベルの状態を読み取ることはできません。
- クエリは、異なるストリーミング テーブルで複数
cloud_files_state
テーブル値関数を参照することはできません。
最初の制限を回避するには、 Databricks ジョブ を作成し、ノートブック タスクでテーブル値関数を使用して、ストリーミング テーブルのファイル レベルの状態を抽出して読み込むことができます。 例えば次が挙げられます。
ワークスペースのサイドバーで、[ ジョブ] と [パイプライン] をクリックします。
ジョブをクリックしてください。
ストリーミング テーブルを生成する新しい パイプライン タスクを作成します。 例えば次が挙げられます。
CREATE OR REFRESH STREAMING LIVE TABLE <table-name> AS SELECT <select clause expressions> FROM STREAM read_files('/path/to/files', format => '<format>', ...)
テーブル値関数を使用してストリーミング テーブルのファイル レベルの状態を読み取り、テーブルに読み込む新しい
cloud_files_state
タスクを作成します。 例えば次が挙げられます。spark .sql("""SELECT * FROM cloud_files_state(TABLE(<table-name>)""") .write .saveAsTable('<table-file-state>')
ノートブック タスクの [依存] フィールドを構成し、[パイプライン] タスクを選択します。