@@ -163,19 +163,28 @@ def __init__(self, transformations, aggregations, values_column_name='values',
         self.input_is_dataframe = input_is_dataframe
         self.pipeline = self._build_pipeline()
 
-    def _apply_pipeline(self, row):
+    def _apply_pipeline(self, window, is_series=False):
167167 """Apply a ``mlblocks.MLPipeline`` to a row.
168168
-        Apply a ``MLPipeline`` to a row of a ``pd.DataFrame``, this function can
+        Apply a ``MLPipeline`` to a window of a ``pd.DataFrame``; this function can
         be combined with the ``pd.DataFrame.apply`` method to be applied to the
         entire data frame.
 
         Args:
-            row (pd.Series):
-                Row used to apply the pipeline to.
+            window (pd.Series or pd.DataFrame):
+                Row or window of rows used to apply the pipeline to.
+            is_series (bool):
+                Whether ``window`` is formatted as a series or a dataframe.
         """
-        context = row.to_dict()
-        amplitude_values = context.pop(self.values_column_name)
+        if is_series:
+            context = window.to_dict()
+            amplitude_values = context.pop(self.values_column_name)
+        else:
+            context = {} if window.empty else {
+                k: v for k, v in window.iloc[0].to_dict().items() if k != self.values_column_name
+            }
+            amplitude_values = list(window[self.values_column_name])
+
         output = self.pipeline.predict(
             amplitude_values=amplitude_values,
             **context,
@@ -187,12 +196,19 @@ def _apply_pipeline(self, row):
 
         return pd.Series(dict(zip(output_names, output)))
 
-    def process_signal(self, data=None, feature_columns=None, **kwargs):
+    def process_signal(self, data=None, window=None, time_index=None, groupby_index=None,
+                       feature_columns=None, **kwargs):
191201 """Apply multiple transformation and aggregation primitives.
192202
193203 Args:
194204 data (pandas.DataFrame):
195205 Dataframe with a column that contains signal values.
+            window (str):
+                Size of the window as a pandas offset alias, e.g. ``'1h'``.
+            time_index (str):
+                Column in ``data`` that represents the time index.
+            groupby_index (str or list[str]):
+                Column(s) to group by before applying the window.
             feature_columns (list):
                 List of column names from the input data frame that must be considered as
                 features and should not be dropped.
@@ -207,15 +223,25 @@ def process_signal(self, data=None, feature_columns=None, **kwargs):
                 A list with the feature names generated.
         """
         if data is None:
-            row = pd.Series(kwargs)
-            values = self._apply_pipeline(row).values
+            window = pd.Series(kwargs)
+            values = self._apply_pipeline(window, is_series=True).values
             return values if len(values) > 1 else values[0]
 
-        features = data.apply(
-            self._apply_pipeline,
-            axis=1
-        )
-        data = pd.concat([data, features], axis=1)
+        data = data.copy()
+        if window is not None and groupby_index is not None:
+            features = data.set_index(time_index).groupby(groupby_index).resample(
+                rule=window, **kwargs
+            ).apply(
+                self._apply_pipeline
+            ).reset_index()
+            data = features
+        else:
+            features = data.apply(
+                self._apply_pipeline,
+                axis=1,
+                is_series=True
+            )
+            data = pd.concat([data, features], axis=1)
 
         if feature_columns:
             feature_columns = feature_columns + list(features.columns)
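
For readers unfamiliar with the pandas pattern the new windowed branch relies on, here is a minimal standalone sketch of the same idea: group rows by an entity column, bucket them into fixed-size time windows, and reduce each window to one row of named features. The column names, the ``'1h'`` window and the toy ``summarize`` function are made up for illustration, and ``pd.Grouper`` is used instead of ``groupby(...).resample(...)`` only to keep the sketch self-contained; it is not the exact code path added in this diff.

```python
import pandas as pd

# Toy signal data: two sensors, one reading every 15 minutes.
data = pd.DataFrame({
    'timestamp': list(pd.date_range('2024-01-01', periods=8, freq='15min')) * 2,
    'sensor_id': ['a'] * 8 + ['b'] * 8,
    'values': range(16),
})


def summarize(window):
    # Stand-in for _apply_pipeline: reduce one window of rows to named features.
    return pd.Series({
        'mean_value': window['values'].mean(),
        'n_samples': len(window),
    })


# Group by sensor, bucket rows into 1-hour windows, one feature row per window.
features = (
    data
    .groupby(['sensor_id', pd.Grouper(key='timestamp', freq='1h')])
    .apply(summarize)
    .reset_index()
)
print(features)
```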