diff --git a/src/main/python/systemds/scuro/__init__.py b/src/main/python/systemds/scuro/__init__.py index 1c3cfe92231..ae9aed44c0a 100644 --- a/src/main/python/systemds/scuro/__init__.py +++ b/src/main/python/systemds/scuro/__init__.py @@ -73,6 +73,8 @@ from systemds.scuro.drsearch.unimodal_representation_optimizer import ( UnimodalRepresentationOptimizer, ) +from systemds.scuro.drsearch.multimodal_optimizer import MultimodalOptimizer +from systemds.scuro.drsearch.unimodal_optimizer import UnimodalOptimizer __all__ = [ @@ -127,4 +129,6 @@ "OptimizationData", "RepresentationCache", "UnimodalRepresentationOptimizer", + "UnimodalOptimizer", + "MultimodalOptimizer", ] diff --git a/src/main/python/systemds/scuro/dataloader/audio_loader.py b/src/main/python/systemds/scuro/dataloader/audio_loader.py index a1dad304e53..1197617673f 100644 --- a/src/main/python/systemds/scuro/dataloader/audio_loader.py +++ b/src/main/python/systemds/scuro/dataloader/audio_loader.py @@ -45,18 +45,18 @@ def __init__( def extract(self, file: str, index: Optional[Union[str, List[str]]] = None): self.file_sanity_check(file) - # if not self.load_data_from_file: - # import numpy as np - # - # self.metadata[file] = self.modality_type.create_audio_metadata( - # 1000, np.array([0]) - # ) - # else: - audio, sr = librosa.load(file, dtype=self._data_type) + if not self.load_data_from_file: + import numpy as np - if self.normalize: - audio = librosa.util.normalize(audio) + self.metadata[file] = self.modality_type.create_audio_metadata( + 1000, np.array([0]) + ) + else: + audio, sr = librosa.load(file, dtype=self._data_type) - self.metadata[file] = self.modality_type.create_audio_metadata(sr, audio) + if self.normalize: + audio = librosa.util.normalize(audio) - self.data.append(audio) + self.metadata[file] = self.modality_type.create_audio_metadata(sr, audio) + + self.data.append(audio) diff --git a/src/main/python/systemds/scuro/dataloader/video_loader.py b/src/main/python/systemds/scuro/dataloader/video_loader.py index 96ea5f11f69..0e77d5dc57b 100644 --- a/src/main/python/systemds/scuro/dataloader/video_loader.py +++ b/src/main/python/systemds/scuro/dataloader/video_loader.py @@ -35,11 +35,13 @@ def __init__( data_type: Union[np.dtype, str] = np.float16, chunk_size: Optional[int] = None, load=True, + fps=None, ): super().__init__( source_path, indices, data_type, chunk_size, ModalityType.VIDEO ) self.load_data_from_file = load + self.fps = fps def extract(self, file: str, index: Optional[Union[str, List[str]]] = None): self.file_sanity_check(file) @@ -53,25 +55,33 @@ def extract(self, file: str, index: Optional[Union[str, List[str]]] = None): if not cap.isOpened(): raise ValueError(f"Could not read video at path: {file}") - fps = cap.get(cv2.CAP_PROP_FPS) + orig_fps = cap.get(cv2.CAP_PROP_FPS) + frame_interval = 1 + if self.fps is not None and self.fps < orig_fps: + frame_interval = int(round(orig_fps / self.fps)) + else: + self.fps = orig_fps + length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) num_channels = 3 self.metadata[file] = self.modality_type.create_video_metadata( - fps, length, width, height, num_channels + self.fps, length, width, height, num_channels ) frames = [] + idx = 0 while cap.isOpened(): ret, frame = cap.read() if not ret: break - frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - frame = frame.astype(self._data_type) / 255.0 - - frames.append(frame) + if idx % frame_interval == 0: + frame = cv2.cvtColor(frame, 
cv2.COLOR_BGR2RGB) + frame = frame.astype(self._data_type) / 255.0 + frames.append(frame) + idx += 1 self.data.append(np.stack(frames)) diff --git a/src/main/python/systemds/scuro/drsearch/dr_search.py b/src/main/python/systemds/scuro/drsearch/dr_search.py index 2000608a1df..601001c7428 100644 --- a/src/main/python/systemds/scuro/drsearch/dr_search.py +++ b/src/main/python/systemds/scuro/drsearch/dr_search.py @@ -76,7 +76,7 @@ def set_best_params( """ # check if modality name is already in dictionary - if "_".join(modality_names) not in self.scores.keys(): + if "_".join(modality_names) not in list(self.scores.keys()): # if not add it to dictionary self.scores["_".join(modality_names)] = {} diff --git a/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py new file mode 100644 index 00000000000..2da8e7ae195 --- /dev/null +++ b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py @@ -0,0 +1,398 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +# ------------------------------------------------------------- +import itertools + +from systemds.scuro.representations.aggregated_representation import ( + AggregatedRepresentation, +) + +from systemds.scuro.representations.aggregate import Aggregation + +from systemds.scuro.drsearch.operator_registry import Registry + +from systemds.scuro.utils.schema_helpers import get_shape +import dataclasses + + +class MultimodalOptimizer: + def __init__( + self, modalities, unimodal_optimization_results, tasks, k=2, debug=True + ): + self.k_best_cache = None + self.k_best_modalities = None + self.modalities = modalities + self.unimodal_optimization_results = unimodal_optimization_results + self.tasks = tasks + self.k = k + self.extract_k_best_modalities_per_task() + self.debug = debug + + self.operator_registry = Registry() + self.optimization_results = MultimodalResults( + modalities, tasks, debug, self.k_best_modalities + ) + self.cache = {} + + def optimize(self): + for task in self.tasks: + self.optimize_intermodal_representations(task) + + def optimize_intramodal_representations(self, task): + for modality in self.modalities: + representations = self.k_best_modalities[task.model.name][ + modality.modality_id + ] + applied_representations = self.extract_representations( + representations, modality, task.model.name + ) + + for i in range(1, len(applied_representations)): + for fusion_method in self.operator_registry.get_fusion_operators(): + if fusion_method().needs_alignment and not applied_representations[ + i - 1 + ].is_aligned(applied_representations[i]): + continue + combined = applied_representations[i - 1].combine( + applied_representations[i], fusion_method() + ) + self.evaluate( + task, + combined, + [i - 1, i], + fusion_method, + [ + applied_representations[i - 1].modality_id, + applied_representations[i].modality_id, + ], + ) + if not fusion_method().commutative: + combined_comm = applied_representations[i].combine( + applied_representations[i - 1], fusion_method() + ) + self.evaluate( + task, + combined_comm, + [i, i - 1], + fusion_method, + [ + applied_representations[i - 1].modality_id, + applied_representations[i].modality_id, + ], + ) + + def optimize_intermodal_representations(self, task): + modality_combos = [] + n = len(self.k_best_cache[task.model.name]) + reuse_cache = {} + + def generate_extensions(current_combo, remaining_indices): + # Add current combination if it has at least 2 elements + if len(current_combo) >= 2: + combo_tuple = tuple(i for i in current_combo) + modality_combos.append(combo_tuple) + + for i in remaining_indices: + new_combo = current_combo + [i] + new_remaining = [j for j in remaining_indices if j > i] + generate_extensions(new_combo, new_remaining) + + for start_idx in range(n): + remaining = list(range(start_idx + 1, n)) + generate_extensions([start_idx], remaining) + fusion_methods = self.operator_registry.get_fusion_operators() + fused_representations = [] + reuse_fused_representations = False + for i, modality_combo in enumerate(modality_combos): + # clear reuse cache + if i % 5 == 0: + reuse_cache = self.prune_cache(modality_combos[i:], reuse_cache) + + if i != 0: + reuse_fused_representations = self.is_prefix_match( + modality_combos[i - 1], modality_combo + ) + if reuse_fused_representations: + mods = [ + self.k_best_cache[task.model.name][mod_idx] + for mod_idx in modality_combo[len(modality_combos[i - 1]) :] + ] + fused_representations = reuse_cache[modality_combos[i - 1]] + else: + prefix_idx = self.compute_equal_prefix_index( + 
modality_combos[i - 1], modality_combo + ) + if prefix_idx > 1: + fused_representations = reuse_cache[ + modality_combos[i - 1][:prefix_idx] + ] + reuse_fused_representations = True + mods = [ + self.k_best_cache[task.model.name][mod_idx] + for mod_idx in modality_combo[prefix_idx:] + ] + if self.debug: + print( + f"New modality combo: {modality_combo} - Reuse: {reuse_fused_representations} - # fused reps: {len(fused_representations)}" + ) + + all_mods = [ + self.k_best_cache[task.model.name][mod_idx] + for mod_idx in modality_combo + ] + temp_fused_reps = [] + for j, fusion_method in enumerate(fusion_methods): + # Evaluate all mods + fused_rep = all_mods[0].combine(all_mods[1:], fusion_method()) + temp_fused_reps.append(fused_rep) + self.evaluate( + task, + fused_rep, + [ + self.k_best_modalities[task.model.name][k].representations + for k in modality_combo + ], + fusion_method, + modality_combo, + ) + if reuse_fused_representations: + for fused_representation in fused_representations: + fused_rep = fused_representation.combine(mods, fusion_method()) + temp_fused_reps.append(fused_rep) + self.evaluate( + task, + fused_rep, + [ + self.k_best_modalities[task.model.name][ + k + ].representations + for k in modality_combo + ], + fusion_method, + modality_combo, + ) + + if ( + len(modality_combo) < len(self.k_best_cache[task.model.name]) + and i + 1 < len(modality_combos) + and self.is_prefix_match(modality_combos[i], modality_combos[i + 1]) + ): + reuse_cache[modality_combo] = temp_fused_reps + reuse_fused_representations = False + + def prune_cache(self, sequences, cache): + seqs_as_tuples = [tuple(seq) for seq in sequences] + + def still_used(key): + return any(self.is_prefix_match(key, seq) for seq in seqs_as_tuples) + + cache = {key: value for key, value in cache.items() if still_used(key)} + return cache + + def is_prefix_match(self, seq1, seq2): + if len(seq1) > len(seq2): + return False + + # Check if seq1 matches the beginning of seq2 + return seq2[: len(seq1)] == seq1 + + def compute_equal_prefix_index(self, seq1, seq2): + max_len = min(len(seq1), len(seq2)) + i = 0 + while i < max_len and seq1[i] == seq2[i]: + i += 1 + + return i + + def extract_representations(self, representations, modality, task_name): + applied_representations = [] + for i in range(0, len(representations)): + cache_key = ( + tuple(representations[i].representations), + representations[i].task_time, + representations[i].representation_time, + ) + if ( + cache_key + in self.unimodal_optimization_results.cache[modality.modality_id][ + task_name + ] + ): + applied_representations.append( + self.unimodal_optimization_results.cache[modality.modality_id][ + task_name + ][cache_key] + ) + else: + applied_representation = modality + for j, rep in enumerate(representations[i].representations): + representation, is_context = ( + self.operator_registry.get_representation_by_name( + rep, modality.modality_type + ) + ) + if representation is None: + if rep == AggregatedRepresentation.__name__: + representation = AggregatedRepresentation(Aggregation()) + else: + representation = representation() + representation.set_parameters(representations[i].params[j]) + if is_context: + applied_representation = applied_representation.context( + representation + ) + else: + applied_representation = ( + applied_representation.apply_representation(representation) + ) + self.k_best_cache[task_name].append(applied_representation) + applied_representations.append(applied_representation) + return applied_representations + + def evaluate(self, 
task, modality, representations, fusion, modality_combo): + if task.expected_dim == 1 and get_shape(modality.metadata) > 1: + for aggregation in Aggregation().get_aggregation_functions(): + agg_operator = AggregatedRepresentation(Aggregation(aggregation, False)) + agg_modality = agg_operator.transform(modality) + + scores = task.run(agg_modality.data) + reps = representations.copy() + reps.append(agg_operator) + + self.optimization_results.add_result( + scores, + reps, + modality.transformation, + modality_combo, + task.model.name, + ) + else: + scores = task.run(modality.data) + self.optimization_results.add_result( + scores, + representations, + modality.transformation, + modality_combo, + task.model.name, + ) + + def add_to_cache(self, result_idx, combined_modality): + self.cache[result_idx] = combined_modality + + def extract_k_best_modalities_per_task(self): + self.k_best_modalities = {} + self.k_best_cache = {} + for task in self.tasks: + self.k_best_modalities[task.model.name] = [] + self.k_best_cache[task.model.name] = [] + for modality in self.modalities: + k_best_results, cached_data = ( + self.unimodal_optimization_results.get_k_best_results( + modality, self.k, task + ) + ) + + self.k_best_modalities[task.model.name].extend(k_best_results) + self.k_best_cache[task.model.name].extend(cached_data) + + +class MultimodalResults: + def __init__(self, modalities, tasks, debug, k_best_modalities): + self.modality_ids = [modality.modality_id for modality in modalities] + self.task_names = [task.model.name for task in tasks] + self.results = {} + self.debug = debug + self.k_best_modalities = k_best_modalities + + for task in tasks: + self.results[task.model.name] = {} + + def add_result( + self, scores, best_representation_idx, fusion_methods, modality_combo, task_name + ): + + entry = MultimodalResultEntry( + representations=best_representation_idx, + train_score=scores[0], + val_score=scores[1], + fusion_methods=[ + fusion_method.__class__.__name__ for fusion_method in fusion_methods + ], + modality_combo=modality_combo, + task=task_name, + ) + + modality_id_strings = "_".join(list(map(str, modality_combo))) + if modality_id_strings not in self.results[task_name]: + self.results[task_name][modality_id_strings] = [] + + self.results[task_name][modality_id_strings].append(entry) + + if self.debug: + print(f"{modality_id_strings}_{task_name}: {entry}") + + def print_results(self): + for task_name in self.task_names: + for modality in self.results[task_name].keys(): + for entry in self.results[task_name][modality]: + reps = [] + for i, mod_idx in enumerate(entry.modality_combo): + reps.append(self.k_best_modalities[task_name][mod_idx]) + + print( + f"{modality}_{task_name}: " + f"Validation score: {entry.val_score} - Training score: {entry.train_score}" + ) + for i, rep in enumerate(reps): + print( + f" Representation: {entry.modality_combo[i]} - {rep.representations}" + ) + + print(f" Fusion: {entry.fusion_methods[0]} ") + + def store_results(self, file_name=None): + for task_name in self.task_names: + for modality in self.results[task_name].keys(): + for entry in self.results[task_name][modality]: + reps = [] + for i, mod_idx in enumerate(entry.modality_combo): + reps.append(self.k_best_modalities[task_name][mod_idx]) + entry.representations = reps + + import pickle + + if file_name is None: + import time + + timestr = time.strftime("%Y%m%d-%H%M%S") + file_name = "multimodal_optimizer" + timestr + ".pkl" + + with open(file_name, "wb") as f: + pickle.dump(self.results, f) + + 
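For reference, a minimal standalone sketch of the ordered-combination enumeration that optimize_intermodal_representations builds via generate_extensions (names here are illustrative, not part of the API); it shows why consecutive combinations frequently share a prefix, which is exactly what the reuse cache exploits:

def enumerate_combos(n):
    combos = []

    def extend(current, remaining):
        # a combination is recorded once it spans at least two modalities
        if len(current) >= 2:
            combos.append(tuple(current))
        for i in remaining:
            extend(current + [i], [j for j in remaining if j > i])

    for start in range(n):
        extend([start], list(range(start + 1, n)))
    return combos

# For n = 3 modalities this yields (0, 1), (0, 1, 2), (0, 2), (1, 2):
# (0, 1) is a prefix of (0, 1, 2), so the representation fused for (0, 1)
# can be extended with modality 2 instead of being recomputed from scratch.
assert enumerate_combos(3) == [(0, 1), (0, 1, 2), (0, 2), (1, 2)]
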
+@dataclasses.dataclass +class MultimodalResultEntry: + val_score: float + modality_combo: list + representations: list + fusion_methods: list + train_score: float + task: str diff --git a/src/main/python/systemds/scuro/drsearch/operator_registry.py b/src/main/python/systemds/scuro/drsearch/operator_registry.py index cfd313eb563..3909b51ff98 100644 --- a/src/main/python/systemds/scuro/drsearch/operator_registry.py +++ b/src/main/python/systemds/scuro/drsearch/operator_registry.py @@ -64,6 +64,18 @@ def get_context_operators(self): def get_fusion_operators(self): return self._fusion_operators + def get_representation_by_name(self, representation_name, modality_type): + for representation in self._context_operators: + if representation.__name__ == representation_name: + return representation, True + + if modality_type is not None: + for representation in self._representations[modality_type]: + if representation.__name__ == representation_name: + return representation, False + + return None, False + def register_representation(modalities: Union[ModalityType, List[ModalityType]]): """ diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py new file mode 100644 index 00000000000..030f04aa431 --- /dev/null +++ b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py @@ -0,0 +1,304 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +# ------------------------------------------------------------- +import pickle +import time +from concurrent.futures import ProcessPoolExecutor, as_completed +from dataclasses import dataclass, field, asdict + +import multiprocessing as mp +from typing import Union + +import numpy as np +from systemds.scuro.representations.window_aggregation import WindowAggregation + +from systemds.scuro.representations.aggregated_representation import ( + AggregatedRepresentation, +) +from systemds.scuro import ModalityType, Aggregation +from systemds.scuro.drsearch.operator_registry import Registry +from systemds.scuro.utils.schema_helpers import get_shape + + +class UnimodalOptimizer: + def __init__(self, modalities, tasks, debug=True): + self.modalities = modalities + self.tasks = tasks + + self.operator_registry = Registry() + self.operator_performance = UnimodalResults(modalities, tasks, debug) + + self._tasks_require_same_dims = True + self.expected_dimensions = tasks[0].expected_dim + + for i in range(1, len(tasks)): + self.expected_dimensions = tasks[i].expected_dim + if tasks[i - 1].expected_dim != tasks[i].expected_dim: + self._tasks_require_same_dims = False + + def store_results(self, file_name=None): + if file_name is None: + import time + + timestr = time.strftime("%Y%m%d-%H%M%S") + file_name = "unimodal_optimizer" + timestr + ".pkl" + + with open(file_name, "wb") as f: + pickle.dump(self.operator_performance.results, f) + + def optimize_parallel(self, n_workers=None): + if n_workers is None: + n_workers = min(len(self.modalities), mp.cpu_count()) + + with ProcessPoolExecutor(max_workers=n_workers) as executor: + future_to_modality = { + executor.submit(self._process_modality, modality, True): modality + for modality in self.modalities + } + + for future in as_completed(future_to_modality): + modality = future_to_modality[future] + # try: + results = future.result() + self._merge_results(results) + # except Exception as exc: + # print(f'Modality {modality.modality_id} generated an exception: {exc}') + + def optimize(self): + for modality in self.modalities: + local_result = self._process_modality(modality, False) + # self._merge_results(local_result) + + def _process_modality(self, modality, parallel): + if parallel: + local_results = UnimodalResults( + modalities=[modality], tasks=self.tasks, debug=False + ) + else: + local_results = self.operator_performance + + context_operators = self.operator_registry.get_context_operators() + + for context_operator in context_operators: + context_representation = None + if ( + modality.modality_type != ModalityType.TEXT + and modality.modality_type != ModalityType.VIDEO + ): + con_op = context_operator() + context_representation = modality.context(con_op) + self._evaluate_local(context_representation, [con_op], local_results) + + modality_specific_operators = self.operator_registry.get_representations( + modality.modality_type + ) + for modality_specific_operator in modality_specific_operators: + mod_context = None + mod_op = modality_specific_operator() + if context_representation is not None: + mod_context = context_representation.apply_representation(mod_op) + self._evaluate_local(mod_context, [con_op, mod_op], local_results) + + mod = modality.apply_representation(mod_op) + self._evaluate_local(mod, [mod_op], local_results) + + for context_operator_after in context_operators: + con_op_after = context_operator_after() + if mod_context is not None: + mod_context = mod_context.context(con_op_after) + self._evaluate_local( + mod_context, [con_op, 
mod_op, con_op_after], local_results ) + + mod = mod.context(con_op_after) + self._evaluate_local(mod, [mod_op, con_op_after], local_results) + + return local_results + + def _merge_results(self, local_results): + """Merge local results into the main results""" + for modality_id in local_results.results: + for task_name in local_results.results[modality_id]: + self.operator_performance.results[modality_id][task_name].extend( + local_results.results[modality_id][task_name] + ) + + for modality in self.modalities: + for task_name in local_results.cache[modality.modality_id]: + for key, value in local_results.cache[modality.modality_id][task_name].items(): + self.operator_performance.cache[modality.modality_id][task_name][key] = value + + def _evaluate_local(self, modality, representations, local_results): + if self._tasks_require_same_dims: + if self.expected_dimensions == 1 and get_shape(modality.metadata) > 1: + # for aggregation in Aggregation().get_aggregation_functions(): + agg_operator = AggregatedRepresentation(Aggregation()) + agg_modality = agg_operator.transform(modality) + reps = representations.copy() + reps.append(agg_operator) + # agg_modality.pad() + for task in self.tasks: + start = time.time() + scores = task.run(agg_modality.data) + end = time.time() + + local_results.add_result( + scores, + reps, + modality, + task.model.name, + end - start, + ) + else: + modality.pad() + for task in self.tasks: + start = time.time() + scores = task.run(modality.data) + end = time.time() + local_results.add_result( + scores, + representations, + modality, + task.model.name, + end - start, + ) + else: + for task in self.tasks: + if task.expected_dim == 1 and get_shape(modality.metadata) > 1: + # for aggregation in Aggregation().get_aggregation_functions(): + agg_operator = AggregatedRepresentation(Aggregation()) + agg_modality = agg_operator.transform(modality) + + reps = representations.copy() + reps.append(agg_operator) + # modality.pad() + start = time.time() + scores = task.run(agg_modality.data) + end = time.time() + local_results.add_result( + scores, + reps, + modality, + task.model.name, + end - start, + ) + else: + # modality.pad() + start = time.time() + scores = task.run(modality.data) + end = time.time() + local_results.add_result( + scores, + representations, + modality, + task.model.name, + end - start, + ) + + +class UnimodalResults: + def __init__(self, modalities, tasks, debug=False): + self.modality_ids = [modality.modality_id for modality in modalities] + self.task_names = [task.model.name for task in tasks] + self.results = {} + self.debug = debug + self.cache = {} + + for modality in self.modality_ids: + self.results[modality] = {} + self.cache[modality] = {} + for task_name in self.task_names: + self.cache[modality][task_name] = {} + self.results[modality][task_name] = [] + + def add_result(self, scores, representations, modality, task_name, task_time): + parameters = [] + representation_names = [] + + for rep in representations: + representation_names.append(type(rep).__name__) + if isinstance(rep, AggregatedRepresentation): + parameters.append(rep.parameters) + continue + + params = {} + for param in list(rep.parameters.keys()): + params[param] = getattr(rep, param) + + if isinstance(rep, WindowAggregation): + params["aggregation_function"] = ( + rep.aggregation_function.aggregation_function_name + ) + + parameters.append(params) + + entry = ResultEntry( + representations=representation_names, + params=parameters, + train_score=scores[0], + val_score=scores[1], + 
representation_time=modality.transform_time, + task_time=task_time, + ) + self.results[modality.modality_id][task_name].append(entry) + self.cache[modality.modality_id][task_name][ + (tuple(representation_names), task_time, modality.transform_time) + ] = modality + + if self.debug: + print(f"{modality.modality_id}_{task_name}: {entry}") + + def print_results(self): + for modality in self.modality_ids: + for task_name in self.task_names: + for entry in self.results[modality][task_name]: + print(f"{modality}_{task_name}: {entry}") + + def get_k_best_results(self, modality, k, task): + """ + Get the k best results for the given modality + :param modality: modality to get the best results for + :param k: number of best results + :param task: task whose validation score is used for the ranking + """ + items = self.results[modality.modality_id][task.model.name] + sorted_indices = sorted( + range(len(items)), key=lambda x: items[x].val_score, reverse=True + )[:k] + + results = sorted( + self.results[modality.modality_id][task.model.name], + key=lambda x: x.val_score, + reverse=True, + )[:k] + + items = list(self.cache[modality.modality_id][task.model.name].items()) + reordered_cache = [items[i][1] for i in sorted_indices] + + return results, list(reordered_cache) + + +@dataclass(frozen=True) +class ResultEntry: + val_score: float + representations: list + params: list + train_score: float + representation_time: float + task_time: float diff --git a/src/main/python/systemds/scuro/modality/joined_transformed.py b/src/main/python/systemds/scuro/modality/joined_transformed.py index 6c6190e03cc..3e0d8fb9dfb 100644 --- a/src/main/python/systemds/scuro/modality/joined_transformed.py +++ b/src/main/python/systemds/scuro/modality/joined_transformed.py @@ -36,7 +36,8 @@ def __init__(self, left_modality, right_modality, transformation): :param transformation: Representation to be applied on the modality """ super().__init__( - reduce(or_, [left_modality.modality_type], right_modality.modality_type) + reduce(or_, [left_modality.modality_type], right_modality.modality_type), + data_type=left_modality.data_type, ) self.transformation = transformation self.left_modality = left_modality diff --git a/src/main/python/systemds/scuro/modality/modality.py b/src/main/python/systemds/scuro/modality/modality.py index 87d5b5ee4e4..94e745b2cc1 100644 --- a/src/main/python/systemds/scuro/modality/modality.py +++ b/src/main/python/systemds/scuro/modality/modality.py @@ -44,6 +45,7 @@ self.cost = None self.shape = None self.modality_id = modality_id + self.transform_time = None @property def data(self): @@ -61,9 +63,11 @@ def get_modality_names(self) -> List[str]: """ Extracts the individual unimodal modalities for a given transformed modality.
""" - return [ + modality_names = [ modality.name for modality in ModalityType if modality in self.modality_type ] + modality_names.append(str(self.modality_id)) + return modality_names def copy_from_instance(self): """ @@ -90,36 +94,60 @@ def update_metadata(self): updated_md = self.modality_type.update_metadata(md_v, self.data[i]) self.metadata[md_k] = updated_md - def flatten(self, padding=True): + def flatten(self, padding=False): """ Flattens modality data by row-wise concatenation Prerequisite for some ML-models """ max_len = 0 + data = [] for num_instance, instance in enumerate(self.data): if type(instance) is np.ndarray: - self.data[num_instance] = instance.flatten() + d = instance.flatten() + max_len = max(max_len, len(d)) + data.append(d) elif isinstance(instance, List): - self.data[num_instance] = np.array( + d = np.array( [item for sublist in instance for item in sublist] ).flatten() - max_len = max(max_len, len(self.data[num_instance])) + max_len = max(max_len, len(d)) + data.append(d) if padding: - for i, instance in enumerate(self.data): + for i, instance in enumerate(data): if isinstance(instance, np.ndarray): if len(instance) < max_len: padded_data = np.zeros(max_len, dtype=instance.dtype) padded_data[: len(instance)] = instance - self.data[i] = padded_data + data[i] = padded_data else: padded_data = [] for entry in instance: padded_data.append(utils.pad_sequences(entry, max_len)) - self.data[i] = padded_data - self.data = np.array(self.data) + data[i] = padded_data + self.data = np.array(data) return self + def pad(self, value=0, max_len=None): + try: + if max_len is None: + result = np.array(self.data) + else: + raise "Needs padding to max_len" + except: + maxlen = ( + max([len(seq) for seq in self.data]) if max_len is None else max_len + ) + + result = np.full((len(self.data), maxlen), value, dtype=self.data_type) + + for i, seq in enumerate(self.data): + data = seq[:maxlen] + result[i, : len(data)] = data + # TODO: add padding to metadata as attention_masks + + self.data = result + def get_data_layout(self): if self.has_metadata(): return list(self.metadata.values())[0]["data_layout"]["representation"] @@ -131,3 +159,15 @@ def has_data(self): def has_metadata(self): return self.metadata is not None and self.metadata != {} + + def is_aligned(self, other_modality): + aligned = True + for i in range(len(self.data)): + if ( + list(self.metadata.values())[i]["data_layout"]["shape"] + != list(other_modality.metadata.values())[i]["data_layout"]["shape"] + ): + aligned = False + continue + + return aligned diff --git a/src/main/python/systemds/scuro/modality/transformed.py b/src/main/python/systemds/scuro/modality/transformed.py index 362764d21e9..6523e9502fc 100644 --- a/src/main/python/systemds/scuro/modality/transformed.py +++ b/src/main/python/systemds/scuro/modality/transformed.py @@ -20,11 +20,14 @@ # ------------------------------------------------------------- from functools import reduce from operator import or_ +from typing import Union, List from systemds.scuro.modality.type import ModalityType from systemds.scuro.modality.joined import JoinedModality from systemds.scuro.modality.modality import Modality from systemds.scuro.representations.window_aggregation import WindowAggregation +import time +import copy class TransformedModality(Modality): @@ -42,7 +45,22 @@ def __init__(self, modality, transformation, new_modality_type=None): super().__init__( new_modality_type, modality.modality_id, metadata, modality.data_type ) - self.transformation = transformation + 
self.transformation = None + self.add_transformation(transformation, modality) + + def add_transformation(self, transformation, modality): + if ( + transformation.__class__.__bases__[0].__name__ == "Fusion" + and modality.transformation[0].__class__.__bases__[0].__name__ != "Fusion" + ): + self.transformation = [] + else: + self.transformation = ( + [] + if type(modality).__name__ != "TransformedModality" + else copy.deepcopy(modality.transformation) + ) + self.transformation.append(transformation) def copy_from_instance(self): return type(self)(self, self.transformation) @@ -72,22 +90,26 @@ def join(self, right, join_condition): def window_aggregation(self, windowSize, aggregation): w = WindowAggregation(windowSize, aggregation) transformed_modality = TransformedModality(self, w) + start = time.time() transformed_modality.data = w.execute(self) - + transformed_modality.transform_time = time.time() - start return transformed_modality def context(self, context_operator): transformed_modality = TransformedModality(self, context_operator) - + start = time.time() transformed_modality.data = context_operator.execute(self) + transformed_modality.transform_time = time.time() - start return transformed_modality def apply_representation(self, representation): + start = time.time() new_modality = representation.transform(self) new_modality.update_metadata() + new_modality.transform_time = time.time() - start return new_modality - def combine(self, other, fusion_method): + def combine(self, other: Union[Modality, List[Modality]], fusion_method): """ Combines two or more modalities with each other using a dedicated fusion method :param other: The modality to be combined diff --git a/src/main/python/systemds/scuro/modality/type.py b/src/main/python/systemds/scuro/modality/type.py index a479e07085d..b2331d0faed 100644 --- a/src/main/python/systemds/scuro/modality/type.py +++ b/src/main/python/systemds/scuro/modality/type.py @@ -26,6 +26,7 @@ calculate_new_frequency, create_timestamps, ) +import torch # TODO: needs a way to define if data comes from a dataset with multiple instances or is like a streaming scenario where we only have one instance @@ -99,11 +100,19 @@ def update_base_metadata(cls, md, data, data_is_single_instance=True): dtype = np.nan shape = None if data_layout is DataLayout.SINGLE_LEVEL: - dtype = data.dtype - shape = data.shape + if isinstance(data, list): + dtype = data[0].dtype + shape = data[0].shape + elif isinstance(data, np.ndarray): + dtype = data.dtype + shape = data.shape elif data_layout is DataLayout.NESTED_LEVEL: - shape = data[0].shape - dtype = data[0].dtype + if data_is_single_instance: + dtype = data.dtype + shape = data.shape + else: + shape = data[0].shape + dtype = data[0].dtype md["data_layout"].update( {"representation": data_layout, "type": dtype, "shape": shape} @@ -199,9 +208,15 @@ def add_field(self, md, field, data): md[field] = data return md - def create_audio_metadata(self, sampling_rate, data): + def add_field_for_instances(self, md, field, data): + for key, value in zip(md.keys(), data): + md[key].update({field: value}) + + return md + + def create_audio_metadata(self, sampling_rate, data, is_single_instance=True): md = deepcopy(self.get_schema()) - md = ModalitySchemas.update_base_metadata(md, data, True) + md = ModalitySchemas.update_base_metadata(md, data, is_single_instance) md["frequency"] = sampling_rate md["length"] = data.shape[0] md["timestamp"] = create_timestamps(sampling_rate, md["length"]) @@ -240,10 +255,14 @@ def get_data_layout(cls, data, 
data_is_single_instance): return None if data_is_single_instance: - if isinstance(data, list): - return DataLayout.NESTED_LEVEL - elif isinstance(data, np.ndarray): + if ( + isinstance(data, list) + or isinstance(data, np.ndarray) + and data.ndim == 1 + ): return DataLayout.SINGLE_LEVEL + elif isinstance(data, np.ndarray) or isinstance(data, torch.Tensor): + return DataLayout.NESTED_LEVEL if isinstance(data[0], list): return DataLayout.NESTED_LEVEL diff --git a/src/main/python/systemds/scuro/modality/unimodal_modality.py b/src/main/python/systemds/scuro/modality/unimodal_modality.py index c0ee70557c5..94d1fa057d9 100644 --- a/src/main/python/systemds/scuro/modality/unimodal_modality.py +++ b/src/main/python/systemds/scuro/modality/unimodal_modality.py @@ -20,8 +20,9 @@ # ------------------------------------------------------------- from functools import reduce from operator import or_ - - +import time +import numpy as np +from systemds.scuro import ModalityType from systemds.scuro.dataloader.base_loader import BaseLoader from systemds.scuro.modality.modality import Modality from systemds.scuro.modality.joined import JoinedModality @@ -86,12 +87,14 @@ def join(self, other, join_condition): return joined_modality def context(self, context_operator): + start = time.time() if not self.has_data(): self.extract_raw_data() transformed_modality = TransformedModality(self, context_operator) transformed_modality.data = context_operator.execute(self) + transformed_modality.transform_time = time.time() - start return transformed_modality def aggregate(self, aggregation_function): @@ -108,18 +111,57 @@ def apply_representation(self, representation): representation, ) new_modality.data = [] - + start = time.time() + original_lengths = [] if self.data_loader.chunk_size: self.data_loader.reset() while self.data_loader.next_chunk < self.data_loader.num_chunks: self.extract_raw_data() transformed_chunk = representation.transform(self) new_modality.data.extend(transformed_chunk.data) + for d in transformed_chunk.data: + original_lengths.append(d.shape[0]) new_modality.metadata.update(transformed_chunk.metadata) else: - if not self.data: + if not self.has_data(): self.extract_raw_data() new_modality = representation.transform(self) + if not all( + "attention_masks" in entry for entry in new_modality.metadata.values() + ): + for d in new_modality.data: + original_lengths.append(d.shape[0]) + + if len(original_lengths) > 0 and min(original_lengths) < max(original_lengths): + target_length = max(original_lengths) + padded_embeddings = [] + for embeddings in new_modality.data: + current_length = embeddings.shape[0] + if current_length < target_length: + padding_needed = target_length - current_length + + padded = np.pad( + embeddings, + pad_width=( + (0, padding_needed), + (0, 0), + ), + mode="constant", + constant_values=0, + ) + padded_embeddings.append(padded) + else: + padded_embeddings.append(embeddings) + + attention_masks = np.zeros((len(new_modality.data), target_length)) + for i, length in enumerate(original_lengths): + attention_masks[i, :length] = 1 + + ModalityType(self.modality_type).add_field_for_instances( + new_modality.metadata, "attention_masks", attention_masks + ) + new_modality.data = padded_embeddings new_modality.update_metadata() + new_modality.transform_time = time.time() - start return new_modality diff --git a/src/main/python/systemds/scuro/representations/aggregate.py b/src/main/python/systemds/scuro/representations/aggregate.py index 756e6271ea5..2c046dc4016 100644 --- 
a/src/main/python/systemds/scuro/representations/aggregate.py +++ b/src/main/python/systemds/scuro/representations/aggregate.py @@ -52,12 +52,13 @@ def __init__(self, aggregation_function="mean", pad_modality=False, params=None) aggregation_function = params["aggregation_function"] pad_modality = params["pad_modality"] - if aggregation_function not in self._aggregation_function.keys(): + if aggregation_function not in list(self._aggregation_function.keys()): raise ValueError("Invalid aggregation function") self._aggregation_func = self._aggregation_function[aggregation_function] self.name = "Aggregation" self.pad_modality = pad_modality + self.aggregation_function_name = aggregation_function self.parameters = { "aggregation_function": aggregation_function, @@ -91,7 +92,7 @@ def execute(self, modality): padded_data.append(utils.pad_sequences(entry, max_len)) data[i] = padded_data - return data + return np.array(data) def transform(self, modality): return self.execute(modality) @@ -100,4 +101,4 @@ def aggregate_instance(self, instance): return self._aggregation_func(instance) def get_aggregation_functions(self): - return self._aggregation_function.keys() + return list(self._aggregation_function.keys()) diff --git a/src/main/python/systemds/scuro/representations/average.py b/src/main/python/systemds/scuro/representations/average.py index 8a7e6b9ec8e..ac51f5d1e8d 100644 --- a/src/main/python/systemds/scuro/representations/average.py +++ b/src/main/python/systemds/scuro/representations/average.py @@ -18,7 +18,7 @@ # under the License. # # ------------------------------------------------------------- - +import copy from typing import List import numpy as np @@ -37,23 +37,14 @@ def __init__(self): Combines modalities using averaging """ super().__init__("Average") + self.needs_alignment = True self.associative = True self.commutative = True - def transform(self, modalities: List[Modality]): - for modality in modalities: - modality.flatten() - - max_emb_size = self.get_max_embedding_size(modalities) - - padded_modalities = [] - for modality in modalities: - d = pad_sequences(modality.data, maxlen=max_emb_size, dtype="float32") - padded_modalities.append(d) - - data = padded_modalities[0] + def execute(self, modalities: List[Modality]): + data = copy.deepcopy(modalities[0].data) for i in range(1, len(modalities)): - data += padded_modalities[i] + data += modalities[i].data data /= len(modalities) diff --git a/src/main/python/systemds/scuro/representations/bert.py b/src/main/python/systemds/scuro/representations/bert.py index 8d8d40f4fd7..3478b84e672 100644 --- a/src/main/python/systemds/scuro/representations/bert.py +++ b/src/main/python/systemds/scuro/representations/bert.py @@ -18,7 +18,7 @@ # under the License. 
# # ------------------------------------------------------------- - +import numpy as np from systemds.scuro.modality.transformed import TransformedModality from systemds.scuro.representations.unimodal import UnimodalRepresentation import torch @@ -34,12 +34,13 @@ @register_representation(ModalityType.TEXT) class Bert(UnimodalRepresentation): - def __init__(self, model_name="bert", output_file=None): + def __init__(self, model_name="bert", output_file=None, max_seq_length=512): parameters = {"model_name": "bert"} self.model_name = model_name super().__init__("Bert", ModalityType.EMBEDDING, parameters) self.output_file = output_file + self.max_seq_length = max_seq_length def transform(self, modality): transformed_modality = TransformedModality(modality, self) @@ -55,32 +56,33 @@ def transform(self, modality): if self.output_file is not None: save_embeddings(embeddings, self.output_file) + transformed_modality.data_type = np.float32 transformed_modality.data = embeddings return transformed_modality def create_embeddings(self, modality, model, tokenizer): - embeddings = [] - for i, d in enumerate(modality.data): - inputs = tokenizer( - d, - return_offsets_mapping=True, - return_tensors="pt", - padding=True, - truncation=True, - ) - - ModalityType.TEXT.add_field( - list(modality.metadata.values())[i], - "token_to_character_mapping", - inputs.data["offset_mapping"][0].tolist(), - ) + inputs = tokenizer( + modality.data, + return_offsets_mapping=True, + return_tensors="pt", + padding="longest", + return_attention_mask=True, + truncation=True, + ) + ModalityType.TEXT.add_field_for_instances( + modality.metadata, + "token_to_character_mapping", + inputs.data["offset_mapping"].tolist(), + ) - del inputs.data["offset_mapping"] + ModalityType.TEXT.add_field_for_instances( + modality.metadata, "attention_masks", inputs.data["attention_mask"].tolist() + ) + del inputs.data["offset_mapping"] - with torch.no_grad(): - outputs = model(**inputs) + with torch.no_grad(): + outputs = model(**inputs) - cls_embedding = outputs.last_hidden_state[0].numpy() - embeddings.append(cls_embedding) + cls_embedding = outputs.last_hidden_state.detach().numpy() - return embeddings + return cls_embedding diff --git a/src/main/python/systemds/scuro/representations/bow.py b/src/main/python/systemds/scuro/representations/bow.py index 6778811c49c..7cfddbb506f 100644 --- a/src/main/python/systemds/scuro/representations/bow.py +++ b/src/main/python/systemds/scuro/representations/bow.py @@ -18,7 +18,7 @@ # under the License. 
# # ------------------------------------------------------------- - +import numpy as np from sklearn.feature_extraction.text import CountVectorizer from systemds.scuro.modality.transformed import TransformedModality @@ -50,5 +50,6 @@ def transform(self, modality): if self.output_file is not None: save_embeddings(X, self.output_file) + transformed_modality.data_type = np.float32 transformed_modality.data = X return transformed_modality diff --git a/src/main/python/systemds/scuro/representations/concatenation.py b/src/main/python/systemds/scuro/representations/concatenation.py index c7ce33ab5c7..a4d4d53c43e 100644 --- a/src/main/python/systemds/scuro/representations/concatenation.py +++ b/src/main/python/systemds/scuro/representations/concatenation.py @@ -20,7 +20,7 @@ # ------------------------------------------------------------- from typing import List - +import copy import numpy as np from systemds.scuro.modality.modality import Modality @@ -33,14 +33,13 @@ @register_fusion_operator() class Concatenation(Fusion): - def __init__(self, padding=True): + def __init__(self): """ Combines modalities using concatenation """ super().__init__("Concatenation") - self.padding = padding - def transform(self, modalities: List[Modality]): + def execute(self, modalities: List[Modality]): if len(modalities) == 1: return np.array(modalities[0].data) @@ -53,19 +52,6 @@ def transform(self, modalities: List[Modality]): data = np.zeros((size, 0)) for modality in modalities: - if self.padding: - data = np.concatenate( - [ - data, - pad_sequences( - modality.data, - maxlen=max_emb_size, - dtype=modality.data.dtype, - ), - ], - axis=-1, - ) - else: - data = np.concatenate([data, modality.data], axis=-1) + data = np.concatenate([data, copy.deepcopy(modality.data)], axis=-1) return np.array(data) diff --git a/src/main/python/systemds/scuro/representations/fusion.py b/src/main/python/systemds/scuro/representations/fusion.py index cbbb5606e6d..4b746eee219 100644 --- a/src/main/python/systemds/scuro/representations/fusion.py +++ b/src/main/python/systemds/scuro/representations/fusion.py @@ -21,9 +21,11 @@ from typing import List import numpy as np +from systemds.scuro import AggregatedRepresentation, Aggregation from systemds.scuro.modality.modality import Modality from systemds.scuro.representations.representation import Representation +from systemds.scuro.utils.schema_helpers import get_shape class Fusion(Representation): @@ -44,6 +46,21 @@ def transform(self, modalities: List[Modality]): :param modalities: List of modalities used in the fusion :return: fused data """ + mods = [] + for modality in modalities: + agg_modality = None + if get_shape(modality.metadata) > 1: + agg_operator = AggregatedRepresentation(Aggregation()) + agg_modality = agg_operator.transform(modality) + mods.append(agg_modality if agg_modality else modality) + + if self.needs_alignment: + max_len = self.get_max_embedding_size(mods) + for modality in mods: + modality.pad(max_len=max_len) + return self.execute(mods) + + def execute(self, modalities: List[Modality]): raise NotImplementedError(f"Not implemented for Fusion: {self.name}") def get_max_embedding_size(self, modalities: List[Modality]): diff --git a/src/main/python/systemds/scuro/representations/glove.py b/src/main/python/systemds/scuro/representations/glove.py index d948567f3f5..9076efecfc9 100644 --- a/src/main/python/systemds/scuro/representations/glove.py +++ b/src/main/python/systemds/scuro/representations/glove.py @@ -67,5 +67,6 @@ def transform(self, modality): if self.output_file is not None: 
save_embeddings(np.array(embeddings), self.output_file) + transformed_modality.data_type = np.float32 transformed_modality.data = np.array(embeddings) return transformed_modality diff --git a/src/main/python/systemds/scuro/representations/hadamard.py b/src/main/python/systemds/scuro/representations/hadamard.py index 138003b8741..a777768ff6e 100644 --- a/src/main/python/systemds/scuro/representations/hadamard.py +++ b/src/main/python/systemds/scuro/representations/hadamard.py @@ -41,8 +41,7 @@ def __init__(self): self.commutative = True self.associative = True - def transform(self, modalities: List[Modality], train_indices=None): - # TODO: check for alignment in the metadata + def execute(self, modalities: List[Modality], train_indices=None): fused_data = np.prod([m.data for m in modalities], axis=0) return fused_data diff --git a/src/main/python/systemds/scuro/representations/lstm.py b/src/main/python/systemds/scuro/representations/lstm.py index cbab0f68978..0cfafddefa9 100644 --- a/src/main/python/systemds/scuro/representations/lstm.py +++ b/src/main/python/systemds/scuro/representations/lstm.py @@ -64,7 +64,7 @@ def transform(self, modalities: List[Modality]): result = np.zeros((size, 0)) for modality in modalities: - if modality.modality_type in self.unimodal_embeddings.keys(): + if modality.modality_type in list(self.unimodal_embeddings.keys()): out = self.unimodal_embeddings.get(modality.modality_type) else: out = self.run_lstm(modality.data) diff --git a/src/main/python/systemds/scuro/representations/max.py b/src/main/python/systemds/scuro/representations/max.py index 6ecf5fd52f3..39f5069c2b5 100644 --- a/src/main/python/systemds/scuro/representations/max.py +++ b/src/main/python/systemds/scuro/representations/max.py @@ -40,11 +40,9 @@ def __init__(self): self.associative = True self.commutative = True - def transform( + def execute( self, modalities: List[Modality], ): - # TODO: need to check if data is aligned - same number of dimension fused_data = np.maximum.reduce([m.data for m in modalities]) - return fused_data diff --git a/src/main/python/systemds/scuro/representations/mel_spectrogram.py b/src/main/python/systemds/scuro/representations/mel_spectrogram.py index 8c14c03ac60..8e897542b0c 100644 --- a/src/main/python/systemds/scuro/representations/mel_spectrogram.py +++ b/src/main/python/systemds/scuro/representations/mel_spectrogram.py @@ -46,19 +46,18 @@ def transform(self, modality): modality, self, self.output_modality_type ) result = [] - max_length = 0 + for i, sample in enumerate(modality.data): sr = list(modality.metadata.values())[i]["frequency"] S = librosa.feature.melspectrogram( - y=sample, + y=np.array(sample), sr=sr, n_mels=self.n_mels, hop_length=self.hop_length, n_fft=self.n_fft, - ) + ).astype(modality.data_type) S_dB = librosa.power_to_db(S, ref=np.max) - if S_dB.shape[-1] > max_length: - max_length = S_dB.shape[-1] + result.append(S_dB.T) transformed_modality.data = result diff --git a/src/main/python/systemds/scuro/representations/mfcc.py b/src/main/python/systemds/scuro/representations/mfcc.py index 234e93246fd..00f735a756e 100644 --- a/src/main/python/systemds/scuro/representations/mfcc.py +++ b/src/main/python/systemds/scuro/representations/mfcc.py @@ -48,20 +48,19 @@ def transform(self, modality): modality, self, self.output_modality_type ) result = [] - max_length = 0 + for i, sample in enumerate(modality.data): sr = list(modality.metadata.values())[i]["frequency"] mfcc = librosa.feature.mfcc( - y=sample, + y=np.array(sample), sr=sr, n_mfcc=self.n_mfcc, 
dct_type=self.dct_type, hop_length=self.hop_length, n_mels=self.n_mels, - ) + ).astype(modality.data_type) mfcc = (mfcc - np.mean(mfcc)) / np.std(mfcc) - if mfcc.shape[-1] > max_length: # TODO: check if this needs to be done - max_length = mfcc.shape[-1] + result.append(mfcc.T) transformed_modality.data = result diff --git a/src/main/python/systemds/scuro/representations/representation.py b/src/main/python/systemds/scuro/representations/representation.py index a9f283b6fe3..6137baf46dc 100644 --- a/src/main/python/systemds/scuro/representations/representation.py +++ b/src/main/python/systemds/scuro/representations/representation.py @@ -32,7 +32,7 @@ def parameters(self): def get_current_parameters(self): current_params = {} - for parameter in self.parameters.keys(): + for parameter in list(self.parameters.keys()): current_params[parameter] = getattr(self, parameter) return current_params diff --git a/src/main/python/systemds/scuro/representations/spectrogram.py b/src/main/python/systemds/scuro/representations/spectrogram.py index 6a713a3d21c..8daa9abb015 100644 --- a/src/main/python/systemds/scuro/representations/spectrogram.py +++ b/src/main/python/systemds/scuro/representations/spectrogram.py @@ -41,15 +41,14 @@ def transform(self, modality): modality, self, self.output_modality_type ) result = [] - max_length = 0 + for i, sample in enumerate(modality.data): spectrogram = librosa.stft( - y=sample, hop_length=self.hop_length, n_fft=self.n_fft - ) - S_dB = librosa.amplitude_to_db(np.abs(spectrogram)) + y=np.array(sample), hop_length=self.hop_length, n_fft=self.n_fft + ) + S_dB = librosa.amplitude_to_db(np.abs(spectrogram)).astype( + modality.data_type + ) - if S_dB.shape[-1] > max_length: - max_length = S_dB.shape[-1] - result.append(S_dB.T) + + result.append(S_dB.T.reshape(-1)) transformed_modality.data = result return transformed_modality diff --git a/src/main/python/systemds/scuro/representations/sum.py b/src/main/python/systemds/scuro/representations/sum.py index 46d93f2eda0..5b3710b6e14 100644 --- a/src/main/python/systemds/scuro/representations/sum.py +++ b/src/main/python/systemds/scuro/representations/sum.py @@ -37,15 +37,12 @@ def __init__(self): Combines modalities using column-wise sum """ super().__init__("Sum") + self.needs_alignment = True - def transform(self, modalities: List[Modality]): - max_emb_size = self.get_max_embedding_size(modalities) - - data = pad_sequences(modalities[0].data, maxlen=max_emb_size, dtype="float32") + def execute(self, modalities: List[Modality]): + data = modalities[0].data.copy() for m in range(1, len(modalities)): - data += pad_sequences( - modalities[m].data, maxlen=max_emb_size, dtype="float32" - ) + data += modalities[m].data return data diff --git a/src/main/python/systemds/scuro/representations/tfidf.py b/src/main/python/systemds/scuro/representations/tfidf.py index 1df5a1fde08..3b8f069df83 100644 --- a/src/main/python/systemds/scuro/representations/tfidf.py +++ b/src/main/python/systemds/scuro/representations/tfidf.py @@ -43,10 +43,10 @@ def transform(self, modality): vectorizer = TfidfVectorizer(min_df=self.min_df) X = vectorizer.fit_transform(modality.data) - X = [np.array(x).reshape(1, -1) for x in X.toarray()] - + X = [np.array(x).astype(np.float32).reshape(1, -1) for x in X.toarray()] if self.output_file is not None: save_embeddings(X, self.output_file) + transformed_modality.data_type = np.float32 transformed_modality.data = X return transformed_modality diff --git a/src/main/python/systemds/scuro/representations/wav2vec.py b/src/main/python/systemds/scuro/representations/wav2vec.py index 
29f5bcbea02..86145e3769e 100644 --- a/src/main/python/systemds/scuro/representations/wav2vec.py +++ b/src/main/python/systemds/scuro/representations/wav2vec.py @@ -52,7 +52,9 @@ def transform(self, modality): result = [] for i, sample in enumerate(modality.data): sr = list(modality.metadata.values())[i]["frequency"] - audio_resampled = librosa.resample(sample, orig_sr=sr, target_sr=16000) + audio_resampled = librosa.resample( + np.array(sample), orig_sr=sr, target_sr=16000 + ) input = self.processor( audio_resampled, sampling_rate=16000, return_tensors="pt", padding=True ) diff --git a/src/main/python/systemds/scuro/representations/window_aggregation.py b/src/main/python/systemds/scuro/representations/window_aggregation.py index bff63729c7b..167f4adafea 100644 --- a/src/main/python/systemds/scuro/representations/window_aggregation.py +++ b/src/main/python/systemds/scuro/representations/window_aggregation.py @@ -21,7 +21,7 @@ import numpy as np import math -from systemds.scuro.modality.type import DataLayout +from systemds.scuro.modality.type import DataLayout, ModalityType from systemds.scuro.drsearch.operator_registry import register_context_operator from systemds.scuro.representations.aggregate import Aggregation @@ -30,7 +30,7 @@ @register_context_operator() class WindowAggregation(Context): - def __init__(self, window_size=10, aggregation_function="mean"): + def __init__(self, window_size=10, aggregation_function="mean", pad=True): parameters = { "window_size": [window_size], "aggregation_function": list(Aggregation().get_aggregation_functions()), @@ -38,6 +38,7 @@ def __init__(self, window_size=10, aggregation_function="mean"): super().__init__("WindowAggregation", parameters) self.window_size = window_size self.aggregation_function = aggregation_function + self.pad = pad @property def aggregation_function(self): @@ -49,6 +50,7 @@ def aggregation_function(self, value): def execute(self, modality): windowed_data = [] + original_lengths = [] for instance in modality.data: new_length = math.ceil(len(instance) / self.window_size) if modality.get_data_layout() == DataLayout.SINGLE_LEVEL: @@ -59,14 +61,53 @@ def execute(self, modality): windowed_instance = self.window_aggregate_nested_level( instance, new_length ) - + original_lengths.append(new_length) windowed_data.append(windowed_instance) + if self.pad and not isinstance(windowed_data, np.ndarray): + target_length = max(original_lengths) + sample_shape = windowed_data[0].shape + is_1d = len(sample_shape) == 1 + + padded_features = [] + for i, features in enumerate(windowed_data): + current_len = original_lengths[i] + + if current_len < target_length: + padding_needed = target_length - current_len + + if is_1d: + padding = np.zeros(padding_needed) + padded = np.concatenate([features, padding]) + else: + feature_dim = features.shape[-1] + padding = np.zeros((padding_needed, feature_dim)) + padded = np.concatenate([features, padding], axis=0) + + padded_features.append(padded) + else: + padded_features.append(features) + + attention_masks = np.zeros((len(windowed_data), target_length)) + for i, length in enumerate(original_lengths): + actual_length = min(length, target_length) + attention_masks[i, :actual_length] = 1 + + ModalityType(modality.modality_type).add_field_for_instances( + modality.metadata, "attention_masks", attention_masks + ) + + windowed_data = np.array(padded_features) + data_type = list(modality.metadata.values())[0]["data_layout"]["type"] + if data_type != "str": + windowed_data = windowed_data.astype(data_type) + return 
diff --git a/src/main/python/systemds/scuro/representations/word2vec.py b/src/main/python/systemds/scuro/representations/word2vec.py
index 0210207a013..88d60ac828b 100644
--- a/src/main/python/systemds/scuro/representations/word2vec.py
+++ b/src/main/python/systemds/scuro/representations/word2vec.py
@@ -65,9 +65,10 @@ def transform(self, modality):
         embeddings = []
         for sentences in modality.data:
             tokens = list(tokenize(sentences.lower()))
-            embeddings.append(np.array(get_embedding(tokens, model)).reshape(1, -1))
+            embeddings.append(np.array(get_embedding(tokens, model)).astype(np.float32))
 
         if self.output_file is not None:
             save_embeddings(np.array(embeddings), self.output_file)
 
-        transformed_modality.data = embeddings
+        transformed_modality.data_type = np.float32
+        transformed_modality.data = np.array(embeddings)
         return transformed_modality
diff --git a/src/main/python/systemds/scuro/utils/schema_helpers.py b/src/main/python/systemds/scuro/utils/schema_helpers.py
index 28af476cca4..3d1fbf4d71a 100644
--- a/src/main/python/systemds/scuro/utils/schema_helpers.py
+++ b/src/main/python/systemds/scuro/utils/schema_helpers.py
@@ -40,3 +40,7 @@ def calculate_new_frequency(new_length, old_length, old_frequency):
     duration = old_length / old_frequency
     new_frequency = new_length / duration
     return new_frequency
+
+
+def get_shape(metadata):
+    return len(list(metadata.values())[0]["data_layout"]["shape"])
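Note that get_shape() returns the number of dimensions recorded in the first instance's layout metadata, not the shape tuple itself. A quick illustration against a hand-built metadata dict (the field names mirror the lookups above; the concrete values are invented):

import numpy as np

def get_shape(metadata):
    return len(list(metadata.values())[0]["data_layout"]["shape"])

metadata = {0: {"data_layout": {"type": np.float32, "shape": (100, 40)}}}
assert get_shape(metadata) == 2  # two dimensions, e.g. (windows, features)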
diff --git a/src/main/python/systemds/scuro/utils/static_variables.py b/src/main/python/systemds/scuro/utils/static_variables.py
new file mode 100644
index 00000000000..8237cdf1b3e
--- /dev/null
+++ b/src/main/python/systemds/scuro/utils/static_variables.py
@@ -0,0 +1,36 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+import numpy as np
+import torch
+
+global_rng = np.random.default_rng(42)
+
+
+def get_seed():
+    return global_rng.integers(0, 1024)
+
+
+def get_device():
+    return torch.device(
+        "cuda:0"
+        if torch.cuda.is_available()
+        else "mps" if torch.backends.mps.is_available() else "cpu"
+    )
diff --git a/src/main/python/tests/scuro/data_generator.py b/src/main/python/tests/scuro/data_generator.py
index fbb50ac180e..e57716fa99d 100644
--- a/src/main/python/tests/scuro/data_generator.py
+++ b/src/main/python/tests/scuro/data_generator.py
@@ -93,10 +93,14 @@ def create1DModality(
         self.modality_id += 1
         return tf_modality
 
-    def create_audio_data(self, num_instances, num_features):
-        data = np.random.rand(num_instances, num_features).astype(np.float32)
+    def create_audio_data(self, num_instances, max_audio_length):
+        data = [
+            [random.random() for _ in range(random.randint(1, max_audio_length))]
+            for _ in range(num_instances)
+        ]
+
         metadata = {
-            i: ModalityType.AUDIO.create_audio_metadata(16000, data[i])
+            i: ModalityType.AUDIO.create_audio_metadata(16000, np.array(data[i]))
             for i in range(num_instances)
         }
 
@@ -165,26 +169,41 @@ def create_text_data(self, num_instances):
 
         return sentences, metadata
 
-    def create_visual_modality(self, num_instances, num_frames=1, height=28, width=28):
-        if num_frames == 1:
+    def create_visual_modality(
+        self, num_instances, max_num_frames=1, height=28, width=28
+    ):
+        data = [
+            np.random.randint(
+                0,
+                256,
+                (np.random.randint(5, max_num_frames + 1), height, width, 3),
+                dtype=np.uint8,
+            )
+            for _ in range(num_instances)
+        ]
+        if max_num_frames == 1:
            print(f"TODO: create image metadata")
         else:
             metadata = {
                 i: ModalityType.VIDEO.create_video_metadata(
-                    30, num_frames, width, height, 1
+                    30, data[i].shape[0], width, height, 3
                 )
                 for i in range(num_instances)
             }
 
-        return (
-            np.random.randint(
-                0,
-                256,
-                (num_instances, num_frames, height, width),
-                # ).astype(np.float16).tolist(),
-            ).astype(np.float16),
-            metadata,
-        )
+        return (data, metadata)
+
+    def create_balanced_labels(self, num_instances, num_classes=2):
+        if num_instances % num_classes != 0:
+            raise ValueError("num_instances must be divisible by num_classes.")
+
+        class_size = int(num_instances / num_classes)
+        vector = np.array([0] * class_size)
+        for i in range(num_classes - 1):
+            vector = np.concatenate((vector, np.array([i + 1] * class_size)))
+
+        np.random.shuffle(vector)
+        return vector
 
 
 def setup_data(modalities, num_instances, path):
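create_balanced_labels() is meant to return a shuffled label vector containing exactly num_instances / num_classes occurrences of each class id, which the divisibility check guards. A standalone sketch of that contract, re-implemented here purely for illustration:

import numpy as np

def create_balanced_labels(num_instances, num_classes=2):
    if num_instances % num_classes != 0:
        raise ValueError("num_instances must be divisible by num_classes.")
    class_size = num_instances // num_classes
    labels = np.concatenate([np.full(class_size, c) for c in range(num_classes)])
    np.random.shuffle(labels)
    return labels

labels = create_balanced_labels(20)
assert len(labels) == 20
assert (labels == 0).sum() == (labels == 1).sum() == 10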
diff --git a/src/main/python/tests/scuro/test_dr_search.py b/src/main/python/tests/scuro/test_dr_search.py
index 50f57eebb20..3e0e702e6f3 100644
--- a/src/main/python/tests/scuro/test_dr_search.py
+++ b/src/main/python/tests/scuro/test_dr_search.py
@@ -94,7 +94,9 @@ def setUpClass(cls):
         cls.num_instances = 20
         cls.data_generator = ModalityRandomDataGenerator()
 
-        cls.labels = np.random.choice([0, 1], size=cls.num_instances)
+        cls.labels = ModalityRandomDataGenerator().create_balanced_labels(
+            num_instances=cls.num_instances
+        )
 
         # TODO: adapt the representations so they return non-aggregated values; apply a windowing operation instead
         cls.video = cls.data_generator.create1DModality(
diff --git a/src/main/python/tests/scuro/test_fusion_orders.py b/src/main/python/tests/scuro/test_fusion_orders.py
index eb01d18ffe4..22d64bcc0bf 100644
--- a/src/main/python/tests/scuro/test_fusion_orders.py
+++ b/src/main/python/tests/scuro/test_fusion_orders.py
@@ -65,7 +65,7 @@ def test_fusion_order_concat(self):
         self.assertFalse(np.array_equal(r_1_r_2.data, r_2_r_1.data))
         self.assertFalse(np.array_equal(r_1_r_2_r_3.data, r_2_r_1_r_3.data))
-        self.assertFalse(np.array_equal(r_1_r_2_r_3.data, r1_r2_r3.data))
+        self.assertFalse(np.array_equal(r_2_r_1.data, r1_r2_r3.data))
         self.assertFalse(np.array_equal(r_1_r_2.data, r1_r2_r3.data))
 
     def test_fusion_order_max(self):
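The rewritten assertion compares arrays that genuinely differ: concatenation is order-sensitive but not grouping-sensitive, so fusing (r1, r2) and then r3 yields the same matrix as fusing all three at once, and only a change in operand order makes the results diverge. In plain NumPy terms (assuming Concatenation stacks features like np.hstack):

import numpy as np

r1, r2, r3 = (np.full((2, 2), v) for v in (1.0, 2.0, 3.0))

incremental = np.hstack([np.hstack([r1, r2]), r3])  # fuse (r1, r2), then r3
all_at_once = np.hstack([r1, r2, r3])               # fuse r1, r2, r3 together

assert np.array_equal(incremental, all_at_once)  # grouping does not matter
assert not np.array_equal(np.hstack([r1, r2]), np.hstack([r2, r1]))  # order does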
diff --git a/src/main/python/tests/scuro/test_multimodal_fusion.py b/src/main/python/tests/scuro/test_multimodal_fusion.py
index 77f03054eb5..ae3ddedffb1 100644
--- a/src/main/python/tests/scuro/test_multimodal_fusion.py
+++ b/src/main/python/tests/scuro/test_multimodal_fusion.py
@@ -22,12 +22,15 @@
 
 import shutil
 import unittest
+from multiprocessing import freeze_support
 
 import numpy as np
 from sklearn import svm
 from sklearn.metrics import classification_report
 from sklearn.model_selection import train_test_split
 
+from systemds.scuro.drsearch.multimodal_optimizer import MultimodalOptimizer
+from systemds.scuro.drsearch.unimodal_optimizer import UnimodalOptimizer
 from systemds.scuro.representations.concatenation import Concatenation
 from systemds.scuro.representations.average import Average
 from systemds.scuro.drsearch.fusion_optimizer import FusionOptimizer
@@ -115,7 +118,9 @@ class TestMultimodalRepresentationOptimizer(unittest.TestCase):
     def setUpClass(cls):
         cls.num_instances = 10
         cls.mods = [ModalityType.VIDEO, ModalityType.AUDIO, ModalityType.TEXT]
-        cls.labels = np.random.choice([0, 1], size=cls.num_instances)
+        cls.labels = ModalityRandomDataGenerator().create_balanced_labels(
+            num_instances=cls.num_instances
+        )
         cls.indices = np.array(range(cls.num_instances))
 
         split = train_test_split(
@@ -123,31 +128,15 @@ def setUpClass(cls):
             cls.labels,
             test_size=0.2,
             random_state=42,
+            stratify=cls.labels,
         )
         cls.train_indizes, cls.val_indizes = [int(i) for i in split[0]], [
             int(i) for i in split[1]
         ]
 
-        cls.tasks = [
-            Task(
-                "UnimodalRepresentationTask1",
-                TestSVM(),
-                cls.labels,
-                cls.train_indizes,
-                cls.val_indizes,
-            ),
-            Task(
-                "UnimodalRepresentationTask2",
-                TestCNN(),
-                cls.labels,
-                cls.train_indizes,
-                cls.val_indizes,
-            ),
-        ]
-
     def test_multimodal_fusion(self):
         task = Task(
-            "UnimodalRepresentationTask1",
+            "MM_Fusion_Task1",
             TestSVM(),
             self.labels,
             self.train_indizes,
@@ -192,22 +181,47 @@ def test_multimodal_fusion(self):
         ):
             registry = Registry()
             registry._fusion_operators = [Average, Concatenation]
-            unimodal_optimizer = UnimodalRepresentationOptimizer(
-                [text, audio, video], [task], max_chain_depth=2
+            unimodal_optimizer = UnimodalOptimizer(
+                [audio, text, video], [task], debug=False
             )
             unimodal_optimizer.optimize()
+            unimodal_optimizer.operator_performance.get_k_best_results(audio, 2, task)
 
-            multimodal_optimizer = FusionOptimizer(
+            multimodal_optimizer = MultimodalOptimizer(
                 [audio, text, video],
-                task,
-                unimodal_optimizer.optimization_results,
-                unimodal_optimizer.cache,
-                2,
-                2,
+                unimodal_optimizer.operator_performance,
+                [task],
                 debug=False,
             )
+
             multimodal_optimizer.optimize()
+            assert (
+                len(multimodal_optimizer.optimization_results.results["TestSVM"].keys())
+                == 57
+            )
+            assert (
+                len(
+                    multimodal_optimizer.optimization_results.results["TestSVM"][
+                        "0_1_2_3_4_5"
+                    ]
+                )
+                == 62
+            )
+            assert (
+                len(
+                    multimodal_optimizer.optimization_results.results["TestSVM"][
+                        "3_4_5"
+                    ]
+                )
+                == 6
+            )
+            assert (
+                len(multimodal_optimizer.optimization_results.results["TestSVM"]["0_1"])
+                == 2
+            )
 
 
 if __name__ == "__main__":
+    freeze_support()
     unittest.main()
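The hard-coded 57 is consistent with six unimodal representation candidates (keyed "0" through "5", with "0_1_2_3_4_5" naming the combination of all six): fusing every subset of size two or more gives 2**6 - 6 - 1 = 57 result groups. A quick check of that arithmetic, under this reading of the keys:

from math import comb

n = 6  # unimodal representation candidates, keyed "0".."5"
multimodal_subsets = sum(comb(n, k) for k in range(2, n + 1))
assert multimodal_subsets == 2**n - n - 1 == 57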
diff --git a/src/main/python/tests/scuro/test_multimodal_join.py b/src/main/python/tests/scuro/test_multimodal_join.py
index 9e3a16ffcad..5fd22dc8d98 100644
--- a/src/main/python/tests/scuro/test_multimodal_join.py
+++ b/src/main/python/tests/scuro/test_multimodal_join.py
@@ -20,7 +20,6 @@
 
 # TODO: Test edge cases: unequal number of audio-video timestamps (should still work and add the average over all audio/video samples)
-import shutil
 import unittest
 
 import numpy as np
@@ -30,9 +29,6 @@
 from systemds.scuro.representations.mel_spectrogram import MelSpectrogram
 from systemds.scuro.representations.resnet import ResNet
 from tests.scuro.data_generator import TestDataLoader, ModalityRandomDataGenerator
-
-from systemds.scuro.dataloader.audio_loader import AudioLoader
-from systemds.scuro.dataloader.video_loader import VideoLoader
 from systemds.scuro.modality.type import ModalityType
diff --git a/src/main/python/tests/scuro/test_unimodal_optimizer.py b/src/main/python/tests/scuro/test_unimodal_optimizer.py
index 9ed034e5fe8..a73d7b5fcc1 100644
--- a/src/main/python/tests/scuro/test_unimodal_optimizer.py
+++ b/src/main/python/tests/scuro/test_unimodal_optimizer.py
@@ -20,7 +20,6 @@
 # -------------------------------------------------------------
 
-import shutil
 import unittest
 
 import numpy as np
@@ -31,8 +30,8 @@
 from systemds.scuro.drsearch.operator_registry import Registry
 from systemds.scuro.models.model import Model
 from systemds.scuro.drsearch.task import Task
-from systemds.scuro.drsearch.unimodal_representation_optimizer import (
-    UnimodalRepresentationOptimizer,
+from systemds.scuro.drsearch.unimodal_optimizer import (
+    UnimodalOptimizer,
 )
 from systemds.scuro.representations.spectrogram import Spectrogram
@@ -41,9 +40,6 @@
 from systemds.scuro.representations.resnet import ResNet
 from tests.scuro.data_generator import ModalityRandomDataGenerator, TestDataLoader
-
-from systemds.scuro.dataloader.audio_loader import AudioLoader
-from systemds.scuro.dataloader.video_loader import VideoLoader
-from systemds.scuro.dataloader.text_loader import TextLoader
 from systemds.scuro.modality.type import ModalityType
@@ -108,7 +104,9 @@ class TestUnimodalRepresentationOptimizer(unittest.TestCase):
     def setUpClass(cls):
         cls.num_instances = 10
         cls.mods = [ModalityType.VIDEO, ModalityType.AUDIO, ModalityType.TEXT]
-        cls.labels = np.random.choice([0, 1], size=cls.num_instances)
+        cls.labels = ModalityRandomDataGenerator().create_balanced_labels(
+            num_instances=cls.num_instances
+        )
         cls.indices = np.array(range(cls.num_instances))
 
         split = train_test_split(
@@ -186,24 +184,19 @@ def optimize_unimodal_representation_for_modality(self, modality):
         ):
             registry = Registry()
 
-            unimodal_optimizer = UnimodalRepresentationOptimizer(
-                [modality], self.tasks, max_chain_depth=2
-            )
+            unimodal_optimizer = UnimodalOptimizer([modality], self.tasks, False)
             unimodal_optimizer.optimize()
 
             assert (
-                list(unimodal_optimizer.optimization_results.keys())[0]
+                unimodal_optimizer.operator_performance.modality_ids[0]
                 == modality.modality_id
             )
-            assert len(list(unimodal_optimizer.optimization_results.values())[0]) == 2
-            assert (
-                len(
-                    unimodal_optimizer.get_k_best_results(modality, 1, self.tasks[0])[
-                        0
-                    ].operator_chain
-                )
-                >= 1
+            assert len(unimodal_optimizer.operator_performance.task_names) == 2
+            result, cached = unimodal_optimizer.operator_performance.get_k_best_results(
+                modality, 1, self.tasks[0]
            )
+            assert len(result) == 1
+            assert len(cached) == 1
 
 
 if __name__ == "__main__":