Skip to content

Commit 9835963

Browse files
sarahmish and frances-h
authored
Single value processing of signals (#37)
* single value data
* fix test
* single value data
* fix test

---------

Co-authored-by: Frances Hartwell <[email protected]>
1 parent 1a33006 commit 9835963

File tree

4 files changed

+64
-23
lines changed

4 files changed

+64
-23
lines changed

DEVELOPMENT.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,11 +259,12 @@ imported and used like this:
259259
from sigpro.demo import get_amplitude_demo
260260
from sigpro.demo import get_frequency_demo
261261
from sigpro.demo import get_frequency_time_demo
262+
from sigpro.demo import get_demo_data
262263

263264
amplitude_values, sampling_frequency = get_amplitude_demo()
264265
amplitude_values, frequency_values = get_frequency_demo()
265266
amplitude_values, frequency_values, time_values = get_frequency_time_demo()
266-
dataframe = get_frequency_time_demo(dataframe=True)
267+
dataframe = get_demo_data()
267268
```
268269

269270
In all cases, the functions will return values that correspond to a

sigpro/core.py

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -163,19 +163,28 @@ def __init__(self, transformations, aggregations, values_column_name='values',
163163
self.input_is_dataframe = input_is_dataframe
164164
self.pipeline = self._build_pipeline()
165165

166-
def _apply_pipeline(self, row):
166+
def _apply_pipeline(self, window, is_series=False):
167167
"""Apply a ``mlblocks.MLPipeline`` to a row.
168168
169-
Apply a ``MLPipeline`` to a row of a ``pd.DataFrame``, this function can
169+
Apply a ``MLPipeline`` to a window of a ``pd.DataFrame``, this function can
170170
be combined with the ``pd.DataFrame.apply`` method to be applied to the
171171
entire data frame.
172172
173173
Args:
174-
row (pd.Series):
175-
Row used to apply the pipeline to.
174+
window (pd.Series):
175+
Row or multiple rows (window) used to apply the pipeline to.
176+
is_series (bool):
177+
Indicates whether window is formatted as a series or a dataframe.
176178
"""
177-
context = row.to_dict()
178-
amplitude_values = context.pop(self.values_column_name)
179+
if is_series:
180+
context = window.to_dict()
181+
amplitude_values = context.pop(self.values_column_name)
182+
else:
183+
context = {} if window.empty else {
184+
k: v for k, v in window.iloc[0].to_dict().items() if k != self.values_column_name
185+
}
186+
amplitude_values = list(window[self.values_column_name])
187+
179188
output = self.pipeline.predict(
180189
amplitude_values=amplitude_values,
181190
**context,
@@ -187,12 +196,19 @@ def _apply_pipeline(self, row):
187196

188197
return pd.Series(dict(zip(output_names, output)))
189198

190-
def process_signal(self, data=None, feature_columns=None, **kwargs):
199+
def process_signal(self, data=None, window=None, time_index=None, groupby_index=None,
200+
feature_columns=None, **kwargs):
191201
"""Apply multiple transformation and aggregation primitives.
192202
193203
Args:
194204
data (pandas.DataFrame):
195205
Dataframe with a column that contains signal values.
206+
window (str):
207+
Size of the window expressed as a duration, e.g. ``'1h'``.
208+
time_index (str):
209+
Column in ``data`` that represents the time index.
210+
groupby_index (str or list[str]):
211+
Column(s) to group together and take the window over.
196212
feature_columns (list):
197213
List of column names from the input data frame that must be considered as
198214
features and should not be dropped.
@@ -207,15 +223,25 @@ def process_signal(self, data=None, feature_columns=None, **kwargs):
207223
A list with the feature names generated.
208224
"""
209225
if data is None:
210-
row = pd.Series(kwargs)
211-
values = self._apply_pipeline(row).values
226+
window = pd.Series(kwargs)
227+
values = self._apply_pipeline(window, is_series=True).values
212228
return values if len(values) > 1 else values[0]
213229

214-
features = data.apply(
215-
self._apply_pipeline,
216-
axis=1
217-
)
218-
data = pd.concat([data, features], axis=1)
230+
data = data.copy()
231+
if window is not None and groupby_index is not None:
232+
features = data.set_index(time_index).groupby(groupby_index).resample(
233+
rule=window, **kwargs).apply(
234+
self._apply_pipeline
235+
).reset_index()
236+
data = features
237+
238+
else:
239+
features = data.apply(
240+
self._apply_pipeline,
241+
axis=1,
242+
is_series=True
243+
)
244+
data = pd.concat([data, features], axis=1)
219245

220246
if feature_columns:
221247
feature_columns = feature_columns + list(features.columns)

sigpro/demo.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,30 @@
1111
DEMO_PATH = os.path.join(os.path.dirname(__file__), 'data')
1212

1313

14+
def _load_demo(nrows=None):
15+
demo_path = os.path.join(DEMO_PATH, 'demo_timeseries.csv')
16+
df = pd.read_csv(demo_path, parse_dates=['timestamp'], nrows=nrows)
17+
df['sampling_frequency'] = 1000
18+
df["values"] = df["values"].apply(json.loads).apply(list)
19+
20+
return df
21+
22+
1423
def get_demo_data(nrows=None):
1524
"""Get a demo ``pandas.DataFrame`` containing the accepted data format.
1625
26+
Args:
27+
nrows (int):
28+
Number of rows to load from the demo datasets.
29+
1730
Returns:
1831
A ``pd.DataFrame`` containing as ``values`` the signal values.
1932
"""
20-
demo_path = os.path.join(DEMO_PATH, 'demo_timeseries.csv')
21-
df = pd.read_csv(demo_path, parse_dates=['timestamp'], nrows=nrows)
22-
df['sampling_frequency'] = 1000
23-
df["values"] = df["values"].apply(json.loads).apply(list)
33+
df = _load_demo(nrows)
34+
df = df.explode('values').reset_index(drop=True)
35+
36+
time_delta = pd.to_timedelta(list(range(400)) * 750, 's')
37+
df['timestamp'] = df['timestamp'] + time_delta
2438
return df
2539

2640

@@ -71,7 +85,7 @@ def get_amplitude_demo(index=None):
7185
A tuple with a `np.array` containing amplitude values and as second element the
7286
sampling frequency used.
7387
"""
74-
df = get_demo_data()
88+
df = _load_demo()
7589
if index is None:
7690
index = random.randint(0, len(df))
7791

tests/integration/test_demo.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
get_amplitude_demo, get_demo_data, get_demo_primitives, get_frequency_demo,
66
get_frequency_time_demo)
77

8-
EXPECTED_SHAPE = (750, 5)
8+
EXPECTED_SHAPE = (300000, 5)
99
EXPECTED_COLUMNS = ['turbine_id', 'signal_id', 'timestamp', 'values', 'sampling_frequency']
1010
EXPECTED_SAMPLING_FREQUENCY = 10000
1111
EXPECTED_VALUES_LENGTH = 400
@@ -54,7 +54,7 @@ def test_get_frequency_demo_indexed():
5454
def test_get_frequency_demo_complex():
5555
values, frequency_values = get_frequency_demo(real=False)
5656
value = values[0]
57-
assert type(value) == np.complex128
57+
assert isinstance(value, np.complex128)
5858
assert EXPECTED_VALUES_LENGTH // 2 == len(values)
5959
assert EXPECTED_FREQUENCY_LENGTH // 2 == len(frequency_values)
6060

@@ -79,4 +79,4 @@ def test_get_frequency_time_demo_complex():
7979
assert 129 == len(values)
8080
assert 129 == len(frequencies)
8181
assert 5 == len(time_values)
82-
assert type(value) == np.complex128
82+
assert isinstance(value, np.complex128)

0 commit comments

Comments
 (0)