diff --git a/oncoclear/.assets/cloud_mcp.png b/oncoclear/.assets/cloud_mcp.png new file mode 100644 index 00000000..920342c8 Binary files /dev/null and b/oncoclear/.assets/cloud_mcp.png differ diff --git a/oncoclear/.assets/deployment_architecture.png b/oncoclear/.assets/deployment_architecture.png new file mode 100644 index 00000000..01953300 Binary files /dev/null and b/oncoclear/.assets/deployment_architecture.png differ diff --git a/oncoclear/.assets/deployment_pipeline.png b/oncoclear/.assets/deployment_pipeline.png new file mode 100644 index 00000000..f8d519f6 Binary files /dev/null and b/oncoclear/.assets/deployment_pipeline.png differ diff --git a/oncoclear/.assets/fastapi_docs.png b/oncoclear/.assets/fastapi_docs.png new file mode 100644 index 00000000..1469eb41 Binary files /dev/null and b/oncoclear/.assets/fastapi_docs.png differ diff --git a/oncoclear/.assets/feature_engineering_pipeline.png b/oncoclear/.assets/feature_engineering_pipeline.png new file mode 100644 index 00000000..db301913 Binary files /dev/null and b/oncoclear/.assets/feature_engineering_pipeline.png differ diff --git a/oncoclear/.assets/inference_pipeline.png b/oncoclear/.assets/inference_pipeline.png new file mode 100644 index 00000000..358d5537 Binary files /dev/null and b/oncoclear/.assets/inference_pipeline.png differ diff --git a/oncoclear/.assets/pipeline_overview.png b/oncoclear/.assets/pipeline_overview.png new file mode 100644 index 00000000..609e97d2 Binary files /dev/null and b/oncoclear/.assets/pipeline_overview.png differ diff --git a/oncoclear/.assets/training_pipeline.png b/oncoclear/.assets/training_pipeline.png new file mode 100644 index 00000000..a2e6a7d0 Binary files /dev/null and b/oncoclear/.assets/training_pipeline.png differ diff --git a/oncoclear/.dockerignore b/oncoclear/.dockerignore new file mode 100644 index 00000000..455f4d7a --- /dev/null +++ b/oncoclear/.dockerignore @@ -0,0 +1,2 @@ +.venv* +.requirements* \ No newline at end of file diff --git a/oncoclear/README.md b/oncoclear/README.md new file mode 100644 index 00000000..b0090366 --- /dev/null +++ b/oncoclear/README.md @@ -0,0 +1,398 @@ +# 🔬 OncoClear + +A production-ready MLOps pipeline for accurate breast cancer classification using machine learning. + +## 🚀 Product Overview + +OncoClear is an end-to-end MLOps solution that transforms raw diagnostic measurements into reliable cancer classification predictions. Built with ZenML's robust framework, it delivers enterprise-grade machine learning pipelines that can be deployed in both development and production environments. + +
+
+ OncoClear Pipelines +
+
+ +### Key Features + +- **End-to-End MLOps Pipeline**: From feature engineering to model deployment +- **Automatic Model Versioning**: Track and compare different model versions +- **Model Promotion Workflow**: Automatically promote best-performing models to production +- **Batch Inference Pipeline**: Run predictions on new data with production models +- **Local API Deployment**: Serve models through a FastAPI endpoint + +## 💡 How It Works + +The OncoClear project consists of four integrated pipelines: + +1. **Feature Engineering Pipeline** + - Loads the Wisconsin Breast Cancer diagnostic dataset + - Performs data preprocessing and cleaning + - Splits data into training and testing sets + - Versions and tracks datasets in the ZenML artifact store + +
+
+ Feature Engineering Pipeline +
+
+ +2. **Training Pipeline** + - Trains multiple classification models (SGD and Random Forest) + - Evaluates models on test data using accuracy, precision, recall, and F1 score + - Registers models in the ZenML Model Control Plane + - Automatically promotes the best performer to production + +
+
+ Training Pipeline +
+
+ +3. **Inference Pipeline** + - Uses the production model to generate predictions on new data + - Leverages the same preprocessing pipeline used during training + - Tracks predictions as model artifacts + +
+
+ Inference Pipeline +
+
+ +4. **Deployment Pipeline** + - Deploys the production model as a FastAPI service + - Makes the model accessible via REST API + - Provides interactive Swagger documentation + +
+
+ Deployment Pipeline +
+
+ +## 🔧 Getting Started + +### Prerequisites + +- Python 3.9+ +- ZenML installed and configured + +### Installation + +1. Set up your environment: + +```bash +# Clone the repository +git clone https://github.com/zenml-io/zenml-projects.git +cd zenml-projects/oncoclear + +# Create and activate a Python virtual environment +python -m venv .venv +source .venv/bin/activate # On Windows: .venv\Scripts\activate + +# Install dependencies +pip install -r requirements.txt + +# Install required integrations +zenml integration install sklearn pandas -y + +# Initialize ZenML (if not already initialized) +zenml init +``` + +### Running the Pipelines + +Execute the pipelines in sequence to train and deploy your breast cancer detection model: + +```bash +# Start the ZenServer to enable dashboard access (optional) +zenml login + +# Run the feature engineering pipeline +python run.py --feature-pipeline + +# Run the training pipeline +python run.py --training-pipeline + +# Run the training pipeline with versioned artifacts +python run.py --training-pipeline --train-dataset-version-name=1 --test-dataset-version-name=1 + +# Run the inference pipeline +python run.py --inference-pipeline + +# Deploy model locally with FastAPI +python run.py --deploy-locally --deployment-model-name=breast_cancer_classifier +``` + +After execution, access your model at http://localhost:8000 and view API documentation at http://localhost:8000/docs. + +
+
+ FastAPI Documentation +
+
+ +## 🚢 Deployment Options + +OncoClear provides flexible options for deploying your trained models in various environments. + +### Understanding Local Deployment + +When you run `python run.py --deploy-locally`, the deployment pipeline performs the following steps: + +1. **Build Docker Image**: Creates a containerized FastAPI application with the selected model + - The pipeline executes `build_deployment_image` step which: + - Copies necessary code from the project to the Docker build context + - Builds a Docker image using the Dockerfile in the `api/` directory + - Tags the image with the model name and stage + +2. **Run Container**: Deploys the container locally with appropriate environment variables + - The pipeline executes `run_deployment_container` step which: + - Configures the container with ZenML server details and API key + - Maps the specified ports for access (default: 8000) + - Sets environment variables for model access + - Outputs the service URL and API documentation URL + +The Dockerfile in the `api/` directory contains the instructions for building the deployment image. The core of the deployment is a FastAPI application in `api/main.py` that: + +- Loads the specified model from the ZenML Model Control Plane +- Loads the preprocessing pipeline to ensure consistent data transformations +- Exposes endpoints for predictions and model information +- Includes Swagger/OpenAPI documentation for easy testing + +
+
+ Deployment Architecture +
+
+ +### Configuring Artifact Store Dependencies + +By default, the API service is configured to work with AWS S3 artifact storage, which is reflected in `api/requirements.txt`: + +``` +# AWS dependencies for S3 artifact store +aws-profile-manager +boto3 +s3fs>=2022.3.0 +``` + +To adapt the deployment for different artifact stores: + +1. **For GCP Cloud Storage**: Replace the AWS dependencies with: + ``` + # GCP dependencies for GCS artifact store + google-cloud-storage>=2.9.0 + gcsfs>=2022.3.0 + ``` + +2. **For Azure Blob Storage**: Replace the AWS dependencies with: + ``` + # Azure dependencies for Blob Storage artifact store + azure-storage-blob>=12.17.0 + adlfs>=2021.10.0 + ``` + +3. **For local filesystem**: Remove the cloud storage dependencies. + +### Extending to Kubernetes Deployment + +To deploy OncoClear on Kubernetes: + +1. **Create Kubernetes Deployment Manifest**: Create a file named `k8s/deployment.yaml`: + ```yaml + apiVersion: apps/v1 + kind: Deployment + metadata: + name: oncoclear-api + spec: + replicas: 3 + selector: + matchLabels: + app: oncoclear-api + template: + metadata: + labels: + app: oncoclear-api + spec: + containers: + - name: oncoclear-api + image: ${YOUR_REGISTRY}/oncoclear:production + ports: + - containerPort: 8000 + env: + - name: ZENML_SERVER_URL + value: "${ZENML_SERVER_URL}" + - name: ZENML_API_KEY + valueFrom: + secretKeyRef: + name: zenml-api-secret + key: api_key + - name: MODEL_NAME + value: "breast_cancer_classifier" + - name: MODEL_STAGE + value: "production" + resources: + limits: + cpu: "1" + memory: "512Mi" + requests: + cpu: "0.5" + memory: "256Mi" + ``` + +2. **Create Kubernetes Service Manifest**: Create a file named `k8s/service.yaml`: + ```yaml + apiVersion: v1 + kind: Service + metadata: + name: oncoclear-api + spec: + selector: + app: oncoclear-api + ports: + - port: 80 + targetPort: 8000 + type: LoadBalancer + ``` + +3. **Create Kubernetes Secret for ZenML API Key**: + ```bash + kubectl create secret generic zenml-api-secret --from-literal=api_key=YOUR_ZENML_API_KEY + ``` + +4. **Deploy to Kubernetes**: + ```bash + kubectl apply -f k8s/deployment.yaml + kubectl apply -f k8s/service.yaml + ``` + +## 🌍 Running on Remote ZenML Stacks + +OncoClear pipelines can run on various cloud environments using ZenML stacks. Here's how to set up and run on remote infrastructure: + +### Setting Up Remote Stacks + +1. **Register Stack Components**: + ```bash + # Register a remote artifact store (e.g., S3) + zenml artifact-store register s3_store --flavor=s3 --path=s3://your-bucket-name + + # Register an orchestrator (e.g., Kubeflow) + zenml orchestrator register kubeflow_orchestrator --flavor=kubeflow \ + --kubernetes_context=your-context --namespace=zenml + + # Register a container registry + zenml container-registry register ecr_registry --flavor=aws --uri=your-ecr-uri + + # Register the stack with these components + zenml stack register remote_stack \ + -a s3_store \ + -o kubeflow_orchestrator \ + -c ecr_registry + + # Activate the stack + zenml stack set remote_stack + ``` + +2. **Cloud-Specific Configurations**: + + For **AWS**: + ```yaml + # configs/aws_training.yaml + settings: + docker: + required_integrations: + - s3 + - sklearn + - pandas + - aws + requirements: + - pyarrow + - boto3 + ``` + + For **GCP**: + ```yaml + # configs/gcp_training.yaml + settings: + docker: + required_integrations: + - gcp + - sklearn + - pandas + requirements: + - pyarrow + - google-cloud-storage + ``` + +3. 
**Run Pipelines on Remote Stack**:
+   ```bash
+   # Update config_path for the training pipeline in run.py to your cloud config, then run:
+   python run.py --training-pipeline
+   ```

+## 📊 Model Control Plane
+
+OncoClear leverages ZenML's Model Control Plane to:
+
+- Track all trained model versions
+- Compare model performance metrics
+- Promote models to production based on performance
+- Link models to their training artifacts and code
+
+```shell
+# List all models
+zenml model list
+
+# Get details about a specific model version
+zenml model version describe breast_cancer_classifier rf
+
+# Promote a model to production
+zenml model version update breast_cancer_classifier rf --stage production
+```
+
+If you are a [ZenML Pro](https://zenml.io/pro) user, you can visualize the entire model lifecycle in the dashboard.
+
+<div align="center">
+
+ Model Control Plane +
+
+ +## 📁 Project Structure + +OncoClear follows a modular architecture: + +``` +├── api/ # API components for model serving +├── configs/ # Pipeline configuration profiles +│ ├── feature_engineering.yaml +│ ├── inference.yaml +│ ├── training_rf.yaml +│ └── training_sgd.yaml +├── pipelines/ # Core pipeline definitions +│ ├── feature_engineering.py +│ ├── inference.py +│ ├── training.py +│ └── local_deployment.py +├── steps/ # Individual pipeline components +│ ├── data_loader.py +│ ├── data_preprocessor.py +│ ├── model_trainer.py +│ └── ... +├── utils/ # Shared utility functions +├── run.py # Command-line interface +└── requirements.txt # Project dependencies +``` + +## 📚 Learn More + +For detailed documentation on building MLOps pipelines with ZenML, visit the [ZenML Documentation](https://docs.zenml.io/). In particular, the [Production Guide](https://docs.zenml.io/user-guide/production-guide/) goes into more detail about transitioning pipelines to production in the cloud. + +The best way to get a production ZenML instance up and running with all batteries included is with [ZenML Pro](https://zenml.io/pro). Check it out! + +Also, make sure to join our + Slack + Slack Community + to become part of the ZenML family! \ No newline at end of file diff --git a/oncoclear/api/.dockerignore b/oncoclear/api/.dockerignore new file mode 100644 index 00000000..e094f6cd --- /dev/null +++ b/oncoclear/api/.dockerignore @@ -0,0 +1,51 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# Environment and editor files +.env +.venv +venv/ +ENV/ +.idea/ +.vscode/ +*.swp +*.swo +*.swn +*~ + +# Documentation or local development files +README.md +CHANGELOG.md +docs/ +tests/ + +# Git and GitHub +.git/ +.github/ +.gitignore + +# Docker +Dockerfile.* +docker-compose*.yml + +# Miscellaneous +.DS_Store \ No newline at end of file diff --git a/oncoclear/api/Dockerfile b/oncoclear/api/Dockerfile new file mode 100644 index 00000000..738d0d2f --- /dev/null +++ b/oncoclear/api/Dockerfile @@ -0,0 +1,60 @@ +# Use an official Python runtime as a parent image +FROM python:3.9-slim AS base + +# Set environment variables to prevent Python from writing pyc files and buffering stdout/stderr +ENV PYTHONDONTWRITEBYTECODE 1 +ENV PYTHONUNBUFFERED 1 + +# Set the working directory in the container +WORKDIR /app + +# Install system dependencies if needed (e.g., for certain ML libraries) +# RUN apt-get update && apt-get install -y --no-install-recommends some-package && rm -rf /var/lib/apt/lists/* + +# Create a non-root user and group and set up a proper home directory +RUN addgroup --system app && \ + adduser --system --ingroup app app && \ + mkdir -p /home/app && \ + chown -R app:app /home/app +ENV HOME=/home/app + +# --- Build Stage --- (Installs dependencies) +FROM base AS builder +COPY requirements.txt . 
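+# Copying requirements.txt on its own (before any application code) lets Docker
+# cache the dependency-install layer, so code-only changes rebuild quickly.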
+# Consider using a virtual environment for better isolation +# RUN python -m venv /opt/venv +# ENV PATH="/opt/venv/bin:$PATH" +RUN pip install --no-cache-dir -r requirements.txt + +# --- Runtime Stage --- (Copies app code and sets up execution) +FROM base AS runtime +# Copy installed dependencies from builder stage +# If using venv: +# COPY --from=builder /opt/venv /opt/venv +# ENV PATH="/opt/venv/bin:$PATH" +# If not using venv (copying globally installed packages): +COPY --from=builder /usr/local/lib/python3.9/site-packages /usr/local/lib/python3.9/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin + +# Copy the application code (main.py, etc.) +# The utils directory should already be in the build context from the deployment step +COPY . . + +# Set ZENML_CONFIG_DIR to the home directory +ENV ZENML_CONFIG_DIR=/home/app/.zenml +ENV PYTHONPATH=/app:${PYTHONPATH} + +# Ensure directory permissions +RUN mkdir -p $ZENML_CONFIG_DIR && chown -R app:app $ZENML_CONFIG_DIR + +# Declare ARG for default port, can be overridden at build time or runtime via ENV +ARG PORT=8000 +ENV PORT=${PORT} +EXPOSE ${PORT} + +# Switch to the non-root user for security +USER app + +# Command to run the application using Uvicorn +# Critical ENV vars (ZENML_*, MODEL_*) must be passed via `docker run -e` +CMD uvicorn main:app --host 0.0.0.0 --port $PORT \ No newline at end of file diff --git a/oncoclear/api/main.py b/oncoclear/api/main.py new file mode 100644 index 00000000..564eed40 --- /dev/null +++ b/oncoclear/api/main.py @@ -0,0 +1,302 @@ +import os +import logging +from contextlib import asynccontextmanager +from typing import List, Any, Dict, Union +import json +import numpy as np +import datetime + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +from zenml.client import Client +from zenml.logger import get_logger as zenml_get_logger +from zenml import log_metadata + +# Configure logging +# Use ZenML logger for consistency if desired, or standard Python logging +logger = zenml_get_logger(__name__) +# Basic logging configuration (adjust as needed) +# logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO").upper()) +# logger = logging.getLogger(__name__) + +# --- Configuration --- +MODEL_NAME = os.getenv("MODEL_NAME") +MODEL_STAGE = os.getenv("MODEL_STAGE", "production") # Default to production +MODEL_ARTIFACT_NAME = os.getenv("MODEL_ARTIFACT_NAME", "sklearn_classifier") # Updated default artifact name +PREPROCESS_PIPELINE_NAME = os.getenv("PREPROCESS_PIPELINE_NAME", "preprocess_pipeline") # Added preprocessing pipeline artifact name +# ZENML_STORE_URL and ZENML_STORE_API_KEY are automatically picked up by Client if set + +if not MODEL_NAME: + logger.error("Environment variable MODEL_NAME is not set.") + # Or raise an exception to prevent startup + raise ValueError("MODEL_NAME must be set via environment variable") + +# --- Global Variables --- +# This dictionary will hold application state, like the loaded model +app_state = {"model": None, "preprocess_pipeline": None} + + +# --- Helper function to convert numpy arrays to Python native types --- +def convert_to_serializable(obj): + """Convert numpy arrays and other non-serializable types to Python native types.""" + if isinstance(obj, np.ndarray): + return obj.tolist() + elif isinstance(obj, np.integer): + return int(obj) + elif isinstance(obj, np.floating): + return float(obj) + elif isinstance(obj, np.bool_): + return bool(obj) + elif hasattr(obj, "tolist") and callable(getattr(obj, "tolist")): + return obj.tolist() + 
elif isinstance(obj, dict): + return {k: convert_to_serializable(v) for k, v in obj.items()} + elif isinstance(obj, (list, tuple)): + return [convert_to_serializable(item) for item in obj] + return obj + + +# --- Pydantic Models --- +class FeaturesPayload(BaseModel): + # Expecting a single instance/row for prediction + # Adjust structure based on your model's expected input (e.g., dict, list of lists) + features: List[Any] # Using Any for flexibility, refine if possible + + # Example for sklearn models often expecting a list of lists (even for one sample) + # features: List[List[float]] + # Example for named features + # feature_dict: Dict[str, float] + +class PredictionResponse(BaseModel): + prediction: Union[int, float, str, bool, List[Any], Dict[str, Any]] + +class DebugResponse(BaseModel): + zenml_url: str + api_key_provided: bool + connection_test: str + error_details: str = None + + +# --- FastAPI Lifespan Event --- +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup logic + logger.info("FastAPI application starting up...") + + # Debug: Log environment variables (masking sensitive ones) + zenml_url = os.getenv("ZENML_STORE_URL", "Not set") + zenml_key = os.getenv("ZENML_STORE_API_KEY", "Not set") + if zenml_key != "Not set": + masked_key = zenml_key[:15] + "..." + zenml_key[-10:] if len(zenml_key) > 30 else "***masked***" + else: + masked_key = "Not set" + + logger.info(f"Environment variables:") + logger.info(f"MODEL_NAME: {MODEL_NAME}") + logger.info(f"MODEL_STAGE: {MODEL_STAGE}") + logger.info(f"MODEL_ARTIFACT_NAME: {MODEL_ARTIFACT_NAME}") + logger.info(f"ZENML_STORE_URL: {zenml_url}") + logger.info(f"ZENML_STORE_API_KEY: {masked_key}") + logger.info(f"ZENML_CONFIG_DIR: {os.getenv('ZENML_CONFIG_DIR', 'Not set')}") + + try: + logger.info(f"Attempting to load model '{MODEL_NAME}' version '{MODEL_STAGE}'.") + # Ensure environment variables for ZenML connection are set externally + client = Client() # Client automatically uses ENV vars if set + model_version = client.get_model_version(MODEL_NAME, MODEL_STAGE) + + # Load the model artifact using the artifact name from env vars + artifact_name = MODEL_ARTIFACT_NAME + logger.info(f"Found model version {model_version.name}. Loading model artifact '{artifact_name}'...") + app_state["model"] = model_version.get_artifact(artifact_name).load() + logger.info(f"Successfully loaded model artifact '{artifact_name}' from {MODEL_NAME}:{model_version.name}") + + # Also load the preprocessing pipeline artifact if available + preprocess_name = PREPROCESS_PIPELINE_NAME + try: + logger.info(f"Attempting to load preprocessing pipeline artifact '{preprocess_name}'...") + app_state["preprocess_pipeline"] = model_version.get_artifact(preprocess_name).load() + logger.info(f"Successfully loaded preprocessing pipeline artifact.") + except Exception as e: + logger.warning(f"Failed to load preprocessing pipeline: {e}. 
Predictions may require pre-processed input.") + app_state["preprocess_pipeline"] = None + + except Exception as e: + logger.error(f"Failed to load model during startup: {e}", exc_info=True) + # Decide if the app should fail to start or continue without a model + # Option 1: Raise exception to stop startup + raise RuntimeError(f"Model loading failed: {e}") + # Option 2: Log error and continue (endpoints needing model will fail) + # app_state["model"] = None + + yield # Application runs here + + # Shutdown logic (if any) + logger.info("FastAPI application shutting down...") + app_state["model"] = None + app_state["preprocess_pipeline"] = None + + +# --- FastAPI App --- +app = FastAPI( + title="ML Model Deployment API", + description=f"API for serving the '{MODEL_NAME}' model.", + version="0.1.0", + lifespan=lifespan # Use the lifespan context manager +) + + +# --- Endpoints --- +@app.get("/health") +async def health_check(): + """ + Simple health check endpoint. + """ + # Optionally add checks for model readiness + model_ready = app_state.get("model") is not None + if not model_ready: + # Service unavailable if model didn't load + raise HTTPException(status_code=503, detail="Model not loaded") + return {"status": "ok", "model_ready": model_ready} + + +@app.post("/predict", response_model=PredictionResponse) +async def predict(payload: FeaturesPayload): + """ + Make predictions using the loaded model. + If a preprocessing pipeline is available, it will be applied to the input data. + """ + model = app_state.get("model") + preprocess_pipeline = app_state.get("preprocess_pipeline") + + if model is None: + logger.error("Prediction endpoint called but model is not loaded.") + raise HTTPException(status_code=503, detail="Model not loaded or failed to load.") + + try: + logger.debug(f"Received prediction request: {payload}") + + # Capture request timestamp + timestamp = datetime.datetime.now().isoformat() + + # Convert input to format expected by model - as a 2D array + # This handles the input format for scikit-learn models + data_to_predict = [payload.features] + logger.debug(f"Input data before preprocessing: {data_to_predict}") + + # Apply preprocessing if available + if preprocess_pipeline is not None: + logger.debug("Applying preprocessing pipeline to input data") + try: + # Now the preprocessing pipeline should be properly loaded with transform() method + data_to_predict = preprocess_pipeline.transform(data_to_predict) + logger.debug(f"Data after preprocessing: {data_to_predict}") + except Exception as e: + logger.error(f"Error applying preprocessing: {e}") + # Fall back to using the raw input if preprocessing fails + logger.warning("Falling back to raw input without preprocessing") + + # Make prediction with the model + prediction_result = model.predict(data_to_predict) + + # Extract the first prediction if predict returns an array/list + prediction_value = prediction_result[0] if isinstance(prediction_result, (list, tuple, np.ndarray)) and len(prediction_result) > 0 else prediction_result + + # Convert numpy arrays and other non-serializable types to Python native types + serializable_prediction = convert_to_serializable(prediction_value) + + # Log prediction metadata to ZenML + try: + serializable_input = convert_to_serializable(payload.features) + + # Create prediction metadata + prediction_metadata = { + "prediction_info": { + "timestamp": timestamp, + "input": serializable_input, + "prediction": serializable_prediction, + "model_name": MODEL_NAME, + "model_stage": MODEL_STAGE, + } + } 
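+            # NOTE: this records one metadata entry per prediction request on
+            # the model version; for high-traffic services consider sampling
+            # or batching these writes.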
+ + # Log metadata to the model version + logger.debug(f"Logging prediction metadata to model {MODEL_NAME}:{MODEL_STAGE}") + log_metadata( + metadata=prediction_metadata, + model_name=MODEL_NAME, + model_version=MODEL_STAGE, + ) + logger.debug("Successfully logged prediction metadata") + except Exception as log_error: + # Don't fail the API call if metadata logging fails + logger.warning(f"Failed to log prediction metadata: {log_error}") + + logger.debug(f"Prediction result (serializable): {serializable_prediction}") + return PredictionResponse(prediction=serializable_prediction) + + except Exception as e: + logger.error(f"Prediction failed: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Prediction error: {str(e)}") + + +# Optional: Add a root endpoint for basic info +@app.get("/") +async def read_root(): + return {"message": f"Welcome to the prediction API for model '{MODEL_NAME}'"} + + +@app.get("/debug", response_model=DebugResponse) +async def debug_connection(): + """ + Debug endpoint to check ZenML connection. + """ + zenml_url = os.getenv("ZENML_STORE_URL", "Not set") + api_key = os.getenv("ZENML_STORE_API_KEY", "Not set") + api_key_provided = api_key != "Not set" + + result = { + "zenml_url": zenml_url, + "api_key_provided": api_key_provided, + "connection_test": "Not attempted", + "error_details": None + } + + if api_key_provided and zenml_url != "Not set": + try: + # Try to initialize client and make a simple request + from zenml.client import Client + + # This creates a Client which should attempt to connect to the server + client = Client() + + # Try to get the current user, which requires authentication + try: + user = client.zen_store.get_user() + result["connection_test"] = f"Success - authenticated as {user.name}" + except Exception as e: + result["connection_test"] = "Failed - Authentication error" + result["error_details"] = str(e) + + except Exception as e: + result["connection_test"] = "Failed - Client initialization error" + result["error_details"] = str(e) + + return result + + +# --- Main Execution (for local testing, typically run by Uvicorn) --- +if __name__ == "__main__": + # This block is mainly for local development/debugging. + # In production/deployment, Uvicorn runs the app instance directly. + # Set environment variables locally (e.g., using a .env file and python-dotenv) + # before running this script for testing. + logger.info("Running FastAPI app locally with Uvicorn...") + import uvicorn + uvicorn.run( + "main:app", + host="0.0.0.0", + port=int(os.getenv("PORT", 8000)), + reload=True, # Enable reload for development convenience + log_level=os.getenv("LOG_LEVEL", "info").lower() + ) \ No newline at end of file diff --git a/oncoclear/api/requirements.txt b/oncoclear/api/requirements.txt new file mode 100644 index 00000000..358183fc --- /dev/null +++ b/oncoclear/api/requirements.txt @@ -0,0 +1,12 @@ +fastapi +uvicorn[standard] +scikit-learn # Replace with specific version later if needed +zenml # Replace with specific version later if needed +python-dotenv +numpy +pandas + +# AWS dependencies for S3 artifact store +aws-profile-manager +boto3 +s3fs>=2022.3.0 \ No newline at end of file diff --git a/oncoclear/api/utils/__init__.py b/oncoclear/api/utils/__init__.py new file mode 100644 index 00000000..29e36c52 --- /dev/null +++ b/oncoclear/api/utils/__init__.py @@ -0,0 +1,16 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2025. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/oncoclear/api/utils/__init__.py:Zone.Identifier b/oncoclear/api/utils/__init__.py:Zone.Identifier new file mode 100644 index 00000000..e69de29b diff --git a/oncoclear/api/utils/preprocess.py b/oncoclear/api/utils/preprocess.py new file mode 100644 index 00000000..efd800db --- /dev/null +++ b/oncoclear/api/utils/preprocess.py @@ -0,0 +1,56 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2025. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Union + +import pandas as pd + + +class NADropper: + """Support class to drop NA values in sklearn Pipeline.""" + + def fit(self, *args, **kwargs): + return self + + def transform(self, X: Union[pd.DataFrame, pd.Series]): + return X.dropna() + + +class ColumnsDropper: + """Support class to drop specific columns in sklearn Pipeline.""" + + def __init__(self, columns): + self.columns = columns + + def fit(self, *args, **kwargs): + return self + + def transform(self, X: Union[pd.DataFrame, pd.Series]): + return X.drop(columns=self.columns) + + +class DataFrameCaster: + """Support class to cast type back to pd.DataFrame in sklearn Pipeline.""" + + def __init__(self, columns): + self.columns = columns + + def fit(self, *args, **kwargs): + return self + + def transform(self, X): + return pd.DataFrame(X, columns=self.columns) diff --git a/oncoclear/api/utils/preprocess.py:Zone.Identifier b/oncoclear/api/utils/preprocess.py:Zone.Identifier new file mode 100644 index 00000000..e69de29b diff --git a/oncoclear/configs/feature_engineering.yaml b/oncoclear/configs/feature_engineering.yaml new file mode 100644 index 00000000..055bb6fa --- /dev/null +++ b/oncoclear/configs/feature_engineering.yaml @@ -0,0 +1,11 @@ +# environment configuration +settings: + docker: + required_integrations: + - sklearn + - pandas + requirements: + - pyarrow + +# pipeline configuration +test_size: 0.35 \ No newline at end of file diff --git a/oncoclear/configs/inference.yaml b/oncoclear/configs/inference.yaml new file mode 100644 index 00000000..8f73d762 --- /dev/null +++ b/oncoclear/configs/inference.yaml @@ -0,0 +1,16 @@ +# environment configuration +settings: + docker: + required_integrations: + - sklearn + - pandas + requirements: + - pyarrow + +# configuration of the Model Control Plane +model: + name: "breast_cancer_classifier" + version: "production" + license: Apache 2.0 + description: A breast cancer classifier + tags: ["breast_cancer", "classifier"] \ No newline 
at end of file diff --git a/oncoclear/configs/training_rf.yaml b/oncoclear/configs/training_rf.yaml new file mode 100644 index 00000000..70fa6413 --- /dev/null +++ b/oncoclear/configs/training_rf.yaml @@ -0,0 +1,20 @@ +# environment configuration +settings: + docker: + required_integrations: + - sklearn + - pandas + requirements: + - pyarrow + +# configuration of the Model Control Plane +model: + name: breast_cancer_classifier + version: rf + license: Apache 2.0 + description: A breast cancer classifier + tags: ["breast_cancer", "classifier"] + +# Configure the pipeline +parameters: + model_type: "rf" # Choose between rf/sgd diff --git a/oncoclear/configs/training_sgd.yaml b/oncoclear/configs/training_sgd.yaml new file mode 100644 index 00000000..386b53b8 --- /dev/null +++ b/oncoclear/configs/training_sgd.yaml @@ -0,0 +1,20 @@ +# environment configuration +settings: + docker: + required_integrations: + - sklearn + - pandas + requirements: + - pyarrow + +# configuration of the Model Control Plane +model: + name: breast_cancer_classifier + version: sgd + license: Apache 2.0 + description: A breast cancer classifier + tags: ["breast_cancer", "classifier"] + +# Configure the pipeline +parameters: + model_type: "sgd" # Choose between rf/sgd \ No newline at end of file diff --git a/oncoclear/pipelines/__init__.py b/oncoclear/pipelines/__init__.py new file mode 100644 index 00000000..6a2d17b1 --- /dev/null +++ b/oncoclear/pipelines/__init__.py @@ -0,0 +1,21 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2025. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from .feature_engineering import feature_engineering +from .inference import inference +from .training import training +from .deployment import local_deployment diff --git a/oncoclear/pipelines/deployment.py b/oncoclear/pipelines/deployment.py new file mode 100644 index 00000000..bb18f3a4 --- /dev/null +++ b/oncoclear/pipelines/deployment.py @@ -0,0 +1,92 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2025. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from zenml import pipeline +from zenml.logger import get_logger +from zenml.client import Client + +from steps.deployment import ( + build_deployment_image, + run_deployment_container, +) + +logger = get_logger(__name__) + + +@pipeline(enable_cache=False) # Disable caching for the entire pipeline +def local_deployment( + model_name: str, + zenml_server_url: str = None, + zenml_api_key: str = None, + model_stage: str = "production", + model_artifact_name: str = "sklearn_classifier", + preprocess_pipeline_name: str = "preprocess_pipeline", + host_port: int = 8000, + container_port: int = 8000, +): + """ + Model deployment pipeline. + + This pipeline builds a Docker image for a FastAPI service that serves the model, + and runs the container locally. + + Args: + model_name: Name of the ZenML model to deploy. + zenml_server_url: URL of the ZenML server. If None, uses the current client's server URL. + zenml_api_key: API key for the ZenML server. Required for the container to authenticate. + model_stage: Stage of the model to deploy (default: "production"). + model_artifact_name: Name of the model artifact to load (default: "sklearn_classifier"). + preprocess_pipeline_name: Name of the preprocessing pipeline artifact (default: "preprocess_pipeline"). + host_port: Port to expose on the host (default: 8000). + container_port: Port the container will listen on (default: 8000). + """ + # If not provided, get the server URL from the current client + if zenml_server_url is None: + client = Client() + zenml_server_url = client.zen_store.url + logger.info(f"Using current ZenML server URL: {zenml_server_url}") + + # Validate API key is provided + if zenml_api_key is None: + logger.warning( + "No API key provided. The deployment container will not be able to " + "authenticate with the ZenML server unless environment variables " + "are properly set." + ) + + # Build the deployment image + image_name = build_deployment_image( + model_name=model_name, + model_stage=model_stage, + ) + + # Run the deployment container and log metadata + container_id, service_url = run_deployment_container( + zenml_server_url=zenml_server_url, + zenml_api_key=zenml_api_key, + model_name=model_name, + model_stage=model_stage, + image_name=image_name, + model_artifact_name=model_artifact_name, + preprocess_pipeline_name=preprocess_pipeline_name, + host_port=host_port, + container_port=container_port, + ) + + logger.info(f"Model '{model_name}:{model_stage}' deployed successfully!") + logger.info(f"Service URL: {service_url}") + logger.info(f"API Documentation: {service_url}/docs") \ No newline at end of file diff --git a/oncoclear/pipelines/feature_engineering.py b/oncoclear/pipelines/feature_engineering.py new file mode 100644 index 00000000..475d2fca --- /dev/null +++ b/oncoclear/pipelines/feature_engineering.py @@ -0,0 +1,74 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2025. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from typing import List, Optional + +from zenml import pipeline +from zenml.logger import get_logger + +from steps import ( + data_loader, + data_preprocessor, + data_splitter, +) + +logger = get_logger(__name__) + + +@pipeline +def feature_engineering( + test_size: float = 0.2, + drop_na: Optional[bool] = None, + normalize: Optional[bool] = None, + drop_columns: Optional[List[str]] = None, + target: Optional[str] = "target", + random_state: int = 17, +): + """ + Feature engineering pipeline. + + This is a pipeline that loads the data, processes it and splits + it into train and test sets. + + Args: + test_size: Size of holdout set for training 0.0..1.0 + drop_na: If `True` NA values will be removed from dataset + normalize: If `True` dataset will be normalized with MinMaxScaler + drop_columns: List of columns to drop from dataset + target: Name of target column in dataset + random_state: Random state to configure the data loader + + Returns: + The processed datasets (dataset_trn, dataset_tst). + """ + # Link all the steps together by calling them and passing the output + # of one step as the input of the next step. + raw_data = data_loader(random_state=random_state, target=target) + dataset_trn, dataset_tst = data_splitter( + dataset=raw_data, + test_size=test_size, + ) + dataset_trn, dataset_tst, _ = data_preprocessor( + dataset_trn=dataset_trn, + dataset_tst=dataset_tst, + drop_na=drop_na, + normalize=normalize, + drop_columns=drop_columns, + target=target, + random_state=random_state, + ) + return dataset_trn, dataset_tst diff --git a/oncoclear/pipelines/inference.py b/oncoclear/pipelines/inference.py new file mode 100644 index 00000000..e0faa764 --- /dev/null +++ b/oncoclear/pipelines/inference.py @@ -0,0 +1,62 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2025. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from zenml import get_pipeline_context, pipeline +from zenml.logger import get_logger + +from steps import ( + data_loader, + inference_predict, + inference_preprocessor, +) + +logger = get_logger(__name__) + + +@pipeline +def inference(random_state: int, target: str): + """ + Model inference pipeline. + + This is a pipeline that loads the inference data, processes it with + the same preprocessing pipeline used in training, and runs inference + with the trained model. + + Args: + random_state: Random state for reproducibility. + target: Name of target column in dataset. + """ + # Get the production model artifact + model = get_pipeline_context().model.get_artifact("sklearn_classifier") + + # Get the preprocess pipeline artifact associated with this version + preprocess_pipeline = get_pipeline_context().model.get_artifact( + "preprocess_pipeline" + ) + + # Link all the steps together by calling them and passing the output + # of one step as the input of the next step. 
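+    # data_loader re-samples the 5% hold-out subset with the same random_state
+    # used at training time, so inference rows never overlap the training data.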
+ df_inference = data_loader(random_state=random_state, is_inference=True) + df_inference = inference_preprocessor( + dataset_inf=df_inference, + preprocess_pipeline=preprocess_pipeline, + target=target, + ) + inference_predict( + model=model, + dataset_inf=df_inference, + ) diff --git a/oncoclear/pipelines/training.py b/oncoclear/pipelines/training.py new file mode 100644 index 00000000..5cf49f93 --- /dev/null +++ b/oncoclear/pipelines/training.py @@ -0,0 +1,75 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2025. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Optional +from uuid import UUID + +from zenml import pipeline +from zenml.client import Client +from zenml.logger import get_logger + +from pipelines import ( + feature_engineering, +) +from steps import model_evaluator, model_promoter, model_trainer + +logger = get_logger(__name__) + + +@pipeline +def training( + train_dataset_id: Optional[UUID] = None, + test_dataset_id: Optional[UUID] = None, + target: Optional[str] = "target", + model_type: Optional[str] = "sgd", +): + """ + Model training pipeline. + + This is a pipeline that loads the data from a preprocessing pipeline, + trains a model on it and evaluates the model. If it is the first model + to be trained, it will be promoted to production. If not, it will be + promoted only if it has a higher accuracy than the current production + model version. + + Args: + train_dataset_id: ID of the train dataset produced by feature engineering. + test_dataset_id: ID of the test dataset produced by feature engineering. + target: Name of target column in dataset. + model_type: The type of model to train. + """ + # Link all the steps together by calling them and passing the output + # of one step as the input of the next step. + + # Execute Feature Engineering Pipeline + if train_dataset_id is None or test_dataset_id is None: + dataset_trn, dataset_tst = feature_engineering() + else: + client = Client() + dataset_trn = client.get_artifact_version(name_id_or_prefix=train_dataset_id) + dataset_tst = client.get_artifact_version(name_id_or_prefix=test_dataset_id) + + model = model_trainer(dataset_trn=dataset_trn, target=target, model_type=model_type) + + acc = model_evaluator( + model=model, + dataset_trn=dataset_trn, + dataset_tst=dataset_tst, + target=target, + ) + + model_promoter(accuracy=acc) diff --git a/oncoclear/requirements.txt b/oncoclear/requirements.txt new file mode 100644 index 00000000..1e0a8ac5 --- /dev/null +++ b/oncoclear/requirements.txt @@ -0,0 +1,5 @@ +zenml[server]>=0.50.0 +notebook +scikit-learn +pyarrow +pandas diff --git a/oncoclear/run.py b/oncoclear/run.py new file mode 100644 index 00000000..66565dc8 --- /dev/null +++ b/oncoclear/run.py @@ -0,0 +1,342 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2025. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +from typing import Optional + +import click +import yaml +from zenml.client import Client +from zenml.logger import get_logger + +from pipelines import ( + feature_engineering, + inference, + training, + local_deployment, +) + +logger = get_logger(__name__) + + +@click.command( + help=""" +ZenML Starter project. + +Run the ZenML starter project with basic options. + +Examples: + + \b + # Run the feature engineering pipeline + python run.py --feature-pipeline + + \b + # Run the training pipeline + python run.py --training-pipeline + + \b + # Run the training pipeline with versioned artifacts + python run.py --training-pipeline --train-dataset-version-name=1 --test-dataset-version-name=1 + + \b + # Run the inference pipeline + python run.py --inference-pipeline + + \b + # Deploy a model locally with FastAPI + python run.py --deploy-locally --deployment-model-name=my_model + +""" +) +@click.option( + "--train-dataset-name", + default="dataset_trn", + type=click.STRING, + help="The name of the train dataset produced by feature engineering.", +) +@click.option( + "--train-dataset-version-name", + default=None, + type=click.STRING, + help="Version of the train dataset produced by feature engineering. " + "If not specified, a new version will be created.", +) +@click.option( + "--test-dataset-name", + default="dataset_tst", + type=click.STRING, + help="The name of the test dataset produced by feature engineering.", +) +@click.option( + "--test-dataset-version-name", + default=None, + type=click.STRING, + help="Version of the test dataset produced by feature engineering. " + "If not specified, a new version will be created.", +) +@click.option( + "--feature-pipeline", + is_flag=True, + default=False, + help="Whether to run the pipeline that creates the dataset.", +) +@click.option( + "--training-pipeline", + is_flag=True, + default=False, + help="Whether to run the pipeline that trains the model.", +) +@click.option( + "--inference-pipeline", + is_flag=True, + default=False, + help="Whether to run the pipeline that performs inference.", +) +@click.option( + "--no-cache", + is_flag=True, + default=False, + help="Disable caching for the pipeline run.", +) +@click.option( + "--deploy-locally", + is_flag=True, + default=False, + help="Whether to run the pipeline that deploys a model locally with FastAPI.", +) +@click.option( + "--deployment-model-name", + default=None, + type=click.STRING, + help="Name of the model to deploy locally. 
Required if --deploy-locally is set.", +) +@click.option( + "--deployment-model-stage", + default="production", + type=click.STRING, + help="Stage of the model to deploy (default: 'production').", +) +@click.option( + "--deployment-model-artifact-name", + default="sklearn_classifier", + type=click.STRING, + help="Name of the model artifact to load (default: 'sklearn_classifier').", +) +@click.option( + "--deployment-preprocess-pipeline-name", + default="preprocess_pipeline", + type=click.STRING, + help="Name of the preprocessing pipeline artifact to load (default: 'preprocess_pipeline').", +) +@click.option( + "--deployment-port", + default=8000, + type=click.INT, + help="Port to expose the deployment server on (default: 8000).", +) +@click.option( + "--deployment-zenml-server", + default=None, + type=click.STRING, + help="URL of the ZenML server to use for deployment. If not provided, uses the current client's server.", +) +@click.option( + "--deployment-zenml-api-key", + default=None, + type=click.STRING, + help="API key for the ZenML server. Required for the container to authenticate if not set in environment.", +) +def main( + train_dataset_name: str = "dataset_trn", + train_dataset_version_name: Optional[str] = None, + test_dataset_name: str = "dataset_tst", + test_dataset_version_name: Optional[str] = None, + feature_pipeline: bool = False, + training_pipeline: bool = False, + inference_pipeline: bool = False, + no_cache: bool = False, + deploy_locally: bool = False, + deployment_model_name: Optional[str] = None, + deployment_model_stage: str = "production", + deployment_model_artifact_name: str = "sklearn_classifier", + deployment_preprocess_pipeline_name: str = "preprocess_pipeline", + deployment_port: int = 8000, + deployment_zenml_server: Optional[str] = None, + deployment_zenml_api_key: Optional[str] = None, +): + """Main entry point for the pipeline execution. + + This entrypoint is where everything comes together: + + * configuring pipeline with the required parameters + (some of which may come from command line arguments, but most + of which comes from the YAML config files) + * launching the pipeline + + Args: + train_dataset_name: The name of the train dataset produced by feature engineering. + train_dataset_version_name: Version of the train dataset produced by feature engineering. + If not specified, a new version will be created. + test_dataset_name: The name of the test dataset produced by feature engineering. + test_dataset_version_name: Version of the test dataset produced by feature engineering. + If not specified, a new version will be created. + feature_pipeline: Whether to run the pipeline that creates the dataset. + training_pipeline: Whether to run the pipeline that trains the model. + inference_pipeline: Whether to run the pipeline that performs inference. + no_cache: If `True` cache will be disabled. + deploy_locally: Whether to run the pipeline that deploys a model locally with FastAPI. + deployment_model_name: Name of the model to deploy locally. + deployment_model_stage: Stage of the model to deploy. + deployment_model_artifact_name: Name of the model artifact to load. + deployment_preprocess_pipeline_name: Name of the preprocessing pipeline artifact to load. + deployment_port: Port to expose the deployment server on. + deployment_zenml_server: URL of the ZenML server for deployment. + deployment_zenml_api_key: API key for the ZenML server. 
+ """ + client = Client() + + config_folder = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "configs", + ) + + # Execute Feature Engineering Pipeline + if feature_pipeline: + pipeline_args = {} + if no_cache: + pipeline_args["enable_cache"] = False + pipeline_args["config_path"] = os.path.join( + config_folder, "feature_engineering.yaml" + ) + run_args_feature = {} + feature_engineering.with_options(**pipeline_args)(**run_args_feature) + logger.info("Feature Engineering pipeline finished successfully!\n") + + train_dataset_artifact = client.get_artifact_version(train_dataset_name) + test_dataset_artifact = client.get_artifact_version(test_dataset_name) + logger.info( + "The latest feature engineering pipeline produced the following " + f"artifacts: \n\n1. Train Dataset - Name: {train_dataset_name}, " + f"Version Name: {train_dataset_artifact.version} \n2. Test Dataset: " + f"Name: {test_dataset_name}, Version Name: {test_dataset_artifact.version}" + ) + + # Execute Training Pipeline + if training_pipeline: + run_args_train = {} + + # If train_dataset_version_name is specified, use versioned artifacts + if train_dataset_version_name or test_dataset_version_name: + # However, both train and test dataset versions must be specified + assert ( + train_dataset_version_name is not None + and test_dataset_version_name is not None + ) + train_dataset_artifact_version = client.get_artifact_version( + train_dataset_name, train_dataset_version_name + ) + # If train dataset is specified, test dataset must be specified + test_dataset_artifact_version = client.get_artifact_version( + test_dataset_name, test_dataset_version_name + ) + # Use versioned artifacts + run_args_train["train_dataset_id"] = train_dataset_artifact_version.id + run_args_train["test_dataset_id"] = test_dataset_artifact_version.id + + # Run the SGD pipeline + pipeline_args = {} + if no_cache: + pipeline_args["enable_cache"] = False + pipeline_args["config_path"] = os.path.join(config_folder, "training_sgd.yaml") + training.with_options(**pipeline_args)(**run_args_train) + logger.info("Training pipeline with SGD finished successfully!\n\n") + + # Run the RF pipeline + pipeline_args = {} + if no_cache: + pipeline_args["enable_cache"] = False + pipeline_args["config_path"] = os.path.join(config_folder, "training_rf.yaml") + training.with_options(**pipeline_args)(**run_args_train) + logger.info("Training pipeline with RF finished successfully!\n\n") + + if inference_pipeline: + run_args_inference = {} + pipeline_args = {"enable_cache": False} + pipeline_args["config_path"] = os.path.join(config_folder, "inference.yaml") + + # Configure the pipeline + inference_configured = inference.with_options(**pipeline_args) + + # Fetch the production model + with open(pipeline_args["config_path"], "r") as f: + config = yaml.load(f, Loader=yaml.SafeLoader) + zenml_model = client.get_model_version( + config["model"]["name"], config["model"]["version"] + ) + preprocess_pipeline_artifact = zenml_model.get_artifact("preprocess_pipeline") + + # Use the metadata of feature engineering pipeline artifact + # to get the random state and target column + random_state = preprocess_pipeline_artifact.run_metadata["random_state"] + target = preprocess_pipeline_artifact.run_metadata["target"] + run_args_inference["random_state"] = random_state + run_args_inference["target"] = target + + # Run the pipeline + inference_configured(**run_args_inference) + logger.info("Inference pipeline finished successfully!") + + if deploy_locally: + # Ensure model 
name is provided + if not deployment_model_name: + raise ValueError( + "Model name must be provided for local deployment. " + "Use --deployment-model-name to specify the model name." + ) + + pipeline_args = {} + if no_cache: + pipeline_args["enable_cache"] = False + + # ZenML requires a config, but we don't need a specific one for deployment + # So we'll just use a default config path, or later you can create a deployment.yaml + # pipeline_args["config_path"] = os.path.join(config_folder, "deployment.yaml") + + run_args_deployment = { + "model_name": deployment_model_name, + "model_stage": deployment_model_stage, + "model_artifact_name": deployment_model_artifact_name, + "preprocess_pipeline_name": deployment_preprocess_pipeline_name, + "host_port": deployment_port, + "zenml_server_url": deployment_zenml_server, + "zenml_api_key": deployment_zenml_api_key, + } + + # Run the deployment pipeline + local_deployment.with_options(**pipeline_args)(**run_args_deployment) + + logger.info( + f"Local deployment pipeline for model '{deployment_model_name}:{deployment_model_stage}' " + f"finished successfully!\n\n" + f"The model is now accessible via FastAPI at http://localhost:{deployment_port}\n" + f"API documentation is available at http://localhost:{deployment_port}/docs" + ) + + +if __name__ == "__main__": + main() diff --git a/oncoclear/steps/__init__.py b/oncoclear/steps/__init__.py new file mode 100644 index 00000000..edf14e2f --- /dev/null +++ b/oncoclear/steps/__init__.py @@ -0,0 +1,45 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2025. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from .data_loader import ( + data_loader, +) +from .data_preprocessor import ( + data_preprocessor, +) +from .data_splitter import ( + data_splitter, +) +from .inference_predict import ( + inference_predict, +) +from .inference_preprocessor import ( + inference_preprocessor, +) +from .model_evaluator import ( + model_evaluator, +) +from .model_promoter import ( + model_promoter, +) +from .model_trainer import ( + model_trainer, +) +from .deployment import ( + run_deployment_container, + build_deployment_image, +) \ No newline at end of file diff --git a/oncoclear/steps/data_loader.py b/oncoclear/steps/data_loader.py new file mode 100644 index 00000000..7418dadc --- /dev/null +++ b/oncoclear/steps/data_loader.py @@ -0,0 +1,62 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2025. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/oncoclear/steps/data_loader.py b/oncoclear/steps/data_loader.py
new file mode 100644
index 00000000..7418dadc
--- /dev/null
+++ b/oncoclear/steps/data_loader.py
@@ -0,0 +1,62 @@
+# Apache Software License 2.0
+#
+# Copyright (c) ZenML GmbH 2025. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pandas as pd
+from sklearn.datasets import load_breast_cancer
+from typing_extensions import Annotated
+from zenml import step
+from zenml.logger import get_logger
+
+logger = get_logger(__name__)
+
+
+@step
+def data_loader(
+    random_state: int, is_inference: bool = False, target: str = "target"
+) -> Annotated[pd.DataFrame, "dataset"]:
+    """Dataset reader step.
+
+    This is an example of a dataset reader step that loads the Breast Cancer
+    dataset.
+
+    This step is parameterized, which allows you to configure the step
+    independently of the step code, before running it in a pipeline.
+    In this example, the step can be configured to return either the training
+    subset or a small inference subset with the target column removed.
+    See the documentation for more information:
+
+    https://docs.zenml.io/how-to/build-pipelines/use-pipeline-step-parameters
+
+    Args:
+        random_state: Random state for sampling the inference subset.
+        is_inference: If `True`, a small inference subset is returned and the
+            target column is removed from the dataset.
+        target: Name of the target column in the dataset.
+
+    Returns:
+        The dataset artifact as a Pandas DataFrame.
+    """
+    dataset = load_breast_cancer(as_frame=True)
+    inference_size = int(len(dataset.target) * 0.05)
+    dataset: pd.DataFrame = dataset.frame
+    inference_subset = dataset.sample(inference_size, random_state=random_state)
+    if is_inference:
+        dataset = inference_subset
+        dataset.drop(columns=target, inplace=True)
+    else:
+        dataset.drop(inference_subset.index, inplace=True)
+    dataset.reset_index(drop=True, inplace=True)
+    logger.info(f"Dataset with {len(dataset)} records loaded!")
+    return dataset
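The loader's train/inference split deserves a closer look: because the 5% inference subset is sampled with a fixed `random_state` and then dropped from the training frame, a training run and a later inference run carve out disjoint, reproducible row sets. The same pattern in isolation, on a toy frame:

```python
# Standalone illustration of the sampling pattern used in data_loader above.
import pandas as pd

df = pd.DataFrame({"feature": range(100), "target": [0, 1] * 50})
inference_size = int(len(df) * 0.05)  # 5 rows
inference_subset = df.sample(inference_size, random_state=42)

train_view = df.drop(inference_subset.index).reset_index(drop=True)
inference_view = inference_subset.drop(columns="target").reset_index(drop=True)

assert len(train_view) == 95 and len(inference_view) == 5
assert "target" not in inference_view.columns
```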
diff --git a/oncoclear/steps/data_preprocessor.py b/oncoclear/steps/data_preprocessor.py
new file mode 100644
index 00000000..6f535e60
--- /dev/null
+++ b/oncoclear/steps/data_preprocessor.py
@@ -0,0 +1,91 @@
+# Apache Software License 2.0
+#
+# Copyright (c) ZenML GmbH 2025. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import List, Optional, Tuple
+
+import pandas as pd
+from sklearn.pipeline import Pipeline
+from sklearn.preprocessing import MinMaxScaler
+from typing_extensions import Annotated
+from zenml import log_metadata, step
+
+from utils.preprocess import ColumnsDropper, DataFrameCaster, NADropper
+
+
+@step
+def data_preprocessor(
+    random_state: int,
+    dataset_trn: pd.DataFrame,
+    dataset_tst: pd.DataFrame,
+    drop_na: Optional[bool] = None,
+    normalize: Optional[bool] = None,
+    drop_columns: Optional[List[str]] = None,
+    target: Optional[str] = "target",
+) -> Tuple[
+    Annotated[pd.DataFrame, "dataset_trn"],
+    Annotated[pd.DataFrame, "dataset_tst"],
+    Annotated[Pipeline, "preprocess_pipeline"],
+]:
+    """Data preprocessor step.
+
+    This is an example of a data processor step that prepares the data so that
+    it is suitable for model training. It takes in a dataset as an input step
+    artifact and performs any necessary preprocessing steps like cleaning,
+    feature engineering, feature selection, etc. It then returns the processed
+    dataset as a step output artifact.
+
+    This step is parameterized, which allows you to configure the step
+    independently of the step code, before running it in a pipeline.
+    In this example, the step can be configured to drop NA values, drop some
+    columns, and normalize numerical columns. See the documentation for more
+    information:
+
+    https://docs.zenml.io/how-to/build-pipelines/use-pipeline-step-parameters
+
+    Args:
+        random_state: Random state for sampling.
+        dataset_trn: The train dataset.
+        dataset_tst: The test dataset.
+        drop_na: If `True` all NA rows will be dropped.
+        normalize: If `True` all numeric fields will be normalized.
+        drop_columns: List of column names to drop.
+        target: Name of the target column in the dataset.
+
+    Returns:
+        The processed datasets (dataset_trn, dataset_tst) and the fitted
+        `Pipeline` object.
+    """
+    # We use the sklearn pipeline to chain together multiple preprocessing steps
+    preprocess_pipeline = Pipeline([("passthrough", "passthrough")])
+    if drop_na:
+        preprocess_pipeline.steps.append(("drop_na", NADropper()))
+    if drop_columns:
+        # Drop the configured columns
+        preprocess_pipeline.steps.append(("drop_columns", ColumnsDropper(drop_columns)))
+    if normalize:
+        # Normalize the numeric fields
+        preprocess_pipeline.steps.append(("normalize", MinMaxScaler()))
+    preprocess_pipeline.steps.append(("cast", DataFrameCaster(dataset_trn.columns)))
+    dataset_trn = preprocess_pipeline.fit_transform(dataset_trn)
+    dataset_tst = preprocess_pipeline.transform(dataset_tst)
+
+    # Log metadata so we can load it in the inference pipeline
+    log_metadata(
+        metadata={"random_state": random_state, "target": target},
+        artifact_name="preprocess_pipeline",
+        infer_artifact=True,
+    )
+    return dataset_trn, dataset_tst, preprocess_pipeline
diff --git a/oncoclear/steps/data_splitter.py b/oncoclear/steps/data_splitter.py
new file mode 100644
index 00000000..433931c5
--- /dev/null
+++ b/oncoclear/steps/data_splitter.py
@@ -0,0 +1,60 @@
+# Apache Software License 2.0
+#
+# Copyright (c) ZenML GmbH 2025. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Tuple
+
+import pandas as pd
+from sklearn.model_selection import train_test_split
+from typing_extensions import Annotated
+from zenml import step
+
+
+@step
+def data_splitter(
+    dataset: pd.DataFrame, test_size: float = 0.2
+) -> Tuple[
+    Annotated[pd.DataFrame, "raw_dataset_trn"],
+    Annotated[pd.DataFrame, "raw_dataset_tst"],
+]:
+    """Dataset splitter step.
+
+    This is an example of a dataset splitter step that splits the data
+    into train and test sets before passing them to an ML model.
+
+    This step is parameterized, which allows you to configure the step
+    independently of the step code, before running it in a pipeline.
+    In this example, the step can be configured to use different test
+    set sizes. See the documentation for more information:
+
+    https://docs.zenml.io/how-to/build-pipelines/use-pipeline-step-parameters
+
+    Args:
+        dataset: Dataset read from source.
+        test_size: Fraction between 0.0 and 1.0 defining the portion of the
+            dataset reserved for the test set.
+ + Returns: + The split dataset: dataset_trn, dataset_tst. + """ + dataset_trn, dataset_tst = train_test_split( + dataset, + test_size=test_size, + random_state=42, + shuffle=True, + ) + dataset_trn = pd.DataFrame(dataset_trn, columns=dataset.columns) + dataset_tst = pd.DataFrame(dataset_tst, columns=dataset.columns) + return dataset_trn, dataset_tst diff --git a/oncoclear/steps/deployment.py b/oncoclear/steps/deployment.py new file mode 100644 index 00000000..1a0c972e --- /dev/null +++ b/oncoclear/steps/deployment.py @@ -0,0 +1,339 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2025. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Annotated, Tuple +import os +import datetime +import docker + +from zenml import step +from zenml.client import Client +from zenml.logger import get_logger +from zenml.utils import docker_utils +from zenml import log_metadata + +logger = get_logger(__name__) + +# NOTE: This step is no longer used as we now take these values as direct inputs to the pipeline +# Keeping it here (commented out) for reference in case we want to reintroduce a secrets-based approach +""" +@step +def get_deployment_config( + model_name: str, + model_stage: str, + secret_name: str = "deployment_service_key", + secret_key: str = "key", +) -> Tuple[ + Annotated[str, "zenml_server_url"], + Annotated[str, "zenml_api_key"], + Annotated[str, "model_name"], + Annotated[str, "model_stage"], +]: + \"""Fetches deployment configuration: ZenML server URL and API key from secrets. + + Args: + model_name: Name of the model to deploy. + model_stage: Stage of the model version to deploy. + secret_name: Name of the ZenML Secret containing the API key. + secret_key: Key within the ZenML Secret that holds the actual API key value. + + Returns: + Tuple containing ZenML server URL, API key, model name, and model stage. + + Raises: + RuntimeError: If the specified secret or key is not found. + KeyError: If the secret exists but doesn't contain the expected key. + \""" + logger.info( + f"Fetching deployment configuration for model '{model_name}:{model_stage}' " + f"using secret '{secret_name}' (key: '{secret_key}')." + ) + client = Client() + try: + server_url = client.zen_store.url + logger.info(f"ZenML Server URL: {server_url}") + + api_key_secret = client.get_secret(secret_name) + api_key = api_key_secret.secret_values[secret_key] + logger.info(f"Successfully fetched API key from secret '{secret_name}'.") + + # Basic validation to ensure API key looks somewhat reasonable (not empty) + if not api_key: + raise ValueError(f"API key value found in secret '{secret_name}' with key '{secret_key}' is empty.") + + except KeyError as e: + logger.error( + f"Secret '{secret_name}' found, but it does not contain the key " + f"'{secret_key}'. Please ensure the secret is created correctly " + f"with the API key stored under the key '{secret_key}'." 
+ ) + # Re-raise as KeyError to indicate the specific issue + raise KeyError( + f"Secret '{secret_name}' does not contain key '{secret_key}'." + ) from e + except Exception as e: + # Catch potential errors during secret fetching (e.g., secret not found) + logger.error( + f"Failed to fetch deployment secret '{secret_name}' or ZenML server URL. " + f"Please ensure the secret '{secret_name}' exists and contains the key '{secret_key}' " + f"with the deployment API key. Error: {e}" + ) + # Wrap generic exceptions in a RuntimeError for clarity + if isinstance(e, (KeyError, ValueError)): # Don't wrap our specific errors + raise e + raise RuntimeError(f"Failed to get deployment configuration: {e}") from e + + # Pass through model name and stage for the next steps + return server_url, api_key, model_name, model_stage +""" + +@step(enable_cache=False) # Avoid caching image builds unless inputs are identical +def build_deployment_image( + model_name: str, + model_stage: str, +) -> Annotated[str, "image_name"]: + """Builds a Docker image for the FastAPI deployment service. + + Args: + model_name: Name of the model being deployed (used for tagging). + model_stage: Stage of the model being deployed (used for tagging). + + Returns: + The name and tag of the built Docker image. + """ + # Define image name based on model name and stage + image_name = f"local-deployment-{model_name}:{model_stage}" + logger.info(f"Building Docker image: {image_name}") + + # Define paths relative to the project root + # Assumes this script is in 'steps/' and 'api/' is at the project root + project_root = os.path.join(os.path.dirname(__file__), "..") + build_context_path = os.path.abspath(os.path.join(project_root, "api")) + dockerfile_path = os.path.abspath(os.path.join(build_context_path, "Dockerfile")) + utils_path = os.path.abspath(os.path.join(project_root, "utils")) + + logger.info(f"Using build context: {build_context_path}") + logger.info(f"Using Dockerfile: {dockerfile_path}") + logger.info(f"Utils module path: {utils_path}") + + # Check if Dockerfile exists + if not os.path.exists(dockerfile_path): + raise FileNotFoundError(f"Dockerfile not found at: {dockerfile_path}") + + # Copy the utils directory to the api directory so it's available in the build context + utils_in_context = os.path.join(build_context_path, "utils") + + # Create utils directory in the build context if it doesn't exist + if not os.path.exists(utils_in_context): + os.makedirs(utils_in_context, exist_ok=True) + logger.info(f"Created utils directory in build context: {utils_in_context}") + + # Copy utils module files + import shutil + for item in os.listdir(utils_path): + src = os.path.join(utils_path, item) + dst = os.path.join(utils_in_context, item) + if os.path.isfile(src): + shutil.copy2(src, dst) + logger.info(f"Copied {src} to {dst}") + elif os.path.isdir(src): + shutil.copytree(src, dst, dirs_exist_ok=True) + logger.info(f"Copied directory {src} to {dst}") + + try: + # Build the image using ZenML's utility + docker_utils.build_image( + image_name=image_name, + dockerfile=dockerfile_path, + build_context_root=build_context_path, + # Add any custom build options if needed, e.g.: + # custom_build_options={"platform": "linux/amd64"} + ) + logger.info(f"Successfully built Docker image: {image_name}") + except Exception as e: + logger.error(f"Failed to build Docker image '{image_name}'. 
Error: {e}") + raise RuntimeError(f"Docker image build failed: {e}") from e + + return image_name + +# --- Add run_deployment_container step below --- +@step(enable_cache=False) # Avoid caching container runs +def run_deployment_container( + zenml_server_url: str, + zenml_api_key: str, + model_name: str, + model_stage: str, + image_name: str, + model_artifact_name: str = "sklearn_classifier", + preprocess_pipeline_name: str = "preprocess_pipeline", + host_port: int = 8000, + container_port: int = 8000, +) -> Tuple[ + Annotated[str, "container_id"], + Annotated[str, "service_url"], +]: + """Runs the Docker container for the model deployment service and logs deployment metadata. + + Args: + zenml_server_url: URL of the ZenML server. + zenml_api_key: API key for the ZenML server. + model_name: Name of the model to deploy. + model_stage: Stage of the model to deploy. + image_name: Name of the Docker image to run. + model_artifact_name: Name of the model artifact to load (default: "sklearn_classifier"). + preprocess_pipeline_name: Name of the preprocessing pipeline artifact (default: "preprocess_pipeline"). + host_port: Port to expose on the host. + container_port: Port the container is listening on. + + Returns: + Tuple containing the container ID and service URL. + """ + logger.info(f"Preparing to run container from image: {image_name}") + + # Create a Docker client + client = docker.from_env() + + # Check if the image exists + try: + client.images.get(image_name) + logger.info(f"Found Docker image: {image_name}") + except docker.errors.ImageNotFound: + raise RuntimeError(f"Docker image '{image_name}' not found. Please build it first.") + + # Define environment variables for the container + env_vars = { + "ZENML_STORE_URL": zenml_server_url, + "ZENML_STORE_API_KEY": zenml_api_key, + "MODEL_NAME": model_name, + "MODEL_STAGE": model_stage, + "MODEL_ARTIFACT_NAME": model_artifact_name, + "PREPROCESS_PIPELINE_NAME": preprocess_pipeline_name, + "PORT": str(container_port) + } + + # Debug: Check the API key (mask it partially for logs) + if zenml_api_key: + masked_key = zenml_api_key[:15] + "..." + zenml_api_key[-10:] if len(zenml_api_key) > 30 else "***masked***" + logger.info(f"Using ZenML server: {zenml_server_url}") + logger.info(f"Using API key (masked): {masked_key}") + else: + logger.warning("No API key provided! Authentication will likely fail.") + + # Define port mapping + ports = {f"{container_port}/tcp": host_port} + + # Define a unique container name based on model name and stage + container_name = f"zenml-deployment-{model_name}-{model_stage}".lower().replace('_', '-') + + # Check if a container with this name already exists and remove it if it does + try: + existing_container = client.containers.get(container_name) + logger.warning(f"Found existing container '{container_name}'. 
Stopping and removing it.") + existing_container.stop() + existing_container.remove() + except docker.errors.NotFound: + # Container doesn't exist, which is fine + pass + + try: + # Run the container + logger.info(f"Starting container '{container_name}' with image '{image_name}'") + container = client.containers.run( + image=image_name, + name=container_name, + environment=env_vars, + ports=ports, + detach=True, # Run in background + restart_policy={"Name": "unless-stopped"}, # Restart if it crashes + ) + + # Ensure the env vars are passed correctly + logger.info("Verifying environment variables in the container...") + # Give the container a moment to start + import time + time.sleep(2) + + try: + # Don't do this in production, this is just for debugging + env_output = container.exec_run("env") + if env_output.exit_code == 0: + # Safely log env without showing full API key + env_lines = env_output.output.decode('utf-8').split('\n') + for line in env_lines: + if line.startswith("ZENML_STORE_API_KEY="): + key = line.split('=', 1)[1] + masked = key[:15] + "..." + key[-10:] if len(key) > 30 else "***masked***" + logger.info(f"ZENML_STORE_API_KEY={masked}") + elif line.startswith("ZENML_"): + logger.info(line) + elif line.startswith("MODEL_"): + logger.info(line) + except Exception as e: + logger.warning(f"Could not verify environment variables: {e}") + + container_id = container.id + service_url = f"http://localhost:{host_port}" + + logger.info(f"Container started successfully!") + logger.info(f"Container ID: {container_id}") + logger.info(f"Service URL: {service_url}") + logger.info(f"API Documentation: {service_url}/docs") + + # Log deployment metadata directly here instead of in a separate step + logger.info(f"Logging deployment metadata for model '{model_name}:{model_stage}'") + + # Get updated container details + container_info = client.containers.get(container_id).attrs + + # Create metadata to log + current_time = datetime.datetime.now().isoformat() + deployment_metadata = { + "deployment_info": { + "deployed_at": current_time, + "deployed_by": os.environ.get("USER", "unknown"), + "service_url": service_url, + "api_docs_url": f"{service_url}/docs", + "container_id": container_id, + "container_name": container_info.get("Name", "").strip("/"), + "container_image": container_info.get("Config", {}).get("Image", ""), + "container_status": container_info.get("State", {}).get("Status", ""), + "model_artifact_name": model_artifact_name, + }, + "environment_info": { + "host_platform": os.environ.get("OS", "unknown"), + "zenml_version": os.environ.get("ZENML_VERSION", "unknown"), + "deployed_from": os.environ.get("PWD", "unknown"), + } + } + + # Log the metadata + log_metadata( + metadata=deployment_metadata, + model_name=model_name, + model_version=model_stage, + ) + + logger.info("Successfully logged deployment metadata to model") + + return container_id, service_url + + except docker.errors.APIError as e: + logger.error(f"Failed to run container: {e}") + raise RuntimeError(f"Docker container creation failed: {e}") from e + except Exception as e: + logger.error(f"Unexpected error running container: {e}") + raise RuntimeError(f"Failed to run deployment container: {e}") from e diff --git a/oncoclear/steps/inference_predict.py b/oncoclear/steps/inference_predict.py new file mode 100644 index 00000000..c893ab31 --- /dev/null +++ b/oncoclear/steps/inference_predict.py @@ -0,0 +1,56 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2023. All rights reserved. 
diff --git a/oncoclear/steps/inference_predict.py b/oncoclear/steps/inference_predict.py
new file mode 100644
index 00000000..c893ab31
--- /dev/null
+++ b/oncoclear/steps/inference_predict.py
@@ -0,0 +1,56 @@
+# Apache Software License 2.0
+#
+# Copyright (c) ZenML GmbH 2023. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Any
+
+import pandas as pd
+from typing_extensions import Annotated
+from zenml import step
+from zenml.logger import get_logger
+
+logger = get_logger(__name__)
+
+
+@step
+def inference_predict(
+    model: Any,
+    dataset_inf: pd.DataFrame,
+) -> Annotated[pd.Series, "predictions"]:
+    """Predictions step.
+
+    This is an example of a predictions step that takes in a trained model
+    and an inference dataset and returns the predicted values.
+
+    This step is parameterized, which allows you to configure the step
+    independently of the step code, before running it in a pipeline.
+    In this example, the step can be configured to use different input data.
+    See the documentation for more information:
+
+    https://docs.zenml.io/how-to/build-pipelines/use-pipeline-step-parameters
+
+    Args:
+        model: Trained model.
+        dataset_inf: The inference dataset.
+
+    Returns:
+        The predictions as a pandas Series.
+    """
+    # Run the prediction in memory with the loaded model
+    predictions = model.predict(dataset_inf)
+
+    predictions = pd.Series(predictions, name="predicted")
+    return predictions
diff --git a/oncoclear/steps/inference_preprocessor.py b/oncoclear/steps/inference_preprocessor.py
new file mode 100644
index 00000000..d12247e0
--- /dev/null
+++ b/oncoclear/steps/inference_preprocessor.py
@@ -0,0 +1,49 @@
+# Apache Software License 2.0
+#
+# Copyright (c) ZenML GmbH 2023. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pandas as pd
+from sklearn.pipeline import Pipeline
+from typing_extensions import Annotated
+from zenml import step
+
+
+@step
+def inference_preprocessor(
+    dataset_inf: pd.DataFrame,
+    preprocess_pipeline: Pipeline,
+    target: str,
+) -> Annotated[pd.DataFrame, "inference_dataset"]:
+    """Data preprocessor step.
+
+    This is an example of a data processor step that prepares the data so that
+    it is suitable for model inference. It takes in a dataset as an input step
+    artifact and performs any necessary preprocessing steps based on the
+    pretrained preprocessing pipeline.
+
+    Args:
+        dataset_inf: The inference dataset.
+        preprocess_pipeline: Pretrained `Pipeline` to process the dataset.
+        target: Name of the target column in the dataset.
+
+    Returns:
+        The processed dataframe: dataset_inf.
+ """ + # artificially adding `target` column to avoid Pipeline issues + dataset_inf[target] = pd.Series([1] * dataset_inf.shape[0]) + dataset_inf = preprocess_pipeline.transform(dataset_inf) + dataset_inf.drop(columns=[target], inplace=True) + return dataset_inf diff --git a/oncoclear/steps/model_evaluator.py b/oncoclear/steps/model_evaluator.py new file mode 100644 index 00000000..1bfa5ec7 --- /dev/null +++ b/oncoclear/steps/model_evaluator.py @@ -0,0 +1,109 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2025. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Optional + +import pandas as pd +from sklearn.base import ClassifierMixin +from zenml import log_metadata, step +from zenml.client import Client +from zenml.logger import get_logger + +logger = get_logger(__name__) + + +@step +def model_evaluator( + model: ClassifierMixin, + dataset_trn: pd.DataFrame, + dataset_tst: pd.DataFrame, + min_train_accuracy: float = 0.0, + min_test_accuracy: float = 0.0, + target: Optional[str] = "target", +) -> float: + """Evaluate a trained model. + + This is an example of a model evaluation step that takes in a model artifact + previously trained by another step in your pipeline, and a training + and validation data set pair which it uses to evaluate the model's + performance. The model metrics are then returned as step output artifacts + (in this case, the model accuracy on the train and test set). + + The suggested step implementation also outputs some warnings if the model + performance does not meet some minimum criteria. This is just an example of + how you can use steps to monitor your model performance and alert you if + something goes wrong. As an alternative, you can raise an exception in the + step to force the pipeline run to fail early and all subsequent steps to + be skipped. + + This step is parameterized to configure the step independently of the step code, + before running it in a pipeline. In this example, the step can be configured + to use different values for the acceptable model performance thresholds and + to control whether the pipeline run should fail if the model performance + does not meet the minimum criteria. See the documentation for more + information: + + https://docs.zenml.io/how-to/build-pipelines/use-pipeline-step-parameters + + Args: + model: The pre-trained model artifact. + dataset_trn: The train dataset. + dataset_tst: The test dataset. + min_train_accuracy: Minimal acceptable training accuracy value. + min_test_accuracy: Minimal acceptable testing accuracy value. + target: Name of target column in dataset. + + Returns: + The model accuracy on the test set. 
+ """ + # Calculate the model accuracy on the train and test set + trn_acc = model.score( + dataset_trn.drop(columns=[target]), + dataset_trn[target], + ) + tst_acc = model.score( + dataset_tst.drop(columns=[target]), + dataset_tst[target], + ) + logger.info(f"Train accuracy={trn_acc * 100:.2f}%") + logger.info(f"Test accuracy={tst_acc * 100:.2f}%") + + messages = [] + if trn_acc < min_train_accuracy: + messages.append( + f"Train accuracy {trn_acc * 100:.2f}% is below {min_train_accuracy * 100:.2f}% !" + ) + if tst_acc < min_test_accuracy: + messages.append( + f"Test accuracy {tst_acc * 100:.2f}% is below {min_test_accuracy * 100:.2f}% !" + ) + else: + for message in messages: + logger.warning(message) + + client = Client() + latest_classifier = client.get_artifact_version("sklearn_classifier") + + log_metadata( + metadata={ + "train_accuracy": float(trn_acc), + "test_accuracy": float(tst_acc) + }, + artifact_version_id=latest_classifier.id + ) + + return float(tst_acc) diff --git a/oncoclear/steps/model_promoter.py b/oncoclear/steps/model_promoter.py new file mode 100644 index 00000000..2b105135 --- /dev/null +++ b/oncoclear/steps/model_promoter.py @@ -0,0 +1,75 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2025. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from zenml import get_step_context, step +from zenml.client import Client +from zenml.logger import get_logger + +logger = get_logger(__name__) + + +@step +def model_promoter(accuracy: float, stage: str = "production") -> bool: + """Model promoter step. + + This is an example of a step that conditionally promotes a model. It takes + in the accuracy of the model and the stage to promote the model to. If the + accuracy is below 80%, the model is not promoted. If it is above 80%, the + model is promoted to the stage indicated in the parameters. If there is + already a model in the indicated stage, the model with the higher accuracy + is promoted. + + Args: + accuracy: Accuracy of the model. + stage: Which stage to promote the model to. + + Returns: + Whether the model was promoted or not. + """ + is_promoted = False + + if accuracy < 0.8: + logger.info( + f"Model accuracy {accuracy*100:.2f}% is below 80% ! Not promoting model." 
diff --git a/oncoclear/steps/model_promoter.py b/oncoclear/steps/model_promoter.py
new file mode 100644
index 00000000..2b105135
--- /dev/null
+++ b/oncoclear/steps/model_promoter.py
@@ -0,0 +1,75 @@
+# Apache Software License 2.0
+#
+# Copyright (c) ZenML GmbH 2025. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from zenml import get_step_context, step
+from zenml.client import Client
+from zenml.logger import get_logger
+
+logger = get_logger(__name__)
+
+
+@step
+def model_promoter(accuracy: float, stage: str = "production") -> bool:
+    """Model promoter step.
+
+    This is an example of a step that conditionally promotes a model. It takes
+    in the accuracy of the model and the stage to promote the model to. If the
+    accuracy is below 80%, the model is not promoted. If it is above 80%, the
+    model is promoted to the stage indicated in the parameters. If there is
+    already a model in the indicated stage, the model with the higher accuracy
+    is promoted.
+
+    Args:
+        accuracy: Accuracy of the model.
+        stage: Which stage to promote the model to.
+
+    Returns:
+        Whether the model was promoted or not.
+    """
+    is_promoted = False
+
+    if accuracy < 0.8:
+        logger.info(
+            f"Model accuracy {accuracy * 100:.2f}% is below 80%! Not promoting model."
+        )
+    else:
+        # Get the model in the current context
+        current_model = get_step_context().model
+
+        # Get the model version that currently holds the target stage (if any)
+        client = Client()
+        try:
+            stage_model = client.get_model_version(current_model.name, stage)
+            # Compare their metrics: promote only if the current model is better
+            prod_accuracy = (
+                stage_model.get_artifact("sklearn_classifier")
+                .run_metadata["test_accuracy"]
+            )
+            if float(accuracy) > float(prod_accuracy):
+                # The current model has better metrics, so we promote it
+                is_promoted = True
+                current_model.set_stage(stage, force=True)
+                logger.info(f"Model promoted to {stage}!")
+            else:
+                logger.info(
+                    f"Model accuracy {accuracy * 100:.2f}% does not beat the "
+                    f"current {stage} model. Not promoting."
+                )
+        except KeyError:
+            # No model holds the stage yet, so the current one is promoted
+            is_promoted = True
+            current_model.set_stage(stage, force=True)
+            logger.info(f"Model promoted to {stage}!")
+    return is_promoted
diff --git a/oncoclear/steps/model_trainer.py b/oncoclear/steps/model_trainer.py
new file mode 100644
index 00000000..98449cff
--- /dev/null
+++ b/oncoclear/steps/model_trainer.py
@@ -0,0 +1,72 @@
+# Apache Software License 2.0
+#
+# Copyright (c) ZenML GmbH 2025. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Optional
+
+import pandas as pd
+from sklearn.base import ClassifierMixin
+from sklearn.ensemble import RandomForestClassifier
+from sklearn.linear_model import SGDClassifier
+from typing_extensions import Annotated
+from zenml import ArtifactConfig, step
+from zenml.enums import ArtifactType
+from zenml.logger import get_logger
+
+logger = get_logger(__name__)
+
+
+@step
+def model_trainer(
+    dataset_trn: pd.DataFrame,
+    model_type: str = "sgd",
+    target: Optional[str] = "target",
+) -> Annotated[
+    ClassifierMixin,
+    ArtifactConfig(name="sklearn_classifier", artifact_type=ArtifactType.MODEL),
+]:
+    """Configure and train a model on the training dataset.
+
+    This is an example of a model training step that takes in a dataset artifact
+    previously loaded and pre-processed by other steps in your pipeline, then
+    configures and trains a model on it. The model is then returned as a step
+    output artifact.
+
+    Args:
+        dataset_trn: The preprocessed train dataset.
+        model_type: The type of model to train.
+        target: The name of the target column in the dataset.
+
+    Returns:
+        The trained model artifact.
+
+    Raises:
+        ValueError: If the model type is not supported.
+    """
+    # Initialize the model with the hyperparameters indicated in the step
+    # parameters and train it on the training set.
+    if model_type == "sgd":
+        model = SGDClassifier()
+    elif model_type == "rf":
+        model = RandomForestClassifier()
+    else:
+        raise ValueError(f"Unknown model type {model_type}")
+    logger.info(f"Training model {model}...")
+
+    model.fit(
+        dataset_trn.drop(columns=[target]),
+        dataset_trn[target],
+    )
+    return model
diff --git a/oncoclear/utils/__init__.py b/oncoclear/utils/__init__.py
new file mode 100644
index 00000000..29e36c52
--- /dev/null
+++ b/oncoclear/utils/__init__.py
@@ -0,0 +1,16 @@
+# Apache Software License 2.0
+#
+# Copyright (c) ZenML GmbH 2025. All rights reserved.
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/oncoclear/utils/preprocess.py b/oncoclear/utils/preprocess.py new file mode 100644 index 00000000..efd800db --- /dev/null +++ b/oncoclear/utils/preprocess.py @@ -0,0 +1,56 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2025. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Union + +import pandas as pd + + +class NADropper: + """Support class to drop NA values in sklearn Pipeline.""" + + def fit(self, *args, **kwargs): + return self + + def transform(self, X: Union[pd.DataFrame, pd.Series]): + return X.dropna() + + +class ColumnsDropper: + """Support class to drop specific columns in sklearn Pipeline.""" + + def __init__(self, columns): + self.columns = columns + + def fit(self, *args, **kwargs): + return self + + def transform(self, X: Union[pd.DataFrame, pd.Series]): + return X.drop(columns=self.columns) + + +class DataFrameCaster: + """Support class to cast type back to pd.DataFrame in sklearn Pipeline.""" + + def __init__(self, columns): + self.columns = columns + + def fit(self, *args, **kwargs): + return self + + def transform(self, X): + return pd.DataFrame(X, columns=self.columns) diff --git a/vertex-registry-and-deployer/README.md b/vertex-registry-and-deployer/README.md index 90e593f0..83689ce1 100644 --- a/vertex-registry-and-deployer/README.md +++ b/vertex-registry-and-deployer/README.md @@ -3,7 +3,7 @@ Welcome to your ZenML project for deploying ML models using Google Cloud's Vertex AI! This project provides a hands-on experience with MLOps pipelines using ZenML and Vertex AI. It contains a collection of ZenML steps, pipelines, and other artifacts to help you efficiently deploy your machine learning models. -Using these pipelines, you can run data preparation, model training, registration, and deployment with a single command while using YAML files for [configuration](https://docs.zenml.io/user-guide/production-guide/configure-pipeline). ZenML takes care of tracking your metadata and [containerizing your pipelines](https://docs.zenml.io/how-to/customize-docker-builds). +Using these pipelines, you can run data preparation, model training, registration, and deployment with a single command while using YAML files for [configuration](https://docs.zenml.io/user-guides/production-guide/configure-pipeline). ZenML takes care of tracking your metadata and [containerizing your pipelines](https://docs.zenml.io/how-to/customize-docker-builds). 
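For example, such a run configuration can be applied to a pipeline at execution time — a hedged sketch, where the pipeline object and YAML path are placeholders for this project's actual names:

```python
# Hypothetical sketch: applying a YAML run configuration at execution time.
from pipelines import training_pipeline  # placeholder import

configured = training_pipeline.with_options(config_path="configs/training.yaml")
configured()  # runs with the settings from the YAML file
```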
## 🏃 How to run diff --git a/zenml-support-agent/run.ipynb b/zenml-support-agent/run.ipynb index 314b4351..2003dd6b 100644 --- a/zenml-support-agent/run.ipynb +++ b/zenml-support-agent/run.ipynb @@ -396,7 +396,7 @@ "\u001b[2;36mwill only take effect when you're running ZenML from the initialized repository \u001b[0m\n", "\u001b[2;36mroot, or from a subdirectory. For more information on repositories and \u001b[0m\n", "\u001b[2;36mconfigurations, please visit \u001b[0m\n", - "\u001b[2;4;94mhttps://docs.zenml.io/user-guide/starter-guide/understand-stacks.\u001b[0m\n" + "\u001b[2;4;94mhttps://docs.zenml.io/user-guides/starter-guide/understand-stacks.\u001b[0m\n" ] } ], @@ -476,7 +476,7 @@ "metadata": {}, "source": [ "#### Stack\n", - "A [stack](https://docs.zenml.io/user-guide/production-guide/understand-stacks) in ZenML is the combination of tools and infrastructure that your pipelines can run on. When you run ZenML code without configuring a stack, the pipeline will run on the so-called default stack.\n", + "A [stack](https://docs.zenml.io/user-guides/production-guide/understand-stacks) in ZenML is the combination of tools and infrastructure that your pipelines can run on. When you run ZenML code without configuring a stack, the pipeline will run on the so-called default stack.\n", "\n", "![image.png](attachment:image.png)\n", "\n",