diff --git a/models/core/fct_lap_times.sql b/models/core/fct_lap_times.sql
new file mode 100644
index 0000000..37d62b0
--- /dev/null
+++ b/models/core/fct_lap_times.sql
@@ -0,0 +1,13 @@
+with lap_times as (
+    select
+        {{ dbt_utils.generate_surrogate_key(['race_id', 'driver_id', 'lap']) }} as lap_times_id,
+        race_id as race_id,
+        driver_id as driver_id,
+        lap as lap,
+        driver_position as driver_position,
+        lap_time_formatted as lap_time_formatted,
+        official_laptime as official_laptime,
+        lap_time_milliseconds as lap_time_milliseconds
+    from {{ ref('stg_lap_times') }}
+)
+select * from lap_times
\ No newline at end of file
diff --git a/models/marts/aggregates/agg_lap_times_moving_avg.py b/models/marts/aggregates/agg_lap_times_moving_avg.py
new file mode 100644
index 0000000..b1dc3ac
--- /dev/null
+++ b/models/marts/aggregates/agg_lap_times_moving_avg.py
@@ -0,0 +1,17 @@
+import pandas as pd
+
+def model(dbt, session):
+    # dbt configuration
+    dbt.config(packages=["pandas"])
+
+    # get upstream data
+    lap_times = dbt.ref("mrt_lap_times_years").to_pandas()
+
+    # compute the yearly average lap time and its 5-year moving average
+    lap_times["LAP_TIME_SECONDS"] = lap_times["LAP_TIME_MILLISECONDS"]/1000
+    lap_time_trends = lap_times.groupby(by="RACE_YEAR")["LAP_TIME_SECONDS"].mean().to_frame()
+    lap_time_trends.reset_index(inplace=True)
+    lap_time_trends["LAP_MOVING_AVG_5_YEARS"] = lap_time_trends["LAP_TIME_SECONDS"].rolling(5).mean()
+    lap_time_trends.columns = lap_time_trends.columns.str.upper()
+
+    return lap_time_trends.round(1)
\ No newline at end of file
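Note: the 5-year moving average in agg_lap_times_moving_avg.py can be sanity-checked outside the warehouse by running the same pandas steps on a stand-in frame. A minimal local sketch follows; the years and lap times are invented placeholders, not project data.

    import pandas as pd

    # invented yearly lap times, purely to exercise the transformation
    lap_times = pd.DataFrame({
        "RACE_YEAR": range(2010, 2018),
        "LAP_TIME_MILLISECONDS": [93000, 92500, 91800, 95200, 94100, 90750, 89900, 91200],
    })

    lap_times["LAP_TIME_SECONDS"] = lap_times["LAP_TIME_MILLISECONDS"] / 1000
    trends = lap_times.groupby("RACE_YEAR")["LAP_TIME_SECONDS"].mean().to_frame().reset_index()
    # rolling(5) leaves the first four rows as NaN: a full five-year window is required
    trends["LAP_MOVING_AVG_5_YEARS"] = trends["LAP_TIME_SECONDS"].rolling(5).mean()
    print(trends.round(1))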
diff --git a/models/marts/mrt_lap_times_years.sql b/models/marts/mrt_lap_times_years.sql
new file mode 100644
index 0000000..b35db7e
--- /dev/null
+++ b/models/marts/mrt_lap_times_years.sql
@@ -0,0 +1,19 @@
+with lap_times as (
+    select * from {{ ref('fct_lap_times') }}
+),
+races as (
+    select * from {{ ref('dim_races') }}
+),
+expanded_lap_times_by_year as (
+    select
+        lap_times.race_id,
+        driver_id,
+        race_year,
+        lap,
+        lap_time_milliseconds
+    from lap_times
+    left join races
+        on lap_times.race_id = races.race_id
+    where lap_time_milliseconds is not null
+)
+select * from expanded_lap_times_by_year
\ No newline at end of file
diff --git a/models/ml/prep_encoding_splitting/covariate_encoding.py b/models/ml/prep_encoding_splitting/covariate_encoding.py
index 3b44892..894b6e6 100644
--- a/models/ml/prep_encoding_splitting/covariate_encoding.py
+++ b/models/ml/prep_encoding_splitting/covariate_encoding.py
@@ -36,8 +36,8 @@ def position_index(x):
             return 2
 
     # we are dropping the columns that we filtered on in addition to our training variable
-    encoded_data = fil_cov.drop(['ACTIVE_DRIVER','ACTIVE_CONSTRUCTOR'],1)
+    encoded_data = fil_cov.drop(['ACTIVE_DRIVER','ACTIVE_CONSTRUCTOR'], axis=1)
     encoded_data['POSITION_LABEL']= encoded_data['DRIVER_POSITION'].apply(lambda x: position_index(x))
-    encoded_data_grouped_target = encoded_data.drop(['DRIVER_POSITION'],1)
+    encoded_data_grouped_target = encoded_data.drop(['DRIVER_POSITION'], axis=1)
 
-    return encoded_data_grouped_target
\ No newline at end of file
+    return encoded_data_grouped_target
diff --git a/models/ml/prep_encoding_splitting/ml_data_prep.py b/models/ml/prep_encoding_splitting/ml_data_prep.py
index 29f2eaf..e75f706 100644
--- a/models/ml/prep_encoding_splitting/ml_data_prep.py
+++ b/models/ml/prep_encoding_splitting/ml_data_prep.py
@@ -23,15 +23,15 @@ def model(dbt, session):
     # some of the constructors changed their name over the years, so replace old names with the current name
     mapping = {'Force India': 'Racing Point', 'Sauber': 'Alfa Romeo', 'Lotus F1': 'Renault', 'Toro Rosso': 'AlphaTauri'}
     data['CONSTRUCTOR_NAME'].replace(mapping, inplace=True)
-
+
     # create confidence metrics for drivers and constructors
-    dnf_by_driver = data.groupby('DRIVER').sum()['DNF_FLAG']
+    dnf_by_driver = data.groupby('DRIVER')['DNF_FLAG'].sum()
     driver_race_entered = data.groupby('DRIVER').count()['DNF_FLAG']
     driver_dnf_ratio = (dnf_by_driver/driver_race_entered)
     driver_confidence = 1-driver_dnf_ratio
     driver_confidence_dict = dict(zip(driver_confidence.index,driver_confidence))
 
-    dnf_by_constructor = data.groupby('CONSTRUCTOR_NAME').sum()['DNF_FLAG']
+    dnf_by_constructor = data.groupby('CONSTRUCTOR_NAME')['DNF_FLAG'].sum()
     constructor_race_entered = data.groupby('CONSTRUCTOR_NAME').count()['DNF_FLAG']
     constructor_dnf_ratio = (dnf_by_constructor/constructor_race_entered)
     constructor_relaiblity = 1-constructor_dnf_ratio
@@ -56,4 +56,4 @@ def model(dbt, session):
 
     data['ACTIVE_DRIVER'] = data['DRIVER'].apply(lambda x: int(x in active_drivers))
     data['ACTIVE_CONSTRUCTOR'] = data['CONSTRUCTOR_NAME'].apply(lambda x: int(x in active_constructors))
-    return data
\ No newline at end of file
+    return data
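Note: the driver and constructor confidence metrics in ml_data_prep.py both reduce to the same ratio, 1 minus (DNFs / races entered). A toy pandas example makes the groupby change above concrete; the driver names and DNF flags are invented for illustration.

    import pandas as pd

    # invented results: driver A finished 2 of 3 races, driver B finished none of 2
    data = pd.DataFrame({
        "DRIVER":   ["A", "A", "A", "B", "B"],
        "DNF_FLAG": [1, 0, 0, 1, 1],
    })

    dnf_by_driver = data.groupby("DRIVER")["DNF_FLAG"].sum()      # A: 1, B: 2
    races_entered = data.groupby("DRIVER")["DNF_FLAG"].count()    # A: 3, B: 2
    driver_confidence = 1 - dnf_by_driver / races_entered         # A: ~0.67, B: 0.0
    print(dict(zip(driver_confidence.index, driver_confidence)))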
diff --git a/models/ml/training_and_prediction/training_model_to_predict_position.py b/models/ml/training_and_prediction/training_model_to_predict_position.py
new file mode 100644
index 0000000..07172de
--- /dev/null
+++ b/models/ml/training_and_prediction/training_model_to_predict_position.py
@@ -0,0 +1,64 @@
+import snowflake.snowpark.functions as F
+from sklearn.model_selection import train_test_split
+import pandas as pd
+from sklearn.metrics import confusion_matrix, balanced_accuracy_score
+import io
+from sklearn.linear_model import LogisticRegression
+from joblib import dump, load
+import joblib
+import logging
+import sys
+
+logger = logging.getLogger("mylog")
+
+def save_file(session, model, path, dest_filename):
+    input_stream = io.BytesIO()
+    joblib.dump(model, input_stream)
+    session._conn.upload_stream(input_stream, path, dest_filename)
+    return "successfully created file: " + path
+
+def model(dbt, session):
+    dbt.config(
+        packages = ['numpy','scikit-learn','pandas','joblib','cachetools'],
+        materialized = "table",
+        tags = "train"
+    )
+    # Create a stage in Snowflake to save our model file
+    session.sql('create or replace stage MODELSTAGE').collect()
+
+    #session._use_scoped_temp_objects = False
+    version = "1.0"
+    logger.info('Model training version: ' + version)
+
+    # read in our training and testing upstream dataset
+    test_train_df = dbt.ref("training_testing_dataset")
+
+    # cast snowpark df to pandas df
+    test_train_pd_df = test_train_df.to_pandas()
+    target_col = "POSITION_LABEL"
+
+    # split out covariate predictors, x, from our target column position_label, y
+    split_X = test_train_pd_df.drop([target_col], axis=1)
+    split_y = test_train_pd_df[target_col]
+
+    # Split out our training and test data into proportions
+    X_train, X_test, y_train, y_test = train_test_split(split_X, split_y, train_size=0.7, random_state=42)
+    train = [X_train, y_train]
+    test = [X_test, y_test]
+    # now we are only training our one model to deploy
+    # we are keeping the focus on the workflows and not algorithms for this lab!
+    model = LogisticRegression()
+
+    # fit the model on the training data
+    model.fit(X_train, y_train)
+
+    # Save the model to a stage
+    save_file(session, model, "@MODELSTAGE/driver_position_"+version, "driver_position_"+version+".joblib")
+    logger.info('Model artifact: ' + "@MODELSTAGE/driver_position_"+version+".joblib")
+
+    # Take our pandas training and testing dataframes and put them back into snowpark dataframes
+    snowpark_train_df = session.write_pandas(pd.concat(train, axis=1, join='inner'), "train_table", auto_create_table=True, create_temp_table=True)
+    snowpark_test_df = session.write_pandas(pd.concat(test, axis=1, join='inner'), "test_table", auto_create_table=True, create_temp_table=True)
+
+    # Union our training and testing data together and add a column indicating train vs test rows
+    return snowpark_train_df.with_column("DATASET_TYPE", F.lit("train")).union(snowpark_test_df.with_column("DATASET_TYPE", F.lit("test")))
\ No newline at end of file
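Note: save_file serializes the fitted model into an in-memory BytesIO stream before uploading it to the stage. That round trip can be verified locally with joblib alone; the training data below is random stand-in data, not the lab's dataset.

    import io
    import joblib
    import numpy as np
    from sklearn.linear_model import LogisticRegression

    # random stand-in features and a 3-class target, just to produce a fitted estimator
    X = np.random.rand(50, 3)
    y = np.random.randint(0, 3, size=50)
    model = LogisticRegression().fit(X, y)

    # mirror save_file: dump to a stream, rewind, load it back, and compare predictions
    stream = io.BytesIO()
    joblib.dump(model, stream)
    stream.seek(0)
    restored = joblib.load(stream)
    assert (restored.predict(X) == model.predict(X)).all()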
diff --git a/models/ml/training_and_prediction/apply_prediction_to_position.py b/models/ml/training_and_prediction/apply_prediction_to_position.py
new file mode 100644
index 0000000..d6648c5
--- /dev/null
+++ b/models/ml/training_and_prediction/apply_prediction_to_position.py
@@ -0,0 +1,91 @@
+import logging
+import joblib
+import pandas as pd
+import os
+from snowflake.snowpark import types as T
+
+DB_STAGE = 'MODELSTAGE'
+version = '1.0'
+# The name of the model file
+model_file_path = 'driver_position_'+version
+model_file_packaged = 'driver_position_'+version+'.joblib'
+
+# This is a local directory, used for storing the various artifacts locally
+LOCAL_TEMP_DIR = '/tmp/driver_position'
+DOWNLOAD_DIR = os.path.join(LOCAL_TEMP_DIR, 'download')
+TARGET_MODEL_DIR_PATH = os.path.join(LOCAL_TEMP_DIR, 'ml_model')
+TARGET_LIB_PATH = os.path.join(LOCAL_TEMP_DIR, 'lib')
+
+# The feature columns that were used during model training
+# and that will be used during prediction
+FEATURE_COLS = [
+    "RACE_YEAR"
+    ,"RACE_NAME"
+    ,"GRID"
+    ,"CONSTRUCTOR_NAME"
+    ,"DRIVER"
+    ,"DRIVERS_AGE_YEARS"
+    ,"DRIVER_CONFIDENCE"
+    ,"CONSTRUCTOR_RELAIBLITY"
+    ,"TOTAL_PIT_STOPS_PER_RACE"]
+
+def register_udf_for_prediction(p_predictor ,p_session ,p_dbt):
+
+    # The prediction udf
+
+    def predict_position(p_df: T.PandasDataFrame[int, int, int, int,
+                         int, int, int, int, int]) -> T.PandasSeries[int]:
+        # Snowpark currently does not set the column name in the input dataframe
+        # The default col names are like 0,1,2,... Hence we need to reset the column
+        # names to the features that we initially used for training.
+        p_df.columns = [*FEATURE_COLS]
+
+        # Perform prediction; this returns an array object
+        pred_array = p_predictor.predict(p_df)
+        # Convert to series
+        df_predicted = pd.Series(pred_array)
+        return df_predicted
+
+    # The list of packages that will be used by the UDF
+    udf_packages = p_dbt.config.get('packages')
+
+    predict_position_udf = p_session.udf.register(
+        predict_position
+        ,name='predict_position'
+        ,packages = udf_packages
+    )
+    return predict_position_udf
+
+def download_models_and_libs_from_stage(p_session):
+    p_session.file.get(f'@{DB_STAGE}/{model_file_path}/{model_file_packaged}', DOWNLOAD_DIR)
+
+def load_model(p_session):
+    # Load the model and initialize the predictor
+    model_fl_path = os.path.join(DOWNLOAD_DIR, model_file_packaged)
+    predictor = joblib.load(model_fl_path)
+    return predictor
+
+# -------------------------------
+def model(dbt, session):
+    dbt.config(
+        packages = ['snowflake-snowpark-python' ,'scipy','scikit-learn' ,'pandas' ,'numpy'],
+        materialized = "table",
+        tags = "predict"
+    )
+    session._use_scoped_temp_objects = False
+    download_models_and_libs_from_stage(session)
+    predictor = load_model(session)
+    predict_position_udf = register_udf_for_prediction(predictor, session ,dbt)
+
+    # Retrieve the data, and perform the prediction
+    hold_out_df = (dbt.ref("hold_out_dataset_for_prediction")
+                   .select(*FEATURE_COLS)
+                   )
+    trained_model_file = dbt.ref("training_model_to_predict_position")  # ref'd so dbt builds the training model before this one
+
+    # Perform prediction.
+    new_predictions_df = hold_out_df.withColumn("position_predicted"
+                                                ,predict_position_udf(*FEATURE_COLS)
+                                                )
+
+    return new_predictions_df
\ No newline at end of file
diff --git a/package-lock.yml b/package-lock.yml
new file mode 100644
index 0000000..c73bffd
--- /dev/null
+++ b/package-lock.yml
@@ -0,0 +1,4 @@
+packages:
+- package: dbt-labs/dbt_utils
+  version: 1.0.0
+sha1_hash: efa9169fb1f1a1b2c967378c02b60e3d85ae464b
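Note: after dbt builds this model, the scored rows land in a regular table named after the model. A quick way to eyeball the output from any Snowpark session is sketched below; every connection parameter is a placeholder, and the table reference assumes dbt materialized the model in your target schema.

    from snowflake.snowpark import Session

    # placeholder credentials; fill in your own account details
    connection_parameters = {
        "account": "<account>",
        "user": "<user>",
        "password": "<password>",
        "database": "<database>",
        "schema": "<schema>",
        "warehouse": "<warehouse>",
    }
    session = Session.builder.configs(connection_parameters).create()

    # the table name matches the dbt model name
    predictions = session.table("apply_prediction_to_position")
    predictions.select("DRIVER", "POSITION_PREDICTED").show(10)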