def detect(self, values):
    """
    Run the core detector over ``values`` one window at a time.

    The series is cut into consecutive, non-overlapping segments of
    ``self.series_window_size`` items (the last segment holds whatever is
    left over and may be shorter). Each segment is passed to
    ``self.__detect_core`` and the per-segment results are joined in order.

    :param values: sequence supporting ``len()`` and slicing.
    :return: numpy.ndarray. Concatenation along axis 0 of the per-segment
        results; an empty float array when ``values`` is empty.
    """
    window = self.series_window_size
    # Seed with an empty float array so that empty input still yields an
    # array and the result dtype is promoted to at least float.
    pieces = [np.array([])]
    # Slice bounds are clamped at the end of the sequence, so the trailing
    # partial window needs no special case.
    pieces.extend(
        self.__detect_core(values[start:start + window])
        for start in range(0, len(values), window)
    )
    # Join once at the end rather than re-copying the accumulated result
    # for every window.
    return np.concatenate(pieces, axis=0)
defspectral_residual_transform(self, values): """ This method transform a time series into spectral residual series :param values: list. a list of float values. :return: mag: list. a list of float values as the spectral residual values """
@staticmethod
def predict_next(values):
    """
    Predict the next value by extrapolating the average slope towards the
    last point.

    Mathematically, with m = len(values) - 1 earlier points,
    g = 1/m * sum_{i=1}^{m} g(x_n, x_{n-i}),
    x_{n+1} = x_{n-m+1} + g * m,
    where g(x_i, x_j) = (x_i - x_j) / (i - j).

    When ``values`` holds x_{n-m} .. x_n, the base point x_{n-m+1} is
    ``values[1]`` and ``g * m`` is simply the sum of the m slopes.

    :param values: list. a list of float numbers.
    :return: float. the predicted next value.
    :raises ValueError: if ``values`` has fewer than 2 items.
    """
    count = len(values)
    if count <= 1:
        raise ValueError('data should contain at least 2 numbers')

    last = values[-1]

    # Slope from each earlier point to the last one; the divisor is the
    # index distance between the two points.
    slopes = [
        (last - earlier) / (count - 1 - pos)
        for pos, earlier in enumerate(values[:-1])
    ]

    return values[1] + sum(slopes)
@staticmethod defextend_series(values, extend_num=5, look_ahead=5): """ extend the array data by the predicted next value :param values: list. a list of float numbers. :param extend_num: int, default 5. number of values added to the back of data. :param look_ahead: int, default 5. number of previous values used in prediction. :return: list. The result array. """
if look_ahead < 1: raise ValueError('look_ahead must be at least 1')
defaverage_filter(values, n=3): """ Calculate the sliding window average for the give time series. Mathematically, res[i] = sum_{j=i-t+1}^{i} values[j] / t, where t = min(n, i+1) :param values: list. a list of float numbers :param n: int, default 3. window size. :return res: list. a list of value after the average_filter process. """
if n >= len(values): n = len(values)
res = np.cumsum(values, dtype=float) res[n:] = res[n:] - res[:-n] res[n:] = res[n:] / n