Skip to content

Commit c7f46ef

Browse files
authored
Merge pull request #250 from datakind/inference-pipeline-testing-refactor-v3
Inference pipeline testing refactor v3
2 parents 6549223 + c7b512e commit c7f46ef

File tree

5 files changed

+185
-178
lines changed

5 files changed

+185
-178
lines changed

pipelines/pdp/inference/pdp_inference/workflow_asset_bundle/resources/github_sourced_pdp_inference_pipeline.yml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ resources:
4949
- --course_dataset_validated_path
5050
- "{{tasks.data_ingestion.values.course_dataset_validated_path}}"
5151
- --toml_file_path
52-
- "/Volumes/{{job.parameters.DB_workspace}}/{{job.parameters.databricks_institution_name}}_gold/gold_volume/configuration_files/{{job.parameters.databricks_institution_name}}_{{job.parameters.model_name}}_configuration_file.toml"
52+
- "/Volumes/{{job.parameters.DB_workspace}}/{{job.parameters.databricks_institution_name}}_gold/gold_volume/inference_inputs/config.toml"
5353
- --custom_schemas_path
5454
- "{{job.parameters.custom_schemas_path}}"
5555
job_cluster_key: pdp-inference-pipeline-cluster
@@ -73,7 +73,7 @@ resources:
7373
- --input_table_path
7474
- "{{tasks.data_preprocessing.values.processed_dataset_path}}"
7575
- --input_schema_path
76-
- /Volumes/{{job.parameters.DB_workspace}}/{{job.parameters.databricks_institution_name}}_gold/gold_volume/configuration_files/schema.pbtxt # TODO(samroon2): Update once finalized.
76+
- /Volumes/{{job.parameters.DB_workspace}}/{{job.parameters.databricks_institution_name}}_gold/gold_volume/inference_inputs/schema.pbtxt # TODO(samroon2): Update once finalized.
7777
- --output_artifact_path
7878
- "{{tasks.data_ingestion.values.job_root_dir}}"
7979
- --environment
@@ -97,7 +97,7 @@ resources:
9797
- --input_table_path
9898
- "{{tasks.data_preprocessing.values.processed_dataset_path}}"
9999
- --input_schema_path
100-
- /Volumes/{{job.parameters.DB_workspace}}/{{job.parameters.databricks_institution_name}}_gold/gold_volume/configuration_files/schema.pbtxt # TODO(samroon2): Update once finalized.
100+
- /Volumes/{{job.parameters.DB_workspace}}/{{job.parameters.databricks_institution_name}}_gold/gold_volume/inference_inputs/schema.pbtxt # TODO(samroon2): Update once finalized.
101101
- --output_artifact_path
102102
- "{{tasks.data_ingestion.values.job_root_dir}}"
103103
- --environment
@@ -135,7 +135,7 @@ resources:
135135
- --DK_CC_EMAIL
136136
- "{{job.parameters.DK_CC_EMAIL}}"
137137
- --modeling_table_path
138-
- "{{job.parameters.DB_workspace}}.{{job.parameters.databricks_institution_name}}_gold.modeling_table"
138+
- "{{job.parameters.DB_workspace}}.{{job.parameters.databricks_institution_name}}_silver.{{job.parameters.databricks_institution_name}}_pdp_modeling_ar_deid"
139139
- --custom_schemas_path
140140
- "{{job.parameters.custom_schemas_path}}"
141141
job_cluster_key: pdp-inference-pipeline-cluster
@@ -212,19 +212,19 @@ resources:
212212
enabled: true
213213
parameters:
214214
- name: cohort_file_name
215-
default: kentucky_state_uni_pdp_ar_deid_20241029000400.csv
215+
default: AO1600pdp_AO1600_AR_DEIDENTIFIED_STUDYID_20250522120554.csv
216216
- name: course_file_name
217-
default: kentucky_state_uni_pdp_course_ar_deid_20241029000414_dedup.csv
217+
default: AO1600pdp_AO1600_COURSE_LEVEL_AR_DEIDENTIFIED_STUDYID_20250522120554.csv
218218
- name: databricks_institution_name
219-
default: kentucky_state_uni
219+
default: midway_uni
220220
- name: db_run_id
221221
default: "{{job.run_id}}"
222222
- name: DB_workspace
223223
default: ${var.DB_workspace}
224224
- name: gcp_bucket_name
225-
default: dev_6782b2f451f84c17ae6e14e918432b65
225+
default: databricks-2052166062819251-unitycatalog
226226
- name: model_name
227-
default: kentucky_state_uni_retention_end_of_first_year
227+
default: midway_uni_graduation_4y_end_of_first_year
228228
- name: model_type
229229
default: sklearn
230230
- name: notification_email

pipelines/pdp/tasks/pdp_data_ingestion/pdp_data_ingestion.py

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from databricks.sdk.runtime import dbutils
2121
from google.cloud import storage
2222

23+
from student_success_tool.dataio import schemas
2324
import student_success_tool.dataio as dataio
2425
import importlib
2526

@@ -227,14 +228,10 @@ def run(self):
227228
Executes the data ingestion task.
228229
"""
229230
raw_files_path = f"{self.args.job_root_dir}/raw_files/"
230-
# os.makedirs(raw_files_path, exist_ok=True)
231231
print("raw_files_path:", raw_files_path)
232232
dbutils.fs.mkdirs(raw_files_path)
233233

234-
# fpath_course, fpath_cohort = self.download_data_from_gcs(raw_files_path)
235-
# Hack to get around gcp permissions right now
236-
fpath_course = f"/Volumes/staging_sst_01/{args.databricks_institution_name}_bronze/bronze_volume/inference_inputs/{self.args.course_file_name}"
237-
fpath_cohort = f"/Volumes/staging_sst_01/{args.databricks_institution_name}_bronze/bronze_volume/inference_inputs/{self.args.cohort_file_name}"
234+
fpath_course, fpath_cohort = self.download_data_from_gcs(raw_files_path)
238235
df_course, df_cohort = self.read_and_validate_data(fpath_course, fpath_cohort)
239236

240237
course_dataset_validated_path, cohort_dataset_validated_path = (
@@ -295,33 +292,40 @@ def parse_arguments() -> argparse.Namespace:
295292

296293
if __name__ == "__main__":
297294
args = parse_arguments()
298-
sys.path.append(args.custom_schemas_path)
299295
sys.path.append(
300-
f"/Volumes/staging_sst_01/{args.databricks_institution_name}_bronze/bronze_volume/inference_inputs"
296+
f"/Volumes/staging_sst_01/{args.databricks_institution_name}_gold/gold_volume/inference_inputs"
297+
)
298+
logging.info(
299+
"Files in the inference inputs path: %s",
300+
os.listdir(
301+
f"/Volumes/staging_sst_01/{args.databricks_institution_name}_gold/gold_volume/inference_inputs"
302+
),
301303
)
302304
try:
303-
print("Listdir1", os.listdir("/Workspace/Users"))
304-
# converter_func = importlib.import_module(f"{args.databricks_institution_name}.dataio")
305305
converter_func = importlib.import_module("dataio")
306-
course_converter_func = converter_func.converter_func_course
307306
cohort_converter_func = converter_func.converter_func_cohort
308-
logging.info("Running task with custom converter func")
309-
except ModuleNotFoundError:
310-
print("Running task without custom converter func")
311-
course_converter_func = None
307+
logging.info("Running task with custom cohort converter func")
308+
except Exception:
312309
cohort_converter_func = None
313-
logging.info("Running task without custom converter func")
310+
logging.info("Running task with default cohort converter func")
311+
try:
312+
converter_func = importlib.import_module("dataio")
313+
course_converter_func = converter_func.converter_func_course
314+
logging.info("Running task with custom course converter func")
315+
except Exception:
316+
course_converter_func = None
317+
logging.info("Running task with default course converter func")
314318
try:
315-
print("sys.path:", sys.path)
316-
# schemas = importlib.import_module(f"{args.databricks_institution_name}.schemas")
317319
schemas = importlib.import_module("schemas")
318320
logging.info("Running task with custom schema")
319321
except Exception:
320-
print("Running task with default schema")
321-
print("Exception", Exception)
322322
from student_success_tool.dataio.schemas import pdp as schemas
323323

324324
logging.info("Running task with default schema")
325325

326-
task = DataIngestionTask(args)
326+
task = DataIngestionTask(
327+
args,
328+
cohort_converter_func=cohort_converter_func,
329+
course_converter_func=course_converter_func,
330+
)
327331
task.run()

pipelines/pdp/tasks/pdp_data_preprocessing/pdp_data_preprocessing.py

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@
2626

2727
# import student_success_tool.preprocessing.targets.pdp as targets
2828
from student_success_tool import preprocessing
29-
from student_success_tool.preprocessing import selection
29+
from student_success_tool.preprocessing import selection, checkpoints
3030
from student_success_tool.configs.pdp import PDPProjectConfig
3131

32+
3233
# Disable mlflow autologging (due to Databricks issues during feature selection)
3334
mlflow.autolog(disable=True)
3435

@@ -126,16 +127,7 @@ def preprocess_data(
126127
"""
127128

128129
# Read preprocessing features from config
129-
min_passing_grade = self.cfg.preprocessing.features.min_passing_grade
130-
min_num_credits_full_time = (
131-
self.cfg.preprocessing.features.min_num_credits_full_time
132-
)
133-
course_level_pattern = self.cfg.preprocessing.features.course_level_pattern
134-
core_terms = self.cfg.preprocessing.features.core_terms
135-
key_course_subject_areas = (
136-
self.cfg.preprocessing.features.key_course_subject_areas
137-
)
138-
key_course_ids = self.cfg.preprocessing.features.key_course_ids
130+
checkpoint_type = self.cfg.preprocessing.checkpoint.type_
139131

140132
# Read preprocessing target parameters from config
141133
student_criteria = self.cfg.preprocessing.selection.student_criteria
@@ -145,23 +137,36 @@ def preprocess_data(
145137
df_student_terms = preprocessing.pdp.make_student_term_dataset(
146138
df_cohort,
147139
df_course,
148-
min_passing_grade=min_passing_grade,
149-
min_num_credits_full_time=min_num_credits_full_time,
150-
course_level_pattern=course_level_pattern,
151-
core_terms=core_terms,
152-
key_course_subject_areas=key_course_subject_areas,
153-
key_course_ids=key_course_ids,
140+
min_passing_grade=self.cfg.preprocessing.features.min_passing_grade,
141+
min_num_credits_full_time=self.cfg.preprocessing.features.min_num_credits_full_time,
142+
course_level_pattern=self.cfg.preprocessing.features.course_level_pattern,
143+
core_terms=self.cfg.preprocessing.features.core_terms,
144+
key_course_subject_areas=self.cfg.preprocessing.features.key_course_subject_areas,
145+
key_course_ids=self.cfg.preprocessing.features.key_course_ids,
154146
)
155-
eligible_students = selection.pdp.select_students_by_attributes(
147+
148+
selected_students = selection.pdp.select_students_by_attributes(
156149
df_student_terms, student_id_cols=student_id_col, **student_criteria
157150
)
158-
max_term_rank = df_student_terms["term_rank"].max()
151+
if checkpoint_type == "nth":
152+
logging.info("Checkpoint type: nth")
153+
df_ckpt = checkpoints.pdp.nth_student_terms(
154+
df_student_terms,
155+
n=self.cfg.preprocessing.checkpoint.n,
156+
sort_cols=self.cfg.preprocessing.checkpoint.sort_cols,
157+
include_cols=self.cfg.preprocessing.checkpoint.include_cols,
158+
enrollment_year_col="year_of_enrollment_at_cohort_inst",
159+
valid_enrollment_year=1,
160+
)
161+
elif checkpoint_type == "first_at_num_credits_earned":
162+
logging.info("Checkpoint type: first_at_num_credits_earned")
163+
df_ckpt = checkpoints.pdp.first_student_terms_at_num_credits_earned(
164+
df_student_terms,
165+
min_num_credits=self.cfg.preprocessing.checkpoint.min_num_credits,
166+
)
159167

160168
df_processed = pd.merge(
161-
df_student_terms.loc[df_student_terms["term_rank"].eq(max_term_rank), :],
162-
eligible_students,
163-
on=student_id_col,
164-
how="inner",
169+
df_ckpt, pd.Series(selected_students.index), how="inner", on=student_id_col
165170
)
166171

167172
df_processed = preprocessing.pdp.clean_up_labeled_dataset_cols_and_vals(
@@ -261,7 +266,7 @@ def parse_arguments() -> argparse.Namespace:
261266
try:
262267
sys.path.append(args.custom_schemas_path)
263268
sys.path.append(
264-
f"/Volumes/staging_sst_01/{args.databricks_institution_name}_bronze/bronze_volume/inference_inputs"
269+
f"/Volumes/staging_sst_01/{args.databricks_institution_name}_gold/gold_volume/inference_inputs"
265270
)
266271
schemas = importlib.import_module("schemas")
267272
# schemas = importlib.import_module(f"{args.databricks_institution_name}.schemas")

0 commit comments

Comments
 (0)