Azure 流分析在云和 Azure IoT Edge 中都提供基于机器学习的内置异常检测功能,可用于监视两种最常见的异常:临时和持久性。 使用 AnomalyDetection_SpikeAndDip 和 AnomalyDetection_ChangePoint 函数,可以直接在流分析作业中执行异常情况检测。
机器学习模型假定采用统一采样的时序。 如果时序不统一,可以在调用异常情况检测之前使用翻转窗口插入一个聚合步骤。
机器学习作目前不支持季节性趋势或多变量相关性。
在 Azure 流分析中使用机器学习进行异常情况检测
以下视频演示如何使用 Azure 流分析中的机器学习函数实时检测异常。
模范行为
通常,模型的准确性会随着滑动窗口中的更多数据而提高。 指定滑动窗口中的数据被视为该时间范围内其正常值范围的一部分。 该模型仅考虑滑动窗口中的事件历史记录,以检查当前事件是否异常。 当滑动窗口移动时,旧的数据将从模型的训练中被移除。
这些函数通过根据到目前为止所看到的情况建立一定的正常标准来运行。 通过在置信度级别内根据建立的法线进行比较来识别离群值。 窗口大小应基于为正常行为训练模型所需的最小事件,以便在发生异常时能够识别它。
模型的响应时间随历史记录大小而增加,因为它需要与更多的过去事件进行比较。 建议仅包含所需数量的事件,以提高性能。
时序中的差距可能是模型未在特定时间点接收事件的结果。 流分析将使用插补逻辑来处理这种情况。 同一滑动窗口的历史记录大小以及持续时间用于计算预期到达事件的平均速率。
此处提供的异常生成器可用于为 IoT 中心提供具有不同异常模式的数据。 可以使用这些异常情况检测函数来设置 Azure 流分析作业,以便从此 IoT 中心读取并检测异常。
峰值和下降
时序事件流中的临时异常称为峰值和下降。 可以使用基于机器学习的运算符 (AnomalyDetection_SpikeAndDip)监视峰值和下降。
在同一滑动窗口中,如果第二个峰值小于第一个峰值,则与指定置信度内的第一个峰值的分数相比,较小的峰值的计算分数可能不够显著。 可以尝试降低模型的置信度来检测此类异常。 但是,如果开始收到过多的警报,则可以使用更高的置信区间。
以下示例查询假定在包含120个事件历史记录的2分钟滑动窗口中,每秒输入一个事件,以统一的速度进行。 最终的 SELECT 语句提取并输出分数和异常状态,置信度为 95%。
WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME AS time,
CAST(temperature AS float) AS temp,
AnomalyDetection_SpikeAndDip(CAST(temperature AS float), 95, 120, 'spikesanddips')
OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
FROM input
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') AS float) AS
SpikeAndDipScore,
CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS bigint) AS
IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep
变化点
时序事件流中的持久异常是事件流中值分布的变化,例如级别更改和趋势。 在流分析中,使用基于机器学习 的 AnomalyDetection_ChangePoint 运算符检测此类异常。
持久更改的持续时间比峰值和下降要长得多,并可能表示灾难性事件。 永久性更改通常对裸眼不可见,但可以使用 AnomalyDetection_ChangePoint 运算符检测到。
下图是级别更改的示例:
下图是趋势更改的示例:
以下示例查询假定在 20 分钟的滑动窗口中,输入速率为每秒一个事件,且历史记录大小为 1,200 个事件。 最终的 SELECT 语句提取并输出分数和异常状态,置信度为 80%。
WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME AS time,
CAST(temperature AS float) AS temp,
AnomalyDetection_ChangePoint(CAST(temperature AS float), 80, 1200)
OVER(LIMIT DURATION(minute, 20)) AS ChangePointScores
FROM input
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') AS float) AS
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') AS bigint) AS
IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep
性能特征
这些模型的性能取决于历史记录大小、窗口持续时间、事件加载以及是否使用函数级别分区。 本部分讨论这些配置,并提供有关如何维持每秒 1 K、5 K 和 10K 事件的引入速率的示例。
- 历史记录大小 - 这些模型使用 历史记录大小以线性方式执行。 历史记录大小越长,模型评分新事件所花费的时间就越长。 这是因为模型将新事件与历史记录缓冲区中的每个过去事件进行比较。
- 窗口持续时间 - 窗口持续时间 应反映接收历史记录大小指定的事件所需的时间。 如果窗口中没有这么多事件,Azure 流分析会插补缺失值。 因此,CPU 消耗量是历史记录大小的函数。
- 事件加载 - 事件负载越大,模型执行的工作越多,会影响 CPU 消耗。 假设易并行有利于业务逻辑利用更多的输入分区,则可以通过易并行来横向扩展作业。
- 函数级别分区 - 函数级别分区 是在异常情况检测函数调用中使用
PARTITION BY
完成的。 这种类型的分区会增加开销,因为需要同时为多个模型维护状态。 函数级分区在设备级别分区等方案中使用。
关系
历史记录大小、窗口持续时间和事件总负载按以下方式相关:
windowDuration (毫秒) = 1000 * historySize / (每秒输入事件总数 / 输入分区计数)
按 deviceId 对函数进行分区时,将“PARTITION BY deviceId”添加到异常检测函数调用中。
观测结果
下表包括针对非分区事例的单个节点(6 SU)的吞吐量观察值:
历史记录大小(事件数) | 窗口持续时间(ms) | 每秒输入事件总数 |
---|---|---|
六十 | 55 | 2,200 |
600 | 728 | 1,650 |
6,000 | 10,910 | 1,100 |
下表包括分区事例的单个节点(6 SU)的吞吐量观察值:
历史记录大小(事件数) | 窗口持续时间(ms) | 每秒输入事件总数 | 设备计数 |
---|---|---|---|
六十 | 1,091 | 1,100 | 10 |
600 | 10,910 | 1,100 | 10 |
6,000 | 218,182 | <550 | 10 |
六十 | 21,819 | 550 | 100 |
600 | 218,182 | 550 | 100 |
6,000 | 2,181,819 | <550 | 100 |
运行上述非分区配置的示例代码位于 Azure 示例的 大规模流式处理存储库 中。 该代码创建一个没有函数级分区的流分析作业,该作业使用事件中心作为输入和输出。 输入负载是使用测试客户端生成的。 每个输入事件都是一个 1 KB json 文档。 事件模拟发送 JSON 数据的 IoT 设备(最多 1 K 台设备)。 历史记录大小、窗口持续时间和总事件负载因两个输入分区而异。
注释
若要获得更准确的估算值,请根据具体的方案自定义示例。
识别瓶颈
若要识别管道中的瓶颈,请在 Azure 流分析作业中使用“指标”窗格。 查看输入/输出事件的吞吐量,以及“水印延迟”或“积压事件”,以确定作业是否跟得上输入速率。 对于事件中心指标,请查看“限制的请求数”并相应地调整阈值单位。 对于 Azure Cosmos DB 指标,请查看“吞吐量”下的“每个分区键范围所使用的最大 RU/秒”,以确保均匀使用分区键范围。 对于 Azure SQL DB,请监视“日志 IO”和“CPU”。