@@ -130,39 +130,41 @@ def load_model(self, node_type):
130130
131131 def process (self , data , power_labels , pipeline_lock ):
132132 node_types = pd .unique (data [node_info_column ])
133- try :
134- for node_type in node_types :
135- node_type = int ( node_type )
136- save_path = self . _get_save_path ( str ( node_type ) )
137- self .node_scalers [ node_type ] = load_scaler ( save_path )
138- self . load_model ( node_type )
139-
140- node_type_filtered_data = data [ data [ node_info_column ] == node_type ]
141- if self .node_scalers [ node_type ] is None :
142- self . print_log ( "fit scaler to latest data {1} for node_type={0}" . format ( node_type , self . feature_group_name ))
143- # no profiled scaler
144- x_values = node_type_filtered_data [ self .features ]. values
145- self .node_scalers [node_type ] = MaxAbsScaler ( )
146- self . node_scalers [ node_type ]. fit ( x_values )
147-
148- X_test_map = dict ()
149- y_test_map = dict ()
133+ for node_type in node_types :
134+ node_type = int ( node_type )
135+ save_path = self . _get_save_path ( str ( node_type ) )
136+ self . node_scalers [ node_type ] = load_scaler ( save_path )
137+ self .load_model ( node_type )
138+
139+ node_type_filtered_data = data [ data [ node_info_column ] == node_type ]
140+ if self . node_scalers [ node_type ] is None :
141+ self .print_log ( "fit scaler to latest data {1} for node_type={0}" . format ( node_type , self . feature_group_name ))
142+ # no profiled scaler
143+ x_values = node_type_filtered_data [ self . features ]. values
144+ self .node_scalers [ node_type ] = MaxAbsScaler ()
145+ self .node_scalers [node_type ]. fit ( x_values )
146+
147+ X_test_map = dict ()
148+ y_test_map = dict ()
149+ try :
150150 for component in self .energy_components :
151151 X_values , y_values = self .apply_ratio (component , node_type_filtered_data , power_labels )
152152 X_train , X_test , y_train , y_test = normalize_and_split (X_values , y_values , scaler = self .node_scalers [node_type ])
153153 X_test_map [component ] = X_test
154154 y_test_map [component ] = y_test
155155 self .train (node_type , component , X_train , y_train )
156156 self .save_checkpoint (self .node_models [node_type ][component ], self ._checkpoint_filepath (component , node_type ))
157- if self .should_archive (node_type ):
158- pipeline_lock .acquire ()
159- try :
160- self .save_model_and_metadata (node_type , X_test_map , y_test_map )
161- finally :
162- pipeline_lock .release ()
163- except Exception as e :
164- print (e )
165- pipeline_lock .release ()
157+ except Exception as err :
158+ self .print_log ("failed to process {}: {}" .format (node_type , err ))
159+ continue
160+ if self .should_archive (node_type ):
161+ pipeline_lock .acquire ()
162+ try :
163+ self .save_model_and_metadata (node_type , X_test_map , y_test_map )
164+ except Exception as err :
165+ self .print_log ("failed to save model {}: {}" .format (node_type , err ))
166+ finally :
167+ pipeline_lock .release ()
166168
167169 def apply_ratio (self , component , node_type_filtered_data , power_labels ):
168170 power_label = component_to_col (component )
0 commit comments