Monitoring Structured Streaming queries on Azure Databricks

Azure Databricks provides built-in monitoring for Structured Streaming applications through the Spark UI under the Streaming tab.

Distinguish Structured Streaming queries in the Spark UI

Give each stream a unique query name by adding .queryName(<query-name>) to your writeStream code. The name makes it easy to tell which metrics belong to which stream in the Spark UI.
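As a minimal sketch of naming a query, the helper below wraps the writeStream call chain. The function name, its parameters, and the choice of toTable as the sink are illustrative assumptions, not part of the product documentation.

```python
def start_named_stream(df, query_name, checkpoint_path, table_name):
    """Start a streaming write whose metrics are labeled with query_name.

    df is assumed to be a streaming DataFrame; the other arguments are
    hypothetical values supplied by the caller.
    """
    return (
        df.writeStream
        .queryName(query_name)  # the name used to label this query's metrics
        .option("checkpointLocation", checkpoint_path)
        .toTable(table_name)  # starts the query and returns a StreamingQuery
    )
```

Calling start_named_stream(df, "orders_bronze", "<checkpoint-path>", "<table-name>") would then list the query as orders_bronze.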

Push Structured Streaming metrics to external services

Streaming metrics can be pushed to external services for alerting or dashboarding use cases by using Apache Spark's Streaming Query Listener interface. In Databricks Runtime 11.3 LTS and above, StreamingQueryListener is available in Python and Scala.

Important

The following limitations apply for workloads using Unity Catalog-enabled compute access modes:

  • StreamingQueryListener requires Databricks Runtime 15.1 or above to use credentials or interact with objects managed by Unity Catalog on compute with dedicated access mode.
  • StreamingQueryListener requires Databricks Runtime 16.1 or above for Scala workloads configured with standard access mode (formerly shared access mode).

Note

Processing latency in listeners can significantly slow query processing. Keep the logic in these listeners minimal, and write to fast-response systems such as Kafka.

The following code provides basic examples of the syntax for implementing a listener:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStarted` is called on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *       Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  override def onQueryIdle(event: QueryIdleEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are the same as in the Scala API.

        Notes
        -----
        This is called synchronously with
        :meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStarted`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are the same as in the Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are the same as in the Scala API.
        """
        pass

my_listener = MyListener()
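
Because slow listener code can delay the query, one option is to keep onQueryProgress down to a cheap hand-off. The following sketch reduces a progress event's JSON string to a few top-level fields and places the result on a bounded in-memory queue that a separate thread could drain. The helper names, the choice of fields, and the queue-based design are assumptions made for illustration. It also assumes the JSON string comes from event.progress.json.

```python
import json
import queue

# Top-level progress fields to keep. This subset is an illustrative choice.
SUMMARY_KEYS = ("id", "runId", "name", "timestamp", "batchId",
                "numInputRows", "inputRowsPerSecond", "processedRowsPerSecond")

def summarize_progress(progress_json):
    """Reduce a progress event's JSON string to a small, flat dict."""
    progress = json.loads(progress_json)
    return {key: progress.get(key) for key in SUMMARY_KEYS}

def enqueue_summary(progress_json, outbox):
    """Hand the summary to a queue without blocking.

    Returns False (and drops the summary) when the queue is full, so the
    caller never waits on a slow consumer.
    """
    try:
        outbox.put_nowait(summarize_progress(progress_json))
        return True
    except queue.Full:
        return False
```

Inside a listener, onQueryProgress could call enqueue_summary(event.progress.json, outbox), leaving any network calls to the thread that reads from outbox.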

Defining observable metrics in Structured Streaming

Observable metrics are named arbitrary aggregate functions that can be defined on a query (DataFrame). As soon as the execution of a DataFrame reaches a completion point (that is, finishes a batch query or reaches a streaming epoch), a named event is emitted that contains the metrics for the data processed since the last completion point.

You can observe these metrics by attaching a listener to the Spark session. The listener depends on the execution mode:

  • Batch mode: Use QueryExecutionListener.

    QueryExecutionListener is called when the query completes. Access the metrics using the QueryExecution.observedMetrics map.

  • Streaming, or microbatch: Use StreamingQueryListener.

    StreamingQueryListener is called when the streaming query completes an epoch. Access the metrics using the StreamingQueryProgress.observedMetrics map. Azure Databricks does not support the continuous trigger mode for streaming.

For example:

Scala

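import org.apache.spark.sql.functions.{count, lit}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import spark.implicits._ // enables the $"error" column syntax

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  // The listener interface requires these callbacks; they are no-ops here
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // observedMetrics is a java.util.Map, so wrap the lookup in Option to handle a missing key
    Option(event.progress.observedMetrics.get("my_event")).foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})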

Python

from pyspark.sql.functions import col, count, lit
from pyspark.sql.streaming import StreamingQueryListener

# Observe metric
observed_df = df.observe("metric", count(lit(1)).alias("cnt"), count(col("error")).alias("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        # Skip empty batches to avoid dividing by zero.
        if row is not None and row.cnt > 0:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Map Unity Catalog, Delta Lake, and Structured Streaming metrics table identifiers

Structured Streaming metrics use the reservoirId field in several places for the unique identity of a Delta table used as a source for a streaming query.

The reservoirId field maps to the unique identifier that the Delta table stores in the Delta transaction log. This ID does not map to the tableId value assigned by Unity Catalog and displayed in Catalog Explorer.

Use the following syntax to review the table identifier for a Delta table. This works for Unity Catalog managed tables, Unity Catalog external tables, and all Hive metastore Delta tables:

DESCRIBE DETAIL <table-name>

The id field displayed in the results is the identifier that maps to the reservoirId in the streaming metrics.
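
To illustrate the mapping, the sketch below collects the reservoirId values from a progress event that has been parsed into a Python dict and checks them against an id copied from DESCRIBE DETAIL. The function names are hypothetical, and the code assumes that Delta offsets appear as nested objects under startOffset and endOffset.

```python
def delta_reservoir_ids(progress):
    """Collect the reservoirId values found in a progress dict's sources."""
    ids = set()
    for source in progress.get("sources", []):
        for key in ("startOffset", "endOffset"):
            offset = source.get(key)
            # Offsets can be null, and non-Delta offsets have no reservoirId.
            if isinstance(offset, dict) and "reservoirId" in offset:
                ids.add(offset["reservoirId"])
    return ids

def reads_table(progress, table_id):
    """Return True if table_id (the id from DESCRIBE DETAIL) is a source."""
    return table_id in delta_reservoir_ids(progress)
```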

StreamingQueryListener object metrics

Fields Description
id A unique query ID that persists across restarts.
runId A query ID that is unique for every start/restart. See StreamingQuery.runId().
name The user-specified name of the query. Name is null if no name is specified.
timestamp The timestamp for the execution of the microbatch.
batchId A unique ID for the current batch of data being processed. In the case of retries after a failure, a given batch ID may be executed more than once. Similarly, when there is no data to be processed, the batch ID is not incremented.
batchDuration The processing duration of a batch operation, in milliseconds.
numInputRows The aggregate (across all sources) number of records processed in a trigger.
inputRowsPerSecond The aggregate (across all sources) rate of arriving data.
processedRowsPerSecond The aggregate (across all sources) rate at which Spark is processing data.
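
One way to use the two rate fields together is to flag a query that receives rows faster than it processes them. The helper below is a sketch over a progress event parsed into a dict. The function name and the tolerance parameter are assumptions for illustration.

```python
def is_falling_behind(progress, tolerance=0.0):
    """Return True when rows arrive faster than Spark processes them.

    tolerance is a fractional margin, for example 0.1 to allow the arrival
    rate to exceed the processing rate by up to 10 percent.
    """
    arriving = progress.get("inputRowsPerSecond") or 0.0
    processed = progress.get("processedRowsPerSecond") or 0.0
    return arriving > processed * (1.0 + tolerance)
```

A single slow batch is not conclusive, so a check like this is more useful when evaluated over several consecutive events.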

StreamingQueryListener also defines the following fields, which contain objects you can examine for custom metrics and source progress details:

Fields Description
durationMs Type: ju.Map[String, JLong]. See durationMs object.
eventTime Type: ju.Map[String, String]. See eventTime object.
stateOperators Type: Array[StateOperatorProgress]. See stateOperators object.
sources Type: Array[SourceProgress]. See sources object.
sink Type: SinkProgress. See sink object.
observedMetrics Type: ju.Map[String, Row]. Named arbitrary aggregate functions that can be defined on a DataFrame/query (such as df.observe).

durationMs object

Object type: ju.Map[String, JLong]

Information about the time it takes to complete various stages of the microbatch execution process.

Fields Description
durationMs.addBatch The time taken to execute the microbatch. This excludes the time Spark takes to plan the microbatch.
durationMs.getBatch The time it takes to retrieve the metadata about the offsets from the source.
durationMs.latestOffset The time taken to retrieve the latest offset from sources.
durationMs.queryPlanning The time taken to generate the execution plan.
durationMs.triggerExecution The time it takes to plan and execute the microbatch.
durationMs.walCommit The time taken to commit the new available offsets.
durationMs.commitBatch The time taken to commit the data written to the sink during addBatch. Only present for sinks that support commit.
durationMs.commitOffsets The time taken to commit the batch to the commit log.
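
Because triggerExecution covers planning and executing the whole microbatch, dividing each phase by it gives a rough picture of where the time goes. The sketch below does that for a durationMs map parsed into a dict. The function name is hypothetical.

```python
def phase_shares(duration_ms):
    """Express each phase as a fraction of triggerExecution.

    Returns an empty dict when triggerExecution is missing or zero.
    """
    total = duration_ms.get("triggerExecution", 0)
    if total <= 0:
        return {}
    return {
        phase: ms / total
        for phase, ms in duration_ms.items()
        if phase != "triggerExecution"
    }
```

With addBatch at 18352 ms and triggerExecution at 20165 ms, addBatch accounts for roughly 91 percent of the trigger.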

eventTime object

Object type: ju.Map[String, String]

Information about the event time value seen within the data being processed in the microbatch. This data is used by the watermark to figure out how to trim the state for processing stateful aggregations defined in the Structured Streaming job.

Fields Description
eventTime.avg The average event time seen in that trigger.
eventTime.max The maximum event time seen in that trigger.
eventTime.min The minimum event time seen in that trigger.
eventTime.watermark The value of the watermark used in that trigger.
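
As a sketch of how these values can be combined, the helper below computes how far the watermark trails the newest event time seen in the trigger. The function name is hypothetical, and the timestamp format is an assumption based on values such as 2022-10-31T20:09:30.125Z.

```python
from datetime import datetime

# Assumed layout of eventTime values: UTC with fractional seconds and a Z suffix.
TS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

def watermark_lag_seconds(event_time):
    """Return seconds between eventTime.max and eventTime.watermark.

    Returns None when either value is absent, for example in a trigger
    that processed no data.
    """
    if "max" not in event_time or "watermark" not in event_time:
        return None
    newest = datetime.strptime(event_time["max"], TS_FORMAT)
    watermark = datetime.strptime(event_time["watermark"], TS_FORMAT)
    return (newest - watermark).total_seconds()
```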

stateOperators object

Object type: Array[StateOperatorProgress]

The stateOperators object contains information about the stateful operations that are defined in the Structured Streaming job and the aggregations that are produced from them.

For more details on stream state operators, see What is stateful streaming?.

Fields Description
stateOperators.operatorName The name of the stateful operator to which the metrics relate, such as symmetricHashJoin, dedupe, or stateStoreSave.
stateOperators.numRowsTotal The total number of rows in state as a result of a stateful operator or aggregation.
stateOperators.numRowsUpdated The total number of rows updated in state as a result of a stateful operator or aggregation.
stateOperators.allUpdatesTimeMs This metric is currently not measurable by Spark and is planned to be removed in future updates.
stateOperators.numRowsRemoved The total number of rows removed from state as a result of a stateful operator or aggregation.
stateOperators.allRemovalsTimeMs This metric is currently not measurable by Spark and is planned to be removed in future updates.
stateOperators.commitTimeMs The time taken to commit all updates (puts and removes) and return a new version.
stateOperators.memoryUsedBytes Memory used by the state store.
stateOperators.numRowsDroppedByWatermark The number of rows that are considered too late to be included in a stateful aggregation. Streaming aggregations only: The number of rows dropped post-aggregation (not raw input rows). This number is not precise, but provides an indication that there is late data being dropped.
stateOperators.numShufflePartitions The number of shuffle partitions for this stateful operator.
stateOperators.numStateStoreInstances The actual number of state store instances that the operator has initialized and maintains. For many stateful operators, this is the same as the number of partitions. However, stream-stream joins initialize four state store instances per partition.
stateOperators.customMetrics See StateOperatorProgress.customMetrics object in this topic for more details.
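
The per-operator fields can be rolled up for dashboards. The following sketch groups a stateOperators array (parsed into a list of dicts) by operatorName and totals the row count, memory, and late-row fields. The function name and the output keys are illustrative assumptions.

```python
def summarize_state(state_operators):
    """Total selected state metrics per operatorName."""
    summary = {}
    for op in state_operators:
        name = op.get("operatorName", "unknown")
        entry = summary.setdefault(
            name, {"rows": 0, "memoryBytes": 0, "droppedByWatermark": 0}
        )
        entry["rows"] += op.get("numRowsTotal", 0)
        entry["memoryBytes"] += op.get("memoryUsedBytes", 0)
        entry["droppedByWatermark"] += op.get("numRowsDroppedByWatermark", 0)
    return summary
```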

StateOperatorProgress.customMetrics object

Object type: ju.Map[String, JLong]

StateOperatorProgress has a field, customMetrics, which contains metrics specific to the feature in use when those metrics are gathered.

Feature Description
RocksDB state store Metrics for RocksDB state store.
HDFS state store Metrics for HDFS state store.
Stream deduplication Metrics for row deduplication.
Stream aggregation Metrics for row aggregation.
Stream join operator Metrics for stream join operator.
transformWithState Metrics for transformWithState operator.

RocksDB state store custom metrics

Information collected from RocksDB capturing metrics about its performance and operations with respect to the stateful values it maintains for the Structured Streaming job. For more information, see Configure RocksDB state store on Azure Databricks.

Fields Description
customMetrics.rocksdbBytesCopied The number of bytes copied as tracked by the RocksDB File Manager.
customMetrics.rocksdbCommitCheckpointLatency The time, in milliseconds, taken to snapshot native RocksDB and write it to a local directory.
customMetrics.rocksdbCompactLatency The time, in milliseconds, spent on the optional compaction during the checkpoint commit.
customMetrics.rocksdbCommitCompactLatency The compaction time during commit, in milliseconds.
customMetrics.rocksdbCommitFileSyncLatencyMs The time, in milliseconds, taken to sync the native RocksDB snapshot to external storage (the checkpoint location).
customMetrics.rocksdbCommitFlushLatency The time, in milliseconds, taken to flush the RocksDB in-memory changes to the local disk.
customMetrics.rocksdbCommitPauseLatency The time, in milliseconds, taken to stop the background worker threads, such as those for compaction, as part of the checkpoint commit.
customMetrics.rocksdbCommitWriteBatchLatency The time, in milliseconds, taken to apply the staged writes in the in-memory structure (WriteBatch) to native RocksDB.
customMetrics.rocksdbFilesCopied The number of files copied as tracked by the RocksDB File Manager.
customMetrics.rocksdbFilesReused The number of files reused as tracked by the RocksDB File Manager.
customMetrics.rocksdbGetCount The number of get calls (does not include gets from WriteBatch - in-memory batch used for staging writes).
customMetrics.rocksdbGetLatency The average time in nanoseconds for the underlying native RocksDB::Get call.
customMetrics.rocksdbReadBlockCacheHitCount The count of cache hits from the block cache in RocksDB.
customMetrics.rocksdbReadBlockCacheMissCount The count of the block cache misses in RocksDB.
customMetrics.rocksdbSstFileSize The size of all Static Sorted Table (SST) files in the RocksDB instance.
customMetrics.rocksdbTotalBytesRead The number of uncompressed bytes read by get operations.
customMetrics.rocksdbTotalBytesWritten The total number of uncompressed bytes written by put operations.
customMetrics.rocksdbTotalBytesReadThroughIterator The total number of bytes of uncompressed data read using an iterator. Some stateful operations (for example, timeout processing in FlatMapGroupsWithState and watermarking) require reading data into Azure Databricks through an iterator.
customMetrics.rocksdbTotalBytesReadByCompaction The number of bytes that the compaction process reads from the disk.
customMetrics.rocksdbTotalBytesWrittenByCompaction The total number of bytes the compaction process writes to the disk.
customMetrics.rocksdbTotalCompactionLatencyMs The time in milliseconds for RocksDB compactions, including background compactions and the optional compaction initiated during the commit.
customMetrics.rocksdbTotalFlushLatencyMs The total flush time, including background flushing. Flush operations are processes by which the MemTable is flushed to storage once it's full. MemTables are the first level where data is stored in RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed The size in bytes of the uncompressed zip files as reported by the File Manager. The File Manager manages the physical SST file disk space utilization and deletion.
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> The most recent version of the RocksDB snapshot saved to the checkpoint location. A value of "-1" indicates that no snapshot has ever been saved. Since snapshots are specific to each state store instance, this metric applies to a particular partition ID and state store name.
customMetrics.rocksdbPutLatency The total put call latency.
customMetrics.rocksdbPutCount The number of put calls.
customMetrics.rocksdbWriterStallLatencyMs The writer wait time for compaction or flush to finish.
customMetrics.rocksdbTotalBytesWrittenByFlush The total bytes written by flush.
customMetrics.rocksdbPinnedBlocksMemoryUsage The memory usage for pinned blocks.
customMetrics.rocksdbNumInternalColFamiliesKeys The number of internal keys for internal column families.
customMetrics.rocksdbNumExternalColumnFamilies The number of external column families.
customMetrics.rocksdbNumInternalColumnFamilies The number of internal column families.
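
As an example of deriving a health indicator from these counters, the sketch below computes the block cache hit ratio from a customMetrics map parsed into a dict. The function name is hypothetical, and treating a low ratio as a tuning signal is an interpretation rather than documented guidance.

```python
def block_cache_hit_ratio(custom_metrics):
    """Return hits / (hits + misses) for the RocksDB block cache.

    Returns None when no lookups were recorded.
    """
    hits = custom_metrics.get("rocksdbReadBlockCacheHitCount", 0)
    misses = custom_metrics.get("rocksdbReadBlockCacheMissCount", 0)
    lookups = hits + misses
    return hits / lookups if lookups else None
```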

HDFS state store custom metrics

Information collected about HDFS state store provider behaviors and operations.

Fields Description
customMetrics.stateOnCurrentVersionSizeBytes The estimated size of the state on the current version only.
customMetrics.loadedMapCacheHitCount The count of cache hits on states cached in the provider.
customMetrics.loadedMapCacheMissCount The count of cache misses on states cached in the provider.
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> The last uploaded version of the snapshot for a specific state store instance.
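
Because the SnapshotLastUploaded keys embed the partition ID and state store name, they need to be parsed before the versions can be compared. The following sketch extracts them from a customMetrics dict and finds the instance with the oldest uploaded snapshot. The function names and the regular expression are assumptions based on the key pattern shown in this article.

```python
import re

# Matches keys such as "SnapshotLastUploaded.partition_0_default".
SNAPSHOT_KEY = re.compile(r"^SnapshotLastUploaded\.partition_(\d+)_(.+)$")

def snapshot_versions(custom_metrics):
    """Map (partition_id, state_store_name) to the last uploaded version."""
    versions = {}
    for key, value in custom_metrics.items():
        match = SNAPSHOT_KEY.match(key)
        if match:
            versions[(int(match.group(1)), match.group(2))] = value
    return versions

def most_stale_snapshot(custom_metrics):
    """Return ((partition_id, store_name), version) with the lowest version."""
    versions = snapshot_versions(custom_metrics)
    if not versions:
        return None
    return min(versions.items(), key=lambda item: item[1])
```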

Deduplication custom metrics

Information collected about deduplication behaviors and operations.

Fields Description
customMetrics.numDroppedDuplicateRows The number of duplicate rows dropped.
customMetrics.numRowsReadDuringEviction The number of state rows read during state eviction.

Aggregation custom metrics

Information collected about aggregation behaviors and operations.

Fields Description
customMetrics.numRowsReadDuringEviction The number of state rows read during state eviction.

Stream join custom metrics

Information collected about stream join behaviors and operations.

Fields Description
customMetrics.skippedNullValueCount The number of skipped null values, when spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled is set to true.

transformWithState custom metrics

Information collected about transformWithState (TWS) behaviors and operations. For more details on transformWithState, see Build a custom stateful application.

Fields Description
customMetrics.initialStateProcessingTimeMs Number of milliseconds taken to process all initial state.
customMetrics.numValueStateVars Number of value state variables. Also present for transformWithStateInPandas.
customMetrics.numListStateVars Number of list state variables. Also present for transformWithStateInPandas.
customMetrics.numMapStateVars Number of map state variables. Also present for transformWithStateInPandas.
customMetrics.numDeletedStateVars Number of deleted state variables. Also present for transformWithStateInPandas.
customMetrics.timerProcessingTimeMs Number of milliseconds taken to process all timers.
customMetrics.numRegisteredTimers Number of registered timers. Also present for transformWithStateInPandas.
customMetrics.numDeletedTimers Number of deleted timers. Also present for transformWithStateInPandas.
customMetrics.numExpiredTimers Number of expired timers. Also present for transformWithStateInPandas.
customMetrics.numValueStateWithTTLVars Number of value state variables with TTL. Also present for transformWithStateInPandas.
customMetrics.numListStateWithTTLVars Number of list state variables with TTL. Also present for transformWithStateInPandas.
customMetrics.numMapStateWithTTLVars Number of map state variables with TTL. Also present for transformWithStateInPandas.
customMetrics.numValuesRemovedDueToTTLExpiry Number of values removed due to TTL expiry. Also present for transformWithStateInPandas.
customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry Number of values incrementally removed due to TTL expiry.

sources object

Object type: Array[SourceProgress]

The sources object contains information and metrics for streaming data sources.

Fields Description
description A detailed description of the streaming data source table.
startOffset The starting offset number within the data source table at which the streaming job started.
endOffset The last offset processed by the microbatch.
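latestOffset The latest offset available from the source for the microbatch. This can be ahead of endOffset when the microbatch does not process all available data.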
numInputRows The number of input rows processed from this source.
inputRowsPerSecond The rate, in rows per second, at which data is arriving for processing from this source.
processedRowsPerSecond The rate at which Spark is processing data from this source.
metrics Type: ju.Map[String, String]. Contains custom metrics for a specific data source.

Azure Databricks provides the following sources object implementation:

Note

For fields written in the form sources.<startOffset / endOffset / latestOffset>.* (or a variation with fewer options), read the entry as applying to each of the listed fields, all of which contain the indicated child field:

  • sources.startOffset.<child-field>
  • sources.endOffset.<child-field>
  • sources.latestOffset.<child-field>

Delta Lake sources object

Definitions for custom metrics used for Delta table streaming data sources.

Fields Description
sources.description The description of the source from which the streaming query is reading. For example: “DeltaSource[table]”.
sources.<startOffset / endOffset>.sourceVersion The version of serialization with which this offset is encoded.
sources.<startOffset / endOffset>.reservoirId The ID of the table being read. This is used to detect misconfiguration when restarting a query. See Map Unity Catalog, Delta Lake, and Structured Streaming metrics table identifiers.
sources.<startOffset / endOffset>.reservoirVersion The version of the table that is currently being processed.
sources.<startOffset / endOffset>.index The index in the sequence of AddFiles in this version. This is used to break large commits into multiple batches. This index is created by sorting on modificationTimestamp and path.
sources.<startOffset / endOffset>.isStartingVersion Identifies whether current offset marks the start of a new streaming query rather than the processing of changes that occurred after the initial data was processed. When starting a new query, all data present in the table at the start is processed first, and then any new data that arrives.
sources.<startOffset / endOffset / latestOffset>.eventTimeMillis Event time recorded for event time ordering. The event time of initial snapshot data that's pending to be processed. Used when processing an initial snapshot with event time order.
sources.latestOffset The latest offset available from the source for the microbatch query.
sources.numInputRows The number of input rows processed from this source.
sources.inputRowsPerSecond The rate at which data is arriving for processing from this source.
sources.processedRowsPerSecond The rate at which Spark is processing data from this source.
sources.metrics.numBytesOutstanding The combined size of the outstanding files (files tracked by RocksDB). This is the backlog metric for Delta and Auto Loader as the streaming source.
sources.metrics.numFilesOutstanding The number of outstanding files to be processed. This is the backlog metric for Delta and Auto Loader as the streaming source.
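
The backlog metrics can be read from each entry in the sources array. The sketch below collects them from a progress event parsed into a dict. It assumes the metric values arrive as strings, in line with the ju.Map[String, String] type of the metrics field. The function name and output keys are hypothetical.

```python
def source_backlog(progress):
    """List the outstanding files and bytes for sources that report a backlog."""
    backlog = []
    for source in progress.get("sources", []):
        metrics = source.get("metrics") or {}
        if "numFilesOutstanding" in metrics:
            backlog.append({
                "description": source.get("description"),
                # Metric values are strings, so convert them before comparing.
                "files": int(metrics["numFilesOutstanding"]),
                "bytes": int(metrics.get("numBytesOutstanding", 0)),
            })
    return backlog
```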

Apache Kafka sources object

Definitions for custom metrics used for Apache Kafka streaming data sources.

Fields Description
sources.description A detailed description of the Kafka source, specifying the exact Kafka topic being read from. For example: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset The starting offset number within the Kafka topic at which the streaming job started.
sources.endOffset The last offset processed by the microbatch. This could be equal to latestOffset for an ongoing microbatch execution.
sources.latestOffset The latest offset determined by the microbatch. The microbatching process might not process all offsets when there is throttling, which results in endOffset and latestOffset differing.
sources.numInputRows The number of input rows processed from this source.
sources.inputRowsPerSecond The rate at which data is arriving for processing from this source.
sources.processedRowsPerSecond The rate at which Spark is processing data from this source.
sources.metrics.avgOffsetsBehindLatest The average number of offsets that the streaming query is behind the latest available offset among all the subscribed topics.
sources.metrics.estimatedTotalBytesBehindLatest The estimated number of bytes that the query process has not consumed from the subscribed topics.
sources.metrics.maxOffsetsBehindLatest The maximum number of offsets that the streaming query is behind the latest available offset among all the subscribed topics.
sources.metrics.minOffsetsBehindLatest The minimum number of offsets that the streaming query is behind the latest available offset among all the subscribed topics.
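
When a per-partition view is needed instead of the aggregated metrics, the gap between endOffset and latestOffset can be computed directly. The sketch below assumes that both offsets are parsed into nested dicts of the form {topic: {partition: offset}}. That layout is an assumption about the Kafka source's offset JSON, and the function name is hypothetical.

```python
def kafka_partition_lag(source):
    """Return {(topic, partition): latestOffset - endOffset} for one source.

    Partitions that are missing from endOffset are skipped.
    """
    end = source.get("endOffset") or {}
    latest = source.get("latestOffset") or {}
    lag = {}
    for topic, partitions in latest.items():
        for partition, latest_offset in partitions.items():
            consumed = end.get(topic, {}).get(partition)
            if consumed is not None:
                lag[(topic, int(partition))] = max(latest_offset - consumed, 0)
    return lag
```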

Auto Loader sources metrics

Definitions for custom metrics used for Auto Loader streaming data sources.

Fields Description
sources.<startOffset / endOffset / latestOffset>.seqNum The current position in the sequence of files being processed in the order the files were discovered.
sources.<startOffset / endOffset / latestOffset>.sourceVersion The implementation version of the cloudFiles source.
sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs The start time of the most recent backfill operation.
sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs The end time of the most recent backfill operation.
sources.<startOffset / endOffset / latestOffset>.lastInputPath The last user-provided input path of the stream before the stream was restarted.
sources.metrics.numFilesOutstanding The number of files in the backlog.
sources.metrics.numBytesOutstanding The size, in bytes, of the files in the backlog.
sources.metrics.approximateQueueSize The approximate size of the message queue. Present only when the cloudFiles.useNotifications option is enabled.

PubSub sources metrics

Definitions for custom metrics used for PubSub streaming data sources. For more details on monitoring PubSub streaming sources, see Monitoring streaming metrics.

Fields Description
sources.<startOffset / endOffset / latestOffset>.sourceVersion The implementation version that this offset is encoded with.
sources.<startOffset / endOffset / latestOffset>.seqNum The persisted sequence number that is being processed.
sources.<startOffset / endOffset / latestOffset>.fetchEpoch The largest fetch epoch being processed.
sources.metrics.numRecordsReadyToProcess The number of records available for processing in the current backlog.
sources.metrics.sizeOfRecordsReadyToProcess The total size, in bytes, of unprocessed data in the current backlog.
sources.metrics.numDuplicatesSinceStreamStart The total count of duplicate records processed by the stream since it started.

Pulsar sources metrics

Definitions for custom metrics used for Pulsar streaming data sources.

Fields Description
sources.metrics.numInputRows The number of rows processed in the current micro-batch.
sources.metrics.numInputBytes The total number of bytes processed in the current micro-batch.

sink object

Object type: SinkProgress

Fields Description
sink.description The description of the sink, detailing the specific sink implementation being used.
sink.numOutputRows The number of output rows. Different sink types may have different behaviors or restrictions for the values. See the specific supported sink types in this topic.
sink.metrics ju.Map[String, String] of sink metrics.

Currently, Azure Databricks provides two specific sink object implementations:

Sink type Details
Delta table See Delta Lake sink object.
Apache Kafka topic See Apache Kafka sink object.

The sink.metrics field behaves the same for both variants of the sink object.

Delta Lake sink object

Fields Description
sink.description The description of the Delta sink, detailing the specific Delta sink implementation being used. For example: “DeltaSink[table]”.
sink.numOutputRows The number of rows is always -1 because Spark can't infer output rows for DSv1 sinks, which is the classification for the Delta Lake sink.

Apache Kafka sink object

Fields Description
sink.description The description of the Kafka sink to which the streaming query is writing, detailing the specific Kafka sink implementation being used. For example: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows The number of rows that were written to the output table or sink as part of the microbatch. For some situations, this value can be “-1” and generally can be interpreted as “unknown”.
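
Since -1 stands for an unknown row count, code that aggregates numOutputRows might prefer to convert it to a missing value first. The helper below is a small sketch of that idea for a sink object parsed into a dict. The function name is hypothetical.

```python
def output_row_count(sink):
    """Return numOutputRows, or None when the sink reports it as unknown (-1)."""
    rows = sink.get("numOutputRows", -1)
    if rows is None or rows < 0:
        return None
    return rows
```

Returning None keeps an unknown count from being summed as if it were a real value.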

Examples

Example Kafka-to-Kafka StreamingQueryListener event

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1370,
      "SnapshotLastUploaded.partition_1_default" : 1370,
      "SnapshotLastUploaded.partition_2_default" : 1362,
      "SnapshotLastUploaded.partition_3_default" : 1370,
      "SnapshotLastUploaded.partition_4_default" : 1356,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1360,
      "SnapshotLastUploaded.partition_1_default" : 1360,
      "SnapshotLastUploaded.partition_2_default" : 1352,
      "SnapshotLastUploaded.partition_3_default" : 1360,
      "SnapshotLastUploaded.partition_4_default" : 1346,
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
      "SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
      "SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
      "SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
      "SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}
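
An event like the one above can be reduced to a few numbers for alerting. The following is a rough sketch, assuming the event is available as JSON text; it uses a trimmed excerpt of the event, and the helper names kafka_max_lag and state_memory_bytes are hypothetical. Source metrics arrive as strings, so they are converted explicitly.

```python
import json

# Excerpt of the Kafka-to-Kafka event, trimmed to the fields used here.
event_json = """
{
  "stateOperators": [
    {"operatorName": "stateStoreSave", "memoryUsedBytes": 167069743},
    {"operatorName": "dedupe", "memoryUsedBytes": 137765341},
    {"operatorName": "symmetricHashJoin", "memoryUsedBytes": 668544484}
  ],
  "sources": [
    {"description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
     "metrics": {"maxOffsetsBehindLatest": "0"}},
    {"description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
     "metrics": {"maxOffsetsBehindLatest": "0"}}
  ]
}
"""


def kafka_max_lag(progress: dict) -> dict:
    """Map each source description to its maxOffsetsBehindLatest as an int."""
    lag = {}
    for source in progress.get("sources", []):
        metrics = source.get("metrics") or {}
        if "maxOffsetsBehindLatest" in metrics:
            # Metric values are strings in the event, so convert them.
            lag[source["description"]] = int(metrics["maxOffsetsBehindLatest"])
    return lag


def state_memory_bytes(progress: dict) -> dict:
    """Map each stateful operator name to its memoryUsedBytes."""
    return {
        op["operatorName"]: op["memoryUsedBytes"]
        for op in progress.get("stateOperators", [])
    }


progress = json.loads(event_json)
lag = kafka_max_lag(progress)
memory = state_memory_bytes(progress)
print(lag)
print(sum(memory.values()))
```

If a query contains the same operator type more than once, keying by operatorName would overwrite entries; a list of pairs would be a safer shape in that case.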

Example Delta Lake-to-Delta Lake StreamingQueryListener event

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}
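
For Delta Lake sources, the numFilesOutstanding and numBytesOutstanding metrics indicate how much data is still waiting to be processed. The following sketch totals them across sources, assuming the event is already a Python dictionary. The helper name delta_backlog, the table paths, and the non-zero values in the second event are hypothetical and used only for illustration.

```python
def delta_backlog(progress: dict) -> dict:
    """Sum outstanding files and bytes across all Delta Lake sources in an event."""
    files = 0
    size = 0
    for source in progress.get("sources", []):
        # Only Delta sources report these two metrics.
        if not source.get("description", "").startswith("DeltaSource["):
            continue
        metrics = source.get("metrics") or {}
        # Metric values are strings in the event, so convert them.
        files += int(metrics.get("numFilesOutstanding", 0))
        size += int(metrics.get("numBytesOutstanding", 0))
    return {"files": files, "bytes": size}


# Shaped like the event above; the path is a placeholder.
caught_up = {
    "sources": [
        {"description": "DeltaSource[dbfs:/path/to/table]",
         "metrics": {"numBytesOutstanding": "0", "numFilesOutstanding": "0"}}
    ]
}

# Hypothetical event with a backlog, to show a non-zero result.
behind = {
    "sources": [
        {"description": "DeltaSource[dbfs:/path/to/table]",
         "metrics": {"numBytesOutstanding": "2048", "numFilesOutstanding": "3"}}
    ]
}

print(delta_backlog(caught_up))  # {'files': 0, 'bytes': 0}
print(delta_backlog(behind))     # {'files': 3, 'bytes': 2048}
```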

Example Kinesis-to-Delta Lake StreamingQueryListener event

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}
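
In the Kinesis event above, endOffset is a list with one entry per shard, and each entry carries its own msBehindLatest value. The following sketch extracts per-shard lag, assuming the event is already a Python dictionary; the helper name shard_lag_ms is hypothetical, and the excerpt keeps only the fields it reads.

```python
def shard_lag_ms(progress: dict) -> dict:
    """Map each Kinesis shardId to its msBehindLatest, converted to an int."""
    lag = {}
    for source in progress.get("sources", []):
        end_offset = source.get("endOffset")
        # In the examples, only Kinesis reports endOffset as a list of shards;
        # other sources use an object or a number, so skip those.
        if not isinstance(end_offset, list):
            continue
        for entry in end_offset:
            lag[entry["shard"]["shardId"]] = int(entry["msBehindLatest"])
    return lag


# Excerpt of the Kinesis-to-Delta Lake event, trimmed to the fields used here.
kinesis_event = {
    "sources": [
        {
            "endOffset": [
                {"shard": {"shardId": "shardId-000000000000"}, "msBehindLatest": "0"},
                {"shard": {"shardId": "shardId-000000000001"}, "msBehindLatest": "0"},
            ]
        }
    ]
}

print(shard_lag_ms(kinesis_event))
```

Per-shard values can reveal a single lagging shard that the source-level avgMsBehindLatest metric would smooth over.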

Example Kafka+Delta Lake-to-Delta Lake StreamingQueryListener event

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}
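
The durationMs object shows where a microbatch spent its time. The following sketch picks the longest phase and its share of triggerExecution, using the values from the event above. The helper name slowest_phase is hypothetical, and the sketch does not assume that the phases add up to triggerExecution.

```python
def slowest_phase(duration_ms: dict):
    """Return (phase, milliseconds, share of triggerExecution) for the longest phase."""
    # triggerExecution is the overall batch time, so exclude it from the ranking.
    phases = {k: v for k, v in duration_ms.items() if k != "triggerExecution"}
    if not phases:
        return None
    name = max(phases, key=phases.get)
    total = duration_ms.get("triggerExecution")
    share = phases[name] / total if total else None
    return name, phases[name], share


# durationMs from the Kafka+Delta Lake-to-Delta Lake event.
duration_ms = {
    "addBatch": 1544,
    "commitBatch": 686,
    "commitOffsets": 27,
    "getBatch": 12,
    "latestOffset": 577,
    "queryPlanning": 105,
    "triggerExecution": 3600,
    "walCommit": 34,
}

name, ms, share = slowest_phase(duration_ms)
print(name, ms, round(share, 3))  # addBatch 1544 0.429
```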

Example rate source to Delta Lake StreamingQueryListener event

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}
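
One common use of these events is to check whether a query keeps pace with its input by comparing processedRowsPerSecond with inputRowsPerSecond. The following is a simple sketch of that comparison, assuming the event is already a Python dictionary. The helper name is_keeping_up is hypothetical, and the second event uses made-up values to show the failing case.

```python
def is_keeping_up(progress: dict) -> bool:
    """Return True when the batch processed rows at least as fast as they arrived."""
    return progress["processedRowsPerSecond"] >= progress["inputRowsPerSecond"]


# Top-level rates from the rate source event above.
rate_event = {
    "inputRowsPerSecond": 114.15525114155251,
    "processedRowsPerSecond": 158.9825119236884,
}

# Hypothetical event in which processing is slower than arrival.
slow_event = {
    "inputRowsPerSecond": 100.0,
    "processedRowsPerSecond": 50.0,
}

print(is_keeping_up(rate_event))  # True
print(is_keeping_up(slow_event))  # False
```

A single slow batch is rarely a problem on its own, so an alert would usually look at this comparison over several consecutive events.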