Merged
Changes from 65 commits (70 total)
ab36aaf
split up converter function for course and cohort
kaylawilding Jun 11, 2025
b1a6d5c
Change exception type
kaylawilding Jun 11, 2025
2783993
Update gcp bucket
kaylawilding Jun 11, 2025
89c9002
Update pdp_data_ingestion.py
kaylawilding Jun 11, 2025
65fbc11
Update pdp_data_ingestion.py
kaylawilding Jun 11, 2025
bfb399d
Include conv func to task
kaylawilding Jun 11, 2025
0d99266
Update pdp_data_ingestion.py
kaylawilding Jun 11, 2025
78983d5
Change inputs from bronze vol to gold
kaylawilding Jun 11, 2025
76bd280
Update github_sourced_pdp_inference_pipeline.yml
kaylawilding Jun 11, 2025
343d288
Update model uri
kaylawilding Jun 12, 2025
c866be3
pull latest model version
kaylawilding Jun 12, 2025
cea8f9b
Update preprocessing to v3
kaylawilding Jun 13, 2025
464cfcb
Update pdp_data_preprocessing.py
kaylawilding Jun 13, 2025
8fc1652
Update modeling table path
kaylawilding Jun 13, 2025
198fe04
Change inputs to only use model feature dfs
kaylawilding Jun 15, 2025
ca3c66b
limit to 100 for testing
kaylawilding Jun 15, 2025
c2bb1a6
Update pdp_model_inference.py
kaylawilding Jun 15, 2025
63d4464
Add experiment
kaylawilding Jun 15, 2025
5337811
Update pdp_model_inference.py
kaylawilding Jun 16, 2025
4bbab86
Update pdp_model_inference.py
kaylawilding Jun 16, 2025
94336a7
Update pdp_model_inference.py
kaylawilding Jun 16, 2025
1fc579f
Add inference tables back in
kaylawilding Jun 20, 2025
92ba31f
Update parameters for support score distribution table
kaylawilding Jun 20, 2025
50a7869
Update pdp_model_inference.py
kaylawilding Jun 20, 2025
fe6b63a
Update pdp_model_inference.py
kaylawilding Jun 20, 2025
31354a3
Update pdp_model_inference.py
kaylawilding Jun 20, 2025
67fc6ec
removing inference run_id suffix
Jun 20, 2025
afad53c
Merge branch 'develop' into inference-pipeline-testing-refactor-v3
vishpillai123 Jun 20, 2025
8878a4e
Merge branch 'develop' into inference-pipeline-testing-refactor-v3
kaylawilding Jun 20, 2025
4aecec6
Update pdp_model_inference.py
kaylawilding Jun 20, 2025
e54f8ae
Merge branch 'develop' into inference-pipeline-testing-refactor-v3
kaylawilding Jun 20, 2025
fda3bb8
Merge branch 'develop' into inference-pipeline-testing-refactor-v3
kaylawilding Jun 20, 2025
3804d15
Update pdp_data_ingestion.py
kaylawilding Jun 23, 2025
a3af43c
Add checkpoint type structure
kaylawilding Jun 24, 2025
8ab5afc
Add logging
kaylawilding Jun 25, 2025
36eee9e
Update logging
kaylawilding Jun 25, 2025
d511147
rename custom schema file
kaylawilding Jun 25, 2025
7028510
Edit package import
kaylawilding Jun 25, 2025
e4c084b
Update pdp_data_ingestion.py
kaylawilding Jun 25, 2025
666e2e5
Update pdp_data_ingestion.py
kaylawilding Jun 25, 2025
77357d9
update logging
kaylawilding Jun 25, 2025
22416dc
Update pdp_data_ingestion.py
kaylawilding Jun 25, 2025
37330c7
fix logging
kaylawilding Jun 26, 2025
3295151
workaround for explainer
kaylawilding Jun 26, 2025
5de5e19
Update pdp_model_inference.py
kaylawilding Jun 26, 2025
179fc3c
Update pdp_model_inference.py
kaylawilding Jun 26, 2025
bb31bbc
Update pdp_model_inference.py
kaylawilding Jun 26, 2025
0d7ba87
Update pdp_model_inference.py
kaylawilding Jun 26, 2025
98419a8
Update pdp_model_inference.py
kaylawilding Jun 26, 2025
2fa25a3
Update pdp_model_inference.py
kaylawilding Jun 26, 2025
cc3f278
Update pdp_model_inference.py
kaylawilding Jun 26, 2025
96507ce
remove experiment id in startrun
kaylawilding Jun 26, 2025
aa094b5
add logging of run and exp ids
kaylawilding Jun 26, 2025
854976f
Update pdp_model_inference.py
kaylawilding Jun 26, 2025
5aad768
Remove sample limit
kaylawilding Jun 27, 2025
5a6e6a4
add nested lists to configs for subject area and course ids
kaylawilding Jun 27, 2025
4e88bea
limit to 30 for testing
kaylawilding Jun 29, 2025
40d5cad
Create inference-integration-deployment.yml
kaylawilding Jul 15, 2025
6a44521
Merge branch 'develop' into inference-pipeline-testing-refactor-v3
kaylawilding Jul 15, 2025
e3b8eb0
Update uv.lock
kaylawilding Jul 15, 2025
a72c674
linting
kaylawilding Jul 15, 2025
b652ba4
linting
kaylawilding Jul 15, 2025
61c53f4
update pull request typo
kaylawilding Jul 15, 2025
7e91405
Delete inference-integration-deployment.yml
kaylawilding Jul 15, 2025
159b4e5
Update github_sourced_pdp_inference_pipeline.yml
kaylawilding Jul 15, 2025
542a744
Create inference-integration-deployment.yml
kaylawilding Jul 15, 2025
f03d814
Merge branch 'develop' into inference-pipeline-testing-refactor-v3
kaylawilding Jul 25, 2025
b85515d
remove unused code and add parallelization back in
kaylawilding Jul 25, 2025
128c5ac
Merge branch 'inference-pipeline-testing-refactor-v3' of https://gith…
kaylawilding Jul 25, 2025
c7b512e
Delete inference-integration-deployment.yml
kaylawilding Jul 25, 2025
github_sourced_pdp_inference_pipeline.yml
@@ -49,7 +49,7 @@ resources:
 - --course_dataset_validated_path
 - "{{tasks.data_ingestion.values.course_dataset_validated_path}}"
 - --toml_file_path
-- "/Volumes/{{job.parameters.DB_workspace}}/{{job.parameters.databricks_institution_name}}_gold/gold_volume/configuration_files/{{job.parameters.databricks_institution_name}}_{{job.parameters.model_name}}_configuration_file.toml"
+- "/Volumes/{{job.parameters.DB_workspace}}/{{job.parameters.databricks_institution_name}}_gold/gold_volume/inference_inputs/config.toml"
 - --custom_schemas_path
 - "{{job.parameters.custom_schemas_path}}"
 job_cluster_key: pdp-inference-pipeline-cluster
@@ -73,7 +73,7 @@ resources:
 - --input_table_path
 - "{{tasks.data_preprocessing.values.processed_dataset_path}}"
 - --input_schema_path
-- /Volumes/{{job.parameters.DB_workspace}}/{{job.parameters.databricks_institution_name}}_gold/gold_volume/configuration_files/schema.pbtxt # TODO(samroon2): Update once finalized.
+- /Volumes/{{job.parameters.DB_workspace}}/{{job.parameters.databricks_institution_name}}_gold/gold_volume/inference_inputs/schema.pbtxt # TODO(samroon2): Update once finalized.
 - --output_artifact_path
 - "{{tasks.data_ingestion.values.job_root_dir}}"
 - --environment
@@ -97,7 +97,7 @@ resources:
 - --input_table_path
 - "{{tasks.data_preprocessing.values.processed_dataset_path}}"
 - --input_schema_path
-- /Volumes/{{job.parameters.DB_workspace}}/{{job.parameters.databricks_institution_name}}_gold/gold_volume/configuration_files/schema.pbtxt # TODO(samroon2): Update once finalized.
+- /Volumes/{{job.parameters.DB_workspace}}/{{job.parameters.databricks_institution_name}}_gold/gold_volume/inference_inputs/schema.pbtxt # TODO(samroon2): Update once finalized.
 - --output_artifact_path
 - "{{tasks.data_ingestion.values.job_root_dir}}"
 - --environment
@@ -135,7 +135,7 @@ resources:
 - --DK_CC_EMAIL
 - "{{job.parameters.DK_CC_EMAIL}}"
 - --modeling_table_path
-- "{{job.parameters.DB_workspace}}.{{job.parameters.databricks_institution_name}}_gold.modeling_table"
+- "{{job.parameters.DB_workspace}}.{{job.parameters.databricks_institution_name}}_silver.{{job.parameters.databricks_institution_name}}_pdp_modeling_ar_deid"
 - --custom_schemas_path
 - "{{job.parameters.custom_schemas_path}}"
 job_cluster_key: pdp-inference-pipeline-cluster
@@ -212,19 +212,19 @@
 enabled: true
 parameters:
 - name: cohort_file_name
-  default: kentucky_state_uni_pdp_ar_deid_20241029000400.csv
+  default: AO1600pdp_AO1600_AR_DEIDENTIFIED_STUDYID_20250522120554.csv
 - name: course_file_name
-  default: kentucky_state_uni_pdp_course_ar_deid_20241029000414_dedup.csv
+  default: AO1600pdp_AO1600_COURSE_LEVEL_AR_DEIDENTIFIED_STUDYID_20250522120554.csv
 - name: databricks_institution_name
-  default: kentucky_state_uni
+  default: midway_uni
 - name: db_run_id
   default: "{{job.run_id}}"
 - name: DB_workspace
   default: ${var.DB_workspace}
 - name: gcp_bucket_name
-  default: dev_6782b2f451f84c17ae6e14e918432b65
+  default: databricks-2052166062819251-unitycatalog
 - name: model_name
-  default: kentucky_state_uni_retention_end_of_first_year
+  default: midway_uni_graduation_4y_end_of_first_year
 - name: model_type
   default: sklearn
 - name: notification_email
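To make the new defaults concrete, here is a minimal sketch (plain Python, illustrative values only) of how the templated parameters above resolve into the paths the tasks consume. DB_workspace is assumed to be staging_sst_01, the catalog hardcoded in the task scripts below; the institution name comes from the new defaults in this diff.

# Hypothetical resolution of the templated paths above; values are
# illustrative, with DB_workspace assumed to be "staging_sst_01".
params = {
    "DB_workspace": "staging_sst_01",  # assumed, not set in this diff
    "databricks_institution_name": "midway_uni",
}

toml_file_path = (
    f"/Volumes/{params['DB_workspace']}/"
    f"{params['databricks_institution_name']}_gold/gold_volume/"
    "inference_inputs/config.toml"
)
modeling_table_path = (
    f"{params['DB_workspace']}."
    f"{params['databricks_institution_name']}_silver."
    f"{params['databricks_institution_name']}_pdp_modeling_ar_deid"
)

print(toml_file_path)
# /Volumes/staging_sst_01/midway_uni_gold/gold_volume/inference_inputs/config.toml
print(modeling_table_path)
# staging_sst_01.midway_uni_silver.midway_uni_pdp_modeling_ar_deid

Note the asymmetry: the config and schema files move to a per-institution gold volume path, while the modeling table becomes a three-part Unity Catalog name in the silver schema.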
pipelines/pdp/tasks/pdp_data_ingestion/pdp_data_ingestion.py (45 changes: 25 additions & 20 deletions)
@@ -20,6 +20,7 @@
 from databricks.sdk.runtime import dbutils
 from google.cloud import storage

+from student_success_tool.dataio import schemas
 import student_success_tool.dataio as dataio
 import importlib

@@ -227,14 +228,10 @@ def run(self):
         Executes the data ingestion task.
         """
         raw_files_path = f"{self.args.job_root_dir}/raw_files/"
-        # os.makedirs(raw_files_path, exist_ok=True)
         print("raw_files_path:", raw_files_path)
         dbutils.fs.mkdirs(raw_files_path)

-        # fpath_course, fpath_cohort = self.download_data_from_gcs(raw_files_path)
-        # Hack to get around gcp permissions right now
-        fpath_course = f"/Volumes/staging_sst_01/{args.databricks_institution_name}_bronze/bronze_volume/inference_inputs/{self.args.course_file_name}"
-        fpath_cohort = f"/Volumes/staging_sst_01/{args.databricks_institution_name}_bronze/bronze_volume/inference_inputs/{self.args.cohort_file_name}"
+        fpath_course, fpath_cohort = self.download_data_from_gcs(raw_files_path)
         df_course, df_cohort = self.read_and_validate_data(fpath_course, fpath_cohort)

         course_dataset_validated_path, cohort_dataset_validated_path = (
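The hunk above reverts the bronze-volume permissions workaround in favor of the original GCS download path. The method body is not shown in this diff; below is a hypothetical sketch of what a download_data_from_gcs like this typically does with google-cloud-storage. The bucket layout (blobs at the bucket root) and the explicit argument list are assumptions based on the job parameters seen earlier.

# Hypothetical sketch only: the real DataIngestionTask.download_data_from_gcs
# is not shown in this PR. Assumes blobs named after the course/cohort files
# sit at the root of the bucket given by the gcp_bucket_name parameter.
from google.cloud import storage


def download_data_from_gcs(
    bucket_name: str, course_file_name: str, cohort_file_name: str, raw_files_path: str
) -> tuple[str, str]:
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    local_paths = []
    for file_name in (course_file_name, cohort_file_name):
        local_path = f"{raw_files_path}{file_name}"
        # Stream the blob's contents to the local (or volume-mounted) path.
        bucket.blob(file_name).download_to_filename(local_path)
        local_paths.append(local_path)
    return local_paths[0], local_paths[1]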
@@ -295,33 +292,41 @@ def parse_arguments() -> argparse.Namespace:

 if __name__ == "__main__":
     args = parse_arguments()
-    sys.path.append(args.custom_schemas_path)
+    # sys.path.append(args.custom_schemas_path)
     sys.path.append(
-        f"/Volumes/staging_sst_01/{args.databricks_institution_name}_bronze/bronze_volume/inference_inputs"
+        f"/Volumes/staging_sst_01/{args.databricks_institution_name}_gold/gold_volume/inference_inputs"
     )
+    logging.info(
+        "Files in the inference inputs path: %s",
+        os.listdir(
+            f"/Volumes/staging_sst_01/{args.databricks_institution_name}_gold/gold_volume/inference_inputs"
+        ),
+    )
     try:
-        print("Listdir1", os.listdir("/Workspace/Users"))
-        # converter_func = importlib.import_module(f"{args.databricks_institution_name}.dataio")
         converter_func = importlib.import_module("dataio")
-        course_converter_func = converter_func.converter_func_course
-        logging.info("Running task with custom converter func")
-    except ModuleNotFoundError:
-        print("Running task without custom converter func")
-        course_converter_func = None
-        logging.info("Running task without custom converter func")
+        cohort_converter_func = converter_func.converter_func_cohort
+        logging.info("Running task with custom cohort converter func")
+    except Exception:
+        cohort_converter_func = None
+        logging.info("Running task with default cohort converter func")
+    try:
+        converter_func = importlib.import_module("dataio")
+        course_converter_func = converter_func.converter_func_course
+        logging.info("Running task with custom course converter func")
+    except Exception:
+        course_converter_func = None
+        logging.info("Running task with default course converter func")
     try:
-        print("sys.path:", sys.path)
-        # schemas = importlib.import_module(f"{args.databricks_institution_name}.schemas")
         schemas = importlib.import_module("schemas")
+        logging.info("Running task with custom schema")
     except Exception:
-        print("Running task with default schema")
-        print("Exception", Exception)
-        from student_success_tool.dataio.schemas import pdp as schemas
-
+        logging.info("Running task with default schema")

-    task = DataIngestionTask(args)
+    task = DataIngestionTask(
+        args,
+        cohort_converter_func=cohort_converter_func,
+        course_converter_func=course_converter_func,
+    )
     task.run()
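Since the task now resolves institution-specific converters by importing a module named dataio from the gold-volume inference_inputs directory appended to sys.path, a per-institution override is just a file dropped into that directory. Below is a hypothetical sketch of such a module; the task only requires the two attribute names seen above, and the DataFrame-in/DataFrame-out shape is an assumption not confirmed by this PR.

# dataio.py -- hypothetical per-institution converter module, placed in
# .../gold_volume/inference_inputs/ so the importlib.import_module("dataio")
# lookups above find it. Attribute names must match exactly.
import pandas as pd


def converter_func_cohort(df: pd.DataFrame) -> pd.DataFrame:
    # Illustrative: normalize column names before schema validation.
    return df.rename(columns=str.lower)


def converter_func_course(df: pd.DataFrame) -> pd.DataFrame:
    # Illustrative: drop exact duplicate course records.
    return df.drop_duplicates()

Because each attribute lookup sits in its own try/except Exception, a module that defines only one of the two functions falls back to the default for the other instead of failing the whole task.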
pdp_data_preprocessing.py
@@ -26,9 +26,10 @@

 # import student_success_tool.preprocessing.targets.pdp as targets
 from student_success_tool import preprocessing
-from student_success_tool.preprocessing import selection
+from student_success_tool.preprocessing import selection, checkpoints
 from student_success_tool.configs.pdp import PDPProjectConfig

+
 # Disable mlflow autologging (due to Databricks issues during feature selection)
 mlflow.autolog(disable=True)

@@ -126,16 +127,7 @@ def preprocess_data(
         """

         # Read preprocessing features from config
-        min_passing_grade = self.cfg.preprocessing.features.min_passing_grade
-        min_num_credits_full_time = (
-            self.cfg.preprocessing.features.min_num_credits_full_time
-        )
-        course_level_pattern = self.cfg.preprocessing.features.course_level_pattern
-        core_terms = self.cfg.preprocessing.features.core_terms
-        key_course_subject_areas = (
-            self.cfg.preprocessing.features.key_course_subject_areas
-        )
-        key_course_ids = self.cfg.preprocessing.features.key_course_ids
+        checkpoint_type = self.cfg.preprocessing.checkpoint.type_

         # Read preprocessing target parameters from config
         student_criteria = self.cfg.preprocessing.selection.student_criteria
@@ -145,23 +137,36 @@
         df_student_terms = preprocessing.pdp.make_student_term_dataset(
             df_cohort,
             df_course,
-            min_passing_grade=min_passing_grade,
-            min_num_credits_full_time=min_num_credits_full_time,
-            course_level_pattern=course_level_pattern,
-            core_terms=core_terms,
-            key_course_subject_areas=key_course_subject_areas,
-            key_course_ids=key_course_ids,
+            min_passing_grade=self.cfg.preprocessing.features.min_passing_grade,
+            min_num_credits_full_time=self.cfg.preprocessing.features.min_num_credits_full_time,
+            course_level_pattern=self.cfg.preprocessing.features.course_level_pattern,
+            core_terms=self.cfg.preprocessing.features.core_terms,
+            key_course_subject_areas=self.cfg.preprocessing.features.key_course_subject_areas,
+            key_course_ids=self.cfg.preprocessing.features.key_course_ids,
         )
-        eligible_students = selection.pdp.select_students_by_attributes(
+
+        selected_students = selection.pdp.select_students_by_attributes(
             df_student_terms, student_id_cols=student_id_col, **student_criteria
         )
-        max_term_rank = df_student_terms["term_rank"].max()
+        if checkpoint_type == "nth":
+            logging.info("Checkpoint type: nth")
+            df_ckpt = checkpoints.pdp.nth_student_terms(
+                df_student_terms,
+                n=self.cfg.preprocessing.checkpoint.n,
+                sort_cols=self.cfg.preprocessing.checkpoint.sort_cols,
+                include_cols=self.cfg.preprocessing.checkpoint.include_cols,
+                enrollment_year_col="year_of_enrollment_at_cohort_inst",
+                valid_enrollment_year=1,
+            )
+        elif checkpoint_type == "first_at_num_credits_earned":
+            logging.info("Checkpoint type: first_at_num_credits_earned")
+            df_ckpt = checkpoints.pdp.first_student_terms_at_num_credits_earned(
+                df_student_terms,
+                min_num_credits=self.cfg.preprocessing.checkpoint.min_num_credits,
+            )

         df_processed = pd.merge(
-            df_student_terms.loc[df_student_terms["term_rank"].eq(max_term_rank), :],
-            eligible_students,
-            on=student_id_col,
-            how="inner",
+            df_ckpt, pd.Series(selected_students.index), how="inner", on=student_id_col
         )

         df_processed = preprocessing.pdp.clean_up_labeled_dataset_cols_and_vals(
@@ -261,7 +266,7 @@
     try:
         sys.path.append(args.custom_schemas_path)
         sys.path.append(
-            f"/Volumes/staging_sst_01/{args.databricks_institution_name}_bronze/bronze_volume/inference_inputs"
+            f"/Volumes/staging_sst_01/{args.databricks_institution_name}_gold/gold_volume/inference_inputs"
        )
         schemas = importlib.import_module("schemas")
         # schemas = importlib.import_module(f"{args.databricks_institution_name}.schemas")
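The checkpoint refactor above replaces the old "latest term_rank" selection with a config-driven choice between two checkpoint strategies. As rough intuition for what each strategy selects, here is an illustrative-only toy in pandas; the real implementations live in student_success_tool.preprocessing.checkpoints.pdp and take additional arguments (sort/include columns, enrollment-year filters), and whether n is 0- or 1-indexed there is not shown in this diff.

# Toy sketch of the two checkpoint behaviors, for intuition only.
import pandas as pd

df_student_terms = pd.DataFrame(
    {
        "student_id": ["a", "a", "a", "b", "b"],
        "term_rank": [1, 2, 3, 1, 2],
        "num_credits_earned": [12.0, 27.0, 45.0, 15.0, 31.0],
    }
)


def nth_student_terms_toy(df: pd.DataFrame, n: int) -> pd.DataFrame:
    # "nth": each student's n-th term in term_rank order (0-indexed here).
    return df.sort_values("term_rank").groupby("student_id").nth(n)


def first_terms_at_credits_toy(df: pd.DataFrame, min_num_credits: float) -> pd.DataFrame:
    # "first_at_num_credits_earned": each student's first term with at
    # least min_num_credits earned.
    eligible = df[df["num_credits_earned"] >= min_num_credits]
    return eligible.sort_values("term_rank").groupby("student_id").head(1)


print(nth_student_terms_toy(df_student_terms, n=1))        # term 2 for both students
print(first_terms_at_credits_toy(df_student_terms, 30.0))  # term 3 for "a", term 2 for "b"

Two things worth noting about the merged result: the if/elif has no else branch, so a misspelled checkpoint type would leave df_ckpt unbound and fail with a NameError rather than a clear message; and the join with pd.Series(selected_students.index) relies on that index being named with the student ID column so pandas can merge on it.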