Skip to content

Commit b000248

Browse files
[SYSTEMDS-3835] Add timeseries representations to Scuro
This patch adds new timeseries and represenations and a new mechanism to compute windowed timeseries representations in the unimodal optimizer.
1 parent c7300f3 commit b000248

File tree

14 files changed

+601
-42
lines changed

14 files changed

+601
-42
lines changed

src/main/python/systemds/scuro/__init__.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,21 @@
4141
from systemds.scuro.representations.multimodal_attention_fusion import (
4242
AttentionFusion,
4343
)
44+
from systemds.scuro.representations.timeseries_representations import (
45+
Mean,
46+
Max,
47+
Min,
48+
Kurtosis,
49+
Skew,
50+
Std,
51+
RMS,
52+
ACF,
53+
FrequencyMagnitude,
54+
SpectralCentroid,
55+
Quantile,
56+
ZeroCrossingRate,
57+
BandpowerFFT,
58+
)
4459
from systemds.scuro.representations.mfcc import MFCC
4560
from systemds.scuro.representations.hadamard import Hadamard
4661
from systemds.scuro.representations.optical_flow import OpticalFlow
@@ -141,4 +156,17 @@
141156
"AttentionFusion",
142157
"DynamicWindow",
143158
"StaticWindow",
159+
"Min",
160+
"Max",
161+
"Mean",
162+
"Std",
163+
"Kurtosis",
164+
"Skew",
165+
"RMS",
166+
"ACF",
167+
"FrequencyMagnitude",
168+
"SpectralCentroid",
169+
"Quantile",
170+
"BandpowerFFT",
171+
"ZeroCrossingRate",
144172
]
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
# -------------------------------------------------------------
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
# -------------------------------------------------------------
21+
import numpy as np
22+
from typing import List, Optional, Union
23+
import h5py
24+
25+
26+
from systemds.scuro.dataloader.base_loader import BaseLoader
27+
from systemds.scuro.modality.type import ModalityType
28+
29+
30+
class TimeseriesLoader(BaseLoader):
31+
def __init__(
32+
self,
33+
source_path: str,
34+
indices: List[str],
35+
signal_names: List[str],
36+
data_type: Union[np.dtype, str] = np.float32,
37+
chunk_size: Optional[int] = None,
38+
sampling_rate: Optional[int] = None,
39+
normalize: bool = True,
40+
file_format: str = "npy",
41+
):
42+
super().__init__(
43+
source_path, indices, data_type, chunk_size, ModalityType.TIMESERIES
44+
)
45+
self.signal_names = signal_names
46+
self.sampling_rate = sampling_rate
47+
self.normalize = normalize
48+
self.file_format = file_format.lower()
49+
50+
if self.file_format not in ["npy", "mat", "hdf5", "txt"]:
51+
raise ValueError(f"Unsupported file format: {self.file_format}")
52+
53+
def extract(self, file: str, index: Optional[Union[str, List[str]]] = None):
54+
self.file_sanity_check(file)
55+
56+
if self.file_format == "npy":
57+
data = self._load_npy(file)
58+
elif self.file_format in ["txt", "csv"]:
59+
with open(file, "r") as f:
60+
first_line = f.readline()
61+
if any(name in first_line for name in self.signal_names):
62+
data = self._load_csv_with_header(file)
63+
else:
64+
data = self._load_txt(file)
65+
66+
if data.ndim > 1 and len(self.signal_names) == 1:
67+
data = data.flatten()
68+
69+
if self.normalize:
70+
data = self._normalize_signals(data)
71+
72+
if file:
73+
self.metadata[index] = self.modality_type.create_ts_metadata(
74+
self.signal_names, data, self.sampling_rate
75+
)
76+
else:
77+
for i, index in enumerate(self.indices):
78+
self.metadata[str(index)] = self.modality_type.create_ts_metadata(
79+
self.signal_names, data[i], self.sampling_rate
80+
)
81+
self.data.append(data)
82+
83+
def _normalize_signals(self, data: np.ndarray) -> np.ndarray:
84+
if data.ndim == 1:
85+
mean = np.mean(data)
86+
std = np.std(data)
87+
return (data - mean) / (std + 1e-8)
88+
else:
89+
for i in range(data.shape[1]):
90+
mean = np.mean(data[:, i])
91+
std = np.std(data[:, i])
92+
data[:, i] = (data[:, i] - mean) / (std + 1e-8)
93+
return data
94+
95+
def _load_npy(self, file: str) -> np.ndarray:
96+
data = np.load(file).astype(self._data_type)
97+
return data
98+
99+
def _load_txt(self, file: str) -> np.ndarray:
100+
data = np.loadtxt(file).astype(self._data_type)
101+
return data
102+
103+
def _load_txt_with_header(self, file: str) -> np.ndarray:
104+
with open(file, "r") as f:
105+
header = f.readline().strip().split()
106+
107+
col_indices = [
108+
header.index(name) for name in self.signal_names if name in header
109+
]
110+
data = np.loadtxt(file, dtype=self._data_type, skiprows=1, usecols=col_indices)
111+
return data
112+
113+
def _load_csv_with_header(self, file: str, delimiter: str = None) -> np.ndarray:
114+
import pandas as pd
115+
116+
if delimiter is None:
117+
with open(file, "r") as f:
118+
sample = f.read(1024)
119+
if "," in sample:
120+
delimiter = ","
121+
elif "\t" in sample:
122+
delimiter = "\t"
123+
else:
124+
delimiter = " "
125+
df = pd.read_csv(file, delimiter=delimiter)
126+
127+
selected = [name for name in self.signal_names if name in df.columns]
128+
data = df[selected].to_numpy(dtype=self._data_type)
129+
return data

src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,8 @@ def visit_node(node_id):
146146
visit_node(input_id)
147147
visited.add(node_id)
148148
if node.operation is not None:
149-
if node.parameters:
150-
hyperparams.update(node.parameters)
149+
if node.operation().parameters:
150+
hyperparams.update(node.operation().parameters)
151151
reps.append(node.operation)
152152
node_order.append(node_id)
153153
if node.modality_id is not None:

src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ def build_variants(
157157
fusion_id = new_builder.create_operation_node(
158158
fusion_op.__class__,
159159
[left_root, right_root],
160-
fusion_op.parameters,
160+
fusion_op.get_current_parameters(),
161161
)
162162
variants.append((new_builder, fusion_id))
163163

src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ def store_results(self, file_name=None):
9595
with open(file_name, "wb") as f:
9696
pickle.dump(self.operator_performance.results, f)
9797

98+
def load_results(self, file_name):
99+
with open(file_name, "rb") as f:
100+
self.operator_performance.results = pickle.load(f)
101+
self.operator_performance.cache = None
102+
98103
def optimize_parallel(self, n_workers=None):
99104
if n_workers is None:
100105
n_workers = min(len(self.modalities), mp.cpu_count())
@@ -177,7 +182,9 @@ def _evaluate_local(self, modality, local_results, dag, combination=None):
177182
builder = self.builders[modality.modality_id]
178183
agg_operator = AggregatedRepresentation()
179184
rep_node_id = builder.create_operation_node(
180-
agg_operator.__class__, [dag.root_node_id], agg_operator.parameters
185+
agg_operator.__class__,
186+
[dag.root_node_id],
187+
agg_operator.get_current_parameters(),
181188
)
182189
dag = builder.build(rep_node_id)
183190
representations = dag.execute([modality])
@@ -207,7 +214,7 @@ def _evaluate_local(self, modality, local_results, dag, combination=None):
207214
rep_node_id = builder.create_operation_node(
208215
agg_operator.__class__,
209216
[dag.root_node_id],
210-
agg_operator.parameters,
217+
agg_operator.get_current_parameters(),
211218
)
212219
dag = builder.build(rep_node_id)
213220
representations = dag.execute([modality])
@@ -235,7 +242,7 @@ def _build_modality_dag(
235242
leaf_id = builder.create_leaf_node(modality.modality_id)
236243

237244
rep_node_id = builder.create_operation_node(
238-
operator.__class__, [leaf_id], operator.parameters
245+
operator.__class__, [leaf_id], operator.get_current_parameters()
239246
)
240247
current_node_id = rep_node_id
241248
dags.append(builder.build(current_node_id))
@@ -258,31 +265,68 @@ def _build_modality_dag(
258265
combine_id = builder.create_operation_node(
259266
combination.__class__,
260267
[current_node_id, other_rep_id],
261-
combination.parameters,
268+
combination.get_current_parameters(),
262269
)
263270
dags.append(builder.build(combine_id))
264271
current_node_id = combine_id
272+
if modality.modality_type in [
273+
ModalityType.EMBEDDING,
274+
ModalityType.IMAGE,
275+
ModalityType.AUDIO,
276+
]:
277+
dags.extend(
278+
self.default_context_operators(
279+
modality, builder, leaf_id, current_node_id
280+
)
281+
)
282+
elif modality.modality_type == ModalityType.TIMESERIES:
283+
dags.extend(
284+
self.temporal_context_operators(
285+
modality, builder, leaf_id, current_node_id
286+
)
287+
)
288+
return dags
265289

290+
def default_context_operators(self, modality, builder, leaf_id, current_node_id):
291+
dags = []
266292
context_operators = self._get_context_operators()
267-
268293
for context_op in context_operators:
269-
if modality.modality_type != ModalityType.TEXT:
294+
if (
295+
modality.modality_type != ModalityType.TEXT
296+
and modality.modality_type != ModalityType.VIDEO
297+
):
270298
context_node_id = builder.create_operation_node(
271299
context_op,
272300
[leaf_id],
273-
context_op().parameters,
301+
context_op().get_current_parameters(),
274302
)
275303
dags.append(builder.build(context_node_id))
276304

277305
context_node_id = builder.create_operation_node(
278306
context_op,
279307
[current_node_id],
280-
context_op().parameters,
308+
context_op().get_current_parameters(),
281309
)
282310
dags.append(builder.build(context_node_id))
283311

284312
return dags
285313

314+
def temporal_context_operators(self, modality, builder, leaf_id, current_node_id):
315+
aggregators = self.operator_registry.get_representations(modality.modality_type)
316+
context_operators = self._get_context_operators()
317+
318+
dags = []
319+
for agg in aggregators:
320+
for context_operator in context_operators:
321+
context_node_id = builder.create_operation_node(
322+
context_operator,
323+
[leaf_id],
324+
context_operator(agg()).get_current_parameters(),
325+
)
326+
dags.append(builder.build(context_node_id))
327+
328+
return dags
329+
286330

287331
class UnimodalResults:
288332
def __init__(self, modalities, tasks, debug=False, run=None):
@@ -339,13 +383,20 @@ def get_k_best_results(self, modality, k, task):
339383
key=lambda x: task_results[x].val_score,
340384
reverse=True,
341385
)[:k]
386+
if not self.cache:
387+
cache = [
388+
list(task_results[i].dag.execute([modality]).values())[-1]
389+
for i in sorted_indices
390+
]
391+
else:
392+
cache_items = (
393+
list(self.cache[modality.modality_id][task.model.name].items())
394+
if self.cache[modality.modality_id][task.model.name]
395+
else []
396+
)
397+
cache = [cache_items[i][1] for i in sorted_indices if i < len(cache_items)]
342398

343-
cache_items = list(self.cache[modality.modality_id][task.model.name].items())
344-
reordered_cache = [
345-
cache_items[i][1] for i in sorted_indices if i < len(cache_items)
346-
]
347-
348-
return results, reordered_cache
399+
return results, cache
349400

350401

351402
@dataclass(frozen=True)

src/main/python/systemds/scuro/modality/type.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ class ModalityType(Flag):
193193
IMAGE = auto()
194194
TIMESERIES = auto()
195195
EMBEDDING = auto()
196+
PHYSIOLOGICAL = auto()
196197

197198
def get_schema(self):
198199
return ModalitySchemas.get(self.name)
@@ -239,6 +240,7 @@ def create_ts_metadata(
239240
md["length"] = data.shape[0]
240241
md["signal_names"] = signal_names
241242
md["timestamp"] = create_timestamps(md["frequency"], md["length"])
243+
md["is_multivariate"] = len(signal_names) > 1
242244
return md
243245

244246
def create_video_metadata(self, frequency, length, width, height, num_channels):

src/main/python/systemds/scuro/representations/context.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@
2525

2626

2727
class Context(Representation):
28-
def __init__(self, name, parameters=None):
28+
def __init__(self, name, parameters=None, is_ts_rep=False):
2929
"""
3030
Parent class for different context operations
3131
:param name: Name of the context operator
3232
"""
3333
super().__init__(name, parameters)
34+
self.is_ts_rep = is_ts_rep
3435

3536
@abc.abstractmethod
3637
def execute(self, modality: Modality):
@@ -40,3 +41,17 @@ def execute(self, modality: Modality):
4041
:return: contextualized data
4142
"""
4243
raise f"Not implemented for Context Operator: {self.name}"
44+
45+
def get_current_parameters(self):
46+
current_params = {}
47+
if not self.parameters:
48+
return current_params
49+
for parameter in list(self.parameters.keys()):
50+
if self.is_ts_rep:
51+
if parameter == "agg_params":
52+
current_params[parameter] = (
53+
self.aggregation_function.get_current_parameters()
54+
)
55+
continue
56+
current_params[parameter] = getattr(self, parameter)
57+
return current_params

0 commit comments

Comments
 (0)