适用于: Databricks SQL
Databricks Runtime 11.3 LTS 及更高版本
返回自动加载程序或 read_files
流的文件级状态。
语法
cloud_files_state( { TABLE ( table_name ) | checkpoint } )
参数
-
table_name:由 写入的
read_files
的标识符。 名称不得包含时态规范。 在 Databricks Runtime 13.3 LTS 及更高版本中可用。 -
checkpoint
:STRING
文本。 使用自动加载程序源的流的检查点目录。 请参阅什么是自动加载程序?。
返回
返回具有以下架构的表:
path STRING NOT NULL PRIMARY KEY
文件的路径。
size BIGINT NOT NULL
文件的大小(以字节为单位)。
create_time TIMESTAMP NOT NULL
创建文件的时间。
discovery_time TIMESTAMP NOT NULL
适用于:
Databricks SQL
Databricks Runtime 16.4 及更高版本
发现文件的时间。
processed_time TIMESTAMP NOT NULL
适用范围:
Databricks SQL
Databricks Runtime 16.4 及更高版本,在启用
cloudFiles.cleanSource
时。 请参阅 自动加载程序选项。处理文件的时间。 如果批处理遇到失败并重试,则可能会多次处理文件。 重试时,此字段包含最新的处理时间。
commit_time TIMESTAMP
适用范围:
Databricks SQL
Databricks Runtime 16.4 及更高版本,在启用
cloudFiles.cleanSource
时。 请参阅 自动加载程序选项。文件在处理后提交到检查点的时间。 如果文件尚未处理,则为
NULL
。 对于将文件标记为已提交的操作,没有延迟保证;文件可能已处理,但会在稍后任意时间标记为已提交。 将文件标记为已提交意味着自动加载程序不需要再次处理该文件。archive_time TIMESTAMP
适用范围:
Databricks SQL
Databricks Runtime 16.4 及更高版本,在启用
cloudFiles.cleanSource
时。 请参阅 自动加载程序选项。将文件存档的时间。 如果文件尚未存档,则为
NULL
。archive_mode STRING
适用范围:
Databricks SQL
Databricks Runtime 16.4 及更高版本,在启用
cloudFiles.cleanSource
时。 请参阅 自动加载程序选项。MOVE
如果在文件归档时cloudFiles.cleanSource
被设置为MOVE
。DELETE
如果在文件归档时cloudFiles.cleanSource
被设置为DELETE
。NULL
如果cloudFiles.cleanSource
设置为OFF
(默认值)。move_location STRING
适用范围:
Databricks SQL
Databricks Runtime 16.4 及更高版本,在启用
cloudFiles.cleanSource
时。 请参阅 自动加载程序选项。文件在存档操作过程中移动到的位置的完整路径,当
cloudFiles.cleanSource
被设置为MOVE
时。NULL
如果文件尚未存档或cloudFiles.cleanSource
为DELETE
或OFF
之一。source_id STRING
流式处理查询中自动加载程序源的 ID。 对于从单个云对象存储位置引入的流,此值为
'0'
。flow_name STRING
适用于: Databricks SQL 勾选“是” Databricks Runtime 13.3 及更高版本check marked yes
表示 DLT 中包含一个或多个云文件源的特定流式处理流。 如果未指定任何table_name,则为 NULL。
ingestion_state STRING
适用范围:
Databricks SQL
Databricks Runtime 16.4 及更高版本,在启用
cloudFiles.cleanSource
时。 请参阅 自动加载程序选项。文件是否已导入,通过以下状态之一表示:
-
NULL
:文件尚未处理,或者文件状态无法由自动加载程序确定。 -
PROCESSING
:正在处理该文件。 -
SKIPPED_CORRUPTED
:文件未引入,因为它已损坏。 -
SKIPPED_MISSING
:文件未引入,因为在处理过程中找不到该文件。 -
INGESTED
:该文件已由接收器至少处理一次。 如果流中发生故障,它可能再次由foreachBatch
等非幂等接收器处理。 只有非 nullcommit_time
字段且状态为INGESTED
的文件已完成处理。 -
NOT_RECOGNIZED_BY_DBR
:保留以维护版本的兼容性。 对于在较高版本的 Databricks Runtime 中引入且无法被早期版本识别的状态,将会显示这种状态。
-
权限
你需要具有:
- 对流式处理表的
OWNER
特权(如果使用流式处理表标识符)。 - 对检查点位置的
READ FILES
特权(如果提供的检查点在外部位置下)。
示例
-- Simple example from checkpoint
> SELECT path FROM CLOUD_FILES_STATE('/some/checkpoint');
/some/input/path
/other/input/path
-- Simple example from source subdir
> SELECT path FROM CLOUD_FILES_STATE('/some/checkpoint/sources/0');
/some/input/path
/other/input/path
-- Simple example from streaming table
> SELECT path FROM CLOUD_FILES_STATE(TABLE(my_streaming_table));
/some/input/path
/other/input/path
局限性
若要解决第一个限制,可以创建 Databricks 作业 并使用笔记本任务中的表值函数提取和加载流式处理表的文件级状态。 例如:
在工作区的边栏中,单击“ 工作流”。
点击职位。
创建一个新的 DLT 管道 任务,用于生成流表。 例如:
CREATE OR REFRESH STREAMING LIVE TABLE <table-name> AS SELECT <select clause expressions> FROM STREAM read_files('/path/to/files', format => '<format>', ...)
创建一个新的 Notebook 任务,该任务使用
cloud_files_state
表值函数读取流表的文件级状态,并将状态加载到一个表中。 例如:spark .sql("""SELECT * FROM cloud_files_state(TABLE(<table-name>)""") .write .saveAsTable('<table-file-state>')
配置“笔记本”任务的“依赖”字段并选择 DLT 管道任务。