diff --git a/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py index 76831f6aae8..93a78e2cc21 100644 --- a/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py +++ b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py @@ -57,10 +57,7 @@ def _evaluate_dag_worker(dag_pickle, task_pickle, modalities_pickle, debug=False f"[DEBUG][worker] pid={os.getpid()} evaluating dag_root={getattr(dag, 'root_node_id', None)} task={getattr(task.model, 'name', None)}" ) - dag_copy = copy.deepcopy(dag) - task_copy = copy.deepcopy(task) - - fused_representation = dag_copy.execute(modalities_for_dag, task_copy) + fused_representation = dag.execute(modalities_for_dag, task) if fused_representation is None: return None @@ -73,22 +70,22 @@ def _evaluate_dag_worker(dag_pickle, task_pickle, modalities_pickle, debug=False ) from systemds.scuro.representations.aggregate import Aggregation - if task_copy.expected_dim == 1 and get_shape(final_representation.metadata) > 1: + if task.expected_dim == 1 and get_shape(final_representation.metadata) > 1: agg_operator = AggregatedRepresentation(Aggregation()) final_representation = agg_operator.transform(final_representation) eval_start = time.time() - scores = task_copy.run(final_representation.data) + scores = task.run(final_representation.data) eval_time = time.time() - eval_start total_time = time.time() - start_time return OptimizationResult( - dag=dag_copy, + dag=dag, train_score=scores[0].average_scores, val_score=scores[1].average_scores, test_score=scores[2].average_scores, runtime=total_time, - task_name=task_copy.model.name, + task_name=task.model.name, task_time=eval_time, representation_time=total_time - eval_time, ) @@ -354,21 +351,14 @@ def build_variants( def _evaluate_dag(self, dag: RepresentationDag, task: Task) -> "OptimizationResult": start_time = time.time() try: - tid = threading.get_ident() - tname = threading.current_thread().name - dag_copy = copy.deepcopy(dag) - modalities_for_dag = copy.deepcopy( + fused_representation = dag.execute( list( chain.from_iterable( self.k_best_representations[task.model.name].values() ) - ) - ) - task_copy = copy.deepcopy(task) - fused_representation = dag_copy.execute( - modalities_for_dag, - task_copy, + ), + task, ) torch.cuda.empty_cache() @@ -379,27 +369,24 @@ def _evaluate_dag(self, dag: RepresentationDag, task: Task) -> "OptimizationResu final_representation = fused_representation[ list(fused_representation.keys())[-1] ] - if ( - task_copy.expected_dim == 1 - and get_shape(final_representation.metadata) > 1 - ): + if task.expected_dim == 1 and get_shape(final_representation.metadata) > 1: agg_operator = AggregatedRepresentation(Aggregation()) final_representation = agg_operator.transform(final_representation) eval_start = time.time() - scores = task_copy.run(final_representation.data) + scores = task.run(final_representation.data) eval_time = time.time() - eval_start total_time = time.time() - start_time return OptimizationResult( - dag=dag_copy, + dag=dag, train_score=scores[0].average_scores, val_score=scores[1].average_scores, test_score=scores[2].average_scores, runtime=total_time, representation_time=total_time - eval_time, - task_name=task_copy.model.name, + task_name=task.model.name, task_time=eval_time, ) diff --git a/src/main/python/systemds/scuro/drsearch/representation_dag.py b/src/main/python/systemds/scuro/drsearch/representation_dag.py index 5543da32dd1..ff46d1db95f 100644 --- a/src/main/python/systemds/scuro/drsearch/representation_dag.py +++ b/src/main/python/systemds/scuro/drsearch/representation_dag.py @@ -32,6 +32,32 @@ from systemds.scuro.representations.context import Context from systemds.scuro.utils.identifier import get_op_id, get_node_id +from collections import OrderedDict +from typing import Any, Hashable, Optional + + +class LRUCache: + def __init__(self, max_size: int = 256): + self.max_size = max_size + self._cache: "OrderedDict[Hashable, Any]" = OrderedDict() + + def get(self, key: Hashable) -> Optional[Any]: + if key not in self._cache: + return None + value = self._cache.pop(key) + self._cache[key] = value + return value + + def put(self, key: Hashable, value: Any) -> None: + if key in self._cache: + self._cache.pop(key) + elif len(self._cache) >= self.max_size: + self._cache.popitem(last=False) + self._cache[key] = value + + def __len__(self) -> int: + return len(self._cache) + @dataclass class RepresentationNode: @@ -119,10 +145,22 @@ def has_cycle(node_id: str, path: set) -> bool: return not has_cycle(self.root_node_id, set()) + def _compute_leaf_signature(self, node) -> Hashable: + return ("leaf", node.modality_id, node.representation_index) + + def _compute_node_signature(self, node, input_sig_tuple) -> Hashable: + op_cls = node.operation + params_items = tuple(sorted((node.parameters or {}).items())) + return ("op", op_cls, params_items, input_sig_tuple) + def execute( - self, modalities: List[Modality], task=None + self, + modalities: List[Modality], + task=None, + external_cache: Optional[LRUCache] = None, ) -> Dict[str, TransformedModality]: - cache = {} + cache: Dict[str, TransformedModality] = {} + node_signatures: Dict[str, Hashable] = {} def execute_node(node_id: str, task) -> TransformedModality: if node_id in cache: @@ -135,38 +173,58 @@ def execute_node(node_id: str, task) -> TransformedModality: modalities, node.modality_id, node.representation_index ) cache[node_id] = modality + node_signatures[node_id] = self._compute_leaf_signature(node) return modality input_mods = [execute_node(input_id, task) for input_id in node.inputs] + input_signatures = tuple( + node_signatures[input_id] for input_id in node.inputs + ) + node_signature = self._compute_node_signature(node, input_signatures) + is_unimodal = len(input_mods) == 1 + + cached_result = None + if external_cache and is_unimodal: + cached_result = external_cache.get(node_signature) + if cached_result is not None: + result = cached_result - node_operation = copy.deepcopy(node.operation()) - if len(input_mods) == 1: - # It's a unimodal operation - if isinstance(node_operation, Context): - result = input_mods[0].context(node_operation) - elif isinstance(node_operation, AggregatedRepresentation): - result = node_operation.transform(input_mods[0]) - elif isinstance(node_operation, UnimodalRepresentation): + else: + node_operation = copy.deepcopy(node.operation()) + if len(input_mods) == 1: + # It's a unimodal operation + if isinstance(node_operation, Context): + result = input_mods[0].context(node_operation) + elif isinstance(node_operation, AggregatedRepresentation): + result = node_operation.transform(input_mods[0]) + elif isinstance(node_operation, UnimodalRepresentation): + if ( + isinstance(input_mods[0], TransformedModality) + and input_mods[0].transformation[0].__class__ + == node.operation + ): + # Avoid duplicate transformations + result = input_mods[0] + else: + # Compute the representation + result = input_mods[0].apply_representation(node_operation) + else: + # It's a fusion operation + fusion_op = node_operation if ( - isinstance(input_mods[0], TransformedModality) - and input_mods[0].transformation[0].__class__ == node.operation + hasattr(fusion_op, "needs_training") + and fusion_op.needs_training ): - # Avoid duplicate transformations - result = input_mods[0] + result = input_mods[0].combine_with_training( + input_mods[1:], fusion_op, task + ) else: - # Compute the representation - result = input_mods[0].apply_representation(node_operation) - else: - # It's a fusion operation - fusion_op = node_operation - if hasattr(fusion_op, "needs_training") and fusion_op.needs_training: - result = input_mods[0].combine_with_training( - input_mods[1:], fusion_op, task - ) - else: - result = input_mods[0].combine(input_mods[1:], fusion_op) + result = input_mods[0].combine(input_mods[1:], fusion_op) + if external_cache and is_unimodal: + external_cache.put(node_signature, result) cache[node_id] = result + node_signatures[node_id] = node_signature return result execute_node(self.root_node_id, task) @@ -230,3 +288,9 @@ def build(self, root_node_id: str) -> RepresentationDag: if not dag.validate(): raise ValueError("Invalid DAG construction") return dag + + def get_node(self, node_id: str) -> Optional[RepresentationNode]: + for node in self.nodes: + if node.node_id == node_id: + return node + return None diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py index 4cde294b17c..e9029d63ee1 100644 --- a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py +++ b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py @@ -25,7 +25,7 @@ import multiprocessing as mp from typing import List, Any from functools import lru_cache - +from systemds.scuro.drsearch.task import Task from systemds.scuro import ModalityType from systemds.scuro.drsearch.ranking import rank_by_tradeoff from systemds.scuro.drsearch.task import PerformanceMeasure @@ -46,6 +46,7 @@ RepresentationDAGBuilder, ) from systemds.scuro.drsearch.representation_dag_visualizer import visualize_dag +from systemds.scuro.drsearch.representation_dag import LRUCache class UnimodalOptimizer: @@ -54,6 +55,7 @@ def __init__( ): self.modalities = modalities self.tasks = tasks + self.modality_ids = [modality.modality_id for modality in modalities] self.save_all_results = save_all_results self.result_path = result_path @@ -177,24 +179,27 @@ def _process_modality(self, modality, parallel): modality_specific_operators = self._get_modality_operators( modality.modality_type ) - + dags = [] for operator in modality_specific_operators: - dags = self._build_modality_dag(modality, operator()) - - for dag in dags: - representations = dag.execute([modality]) - node_id = list(representations.keys())[-1] - node = dag.get_node_by_id(node_id) - if node.operation is None: - continue - - reps = self._get_representation_chain(node, dag) - combination = next((op for op in reps if isinstance(op, Fusion)), None) - self._evaluate_local( - representations[node_id], local_results, dag, combination - ) - if self.debug: - visualize_dag(dag) + dags.extend(self._build_modality_dag(modality, operator())) + + external_cache = LRUCache(max_size=32) + for dag in dags: + representations = dag.execute( + [modality], task=self.tasks[0], external_cache=external_cache + ) # TODO: dynamic task selection + node_id = list(representations.keys())[-1] + node = dag.get_node_by_id(node_id) + if node.operation is None: + continue + + reps = self._get_representation_chain(node, dag) + combination = next((op for op in reps if isinstance(op, Fusion)), None) + self._evaluate_local( + representations[node_id], local_results, dag, combination + ) + if self.debug: + visualize_dag(dag) if self.save_all_results: timestr = time.strftime("%Y%m%d-%H%M%S") @@ -242,15 +247,21 @@ def _evaluate_local(self, modality, local_results, dag, combination=None): agg_operator.get_current_parameters(), ) dag = builder.build(rep_node_id) - representations = dag.execute([modality]) - node_id = list(representations.keys())[-1] + + aggregated_modality = agg_operator.transform(modality) + for task in self.tasks: start = time.perf_counter() - scores = task.run(representations[node_id].data) + scores = task.run(aggregated_modality.data) end = time.perf_counter() local_results.add_result( - scores, modality, task.model.name, end - start, combination, dag + scores, + aggregated_modality, + task.model.name, + end - start, + combination, + dag, ) else: modality.pad() @@ -272,7 +283,10 @@ def _evaluate_local(self, modality, local_results, dag, combination=None): agg_operator.get_current_parameters(), ) dag = builder.build(rep_node_id) + start_rep = time.perf_counter() representations = dag.execute([modality]) + end_rep = time.perf_counter() + modality.transform_time += end_rep - start_rep node_id = list(representations.keys())[-1] start = time.perf_counter() @@ -458,7 +472,9 @@ def print_results(self): for entry in self.results[modality][task_name]: print(f"{modality}_{task_name}: {entry}") - def get_k_best_results(self, modality, k, task, performance_metric_name): + def get_k_best_results( + self, modality, k, task, performance_metric_name, prune_cache=False + ): """ Get the k best results for the given modality :param modality: modality to get the best results for @@ -488,6 +504,21 @@ def get_k_best_results(self, modality, k, task, performance_metric_name): cache_items = list(task_cache.items()) if task_cache else [] cache = [cache_items[i][1] for i in sorted_indices if i < len(cache_items)] + if prune_cache: + # Note: in case the unimodal results are loaded from a file, we need to initialize the cache for the modality and task + if modality.modality_id not in self.operator_performance.cache: + self.operator_performance.cache[modality.modality_id] = {} + if ( + task.model.name + not in self.operator_performance.cache[modality.modality_id] + ): + self.operator_performance.cache[modality.modality_id][ + task.model.name + ] = {} + self.operator_performance.cache[modality.modality_id][ + task.model.name + ] = cache + return results, cache diff --git a/src/main/python/systemds/scuro/modality/modality.py b/src/main/python/systemds/scuro/modality/modality.py index f6e0320469f..d0cb148b20f 100644 --- a/src/main/python/systemds/scuro/modality/modality.py +++ b/src/main/python/systemds/scuro/modality/modality.py @@ -18,11 +18,9 @@ # under the License. # # ------------------------------------------------------------- -from copy import deepcopy from typing import List import numpy as np -from numpy.f2py.auxfuncs import throw_error from systemds.scuro.modality.type import ModalityType from systemds.scuro.representations import utils @@ -31,7 +29,12 @@ class Modality: def __init__( - self, modalityType: ModalityType, modality_id=-1, metadata={}, data_type=None + self, + modalityType: ModalityType, + modality_id=-1, + metadata={}, + data_type=None, + transform_time=0, ): """ Parent class of the different Modalities (unimodal & multimodal) @@ -45,7 +48,7 @@ def __init__( self.cost = None self.shape = None self.modality_id = modality_id - self.transform_time = None + self.transform_time = transform_time if transform_time else 0 @property def data(self): diff --git a/src/main/python/systemds/scuro/modality/transformed.py b/src/main/python/systemds/scuro/modality/transformed.py index 3b014653028..c19c90adaac 100644 --- a/src/main/python/systemds/scuro/modality/transformed.py +++ b/src/main/python/systemds/scuro/modality/transformed.py @@ -43,7 +43,11 @@ def __init__( metadata = modality.metadata.copy() if modality.metadata is not None else None super().__init__( - new_modality_type, modality.modality_id, metadata, modality.data_type + new_modality_type, + modality.modality_id, + metadata, + modality.data_type, + modality.transform_time, ) self.transformation = None self.self_contained = ( @@ -106,7 +110,7 @@ def window_aggregation(self, window_size, aggregation): ) start = time.time() transformed_modality.data = w.execute(self) - transformed_modality.transform_time = time.time() - start + transformed_modality.transform_time += time.time() - start return transformed_modality def context(self, context_operator): @@ -115,14 +119,14 @@ def context(self, context_operator): ) start = time.time() transformed_modality.data = context_operator.execute(self) - transformed_modality.transform_time = time.time() - start + transformed_modality.transform_time += time.time() - start return transformed_modality def apply_representation(self, representation): start = time.time() new_modality = representation.transform(self) new_modality.update_metadata() - new_modality.transform_time = time.time() - start + new_modality.transform_time += time.time() - start new_modality.self_contained = representation.self_contained return new_modality diff --git a/src/main/python/systemds/scuro/modality/unimodal_modality.py b/src/main/python/systemds/scuro/modality/unimodal_modality.py index e4ed85cce38..22a40db16cf 100644 --- a/src/main/python/systemds/scuro/modality/unimodal_modality.py +++ b/src/main/python/systemds/scuro/modality/unimodal_modality.py @@ -95,7 +95,7 @@ def context(self, context_operator): transformed_modality = TransformedModality(self, context_operator) transformed_modality.data = context_operator.execute(self) - transformed_modality.transform_time = time.time() - start + transformed_modality.transform_time += time.time() - start return transformed_modality def aggregate(self, aggregation_function): @@ -191,6 +191,6 @@ def apply_representation(self, representation): ) new_modality.data = padded_embeddings new_modality.update_metadata() - new_modality.transform_time = time.time() - start + new_modality.transform_time += time.time() - start new_modality.self_contained = representation.self_contained return new_modality diff --git a/src/main/python/systemds/scuro/representations/aggregated_representation.py b/src/main/python/systemds/scuro/representations/aggregated_representation.py index 1e98d2f92ae..bcc36f46210 100644 --- a/src/main/python/systemds/scuro/representations/aggregated_representation.py +++ b/src/main/python/systemds/scuro/representations/aggregated_representation.py @@ -21,6 +21,7 @@ from systemds.scuro.modality.transformed import TransformedModality from systemds.scuro.representations.representation import Representation from systemds.scuro.representations.aggregate import Aggregation +import time class AggregatedRepresentation(Representation): @@ -33,8 +34,11 @@ def __init__(self, aggregation="mean"): self.self_contained = True def transform(self, modality): + start = time.perf_counter() aggregated_modality = TransformedModality( modality, self, self_contained=modality.self_contained ) + end = time.perf_counter() + aggregated_modality.transform_time += end - start aggregated_modality.data = self.aggregation.execute(modality) return aggregated_modality diff --git a/src/main/python/systemds/scuro/representations/fusion.py b/src/main/python/systemds/scuro/representations/fusion.py index 7ac0200819c..3f4257e64a9 100644 --- a/src/main/python/systemds/scuro/representations/fusion.py +++ b/src/main/python/systemds/scuro/representations/fusion.py @@ -22,6 +22,8 @@ from typing import List import numpy as np + +from systemds.scuro.modality.type import ModalityType from systemds.scuro.representations.aggregated_representation import ( AggregatedRepresentation, ) @@ -44,6 +46,7 @@ def __init__(self, name, parameters=None): self.needs_alignment = False self.needs_training = False self.needs_instance_alignment = False + self.output_modality_type = ModalityType.EMBEDDING def transform(self, modalities: List[Modality]): """ diff --git a/src/main/python/tests/scuro/data_generator.py b/src/main/python/tests/scuro/data_generator.py index 9da0aa82c0a..ae78c50b8aa 100644 --- a/src/main/python/tests/scuro/data_generator.py +++ b/src/main/python/tests/scuro/data_generator.py @@ -66,6 +66,7 @@ def __init__(self): self.modality_type = None self.metadata = {} self.data_type = np.float32 + self.transform_time = None def create1DModality( self,