diff --git a/pyproject.toml b/pyproject.toml
index 92062a8..8b83077 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -204,4 +204,11 @@ convention = "google"
 
 [tool.ruff.lint.pycodestyle]
 max-doc-length = 99
-max-line-length = 99
\ No newline at end of file
+max-line-length = 99
+
+[tool.coverage.report]
+exclude_also = [
+    "def __repr__",
+    "def main",
+    "if __name__ == .__main__.:"
+]
diff --git a/sigllm/benchmark.py b/sigllm/benchmark.py
new file mode 100644
index 0000000..2adf3c8
--- /dev/null
+++ b/sigllm/benchmark.py
@@ -0,0 +1,428 @@
+# -*- coding: utf-8 -*-
+
+import argparse
+import ast
+import concurrent.futures
+import gc
+import json
+import logging
+import os
+import uuid
+import warnings
+from copy import deepcopy
+from datetime import datetime
+from functools import partial
+from glob import glob
+from pathlib import Path
+
+import numpy as np
+import pandas as pd
+import torch
+import tqdm
+from mlblocks import get_pipelines_paths
+from orion.benchmark import _load_signal, _parse_confusion_matrix, _sort_leaderboard
+from orion.data import load_anomalies
+from orion.evaluation import CONTEXTUAL_METRICS as METRICS
+from orion.evaluation import contextual_confusion_matrix
+from orion.progress import TqdmLogger
+
+from sigllm import SigLLM
+from sigllm.data import load_normal
+
+warnings.simplefilter('ignore')
+
+LOGGER = logging.getLogger(__name__)
+
+BUCKET = 'sintel-sigllm'
+S3_URL = 'https://{}.s3.amazonaws.com/{}'
+
+BENCHMARK_PATH = os.path.join(
+    os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'), 'benchmark'
+)
+
+BENCHMARK_DATA = (
+    pd.read_csv(S3_URL.format(BUCKET, 'datasets.csv'), index_col=0, header=None)
+    .applymap(ast.literal_eval)
+    .to_dict()[1]
+)
+BENCHMARK_PARAMS = (
+    pd.read_csv(S3_URL.format(BUCKET, 'parameters.csv'), index_col=0, header=None)
+    .applymap(ast.literal_eval)
+    .to_dict()[1]
+)
+
+PIPELINE_DIR = os.path.join(os.path.dirname(__file__), 'pipelines')
+
+PIPELINES = {
+    'mistral_prompter_restricted': 'mistral_prompter',
+    'mistral_prompter_0shot': 'mistral_prompter_0shot',
+    'mistral_prompter_1shot': 'mistral_prompter_1shot',
+}
+
+
+def _get_pipeline_directory(pipeline_name):
+    if os.path.isfile(pipeline_name):
+        return os.path.dirname(pipeline_name)
+
+    pipelines_paths = get_pipelines_paths()
+    for base_path in pipelines_paths:
+        parts = pipeline_name.split('.')
+        number_of_parts = len(parts)
+
+        for folder_parts in range(number_of_parts):
+            folder = os.path.join(base_path, *parts[:folder_parts])
+            filename = '.'.join(parts[folder_parts:]) + '.json'
+            json_path = os.path.join(folder, filename)
+
+            if os.path.isfile(json_path):
+                return os.path.dirname(json_path)
+
+
+def _get_pipeline_hyperparameter(hyperparameters, dataset_name, pipeline_name):
+    hyperparameters_ = deepcopy(hyperparameters)
+
+    if isinstance(hyperparameters, dict):
+        hyperparameters_ = hyperparameters_.get(dataset_name) or hyperparameters_
+        hyperparameters_ = hyperparameters_.get(pipeline_name) or hyperparameters_
+
+    elif isinstance(hyperparameters_, str) and os.path.exists(hyperparameters_):
+        with open(hyperparameters_) as f:
+            hyperparameters_ = json.load(f)
+
+    elif hyperparameters_ is None and dataset_name and pipeline_name:
+        pipeline_path = _get_pipeline_directory(pipeline_name)
+        if pipeline_path:
+            pipeline_dirname = os.path.basename(pipeline_path)
+            file_path = os.path.join(
+                pipeline_path, pipeline_dirname + '_' + dataset_name.lower() + '.json'
+            )
+            if os.path.exists(file_path):
+                with open(file_path) as f:
+                    hyperparameters_ = json.load(f)
+
+    return hyperparameters_
+
+
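+# Few-shot (1-shot) pipelines receive an additional "normal" reference signal and are assumed
+# to carry an extra ``time_segments_aggregate`` primitive for it. The helper below duplicates
+# any ``time_segments_aggregate#N`` entry so both primitives share the same settings.
+# Illustrative sketch with hypothetical values:
+#
+#     _augment_hyperparameters({'time_segments_aggregate#1': {'interval': 1}}, few_shot=True)
+#     # -> {'time_segments_aggregate#1': {'interval': 1},
+#     #     'time_segments_aggregate#2': {'interval': 1}}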
+def _augment_hyperparameters(hyperparameters, few_shot): + hyperparameters_ = deepcopy(hyperparameters) + if few_shot: + for hyperparameter, value in hyperparameters.items(): + if 'time_segments_aggregate' in hyperparameter: + name = hyperparameter[:-1] + number = int(hyperparameter[-1]) + 1 + hyperparameters_[name + str(number)] = value + + return hyperparameters_ + + +def _evaluate_signal( + pipeline, signal, hyperparameter, metrics, test_split=False, few_shot=False, anomaly_path=None +): + _, test = _load_signal(signal, test_split) + truth = load_anomalies(signal) + + normal = None + if few_shot: + normal = load_normal(signal) + + try: + LOGGER.info( + 'Scoring pipeline %s on signal %s (test split: %s)', pipeline, signal, test_split + ) + + start = datetime.utcnow() + pipeline = SigLLM(pipeline, hyperparameters=hyperparameter) + anomalies = pipeline.detect(test, normal=normal) + elapsed = datetime.utcnow() - start + + scores = {name: scorer(truth, anomalies, test) for name, scorer in metrics.items()} + + status = 'OK' + + except Exception as ex: + LOGGER.exception( + 'Exception scoring pipeline %s on signal %s (test split: %s), error %s.', + pipeline, + signal, + test_split, + ex, + ) + + elapsed = datetime.utcnow() - start + anomalies = pd.DataFrame([], columns=['start', 'end', 'score']) + scores = {name: 0 for name in metrics.keys()} + + status = 'ERROR' + + if 'confusion_matrix' in metrics.keys(): + _parse_confusion_matrix(scores, truth) + + scores['status'] = status + scores['elapsed'] = elapsed.total_seconds() + scores['split'] = test_split + + if anomaly_path: + anomalies.to_csv(anomaly_path, index=False) + + del pipeline + torch.cuda.empty_cache() + gc.collect() + return scores + + +def _run_job(args): + # Reset random seed + np.random.seed() + + ( + pipeline, + pipeline_name, + dataset, + signal, + hyperparameter, + metrics, + test_split, + few_shot, + iteration, + cache_dir, + anomaly_dir, + run_id, + ) = args + + anomaly_path = anomaly_dir + if anomaly_dir: + base_path = str(anomaly_dir / f'{pipeline_name}_{signal}_{dataset}_{iteration}') + anomaly_path = base_path + '_anomalies.csv' + + LOGGER.info( + 'Evaluating pipeline %s on signal %s dataset %s (test split: %s); iteration %s', + pipeline_name, + signal, + dataset, + test_split, + iteration, + ) + + output = _evaluate_signal( + pipeline, signal, hyperparameter, metrics, test_split, few_shot, anomaly_path + ) + scores = pd.DataFrame.from_records([output], columns=output.keys()) + + scores.insert(0, 'dataset', dataset) + scores.insert(1, 'pipeline', pipeline_name) + scores.insert(2, 'signal', signal) + scores.insert(3, 'iteration', iteration) + scores['run_id'] = run_id + + if cache_dir: + base_path = str(cache_dir / f'{pipeline_name}_{signal}_{dataset}_{iteration}_{run_id}') + scores.to_csv(base_path + '_scores.csv', index=False) + + return scores + + +def benchmark( + pipelines=None, + datasets=None, + hyperparameters=None, + metrics=METRICS, + rank='f1', + test_split=False, + iterations=1, + workers=1, + show_progress=False, + cache_dir=None, + anomaly_dir=None, + resume=False, + output_path=None, +): + """Run pipelines on the given datasets and evaluate the performance. + + The pipelines are used to analyze the given signals and later on the + detected anomalies are scored against the known anomalies using the + indicated metrics. + + Finally, the scores obtained with each metric are averaged accross all the signals, + ranked by the indicated metric and returned on a ``pandas.DataFrame``. 
+
+    Args:
+        pipelines (dict or list): dictionary with pipeline names as keys and their
+            JSON paths as values. If a list is given, it should contain JSON paths,
+            and the paths themselves will be used as names. If not given, all verified
+            pipelines will be used for evaluation.
+        datasets (dict or list): dictionary with dataset names as keys and lists of signals
+            as values. If a list is given, it will be grouped under a generic name
+            ``dataset``. If not given, all benchmark datasets will be used.
+        hyperparameters (dict or list): dictionary with pipeline names as keys
+            and their hyperparameter JSON paths or dictionaries as values. If a list is
+            given, it should have the same order as ``pipelines``.
+        metrics (dict or list): dictionary with metric names as keys and
+            scoring functions as values. If a list is given, it should contain scoring
+            functions, and their ``__name__`` value will be used as the metric name.
+            If not given, all the available metrics will be used.
+        rank (str): Sort and rank the pipelines based on the given metric.
+            If not given, rank using the first metric.
+        test_split (bool or float): Whether to use the prespecified train-test split. If
+            float, it should be between 0.0 and 1.0 and represent the proportion of
+            the signal to include in the test split. If not given, use ``False``.
+        iterations (int):
+            Number of iterations to perform over each signal and pipeline. Defaults to 1.
+        workers (int):
+            If ``workers`` is given as an integer value other than 0 or 1, a process pool
+            is used to distribute the computation across the indicated number of workers.
+        show_progress (bool):
+            Whether to use tqdm to keep track of the progress. Defaults to ``False``.
+        cache_dir (str):
+            If a ``cache_dir`` is given, intermediate results are stored in the indicated
+            directory as CSV files as they get computed. This allows inspecting results while
+            the benchmark is still running and also recovering results in case the process
+            does not finish properly. Defaults to ``None``.
+        anomaly_dir (str):
+            If an ``anomaly_dir`` is given, detected anomalies are dumped in the specified
+            directory as CSV files. Defaults to ``None``.
+        resume (bool):
+            Whether to continue running the experiments in the benchmark from the current
+            progress in ``cache_dir``.
+        output_path (str): Location to save the benchmark results as a CSV file. If not
+            given, the results will not be saved to disk.
+
+    Returns:
+        pandas.DataFrame:
+            A table containing the scores obtained with each scoring function across
+            all the signals for each pipeline.
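+
+    Example:
+        A minimal, illustrative invocation (the dataset and signal names below are
+        placeholders and need to be replaced with entries that are actually available):
+
+            from sigllm.benchmark import benchmark
+
+            scores = benchmark(
+                pipelines=['mistral_prompter'],
+                datasets={'demo_dataset': ['demo_signal']},
+                metrics=['f1'],
+            )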
+ """ + pipelines = pipelines or PIPELINES + datasets = datasets or BENCHMARK_DATA + run_id = os.getenv('RUN_ID') or str(uuid.uuid4())[:10] + + if isinstance(pipelines, list): + pipelines = {pipeline: pipeline for pipeline in pipelines} + + if isinstance(datasets, list): + datasets = {'dataset': datasets} + + if isinstance(hyperparameters, list): + hyperparameters = { + pipeline: hyperparameter + for pipeline, hyperparameter in zip(pipelines.keys(), hyperparameters) + } + + if isinstance(metrics, list): + metrics_ = dict() + for metric in metrics: + if callable(metric): + metrics_[metric.__name__] = metric + elif metric in METRICS: + metrics_[metric] = METRICS[metric] + else: + raise ValueError('Unknown metric: {}'.format(metric)) + + metrics = metrics_ + + if cache_dir: + cache_dir = Path(cache_dir) + os.makedirs(cache_dir, exist_ok=True) + + if anomaly_dir: + anomaly_dir = Path(anomaly_dir) + os.makedirs(anomaly_dir, exist_ok=True) + + jobs = list() + for dataset, signals in datasets.items(): + for pipeline_name, pipeline in pipelines.items(): + hyperparameter = _get_pipeline_hyperparameter(hyperparameters, dataset, pipeline) + parameters = BENCHMARK_PARAMS.get(dataset) + few_shot = True if '1shot' in pipeline.lower() else False + hyperparameter = _augment_hyperparameters(hyperparameter, few_shot) + if parameters is not None: + (test_split,) = parameters.values() + + for signal in signals: + for iteration in range(iterations): + if resume: + experiment = str( + cache_dir / f'{pipeline_name}_{signal}_{dataset}_{iteration}' + ) + if len(glob(experiment + '*.csv')) > 0: + LOGGER.warning(f'skipping {experiment}') + continue + + args = ( + pipeline, + pipeline_name, + dataset, + signal, + hyperparameter, + metrics, + test_split, + few_shot, + iteration, + cache_dir, + anomaly_dir, + run_id, + ) + jobs.append(args) + + if workers in (0, 1): + scores = map(_run_job, jobs) + else: + pool = concurrent.futures.ProcessPoolExecutor(workers) + scores = pool.map(_run_job, jobs) + + scores = tqdm.tqdm(scores, total=len(jobs), file=TqdmLogger()) + if show_progress: + scores = tqdm.tqdm(scores, total=len(jobs)) + + if scores: + scores = pd.concat(scores) + if output_path: + LOGGER.info('Saving benchmark report to %s', output_path) + scores.to_csv(output_path, index=False) + + return _sort_leaderboard(scores, rank, metrics) + + LOGGER.info('No scores to be recorded.') + return pd.DataFrame() + + +def main(pipelines, datasets, resume, workers, output_path, cache_dir, anomaly_dir, **kwargs): + """Main to call benchmark function.""" + # output path + output_path = os.path.join(BENCHMARK_PATH, output_path) + + # metrics + del METRICS['accuracy'] + METRICS['confusion_matrix'] = contextual_confusion_matrix + metrics = {k: partial(fun, weighted=False) for k, fun in METRICS.items()} + + results = benchmark( + pipelines=pipelines, + datasets=datasets, + metrics=metrics, + output_path=output_path, + workers=workers, + resume=resume, + cache_dir=cache_dir, + anomaly_dir=anomaly_dir, + ) + + return results + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + + parser.add_argument('-p', '--pipelines', nargs='+', type=str, default=PIPELINES) + parser.add_argument('-d', '--datasets', nargs='+', type=str, default=BENCHMARK_DATA) + parser.add_argument('-r', '--resume', type=bool, default=False) + parser.add_argument('-w', '--workers', default=1) + + parser.add_argument('-o', '--output_path', type=str, default='results.csv') + parser.add_argument('-c', '--cache_dir', type=str, default='cache') + 
parser.add_argument('-ad', '--anomaly_dir', type=str, default='anomaly_dir') + + config = parser.parse_args() + + if any([dataset in BENCHMARK_DATA.keys() for dataset in config.datasets]): + config.datasets = dict((dataset, BENCHMARK_DATA[dataset]) for dataset in config.datasets) + + results = main(**vars(config)) diff --git a/tests/test_benchmark.py b/tests/test_benchmark.py new file mode 100644 index 0000000..1ebe4a7 --- /dev/null +++ b/tests/test_benchmark.py @@ -0,0 +1,820 @@ +import json +import os +import shutil +import tempfile +from pathlib import Path +from unittest import TestCase +from unittest.mock import Mock, call, mock_open, patch + +import pandas as pd +from orion.evaluation import CONTEXTUAL_METRICS as METRICS +from orion.evaluation import contextual_confusion_matrix +from pytest import fixture + +from sigllm.benchmark import ( + _augment_hyperparameters, + _evaluate_signal, + _get_pipeline_directory, + _get_pipeline_hyperparameter, + _run_job, + benchmark, +) + + +@fixture +def base_hyperparameters(): + return { + 'time_segments_aggregate#1': {'interval': 1}, + 'rolling_window#1': {'window_size': 100}, + 'other_param': 'other_value', + } + + +def test__augment_hyperparameters_without_few_shot(base_hyperparameters): + result = _augment_hyperparameters(base_hyperparameters, few_shot=False) + + assert result == base_hyperparameters + + +def test__augment_hyperparameters_with_few_shot(base_hyperparameters): + result = _augment_hyperparameters(base_hyperparameters, few_shot=True) + + assert len(result) == 4 + assert result['time_segments_aggregate#1'] == base_hyperparameters['time_segments_aggregate#1'] + assert result['time_segments_aggregate#2'] == base_hyperparameters['time_segments_aggregate#1'] + assert result['rolling_window#1'] == base_hyperparameters['rolling_window#1'] + assert result['other_param'] == base_hyperparameters['other_param'] + + +def test__augment_hyperparameters_with_empty_hyperparameters(base_hyperparameters): + result = _augment_hyperparameters({}, few_shot=True) + + assert result == {} + + +def test__augment_hyperparameters_with_multiple_aggregate_params(): + hyperparameters = { + 'time_segments_aggregate#1': 'value1', + 'time_segments_aggregate#3': 'value3', + 'other_param': 'other_value', + } + + result = _augment_hyperparameters(hyperparameters, few_shot=True) + + assert result['time_segments_aggregate#1'] == 'value1' + assert result['time_segments_aggregate#2'] == 'value1' + assert result['time_segments_aggregate#3'] == 'value3' + assert result['time_segments_aggregate#4'] == 'value3' + assert result['other_param'] == 'other_value' + + +@patch('os.path.isfile') +def test__get_pipeline_directory(mock_isfile): + pipeline_path = '/fake/path/pipeline_dir/pipeline.json' + mock_isfile.return_value = True + + expected = '/fake/path/pipeline_dir' + result = _get_pipeline_directory(pipeline_path) + + assert result == expected + + +@patch('sigllm.benchmark.get_pipelines_paths') +@patch('os.path.isfile') +def test__get_pipeline_directory_search_pipeline_paths(mock_isfile, mock_pipeline_paths): + pipeline_name = 'pipeline' + base_path = '/fake/path/pipeline/' + mock_pipeline_paths.return_value = [base_path] + + # first isfile check should be False to enter the pipeline paths loop + # second isfile check should be True when finding the json file + mock_isfile.side_effect = [False, True] + + expected = '/fake/path/pipeline' + result = _get_pipeline_directory(pipeline_name) + + assert result == expected + + mock_isfile.assert_has_calls([ + call(pipeline_name), + 
call(os.path.join(base_path, 'pipeline.json')), + ]) + mock_pipeline_paths.assert_called_once() + + +class TestGetPipelineHyperparameter(TestCase): + @classmethod + def setup_class(cls): + cls.pipeline_name = 'test_pipeline' + cls.dataset_name = 'test_dataset' + + cls.temp_dir = tempfile.mkdtemp() + + cls.base_hyperparameters = {'default_param': 'default_value'} + + cls.dataset_hyperparameters = {'test_dataset': {'dataset_param': 'dataset_value'}} + + cls.pipeline_hyperparameters = { + 'test_dataset': {'test_pipeline': {'pipeline_param': 'pipeline_value'}} + } + + def test__get_pipeline_hyperparameter_exist_direct(self): + """Test when hyperparameters are directly provided""" + hyperparameters = {'param1': 'value1', 'param2': 'value2'} + + result = _get_pipeline_hyperparameter( + hyperparameters, self.dataset_name, self.pipeline_name + ) + + self.assertEqual(result, hyperparameters) + + def test__get_pipeline_hyperparameter_exist_nested(self): + """Test when hyperparameters are nested by dataset and pipeline""" + hyperparameters = { + 'test_dataset': {'test_pipeline': {'param1': 'value1', 'param2': 'value2'}} + } + + result = _get_pipeline_hyperparameter( + hyperparameters, self.dataset_name, self.pipeline_name + ) + + expected = {'param1': 'value1', 'param2': 'value2'} + self.assertEqual(result, expected) + + def test__get_pipeline_hyperparameter_do_not_exist(self): + """Test when no hyperparameters are provided""" + result = _get_pipeline_hyperparameter(None, self.dataset_name, self.pipeline_name) + + self.assertIsNone(result) + + def test__get_pipeline_hyperparameter_dataset_dependent_hyperparameters(self): + """Test when hyperparameters are dataset-specific""" + hyperparameters = { + 'test_dataset': {'param1': 'dataset_value1', 'param2': 'dataset_value2'}, + 'other_dataset': {'param1': 'other_value1', 'param2': 'other_value2'}, + } + + result = _get_pipeline_hyperparameter( + hyperparameters, self.dataset_name, self.pipeline_name + ) + + expected = {'param1': 'dataset_value1', 'param2': 'dataset_value2'} + self.assertEqual(result, expected) + + result = _get_pipeline_hyperparameter( + hyperparameters, 'nonexistent_dataset', self.pipeline_name + ) + + self.assertEqual(result, hyperparameters) + + @patch('os.path.exists') + @patch('json.load') + def test__get_pipeline_hyperparameter_from_json_file(self, mock_json_load, mock_exists): + """Test loading hyperparameters from a JSON file""" + mock_exists.return_value = True + mock_json_data = {'param1': 'file_value1', 'param2': 'file_value2'} + mock_json_load.return_value = mock_json_data + + file_path = os.path.join(self.temp_dir, 'test_pipeline_test_dataset.json') + + with patch('builtins.open', mock_open(read_data=json.dumps(mock_json_data))): + result = _get_pipeline_hyperparameter(file_path, self.dataset_name, self.pipeline_name) + + self.assertEqual(result, mock_json_data) + + @patch('sigllm.benchmark._get_pipeline_directory') + @patch('os.path.exists') + @patch('json.load') + def test__get_pipeline_hyperparameter_none_with_pipeline_path( + self, mock_json_load, mock_exists, mock_get_pipeline_dir + ): + """Test when hyperparameters is None but dataset and pipeline are specified. 
+ Should look for JSON file in pipeline directory.""" + pipeline_dir = '/fake/path/pipeline_dir' + mock_get_pipeline_dir.return_value = pipeline_dir + mock_exists.return_value = True + + expected_hyperparams = {'param1': 'auto_value1', 'param2': 'auto_value2'} + mock_json_load.return_value = expected_hyperparams + + expected_file_path = os.path.join( + pipeline_dir, f'{os.path.basename(pipeline_dir)}_{self.dataset_name.lower()}.json' + ) + + with patch('builtins.open', mock_open(read_data=json.dumps(expected_hyperparams))): + result = _get_pipeline_hyperparameter(None, self.dataset_name, self.pipeline_name) + + mock_get_pipeline_dir.assert_called_once_with(self.pipeline_name) + mock_exists.assert_called_once_with(expected_file_path) + + self.assertEqual(result, expected_hyperparams) + + mock_exists.return_value = False + result = _get_pipeline_hyperparameter(None, self.dataset_name, self.pipeline_name) + self.assertIsNone(result) + + def test__get_pipeline_hyperparameter_fallback_behavior(self): + """Test the fallback behavior when dataset/pipeline specific params don't exist""" + hyperparameters = { + 'default': {'param': 'default_value'}, + 'test_dataset': {'param': 'dataset_value'}, + 'test_pipeline': {'param': 'pipeline_value'}, + } + + # fallback to dataset level + result = _get_pipeline_hyperparameter( + hyperparameters, self.dataset_name, 'nonexistent_pipeline' + ) + + self.assertEqual(result['param'], 'dataset_value') + + # fallback to pipeline level + result = _get_pipeline_hyperparameter( + hyperparameters, + 'nonexistent_dataset', + self.pipeline_name, + ) + + self.assertEqual(result['param'], 'pipeline_value') + + # fallback to default when dataset doesn't exist + result = _get_pipeline_hyperparameter( + hyperparameters, 'nonexistent_dataset', 'nonexistent_pipeline' + ) + + self.assertEqual(result, hyperparameters) + + +class TestEvaluateSignal(TestCase): + @classmethod + def setup_class(cls): + cls.pipeline_name = 'test_pipeline' + cls.signal_name = 'test_signal' + cls.hyperparameters = {'param': 'value'} + cls.metrics = {'f1': METRICS['f1']} + cls.test_split = False + cls.few_shot = False + cls.anomaly_path = None + + cls.test_data = pd.DataFrame({'timestamp': [1, 2, 3], 'value': [1, 2, 3]}) + cls.truth_data = pd.DataFrame({'start': [1], 'end': [2]}) + cls.detected_anomalies = pd.DataFrame({'start': [1], 'end': [2], 'score': [0.9]}) + + @patch('sigllm.benchmark.load_anomalies') + @patch('sigllm.benchmark._load_signal') + @patch('sigllm.benchmark.SigLLM') + def test__evaluate_signal_success(self, mock_sigllm, mock_load_signal, mock_load_anomalies): + mock_load_signal.return_value = (None, self.test_data) + mock_load_anomalies.return_value = self.truth_data + + mock_pipeline = Mock() + mock_pipeline.detect.return_value = self.detected_anomalies + mock_sigllm.return_value = mock_pipeline + + result = _evaluate_signal( + self.pipeline_name, + self.signal_name, + self.hyperparameters, + self.metrics, + self.test_split, + self.few_shot, + self.anomaly_path, + ) + + assert isinstance(result, dict) + self.assertEqual(result['status'], 'OK') + self.assertIn('elapsed', result) + self.assertIn('split', result) + self.assertIn('f1', result) + + mock_load_signal.assert_called_once_with(self.signal_name, self.test_split) + mock_load_anomalies.assert_called_once_with(self.signal_name) + mock_sigllm.assert_called_once_with( + self.pipeline_name, hyperparameters=self.hyperparameters + ) + mock_pipeline.detect.assert_called_once_with(self.test_data, normal=None) + + 
@patch('sigllm.benchmark.load_anomalies') + @patch('sigllm.benchmark._load_signal') + @patch('sigllm.benchmark.SigLLM') + def test__evaluate_signal_fail(self, mock_sigllm, mock_load_signal, mock_load_anomalies): + mock_load_signal.return_value = (None, self.test_data) + mock_load_anomalies.return_value = self.truth_data + mock_sigllm.side_effect = Exception('Test error') + + result = _evaluate_signal( + self.pipeline_name, + self.signal_name, + self.hyperparameters, + self.metrics, + self.test_split, + self.few_shot, + self.anomaly_path, + ) + + assert isinstance(result, dict) + self.assertEqual(result['status'], 'ERROR') + self.assertIn('elapsed', result) + self.assertIn('split', result) + self.assertIn('f1', result) + + mock_load_signal.assert_called_once_with(self.signal_name, self.test_split) + mock_load_anomalies.assert_called_once_with(self.signal_name) + mock_sigllm.assert_called_once_with( + self.pipeline_name, hyperparameters=self.hyperparameters + ) + + @patch('sigllm.benchmark.load_normal') + @patch('sigllm.benchmark.load_anomalies') + @patch('sigllm.benchmark._load_signal') + @patch('sigllm.benchmark.SigLLM') + def test__evaluate_signal_with_few_shot( + self, + mock_sigllm, + mock_load_signal, + mock_load_anomalies, + mock_load_normal, + ): + mock_load_normal.return_value = self.test_data + mock_load_signal.return_value = (None, self.test_data) + mock_load_anomalies.return_value = self.truth_data + + mock_pipeline = Mock() + mock_pipeline.detect.return_value = self.detected_anomalies + mock_sigllm.return_value = mock_pipeline + + result = _evaluate_signal( + self.pipeline_name, + self.signal_name, + self.hyperparameters, + self.metrics, + self.test_split, + few_shot=True, + anomaly_path=self.anomaly_path, + ) + + assert isinstance(result, dict) + self.assertEqual(result['status'], 'OK') + + mock_pipeline.detect.assert_called_once_with(self.test_data, normal=self.test_data) + mock_load_normal.assert_called_once_with(self.signal_name) + mock_load_signal.assert_called_once_with(self.signal_name, self.test_split) + mock_load_anomalies.assert_called_once_with(self.signal_name) + mock_sigllm.assert_called_once_with( + self.pipeline_name, hyperparameters=self.hyperparameters + ) + + @patch('sigllm.benchmark.load_anomalies') + @patch('sigllm.benchmark._load_signal') + @patch('sigllm.benchmark.SigLLM') + def test__evaluate_signal_with_anomaly_path( + self, mock_sigllm, mock_load_signal, mock_load_anomalies + ): + anomaly_path = 'test_anomalies.csv' + mock_load_signal.return_value = (None, self.test_data) + mock_load_anomalies.return_value = self.truth_data + + mock_pipeline = Mock() + mock_pipeline.detect.return_value = self.detected_anomalies + mock_sigllm.return_value = mock_pipeline + + with patch.object(pd.DataFrame, 'to_csv') as mock_to_csv: + _evaluate_signal( + self.pipeline_name, + self.signal_name, + self.hyperparameters, + self.metrics, + self.test_split, + self.few_shot, + anomaly_path, + ) + + mock_to_csv.assert_called_once_with(anomaly_path, index=False) + + @patch('sigllm.benchmark.load_anomalies') + @patch('sigllm.benchmark._load_signal') + @patch('sigllm.benchmark.SigLLM') + def test_evaluate_signal_error(self, mock_sigllm, mock_load_signal, mock_load_anomalies): + mock_load_signal.side_effect = Exception('Test error') + + with self.assertRaises(Exception): + result = _evaluate_signal( + self.pipeline_name, + self.signal_name, + self.hyperparameters, + self.metrics, + self.test_split, + self.few_shot, + self.anomaly_path, + ) + + assert isinstance(result, dict) + 
self.assertEqual(result['status'], 'ERROR') + self.assertIn('elapsed', result) + self.assertEqual(result['f1'], 0) + + mock_load_signal.assert_called_once_with(self.signal_name, self.test_split) + mock_load_anomalies.assert_not_called() + mock_sigllm.assert_not_called() + + @patch('sigllm.benchmark.load_anomalies') + @patch('sigllm.benchmark._load_signal') + @patch('sigllm.benchmark.SigLLM') + def test__evaluate_signal_confusion_matrix( + self, mock_sigllm, mock_load_signal, mock_load_anomalies + ): + mock_load_signal.return_value = (None, self.test_data) + mock_load_anomalies.return_value = self.truth_data + + mock_pipeline = Mock() + mock_pipeline.detect.return_value = self.detected_anomalies + mock_sigllm.return_value = mock_pipeline + + scores = (0, 0, 0, 1) + metrics_with_cm = { + 'f1': METRICS['f1'], + 'confusion_matrix': Mock(autospec=contextual_confusion_matrix, return_value=scores), + } + + result = _evaluate_signal( + self.pipeline_name, + self.signal_name, + self.hyperparameters, + metrics_with_cm, + self.test_split, + self.few_shot, + self.anomaly_path, + ) + + assert 'tp' in result + assert 'fp' in result + assert 'fn' in result + assert 'tn' in result + + +class TestRunJob(TestCase): + @classmethod + def setup_class(cls): + cls.pipeline = 'test_pipeline' + cls.pipeline_name = 'test_pipeline' + cls.dataset = 'test_dataset' + cls.signal = 'test_signal' + cls.hyperparameters = {'param': 'value'} + cls.metrics = {'f1': METRICS['f1']} + cls.test_split = False + cls.few_shot = False + cls.iteration = 0 + cls.run_id = 'test_run' + cls.anomaly_path = None + + cls.temp_dir = tempfile.mkdtemp() + cls.cache_dir = Path(cls.temp_dir) / 'cache' + cls.anomaly_dir = Path(cls.temp_dir) / 'anomalies' + + os.makedirs(cls.cache_dir, exist_ok=True) + + @classmethod + def teardown_class(cls): + if os.path.exists(cls.temp_dir): + shutil.rmtree(cls.temp_dir) + + @patch('sigllm.benchmark._evaluate_signal') + def test_run_job_basic(self, mock_evaluate): + mock_output = {'f1': 0.8, 'status': 'OK', 'elapsed': 1.0, 'split': False} + mock_evaluate.return_value = mock_output + + args = ( + self.pipeline, + self.pipeline_name, + self.dataset, + self.signal, + self.hyperparameters, + self.metrics, + self.test_split, + self.few_shot, + self.iteration, + None, # cache_dir + None, # anomaly_dir + self.run_id, + ) + + result = _run_job(args) + + self.assertIsInstance(result, pd.DataFrame) + + row = result.iloc[0] + self.assertEqual(row['pipeline'], self.pipeline_name) + self.assertEqual(row['dataset'], self.dataset) + self.assertEqual(row['signal'], self.signal) + self.assertEqual(row['iteration'], self.iteration) + self.assertEqual(row['run_id'], self.run_id) + + mock_evaluate.assert_called_once_with( + self.pipeline, + self.signal, + self.hyperparameters, + self.metrics, + self.test_split, + self.few_shot, + None, # anomaly_path + ) + + @patch('sigllm.benchmark._evaluate_signal') + def test_run_job_with_cache(self, mock_evaluate): + mock_output = {'f1': 0.8, 'status': 'OK', 'elapsed': 1.0, 'split': False} + mock_evaluate.return_value = mock_output + + args = ( + self.pipeline, + self.pipeline_name, + self.dataset, + self.signal, + self.hyperparameters, + self.metrics, + self.test_split, + self.few_shot, + self.iteration, + self.cache_dir, + None, # anomaly_dir + self.run_id, + ) + + _run_job(args) + + expected_cache_file = ( + self.cache_dir / f'{self.pipeline_name}' + f'_{self.signal}_{self.dataset}_{self.iteration}_{self.run_id}_scores.csv' + ) + + self.assertTrue(expected_cache_file.exists()) + + 
@patch('sigllm.benchmark._evaluate_signal') + def test_run_job_with_anomaly_dir(self, mock_evaluate): + mock_output = {'f1': 0.8, 'status': 'OK', 'elapsed': 1.0, 'split': False} + mock_evaluate.return_value = mock_output + + args = ( + self.pipeline, + self.pipeline_name, + self.dataset, + self.signal, + self.hyperparameters, + self.metrics, + self.test_split, + self.few_shot, + self.iteration, + None, # cache_dir + self.anomaly_dir, + self.run_id, + ) + + _run_job(args) + + expected_anomaly_path = str( + self.anomaly_dir / f'{self.pipeline_name}' + f'_{self.signal}_{self.dataset}_{self.iteration}_anomalies.csv' + ) + + mock_evaluate.assert_called_once_with( + self.pipeline, + self.signal, + self.hyperparameters, + self.metrics, + self.test_split, + self.few_shot, + expected_anomaly_path, + ) + + +class TestBenchmark(TestCase): + @classmethod + def setup_class(cls): + cls.pipeline_name = 'test_pipeline' + cls.dataset_name = 'test_dataset' + cls.signal_name = 'test_signal' + cls.run_id = 'test_run' + + cls.pipelines = {cls.pipeline_name: 'pipeline_path'} + cls.datasets = {cls.dataset_name: [cls.signal_name]} + cls.hyperparameters = {'param': 'value'} + cls.metrics = {'f1': METRICS['f1']} + + cls.temp_dir = tempfile.mkdtemp() + cls.cache_dir = Path(cls.temp_dir) / 'cache' + cls.anomaly_dir = Path(cls.temp_dir) / 'anomalies' + + cls.expected_columns = [ + 'pipeline', + 'rank', + 'dataset', + 'signal', + 'iteration', + 'f1', + 'status', + 'elapsed', + 'split', + 'run_id', + ] + + @classmethod + def teardown_class(cls): + if os.path.exists(cls.temp_dir): + shutil.rmtree(cls.temp_dir) + + @patch('sigllm.benchmark._run_job') + def test_benchmark_basic(self, mock_run_job): + mock_output = pd.DataFrame({ + 'pipeline': [self.pipeline_name], + 'dataset': [self.dataset_name], + 'signal': [self.signal_name], + 'iteration': [0], + 'f1': [0.8], + 'status': ['OK'], + 'elapsed': [1.0], + 'split': [False], + 'run_id': [self.run_id], + }) + mock_run_job.return_value = mock_output + + result = benchmark( + pipelines=self.pipelines, + datasets=self.datasets, + hyperparameters=self.hyperparameters, + metrics=self.metrics, + ) + + self.assertIsInstance(result, pd.DataFrame) + self.assertTrue(all(col in result.columns for col in self.expected_columns)) + mock_run_job.assert_called_once() + + @patch('sigllm.benchmark._run_job') + def test_benchmark_with_list_inputs(self, mock_run_job): + mock_output = pd.DataFrame({ + 'pipeline': [self.pipeline_name], + 'dataset': ['dataset'], + 'signal': [self.signal_name], + 'iteration': [0], + 'f1': [0.8], + 'status': ['OK'], + 'elapsed': [1.0], + 'split': [False], + 'run_id': [self.run_id], + }) + mock_run_job.return_value = mock_output + + result = benchmark( + pipelines=[self.pipeline_name], + datasets=[self.signal_name], + hyperparameters=[self.hyperparameters], + metrics=list(self.metrics.values()), + ) + + self.assertIsInstance(result, pd.DataFrame) + self.assertTrue(all(col in result.columns for col in self.expected_columns)) + + def test_benchmark_with_metric_error(self): + with self.assertRaises(ValueError): + benchmark( + pipelines=self.pipelines, + datasets=self.datasets, + hyperparameters=self.hyperparameters, + metrics=['f1', 'nonexistent_metric'], + ) + + @patch('sigllm.benchmark._run_job') + def test_benchmark_with_cache(self, mock_run_job): + os.makedirs(self.cache_dir, exist_ok=True) + + mock_output = pd.DataFrame({ + 'pipeline': [self.pipeline_name], + 'dataset': [self.dataset_name], + 'signal': [self.signal_name], + 'iteration': [0], + 'f1': [0.8], + 
'status': ['OK'], + 'elapsed': [1.0], + 'split': [False], + 'run_id': [self.run_id], + }) + mock_run_job.return_value = mock_output + + result = benchmark( + pipelines=self.pipelines, datasets=self.datasets, cache_dir=self.cache_dir + ) + + self.assertIsInstance(result, pd.DataFrame) + self.assertTrue((self.cache_dir).exists()) + + @patch('sigllm.benchmark._run_job') + def test_benchmark_with_anomaly_dir(self, mock_run_job): + os.makedirs(self.anomaly_dir, exist_ok=True) + + mock_output = pd.DataFrame({ + 'pipeline': [self.pipeline_name], + 'dataset': [self.dataset_name], + 'signal': [self.signal_name], + 'iteration': [0], + 'f1': [0.8], + 'status': ['OK'], + 'elapsed': [1.0], + 'split': [False], + 'run_id': [self.run_id], + }) + mock_run_job.return_value = mock_output + + result = benchmark( + pipelines=self.pipelines, datasets=self.datasets, anomaly_dir=self.anomaly_dir + ) + + self.assertIsInstance(result, pd.DataFrame) + self.assertTrue((self.anomaly_dir).exists()) + + @patch('sigllm.benchmark._run_job') + def test_benchmark_with_output_path(self, mock_run_job): + output_path = os.path.join(self.temp_dir, 'output.csv') + + mock_output = pd.DataFrame({ + 'pipeline': [self.pipeline_name], + 'dataset': [self.dataset_name], + 'signal': [self.signal_name], + 'iteration': [0], + 'f1': [0.8], + 'status': ['OK'], + 'elapsed': [1.0], + 'split': [False], + 'run_id': [self.run_id], + }) + mock_run_job.return_value = mock_output + + result = benchmark( + pipelines=self.pipelines, + datasets=self.datasets, + anomaly_dir=self.anomaly_dir, + output_path=output_path, + ) + + self.assertIsInstance(result, pd.DataFrame) + self.assertTrue(os.path.exists(output_path)) + + @patch('sigllm.benchmark._run_job') + def test_benchmark_with_iterations(self, mock_run_job): + mock_output = pd.DataFrame({ + 'pipeline': [self.pipeline_name], + 'dataset': [self.dataset_name], + 'signal': [self.signal_name], + 'iteration': [0], + 'f1': [0.8], + 'status': ['OK'], + 'elapsed': [1.0], + 'split': [False], + 'run_id': [self.run_id], + }) + mock_run_job.return_value = mock_output + + result = benchmark(pipelines=self.pipelines, datasets=self.datasets, iterations=3) + + self.assertIsInstance(result, pd.DataFrame) + self.assertEqual(mock_run_job.call_count, 3) + + @patch('sigllm.benchmark._run_job') + def test_benchmark_empty_results(self, mock_run_job): + empty_pipelines = ['nonexistent_pipeline'] + empty_datasets = {'nonexistent_dataset': []} + + # Run benchmark with minimal configuration + result = benchmark(pipelines=empty_pipelines, datasets=empty_datasets) + + mock_run_job.assert_not_called() + + self.assertIsInstance(result, pd.DataFrame) + self.assertTrue(result.empty) + + @patch('sigllm.benchmark._run_job') + def test_benchmark_with_resume(self, mock_run_job): + os.makedirs(self.cache_dir, exist_ok=True) + + cached_file = ( + self.cache_dir / f'{self.pipeline_name}' + f'_{self.signal_name}_{self.dataset_name}_0_test_scores.csv' + ) + + pd.DataFrame({ + 'pipeline': [self.pipeline_name], + 'dataset': [self.dataset_name], + 'signal': [self.signal_name], + 'iteration': [0], + 'f1': [0.8], + 'status': ['OK'], + 'elapsed': [1.0], + 'split': [False], + 'run_id': [self.run_id], + }).to_csv(cached_file, index=False) + + result = benchmark( + pipelines=self.pipelines, datasets=self.datasets, cache_dir=self.cache_dir, resume=True + ) + + mock_run_job.assert_not_called() + + self.assertIsInstance(result, pd.DataFrame) + self.assertTrue(result.empty)