diff --git a/learn/custom_or_prebuilt_components.py b/learn/custom_or_prebuilt_components.py new file mode 100644 index 0000000..011d194 --- /dev/null +++ b/learn/custom_or_prebuilt_components.py @@ -0,0 +1,120 @@ +""" +Machine Learning Operations Playbook Adoption Workshop – Phase 2: +Data Services Integration Architecture - Scavenger Hunt + +File: custom_or_prebuilt_components.py + +Purpose: +-------- +This file provides scavenger hunt instructions for learners to explore +existing AWS-based labs (Lab 6.1 and Lab 6.2) and plan migration to +Vertex AI architecture. The focus is on Amazon S3 and Amazon Redshift +integration patterns, and how to prepare to convert them into Vertex AI +components using Kubeflow @component decorators. + +Learners should use VSCode/PyCharm search to locate the TODO markers +listed below and record WHERE (line of code), WHAT (purpose), and WHY +(rationale for migration). This stage is planning-only: code remains in +its AWS state, but learners should envision how it will map to Vertex AI +components. + +Target Vertex Architecture Structure: +------------------------------------- +├── src/ +│ ├── components/ +│ │ ├── __init__.py +│ │ │ +│ │ ├── custom_data_quality_components.py # ✅ Custom +│ │ ├── custom_training_components.py # ✅ Custom +│ │ ├── custom_evaluation_components.py # ✅ Custom +│ │ ├── custom_registry_components.py # ✅ Custom +│ │ ├── custom_monitoring_components.py # ✅ Custom +│ │ ├── custom_audit_components.py # ✅ Custom +│ │ ├── custom_sysco_modelplaceholder_components.py # ✅ Custom +│ │ │ +│ │ └── prebuilt_bigquery_components.py # ✅ Pre-built + +Scavenger Hunt Instructions: +---------------------------- + +1. 
Lab 6.1 — Amazon S3 Integration with SageMaker Workflows + - Search: "# TODO: Lab 6.1.1 - Line-by-Line Import Exploration" + * WHERE: Top of model.py imports + * WHAT: Identify boto3/joblib imports + * WHY: These libraries enable artifact persistence in S3 + * Migration Planning: In Vertex AI, this logic would move into + custom_training_components.py with @component decorators, using + GCS (gs:// URIs) instead of S3. + - Search: "# TODO: Lab 6.1.4 - S3 Data Loading Conversion" + * WHERE: _s3_persist() function in model.py + * WHAT: Inspect boto3.upload_file usage + * WHY: Durable storage pattern in AWS + * Migration Planning: Replace with GCS client logic inside a + @component in custom_registry_components.py. + + AWS Information to Gather for Migration: + - S3 bucket name (e.g., `my-ml-artifacts-bucket`) + - Bucket region (e.g., `us-east-1`) + - IAM role or access keys with `AmazonS3FullAccess` + - Artifact paths (prefixes like `s3://bucket/models/`) + - Current SageMaker registry integration points + + Equivalent in GCP: + - GCS bucket name (e.g., `gs://my-ml-artifacts`) + - GCP project ID and region + - Service account with `Storage Admin` role + - Artifact paths in GCS (prefixes like `gs://bucket/models/`) + +2. Lab 6.2 — Amazon Redshift Data Pipeline and ML Integration + - Search: "# TODO: Lab 6.2.1 - Data Access Pattern Conversion" + * WHERE: ingest_model.py _read_from_redshift() + * WHAT: Inspect select_sql_from_dict or pd.read_sql usage + * WHY: Redshift → DataFrame conversion + * Migration Planning: Equivalent logic would move into + prebuilt_bigquery_components.py using BigQuery query components. 
+ - Search: "# TODO: Lab 6.2.4 - Data Movement and Performance Considerations" + * WHERE: stage_table_to_s3() in ingest_model.py + * WHAT: Inspect UNLOAD vs client-side upload patterns + * WHY: Efficiency vs cost trade-offs in Redshift + * Migration Planning: Replace with BigQuery export jobs inside + prebuilt_bigquery_components.py or custom_data_quality_components.py. + + AWS Information to Gather for Migration: + - Redshift cluster identifier (e.g., `redshift-cluster-1`) + - Database name (e.g., `analytics_db`) + - Schema names (e.g., `public`, `ml_features`) + - User credentials or IAM role with Redshift access + - Connection endpoint (host, port) + - Common SQL queries used for ETL (COPY, UNLOAD, CTAS) + + Equivalent in GCP: + - BigQuery dataset name (e.g., `ml_features_dataset`) + - BigQuery table names (e.g., `training_data`, `evaluation_data`) + - GCP project ID and region + - Service account with `BigQuery Admin` role + - SQL queries adapted to BigQuery syntax (SELECT, CREATE TABLE AS) + +3. Planning Migration with Vertex Kubeflow @component Decorators + - For S3 → GCS: + * Wrap artifact persistence logic in @component functions inside + custom_training_components.py and custom_registry_components.py. + * Replace boto3 calls with google-cloud-storage client calls. + - For Redshift → BigQuery: + * Wrap ETL and query logic in @component functions inside + prebuilt_bigquery_components.py. + * Replace psycopg2/sqlalchemy calls with google-cloud-bigquery client + or prebuilt BigQuery components. 
+ +Learner Deliverable: +-------------------- +For each TODO marker found: +- Record WHERE: file name and line of code +- Record WHAT: the code pattern or component +- Record WHY: the rationale for its use in AWS +- Record Migration Plan: which Vertex component file it would map to + (custom_* or prebuilt_bigquery_components.py) with @component decorator + +This scavenger hunt prepares learners to design the component structure +shown above by understanding the migration path from AWS (S3, Redshift) +to Vertex AI (GCS, BigQuery, Pipeline components). +""" diff --git a/learn/hands_on_exercise.py b/learn/hands_on_exercise.py deleted file mode 100644 index 19e8dcd..0000000 --- a/learn/hands_on_exercise.py +++ /dev/null @@ -1,202 +0,0 @@ -""" -Ready? - -Review train.py and train_to_vertex_ai_conversion.py files for patterns. - -""" - -from kfp import dsl, components -from kfp.dsl import ( - component, - pipeline, - Input, - Output, - Model, - Metrics -) -from google_cloud_pipeline_components.types import artifact_types - -PIPELINE_NAME = "diabetes-classification-exercise-pipeline" -BASE_IMAGE = "python:3.9" - -bigquery_query_job_op = components.load_component_from_url( - 'https://us-kfp.pkg.dev/ml-pipeline/google-cloud-registry/' - 'bigquery-query-job/sha256:' - 'd1cae80bc0de4e5b95b994739c8d0d7d42ce5a4cb17d3c9512eaed14540f6343' -) - -@component( - base_image=BASE_IMAGE, - packages_to_install=[ - "google-cloud-bigquery", - "scikit-learn", - "joblib", - "pandas" - ] -) -def evaluate_model_op( - test_data: Input[artifact_types.BQTable], - model: Input[Model], - metrics: Output[Metrics], - min_accuracy: float, - project_id: str, - bq_location: str -) -> float: - import re, logging, joblib - import pandas as pd - from sklearn.metrics import accuracy_score - from google.cloud import bigquery - - logging.basicConfig(level=logging.INFO) - - uri = test_data.uri - match = re.search(r'projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)', uri) - if not match: - raise 
ValueError(f"Could not parse BQ table from URI: {uri}") - proj, dataset, table = match.groups() - table_ref = f"{proj}.{dataset}.{table}" - - bq_client = bigquery.Client(project=project_id, location=bq_location) - query = f"SELECT * FROM `{table_ref}`" - test_df = bq_client.query(query).to_dataframe() - - model_obj = joblib.load(model.path) - - FEATURE_COLUMNS = ["Pregnancies","PlasmaGlucose","DiastolicBloodPressure", - "TricepsThickness","SerumInsulin","BMI","DiabetesPedigree","Age"] - X_test = test_df[FEATURE_COLUMNS] - y_test = test_df["Diabetic"] - - preds = model_obj.predict(X_test) - accuracy = accuracy_score(y_test, preds) - - metrics.log_metric("accuracy", accuracy) - metrics.log_metric("test_samples", len(test_df)) - - return accuracy - -@component(base_image=BASE_IMAGE) -def model_approved_op(model_accuracy: float, model_name: str): - import logging - logging.basicConfig(level=logging.INFO) - logging.info("Model approved with accuracy: %.4f", model_accuracy) - -@component( - base_image=BASE_IMAGE, - packages_to_install=["google-cloud-aiplatform"] -) -def register_model_op( - project_id: str, - region: str, - model_display_name: str, - model_artifact: Input[Model], - parent_model: str = "" -): - from google.cloud import aiplatform - import logging - - logging.basicConfig(level=logging.INFO) - aiplatform.init(project=project_id, location=region) - - artifact_dir = model_artifact.uri.rsplit("/", 1)[0] - - upload_args = { - "display_name": model_display_name, - "artifact_uri": artifact_dir, - "serving_container_image_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-2:latest", - "sync": True - } - - if parent_model: - upload_args["parent_model"] = parent_model - - model = aiplatform.Model.upload(**upload_args) - -@dsl.pipeline( - name=PIPELINE_NAME, - description="Exercise pipeline for train.py to Vertex AI conversion" -) -def diabetes_training_pipeline( - project_id: str, - region: str = "us-central1", - model_display_name: str = 
"diabetes-classification-model", - bq_dataset: str = "shared_bronze", - bq_view: str = "diabetes_features_view", - reg_rate: float = 0.01, - min_accuracy: float = 0.70, - parent_model: str = "" -): - train_query = f""" - SELECT Pregnancies, PlasmaGlucose, DiastolicBloodPressure, TricepsThickness, - SerumInsulin, BMI, DiabetesPedigree, Age, Diabetic - FROM `{project_id}.{bq_dataset}.{bq_view}` - WHERE MOD(ABS(FARM_FINGERPRINT(CAST(CONCAT(Pregnancies, PlasmaGlucose) AS STRING))), 10) < 8 - """ - - test_query = f""" - SELECT Pregnancies, PlasmaGlucose, DiastolicBloodPressure, TricepsThickness, - SerumInsulin, BMI, DiabetesPedigree, Age, Diabetic - FROM `{project_id}.{bq_dataset}.{bq_view}` - WHERE MOD(ABS(FARM_FINGERPRINT(CAST(CONCAT(Pregnancies, PlasmaGlucose) AS STRING))), 10) >= 8 - """ - - bq_train_task = bigquery_query_job_op( - project=project_id, - location=region, - query=train_query - ) - - bq_test_task = bigquery_query_job_op( - project=project_id, - location=region, - query=test_query - ) - - train_task = train_model_op( - train_data=bq_train_task.outputs["destination_table"], - reg_rate=reg_rate, - project_id=project_id, - bq_location=region - ).set_cpu_limit("1").set_memory_limit("3840Mi") - train_task.after(bq_train_task) - - eval_task = evaluate_model_op( - test_data=bq_test_task.outputs["destination_table"], - model=train_task.outputs["output_model"], - min_accuracy=min_accuracy, - project_id=project_id, - bq_location=region - ).set_cpu_limit("1").set_memory_limit("3840Mi") - eval_task.after(train_task) - - with dsl.If(eval_task.outputs["Output"] >= min_accuracy, name="pass-accuracy-threshold"): - approved_task = model_approved_op( - model_accuracy=eval_task.outputs["Output"], - model_name=model_display_name - ) - approved_task.after(eval_task) - - register_task = register_model_op( - project_id=project_id, - region=region, - model_display_name=model_display_name, - model_artifact=train_task.outputs["output_model"], - parent_model=parent_model - ) - 
register_task.after(approved_task) - - with dsl.If(eval_task.outputs["Output"] < min_accuracy, name="fail-accuracy-threshold"): - rejected_task = model_rejected_op( - model_accuracy=eval_task.outputs["Output"], - min_accuracy=min_accuracy - ) - rejected_task.after(eval_task) - -if __name__ == "__main__": - from kfp import compiler - - compiler.Compiler().compile( - pipeline_func=diabetes_training_pipeline, - package_path="diabetes_training_exercise.json" - ) - print("Pipeline compiled successfully!") diff --git a/requirements.txt b/requirements.txt index 6cacfa8..87422a3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,100 @@ -# This file points to the actual requirements in src/ -# All dependencies are managed in src/requirements.txt --r src/requirements.txt +# ============================================================================ +# Core ML & Data Science Libraries +# ============================================================================ +# NumPy - Numerical computing (used by scikit-learn, pandas) +numpy==1.23.5 + +# Pandas - Data manipulation (used in all components for DataFrame operations) +pandas==1.5.3 + +# Scikit-learn - Machine learning (LogisticRegression, accuracy_score) +scikit-learn==1.2.2 + +# Joblib - Model serialization (save/load models as .joblib) +joblib==1.2.0 + +# ============================================================================ +# Google Cloud Platform Libraries +# ============================================================================ +# Google Cloud Storage - GCS bucket operations (pipeline artifacts) +google-cloud-storage==2.10.0 + +# Google Cloud AI Platform - Vertex AI SDK (pipeline execution, model registry) +google-cloud-aiplatform==1.56.0 + +# Google Cloud BigQuery - BigQuery client (data reading in components) +google-cloud-bigquery==3.11.4 + +# BigQuery data types - Required for BigQuery DataFrame compatibility +db-dtypes==1.1.1 + +# PyArrow - Efficient data serialization for BigQuery 
+pyarrow==12.0.1 + +# ============================================================================ +# Kubeflow Pipelines (KFP) v2 +# ============================================================================ +# KFP SDK - Pipeline definition and component creation +kfp==2.7.0 + +# Google Cloud Pipeline Components - Pre-built GCP components (BigQuery Query Job) +google-cloud-pipeline-components==2.14.1 + +# ============================================================================ +# Testing Libraries (for tests/unit/) +# ============================================================================ +# Pytest - Test framework +pytest==7.4.3 + +# Pytest-cov - Code coverage reporting +pytest-cov==4.1.0 + +# Pytest-mock - Enhanced mocking capabilities +pytest-mock==3.12.0 + +# ============================================================================ +# Code Quality & Linting (CI/CD) +# ============================================================================ +# Flake8 - Python linting (enforces PEP8, detects errors) +flake8==6.1.0 + +# Black - Code formatter (consistent code style) +black==23.12.1 + +# isort - Import sorting (organized imports) +isort==5.13.2 + +# ============================================================================ +# YAML Processing (for governance/resource_auditor.py) +# ============================================================================ +# PyYAML - YAML parsing (used in resource auditor for pipeline YAML) +PyYAML==6.0.1 + +# ============================================================================ +# Type Checking (Optional - for development) +# ============================================================================ +# Mypy - Static type checking +# mypy==1.7.1 + +# ============================================================================ +# Documentation (Optional - for future Sphinx docs) +# ============================================================================ +# Sphinx - Documentation generation +# 
sphinx==7.2.6 +# sphinx-rtd-theme==2.0.0 + +# ============================================================================ +# Version Compatibility Notes +# ============================================================================ +# - All versions tested with Python 3.9 +# - Compatible with base_image="python:3.9" in KFP components +# - google-cloud-aiplatform 1.56.0 supports KFP 2.7.0 +# - google-cloud-pipeline-components 2.14.1 requires KFP 2.7.0 +# - BigQuery libraries (google-cloud-bigquery, db-dtypes, pyarrow) work together +# - Pytest versions compatible with Python 3.9 +# +# Installation: +# pip install -r requirements.txt +# +# Update all packages (use with caution): +# pip install --upgrade -r requirements.txt diff --git a/src/requirements.txt b/src/requirements.txt deleted file mode 100644 index 87422a3..0000000 --- a/src/requirements.txt +++ /dev/null @@ -1,100 +0,0 @@ -# ============================================================================ -# Core ML & Data Science Libraries -# ============================================================================ -# NumPy - Numerical computing (used by scikit-learn, pandas) -numpy==1.23.5 - -# Pandas - Data manipulation (used in all components for DataFrame operations) -pandas==1.5.3 - -# Scikit-learn - Machine learning (LogisticRegression, accuracy_score) -scikit-learn==1.2.2 - -# Joblib - Model serialization (save/load models as .joblib) -joblib==1.2.0 - -# ============================================================================ -# Google Cloud Platform Libraries -# ============================================================================ -# Google Cloud Storage - GCS bucket operations (pipeline artifacts) -google-cloud-storage==2.10.0 - -# Google Cloud AI Platform - Vertex AI SDK (pipeline execution, model registry) -google-cloud-aiplatform==1.56.0 - -# Google Cloud BigQuery - BigQuery client (data reading in components) -google-cloud-bigquery==3.11.4 - -# BigQuery data types - 
Required for BigQuery DataFrame compatibility -db-dtypes==1.1.1 - -# PyArrow - Efficient data serialization for BigQuery -pyarrow==12.0.1 - -# ============================================================================ -# Kubeflow Pipelines (KFP) v2 -# ============================================================================ -# KFP SDK - Pipeline definition and component creation -kfp==2.7.0 - -# Google Cloud Pipeline Components - Pre-built GCP components (BigQuery Query Job) -google-cloud-pipeline-components==2.14.1 - -# ============================================================================ -# Testing Libraries (for tests/unit/) -# ============================================================================ -# Pytest - Test framework -pytest==7.4.3 - -# Pytest-cov - Code coverage reporting -pytest-cov==4.1.0 - -# Pytest-mock - Enhanced mocking capabilities -pytest-mock==3.12.0 - -# ============================================================================ -# Code Quality & Linting (CI/CD) -# ============================================================================ -# Flake8 - Python linting (enforces PEP8, detects errors) -flake8==6.1.0 - -# Black - Code formatter (consistent code style) -black==23.12.1 - -# isort - Import sorting (organized imports) -isort==5.13.2 - -# ============================================================================ -# YAML Processing (for governance/resource_auditor.py) -# ============================================================================ -# PyYAML - YAML parsing (used in resource auditor for pipeline YAML) -PyYAML==6.0.1 - -# ============================================================================ -# Type Checking (Optional - for development) -# ============================================================================ -# Mypy - Static type checking -# mypy==1.7.1 - -# ============================================================================ -# Documentation (Optional - for future Sphinx docs) -# 
============================================================================ -# Sphinx - Documentation generation -# sphinx==7.2.6 -# sphinx-rtd-theme==2.0.0 - -# ============================================================================ -# Version Compatibility Notes -# ============================================================================ -# - All versions tested with Python 3.9 -# - Compatible with base_image="python:3.9" in KFP components -# - google-cloud-aiplatform 1.56.0 supports KFP 2.7.0 -# - google-cloud-pipeline-components 2.14.1 requires KFP 2.7.0 -# - BigQuery libraries (google-cloud-bigquery, db-dtypes, pyarrow) work together -# - Pytest versions compatible with Python 3.9 -# -# Installation: -# pip install -r src/requirements.txt -# -# Update all packages (use with caution): -# pip install --upgrade -r src/requirements.txt