44 changes: 35 additions & 9 deletions tad/anomaly_detect_ts.py
@@ -299,7 +299,7 @@ def _process_long_term_data(data, period, granularity, piecewise_median_period_w

    all_data = []
    # Subset x into piecewise_median_period_weeks chunks
-    for i in range(1, data.size + 1, num_obs_in_period):
+    for i in range(0, data.size, num_obs_in_period):
        start_date = data.index[i]
        # if there is at least 14 days left, subset it, otherwise subset last_date - 14 days
        end_date = start_date + datetime.timedelta(days=num_days_in_period)
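A quick sanity check on the new loop bounds (a minimal sketch with a hypothetical 9-point series; nothing here is from the PR): the old 1-based range skipped the first observation as a chunk start and could index one past the end of the series, while the 0-based range keeps every data.index[i] lookup in bounds.

import pandas as pd

# Hypothetical toy series: 9 observations, chunks of 4.
data = pd.Series(range(9), index=pd.date_range('2021-01-01', periods=9, freq='D'))
num_obs_in_period = 4

list(range(1, data.size + 1, num_obs_in_period))  # [1, 5, 9]; data.index[9] raises IndexError
list(range(0, data.size, num_obs_in_period))      # [0, 4, 8]; every chunk start is valid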
@@ -437,6 +437,8 @@ def anomaly_detect_ts(x, max_anoms=0.1, direction="pos", alpha=0.05, only_last=N

    if alpha < 0.01 or alpha > 0.1:
        logger.warning('alpha is the statistical significance level and is usually between 0.01 and 0.1')
+    if period_override is None:
+        logger.warning('period_override can be omitted ONLY if the series has a 1-unit time interval (e.g. 1min, 1hour); it MUST be set for other intervals (e.g. 5min).')

    data, period, granularity = _get_data_tuple(x, period_override, resampling)
    if granularity == 'day':
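To illustrate when the new warning fires, a hedged usage sketch (the import path, the synthetic series, and direction='both' are my assumptions, not part of the PR): a 5-minute series is not 1-unit, so the seasonal period of one day, 24 * 60 / 5 = 288 observations, must be passed explicitly.

import numpy as np
import pandas as pd
from tad.anomaly_detect_ts import anomaly_detect_ts

# Hypothetical one-week series sampled every 5 minutes.
idx = pd.date_range('2021-01-01', periods=7 * 288, freq='5min')
ts = pd.Series(np.random.randn(len(idx)), index=idx)

# Without period_override the library would infer a 1-unit period and warn;
# 288 observations cover one daily seasonal cycle at 5-minute granularity.
results = anomaly_detect_ts(ts, max_anoms=0.05, direction='both', period_override=288)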
@@ -449,13 +451,24 @@ def anomaly_detect_ts(x, max_anoms=0.1, direction="pos", alpha=0.05, only_last=N
    all_data = _process_long_term_data(data, period, granularity, piecewise_median_period_weeks) if longterm else [data]
    all_anoms = pd.Series()
    seasonal_plus_trend = pd.Series()
+    max_abs_norm_residual_array = pd.Series()
+    critical_value_array = pd.Series()

+    if verbose:
+        print(f'data size:{data.size}, period:{period}, granularity:{granularity}, piecewise_median_period_weeks:{piecewise_median_period_weeks}, only_last:{only_last}, max_anoms:{max_anoms}.')
+        print(f'all_data len:{len(all_data)}')
+        print(f'all_data 0: {all_data[0]}')
+        print(f'all_data 1: {all_data[1] if len(all_data) > 1 else "n/a"}')
+        print(f'all_data -1: {all_data[-1]}')

    # Detect anomalies on all data (either entire data in one-pass, or in 2 week blocks if longterm=True)
    for series in all_data:
        shesd = _detect_anoms(series, k=max_anoms, alpha=alpha, num_obs_per_period=period, use_decomp=True,
                              use_esd=False, direction=direction, verbose=verbose)
        shesd_anoms = shesd['anoms']
        shesd_stl = shesd['stl']
+        t_max_abs_norm_residual_array = shesd['max_abs_norm_residual_array']
+        t_critical_value_array = shesd['critical_value_array']

        # -- Step 3: Use detected anomaly timestamps to extract the actual anomalies (timestamp and value) from the data
        anoms = pd.Series() if shesd_anoms.empty else series.loc[shesd_anoms.index]
@@ -467,16 +480,19 @@ def anomaly_detect_ts(x, max_anoms=0.1, direction="pos", alpha=0.05, only_last=N
            anoms = _perform_threshold_filter(anoms, periodic_max, threshold)

        all_anoms = all_anoms.append(anoms)
+        max_abs_norm_residual_array = max_abs_norm_residual_array.append(t_max_abs_norm_residual_array)
+        critical_value_array = critical_value_array.append(t_critical_value_array)
        seasonal_plus_trend = seasonal_plus_trend.append(shesd_stl)

    # De-dupe
-    all_anoms.drop_duplicates(inplace=True)
-    seasonal_plus_trend.drop_duplicates(inplace=True)
+    all_anoms = all_anoms.loc[~all_anoms.index.duplicated(keep='first')]
+    seasonal_plus_trend = seasonal_plus_trend.loc[~seasonal_plus_trend.index.duplicated(keep='first')]
+    max_abs_norm_residual_array = max_abs_norm_residual_array.loc[~max_abs_norm_residual_array.index.duplicated(keep='first')]
+    critical_value_array = critical_value_array.loc[~critical_value_array.index.duplicated(keep='first')]

    # If only_last is specified, create a subset of the data corresponding to the most recent day or hour
    if only_last:
-        all_anoms = _get_only_last_results(
-            data, all_anoms, granularity, only_last)
+        all_anoms = _get_only_last_results(data, all_anoms, granularity, only_last)

    # If there are no anoms, log it and return an empty anoms result
    if all_anoms.empty:
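The switch from drop_duplicates() to index.duplicated() is worth a note: drop_duplicates() compares values, so two distinct timestamps that happen to share the same reading would collapse into one, whereas the index-based filter only removes rows whose timestamp repeats. A minimal sketch with made-up values:

import pandas as pd

s = pd.Series([3.5, 3.5], index=pd.to_datetime(['2021-01-01', '2021-01-02']))

s.drop_duplicates()                       # drops the 2021-01-02 entry: its value repeats
s.loc[~s.index.duplicated(keep='first')]  # keeps both: the timestamps differ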
@@ -498,7 +514,9 @@ def anomaly_detect_ts(x, max_anoms=0.1, direction="pos", alpha=0.05, only_last=N
    return {
        'anoms': all_anoms,
        'expected': seasonal_plus_trend if e_value else None,
-        'plot': 'TODO' if plot else None
+        'plot': 'TODO' if plot else None,
+        'max_abs_norm_residual_array': max_abs_norm_residual_array,
+        'critical_value_array': critical_value_array
    }
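A sketch of how the two new return keys might be consumed downstream (the variable names and the margin idea are mine, not from the PR): both series are NaN except at tested candidates, so dropna() isolates the tested points, and the gap between statistic and critical value shows how decisively each point was flagged.

# Assuming `results` is the dict returned by anomaly_detect_ts (e.g. from the sketch above).
stat = results['max_abs_norm_residual_array'].dropna()
crit = results['critical_value_array'].dropna()

# Positive margins are the points where ares_max exceeded lam, i.e. the anomalies.
margin = (stat - crit).sort_values(ascending=False)
print(margin.head())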


@@ -543,8 +561,11 @@ def _detect_anoms(data, k=0.49, alpha=0.05, num_obs_per_period=None,
    data, smoothed = _get_decomposed_data_tuple(data, num_obs_per_period)

    max_outliers = _get_max_outliers(data, k)
+    length_x = data.size

    R_idx = pd.Series()
+    max_abs_norm_residual_array = pd.Series(np.full(length_x, np.nan), index=data.index)
+    critical_value_array = pd.Series(np.full(length_x, np.nan), index=data.index)

    n = data.size
    # Compute test statistic until r=max_outliers values have been
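The two trace series are pre-filled with NaN over the decomposed index, so points never reached by the ESD loop stay NaN while tested points are written in place. The pattern in isolation (hypothetical index and value):

import numpy as np
import pandas as pd

idx = pd.date_range('2021-01-01', periods=5, freq='D')
trace = pd.Series(np.full(len(idx), np.nan), index=idx)

trace.loc[idx[2]] = 4.2   # record a statistic for one tested timestamp
trace.dropna()            # only the timestamps that were actually tested remain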
@@ -565,7 +586,8 @@ def _detect_anoms(data, k=0.49, alpha=0.05, num_obs_per_period=None,

        ares = ares / data.mad()

-        tmp_anom_index = ares[ares.values == ares.max()].index
+        ares_max = ares.max()
+        tmp_anom_index = ares[ares.values == ares_max].index
        cand = pd.Series(data.loc[tmp_anom_index], index=tmp_anom_index)

        data.drop(tmp_anom_index, inplace=True)
@@ -575,10 +597,14 @@ def _detect_anoms(data, k=0.49, alpha=0.05, num_obs_per_period=None,
            alpha / (2 * (n - i + 1)))
        t = sp.stats.t.ppf(p, n - i - 1)
        lam = t * (n - i) / np.sqrt((n - i - 1 + t ** 2) * (n - i + 1))
-        if ares.max() > lam:
+        max_abs_norm_residual_array.loc[tmp_anom_index] = ares_max
+        critical_value_array.loc[tmp_anom_index] = lam
+        if ares_max > lam:
            R_idx = R_idx.append(cand)

    return {
        'anoms': R_idx,
-        'stl': smoothed
+        'stl': smoothed,
+        'max_abs_norm_residual_array': max_abs_norm_residual_array,
+        'critical_value_array': critical_value_array
    }
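For reference, lam is the standard generalized-ESD critical value. A self-contained sketch of the same computation (the function name is mine; the one-tail branch follows the usual S-H-ESD convention and sits in the collapsed part of this diff):

import numpy as np
import scipy.stats as stats

def esd_critical_value(n, i, alpha=0.05, one_tail=False):
    # Critical value lambda_i for the i-th ESD candidate (i starts at 1)
    # out of n observations, mirroring the lam computation above.
    p = 1 - alpha / (n - i + 1) if one_tail else 1 - alpha / (2 * (n - i + 1))
    t = stats.t.ppf(p, n - i - 1)
    return t * (n - i) / np.sqrt((n - i - 1 + t ** 2) * (n - i + 1))

# For large n the threshold tightens slightly as candidates are removed:
# esd_critical_value(1000, 1) is a bit larger than esd_critical_value(1000, 50).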