次の方法で共有


cloud_files_state テーブル値関数

適用対象:check marked yes Databricks SQL 「はい」のチェック マーク Databricks Runtime 11.3 LTS 以上

自動ローダーまたは read_files ストリームのファイル レベルの状態を返します。

構文

cloud_files_state( { TABLE ( table_name ) | checkpoint } )

引数

  • table_name: によって書き込まれるread_filesの識別子。 この名前には、テンポラル仕様を含めることはできません。 Databricks Runtime 13.3 LTS 以降で使用できます。
  • checkpoint: STRING リテラルです。 自動ローダー ソースを使用するストリームのチェックポイント ディレクトリ。 「自動ローダー」を参照してください。

戻り値

次のスキーマを持つテーブルを返します。

  • path STRING NOT NULL PRIMARY KEY

    ファイルのパス。

  • size BIGINT NOT NULL

    ファイルのサイズ (バイト単位)。

  • create_time TIMESTAMP NOT NULL

    ファイルが作成された時刻。

  • discovery_time TIMESTAMP NOT NULL

    適用対象:チェックマークが付いている Databricks SQL チェックマークが付いている Databricks Runtime 16.4 以降

    ファイルが検出された時刻。

  • processed_time TIMESTAMP NOT NULL

    適用対象:チェックマークが付けられた「はい」Databricks SQL と Databricks Runtime 16.4 以降、check marked yes が有効化されている場合。 自動ローダーオプションを参照してください。

    ファイルが処理された時刻。 バッチでエラーが発生し、再試行されると、ファイルが複数回処理される可能性があります。 再試行が行われると、このフィールドには最新の処理時間が含まれます。

  • commit_time TIMESTAMP

    適用対象:チェックマークが付けられた「はい」Databricks SQL と Databricks Runtime 16.4 以降、check marked yes が有効化されている場合。 自動ローダーオプションを参照してください。

    処理後にファイルがチェックポイントにコミットされた時刻。 ファイルがまだ処理されていない場合は NULL。 ファイルをコミット済みとしてマークする待機時間は保証されません。ファイルは処理される可能性がありますが、後で任意にコミット済みとしてマークされます。 ファイルをコミット済みとしてマークすることは、自動ローダーがそのファイルを再度処理するために要求しないということを意味します。

  • archive_time TIMESTAMP

    適用対象:チェックマークが付けられた「はい」Databricks SQL と Databricks Runtime 16.4 以降、check marked yes が有効化されている場合。 自動ローダーオプションを参照してください。

    ファイルがアーカイブされた時刻。 ファイルがアーカイブされていない場合は NULL

  • archive_mode STRING

    適用対象:チェックマークが付けられた「はい」Databricks SQL と Databricks Runtime 16.4 以降、check marked yes が有効化されている場合。 自動ローダーオプションを参照してください。

    MOVE ファイルのアーカイブ時に cloudFiles.cleanSourceMOVE に設定されている場合は。

    DELETE ファイルのアーカイブ時に cloudFiles.cleanSourceDELETE に設定されている場合は。

    NULL cloudFiles.cleanSourceOFFに設定されている場合 (既定値)。

  • move_location STRING

    適用対象:チェックマークが付けられた「はい」Databricks SQL と Databricks Runtime 16.4 以降、check marked yes が有効化されている場合。 自動ローダーオプションを参照してください。

    cloudFiles.cleanSourceMOVE に設定されているアーカイブ操作中にファイルが移動された場所の完全パス。

    NULL ファイルがアーカイブされていない場合、または cloudFiles.cleanSourceDELETE または OFFのいずれかである場合。

  • source_id STRING

    ストリーミング クエリの自動ローダー ソースの ID。 1 つのクラウド オブジェクト ストアの場所から取り込みを行うストリームに対しては、この値は '0' です。

  • flow_name STRING

    適用対象:check marked yes Databricks SQL Databricks Runtime 13.3 以降

    1 つ以上のクラウド ファイル ソースを含む Lakeflow 宣言パイプラインの特定のストリーミング フローを表します。 table_nameが指定されていない場合は NULL。

  • ingestion_state STRING

    適用対象:チェックマークが付けられた「はい」Databricks SQL と Databricks Runtime 16.4 以降、check marked yes が有効化されている場合。 自動ローダーオプションを参照してください。

    ファイルが取り込まれたかどうか 。次のいずれかの状態で示されます。

    • NULL: ファイルがまだ処理されていないか、自動ローダーでファイルの状態を判断できません。
    • PROCESSING: ファイルが処理中です。
    • SKIPPED_CORRUPTED: ファイルが破損しているため、取り込まれませんでした。
    • SKIPPED_MISSING: 処理中に見つからなかったため、ファイルは取り込まれませんでした。
    • INGESTED: ファイルはシンクによって少なくとも 1 回処理されています。 ストリームで障害が発生した場合、foreachBatch のような非冪等のシンクによって再処理されることがあります。 null 以外のcommit_timeフィールドを持つファイルのみがINGESTED状態で処理を完了しています。
    • NOT_RECOGNIZED_BY_DBR: バージョンの互換性のために確保されています。 この状態は、以前のバージョンの Databricks Runtime によって認識されない、以降の Databricks Runtime バージョンで導入された状態に対して表示されます。

アクセス許可

次のものが必要です。

  • OWNER ストリーミング テーブル識別子を使用する場合は、ストリーミング テーブルに対する特権。
  • READ FILESでチェックポイントを提供する場合は、チェックポイントの場所に対する 権限。

-- Simple example from checkpoint
> SELECT path FROM CLOUD_FILES_STATE('/some/checkpoint');
  /some/input/path
  /other/input/path

-- Simple example from source subdir
> SELECT path FROM CLOUD_FILES_STATE('/some/checkpoint/sources/0');
  /some/input/path
  /other/input/path

-- Simple example from streaming table
> SELECT path FROM CLOUD_FILES_STATE(TABLE(my_streaming_table));
  /some/input/path
  /other/input/path

制限事項

  • ETL パイプラインでテーブル値関数を使用して、別のパイプラインによって生成されたストリーミング テーブルのファイル レベルの状態を読み取ることはできません。
  • クエリは、異なるストリーミング テーブルで複数 cloud_files_state テーブル値関数を参照することはできません。

最初の制限を回避するには、 Databricks ジョブ を作成し、ノートブック タスクでテーブル値関数を使用して、ストリーミング テーブルのファイル レベルの状態を抽出して読み込むことができます。 例えば次が挙げられます。

  1. ワークスペースのサイドバーで、[ ジョブ] と [パイプライン] をクリックします。

  2. ジョブをクリックしてください。

  3. ストリーミング テーブルを生成する新しい パイプライン タスクを作成します。 例えば次が挙げられます。

    CREATE OR REFRESH STREAMING LIVE TABLE <table-name>
    AS SELECT <select clause expressions>
    FROM STREAM read_files('/path/to/files', format => '<format>', ...)
    
  4. テーブル値関数を使用してストリーミング テーブルのファイル レベルの状態を読み取り、テーブルに読み込む新しい cloud_files_state タスクを作成します。 例えば次が挙げられます。

    spark
      .sql("""SELECT * FROM cloud_files_state(TABLE(<table-name>)""")
      .write
      .saveAsTable('<table-file-state>')
    
  5. ノートブック タスクの [依存] フィールドを構成し、[パイプライン] タスクを選択します。