【论文阅读】Detecting Spacecraft Anomalies Using LSTMs and Nonparametric Dynamic Thresholding
authors:: Kyle Hundman, Valentino Constantinou, Christopher Laporte, Ian Colwell, Tom Soderstrom
container:: Proceedings of the 24th ACM SIGKDD International Conference on Knowledge Discovery & Data Mining
year:: 2018
DOI:: 10.1145/3219819.3219845
rating:: ⭐⭐⭐
share:: true
comment:: 重点在于通过设定目标函数,最优化σ \sigma σ ,以此确定阈值范围。
前言
基于预测和阈值的方法来做异常检测,在常规的方法中,我们往往会使用一种最朴素的方法来确定阈值,也就是设置一个常数,如果超过这个常数,那么认为某个点是「异常」的。
论文《Detecting Spacecraft Anomalies Using LSTMs and Nonparametric Dynamic Thresholding》 提到了一种动态设置阈值的方法,最近也是学习实现了一下。
你可以在这里找到原文:
Detecting Spacecraft Anomalies Using LSTMs and Nonparametric Dynamic Thresholding
Errors and Smoothing
在论文中,作者首先使用 LSTM 对序列进行预测,得到预测值。
每一步 t t t 可以得到一个预测值 y ^ ( t ) \hat y^{(t)} y ^ ( t ) ,计算预测误差为 e t = ∣ y ( t ) − y ^ ( t ) ∣ e^{t} = \vert y^{(t)} - \hat y^{(t)} \vert e t = ∣ y ( t ) − y ^ ( t ) ∣
而后就可以得到一个误差序列,序列的长度为h h h 。
e = [ e ( t − h ) , … , e ( t − l s ) , … , e ( t − 1 ) , e t ] \mathbf{e} = [e^{(t-h)}, \dots, e^{(t-l_s)}, \dots, e^{(t-1)}, e^t]
e = [ e ( t − h ) , … , e ( t − l s ) , … , e ( t − 1 ) , e t ]
简单来说,也就是实际值y y y 与预测值y ^ \hat y y ^ 之间的误差的绝对值序列,每次取时间窗口大小为h h h 进行异常检测。
对上述误差序列进行指数加权滑动平均(EWMA),这一操作的目的在于:LSTM 的预测中经常出现的误差峰值,当数据发生突变时,往往不能很好地被预测到,这就会使得e ( t ) e^{(t)} e ( t ) 变得非常大,但其实这是一个非常常见的现象。通过 EWMA 就可以很好地消除这一现象,得到以下序列:
e s = [ e s ( t − h ) , … , e s ( t − l s ) , … , e s ( t − 1 ) , e s t ] \mathbf{e}_s = [e_s^{(t-h)}, \dots, e_s^{(t-l_s)}, \dots, e_s^{(t-1)}, e_s^t]
e s = [ e s ( t − h ) , … , e s ( t − l s ) , … , e s ( t − 1 ) , e s t ]
Threshold Calculation and Anomaly Scoring
在将误差经过平滑处理之后,开始进行确定阈值。
总体来说,这个方法的思想也很简单,就是用到了「均值」和「标准差」。
我们通过下面这个式子来确定阈值 ϵ \epsilon ϵ 。
ϵ = μ ( e s ) + z σ ( e s ) \mathbf{\epsilon} = \mu(\mathbf{e}_s) + \mathbf{z}\sigma(\mathbf{e}_s)
ϵ = μ ( e s ) + z σ ( e s )
其中 z 是人为规定的常数,阈值是通过下式决定的:
ϵ = a r g m a x ( ϵ ) = Δ μ ( e s ) / μ ( e s ) + Δ σ ( e s ) / σ ( e s ) ∣ e a ∣ + ∣ E s e q ∣ 2 \epsilon = argmax(\mathbf{\epsilon}) = \frac{\Delta \mu(\mathbf{e}_s) / \mu(\mathbf{e}_s) + \Delta \sigma(\mathbf{e}_s) / \sigma(\mathbf{e}_s)}{\vert \mathbf{e}_a \vert + \vert \mathbf{E}_{seq} \vert^2}
ϵ = a r g ma x ( ϵ ) = ∣ e a ∣ + ∣ E se q ∣ 2 Δ μ ( e s ) / μ ( e s ) + Δ σ ( e s ) / σ ( e s )
其中:
Δ μ ( e s ) = μ ( e s ) − μ ( { e s ∈ e s ∣ e s < ϵ } ) Δ σ ( e s ) = σ ( e s ) − σ ( { e s ∈ e s ∣ e s < ϵ } ) e a = { e s ∈ e s ∣ e s > ϵ } E s e q = continuous sequences of e a ∈ e a \Delta \mu(\mathbf{e}_s) = \mu(\mathbf{e}_s) - \mu(\{e_s \in \mathbf{e}_s \vert e_s < \epsilon\}) \\ \Delta \sigma(\mathbf{e}_s) = \sigma(\mathbf{e}_s) - \sigma(\{e_s \in \mathbf{e}_s \vert e_s < \epsilon \}) \\ \mathbf{e}_a = \{e_s \in \mathbf{e}_s \vert e_s > \epsilon\} \\ \mathbf{E}_{seq} = \text{continuous sequences of } e_a \in \mathbf{e}_a
Δ μ ( e s ) = μ ( e s ) − μ ({ e s ∈ e s ∣ e s < ϵ }) Δ σ ( e s ) = σ ( e s ) − σ ({ e s ∈ e s ∣ e s < ϵ }) e a = { e s ∈ e s ∣ e s > ϵ } E se q = continuous sequences of e a ∈ e a
具体来说,作者通过枚举z z z ,每次计算得到一个阈值,使得上式取到最大值。其中Δ μ ( e s ) \Delta \mu(\mathbf{e}_s) Δ μ ( e s ) 和Δ σ ( e s ) \Delta \sigma(\mathbf{e}_s) Δ σ ( e s ) 分别是去除异常点前后「均值」和「标准差」的变化。分母e a \mathbf{e}_a e a 为序列中异常值的个数,E s e q \mathbf{E}_{seq} E se q 为异常值连续序列的个数。
简单来说,过程是这样的,首先枚举z z z ,然后计算得到一个阈值,然后计算去除超过阈值的点的前后均值和标准差的变化,并通过上式计算得分,使得得分最大的z z z 也就是我们需要的。
Pruning Anomalies
另外,作者提出了一种修剪异常点的方法。
选取异常序列中的最大值序列和正常序列的最大值降序排列,组成序列e m a x e_{max} e ma x
对e m a x e_{max} e ma x 中的连续两个序列计算:
d ( i ) = e m a x ( i − 1 ) − e m a x ( i ) e m a x ( i − 1 ) , i ∈ { 1 , 2 , … , ( ∣ E s e q ∣ + 1 ) } d^{(i)} = \frac{e_{max}^{(i-1)} - e_{max}^{(i)}}{e_{max}^{(i-1)}} \quad ,i \in \{1,2,\dots,(\vert \mathbf{E}_{seq} \vert + 1)\}
d ( i ) = e ma x ( i − 1 ) e ma x ( i − 1 ) − e ma x ( i ) , i ∈ { 1 , 2 , … , ( ∣ E se q ∣ + 1 )}
接着是指定一个最小下降率p p p ,若d ( i ) < p d^{(i)} < p d ( i ) < p ,则认为e m a x ( i − 1 ) e_{max}^{(i-1)} e ma x ( i − 1 ) 所属的连续序列是正常值,否则还是异常值。
如上图所示,设p = 0.1 p=0.1 p = 0.1 ,Anomaly 2 的斜率为 ( 0.01396 – 0.01072 ) / 0.01396 = 0.23 > p (0.01396 – 0.01072)/0.01396 = 0.23 > p ( 0.01396–0.01072 ) /0.01396 = 0.23 > p ,因此依旧是异常,而 Anomaly 1 的斜率为 0.07 < p 0.07 < p 0.07 < p ,所以将其视为正常值。
这一流程的想法在于,如果异常值中的值与正常序列的最大值差距不大,则他们可能只是正常的抖动,而不是真正意义上的异常值。
代码实现
你可以在这里找到论文原文的代码:khundman/telemanom
由于论文原文中动态阈值部分的代码很大程度上与其他部分结合在了一起,导致阅读体验有所不佳。为了需要,我也仔细阅读了原文代码,参考原文代码对动态阈值部分进行了封装,并对几个点进行了修改。
Data and calculations for a specific window of prediction errors. Includes finding thresholds, pruning, and scoring anomalous sequences for errors and inverted errors (flipped around mean) - significant drops in values can also be anomalous.
异常窗口类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 class ErrorWindow (): def __init__ (self, e_s ): self.e_s = e_s self.i_anom = np.array([]) self.E_seq = np.array([]) self.non_anom_max = float ('-inf' ) self.sd_lim = 12.0 self.sd_threshold = self.sd_lim self.mean_e_s = np.mean(self.e_s) self.sd_e_s = np.std(self.e_s) self.epsilon = self.mean_e_s + self.sd_lim * self.sd_e_s self.p = 0.15 def find_epsilon (self ): '''寻找最佳的 z 值''' e_s = self.e_s max_score = float ('-inf' ) for z in np.arange(1.5 , self.sd_lim, 0.5 ): epsilon = self.mean_e_s + (self.sd_e_s * z) pruned_e_s = e_s[e_s < epsilon] i_anom = np.argwhere(e_s >= epsilon).reshape(-1 ) i_anom = np.sort(np.unique(i_anom)) if len (i_anom) > 0 : groups = [list (group) for group in mit.consecutive_groups(i_anom)] E_seq = [(g[0 ], g[-1 ]) for g in groups if not g[0 ] == g[-1 ]] mean_delta = (self.mean_e_s - np.mean(pruned_e_s)) / self.mean_e_s sd_delta = (self.sd_e_s - np.std(pruned_e_s)) / self.sd_e_s score = (mean_delta + sd_delta) / (len (E_seq)**2 + len (i_anom)) if score >= max_score and len (E_seq) < 6 and len (i_anom) < (len (e_s) / 2 ): max_score = score self.sd_threshold = z self.epsilon = self.mean_e_s + z * self.sd_e_s def compare_to_epsilon (self ): '''获取当前窗口小于阈值的最大值''' e_s = self.e_s epsilon = self.epsilon i_anom = np.argwhere(e_s >= epsilon).reshape(-1 ) if len (i_anom) == 0 : return i_anom = np.sort(np.unique(i_anom)) window_indices = np.setdiff1d(np.arange(0 , len (e_s)), i_anom) non_anom_max = np.max (np.take(e_s, window_indices)) groups = [list (group) for group in mit.consecutive_groups(i_anom)] E_seq = [(g[0 ], g[-1 ]) for g in groups if not g[0 ] == g[-1 ]] self.i_anom = i_anom self.E_seq = E_seq self.non_anom_max = non_anom_max def prune_anoms (self ): '''修剪异常点''' E_seq = self.E_seq e_s = self.e_s non_anom_max = self.non_anom_max if len (E_seq) == 0 : return E_seq_max = np.array([max (e_s[e[0 ]:e[1 ] + 1 ]) for e in E_seq]) E_seq_max_sorted = np.sort(E_seq_max)[::-1 ] E_seq_max_sorted = np.append(E_seq_max_sorted, [non_anom_max]) i_to_remove = np.array([]) for i in range (0 , len (E_seq_max_sorted) - 1 ): if (E_seq_max_sorted[i] - E_seq_max_sorted[i+1 ]) \ / E_seq_max_sorted[i] < self.p: i_to_remove = np.append( i_to_remove, np.argwhere(E_seq_max == E_seq_max_sorted[i])).astype(int ) else : i_to_remove = np.array([]) i_to_remove.sort() if len (i_to_remove) > 0 : E_seq = np.delete(E_seq, i_to_remove, axis=0 ) if len (E_seq) == 0 : self.i_anom = np.array([]) return indices_to_keep = np.concatenate( [range (e_seq[0 ], e_seq[1 ] + 1 ) for e_seq in E_seq]) mask = np.isin(self.i_anom, indices_to_keep) self.i_anom = self.i_anom[mask]
划窗异常检测
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 def detect_anomaly (actual, pred, window_size: int ): '''异常检测 Args: ------- actual(array): 真实值 pred(array): 预测值 window_size(int): 窗口大小 Returns: ------- anomaly_list(List): 异常点序号 ''' e = abs (actual - pred) smoothing_window = int (window_size * 0.05 ) e_s = pd.DataFrame(e).ewm(span=smoothing_window).mean().values.flatten() anomaly_list = np.array([]) for i in range (len (e_s) // window_size): cur = np.array(e_s[i * window_size:(i + 1 ) * window_size]) window = ErrorWindow(cur) window.find_epsilon() window.compare_to_epsilon() if len (window.i_anom) == 0 : continue window.prune_anoms() if len (window.i_anom) == 0 : continue window.i_anom = np.sort(np.unique(window.i_anom)) anomaly_list = np.append(anomaly_list, window.i_anom + i * window_size).astype('int' ) return anomaly_list.tolist()
关于 inverted errors
在论文中没有提到的一个重要部分是「inverted errors」,也就是翻转的误差,具体来说也就是根据误差的均值线进行翻转,以此得到一个 inverted errors
。
1 e_s_inv = [mean_e_s + (mean_e_s - e) for e in e_s]
具体来说,原文代码针对 errors
和 inverted errors
这两个序列分别进行了异常检测的步骤:确定阈值,修剪异常点。
1 2 3 4 5 6 7 8 window.find_epsilon() window.find_epsilon(inverse=True ) window.compare_to_epsilon() window.compare_to_epsilon(inverse=True ) window.prune_anoms() window.prune_anoms(inverse=True )
但让我疑惑的点是,为什么要对翻转的序列再次进行异常检测?
可以预见的一种情况是:某个点相对附近一些点的误差非常小,但是在翻转之后,它就有很大的可能被认为是异常点。难道这个点不应该是正常点吗?在计算误差时,本身就使用了绝对值,为什么仍要根据误差的均值进行翻转?
这也是让我困惑的一点,因此在上面的代码中,我将 inverted errors
部分的代码去掉了。
关于 buffer
在原文代码中,另外还有一个点,也就是在上面的代码中被我注释掉的 buffer
缓冲区。事实上,我能理解但没完全理解。
具体来说,原文代码中将那些超出阈值的异常点检测出来之后,又进行了一步操作:将这个异常点前后 buffer
个元素加入异常点序列,然后在进行后面的步骤。
确实,异常点周围的点同样很可能也是异常点,即便它没有超过阈值。这样操作也可以为后面异常值的修剪做准备,我相信会有很大一批这样的点被修剪。
但是,两者之间异常点的检测数量差异仍然是巨大的,有很大一部分小误差点被认为是异常点,事实上也是如此。这也是让我纠结的一个点。获取调整窗口的值会让这些异常点的数量减少,但是这并不能解决根本问题。最后还是先注释掉吧。
参考资料