Skip to content
Merged
Changes from 3 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 254 additions & 4 deletions msticpy/analysis/outliers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,23 @@
outlier events in a single data set or using one data set as training
data and another on which to predict outliers.

In addition, Robust Random Cut Forest implemented after msticpy 2.16.x.
It's slower than SkLearn Isolation Forest however more considerable for
time series structure and can also process by streaming.

importted Robust Random Cut Forest python library
MIT License Copyright (c) 2018 kLabUM
https://klabum.github.io/rrcf/

"""

import math
from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from tqdm.auto import tqdm

from .._version import VERSION
from ..common.exceptions import MsticpyImportExtraError
Expand All @@ -32,6 +42,16 @@
extra="ml",
) from imp_err

try:
import rrcf
except ImportError as imp_err:
raise MsticpyImportExtraError(
"To use RRCF algorism feature for anomaly detection,\
'pip install rrcf==0.4.4'",
title="Error importing rrcf (Robust Random Cut Forest)",
extra="ml",
) from imp_err

__version__ = VERSION
__author__ = "Ian Hellen, Tatsuya Hasegawa"

Expand All @@ -42,6 +62,7 @@ def identify_outliers(
x_predict: np.ndarray,
contamination: float = 0.05,
max_features: Optional[Union[int, float]] = None,
max_samples: Optional[Union[int, float]] = None,
) -> Tuple[IsolationForest, np.ndarray, np.ndarray]:
"""
Identify outlier items using SkLearn IsolationForest.
Expand All @@ -58,11 +79,15 @@ def identify_outliers(
Specifies max num or max rate of features
to be randomly selected when building each tree.
default: None => {math.floor(math.sqrt(cols))}
max_samples: int or float, optional
default: None => { min(100,rows) }
if you want to use more than 100 samples,
please specify via this option.

Returns
-------
Tuple[IsolationForest, np.ndarray, np.ndarray]
IsolationForest model, X_Outliers,
IsolationForest model, x_Outliers,
y_pred_outliers

"""
Expand All @@ -71,12 +96,20 @@ def identify_outliers(

# fit the model
rows, cols = x.shape
max_samples = min(100, rows)

# Determine sample size
if max_samples is None:
n_samples = min(100, rows)
elif isinstance(max_samples, float) and max_samples < 1: # rate: 0.1 - 1
n_samples = int(max_samples * rows)
else:
n_samples = min(max_samples, rows)

if not max_features:
max_features = math.floor(math.sqrt(cols))

clf = IsolationForest(
max_samples=max_samples,
max_samples=n_samples,
max_features=max_features,
random_state=rng,
contamination=contamination,
Expand All @@ -92,9 +125,226 @@ def identify_outliers(
return clf, x_outliers, y_pred_outliers


class RobustRandomCutForest:
"""
RobustRandomCutForest Class used in identify_outliers_rrcf().

It is similar structure to SKlearn IsolationForest.
"""

def __init__(
self,
num_trees: int = 100,
tree_size: int = 256,
contamination: float = 0.05,
max_samples: Optional[Union[int, float]] = None,
max_features: Optional[Union[int, float]] = None,
):
"""
Initialize Robust Random Cut Forest model.

Parameters
----------
num_trees: Number of trees in the forest
tree_size: Maximum number of points in each tree
contamination: Proportion of outliers in the data
max_samples: Number of samples to build each tree
max_features: Number of features to consider for splitting
"""
self.num_trees = num_trees
self.tree_size = tree_size
self.contamination = contamination
self.max_samples = max_samples
self.max_features = max_features
self.forest = []
self._feature_indices = None
self.n_features_in_ = None
self._train_scores = None
self.x_train = None

def _select_features(self, cols: int) -> np.ndarray:
"""Randomly select features for each tree."""
if self.max_features is None:
self.max_features = math.floor(math.sqrt(cols))

rng = np.random.RandomState(42)
return rng.choice(cols, size=self.max_features, replace=False)

def fit(self, x: np.ndarray) -> "RobustRandomCutForest":
"""Build the forest from training data."""
self.x_train = x.copy()
rows, cols = x.shape
self.n_features_in_ = cols
self._feature_indices = self._select_features(cols)
x_sub = x[:, self._feature_indices]

# Determine sample size
if self.max_samples is None:
n_samples = min(100, rows)
elif (
isinstance(self.max_samples, float) and self.max_samples < 1
): # rate: 0.1 - 1
n_samples = int(self.max_samples * rows)
else:
n_samples = min(self.max_samples, rows)

# Building Tree
self.forest = []
self._train_scores = np.zeros(n_samples)
for _ in tqdm(range(self.num_trees), desc="Building trees"):
tree = rrcf.RCTree()
self.forest.append(tree)
for index, point in enumerate(x_sub):
tree.insert_point(point, index=index)
self._train_scores[index] += tree.codisp(index)

self._train_scores /= self.num_trees

return self

def decision_function(self, x: np.ndarray) -> np.ndarray:
"""Compute anomaly scores for input data."""
# Score aggrigation in tree by parallel job.
print("Warning: RRCF decision_function may be too slow so takes long time.")
print("Warning: its Parallel jobs will use full CPU cores..")
print(
"Advice: Make sure your max_features and max_samples are what you really need."
)

x_sub = x[:, self._feature_indices]
n_points = x_sub.shape[0]

# Create batches
# batch_size: Number of points to process at once
batch_size = 500
batches = [
(start, min(start + batch_size, n_points))
for start in range(0, n_points, batch_size)
]

# Process trees in parallel
# n_jobs: Number of parallel jobs (-1 = use all cores)
tree_scores = Parallel(n_jobs=-1)(
delayed(self._process_tree)(tree, x_sub, batches)
for tree in tqdm(self.forest, desc="Scoring trees")
)

# Aggregate scores
scores = np.sum(tree_scores, axis=0) / self.num_trees
return scores # Lower = less anomalous, higher = more anomalous

def _process_tree(
self, tree: rrcf.RCTree, x_sub: np.ndarray, batches: list
) -> np.ndarray:
"""Process a single tree with batched operations."""
scores = np.zeros(x_sub.shape[0])
for start, end in batches:
batch = x_sub[start:end]
temp_indices = np.arange(1000000 + start, 1000000 + end)

# Insert batch
for idx, point in zip(temp_indices, batch):
tree.insert_point(point, index=idx)

# Calculate CoDisp
for i, idx in enumerate(temp_indices):
scores[start + i] = tree.codisp(idx)

# Remove batch
for idx in temp_indices:
tree.forget_point(idx)

return scores

def predict(self, x: Optional[np.ndarray] = None) -> np.ndarray:
"""Predict anomaly labels (-1 for outliers, 1 for inliers)."""
if x is None or np.array_equal(x, self.x_train):
scores = self._train_scores
else:
scores = self.decision_function(x)

# Calculate number of anomalies
n_anomalies = int(np.ceil(len(scores) * self.contamination))

# Get indices of top anomalies
anomaly_indices = np.argpartition(scores, -n_anomalies)[-n_anomalies:]

# Create label array
labels = np.ones(len(scores), dtype=int)
labels[anomaly_indices] = -1

return labels


# pylint: disable=invalid-name
def identify_outliers_rrcf(
x: np.ndarray,
x_predict: np.ndarray,
contamination: float = 0.05,
max_features: Optional[Union[int, float]] = None,
num_trees: int = 100,
tree_size: int = 256,
max_samples: Optional[Union[int, float]] = None,
) -> Tuple[RobustRandomCutForest, np.ndarray, np.ndarray]:
"""
Identify outlier items using RobustRandomCutForest.

MIT License Copyright (c) 2018 kLabUM.
(https://klabum.github.io/rrcf/).

Arguments:
---------
x : np.ndarray
Input data
x_predict : np.ndarray
Model
contamination : float
Percentage contamination (default: {0.05})
max_features : int or float, optional
Specifies max num or max rate of features
to be randomly selected when building each tree.
default: None => {math.floor(math.sqrt(cols))}
max_samples: int or float, optional
default: None => { min(100,rows) }
if you want to use more than 100 samples,
please specify via this option.

Returns
-------
Tuple[RobustRandomCutForest, np.ndarray, np.ndarray]
RobustRandomCutForest model, x_Outliers,
y_pred_outliers

"""
# pylint: disable=no-member

# fit the model
rows, cols = x.shape
max_samples = min(100, rows)
if not max_features:
max_features = math.floor(math.sqrt(cols))

clf = RobustRandomCutForest(
num_trees=num_trees,
tree_size=tree_size,
contamination=contamination,
max_features=max_features,
max_samples=max_samples,
)

# fit and train the model
clf.fit(x)
clf.predict(x)

y_pred_outliers = clf.predict(x_predict)

x_outliers = x_predict[y_pred_outliers == -1]
return clf, x_outliers, y_pred_outliers


# pylint: disable=too-many-arguments, too-many-locals
def plot_outlier_results(
clf: IsolationForest,
clf: Union[IsolationForest, RobustRandomCutForest],
x: np.ndarray,
x_predict: np.ndarray,
x_outliers: np.ndarray,
Expand Down
Loading