diff --git a/Custom_Script/02_CustomScript_Training_Pipeline.ipynb b/Custom_Script/02_CustomScript_Training_Pipeline.ipynb
index c7a6e044..d0534aed 100644
--- a/Custom_Script/02_CustomScript_Training_Pipeline.ipynb
+++ b/Custom_Script/02_CustomScript_Training_Pipeline.ipynb
@@ -243,7 +243,7 @@
     "    entry_script='train.py',\n",
     "    mini_batch_size=\"1\",\n",
     "    run_invocation_timeout=timeout,\n",
-    "    error_threshold=10,\n",
+    "    error_threshold=-1,  # -1: do not fail the step on individual model failures\n",
     "    output_action=\"append_row\",\n",
     "    environment=train_env,\n",
     "    process_count_per_node=processes_per_node,\n",
diff --git a/Custom_Script/scripts/train.py b/Custom_Script/scripts/train.py
index bc641dc4..cbab8f14 100644
--- a/Custom_Script/scripts/train.py
+++ b/Custom_Script/scripts/train.py
@@ -59,67 +59,109 @@ def run(input_data):
         train = data[:-args.test_size]
         test = data[-args.test_size:]
 
-        # 3.0 Create and fit the forecasting pipeline
-        # The pipeline will drop unhelpful features, make a calendar feature, and make lag features
-        lagger = SimpleLagger(args.target_column, lag_orders=[1, 2, 3, 4])
-        transform_steps = [('column_dropper', ColumnDropper(args.drop_columns)),
-                           ('calendar_featurizer', SimpleCalendarFeaturizer()), ('lagger', lagger)]
-        forecaster = SimpleForecaster(transform_steps, LinearRegression(), args.target_column, args.timestamp_column)
-        forecaster.fit(train)
-        print('Featurized data example:')
-        print(forecaster.transform(train).head())
-
-        # 4.0 Get predictions on test set
-        forecasts = forecaster.forecast(test)
-        compare_data = test.assign(forecasts=forecasts).dropna()
-
-        # 5.0 Calculate accuracy metrics for the fit
-        mse = mean_squared_error(compare_data[args.target_column], compare_data['forecasts'])
-        rmse = np.sqrt(mse)
-        mae = mean_absolute_error(compare_data[args.target_column], compare_data['forecasts'])
-        actuals = compare_data[args.target_column].values
-        preds = compare_data['forecasts'].values
-        mape = np.mean(np.abs((actuals - preds) / actuals) * 100)
-
-        # 6.0 Log metrics
-        current_run.log(model_name + '_mse', mse)
-        current_run.log(model_name + '_rmse', rmse)
-        current_run.log(model_name + '_mae', mae)
-        current_run.log(model_name + '_mape', mape)
-
-        # 7.0 Train model with full dataset
-        forecaster.fit(data)
-
-        # 8.0 Save the forecasting pipeline
-        joblib.dump(forecaster, filename=os.path.join('./outputs/', model_name))
-
-        # 9.0 Register the model to the workspace
-        # Uses the values in the timeseries id columns from the first row of data to form tags for the model
-        current_run.upload_file(model_name, os.path.join('./outputs/', model_name))
-        ts_id_dict = {id_col: str(data[id_col].iloc[0]) for id_col in args.timeseries_id_columns}
-        tags_dict = {**ts_id_dict, 'ModelType': args.model_type}
-        current_run.register_model(model_path=model_name, model_name=model_name,
-                                   model_framework=args.model_type, tags=tags_dict)
-
-        # 10.0 Add data to output
-        end_datetime = datetime.datetime.now()
-        result.update(ts_id_dict)
-        result['model_type'] = args.model_type
-        result['file_name'] = file_name
-        result['model_name'] = model_name
-        result['start_date'] = str(start_datetime)
-        result['end_date'] = str(end_datetime)
-        result['duration'] = str(end_datetime-start_datetime)
-        result['mse'] = mse
-        result['rmse'] = rmse
-        result['mae'] = mae
-        result['mape'] = mape
-        result['index'] = idx
-        result['num_models'] = len(input_data)
-        result['status'] = current_run.get_status()
-
-        print('ending (' + csv_file_path + ') ' + str(end_datetime))
-        result_list.append(result)
+        child_run = None
+        try:
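+            # Create one child run per model so that logged metrics and the
+            # registered model are scoped to a single time series rather than the step run.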
+            child_run = current_run.child_run(name=model_name)
+
+            # 3.0 Create and fit the forecasting pipeline
+            # The pipeline will drop unhelpful features, make a calendar feature, and make lag features
+            lagger = SimpleLagger(args.target_column, lag_orders=[1, 2, 3, 4])
+            transform_steps = [('column_dropper', ColumnDropper(args.drop_columns)),
+                               ('calendar_featurizer', SimpleCalendarFeaturizer()), ('lagger', lagger)]
+            forecaster = SimpleForecaster(transform_steps, LinearRegression(), args.target_column,
+                                          args.timestamp_column)
+            forecaster.fit(train)
+            print('Featurized data example:')
+            print(forecaster.transform(train).head())
+
+            # 4.0 Get predictions on test set
+            forecasts = forecaster.forecast(test)
+            compare_data = test.assign(forecasts=forecasts).dropna()
+
+            # 5.0 Calculate accuracy metrics for the fit
+            mse = mean_squared_error(compare_data[args.target_column], compare_data['forecasts'])
+            rmse = np.sqrt(mse)
+            mae = mean_absolute_error(compare_data[args.target_column], compare_data['forecasts'])
+            actuals = compare_data[args.target_column].values
+            preds = compare_data['forecasts'].values
+            mape = np.mean(np.abs((actuals - preds) / actuals) * 100)
+
+            # 6.0 Log metrics
+            child_run.log(model_name + '_mse', mse)
+            child_run.log(model_name + '_rmse', rmse)
+            child_run.log(model_name + '_mae', mae)
+            child_run.log(model_name + '_mape', mape)
+
+            # 7.0 Train model with full dataset
+            forecaster.fit(data)
+
+            # Simulate a 3-minute run to test concurrency
+            import time
+            time.sleep(180)
+
+            # 8.0 Save the forecasting pipeline
+            joblib.dump(forecaster, filename=os.path.join('./outputs/', model_name))
+
+            # 9.0 Register the model to the workspace
+            # Uses the values in the timeseries id columns from the first row of data to form tags for the model
+            child_run.upload_file(model_name, os.path.join('./outputs/', model_name))
+            ts_id_dict = {id_col: str(data[id_col].iloc[0]) for id_col in args.timeseries_id_columns}
+            tags_dict = {**ts_id_dict, 'ModelType': args.model_type}
+            tags_dict.update({'InputData': os.path.basename(csv_file_path)})
+            tags_dict.update({'StepRunId': current_run.id})
+            tags_dict.update({'RunId': current_run.parent.id})
+            child_run.register_model(model_path=model_name, model_name=model_name,
+                                     model_framework=args.model_type, tags=tags_dict)
+
+            child_run.complete()
+            # 10.0 Add data to output
+            end_datetime = datetime.datetime.now()
+            result.update(ts_id_dict)
+            result['model_type'] = args.model_type
+            result['file_name'] = file_name
+            result['model_name'] = model_name
+            result['start_date'] = str(start_datetime)
+            result['end_date'] = str(end_datetime)
+            result['duration'] = str(end_datetime-start_datetime)
+            result['mse'] = mse
+            result['rmse'] = rmse
+            result['mae'] = mae
+            result['mape'] = mape
+            result['index'] = idx
+            result['num_models'] = len(input_data)
+            result['status'] = child_run.get_status()
+            result['run_id'] = str(child_run.id)
+
+            print('ending (' + csv_file_path + ') ' + str(end_datetime))
+            result_list.append(result)
+        except Exception as e:
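+            # Mark the child run as failed (if one was created) and emit a
+            # placeholder row so the failure is visible in parallel_run_step.txt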
+            print('failed (' + csv_file_path + '): ' + str(e))
+            if child_run and child_run.get_status() != 'Completed':
+                child_run.fail()
+            end_datetime = datetime.datetime.now()
+            result['model_type'] = args.model_type
+            result['file_name'] = file_name
+            result['model_name'] = model_name
+            result['start_date'] = str(start_datetime)
+            result['end_date'] = str(end_datetime)
+            result['duration'] = str(end_datetime-start_datetime)
+            result['mse'] = str(None)
+            result['rmse'] = str(None)
+            result['mae'] = str(None)
+            result['mape'] = str(None)
+            result['index'] = idx
+            result['num_models'] = len(input_data)
+            if child_run:
+                result['status'] = child_run.get_status()
+                result['run_id'] = str(child_run.id)
+            else:
+                result['status'] = 'Failed'
+                result['run_id'] = str(None)
+            result_list.append(result)
 
     # Data returned by this function will be available in parallel_run_step.txt
     return pd.DataFrame(result_list)