监视 Azure Databricks 上的结构化流式处理查询

Azure Databricks 通过“流式处理”选项卡下的 Spark UI 提供对结构化流式处理应用程序的内置监视。

区分 Spark UI 中的结构化流式处理查询

通过将 .queryName(<query-name>) 添加到 writeStream 代码为流提供唯一的查询名称,以便在 Spark UI 中轻松区分哪些指标属于哪个流。

将结构化流式处理指标推送到外部服务

可以使用 Apache Spark 的“流式处理查询侦听器”界面将流式处理指标推送到外部服务,以实现警报或仪表板用例。 在 Databricks Runtime 11.3 LTS 及更高版本中,可以使用 Python 和 Scala 编写的 StreamingQueryListener

重要

以下限制适用于使用启用了 Unity Catalog 的计算访问模式的工作负荷:

  • StreamingQueryListener 需要 Databricks Runtime 15.1 或更高版本才能在采用专用访问模式的计算上使用凭据或与 Unity Catalog 管理的对象交互。
  • StreamingQueryListener 需要 Databricks Runtime 16.1 或更高版本来配置标准访问模式(以前共享访问模式)的 Scala 工作负荷。

注意

侦听器的处理延迟会显著影响查询处理速度。 建议限制这些侦听器中的处理逻辑,并选择写入到 Kafka 等快速响应系统以提高效率。

以下代码提供了用于实现侦听器的语法的基本示例:

Scala(编程语言)

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python语言

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

在结构化流式处理中定义可观察指标

可观察指标是可以在查询(数据帧)中定义的命名任意聚合函数。 在数据帧的执行达到完成点(即,完成批处理查询或达到流式处理循环)后,会发出一个命名事件,其中包含自上一个完成点以来处理的数据的指标。

可以通过将侦听器附加到 Spark 会话来观察这些指标。 侦听器依赖于执行模式:

  • 批处理模式:使用 QueryExecutionListener

    查询完成时调用 QueryExecutionListener。 使用 QueryExecution.observedMetrics 映射访问指标。

  • 流式处理或微批:使用 StreamingQueryListener

    流式处理查询完成某个循环时调用 StreamingQueryListener。 使用 StreamingQueryProgress.observedMetrics 映射访问指标。 Azure Databricks 不支持流式处理使用 continuous 触发器模式。

例如:

Scala(编程语言)

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python语言

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

映射 Unity Catalog、Delta Lake 和结构化流式处理指标表标识符

结构化流指标在多个位置使用 reservoirId 字段,以作为流式查询源的 Delta 表的唯一标识。

reservoirId 字段映射 Delta 表中存储在 Delta 事务日志中的唯一标识符。 此 ID 不会映射到 值,该值由 Unity Catalog 分配并在目录资源管理器中显示。tableId

使用以下语法查看 Delta 表的表标识符。 这适用于 Unity Catalog 托管表、Unity Catalog 外部表以及所有 Hive 元存储 Delta 表:

DESCRIBE DETAIL <table-name>

结果中显示的 id 字段是映射到流式处理指标中的 reservoirId 的标识符。

StreamingQueryListener 对象指标

领域 说明
id 在重启时保留的唯一查询 ID。
runId 每次启动/重启时都会生成唯一的查询 ID。 请参阅 StreamingQuery.runId()
name 用户指定的查询名称。 如果未指定名称,则名称为 null。
timestamp 执行微批处理的时间戳。
batchId 当前正在处理的批数据的唯一 ID。 如果失败后重试,可多次执行给定批 ID。 同样,如果没有要处理的数据,则批 ID 不会递增。
batchDuration 批处理作的处理持续时间(以毫秒为单位)。
numInputRows 触发器中处理的记录数总和(涵盖所有来源)。
inputRowsPerSecond 到达数据的(跨所有源)聚合速率。
processedRowsPerSecond Spark 处理数据时的(跨所有源)聚合速率。

StreamingQueryListener 还定义了以下字段,其中包含可以检查客户指标和源进度详细信息的对象:

领域 说明
durationMs 键入:ju.Map[String, JLong]。 请参阅 durationMs 对象
eventTime 键入:ju.Map[String, String]。 请参阅 eventTime 对象
stateOperators 键入:Array[StateOperatorProgress]。 请参阅 stateOperators 对象
sources 键入:Array[SourceProgress]。 请参阅 源对象
sink 键入:SinkProgress。 请参阅 汇聚对象
observedMetrics 键入:ju.Map[String, Row]。 可在 DataFrame/query 上定义的命名任意聚合函数(例如 df.observe)。

durationMs 对象

对象类型ju.Map[String, JLong]

有关完成微批执行过程的各个阶段所所用的时间的信息。

领域 说明
durationMs.addBatch 执行微批所用的时间。 此时间不包括 Spark 规划微批所用的时间。
durationMs.getBatch 检索与源偏移量有关的元数据所用的时间。
durationMs.latestOffset 微批消耗的最新偏移量。 此进度对象是指从源检索最新偏移量所花费的时间。
durationMs.queryPlanning 生成执行计划所用的时间。
durationMs.triggerExecution 规划和执行微批所用的时间。
durationMs.walCommit 提交新的可用偏移量所用的时间。
durationMs.commitBatch 将数据写入接收器并提交到addBatch所花费的时间。 仅适用于支持提交的接收器。
durationMs.commitOffsets 将批处理提交到提交日志所需的时间。

eventTime 对象

对象类型ju.Map[String, String]

有关在微批中处理的数据中看到的事件时间值的信息。 水印使用此数据来确定如何剪裁状态以处理结构化流式处理作业中定义的有状态聚合。

领域 说明
eventTime.avg 在该触发器中看到的平均事件时间。
eventTime.max 在该触发器中看到的最大事件时间。
eventTime.min 在该触发器中看到的最小事件时间。
eventTime.watermark 触发器中使用的水印值。

stateOperators 对象

对象类型Array[StateOperatorProgress]stateOperators 对象包含有关结构化流式处理作业中定义的有状态操作以及由此生成的聚合的信息。

有关流状态运算符的更多详细信息,请参阅什么是有状态流式处理?

领域 说明
stateOperators.operatorName 指标与之相关的有状态运算符的名称,例如 symmetricHashJoindedupestateStoreSave
stateOperators.numRowsTotal 有状态运算符或聚合的结果状态中的总行数。
stateOperators.numRowsUpdated 有状态运算符或聚合的结果状态中已更新的总行数。
stateOperators.allUpdatesTimeMs 此指标目前无法通过 Spark 进行度量,我们已计划在将来的更新中将其删除。
stateOperators.numRowsRemoved 从有状态运算符或聚合的结果状态中删除的总行数。
stateOperators.allRemovalsTimeMs 此指标目前无法通过 Spark 进行度量,我们已计划在将来的更新中将其删除。
stateOperators.commitTimeMs 提交所有更新(添加和删除)并返回新版本所需的时间。
stateOperators.memoryUsedBytes 状态存储使用的内存。
stateOperators.numRowsDroppedByWatermark 被视为太晚而无法包含在有状态聚合中的行数。 仅流式处理聚合:聚合后删除的行数(不是原始输入行数)。 此数字并不精确,但可以表明存在被丢弃的延迟数据。
stateOperators.numShufflePartitions 此有状态运算符的随机分区数。
stateOperators.numStateStoreInstances 运算符已初始化和维护的实际状态存储实例。 对于许多有状态运算符,这与分区数相同。 但是,流对流的连接会为每个分区初始化四个状态存储实例。
stateOperators.customMetrics 有关更多详细信息,请参阅本主题中的 stateOperators.customMetrics

StateOperatorProgress.customMetrics 对象

对象类型ju.Map[String, JLong]

StateOperatorProgress具有一个字段customMetrics,其中包含在收集这些指标时所使用的特定于功能的指标。

功能 / 特点 说明
RocksDB 状态存储 RocksDB 状态存储的指标
HDFS 状态存储 HDFS 状态存储的指标
流重复数据删除 行重复数据删除的指标
流聚合 行聚合的指标
流合并运算符 流联接运算符指标
transformWithState 运算符的 transformWithState 指标

RocksDB 状态存储自定义指标

从 RocksDB 收集的信息,该源捕获的指标与它为结构化流式处理作业维护的状态值的性能和操作有关。 有关详细信息,请参阅在 Azure Databricks 上配置 RocksDB 状态存储

领域 说明
customMetrics.rocksdbBytesCopied RocksDB 文件管理器所跟踪的复制字节数。
customMetrics.rocksdbCommitCheckpointLatency 创建本机 RocksDB 快照并将其写入本地目录所花费的时间(以毫秒为单位)。
customMetrics.rocksdbCompactLatency 在检查点提交期间进行压缩(可选)所花费的时间(以毫秒为单位)。
customMetrics.rocksdbCommitCompactLatency 提交时的压缩时间(以毫秒为单位)。
customMetrics.rocksdbCommitFileSyncLatencyMs 将本机 RocksDB 快照同步到外部存储(检查点位置)所花费的时间(以毫秒为单位)。
customMetrics.rocksdbCommitFlushLatency 将 RocksDB 内存中更改刷新到本地磁盘所花费的时间(以毫秒为单位)。
customMetrics.rocksdbCommitPauseLatency 在检查点提交过程中停止后台工作线程(例如进行压缩)所花费的时间(以毫秒为单位)。
customMetrics.rocksdbCommitWriteBatchLatency 将内存中结构中的分阶段写入 (WriteBatch) 应用于本机 RocksDB 所花费的时间(以毫秒为单位)。
customMetrics.rocksdbFilesCopied RocksDB 文件管理器所跟踪的复制文件数。
customMetrics.rocksdbFilesReused RocksDB 文件管理器所跟踪的重用文件数。
customMetrics.rocksdbGetCount 调用次数 get(不包括由gets用来暂存写入的内存中批处理中的WriteBatch)。
customMetrics.rocksdbGetLatency 基础本机 RocksDB::Get 调用平均花费的时间(以纳秒为单位)。
customMetrics.rocksdbReadBlockCacheHitCount RocksDB 中块缓存中的缓存命中数。
customMetrics.rocksdbReadBlockCacheMissCount RocksDB 中块缓存未命中数。
customMetrics.rocksdbSstFileSize RocksDB 实例中所有静态排序表 (SST) 文件的大小。
customMetrics.rocksdbTotalBytesRead 通过 get 操作读取的未压缩字节数。
customMetrics.rocksdbTotalBytesWritten 通过 put 操作写入的未压缩字节总数。
customMetrics.rocksdbTotalBytesReadThroughIterator 使用迭代器读取的未压缩数据的字节总数。 某些有状态操作(例如,FlatMapGroupsWithState 中的超时处理操作和水印)需要通过迭代器将数据读取到 Azure Databricks。
customMetrics.rocksdbTotalBytesReadByCompaction 压缩进程从磁盘读取的字节数。
customMetrics.rocksdbTotalBytesWrittenByCompaction 压缩进程写入磁盘的字节总数。
customMetrics.rocksdbTotalCompactionLatencyMs RocksDB 压缩(包括后台压缩和提交期间启动的可选压缩)所花费的时间(以毫秒为单位)。
customMetrics.rocksdbTotalFlushLatencyMs 总刷新时间,包括后台刷新时间。 刷新操作是指 MemTable 在填满后刷新到存储的进程。 MemTables 是 RocksDB 中存储数据的第一级。
customMetrics.rocksdbZipFileBytesUncompressed 文件管理器报告的未压缩 zip 文件的大小(以字节为单位)。 文件管理器用于管理物理 SST 文件磁盘空间的利用率和删除。
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> 保存到检查点位置的 RocksDB 快照的最新版本。 值为“-1”表示从未保存过任何快照。 由于快照特定于每个状态存储实例,因此此指标适用于特定的分区 ID 和状态存储名称。
customMetrics.rocksdbPutLatency 总的调用延迟。
customMetrics.rocksdbPutCount 看跌期权调用数。
customMetrics.rocksdbWriterStallLatencyMs 编写器等待压缩或刷新完成的时间。
customMetrics.rocksdbTotalBytesWrittenByFlush 通过刷新写入的总字节数
customMetrics.rocksdbPinnedBlocksMemoryUsage 固定块的内存使用情况
customMetrics.rocksdbNumInternalColFamiliesKeys 内部列族的内部键数量
customMetrics.rocksdbNumExternalColumnFamilies 外部列族的数量
customMetrics.rocksdbNumInternalColumnFamilies 内部列簇的数目

HDFS 状态存储自定义指标

收集有关 HDFS 状态存储提供程序的行为和操作的信息。

领域 说明
customMetrics.stateOnCurrentVersionSizeBytes 仅在当前版本上估计的状态大小。
customMetrics.loadedMapCacheHitCount 在提供程序中缓存的状态的缓存命中数。
customMetrics.loadedMapCacheMissCount 提供程序中缓存状态的缓存未命中次数。
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> 特定状态存储实例的上次上传快照版本。

重复数据删除自定义指标

收集有关重复数据消除行为和操作的信息。

领域 说明
customMetrics.numDroppedDuplicateRows 删除的重复行数。
customMetrics.numRowsReadDuringEviction 状态逐出期间读取的状态行数。

聚合自定义指标

收集有关聚合行为和操作的信息。

领域 说明
customMetrics.numRowsReadDuringEviction 状态逐出期间读取的状态行数。

流联接自定义指标

收集有关流连接行为和操作的信息。

领域 说明
customMetrics.skippedNullValueCount null被设置为spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled时,跳过的true值的数量。

transformWithState 自定义指标

收集有关 transformWithState (TWS) 的行为和操作的信息。 关于更多详细信息 transformWithState,请参阅 构建自定义有状态应用程序

领域 说明
customMetrics.initialStateProcessingTimeMs 处理所有初始状态所花费的毫秒数。
customMetrics.numValueStateVars 值状态变量数。 也为 transformWithStateInPandas.
customMetrics.numListStateVars 列表状态变量数。 也为 transformWithStateInPandas.
customMetrics.numMapStateVars 映射状态变量数。 也为 transformWithStateInPandas.
customMetrics.numDeletedStateVars 已删除的状态变量数。 也为 transformWithStateInPandas.
customMetrics.timerProcessingTimeMs 处理所有计时器所花费的毫秒数
customMetrics.numRegisteredTimers 已注册的计时器数。 也为 transformWithStateInPandas.
customMetrics.numDeletedTimers 已删除的计时器数。 也为 transformWithStateInPandas.
customMetrics.numExpiredTimers 过期的计时器数。 也为 transformWithStateInPandas.
customMetrics.numValueStateWithTTLVars 具有 TTL 的值状态变量数。 也为 transformWithStateInPandas.
customMetrics.numListStateWithTTLVars 具有 TTL 的列表状态变量数。 也为 transformWithStateInPandas.
customMetrics.numMapStateWithTTLVars 具有 TTL 的映射状态变量数。 也为 transformWithStateInPandas.
customMetrics.numValuesRemovedDueToTTLExpiry 由于 TTL 到期而删除的值数。 也为 transformWithStateInPandas.
customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry 由于 TTL 到期而增量删除的值数。

sources 对象

对象类型Array[SourceProgress]

sources 对象包含流数据源的指标和信息。

领域 说明
description 流数据源表的详细描述。
startOffset 流式处理作业启动的数据源表中的起始偏移量。
endOffset 微批处理的最后偏移量。
latestOffset 微批次处理的最新偏移量。
numInputRows 从此源处理的输入行数。
inputRowsPerSecond 从此源到来的数据处理速率,以秒为单位。
processedRowsPerSecond Spark 处理来自此源的数据的速率。
metrics 键入:ju.Map[String, String]。 包含特定数据源的自定义指标。

Azure Databricks 提供以下源对象实现:

注意

对于在窗体 sources.<startOffset / endOffset / latestOffset>.* (或某种变体)中定义的字段,请将其解释为 3 个可能字段之一,所有这些字段都包含指示的子字段:

  • sources.startOffset.<child-field>
  • sources.endOffset.<child-field>
  • sources.latestOffset.<child-field>

Delta Lake 源对象

用于 Delta 表流数据源的自定义指标的定义。

领域 说明
sources.description 流式处理查询从中读取数据的源的说明。 例如:“DeltaSource[table]”
sources.<startOffset / endOffset>.sourceVersion 对此偏移量进行编码的序列化版本。
sources.<startOffset / endOffset>.reservoirId 正在读取的表的 ID。 此值用于在重启查询时检测错误配置。 请参阅 Map Unity Catalog、Delta Lake 和 Structured Streaming 指标表标识符
sources.<startOffset / endOffset>.reservoirVersion 当前正在处理的表的版本。
sources.<startOffset / endOffset>.index 此版本中 AddFiles 序列中的索引。 此值用于将大型提交分解成多个批。 可通过对 modificationTimestamppath 进行排序来创建此索引。
sources.<startOffset / endOffset>.isStartingVersion 确定当前偏移量是否标记新的流查询的开头,而不是处理初始数据后发生的更改。 启动新的查询时,首先会处理表中存在的所有数据,然后再处理任何新到达的数据。
sources.<startOffset / endOffset / latestOffset>.eventTimeMillis 为事件按时间排序而记录的事件时间。 待处理的初始快照数据的事件时间。 当以事件时间顺序处理初始快照时使用。
sources.latestOffset 微批查询处理的最新偏移量。
sources.numInputRows 从此源处理的输入行数。
sources.inputRowsPerSecond 从此源传送处理数据的速率。
sources.processedRowsPerSecond Spark 处理来自此源的数据的速率。
sources.metrics.numBytesOutstanding 未完成文件(RocksDB 跟踪的文件)的总大小。 这是 Delta 和自动加载程序作为流式处理源的积压工作指标。
sources.metrics.numFilesOutstanding 要处理的未完成文件数。 这是 Delta 和自动加载程序作为流式处理源的积压工作指标。

Apache Kafka 源对象

用于 Apache Kafka 流数据源的自定义指标的定义。

领域 说明
sources.description Kafka 源的详细说明,指定从中读取的确切 Kafka 主题。 例如:“KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”
sources.startOffset 启动流式处理作业的 Kafka 主题内的起始偏移量。
sources.endOffset 微批处理的最后偏移量。 对于正在进行的微批执行,这可能等于 latestOffset
sources.latestOffset 微批计算出的最新偏移量。 当存在限流时,微批处理过程可能不会处理所有偏移量,这导致endOffsetlatestOffset不一致。
sources.numInputRows 从此源处理的输入行数。
sources.inputRowsPerSecond 从此源传送处理数据的速率。
sources.processedRowsPerSecond Spark 处理来自此源的数据的速率。
sources.metrics.avgOffsetsBehindLatest 流式处理查询在所有订阅主题中最新可用偏移量之后使用的平均偏移量。
sources.metrics.estimatedTotalBytesBehindLatest 查询进程尚未从订阅主题使用的估计字节数。
sources.metrics.maxOffsetsBehindLatest 流式处理查询在所有订阅主题中最新可用偏移量之后使用的最大偏移量。
sources.metrics.minOffsetsBehindLatest 流式处理查询在所有订阅主题中最新可用偏移量之后使用的最小偏移量。

自动加载器源指标

用于自动加载程序流数据源的自定义指标的定义。

领域 说明
sources.<startOffset / endOffset / latestOffset>.seqNum 按发现文件的顺序处理的文件序列中的当前位置。
sources.<startOffset / endOffset / latestOffset>.sourceVersion cloudFiles 源代码的实现版本。
sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs 最近一次回填操作的开始时间。
sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs 最近回填操作的结束时间。
sources.<startOffset / endOffset / latestOffset>.lastInputPath 流重新启动之前提供的用户输入路径中的最后一个。
sources.metrics.numFilesOutstanding 积压文件的数量
sources.metrics.numBytesOutstanding 积压工作中文件的大小(字节)
sources.metrics.approximateQueueSize 消息队列的大致大小。 仅当启用了 cloudFiles.useNotifications 选项时。

PubSub 源指标

用于 PubSub 流数据源的自定义指标的定义。 有关监控 PubSub 流媒体源的更多详细信息,请参阅 监控流媒体指标

领域 说明
sources.<startOffset / endOffset / latestOffset>.sourceVersion 此偏移量编码的实现版本。
sources.<startOffset / endOffset / latestOffset>.seqNum 正在处理的持久序列号。
sources.<startOffset / endOffset / latestOffset>.fetchEpoch 正在处理的最大提取纪元。
sources.metrics.numRecordsReadyToProcess 当前积压工作中可用于处理的记录数。
sources.metrics.sizeOfRecordsReadyToProcess 当前积压工作中未处理的数据的总大小(以字节为单位)。
sources.metrics.numDuplicatesSinceStreamStart 自数据流启动以来处理的重复记录的总数。

Pulsar 源指标

用于 Pulsar 流数据源的自定义指标的定义。

领域 说明
sources.metrics.numInputRows 当前微批处理中处理的行数。
sources.metrics.numInputBytes 当前微批处理中处理的字节总数。

汇集对象

对象类型SinkProgress

领域 说明
sink.description 水槽的描述,详细说明正在使用的特定水槽实现。
sink.numOutputRows 输出行数。 不同的接收器类型可能对值具有不同的行为和限制。 请参阅特定的支持类型
sink.metrics ju.Map[String, String] 下沉指标。

目前,Azure Databricks 提供两个特定的 sink 对象实现:

水槽类型 详细信息
Delta 表 请参阅 Delta 接收器对象
Apache Kafka 主题 请参阅 Kafka 接收器对象

sink.metrics 字段对对象的两个 sink 变体的行为相同。

Delta Lake 接收器对象

领域 说明
sink.description Delta 汇聚器的描述,详细描述正在使用的特定 Delta 汇聚器实现。 例如:“DeltaSink[table]”
sink.numOutputRows 行数始终为 -1,因为 Spark 无法推断 DSv1 接收器的输出行,而 Delta Lake 接收器就是这种分类。

Apache Kafka 接收器对象

领域 说明
sink.description 流式处理查询正在写入的 Kafka 接收器的说明,详细描述正在使用的特定 Kafka 接收器实现。 例如:“org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”
sink.numOutputRows 作为微批的一部分写入到输出表或接收器的行数。 在某些情况下,此值可以为“-1”,通常可解释为“未知”。

示例

Kafka-to-Kafka StreamingQueryListener 事件示例

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1370,
      "SnapshotLastUploaded.partition_1_default" : 1370,
      "SnapshotLastUploaded.partition_2_default" : 1362,
      "SnapshotLastUploaded.partition_3_default" : 1370,
      "SnapshotLastUploaded.partition_4_default" : 1356,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1360,
      "SnapshotLastUploaded.partition_1_default" : 1360,
      "SnapshotLastUploaded.partition_2_default" : 1352,
      "SnapshotLastUploaded.partition_3_default" : 1360,
      "SnapshotLastUploaded.partition_4_default" : 1346,
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
      "SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
      "SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
      "SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
      "SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Delta Lake-to-Delta Lake StreamingQueryListener 事件示例

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Kinesis-to-Delta Lake StreamingQueryListener 事件示例

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Kafka+Delta Lake-to-Delta Lake StreamingQueryListener 事件示例

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Delta Lake StreamingQueryListener 事件的速率源示例

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}