Time-Series Anomaly Detection Service at Microsoft

前言

论文原文:Time-Series Anomaly Detection Service at Microsoft,KDD 2019

「异常检测」旨在发现数据中的意外事件或罕见项目。它在许多工业应用中非常流行,是数据挖掘中的一个重要研究领域。

为了解决时间序列异常检测的问题,作者提出了一种基于 谱残差(SR) 以及 卷积神经网络(CNN) 的新算法。首次尝试将 SR 模型从视觉显著性检测领域借用到时间序列异常检测中。此外,作者创新性地将 SRCNN 结合起来,以提高 SR 模型的性能。

主要用于微软一个时间序列异常检测服务,帮助客户连续监测时间序列,并及时提醒潜在的事件。

INTRODUCTION

Challenge

  1. 缺乏标签。序列标注成本高。为了为单个业务场景提供异常检测服务,系统必须同时处理数百万个时间序列。此外,时间序列的数据分布是不断变化的,这需要系统识别异常情况,即使以前没有出现过类似的模式。
  2. 泛化能力。需要监控来自不同业务场景的各种时间序列,目前没有很好的通用解决方法。
  3. 效率。在业务应用程序中,监控系统必须接近实时地处理数百万个,甚至数十亿个时间序列,并且实时性要求高。

CONTRIBUTIONS

  1. 在异常检测领域中,首次采用了视觉显著性检测技术来检测时间序列数据中的异常。
  2. 结合了 SR 模型和 CNN 模型,以提高时间序列异常检测的精度。
  3. 从实际角度来看,该方案具有较好的通用性和效率。它可以很容易地与在线监控系统集成,从而为重要的在线指标提供快速警报。

SR (Spectral Residual)

Spectral Residual(光谱残差)是一种基于快速傅里叶变换的方法,是一种无监督方法。

Spectral Residual 原理:

自然图像的统计特性具有变换不变性:即将图像从原来的空间坐标变换到频率坐标系中,图像在空间中具有的统计特性在频域中仍然保留,这种不变性恰好保证了采用能量谱来刻画自然图像空间相关性的可靠性。

从信息论的角度出发,一张图像信息可以如下表示:

H(Image)=H(Innovation)+H(PriorKnowledge)H(\text{Image})=H(\text{Innovation}) + H(\text{PriorKnowledge})

H(Innovation)H(\text{Innovation})表示新奇的部分, H(PriorKnowledge)H(\text{PriorKnowledge})表示冗余部分。因此在进行检测时,应该首先去掉冗余部分。

SR 模型所做的事情也就是去除冗余部分,保留新奇部分。

Log spectrum representation

在自然图像统计的不变因子中,尺度不变性是最著名和研究最广泛的性质,它也被称为 1/f 定律。它表明,自然图像集合的平均傅里叶光谱的振幅A(f)\mathcal{A}(f)服从一个分布:

E{A(f)}1/fE\{\mathcal{A}(f)\} \propto 1/f

基于上述事实,作者首先通过对图像进行傅里叶变换,将其从空间域转到频域,对幅值取对数后得到 loglog 谱,由于 loglog 曲线满足局部线性条件,所以用局部平均滤波器对其进行平滑,获得平均频谱。

数学解释

A(f)=Amplitude(F[I(x)])P(f)=Phrase(F[I(x)])L(f)=log(A(f))R(f)=L(f)hn(f)L(f)S(x)=F1(exp(R(f)+iR(f)))\begin{aligned} \mathcal{A}(f) &= Amplitude \left ( \mathfrak{F}[\mathcal{I(x)}] \right ) \\ \mathcal{P}(f) &= Phrase \left ( \mathfrak{F}[\mathcal{I(x)}] \right ) \\ \mathcal{L}(f) &= \log \left (\mathcal{A}(f) \right ) \\ \mathcal{R}(f) &= \mathcal{L}(f) - h_n(f) * \mathcal{L}(f) \\ \mathcal{S}(x) &= \Vert \mathfrak{F}^{-1}(\exp(\mathcal{R}(f) + i\mathcal{R}(f))) \Vert \\ \end{aligned}

其中F,F1\mathfrak{F}, \mathfrak{F}^{-1}分别表示傅里叶变化和傅里叶反变化;I(x)\mathcal{I}(x)表示原图像;Amplitude(.),Phrase(.)Amplitude(.), Phrase(.)分别表示振幅和相位;hn(f)h_n(f)n×nn \times n大小的滤波器。

SR in time series

微软主要提出和比较了 SRSR+CNN 方法在时序数据异常检测上的效果,其中 SR 算法唯一的差别是输入变成了时序数据。

如下图所示,得到了 saliency map 之后,很容易利用一个简单的规则来注释异常点。可以采用一个简单的阈值 τ\tau 来注释异常点。

另外也可以采用动态阈值方法,详见:【论文阅读】Dynamic Error Thresholds

细节问题

一般来讲都是采用「滑动窗口」进行异常点检测的,往往我们需要检测的点都是位于一段序列的末端,而 SR 算法当检测的点位于序列中央的时候效果会比较好,因此在进行 SR 计算之前需要对序列进行简单的预测进而延长序列,下面是论文中采用的延长算法:

gˉ=1mi=1mg(xn,xni)xn+1=xnm+1+gˉm\bar{g} = \frac{1}{m} \sum_{i=1}^{m} g(x_n, x_{n-i}) \\ x_{n+1} = x_{n-m+1} + \bar{g} \cdot m

SR-CNN

异常注入

由于 SR 方法是通过简单的手动设置阈值进行分类的,因此可以使用 CNN 这种更加强大的分类器进行分类。但是 CNN 分类的话需要有明确的标签,因此可以通过异常注入的方法来制造标签:

x=(xˉ+mean)(1+var)r+xx = (\bar{x} + mean)(1 + var) \cdot r + x

在一个实验中,作者收集了具有合成异常的生产时间序列作为训练数据。其优点是,该检测器可以自适应时间序列分布的变化,而不需要手动标记的数据。

model architecture

SR-CNN 网络由两个一维卷积层(滤波器大小等于滑动窗口大小 ω\omega)和两个完全连接的层组成。第一个卷积层的信道大小等于 ω\omega;而第二个卷积层的信道大小增加了一倍。在 sigmoid 输出之前堆叠了两个完全连接的层。采用交叉熵作为损失函数,在训练过程中采用 SGD 优化器。

EXPERIMENTS

Datasets

  • KPI 竞赛数据集。该数据集由多个 KPI 曲线组成,是国内一个比赛的数据集,大部分的数据采集的频率都是一分钟,也有一部分是 5 分钟的。
  • Yahoo 数据集。雅虎是由雅虎实验室发布的异常检测的开放数据集。部分时间序列曲线是合成的(即模拟的);而另一部分则来自于雅虎服务的实际流量。
  • 微软内部数据集。时间序列包括不同的 kpi,包括收入、活跃用户、页面浏览量等。

实验结果

SR+DNN

Python 实现

notebook: SpectralResidual

SpectralResidual.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import numpy as np


class SpectralResidual:
def __init__(self, series_window_size, mag_window_size, score_window_size) -> None:
self.EPS = 1e-8
self.series_window_size = series_window_size
self.mag_window_size = mag_window_size
self.score_window_size = score_window_size

def detect(self, values):
result = np.array([])
for i in range(0, len(values), self.series_window_size):
if i + self.series_window_size > len(values):
seg = values[i:len(values)]
else:
seg = values[i:i + self.series_window_size]
anomaly = self.__detect_core(seg)
result = np.concatenate((result, anomaly), axis=0)
return result

def __detect_core(self, values):

extended_series = SpectralResidual.extend_series(values)
mags = self.spectral_residual_transform(extended_series)
anomaly_scores = self.generate_spectral_score(mags)
return anomaly_scores[:len(values)]

def generate_spectral_score(self, mags):
ave_mag = average_filter(mags, n=self.score_window_size)
safeDivisors = np.clip(ave_mag, self.EPS, ave_mag.max())

raw_scores = np.abs(mags - ave_mag) / safeDivisors
scores = np.clip(raw_scores / 10.0, 0, 1.0)

return scores

def spectral_residual_transform(self, values):
"""
This method transform a time series into spectral residual series
:param values: list.
a list of float values.
:return: mag: list.
a list of float values as the spectral residual values
"""

trans = np.fft.fft(values)
mag = np.sqrt(trans.real**2 + trans.imag**2)
eps_index = np.where(mag <= self.EPS)[0]
mag[eps_index] = self.EPS

mag_log = np.log(mag)
mag_log[eps_index] = 0

spectral = np.exp(mag_log - average_filter(mag_log, n=self.mag_window_size))

trans.real = trans.real * spectral / mag
trans.imag = trans.imag * spectral / mag
trans.real[eps_index] = 0
trans.imag[eps_index] = 0

wave_r = np.fft.ifft(trans)
mag = np.sqrt(wave_r.real**2 + wave_r.imag**2)
return mag

@staticmethod
def predict_next(values):
"""
Predicts the next value by sum up the slope of the last value with previous values.
Mathematically, g = 1/m * sum_{i=1}^{m} g(x_n, x_{n-i}), x_{n+1} = x_{n-m+1} + g * m,
where g(x_i,x_j) = (x_i - x_j) / (i - j)
:param values: list.
a list of float numbers.
:return : float.
the predicted next value.
"""

if len(values) <= 1:
raise ValueError(f'data should contain at least 2 numbers')

v_last = values[-1]
n = len(values)

slopes = [(v_last - v) / (n - 1 - i) for i, v in enumerate(values[:-1])]

return values[1] + sum(slopes)

@staticmethod
def extend_series(values, extend_num=5, look_ahead=5):
"""
extend the array data by the predicted next value
:param values: list.
a list of float numbers.
:param extend_num: int, default 5.
number of values added to the back of data.
:param look_ahead: int, default 5.
number of previous values used in prediction.
:return: list.
The result array.
"""

if look_ahead < 1:
raise ValueError('look_ahead must be at least 1')

extension = [SpectralResidual.predict_next(values[-look_ahead - 2:-1])
] * extend_num
return np.concatenate((values, extension), axis=0)


def average_filter(values, n=3):
"""
Calculate the sliding window average for the give time series.
Mathematically, res[i] = sum_{j=i-t+1}^{i} values[j] / t, where t = min(n, i+1)
:param values: list.
a list of float numbers
:param n: int, default 3.
window size.
:return res: list.
a list of value after the average_filter process.
"""

if n >= len(values):
n = len(values)

res = np.cumsum(values, dtype=float)
res[n:] = res[n:] - res[:-n]
res[n:] = res[n:] / n

for i in range(1, n):
res[i] /= (i + 1)

return res

参考资料