cloud_files_state 表值函数

适用于:勾选“是” Databricks SQL 勾选“是” Databricks Runtime 11.3 LTS 及更高版本

返回自动加载程序或 read_files 流的文件级状态。

语法

cloud_files_state( { TABLE ( table_name ) | checkpoint } )

参数

  • table_name:由 写入的read_files的标识符。 名称不得包含时态规范。 在 Databricks Runtime 13.3 LTS 及更高版本中可用。
  • checkpointSTRING 文本。 使用自动加载程序源的流的检查点目录。 请参阅什么是自动加载程序?

返回

返回具有以下架构的表:

  • path STRING NOT NULL PRIMARY KEY

    文件的路径。

  • size BIGINT NOT NULL

    文件的大小(以字节为单位)。

  • create_time TIMESTAMP NOT NULL

    创建文件的时间。

  • discovery_time TIMESTAMP NOT NULL

    适用于:检查标记为“是”的 Databricks SQL 检查标记为“是”的 Databricks Runtime 16.4 及更高版本

    发现文件的时间。

  • processed_time TIMESTAMP NOT NULL

    适用范围:标记为“是”的 Databricks SQL 标记为“是”的 Databricks Runtime 16.4 及更高版本,在启用cloudFiles.cleanSource时。 请参阅 自动加载程序选项

    处理文件的时间。 如果批处理遇到失败并重试,则可能会多次处理文件。 重试时,此字段包含最新的处理时间。

  • commit_time TIMESTAMP

    适用范围:标记为“是”的 Databricks SQL 标记为“是”的 Databricks Runtime 16.4 及更高版本,在启用cloudFiles.cleanSource时。 请参阅 自动加载程序选项

    文件在处理后提交到检查点的时间。 如果文件尚未处理,则为 NULL。 对于将文件标记为已提交的操作,没有延迟保证;文件可能已处理,但会在稍后任意时间标记为已提交。 将文件标记为已提交意味着自动加载程序不需要再次处理该文件。

  • archive_time TIMESTAMP

    适用范围:标记为“是”的 Databricks SQL 标记为“是”的 Databricks Runtime 16.4 及更高版本,在启用cloudFiles.cleanSource时。 请参阅 自动加载程序选项

    将文件存档的时间。 如果文件尚未存档,则为 NULL

  • archive_mode STRING

    适用范围:标记为“是”的 Databricks SQL 标记为“是”的 Databricks Runtime 16.4 及更高版本,在启用cloudFiles.cleanSource时。 请参阅 自动加载程序选项

    MOVE 如果在文件归档时 cloudFiles.cleanSource 被设置为 MOVE

    DELETE 如果在文件归档时 cloudFiles.cleanSource 被设置为 DELETE

    NULL 如果 cloudFiles.cleanSource 设置为 OFF (默认值)。

  • move_location STRING

    适用范围:标记为“是”的 Databricks SQL 标记为“是”的 Databricks Runtime 16.4 及更高版本,在启用cloudFiles.cleanSource时。 请参阅 自动加载程序选项

    文件在存档操作过程中移动到的位置的完整路径,当 cloudFiles.cleanSource 被设置为 MOVE 时。

    NULL如果文件尚未存档或cloudFiles.cleanSourceDELETEOFF之一。

  • source_id STRING

    流式处理查询中自动加载程序源的 ID。 对于从单个云对象存储位置引入的流,此值为 '0'

  • flow_name STRING

    适用于: Databricks SQL 勾选“是” Databricks Runtime 13.3 及更高版本check marked yes

    表示 DLT 中包含一个或多个云文件源的特定流式处理流。 如果未指定任何table_name,则为 NULL。

  • ingestion_state STRING

    适用范围:标记为“是”的 Databricks SQL 标记为“是”的 Databricks Runtime 16.4 及更高版本,在启用cloudFiles.cleanSource时。 请参阅 自动加载程序选项

    文件是否已导入,通过以下状态之一表示:

    • NULL:文件尚未处理,或者文件状态无法由自动加载程序确定。
    • PROCESSING:正在处理该文件。
    • SKIPPED_CORRUPTED:文件未引入,因为它已损坏。
    • SKIPPED_MISSING:文件未引入,因为在处理过程中找不到该文件。
    • INGESTED:该文件已由接收器至少处理一次。 如果流中发生故障,它可能再次由 foreachBatch 等非幂等接收器处理。 只有非 null commit_time 字段且状态为 INGESTED 的文件已完成处理。
    • NOT_RECOGNIZED_BY_DBR:保留以维护版本的兼容性。 对于在较高版本的 Databricks Runtime 中引入且无法被早期版本识别的状态,将会显示这种状态。

权限

你需要具有:

  • 对流式处理表的 OWNER 特权(如果使用流式处理表标识符)。
  • 对检查点位置的 READ FILES 特权(如果提供的检查点在外部位置下)。

示例

-- Simple example from checkpoint
> SELECT path FROM CLOUD_FILES_STATE('/some/checkpoint');
  /some/input/path
  /other/input/path

-- Simple example from source subdir
> SELECT path FROM CLOUD_FILES_STATE('/some/checkpoint/sources/0');
  /some/input/path
  /other/input/path

-- Simple example from streaming table
> SELECT path FROM CLOUD_FILES_STATE(TABLE(my_streaming_table));
  /some/input/path
  /other/input/path

局限性

  • 不能在 DLT 管道 中使用表值函数来读取另一个管道生成的 流式处理表 的文件级状态。
  • 查询不能在不同的流式表上同时引用多个 cloud_files_state 表值函数。

若要解决第一个限制,可以创建 Databricks 作业 并使用笔记本任务中的表值函数提取和加载流式处理表的文件级状态。 例如:

  1. 在工作区的边栏中,单击“ 工作流”。

  2. 点击职位。

  3. 创建一个新的 DLT 管道 任务,用于生成流表。 例如:

    CREATE OR REFRESH STREAMING LIVE TABLE <table-name>
    AS SELECT <select clause expressions>
    FROM STREAM read_files('/path/to/files', format => '<format>', ...)
    
  4. 创建一个新的 Notebook 任务,该任务使用 cloud_files_state 表值函数读取流表的文件级状态,并将状态加载到一个表中。 例如:

    spark
      .sql("""SELECT * FROM cloud_files_state(TABLE(<table-name>)""")
      .write
      .saveAsTable('<table-file-state>')
    
  5. 配置笔记本”任务的“依赖”字段并选择 DLT 管道任务。