Skip to content

Commit 1a8d0bc

Browse files
Process signals implementation (#10)
* Include demo_timeseries.csv with the install * Process Signals scafold. * Fix fft * Implement sigpro.process_signals and integration test * Fix tests * Add process_signals primitive * Fix lint * Add sampling_frequency as int or str, if str use column from dataframe. * process signals update. * Add process_signal function to the __init__ * Rename process signals primitive.
1 parent ed4f92d commit 1a8d0bc

File tree

12 files changed

+822
-22
lines changed

12 files changed

+822
-22
lines changed

sigpro/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
from mlblocks import discovery
1212

13+
from sigpro.process_signals import process_signals
14+
1315
_BASE_PATH = os.path.abspath(os.path.dirname(__file__))
1416
MLBLOCKS_PRIMITIVES = os.path.join(_BASE_PATH, 'primitives')
1517

@@ -42,3 +44,6 @@ def get_primitives(name=None, primitive_type=None, primitive_subtype=None):
4244
filters['classifiers.subtype'] = primitive_subtype
4345

4446
return discovery.find_primitives(name or 'sigpro', filters)
47+
48+
49+
__all__ = ('process_signals', )

sigpro/demo.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@
1111
DEMO_PATH = os.path.join(os.path.dirname(__file__), 'data')
1212

1313

14-
def get_demo_data():
14+
def get_demo_data(nrows=None):
1515
"""Get a demo ``pandas.DataFrame`` containing the accepted data format.
1616
1717
Returns:
1818
A ``pd.DataFrame`` containing as ``values`` the signal values.
1919
"""
2020
demo_path = os.path.join(DEMO_PATH, 'demo_timeseries.csv')
21-
df = pd.read_csv(demo_path, parse_dates=['timestamp'])
21+
df = pd.read_csv(demo_path, parse_dates=['timestamp'], nrows=nrows)
2222
df["values"] = df["values"].apply(json.loads).apply(list)
2323
return df
2424

@@ -74,12 +74,12 @@ def get_frequency_demo(index=None, real=True):
7474
"""
7575
amplitude_values, sampling_frequency = get_amplitude_demo(index)
7676
fft_values = np.fft.fft(amplitude_values)
77-
frequencies = np.fft.fftfreq(len(fft_values), sampling_frequency)
77+
length = len(fft_values)
78+
frequencies = np.fft.fftfreq(len(fft_values), 1 / sampling_frequency)
7879
if real:
7980
fft_values = np.real(fft_values)
80-
frequencies = np.real(frequencies)
8181

82-
return fft_values, frequencies
82+
return fft_values[0:length // 2], frequencies[0:length // 2]
8383

8484

8585
def get_frequency_time_demo(index=None, real=True):
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
{
2+
"name": "sigpro.process_signals",
3+
"primitive": "sigpro.process_signals",
4+
"classifiers": {
5+
"type": "preprocessor",
6+
"subtype": "feature_extractor"
7+
},
8+
"produce": {
9+
"args": [
10+
{
11+
"name": "data",
12+
"type": "pandas.DataFrame"
13+
},
14+
{
15+
"name": "transformations",
16+
"type": "list"
17+
},
18+
{
19+
"name": "aggregations",
20+
"type": "list"
21+
},
22+
{
23+
"name": "keep_values",
24+
"type": "bool",
25+
"default": false
26+
},
27+
{
28+
"name": "values_column_name",
29+
"type": "str",
30+
"default": "values"
31+
}
32+
],
33+
"output": []
34+
},
35+
"hyperparameters": {
36+
"fixed": {
37+
"keep_values": {
38+
"type": "bool",
39+
"default": false
40+
},
41+
"values_column_name": {
42+
"type": "str",
43+
"default": "values"
44+
}
45+
}
46+
}
47+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
{
2+
"name": "sigpro.transformations.frequency.band.frequency_band",
3+
"primitive": "sigpro.transformations.frequency.band.frequency_band",
4+
"classifiers": {
5+
"type": "aggregation",
6+
"subtype": "frequency"
7+
},
8+
"produce": {
9+
"args": [
10+
{
11+
"name": "amplitude_values",
12+
"type": "numpy.ndarray"
13+
},
14+
{
15+
"name": "frequency_values",
16+
"type": "numpy.ndarray"
17+
}
18+
],
19+
"output": [
20+
{
21+
"name": "amplitude_values",
22+
"type": "numpy.ndarray"
23+
},
24+
{
25+
"name": "frequency_values",
26+
"type": "numpy.ndarray"
27+
}
28+
]
29+
},
30+
"hyperparameters": {
31+
"fixed": {
32+
"low": {
33+
"type": "int"
34+
},
35+
"high": {
36+
"type": "int"
37+
}
38+
},
39+
"tunable": {}
40+
}
41+
}

sigpro/process_signals.py

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
# -*- coding: utf-8 -*-
2+
"""Process Signals core functionality."""
3+
4+
from collections import Counter
5+
6+
import pandas as pd
7+
from mlblocks import MLPipeline, load_primitive
8+
9+
10+
def _build_pipeline(transformations, aggregations):
11+
"""Build Pipeline function.
12+
13+
Given a list of transformations and aggregations build a pipeline
14+
with the output of the aggregations, which take as name the specified
15+
name of the transformations and the aggregation. This lists are composed
16+
by dictionaries with the following specification:
17+
18+
* ``Name``:
19+
Name of the transformation / aggregation.
20+
* ``primitive``:
21+
Name of the primitive to apply.
22+
* ``init_params``:
23+
Dictionary containing the initializing parameters for the primitive.
24+
25+
Args:
26+
transformations (list):
27+
List of dictionaries containing the transformation primitives.
28+
transformations (list):
29+
List of dictionaries containing the aggregation primitives.
30+
31+
Returns:
32+
mlblocks.MLPipeline:
33+
An ``MLPipeline`` object that first applies all the transformations
34+
and then produces as output the aggregations specified.
35+
"""
36+
primitives = []
37+
init_params = {}
38+
prefix = []
39+
outputs = []
40+
counter = Counter()
41+
42+
for transformation in transformations:
43+
prefix.append(transformation['name'])
44+
primitive = transformation['primitive']
45+
counter[primitive] += 1
46+
primitive_name = f'{primitive}#{counter[primitive]}'
47+
primitives.append(primitive)
48+
params = transformation.get('init_params')
49+
if params:
50+
init_params[primitive_name] = params
51+
52+
prefix = '.'.join(prefix) if prefix else ''
53+
54+
for aggregation in aggregations:
55+
aggregation_name = f'{prefix}.{aggregation["name"]}' if prefix else aggregation['name']
56+
57+
primitive = aggregation['primitive']
58+
counter[primitive] += 1
59+
primitive_name = f'{primitive}#{counter[primitive]}'
60+
primitives.append(primitive)
61+
62+
primitive = load_primitive(primitive)
63+
primitive_outputs = primitive['produce']['output']
64+
65+
for output in primitive_outputs:
66+
output = output['name']
67+
outputs.append({
68+
'name': f'{aggregation_name}.{output}',
69+
'variable': f'{primitive_name}.{output}'
70+
})
71+
72+
params = aggregation.get('init_params')
73+
if params:
74+
init_params[primitive_name] = params
75+
76+
outputs = {'default': outputs} if outputs else None
77+
78+
return MLPipeline(
79+
primitives,
80+
init_params=init_params,
81+
outputs=outputs
82+
)
83+
84+
85+
def _apply_pipeline(row, pipeline, values_column_name):
86+
"""Apply a ``mlblocks.MLPipeline`` to a row.
87+
88+
Apply a ``MLPipeline`` to a row of a ``pd.DataFrame``, this function can
89+
be combined with the ``pd.DataFrame.apply`` method to be applied to the
90+
entire data frame.
91+
92+
Args:
93+
row (pd.Series):
94+
Row used to apply the pipeline to.
95+
pipeline (mlblocks.MLPipeline):
96+
Pipeline to be used for producing the results.
97+
values_column_name (str):
98+
The name of the column that contains the signal values.
99+
"""
100+
context = row.to_dict()
101+
amplitude_values = context.pop(values_column_name)
102+
output = pipeline.predict(
103+
amplitude_values=amplitude_values,
104+
**context,
105+
)
106+
output_names = pipeline.get_output_names()
107+
108+
# ensure that we can iterate over output
109+
output = output if isinstance(output, tuple) else (output, )
110+
111+
return pd.Series(dict(zip(output_names, output)))
112+
113+
114+
def process_signals(data, transformations, aggregations,
115+
values_column_name='values', keep_values=False):
116+
"""Process Signals.
117+
118+
The Process Signals is responsible for applying a collection of primitives specified by the
119+
user in order to create features for the given data.
120+
121+
Given a list of transformations and aggregations which are composed
122+
by dictionaries with the following specification:
123+
124+
* ``Name``:
125+
Name of the transformation / aggregation.
126+
* ``primitive``:
127+
Name of the primitive to apply.
128+
* ``init_params``:
129+
Dictionary containing the initializing parameters for the primitive.
130+
131+
The process signals will build an ``mlblocks.MLPipeline`` and will generate the features
132+
by previously applying the transformations and then compute the aggregations.
133+
134+
Args:
135+
data (pandas.DataFrame):
136+
Dataframe with a column that contains signal values.
137+
transformations (list):
138+
List of dictionaries containing the transformation primitives.
139+
aggregations (list):
140+
List of dictionaries containing the aggregation primitives.
141+
values_column_name (str):
142+
The name of the column that contains the signal values. Defaults to ``values``.
143+
keep_values (bool):
144+
Whether or not to keep the original signal values or remove them.
145+
146+
Returns:
147+
pandas.DataFrame:
148+
A data frame with new feature columns by applying the previous primitives. If
149+
``keep_values`` is ``True`` the original signal values will be conserved in the
150+
data frame, otherwise the original signal values will be deleted.
151+
"""
152+
pipeline = _build_pipeline(transformations, aggregations)
153+
features = data.apply(
154+
_apply_pipeline,
155+
args=(pipeline, values_column_name),
156+
axis=1
157+
)
158+
159+
data = pd.concat([data, features], axis=1)
160+
161+
if not keep_values:
162+
del data[values_column_name]
163+
164+
return data
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
"""SigPro Frequency Band module."""
2+
3+
4+
def frequency_band(amplitude_values, frequency_values, low, high):
5+
"""Extract a specific band.
6+
7+
Filter between a high and low band frequency and return the amplitude values and frequency
8+
values for those.
9+
10+
Args:
11+
amplitude_values (np.ndarray):
12+
A numpy array with the signal values.
13+
frequency_values (np.ndarray):
14+
A numpy array with the frequency values.
15+
Returns:
16+
tuple:
17+
* `amplitude_values (numpy.ndarray)` for the selected frequency values.
18+
* `frequency_values (numpy.ndarray)` for the selected frequency values.
19+
"""
20+
mask = (frequency_values > low) & (frequency_values < high)
21+
return amplitude_values[mask], frequency_values[mask]

sigpro/transformations/frequency/fft.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ def fft(amplitude_values, sampling_frequency):
2222
* `frequency_values (numpy.ndarray)`
2323
"""
2424
amplitude_values = np.fft.fft(amplitude_values)
25-
frequency_values = np.fft.fftfreq(len(amplitude_values), sampling_frequency)
25+
frequency_values = np.fft.fftfreq(len(amplitude_values), 1 / sampling_frequency)
2626

27-
return amplitude_values, frequency_values
27+
length = len(frequency_values) // 2
28+
return amplitude_values[:length], frequency_values[:length]
2829

2930

3031
def fft_real(amplitude_values, sampling_frequency):
@@ -48,7 +49,6 @@ def fft_real(amplitude_values, sampling_frequency):
4849
* `amplitude_values (numpy.ndarray)`
4950
* `frequency_values (numpy.ndarray)`
5051
"""
51-
amplitude_values = np.real(np.fft.fft(amplitude_values))
52-
frequency_values = np.fft.fftfreq(len(amplitude_values), sampling_frequency)
52+
amplitude_values, frequency_values = fft(amplitude_values, sampling_frequency)
5353

54-
return amplitude_values, frequency_values
54+
return np.real(amplitude_values), np.real(frequency_values)

tests/integration/test_contributing.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,5 +116,5 @@ def test_run_primitive_aggregation_hyperparameters():
116116

117117
def test_run_primitive_transformation():
118118
result = run_primitive('sigpro.transformations.frequency.fft.fft')
119-
assert len(result[0]) == 400
120-
assert len(result[1]) == 400
119+
assert len(result[0]) == 200
120+
assert len(result[1]) == 200

tests/integration/test_demo.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,22 @@ def test_get_amplitude_demo_indexed():
3131

3232
def test_get_frequency_demo_without_index():
3333
values, frequency_values = get_frequency_demo()
34-
assert EXPECTED_VALUES_LENGTH == len(values)
35-
assert EXPECTED_FREQUENCY_LENGTH == len(frequency_values)
34+
assert EXPECTED_VALUES_LENGTH // 2 == len(values)
35+
assert EXPECTED_FREQUENCY_LENGTH // 2 == len(frequency_values)
3636

3737

3838
def test_get_frequency_demo_indexed():
3939
values, frequency_values = get_frequency_demo(index=1)
40-
assert EXPECTED_VALUES_LENGTH == len(values)
41-
assert EXPECTED_FREQUENCY_LENGTH == len(frequency_values)
40+
assert EXPECTED_VALUES_LENGTH // 2 == len(values)
41+
assert EXPECTED_FREQUENCY_LENGTH // 2 == len(frequency_values)
4242

4343

4444
def test_get_frequency_demo_complex():
4545
values, frequency_values = get_frequency_demo(real=False)
4646
value = values[0]
4747
assert type(value) == np.complex128
48-
assert EXPECTED_VALUES_LENGTH == len(values)
49-
assert EXPECTED_FREQUENCY_LENGTH == len(frequency_values)
48+
assert EXPECTED_VALUES_LENGTH // 2 == len(values)
49+
assert EXPECTED_FREQUENCY_LENGTH // 2 == len(frequency_values)
5050

5151

5252
def test_get_frequency_time_demo_without_index():

0 commit comments

Comments
 (0)