Azure Databricks 通过“流式处理”选项卡下的 Spark UI 提供对结构化流式处理应用程序的内置监视。
区分 Spark UI 中的结构化流式处理查询
通过将 .queryName(<query-name>)
添加到 writeStream
代码为流提供唯一的查询名称,以便在 Spark UI 中轻松区分哪些指标属于哪个流。
将结构化流式处理指标推送到外部服务
可以使用 Apache Spark 的“流式处理查询侦听器”界面将流式处理指标推送到外部服务,以实现警报或仪表板用例。 在 Databricks Runtime 11.3 LTS 及更高版本中,可以使用 Python 和 Scala 编写的 StreamingQueryListener
。
重要
以下限制适用于使用启用了 Unity Catalog 的计算访问模式的工作负荷:
-
StreamingQueryListener
需要 Databricks Runtime 15.1 或更高版本才能在采用专用访问模式的计算上使用凭据或与 Unity Catalog 管理的对象交互。 -
StreamingQueryListener
需要 Databricks Runtime 16.1 或更高版本来配置标准访问模式(以前共享访问模式)的 Scala 工作负荷。
注意
侦听器的处理延迟会显著影响查询处理速度。 建议限制这些侦听器中的处理逻辑,并选择写入到 Kafka 等快速响应系统以提高效率。
以下代码提供了用于实现侦听器的语法的基本示例:
Scala(编程语言)
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python语言
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
在结构化流式处理中定义可观察指标
可观察指标是可以在查询(数据帧)中定义的命名任意聚合函数。 在数据帧的执行达到完成点(即,完成批处理查询或达到流式处理循环)后,会发出一个命名事件,其中包含自上一个完成点以来处理的数据的指标。
可以通过将侦听器附加到 Spark 会话来观察这些指标。 侦听器依赖于执行模式:
批处理模式:使用
QueryExecutionListener
。查询完成时调用
QueryExecutionListener
。 使用QueryExecution.observedMetrics
映射访问指标。流式处理或微批:使用 。
StreamingQueryListener
流式处理查询完成某个循环时调用
StreamingQueryListener
。 使用StreamingQueryProgress.observedMetrics
映射访问指标。 Azure Databricks 不支持流式处理使用continuous
触发器模式。
例如:
Scala(编程语言)
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python语言
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
映射 Unity Catalog、Delta Lake 和结构化流式处理指标表标识符
结构化流指标在多个位置使用 reservoirId
字段,以作为流式查询源的 Delta 表的唯一标识。
reservoirId
字段映射 Delta 表中存储在 Delta 事务日志中的唯一标识符。 此 ID 不会映射到 值,该值由 Unity Catalog 分配并在目录资源管理器中显示。tableId
使用以下语法查看 Delta 表的表标识符。 这适用于 Unity Catalog 托管表、Unity Catalog 外部表以及所有 Hive 元存储 Delta 表:
DESCRIBE DETAIL <table-name>
结果中显示的 id
字段是映射到流式处理指标中的 reservoirId
的标识符。
StreamingQueryListener 对象指标
领域 | 说明 |
---|---|
id |
在重启时保留的唯一查询 ID。 |
runId |
每次启动/重启时都会生成唯一的查询 ID。 请参阅 StreamingQuery.runId()。 |
name |
用户指定的查询名称。 如果未指定名称,则名称为 null。 |
timestamp |
执行微批处理的时间戳。 |
batchId |
当前正在处理的批数据的唯一 ID。 如果失败后重试,可多次执行给定批 ID。 同样,如果没有要处理的数据,则批 ID 不会递增。 |
batchDuration |
批处理作的处理持续时间(以毫秒为单位)。 |
numInputRows |
触发器中处理的记录数总和(涵盖所有来源)。 |
inputRowsPerSecond |
到达数据的(跨所有源)聚合速率。 |
processedRowsPerSecond |
Spark 处理数据时的(跨所有源)聚合速率。 |
StreamingQueryListener
还定义了以下字段,其中包含可以检查客户指标和源进度详细信息的对象:
领域 | 说明 |
---|---|
durationMs |
键入:ju.Map[String, JLong] 。 请参阅 durationMs 对象。 |
eventTime |
键入:ju.Map[String, String] 。 请参阅 eventTime 对象。 |
stateOperators |
键入:Array[StateOperatorProgress] 。 请参阅 stateOperators 对象。 |
sources |
键入:Array[SourceProgress] 。 请参阅 源对象。 |
sink |
键入:SinkProgress 。 请参阅 汇聚对象。 |
observedMetrics |
键入:ju.Map[String, Row] 。 可在 DataFrame/query 上定义的命名任意聚合函数(例如 df.observe )。 |
durationMs 对象
对象类型: ju.Map[String, JLong]
有关完成微批执行过程的各个阶段所所用的时间的信息。
领域 | 说明 |
---|---|
durationMs.addBatch |
执行微批所用的时间。 此时间不包括 Spark 规划微批所用的时间。 |
durationMs.getBatch |
检索与源偏移量有关的元数据所用的时间。 |
durationMs.latestOffset |
微批消耗的最新偏移量。 此进度对象是指从源检索最新偏移量所花费的时间。 |
durationMs.queryPlanning |
生成执行计划所用的时间。 |
durationMs.triggerExecution |
规划和执行微批所用的时间。 |
durationMs.walCommit |
提交新的可用偏移量所用的时间。 |
durationMs.commitBatch |
将数据写入接收器并提交到addBatch 所花费的时间。 仅适用于支持提交的接收器。 |
durationMs.commitOffsets |
将批处理提交到提交日志所需的时间。 |
eventTime 对象
对象类型: ju.Map[String, String]
有关在微批中处理的数据中看到的事件时间值的信息。 水印使用此数据来确定如何剪裁状态以处理结构化流式处理作业中定义的有状态聚合。
领域 | 说明 |
---|---|
eventTime.avg |
在该触发器中看到的平均事件时间。 |
eventTime.max |
在该触发器中看到的最大事件时间。 |
eventTime.min |
在该触发器中看到的最小事件时间。 |
eventTime.watermark |
触发器中使用的水印值。 |
stateOperators 对象
对象类型: Array[StateOperatorProgress]
该 stateOperators
对象包含有关结构化流式处理作业中定义的有状态操作以及由此生成的聚合的信息。
有关流状态运算符的更多详细信息,请参阅什么是有状态流式处理?
领域 | 说明 |
---|---|
stateOperators.operatorName |
指标与之相关的有状态运算符的名称,例如 symmetricHashJoin , dedupe 或 stateStoreSave 。 |
stateOperators.numRowsTotal |
有状态运算符或聚合的结果状态中的总行数。 |
stateOperators.numRowsUpdated |
有状态运算符或聚合的结果状态中已更新的总行数。 |
stateOperators.allUpdatesTimeMs |
此指标目前无法通过 Spark 进行度量,我们已计划在将来的更新中将其删除。 |
stateOperators.numRowsRemoved |
从有状态运算符或聚合的结果状态中删除的总行数。 |
stateOperators.allRemovalsTimeMs |
此指标目前无法通过 Spark 进行度量,我们已计划在将来的更新中将其删除。 |
stateOperators.commitTimeMs |
提交所有更新(添加和删除)并返回新版本所需的时间。 |
stateOperators.memoryUsedBytes |
状态存储使用的内存。 |
stateOperators.numRowsDroppedByWatermark |
被视为太晚而无法包含在有状态聚合中的行数。 仅流式处理聚合:聚合后删除的行数(不是原始输入行数)。 此数字并不精确,但可以表明存在被丢弃的延迟数据。 |
stateOperators.numShufflePartitions |
此有状态运算符的随机分区数。 |
stateOperators.numStateStoreInstances |
运算符已初始化和维护的实际状态存储实例。 对于许多有状态运算符,这与分区数相同。 但是,流对流的连接会为每个分区初始化四个状态存储实例。 |
stateOperators.customMetrics |
有关更多详细信息,请参阅本主题中的 stateOperators.customMetrics 。 |
StateOperatorProgress.customMetrics 对象
对象类型: ju.Map[String, JLong]
StateOperatorProgress
具有一个字段customMetrics
,其中包含在收集这些指标时所使用的特定于功能的指标。
功能 / 特点 | 说明 |
---|---|
RocksDB 状态存储 | RocksDB 状态存储的指标。 |
HDFS 状态存储 | HDFS 状态存储的指标。 |
流重复数据删除 | 行重复数据删除的指标。 |
流聚合 | 行聚合的指标。 |
流合并运算符 | 流联接运算符指标。 |
transformWithState |
运算符的 transformWithState 指标。 |
RocksDB 状态存储自定义指标
从 RocksDB 收集的信息,该源捕获的指标与它为结构化流式处理作业维护的状态值的性能和操作有关。 有关详细信息,请参阅在 Azure Databricks 上配置 RocksDB 状态存储。
领域 | 说明 |
---|---|
customMetrics.rocksdbBytesCopied |
RocksDB 文件管理器所跟踪的复制字节数。 |
customMetrics.rocksdbCommitCheckpointLatency |
创建本机 RocksDB 快照并将其写入本地目录所花费的时间(以毫秒为单位)。 |
customMetrics.rocksdbCompactLatency |
在检查点提交期间进行压缩(可选)所花费的时间(以毫秒为单位)。 |
customMetrics.rocksdbCommitCompactLatency |
提交时的压缩时间(以毫秒为单位)。 |
customMetrics.rocksdbCommitFileSyncLatencyMs |
将本机 RocksDB 快照同步到外部存储(检查点位置)所花费的时间(以毫秒为单位)。 |
customMetrics.rocksdbCommitFlushLatency |
将 RocksDB 内存中更改刷新到本地磁盘所花费的时间(以毫秒为单位)。 |
customMetrics.rocksdbCommitPauseLatency |
在检查点提交过程中停止后台工作线程(例如进行压缩)所花费的时间(以毫秒为单位)。 |
customMetrics.rocksdbCommitWriteBatchLatency |
将内存中结构中的分阶段写入 (WriteBatch ) 应用于本机 RocksDB 所花费的时间(以毫秒为单位)。 |
customMetrics.rocksdbFilesCopied |
RocksDB 文件管理器所跟踪的复制文件数。 |
customMetrics.rocksdbFilesReused |
RocksDB 文件管理器所跟踪的重用文件数。 |
customMetrics.rocksdbGetCount |
调用次数 get (不包括由gets 用来暂存写入的内存中批处理中的WriteBatch )。 |
customMetrics.rocksdbGetLatency |
基础本机 RocksDB::Get 调用平均花费的时间(以纳秒为单位)。 |
customMetrics.rocksdbReadBlockCacheHitCount |
RocksDB 中块缓存中的缓存命中数。 |
customMetrics.rocksdbReadBlockCacheMissCount |
RocksDB 中块缓存未命中数。 |
customMetrics.rocksdbSstFileSize |
RocksDB 实例中所有静态排序表 (SST) 文件的大小。 |
customMetrics.rocksdbTotalBytesRead |
通过 get 操作读取的未压缩字节数。 |
customMetrics.rocksdbTotalBytesWritten |
通过 put 操作写入的未压缩字节总数。 |
customMetrics.rocksdbTotalBytesReadThroughIterator |
使用迭代器读取的未压缩数据的字节总数。 某些有状态操作(例如,FlatMapGroupsWithState 中的超时处理操作和水印)需要通过迭代器将数据读取到 Azure Databricks。 |
customMetrics.rocksdbTotalBytesReadByCompaction |
压缩进程从磁盘读取的字节数。 |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
压缩进程写入磁盘的字节总数。 |
customMetrics.rocksdbTotalCompactionLatencyMs |
RocksDB 压缩(包括后台压缩和提交期间启动的可选压缩)所花费的时间(以毫秒为单位)。 |
customMetrics.rocksdbTotalFlushLatencyMs |
总刷新时间,包括后台刷新时间。 刷新操作是指 MemTable 在填满后刷新到存储的进程。
MemTables 是 RocksDB 中存储数据的第一级。 |
customMetrics.rocksdbZipFileBytesUncompressed |
文件管理器报告的未压缩 zip 文件的大小(以字节为单位)。 文件管理器用于管理物理 SST 文件磁盘空间的利用率和删除。 |
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> |
保存到检查点位置的 RocksDB 快照的最新版本。 值为“-1”表示从未保存过任何快照。 由于快照特定于每个状态存储实例,因此此指标适用于特定的分区 ID 和状态存储名称。 |
customMetrics.rocksdbPutLatency |
总的调用延迟。 |
customMetrics.rocksdbPutCount |
看跌期权调用数。 |
customMetrics.rocksdbWriterStallLatencyMs |
编写器等待压缩或刷新完成的时间。 |
customMetrics.rocksdbTotalBytesWrittenByFlush |
通过刷新写入的总字节数 |
customMetrics.rocksdbPinnedBlocksMemoryUsage |
固定块的内存使用情况 |
customMetrics.rocksdbNumInternalColFamiliesKeys |
内部列族的内部键数量 |
customMetrics.rocksdbNumExternalColumnFamilies |
外部列族的数量 |
customMetrics.rocksdbNumInternalColumnFamilies |
内部列簇的数目 |
HDFS 状态存储自定义指标
收集有关 HDFS 状态存储提供程序的行为和操作的信息。
领域 | 说明 |
---|---|
customMetrics.stateOnCurrentVersionSizeBytes |
仅在当前版本上估计的状态大小。 |
customMetrics.loadedMapCacheHitCount |
在提供程序中缓存的状态的缓存命中数。 |
customMetrics.loadedMapCacheMissCount |
提供程序中缓存状态的缓存未命中次数。 |
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> |
特定状态存储实例的上次上传快照版本。 |
重复数据删除自定义指标
收集有关重复数据消除行为和操作的信息。
领域 | 说明 |
---|---|
customMetrics.numDroppedDuplicateRows |
删除的重复行数。 |
customMetrics.numRowsReadDuringEviction |
状态逐出期间读取的状态行数。 |
聚合自定义指标
收集有关聚合行为和操作的信息。
领域 | 说明 |
---|---|
customMetrics.numRowsReadDuringEviction |
状态逐出期间读取的状态行数。 |
流联接自定义指标
收集有关流连接行为和操作的信息。
领域 | 说明 |
---|---|
customMetrics.skippedNullValueCount |
当null 被设置为spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled 时,跳过的true 值的数量。 |
transformWithState 自定义指标
收集有关 transformWithState
(TWS) 的行为和操作的信息。 关于更多详细信息 transformWithState
,请参阅 构建自定义有状态应用程序。
领域 | 说明 |
---|---|
customMetrics.initialStateProcessingTimeMs |
处理所有初始状态所花费的毫秒数。 |
customMetrics.numValueStateVars |
值状态变量数。 也为 transformWithStateInPandas . |
customMetrics.numListStateVars |
列表状态变量数。 也为 transformWithStateInPandas . |
customMetrics.numMapStateVars |
映射状态变量数。 也为 transformWithStateInPandas . |
customMetrics.numDeletedStateVars |
已删除的状态变量数。 也为 transformWithStateInPandas . |
customMetrics.timerProcessingTimeMs |
处理所有计时器所花费的毫秒数 |
customMetrics.numRegisteredTimers |
已注册的计时器数。 也为 transformWithStateInPandas . |
customMetrics.numDeletedTimers |
已删除的计时器数。 也为 transformWithStateInPandas . |
customMetrics.numExpiredTimers |
过期的计时器数。 也为 transformWithStateInPandas . |
customMetrics.numValueStateWithTTLVars |
具有 TTL 的值状态变量数。 也为 transformWithStateInPandas . |
customMetrics.numListStateWithTTLVars |
具有 TTL 的列表状态变量数。 也为 transformWithStateInPandas . |
customMetrics.numMapStateWithTTLVars |
具有 TTL 的映射状态变量数。 也为 transformWithStateInPandas . |
customMetrics.numValuesRemovedDueToTTLExpiry |
由于 TTL 到期而删除的值数。 也为 transformWithStateInPandas . |
customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry |
由于 TTL 到期而增量删除的值数。 |
sources 对象
对象类型: Array[SourceProgress]
该 sources
对象包含流数据源的指标和信息。
领域 | 说明 |
---|---|
description |
流数据源表的详细描述。 |
startOffset |
流式处理作业启动的数据源表中的起始偏移量。 |
endOffset |
微批处理的最后偏移量。 |
latestOffset |
微批次处理的最新偏移量。 |
numInputRows |
从此源处理的输入行数。 |
inputRowsPerSecond |
从此源到来的数据处理速率,以秒为单位。 |
processedRowsPerSecond |
Spark 处理来自此源的数据的速率。 |
metrics |
键入:ju.Map[String, String] 。 包含特定数据源的自定义指标。 |
Azure Databricks 提供以下源对象实现:
注意
对于在窗体 sources.<startOffset / endOffset / latestOffset>.*
(或某种变体)中定义的字段,请将其解释为 3 个可能字段之一,所有这些字段都包含指示的子字段:
sources.startOffset.<child-field>
sources.endOffset.<child-field>
sources.latestOffset.<child-field>
Delta Lake 源对象
用于 Delta 表流数据源的自定义指标的定义。
领域 | 说明 |
---|---|
sources.description |
流式处理查询从中读取数据的源的说明。 例如:“DeltaSource[table]” 。 |
sources.<startOffset / endOffset>.sourceVersion |
对此偏移量进行编码的序列化版本。 |
sources.<startOffset / endOffset>.reservoirId |
正在读取的表的 ID。 此值用于在重启查询时检测错误配置。 请参阅 Map Unity Catalog、Delta Lake 和 Structured Streaming 指标表标识符。 |
sources.<startOffset / endOffset>.reservoirVersion |
当前正在处理的表的版本。 |
sources.<startOffset / endOffset>.index |
此版本中 AddFiles 序列中的索引。 此值用于将大型提交分解成多个批。 可通过对 modificationTimestamp 和 path 进行排序来创建此索引。 |
sources.<startOffset / endOffset>.isStartingVersion |
确定当前偏移量是否标记新的流查询的开头,而不是处理初始数据后发生的更改。 启动新的查询时,首先会处理表中存在的所有数据,然后再处理任何新到达的数据。 |
sources.<startOffset / endOffset / latestOffset>.eventTimeMillis |
为事件按时间排序而记录的事件时间。 待处理的初始快照数据的事件时间。 当以事件时间顺序处理初始快照时使用。 |
sources.latestOffset |
微批查询处理的最新偏移量。 |
sources.numInputRows |
从此源处理的输入行数。 |
sources.inputRowsPerSecond |
从此源传送处理数据的速率。 |
sources.processedRowsPerSecond |
Spark 处理来自此源的数据的速率。 |
sources.metrics.numBytesOutstanding |
未完成文件(RocksDB 跟踪的文件)的总大小。 这是 Delta 和自动加载程序作为流式处理源的积压工作指标。 |
sources.metrics.numFilesOutstanding |
要处理的未完成文件数。 这是 Delta 和自动加载程序作为流式处理源的积压工作指标。 |
Apache Kafka 源对象
用于 Apache Kafka 流数据源的自定义指标的定义。
领域 | 说明 |
---|---|
sources.description |
Kafka 源的详细说明,指定从中读取的确切 Kafka 主题。 例如:“KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” 。 |
sources.startOffset |
启动流式处理作业的 Kafka 主题内的起始偏移量。 |
sources.endOffset |
微批处理的最后偏移量。 对于正在进行的微批执行,这可能等于 latestOffset 。 |
sources.latestOffset |
微批计算出的最新偏移量。 当存在限流时,微批处理过程可能不会处理所有偏移量,这导致endOffset 与latestOffset 不一致。 |
sources.numInputRows |
从此源处理的输入行数。 |
sources.inputRowsPerSecond |
从此源传送处理数据的速率。 |
sources.processedRowsPerSecond |
Spark 处理来自此源的数据的速率。 |
sources.metrics.avgOffsetsBehindLatest |
流式处理查询在所有订阅主题中最新可用偏移量之后使用的平均偏移量。 |
sources.metrics.estimatedTotalBytesBehindLatest |
查询进程尚未从订阅主题使用的估计字节数。 |
sources.metrics.maxOffsetsBehindLatest |
流式处理查询在所有订阅主题中最新可用偏移量之后使用的最大偏移量。 |
sources.metrics.minOffsetsBehindLatest |
流式处理查询在所有订阅主题中最新可用偏移量之后使用的最小偏移量。 |
自动加载器源指标
用于自动加载程序流数据源的自定义指标的定义。
领域 | 说明 |
---|---|
sources.<startOffset / endOffset / latestOffset>.seqNum |
按发现文件的顺序处理的文件序列中的当前位置。 |
sources.<startOffset / endOffset / latestOffset>.sourceVersion |
cloudFiles 源代码的实现版本。 |
sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs |
最近一次回填操作的开始时间。 |
sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs |
最近回填操作的结束时间。 |
sources.<startOffset / endOffset / latestOffset>.lastInputPath |
流重新启动之前提供的用户输入路径中的最后一个。 |
sources.metrics.numFilesOutstanding |
积压文件的数量 |
sources.metrics.numBytesOutstanding |
积压工作中文件的大小(字节) |
sources.metrics.approximateQueueSize |
消息队列的大致大小。 仅当启用了 cloudFiles.useNotifications 选项时。 |
PubSub 源指标
用于 PubSub 流数据源的自定义指标的定义。 有关监控 PubSub 流媒体源的更多详细信息,请参阅 监控流媒体指标。
领域 | 说明 |
---|---|
sources.<startOffset / endOffset / latestOffset>.sourceVersion |
此偏移量编码的实现版本。 |
sources.<startOffset / endOffset / latestOffset>.seqNum |
正在处理的持久序列号。 |
sources.<startOffset / endOffset / latestOffset>.fetchEpoch |
正在处理的最大提取纪元。 |
sources.metrics.numRecordsReadyToProcess |
当前积压工作中可用于处理的记录数。 |
sources.metrics.sizeOfRecordsReadyToProcess |
当前积压工作中未处理的数据的总大小(以字节为单位)。 |
sources.metrics.numDuplicatesSinceStreamStart |
自数据流启动以来处理的重复记录的总数。 |
Pulsar 源指标
用于 Pulsar 流数据源的自定义指标的定义。
领域 | 说明 |
---|---|
sources.metrics.numInputRows |
当前微批处理中处理的行数。 |
sources.metrics.numInputBytes |
当前微批处理中处理的字节总数。 |
汇集对象
对象类型: SinkProgress
领域 | 说明 |
---|---|
sink.description |
水槽的描述,详细说明正在使用的特定水槽实现。 |
sink.numOutputRows |
输出行数。 不同的接收器类型可能对值具有不同的行为和限制。 请参阅特定的支持类型 |
sink.metrics |
ju.Map[String, String] 下沉指标。 |
目前,Azure Databricks 提供两个特定的 sink
对象实现:
水槽类型 | 详细信息 |
---|---|
Delta 表 | 请参阅 Delta 接收器对象。 |
Apache Kafka 主题 | 请参阅 Kafka 接收器对象。 |
该 sink.metrics
字段对对象的两个 sink
变体的行为相同。
Delta Lake 接收器对象
领域 | 说明 |
---|---|
sink.description |
Delta 汇聚器的描述,详细描述正在使用的特定 Delta 汇聚器实现。 例如:“DeltaSink[table]” 。 |
sink.numOutputRows |
行数始终为 -1 ,因为 Spark 无法推断 DSv1 接收器的输出行,而 Delta Lake 接收器就是这种分类。 |
Apache Kafka 接收器对象
领域 | 说明 |
---|---|
sink.description |
流式处理查询正在写入的 Kafka 接收器的说明,详细描述正在使用的特定 Kafka 接收器实现。 例如:“org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” 。 |
sink.numOutputRows |
作为微批的一部分写入到输出表或接收器的行数。 在某些情况下,此值可以为“-1”,通常可解释为“未知”。 |
示例
Kafka-to-Kafka StreamingQueryListener 事件示例
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1370,
"SnapshotLastUploaded.partition_1_default" : 1370,
"SnapshotLastUploaded.partition_2_default" : 1362,
"SnapshotLastUploaded.partition_3_default" : 1370,
"SnapshotLastUploaded.partition_4_default" : 1356,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1360,
"SnapshotLastUploaded.partition_1_default" : 1360,
"SnapshotLastUploaded.partition_2_default" : 1352,
"SnapshotLastUploaded.partition_3_default" : 1360,
"SnapshotLastUploaded.partition_4_default" : 1346,
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
"SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
"SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
"SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
"SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Delta Lake-to-Delta Lake StreamingQueryListener 事件示例
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Kinesis-to-Delta Lake StreamingQueryListener 事件示例
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Kafka+Delta Lake-to-Delta Lake StreamingQueryListener 事件示例
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Delta Lake StreamingQueryListener 事件的速率源示例
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}