Skip to content

Commit 088592f

Browse files
author
AllenBaranov
committed
Added multivariate detector pipeline with formatting methods
1 parent f4c431e commit 088592f

File tree

11 files changed

+1121
-163
lines changed

11 files changed

+1121
-163
lines changed

sigllm/primitives/forecasting/huggingface.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
DEFAULT_PAD_TOKEN = '<pad>'
1515

1616
VALID_NUMBERS = list('0123456789')
17+
VALID_MULTIVARIATE_SYMBOLS = []
1718

1819
DEFAULT_MODEL = 'mistralai/Mistral-7B-Instruct-v0.2'
1920

@@ -53,6 +54,7 @@ def __init__(
5354
raw=False,
5455
samples=1,
5556
padding=0,
57+
multivariate_allowed_symbols = [],
5658
):
5759
self.name = name
5860
self.sep = sep
@@ -62,6 +64,7 @@ def __init__(
6264
self.raw = raw
6365
self.samples = samples
6466
self.padding = padding
67+
self.multivariate_allowed_symbols = multivariate_allowed_symbols
6568

6669
self.tokenizer = AutoTokenizer.from_pretrained(self.name, use_fast=False)
6770

@@ -85,6 +88,9 @@ def __init__(
8588
token = self.tokenizer.convert_tokens_to_ids(number)
8689
valid_tokens.append(token)
8790

91+
for symbol in self.multivariate_allowed_symbols:
92+
valid_tokens.append(self.tokenizer.convert_tokens_to_ids(symbol))
93+
8894
valid_tokens.append(self.tokenizer.convert_tokens_to_ids(self.sep))
8995
self.invalid_tokens = [
9096
[i] for i in range(len(self.tokenizer) - 1) if i not in valid_tokens
@@ -116,7 +122,7 @@ def forecast(self, X, **kwargs):
116122
tokenized_input = self.tokenizer([text], return_tensors='pt').to('cuda')
117123

118124
input_length = tokenized_input['input_ids'].shape[1]
119-
average_length = input_length / len(text.split(','))
125+
average_length = input_length / len(text.split(self.sep))
120126
max_tokens = (average_length + self.padding) * self.steps
121127

122128
generate_ids = self.model.generate(
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
"""Multivariate formatting methods for time series data."""
2+
3+
from sigllm.primitives.formatting.multivariate_formatting import MultivariateFormattingMethod
4+
from sigllm.primitives.formatting.json_format import JSONFormat
5+
from sigllm.primitives.formatting.univariate_control import UnivariateControl
6+
from sigllm.primitives.formatting.persistence_control import PersistenceControl
7+
from sigllm.primitives.formatting.value_concatenation import ValueConcatenation
8+
from sigllm.primitives.formatting.value_interleave import ValueInterleave
9+
from sigllm.primitives.formatting.digit_interleave import DigitInterleave
10+
11+
__all__ = [
12+
'MultivariateFormattingMethod',
13+
'JSONFormat',
14+
'UnivariateControl',
15+
'PersistenceControl',
16+
'ValueConcatenation',
17+
'ValueInterleave',
18+
'DigitInterleave',
19+
]
20+
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
from .multivariate_formatting import MultivariateFormattingMethod
2+
import numpy as np
3+
4+
5+
class DigitInterleave(MultivariateFormattingMethod):
    """Encode each timestamp by interleaving the digits of its values.

    Every value of a timestamp is zero-padded to a common width and the
    padded strings are then read column by column. With a width of 3 the
    values ``(123, 45, 6)`` are padded to ``123``, ``045``, ``006`` and
    emitted as ``"100240356"``.
    """

    def __init__(self, verbose: bool = False, **kwargs):
        super().__init__("digit_interleave", verbose=verbose, **kwargs)

    def format_as_string(self, data: np.ndarray, digits_per_timestamp=3,
                         separator=",") -> list[str]:
        """Turn windows of integer values into interleaved digit strings.

        Args:
            data: Iterable of windows; each window is an iterable of
                timestamps and each timestamp an iterable of values that
                ``int()`` accepts.
            digits_per_timestamp: Minimum number of characters per value.
                The width actually used grows when a value needs more.
            separator: Text placed after every encoded timestamp,
                including the last one of a window.

        Returns:
            One string per window.

        Side effects:
            Stores the width actually used in ``self.metadata['width_used']``
            so that ``format_as_integer`` can undo the encoding.
        """
        # Measure the signed representation: ``zfill`` keeps a leading "-",
        # so the sign needs a character of its own or the last digit of a
        # negative value would fall outside the interleaved width.
        # ``default=0`` keeps empty input from raising.
        widest = max(
            (len(str(int(v))) for window in data for ts in window for v in ts),
            default=0,
        )
        width_used = max(digits_per_timestamp, widest)
        self.metadata['width_used'] = width_used

        def interleave_digits(timestamp):
            padded = [str(int(val)).zfill(width_used) for val in timestamp]
            # zip(*padded) yields one tuple per character position holding
            # that position's character from every value.
            return ''.join(ch for column in zip(*padded) for ch in column)

        return [
            separator.join(interleave_digits(ts) for ts in window) + separator
            for window in data
        ]

    def format_as_integer(self, data: list[str], separator=",", trunc=None,
                          digits_per_timestamp=3) -> np.ndarray:
        """Decode interleaved digit strings back into integer values.

        Args:
            data: Iterable of entries; each entry is an iterable of sample
                strings as produced by ``format_as_string``.
            separator: Text that delimits timestamps inside a sample.
            trunc: When truthy, keep only the first ``trunc`` timestamps of
                every sample and the first ``trunc`` values of every
                timestamp.
            digits_per_timestamp: Width to assume when ``format_as_string``
                has not recorded one in ``self.metadata`` yet.

        Returns:
            Object array with one row per entry; each row holds the decoded
            timestamps of all samples of that entry, in order.
        """
        # Prefer the width recorded while encoding; fall back to the
        # argument instead of failing with KeyError on a fresh instance.
        width_used = self.metadata.get('width_used', digits_per_timestamp)

        def deinterleave_timestamp(interleaved_str):
            """Convert interleaved digits back to original values"""
            num_values = len(interleaved_str) // width_used
            # The characters of value ``i`` sit at i, i + num_values,
            # i + 2 * num_values, ...; at most ``width_used`` belong to it.
            values = np.array([
                int(interleaved_str[value_idx::num_values][:width_used])
                for value_idx in range(num_values)
            ])
            return values[:trunc] if trunc else values

        result = np.array([
            [
                deinterleave_timestamp(timestamp)
                for sample in entry
                for timestamp in sample.strip(separator).split(separator)[:trunc]
                if timestamp.strip()  # Skip empty strings
            ]
            for entry in data
        ], dtype=object)
        return result
66+
67+
68+
69+
if __name__ == "__main__":
    # Manual smoke run: validate the round trip, then run the detector
    # pipeline and show its errors, predictions and targets.
    formatter = DigitInterleave(digits_per_timestamp=3)
    formatter.test_multivariate_formatting_validity(verbose=False)
    pipeline_output = formatter.run_pipeline(return_y_hat=True)
    errs, y_hat, y = pipeline_output
    print(errs)
    print(y_hat)
    print(y)
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
from .multivariate_formatting import MultivariateFormattingMethod
2+
import numpy as np
3+
import re
4+
5+
class JSONFormat(MultivariateFormattingMethod):
    """Encode every value as a labelled ``d<index>:<value>`` pair.

    A timestamp with the values ``(7, 12)`` becomes ``"d0:7,d1:12"``; the
    timestamps of a window are chained with the same separator.
    """

    # One "d<index>:<integer>" pair; the group captures the integer,
    # including an optional minus sign so negative values are not dropped.
    _PAIR_PATTERN = re.compile(r'd\d+:(-?\d+)')

    def __init__(self, verbose: bool = False, **kwargs):
        super().__init__("json_format", verbose=verbose, **kwargs)

    def format_as_string(self, data: np.ndarray, separator=",") -> list[str]:
        """Turn windows of values into labelled-pair strings.

        Args:
            data: Iterable of windows; each window is an iterable of rows
                (timestamps) and each row an iterable of values.
            separator: Text placed between pairs and between rows.

        Returns:
            One string per window.
        """
        def window_to_json(window):
            return separator.join(
                separator.join(f"d{i}:{val}" for i, val in enumerate(row))
                for row in window
            )

        return [window_to_json(window) for window in data]

    def format_as_integer(self, data, trunc=None, separator=","):
        """Extract the integer values from labelled-pair strings.

        Args:
            data: Iterable of windows; each window is an iterable of sample
                strings containing ``d<index>:<value>`` pairs.
            trunc: When truthy, keep only the first ``trunc`` values of each
                sample.
            separator: Not needed for parsing (pairs are located with a
                regular expression); accepted so the same configuration can
                be forwarded to both formatting methods.

        Returns:
            Object array holding, per window and per sample, the values in
            the order they appear in the string.
        """
        batch_rows = []
        for window in data:
            samples = []
            for sample in window:
                flat = [int(val) for val in self._PAIR_PATTERN.findall(sample)]
                if trunc:
                    flat = flat[:trunc]
                samples.append(flat)
            batch_rows.append(samples)
        return np.array(batch_rows, dtype=object)
41+
42+
43+
44+
45+
if __name__ == "__main__":
    # Manual smoke run: validate the round trip, then run the pipeline with
    # the extra symbols this format needs the model to be able to emit.
    allowed_symbols = ["d", ":", ","]
    formatter = JSONFormat()
    formatter.test_multivariate_formatting_validity(verbose=False)
    formatter.run_pipeline(multivariate_allowed_symbols=allowed_symbols)
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import numpy as np
2+
from mlblocks import MLPipeline
3+
import pandas as pd
4+
5+
class MultivariateFormattingMethod:
    """Base class for converting multivariate windows to and from text.

    Subclasses implement ``format_as_string`` (integer windows -> one string
    per window) and ``format_as_integer`` (strings -> integer values). The
    base class adds a round-trip self-check and a helper that runs an
    ``MLPipeline`` with the formatting applied between its steps.

    Attributes are set in ``__init__``: ``method_name`` identifies the
    method, ``config`` holds the extra keyword arguments that are forwarded
    to both formatting methods, ``metadata`` is scratch space shared between
    the two methods, and ``verbose`` stores the verbosity flag.
    """

    def __init__(self, method_name: str, verbose: bool = False, **kwargs):
        self.method_name = method_name
        self.config = kwargs
        self.metadata = {}
        self.verbose = verbose

        # The bare base class has no formatting methods to validate; running
        # the check on it would always end in NotImplementedError and make
        # the helpers below unusable without a subclass.
        is_concrete = type(self) is not MultivariateFormattingMethod
        if is_concrete and self.method_name != "persistence_control":
            self.test_multivariate_formatting_validity(verbose=verbose)

    def format_as_string(self, data: np.ndarray, **kwargs) -> str:
        """Convert windows of integers to strings; implemented by subclasses."""
        raise NotImplementedError()

    def format_as_integer(self, data: str, **kwargs) -> np.ndarray:
        """Convert strings back to integers; implemented by subclasses."""
        raise NotImplementedError()

    def normalize_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` with every non-timestamp column z-score normalised.

        Each value column has its mean subtracted and is divided by its
        standard deviation. The ``timestamp`` column is left untouched and
        the original column order is kept.
        """
        ts = df[["timestamp"]]
        vals = df.drop(columns=["timestamp"])
        normed = (vals - vals.mean(axis=0)) / vals.std(axis=0)
        return pd.concat([ts, normed], axis=1)[df.columns]

    @staticmethod
    def create_test_data(N=25):
        """Build a small synthetic frame with ``N`` rows.

        Columns: ``timestamp`` (0 to ``3600 * (N - 1)`` in equal steps),
        ``x1`` (rising from 0.10 to ``(9 + N) / 100``), ``x2`` (alternating
        0 and 1) and ``x3`` (falling from ``(N + 40) / 100`` to 0.41).
        """
        x1 = np.linspace(10, 9 + N, N) / 100
        x2 = np.array([i % 2 for i in range(N)])
        x3 = np.linspace(N + 40, 41, N) / 100

        return pd.DataFrame({
            'timestamp': np.linspace(0, 3600 * (N - 1), N),
            'x1': x1,
            'x2': x2,
            'x3': x3,
        })

    def run_pipeline(self, data=None,
                     interval=3600,
                     window_size=15,
                     verbose=True,
                     samples=7,
                     normalize=False,
                     temp=0.1,
                     return_y_hat=False,
                     multivariate_allowed_symbols=None,
                     pipeline_name='mistral_detector',
                     stride=1,
                     n_clusters=2,
                     strategy='binning'):
        """Run the detector pipeline with this formatting method applied.

        The pipeline is executed in slices: the first slice produces the
        windows in ``context['X']``, which are converted with
        ``format_as_string``; the forecasting slice produces
        ``context['y_hat']`` (skipped for ``persistence_control``, where the
        formatted input is used as the forecast); ``format_as_integer`` then
        decodes the forecast before the remaining steps compute the errors.
        Which primitives the ``start_``/``output_`` indices refer to depends
        on the pipeline definition, which is not part of this module.

        Args:
            data: Frame with a ``timestamp`` column; defaults to a fresh
                ``create_test_data()`` frame.
            interval: ``interval`` hyperparameter of the aggregation step.
            window_size: ``window_size`` of the rolling-window step.
            verbose: Print intermediate and final results.
            samples: ``samples`` hyperparameter of the forecasting step.
            normalize: Apply ``normalize_data`` before running the pipeline.
            temp: ``temp`` hyperparameter of the forecasting step.
            return_y_hat: Also return the decoded forecast and the target.
            multivariate_allowed_symbols: Extra symbols passed to the
                forecasting step; defaults to an empty list.
            pipeline_name: Name handed to ``MLPipeline``.
            stride: ``step_size`` of the rolling-window step.
            n_clusters: ``n_clusters`` for the ``'binning'`` strategy.
            strategy: ``'binning'`` or ``'scaling'``.

        Returns:
            The errors rounded to 7 decimals, or the tuple
            ``(errors, y_hat, y)`` when ``return_y_hat`` is true.

        Raises:
            ValueError: If ``strategy`` is neither ``'binning'`` nor
                ``'scaling'``.
        """
        # Defaults are created per call: evaluating create_test_data() in
        # the signature would run at class-definition time (where calling a
        # staticmethod object fails before Python 3.10) and would share one
        # mutable frame/list between all calls.
        if data is None:
            data = self.create_test_data()
        if multivariate_allowed_symbols is None:
            multivariate_allowed_symbols = []

        pipeline = MLPipeline(pipeline_name)
        digits_per_timestamp = self.config.get('digits_per_timestamp', 2)

        test_hyperparameters = {
            "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": {
                "interval": interval
            },
            "sigllm.primitives.forecasting.custom.rolling_window_sequences#1": {
                "target_column": 0,
                "window_size": window_size,
                "target_size": 1,
                "step_size": stride,
            },
            "sigllm.primitives.forecasting.huggingface.HF#1": {
                "samples": samples,
                "temp": temp,
                "multivariate_allowed_symbols": multivariate_allowed_symbols,
            },
        }

        if strategy == 'binning':
            test_hyperparameters["sigllm.primitives.transformation.Float2Scalar#1"] = {
                "strategy": "binning",
                "n_clusters": n_clusters,
            }
        elif strategy == 'scaling':
            test_hyperparameters["sigllm.primitives.transformation.Float2Scalar#1"] = {
                "strategy": "scaling",
                "decimal": digits_per_timestamp,
                "rescale": True,
            }
        else:
            raise ValueError(f"Invalid strategy: {strategy}")

        pipeline.set_hyperparameters(test_hyperparameters)
        if normalize:
            data = self.normalize_data(data)
        context = pipeline.fit(data, start_=0, output_=3)
        context['X'] = self.format_as_string(context['X'], **self.config)

        if self.method_name == "persistence_control":
            # No model call: the formatted input itself acts as the forecast.
            context['y_hat'] = context['X']
        else:
            context = pipeline.fit(**context, start_=5, output_=5)

        if verbose:
            print(f"y_hat example: {context['y_hat'][0][0]}")

        context['y_hat'] = self.format_as_integer(context['y_hat'], trunc=1)
        if verbose:
            print(f"y_hat example: {context['y_hat'][0][0]}")
        context = pipeline.fit(**context, start_=7, output_=10)

        errors = np.round(context['errors'], 7)

        if verbose:
            print(f"y_hat: {context['y_hat']}")
            print(f"y: {context['y']}")
            print(f"errors: {errors}")

        if return_y_hat:
            return errors, context['y_hat'], context['y']
        return errors

    def test_multivariate_formatting_validity(self, data=None, verbose=False):
        """Check that string formatting followed by parsing is lossless.

        Args:
            data: Integer array of windows. When omitted, the synthetic data
                of ``create_test_data`` is cut into windows of 15 rows,
                multiplied by 1000 and truncated to integers.
            verbose: Print intermediate values.

        Raises:
            AssertionError: If the string output is not a list of ``str``,
                the parsed output is not an ``ndarray``, or the parsed
                values differ from the input (for ``univariate_control``
                only the first column is compared).
        """
        if verbose:
            print("Testing multivariate formatting method validity")

        if data is None:
            # Drop the timestamp column, then build overlapping windows.
            raw_data = np.array(self.create_test_data())[:, 1:]
            windowed_data = np.array(
                [raw_data[i:i + 15, :] for i in range(0, len(raw_data) - 15, 1)]
            )
            data = (1000 * windowed_data).astype(int)
            if verbose:
                print(data.shape)

        string_data = self.format_as_string(data, **self.config)
        LLM_mock_output = np.array(string_data).reshape(-1, 1)  # pretend only one sample
        if verbose:
            print(LLM_mock_output)
        integer_data = self.format_as_integer(LLM_mock_output, **self.config)
        if verbose:
            print(f"Format as string output: {string_data}")

        assert isinstance(string_data, list)
        assert isinstance(string_data[0], str)
        assert isinstance(integer_data, np.ndarray)

        if self.method_name == "univariate_control":
            assert np.all(integer_data.flatten() == data[:, :, 0].flatten())
        else:
            assert np.all(integer_data.flatten() == data.flatten())

        if verbose:
            print("Validation suite passed")
157+
158+
159+
if __name__ == "__main__":
    # Print the z-score normalised synthetic data set.
    formatter = MultivariateFormattingMethod(method_name="test")
    sample_frame = formatter.create_test_data()
    print(formatter.normalize_data(sample_frame))
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from .multivariate_formatting import MultivariateFormattingMethod
2+
import numpy as np
3+
4+
5+
class PersistenceControl(MultivariateFormattingMethod):
    """Baseline method that works on the first dimension only.

    ``format_as_string`` writes out the first value of every timestamp and
    ``format_as_integer`` keeps just the last integer of each string.
    """

    def __init__(self, verbose: bool = False, **kwargs):
        super().__init__("persistence_control", verbose=verbose, **kwargs)

    def format_as_string(self, data: np.ndarray, separator = ",") -> str:
        """Join the first-dimension values of every window with ``separator``."""
        first_dimension = data[:, :, 0]
        return [
            separator.join(str(value) for value in series.flatten())
            for series in first_dimension
        ]

    def format_as_integer(self, data: list[str], separator = ",", trunc = None) -> np.ndarray:
        """Return, per entry, a one-element list holding its last integer.

        Leading separators are stripped and empty pieces are ignored;
        ``trunc`` is accepted but not used.
        """
        parsed = []
        for entry in data:
            pieces = entry.lstrip(separator).split(separator)
            numbers = np.array([int(piece) for piece in pieces if piece])
            # Slice (rather than index) so the last value stays an array.
            parsed.append([numbers[-1:]])
        return np.array(parsed, dtype=object)
22+
23+
24+
25+
if __name__ == "__main__":
    # Manual smoke run of the baseline with a window stride of 5.
    baseline = PersistenceControl()
    baseline.run_pipeline(stride=5)

0 commit comments

Comments
 (0)