Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/main/python/systemds/scuro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,21 @@
from systemds.scuro.representations.multimodal_attention_fusion import (
AttentionFusion,
)
from systemds.scuro.representations.timeseries_representations import (
Mean,
Max,
Min,
Kurtosis,
Skew,
Std,
RMS,
ACF,
FrequencyMagnitude,
SpectralCentroid,
Quantile,
ZeroCrossingRate,
BandpowerFFT,
)
from systemds.scuro.representations.mfcc import MFCC
from systemds.scuro.representations.hadamard import Hadamard
from systemds.scuro.representations.optical_flow import OpticalFlow
Expand Down Expand Up @@ -141,4 +156,17 @@
"AttentionFusion",
"DynamicWindow",
"StaticWindow",
"Min",
"Max",
"Mean",
"Std",
"Kurtosis",
"Skew",
"RMS",
"ACF",
"FrequencyMagnitude",
"SpectralCentroid",
"Quantile",
"BandpowerFFT",
"ZeroCrossingRate",
]
129 changes: 129 additions & 0 deletions src/main/python/systemds/scuro/dataloader/timeseries_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# -------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# -------------------------------------------------------------
import numpy as np
from typing import List, Optional, Union
import h5py


from systemds.scuro.dataloader.base_loader import BaseLoader
from systemds.scuro.modality.type import ModalityType


class TimeseriesLoader(BaseLoader):
    """Loader that reads time-series signals from files into numpy arrays.

    Each call to :meth:`extract` loads one file, optionally z-score
    normalizes it, records time-series metadata for it and appends the
    array to ``self.data``.

    NOTE(review): ``self.data``, ``self.metadata``, ``self.indices``,
    ``self._data_type`` and ``self.modality_type`` are not defined in this
    class; they are presumably provided by ``BaseLoader`` -- confirm.
    """

    # Formats accepted by the constructor.
    _ACCEPTED_FORMATS = ("npy", "mat", "hdf5", "txt", "csv")
    # Formats that `extract` actually knows how to read as delimited text.
    _TEXT_FORMATS = ("txt", "csv")

    def __init__(
        self,
        source_path: str,
        indices: List[str],
        signal_names: List[str],
        data_type: Union[np.dtype, str] = np.float32,
        chunk_size: Optional[int] = None,
        sampling_rate: Optional[int] = None,
        normalize: bool = True,
        file_format: str = "npy",
    ):
        """
        :param source_path: Forwarded to ``BaseLoader``
        :param indices: Forwarded to ``BaseLoader``
        :param signal_names: Names of the signals (columns) to load
        :param data_type: Forwarded to ``BaseLoader``; loaded arrays are cast to it
        :param chunk_size: Forwarded to ``BaseLoader``
        :param sampling_rate: Passed on to the time-series metadata
        :param normalize: If True, z-score normalize every loaded signal
        :param file_format: One of "npy", "mat", "hdf5", "txt" or "csv"
            (case-insensitive)
        :raises ValueError: If ``file_format`` is not one of the accepted formats
        """
        super().__init__(
            source_path, indices, data_type, chunk_size, ModalityType.TIMESERIES
        )
        self.signal_names = signal_names
        self.sampling_rate = sampling_rate
        self.normalize = normalize
        self.file_format = file_format.lower()

        # "csv" is accepted here because `extract` has a dedicated branch for
        # it; previously that branch was unreachable.
        if self.file_format not in self._ACCEPTED_FORMATS:
            raise ValueError(f"Unsupported file format: {self.file_format}")

    def extract(self, file: str, index: Optional[Union[str, List[str]]] = None):
        """
        Load one file, post-process it and store the data and its metadata.

        :param file: Path of the file to load
        :param index: Key under which the metadata of this file is stored
        :raises NotImplementedError: If the configured file format has no
            loader (currently "mat" and "hdf5")
        """
        self.file_sanity_check(file)

        if self.file_format == "npy":
            data = self._load_npy(file)
        elif self.file_format in self._TEXT_FORMATS:
            # Peek at the first line: if it mentions any requested signal
            # name the file is treated as having a header row.
            with open(file, "r") as f:
                first_line = f.readline()
            if any(name in first_line for name in self.signal_names):
                data = self._load_csv_with_header(file)
            else:
                data = self._load_txt(file)
        else:
            # Without this branch `data` would be unbound below and the call
            # would fail with an opaque UnboundLocalError.
            raise NotImplementedError(
                f"Loading '{self.file_format}' files is not implemented"
            )

        # A single signal is always handed on as a 1-D array.
        if data.ndim > 1 and len(self.signal_names) == 1:
            data = data.flatten()

        if self.normalize:
            data = self._normalize_signals(data)

        if file:
            self.metadata[index] = self.modality_type.create_ts_metadata(
                self.signal_names, data, self.sampling_rate
            )
        else:
            # NOTE(review): only reachable for a falsy `file`; presumably
            # file_sanity_check rejects that earlier -- confirm.
            for i, index in enumerate(self.indices):
                self.metadata[str(index)] = self.modality_type.create_ts_metadata(
                    self.signal_names, data[i], self.sampling_rate
                )
        self.data.append(data)

    def _normalize_signals(self, data: np.ndarray) -> np.ndarray:
        """
        Z-score normalize the signals and return the result as a new array.

        A 1-D array is normalized as a whole. For higher-dimensional arrays
        every index along axis 1 is normalized independently, using the mean
        and standard deviation over all remaining axes. The input array is
        left untouched, so integer inputs are not truncated by an in-place
        write-back.

        :param data: Signal array
        :return: Normalized array with the same shape as ``data``
        """
        if data.ndim <= 1:
            axes = None
        else:
            axes = tuple(axis for axis in range(data.ndim) if axis != 1)
        mean = np.mean(data, axis=axes, keepdims=True)
        std = np.std(data, axis=axes, keepdims=True)
        # The small epsilon avoids division by zero for constant signals.
        return (data - mean) / (std + 1e-8)

    def _load_npy(self, file: str) -> np.ndarray:
        """Load a ``.npy`` file and cast it to the configured data type."""
        data = np.load(file).astype(self._data_type)
        return data

    def _load_txt(self, file: str) -> np.ndarray:
        """Load a header-less text file and cast it to the configured data type."""
        data = np.loadtxt(file).astype(self._data_type)
        return data

    def _load_txt_with_header(self, file: str) -> np.ndarray:
        """
        Load the requested signals from a whitespace-separated text file
        whose first line holds the column names.

        Signal names that do not appear in the header are ignored.
        """
        with open(file, "r") as f:
            header = f.readline().strip().split()

        col_indices = [
            header.index(name) for name in self.signal_names if name in header
        ]
        data = np.loadtxt(file, dtype=self._data_type, skiprows=1, usecols=col_indices)
        return data

    def _load_csv_with_header(
        self, file: str, delimiter: Optional[str] = None
    ) -> np.ndarray:
        """
        Load the requested signals from a delimited text file with a header row.

        :param file: Path of the file to load
        :param delimiter: Column delimiter; if None it is guessed from the
            first 1024 characters (comma, then tab, otherwise a single space)
        :return: Array holding the columns named in ``signal_names`` that
            exist in the file, in ``signal_names`` order
        """
        import pandas as pd

        if delimiter is None:
            with open(file, "r") as f:
                sample = f.read(1024)
            if "," in sample:
                delimiter = ","
            elif "\t" in sample:
                delimiter = "\t"
            else:
                delimiter = " "
        df = pd.read_csv(file, delimiter=delimiter)

        selected = [name for name in self.signal_names if name in df.columns]
        data = df[selected].to_numpy(dtype=self._data_type)
        return data
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ def visit_node(node_id):
visit_node(input_id)
visited.add(node_id)
if node.operation is not None:
if node.parameters:
hyperparams.update(node.parameters)
if node.operation().parameters:
hyperparams.update(node.operation().parameters)
reps.append(node.operation)
node_order.append(node_id)
if node.modality_id is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def build_variants(
fusion_id = new_builder.create_operation_node(
fusion_op.__class__,
[left_root, right_root],
fusion_op.parameters,
fusion_op.get_current_parameters(),
)
variants.append((new_builder, fusion_id))

Expand Down
79 changes: 65 additions & 14 deletions src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ def store_results(self, file_name=None):
with open(file_name, "wb") as f:
pickle.dump(self.operator_performance.results, f)

def load_results(self, file_name):
    """
    Replace the stored operator results with the ones pickled in a file.

    The cache of the operator performance object is reset to None.

    NOTE: pickle can execute arbitrary code while loading, so only pass
    files that come from a trusted source.

    :param file_name: Path of the pickle file to read
    """
    performance = self.operator_performance
    with open(file_name, "rb") as handle:
        performance.results = pickle.load(handle)
    performance.cache = None

def optimize_parallel(self, n_workers=None):
if n_workers is None:
n_workers = min(len(self.modalities), mp.cpu_count())
Expand Down Expand Up @@ -177,7 +182,9 @@ def _evaluate_local(self, modality, local_results, dag, combination=None):
builder = self.builders[modality.modality_id]
agg_operator = AggregatedRepresentation()
rep_node_id = builder.create_operation_node(
agg_operator.__class__, [dag.root_node_id], agg_operator.parameters
agg_operator.__class__,
[dag.root_node_id],
agg_operator.get_current_parameters(),
)
dag = builder.build(rep_node_id)
representations = dag.execute([modality])
Expand Down Expand Up @@ -207,7 +214,7 @@ def _evaluate_local(self, modality, local_results, dag, combination=None):
rep_node_id = builder.create_operation_node(
agg_operator.__class__,
[dag.root_node_id],
agg_operator.parameters,
agg_operator.get_current_parameters(),
)
dag = builder.build(rep_node_id)
representations = dag.execute([modality])
Expand Down Expand Up @@ -235,7 +242,7 @@ def _build_modality_dag(
leaf_id = builder.create_leaf_node(modality.modality_id)

rep_node_id = builder.create_operation_node(
operator.__class__, [leaf_id], operator.parameters
operator.__class__, [leaf_id], operator.get_current_parameters()
)
current_node_id = rep_node_id
dags.append(builder.build(current_node_id))
Expand All @@ -258,31 +265,68 @@ def _build_modality_dag(
combine_id = builder.create_operation_node(
combination.__class__,
[current_node_id, other_rep_id],
combination.parameters,
combination.get_current_parameters(),
)
dags.append(builder.build(combine_id))
current_node_id = combine_id
if modality.modality_type in [
ModalityType.EMBEDDING,
ModalityType.IMAGE,
ModalityType.AUDIO,
]:
dags.extend(
self.default_context_operators(
modality, builder, leaf_id, current_node_id
)
)
elif modality.modality_type == ModalityType.TIMESERIES:
dags.extend(
self.temporal_context_operators(
modality, builder, leaf_id, current_node_id
)
)
return dags

def default_context_operators(self, modality, builder, leaf_id, current_node_id):
dags = []
context_operators = self._get_context_operators()

for context_op in context_operators:
if modality.modality_type != ModalityType.TEXT:
if (
modality.modality_type != ModalityType.TEXT
and modality.modality_type != ModalityType.VIDEO
):
context_node_id = builder.create_operation_node(
context_op,
[leaf_id],
context_op().parameters,
context_op().get_current_parameters(),
)
dags.append(builder.build(context_node_id))

context_node_id = builder.create_operation_node(
context_op,
[current_node_id],
context_op().parameters,
context_op().get_current_parameters(),
)
dags.append(builder.build(context_node_id))

return dags

def temporal_context_operators(self, modality, builder, leaf_id, current_node_id):
    """
    Build one DAG per (aggregator, context operator) pair for a modality.

    Every context operator is instantiated with a fresh aggregator instance
    to obtain its current parameters, and is attached directly to the leaf
    node. ``current_node_id`` is accepted but not used here.

    :param modality: Modality whose type selects the aggregators
    :param builder: DAG builder used to create nodes and build DAGs
    :param leaf_id: Id of the leaf node the operators are attached to
    :param current_node_id: Unused
    :return: List of the built DAGs
    """
    aggregator_classes = self.operator_registry.get_representations(
        modality.modality_type
    )
    context_classes = self._get_context_operators()

    built = []
    for aggregator_cls in aggregator_classes:
        for context_cls in context_classes:
            params = context_cls(aggregator_cls()).get_current_parameters()
            node_id = builder.create_operation_node(context_cls, [leaf_id], params)
            built.append(builder.build(node_id))
    return built


class UnimodalResults:
def __init__(self, modalities, tasks, debug=False, run=None):
Expand Down Expand Up @@ -339,13 +383,20 @@ def get_k_best_results(self, modality, k, task):
key=lambda x: task_results[x].val_score,
reverse=True,
)[:k]
if not self.cache:
cache = [
list(task_results[i].dag.execute([modality]).values())[-1]
for i in sorted_indices
]
else:
cache_items = (
list(self.cache[modality.modality_id][task.model.name].items())
if self.cache[modality.modality_id][task.model.name]
else []
)
cache = [cache_items[i][1] for i in sorted_indices if i < len(cache_items)]

cache_items = list(self.cache[modality.modality_id][task.model.name].items())
reordered_cache = [
cache_items[i][1] for i in sorted_indices if i < len(cache_items)
]

return results, reordered_cache
return results, cache


@dataclass(frozen=True)
Expand Down
2 changes: 2 additions & 0 deletions src/main/python/systemds/scuro/modality/type.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class ModalityType(Flag):
IMAGE = auto()
TIMESERIES = auto()
EMBEDDING = auto()
PHYSIOLOGICAL = auto()

def get_schema(self):
return ModalitySchemas.get(self.name)
Expand Down Expand Up @@ -239,6 +240,7 @@ def create_ts_metadata(
md["length"] = data.shape[0]
md["signal_names"] = signal_names
md["timestamp"] = create_timestamps(md["frequency"], md["length"])
md["is_multivariate"] = len(signal_names) > 1
return md

def create_video_metadata(self, frequency, length, width, height, num_channels):
Expand Down
17 changes: 16 additions & 1 deletion src/main/python/systemds/scuro/representations/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@


class Context(Representation):
def __init__(self, name, parameters=None):
def __init__(self, name, parameters=None, is_ts_rep=False):
"""
Parent class for different context operations
:param name: Name of the context operator
"""
super().__init__(name, parameters)
self.is_ts_rep = is_ts_rep

@abc.abstractmethod
def execute(self, modality: Modality):
Expand All @@ -40,3 +41,17 @@ def execute(self, modality: Modality):
:return: contextualized data
"""
raise f"Not implemented for Context Operator: {self.name}"

def get_current_parameters(self):
    """
    Collect the current value of every declared parameter.

    For each key in ``self.parameters`` the attribute of the same name is
    read from this object. The one exception is "agg_params" on an operator
    flagged with ``is_ts_rep``: its value is taken from the aggregation
    function's own ``get_current_parameters()``.

    :return: Dict mapping parameter name to current value; empty if no
        parameters are declared
    """
    if not self.parameters:
        return {}

    snapshot = {}
    # Iterate over a copy of the keys, as the original list(...) did.
    for key in tuple(self.parameters):
        if self.is_ts_rep and key == "agg_params":
            snapshot[key] = self.aggregation_function.get_current_parameters()
        else:
            snapshot[key] = getattr(self, key)
    return snapshot
Loading
Loading