diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json deleted file mode 100644 index a3ab754..0000000 --- a/.devcontainer/devcontainer.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "name": "Kubebuilder DevContainer", - "image": "golang:1.24", - "features": { - "ghcr.io/devcontainers/features/docker-in-docker:2": {}, - "ghcr.io/devcontainers/features/git:1": {} - }, - - "runArgs": ["--network=host"], - - "customizations": { - "vscode": { - "settings": { - "terminal.integrated.shell.linux": "/bin/bash" - }, - "extensions": [ - "ms-kubernetes-tools.vscode-kubernetes-tools", - "ms-azuretools.vscode-docker" - ] - } - }, - - "onCreateCommand": "bash .devcontainer/post-install.sh" -} - diff --git a/.devcontainer/post-install.sh b/.devcontainer/post-install.sh deleted file mode 100644 index 265c43e..0000000 --- a/.devcontainer/post-install.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash -set -x - -curl -Lo ./kind https://kind.sigs.k8s.io/dl/latest/kind-linux-amd64 -chmod +x ./kind -mv ./kind /usr/local/bin/kind - -curl -L -o kubebuilder https://go.kubebuilder.io/dl/latest/linux/amd64 -chmod +x kubebuilder -mv kubebuilder /usr/local/bin/ - -KUBECTL_VERSION=$(curl -L -s https://dl.k8s.io/release/stable.txt) -curl -LO "https://dl.k8s.io/release/$KUBECTL_VERSION/bin/linux/amd64/kubectl" -chmod +x kubectl -mv kubectl /usr/local/bin/kubectl - -docker network create -d=bridge --subnet=172.19.0.0/24 kind - -kind version -kubebuilder version -docker --version -go version -kubectl version --client diff --git a/.gitignore b/.gitignore index d586b83..4d9c0e5 100644 --- a/.gitignore +++ b/.gitignore @@ -23,6 +23,12 @@ coverage.* go.work go.work.sum +# Terraform +.terraform/ +.terraform.lock.hcl +terraform.tfstate +terraform.tfstate.backup + # ======================================== # Kubebuilder / Controller Tools # ======================================== @@ -80,6 +86,8 @@ Thumbs.db .cursor/ .cursorignore .ruff_cache/ +.coverage +htmlcov/ # JetBrains & VSCode specific 
settings *.iml diff --git a/Makefile b/Makefile index 0063701..d69185e 100644 --- a/Makefile +++ b/Makefile @@ -28,6 +28,14 @@ dev-cleanup: ## Clean up development environment (containers, volumes, networks) docker volume ls | grep -E "(dev_|pipeline-forge)" | awk '{print $$2}' | xargs -r docker volume rm docker ps -a | grep -E "(pipeline-forge|mysql|postgres)" | awk '{print $$1}' | xargs -r docker rm -f +.PHONY: apply-sandbox-tf +apply-sandbox-tf: ## Apply the development infrastructure + terraform -chdir=infrastructure/gcloud/sandbox init + terraform -chdir=infrastructure/gcloud/sandbox apply + +.PHONY: destroy-sandbox-tf +destroy-sandbox-tf: ## Destroy the development infrastructure + terraform -chdir=infrastructure/gcloud/sandbox destroy ##@ Operator Development diff --git a/README.md b/README.md index cc13cb3..d915cad 100644 --- a/README.md +++ b/README.md @@ -8,207 +8,84 @@ > A Kubernetes-native platform for building modern, declarative data pipelines with clear boundaries between ingestion and transformation. -## πŸ“‹ Quick Navigation +## πŸš€ Quick Navigation -- [Overview](#-overview) -- [Key Features](#-key-features) -- [Quick Start](#-quick-start) -- [Architecture](#️-architecture) -- [Documentation](#-documentation) -- [Contributing](#-contributing) +- **[πŸŽ›οΈ Kubernetes Operator](operator/README.md)** - Go-based CRD management and pipeline orchestration +- **[πŸ“₯ Ingest Workload](workloads/ingest/README.md)** - Type-safe data ingestion (Python) +- **[πŸ”„ Transform Workload](workloads/transform/README.md)** - dbt-based data transformation +- **[⚑ Trigger Workload](workloads/trigger/README.md)** - Event-driven pipeline activation (Go) +- **[πŸ“‹ Examples](docs/examples.md)** - Comprehensive YAML examples and use cases --- -## πŸš€ Overview - -Pipeline Forge is a complete solution for orchestrating data pipelines in Kubernetes environments. 
It combines a powerful Kubernetes operator with specialized workloads to provide a declarative, event-driven approach to data pipeline management. - -### 🎯 The Problem - -Modern data teams face challenges with: - -- **Complex Orchestration**: Managing dependencies between data ingestion and transformation -- **Event-Driven Requirements**: Responding to file drops, database changes, and streaming events -- **Infrastructure Complexity**: Deploying and scaling data processing workloads -- **Observability Gaps**: Tracking pipeline health and data lineage -- **Team Coordination**: Coordinating between data engineering and platform teams -- **Resilience and Lifecycle Management**: Ensuring each pipeline step is connected in a clear lifecycleβ€”if one step fails, others don't attempt to run, preventing cascading errors and maintaining robust execution - -### πŸ’‘ The Solution - -Pipeline Forge provides a Kubernetes-native platform that: - -- **Declaratively Orchestrates** data pipelines using Custom Resource Definitions (CRDs) -- **Flexible Ingestion** supports both event-driven (e.g., GCS file drops, Pub/Sub messages, BigQuery updates) and scheduled (CronJob-based) pipeline execution -- **Clear Separation** between data ingestion and transformation phases -- **Built-in Observability** with comprehensive status tracking and monitoring -- **GitOps Ready** configuration that fits modern deployment practices - -## ✨ Key Features - -- **Unified Pipeline Lifecycle**: Connect ingestion with staging models in a single application lifecycle - if ingestion fails, the entire staging fails, preventing orphaned transformations -- **Native Kubernetes Resources**: Each step runs on 100% native K8s resources (Transform β†’ Job, Ingest β†’ CronJob/Job/Trigger) -- **Event-Driven Orchestration**: React to file drops, Pub/Sub messages, and BigQuery updates with intelligent retry policies -- **Built-in Observability**: Comprehensive status tracking with detailed execution history and failure 
analysis -- **Flexible Ingestion**: Reference existing CronJobs during Ingestion or create new ones as needed with full type safety and managed by the operator -- **Custom Image Support**: Use your own image for each step, or use pre-built Docker images from the Pipeline Forge repository - -### ⚑ Event-Driven Orchestration - -- **GCS Triggers**: Monitor bucket changes and trigger pipelines -- **Pub/Sub Triggers**: React to real-time messages with optional filtering -- **BigQuery Triggers**: Watch for table updates and data freshness -- **Retry & Cooldown**: Configurable retry policies with intelligent intervals - -### ☸️ Kubernetes-Native - -- **CRD-Based**: Native Kubernetes resources for pipeline definition -- **RBAC Integration**: Fine-grained access control for teams -- **Resource Management**: CPU, memory, and storage allocation -- **Independent Scaling**: Each step scales independently as native K8s resources - -### πŸ“Š Built-in Observability - -- **Rich Status Tracking**: Comprehensive pipeline health monitoring with detailed execution history -- **Lifecycle Management**: Real-time phase tracking (Pending, Running, Completed, Failed) -- **Execution Insights**: Track attempt counts, success/failure rates, and timing metrics -- **Failure Analysis**: Detailed error messages and retry attempt tracking - -### πŸ”„ Example - -```yaml -apiVersion: core.pipeline-forge.io/v1alpha1 -kind: Staging -metadata: - name: user-events-pipeline - namespace: staging-events -spec: - ingest: - mode: reference - type: trigger - name: user-events-trigger - transform: - name: user-events-transform - project: analytics - target: prod - image: gcr.io/org/dbt-core:latest - models: - - stg_user_events -``` - -πŸ“– **[View comprehensive examples β†’](docs/examples.md)** - -## πŸš€ Quick Start - -### Prerequisites - -- Kubernetes cluster (v1.11.3+) -- kubectl configured for your cluster -- Docker or container runtime -- Access to container registry +## 🎯 What is Pipeline Forge? 
-### Installation +A complete solution for orchestrating data pipelines in Kubernetes environments. Combines a powerful Kubernetes operator with specialized workloads to provide a declarative, event-driven approach to data pipeline management. -1. **Clone and Deploy** +### Key Benefits - ```bash - # Clone the repository - git clone https://github.com/your-org/pipeline-forge.git - cd pipeline-forge/operator +- **Unified Pipeline Lifecycle** - Connect ingestion with transformation in a single application lifecycle +- **Native Kubernetes Resources** - Each step runs on 100% native K8s resources +- **Event-Driven Orchestration** - React to file drops, Pub/Sub messages, and BigQuery updates +- **Built-in Observability** - Comprehensive status tracking and monitoring - # Deploy the operator - make deploy IMG=your-registry/pipeline-forge-operator:latest - ``` +## πŸ—οΈ Architecture Overview -2. **Deploy Sample Pipelines** +Pipeline Forge consists of two main components: - ```bash - # Apply example configurations - kubectl apply -k operator/config/samples/ - ``` +### πŸŽ›οΈ [Kubernetes Operator](operator/README.md) -3. 
**Monitor Pipeline Status** - ```bash - # Check pipeline health - kubectl get staging - kubectl describe staging user-events-staging - ``` +**Go-based CRD management and pipeline orchestration** -## πŸ—οΈ Architecture +- Custom Resource Definitions (CRDs) for pipeline definition +- Automatic reconciliation and lifecycle management +- RBAC integration and resource management +- Event-driven trigger management -Pipeline Forge consists of two main components that work together seamlessly: +### πŸ”§ [Specialized Workloads](workloads/README.md) -### πŸŽ›οΈ Kubernetes Operator +**Production-ready data processing components** -The operator manages the lifecycle of data pipeline stages through: +- **[Ingest](workloads/ingest/README.md)** - Type-safe data ingestion from MySQL, PostgreSQL to BigQuery +- **[Transform](workloads/transform/README.md)** - dbt-based data transformation with version control +- **[Trigger](workloads/trigger/README.md)** - Event processing for GCS, Pub/Sub, and BigQuery -- **Staging Resources**: Complete pipeline steps that coordinate ingestion and transformation -- **Trigger Resources**: Event-driven activation for pipelines -- **Ingestion Management**: Supports ingestion via both event-driven triggers and CronJobs -- **Automatic Reconciliation**: Ensures pipeline state matches desired configuration - -### πŸ”§ Specialized Workloads - -Pre-built, production-ready data processing components: +## πŸ› οΈ Technology Stack -- **Ingest Workloads**: Type-safe data ingestion from MySQL, PostgreSQL, and more -- **Transform Workloads**: dbt-based data transformation with version control -- **Trigger Workloads**: Event processing for GCS, Pub/Sub, and BigQuery +| Component | Technology | Purpose | +| ------------- | ----------------------------- | ----------------------------------------- | +| **Operator** | Go, Kubernetes, Kubebuilder | Pipeline orchestration and CRD management | +| **Ingest** | Python 3.13+, Pydantic, Typer | Type-safe data ingestion 
with validation | +| **Transform** | dbt Core, BigQuery | Data transformation and analytics | +| **Triggers** | Go, Google Cloud APIs | Event-driven pipeline activation | -## πŸ“š Documentation +## πŸš€ Quick Start -- **[πŸ“‹ Examples](docs/examples.md)** - Comprehensive YAML examples and use cases -- **[πŸŽ›οΈ Operator Guide](operator/README.md)** - Detailed operator documentation -- **[πŸ”§ Workloads](workloads/README.md)** - Data processing components +```bash +git clone https://github.com/DanielBlei/pipeline-forge.git -## πŸ› οΈ Technology Stack +# Run the operator (safety check ensures you're on kind/minikube cluster) +make run-operator -| Component | Technology | Purpose | -| ------------------ | ----------------------------- | ---------------------------------------------------- | -| **Operator** | Go, Kubernetes, Kubebuilder | Pipeline orchestration and CRD management | -| **Ingest** | Python 3.13+, Pydantic, Typer | Type-safe data ingestion with validation | -| **Transform** | dbt Core, BigQuery | Data transformation and analytics | -| **Triggers** | GCS, Pub/Sub, BigQuery APIs | Event-driven pipeline activation with retry policies | -| **Infrastructure** | Kubernetes, Docker | Container orchestration and deployment | +# Deploy k8s samples resources +make apply-samples +``` ## πŸ“ Project Structure ``` pipeline-forge/ β”œβ”€β”€ operator/ # Kubernetes operator (Go) -β”‚ β”œβ”€β”€ api/ # CRD definitions -β”‚ β”œβ”€β”€ controllers/ # Reconciliation logic -β”‚ └── config/ # Deployment manifests -β”œβ”€β”€ workloads/ # Data processing components +β”œβ”€β”€ workloads/ # Data processing components β”‚ β”œβ”€β”€ ingest/ # Type-safe ingestion (Python) β”‚ β”œβ”€β”€ transform/ # dbt transformations -β”‚ └── trigger/ # Event processing +β”‚ └── trigger/ # Event processing (Go) └── docs/ # Documentation - └── examples.md # Comprehensive examples ``` ## 🀝 Contributing -We welcome contributions to Pipeline Forge! 
Whether you're interested in: - -- **Operator Development**: Kubernetes controller logic and CRDs -- **Workload Development**: Data processing components -- **Documentation**: User guides and examples -- **Testing**: End-to-end pipeline validation - -Please see our contributing guidelines and development setup instructions in the respective component directories. - -### πŸ› οΈ Development Setup - -```bash -# Set up development environment -cd operator -make manifests generate -make test - -# Run locally -make run -``` +We welcome contributions! See individual component READMEs for development setup and guidelines. ## πŸ“„ License @@ -225,7 +102,3 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - -``` - -``` diff --git a/dev/.env b/dev/.env index 44e4d03..091104f 100644 --- a/dev/.env +++ b/dev/.env @@ -2,10 +2,10 @@ MYSQL_ROOT_PASSWORD=root_password_dev MYSQL_DATABASE=pipeline_forge MYSQL_USER=pipeline_user -MYSQL_PASSWORD=pipeline_password_dev +MYSQL_PASSWORD=pipeline_forge_sandbox # PostgreSQL Configuration POSTGRES_DB=pipeline_forge POSTGRES_USER=pipeline_user -POSTGRES_PASSWORD=pipeline_password_dev +POSTGRES_PASSWORD=pipeline_forge_sandbox diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 1f0c853..067fef0 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -6,10 +6,10 @@ services: env_file: - .env environment: - MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-rootpassword} - MYSQL_DATABASE: ${MYSQL_DATABASE:-pipeline_forge_dev} - MYSQL_USER: ${MYSQL_USER:-pipeline_user} - MYSQL_PASSWORD: ${MYSQL_PASSWORD:-pipeline_password} + MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD} + MYSQL_DATABASE: ${MYSQL_DATABASE} + MYSQL_USER: ${MYSQL_USER} + MYSQL_PASSWORD: ${MYSQL_PASSWORD} ports: - "3306:3306" volumes: @@ -25,9 +25,9 @@ services: env_file: - .env 
environment: - POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-pipeline_password} - POSTGRES_USER: ${POSTGRES_USER:-pipeline_user} - POSTGRES_DB: ${POSTGRES_DB:-pipeline_forge_dev} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + POSTGRES_USER: ${POSTGRES_USER} + POSTGRES_DB: ${POSTGRES_DB} ports: - "5432:5432" volumes: diff --git a/infrastructure/gcloud/sandbox/bigquery.tf b/infrastructure/gcloud/sandbox/bigquery.tf new file mode 100644 index 0000000..f09ff96 --- /dev/null +++ b/infrastructure/gcloud/sandbox/bigquery.tf @@ -0,0 +1,19 @@ +#----------------------------------------------- +# Ingest Workload (BigQuery) +#----------------------------------------------- + +# Main dataset for the pipeline-forge project +resource "google_bigquery_dataset" "pipeline_forge_sandbox_dataset" { + dataset_id = "pipeline_forge_sandbox" + friendly_name = "Pipeline Forge" + description = "Sandbox dataset for pipeline-forge data platform development" + location = "us-central1" + + delete_contents_on_destroy = true + + labels = { + project = "pipeline-forge" + environment = "sandbox" + workload = "ingest" + } +} diff --git a/infrastructure/gcloud/sandbox/iam.tf b/infrastructure/gcloud/sandbox/iam.tf new file mode 100644 index 0000000..70adee8 --- /dev/null +++ b/infrastructure/gcloud/sandbox/iam.tf @@ -0,0 +1,34 @@ +#----------------------------------------------- +# Ingest Workload (IAM) +#----------------------------------------------- + +# Service Account +resource "google_service_account" "ingest_workload" { + account_id = "ingest-workload" + display_name = "Ingest Workload" + description = "Service Account for Ingest Workload" +} + +# Grant Dataset Owner to Service Account +resource "google_bigquery_dataset_iam_member" "pipeline_forge_sandbox_dataset_owner" { + dataset_id = google_bigquery_dataset.pipeline_forge_sandbox_dataset.dataset_id + role = "roles/bigquery.dataOwner" + member = "serviceAccount:${google_service_account.ingest_workload.email}" + depends_on = 
[google_service_account.ingest_workload] +} + +# Grant Secret Manager Accessor to Service Account +resource "google_secret_manager_secret_iam_member" "postgres_secret_manager_password_accessor" { + secret_id = google_secret_manager_secret.postgres_password.id + role = "roles/secretmanager.secretAccessor" + member = "serviceAccount:${google_service_account.ingest_workload.email}" + depends_on = [google_service_account.ingest_workload] +} + +# Grant Secret Manager Accessor to Service Account +resource "google_secret_manager_secret_iam_member" "mysql_secret_manager_password_accessor" { + secret_id = google_secret_manager_secret.mysql_password.id + role = "roles/secretmanager.secretAccessor" + member = "serviceAccount:${google_service_account.ingest_workload.email}" + depends_on = [google_service_account.ingest_workload] +} diff --git a/infrastructure/gcloud/sandbox/main.tf b/infrastructure/gcloud/sandbox/main.tf new file mode 100644 index 0000000..ac0813b --- /dev/null +++ b/infrastructure/gcloud/sandbox/main.tf @@ -0,0 +1,21 @@ +terraform { + required_version = ">= 1.0" + + required_providers { + google = { + source = "hashicorp/google" + version = "~> 7.0" + } + } + + # Keep track of the state in the GCS bucket + backend "gcs" { + bucket = "pipeline-forge-terraform-state" + prefix = "sandbox" + } +} + +provider "google" { + project = "pipeline-forge" + region = "us-central1" +} diff --git a/infrastructure/gcloud/sandbox/secrets.tf b/infrastructure/gcloud/sandbox/secrets.tf new file mode 100644 index 0000000..f43cd0a --- /dev/null +++ b/infrastructure/gcloud/sandbox/secrets.tf @@ -0,0 +1,49 @@ +#----------------------------------------------- +# Ingest Workload (IAM) +#----------------------------------------------- + +# Database secrets for local development +resource "google_secret_manager_secret" "postgres_password" { + secret_id = "docker-compose-postgres-password" + + replication { + auto {} + } + + labels = { + project = "pipeline-forge" + environment = 
"sandbox" + workload = "ingest" + } +} + +resource "google_secret_manager_secret" "mysql_password" { + secret_id = "docker-compose-mysql-password" + replication { + auto {} + } + labels = { + project = "pipeline-forge" + environment = "sandbox" + workload = "ingest" + } +} + +# Secret versions - reads from .env file with fallback to variables +resource "google_secret_manager_secret_version" "postgres_password" { + secret = google_secret_manager_secret.postgres_password.id + secret_data = try( + trimspace(regex("POSTGRES_PASSWORD=(.+)", file("../../../dev/.env"))[0]), + var.postgres_password + ) + depends_on = [google_secret_manager_secret.postgres_password] +} + +resource "google_secret_manager_secret_version" "mysql_password" { + secret = google_secret_manager_secret.mysql_password.id + secret_data = try( + trimspace(regex("MYSQL_ROOT_PASSWORD=(.+)", file("../../../dev/.env"))[0]), + var.mysql_password + ) + depends_on = [google_secret_manager_secret.mysql_password] +} diff --git a/infrastructure/gcloud/sandbox/variables.tf b/infrastructure/gcloud/sandbox/variables.tf new file mode 100644 index 0000000..90566f5 --- /dev/null +++ b/infrastructure/gcloud/sandbox/variables.tf @@ -0,0 +1,17 @@ +# ----------------------------------------------- +# Variables +# ----------------------------------------------- + +variable "postgres_password" { + description = "Docker Compose PostgreSQL password" + type = string + default = "pipeline_forge_sandbox" + sensitive = true +} + +variable "mysql_password" { + description = "Docker Compose MySQL password" + type = string + default = "pipeline_forge_sandbox" + sensitive = true +} diff --git a/operator/config/crd/bases/core.pipeline-forge.io_triggers.yaml b/operator/config/crd/bases/core.pipeline-forge.io_triggers.yaml index 0bd3340..d9b1410 100644 --- a/operator/config/crd/bases/core.pipeline-forge.io_triggers.yaml +++ b/operator/config/crd/bases/core.pipeline-forge.io_triggers.yaml @@ -229,6 +229,11 @@ spec: For example: "*/5 * * * *" 
(every 5 minutes) Use standard cron syntax. type: string + suspend: + description: |- + Suspend indicates whether the trigger is currently suspended. + If true, the controller will not process the trigger. + type: boolean type: description: |- Type specifies the kind of event that will activate this trigger. diff --git a/workloads/README.md b/workloads/README.md index eec5208..b6a985a 100644 --- a/workloads/README.md +++ b/workloads/README.md @@ -1,3 +1,21 @@ -# Workload Placeholder +# Pipeline Forge - Workloads -This directory will contain the code and Dockerfile for Pipeline Forge workloads. \ No newline at end of file +This directory contains the data processing components for Pipeline Forge. All workloads are packaged as Docker images for easy deployment and scaling. + +## Available Workloads + +- **[Ingest](./ingest/README.md)** - Data ingestion from databases to BigQuery (Python) +- **[Transform](./transform/README.md)** - dbt-based data transformation +- **[Trigger](./trigger/README.md)** - Event-driven pipeline activation (Go) + +## Architecture Overview + +All workloads follow consistent patterns: + +- **Containerized Deployment** - Packaged as Docker images for easy deployment +- **Standalone Operation** - Each workload runs independently with its own configuration +- **Type-safe Configuration** - Runtime validation and environment-specific configs +- **Comprehensive Testing** - Unit, integration, and end-to-end test coverage +- **Modern Development Practices** - Linting, formatting, and CI/CD ready + +_For detailed information about each workload, see their individual README files._ diff --git a/workloads/ingest/Dockerfile b/workloads/ingest/Dockerfile index e6e96fb..b96040f 100644 --- a/workloads/ingest/Dockerfile +++ b/workloads/ingest/Dockerfile @@ -7,8 +7,9 @@ COPY pyproject.toml ./ COPY uv.lock ./ COPY ingest ./ingest -# Install uv and install the package +# Install uv and install the package system-wide RUN pip install --upgrade uv && \ - uv 
install . + uv sync && \ + uv pip install -e . --system CMD ["ingest", "--help"] diff --git a/workloads/ingest/Makefile b/workloads/ingest/Makefile index a86710b..0418bfd 100644 --- a/workloads/ingest/Makefile +++ b/workloads/ingest/Makefile @@ -18,6 +18,38 @@ fix: ## Attempt to fix linting errors (Ruff) @uv run ruff check . --fix @uv run ruff format . +.PHONY: install +install: ## Install the project + @uv sync + @uv pip install -e . + +.PHONY: run-help +run-help: ## Run the project and show the help + @uv run ingest --help + + +##@ Build + +.PHONY: build +build: ## Build the project + @uv run build + +.PHONY: build-docker +build-docker: ## Build the project as a Docker image + @docker build -t ingest . + +##@ Tests + +.PHONY: test-docker +test-docker: ## Run the project as a Docker container + @docker run -it ingest ingest --help + .PHONY: test test: ## Run tests - @uv run pytest . -v \ No newline at end of file + @uv run pytest . -v + +.PHONY: test-coverage +test-coverage: ## Run tests with coverage and open HTML report + @uv run pytest . --cov=ingest --cov-report=html --cov-report=term-missing + @echo "Opening coverage report in browser..." + @xdg-open htmlcov/index.html || open htmlcov/index.html || start htmlcov/index.html diff --git a/workloads/ingest/README.md b/workloads/ingest/README.md index 47e689e..0adf2fc 100644 --- a/workloads/ingest/README.md +++ b/workloads/ingest/README.md @@ -1,54 +1,127 @@ # Pipeline Forge - Ingest Workload -A modern, type-safe data ingestion pipeline built with Python 3.13+ and Pydantic. +A production-ready data ingestion pipeline designed for the Pipeline Forge platform. 
-## Features - -- **Type-Safe Configuration**: Built with Pydantic for runtime validation and type checking -- **Extensible Architecture**: Easy to add new data sources with Union types -- **Multi-Source Support**: Ingest data from multiple database sources in a single pipeline -- **Streaming Processing**: Process large datasets efficiently with configurable chunk sizes -- **Comprehensive Logging**: Structured logging with debug support -- **CLI Interface**: Powered by Typer for excellent user experience +Deployable as a standalone Docker image with comprehensive configuration management and modern Python development practices. ## Quick Start ```bash -# Install dependencies -pip install -e . +# Install dependencies and run with `--help` flag +make install +make run-help + +# Run ingestion with config files +uv run ingest --config config.yaml --catalog catalog.yaml --env dev -# Run Ingest in dev environment -python -m ingest.main --config example_config.yaml --catalog example_catalog.yaml --env dev +# Dry run (extract data but not load it into target) +uv run ingest --config config.yaml --catalog catalog.yaml --env dev --dry-run ``` -## Configuration +## When to Use + +- **Database Extraction**: MySQL, PostgreSQL β†’ BigQuery (Future support for other targets) +- **Large Datasets**: Streaming processing with configurable chunk sizes, optimizing memory usage +- **Production Pipelines**: Built-in retry logic and error handling +- **Multi-Environment**: Dev/prod configuration management +- **Modern Python**: Type hints, async patterns, modern tooling + +## How It Works + +### 1. 
Configuration-Driven + +Example configuration file: +```yaml +# Type-safe configuration with Pydantic validation +version: 1.0.0 + +sources: +prod: + pipeline_forge: + name: pipeline_forge + type: mysql + host: localhost + port: 3306 + username: pipeline_user + password: docker-compose-mysql-password # refers to a secret name in the secrets section + database: pipeline_forge + ssl_required: true + dev: + forge_dev: + ... +targets: + dev: + bigquery_target: + project_number: 1234567890 + dataset: my_dataset + ... +secrets: + - name: secret-name + provider: google_secret_manager + ... +params: + retry_attempts: 1 + retry_delay_seconds: 30 + chunk_size: 10000 +``` -The ingest workload uses Pydantic models for configuration validation. See `example_config.yaml` for a complete example. +Example catalog file: + +```yaml +name: pipeline_forge +source: pipeline_forge +tables: + - name: account + replication: TRUNCATE + columns: + - name: id + type: int + - name: email + type: string + - name: events + ... +``` -### Supported Sources -- **MySQL**: Full MySQL support with SSL and connection pooling -- **PostgreSQL**: Native PostgreSQL support with advanced features +### 2. Protocol-Based Architecture -## Architecture +```python +# Clean abstractions enable easy extension +source = create_source(config) # Factory pattern +for chunk in source.extract(table, chunk_size=1000): + target.load(chunk, write_disposition=TRUNCATE) +``` -This project demonstrates modern Python development practices: +### 3. 
Streaming Processing -- **Pydantic Models**: Type-safe configuration and data validation -- **Union Types**: Type hints for multiple source implementations -- **Factory Pattern**: Clean source instantiation based on configuration -- **Comprehensive Error Handling**: Graceful failure with detailed logging +- **Memory Efficient**: Processes data in configurable chunks +- **Retry Logic**: Automatic retry with exponential backoff +- **Error Handling**: Graceful failure with detailed logging -## Development +## Key Software Engineering Practices -```bash -# Install development dependencies (using uv package manager) -uv venv -uv sync +- **Type Safety**: Pydantic models with runtime validation +- **Protocol Design**: Clean interfaces for extensibility +- **Factory Pattern**: Extensible source/target creation +- **Comprehensive Testing**: Unit, integration, and error handling tests +- **Modern Python**: Type hints, async patterns, modern tooling + +## Configuration Reference -# Format code -ruff format . +| Field | Type | Description | +| --------- | ---- | ------------------------------------------ | +| `version` | Str | Version of the configuration file | +| `sources` | Dict | Environment-specific source configs | +| `targets` | Dict | Environment-specific target configurations | +| `secrets` | List | Secret references (access during runtime) | +| `params` | Dict | Runtime parameters (access during runtime) | -# Lint code -ruff check . 
+## Development + +```bash +make install # Install dependencies +make check # Run linters (Ruff and MyPy) +make fix # Fix linting errors +make test # Run tests +make test-coverage # Run tests with coverage report and open HTML report in browser ``` diff --git a/workloads/ingest/ingest/__init__.py b/workloads/ingest/ingest/__init__.py index c64dd8e..8189bfa 100644 --- a/workloads/ingest/ingest/__init__.py +++ b/workloads/ingest/ingest/__init__.py @@ -1 +1 @@ -"""Ingest Workload""" +"""Ingest Workload.""" diff --git a/workloads/ingest/ingest/core/__init__.py b/workloads/ingest/ingest/core/__init__.py index 0953610..4899e9c 100644 --- a/workloads/ingest/ingest/core/__init__.py +++ b/workloads/ingest/ingest/core/__init__.py @@ -1,6 +1,6 @@ """Core package for pipeline-forge ingest functionality.""" -from .config import Config, SourceConfig, TargetConfig, ConnectionConfig, RuntimeParams, SecretsConfig, DatabaseType +from .config import Config, SourceConfig, RuntimeParams, DatabaseType, BigQueryTargetConfig from .catalog import Catalog, Table, Column, ReplicationType @@ -8,11 +8,9 @@ # Config models "Config", "SourceConfig", - "TargetConfig", - "ConnectionConfig", "RuntimeParams", - "SecretsConfig", "DatabaseType", + "BigQueryTargetConfig", # Catalog models "Catalog", "Table", diff --git a/workloads/ingest/ingest/core/catalog.py b/workloads/ingest/ingest/core/catalog.py index 24e87c1..9d2a037 100644 --- a/workloads/ingest/ingest/core/catalog.py +++ b/workloads/ingest/ingest/core/catalog.py @@ -1,18 +1,18 @@ -"""Catalog class for storing the schema of the data""" +"""Catalog class for storing the schema of the data.""" from pydantic import BaseModel, Field, ConfigDict from enum import Enum class Column(BaseModel): - """Column class for storing the schema of the data""" + """Column class for storing the schema of the data.""" - name: str - type: str + name: str = Field(min_length=1) + type: str = Field(min_length=1) class ReplicationType(str, Enum): - "Replication Method, 
e.g TRUNCATE, APPEND, UPSERT" + """Replication Method, e.g TRUNCATE, APPEND, UPSERT.""" TRUNCATE = "TRUNCATE" APPEND = "APPEND" @@ -20,19 +20,19 @@ class ReplicationType(str, Enum): class Table(BaseModel): - """Table class for storing the schema of the data""" + """Table class for storing the schema of the data.""" - name: str + name: str = Field(min_length=1) source: str | None = Field( default=None, description="Source name for this table, matching the source name in the config. If not set, defaults to the catalog's source.", ) - replication: ReplicationType + replication: ReplicationType = Field(..., description="Replication method (TRUNCATE, APPEND, UPSERT)") columns: list[Column] class Catalog(BaseModel): - """Catalog class for storing the schema of the data""" + """Catalog class for storing the schema of the data.""" name: str = Field(description="Catalog name") source: str = Field(description="Source name for this catalog, matching the source name in the config") @@ -41,18 +41,18 @@ class Catalog(BaseModel): model_config = ConfigDict(extra="forbid", validate_assignment=True, str_strip_whitespace=True) def get_table(self, table_name: str) -> Table: - """Get a table by name""" + """Get a table by name.""" for table in self.tables: if table.name == table_name: return table raise ValueError(f"Table {table_name} not found in catalog") def get_tables_by_source(self, source_name: str) -> list[Table]: - """Get all tables from a specific source""" + """Get all tables from a specific source.""" return [ table for table in self.tables if (table.source if table.source is not None else self.source) == source_name ] def get_sources(self) -> list[str]: - """Get list of all unique sources in the catalog""" + """Get list of all unique sources in the catalog.""" return list(set(table.source if table.source is not None else self.source for table in self.tables)) diff --git a/workloads/ingest/ingest/core/config.py b/workloads/ingest/ingest/core/config.py index 50b0ee8..0a76190 
100644 --- a/workloads/ingest/ingest/core/config.py +++ b/workloads/ingest/ingest/core/config.py @@ -1,21 +1,35 @@ -"""Configuration classes for ingestion""" +"""Configuration classes for ingestion.""" +from typing_extensions import Union from pydantic import BaseModel, Field, ConfigDict from typing import Dict, List, Optional from enum import Enum +from ingest.helpers.secret_handler import get_gcloud_secret + class DatabaseType(str, Enum): + """Database type enumeration.""" + MYSQL = "mysql" POSTGRES = "postgres" + + +class TargetType(str, Enum): + """Target type enumeration.""" + BIGQUERY = "bigquery" -class ConnectionConfig(BaseModel): +class SourceConfig(BaseModel): + """Source database configuration.""" + + name: str + type: DatabaseType host: str port: int username: str - password: str # Will be resolved from env or secret manager + password: str = Field(..., description="Either refer to a secret name in the Secrets section") database: str db_schema: Optional[str] = Field(None, alias="schema") ssl_required: Optional[bool] = False @@ -29,69 +43,109 @@ def build_connection_string(self, dialect: str, default_port: int = 0) -> str: Returns: SQLAlchemy connection string + """ port = default_port if default_port != 0 else self.port return f"{dialect}://{self.username}:{self.password}@{self.host}:{port}/{self.database}" -class SourceConfig(BaseModel): - name: str - type: DatabaseType - connection: ConnectionConfig - ssl_required: bool = False - +class BigQueryTargetConfig(BaseModel): + """BigQuery target configuration.""" -class BigQueryTarget(BaseModel): name: str - type: str = "bigquery" - project_id: str - dataset: str + type: TargetType = TargetType.BIGQUERY + project_id: str = Field( + ..., + pattern=r"^[a-z][a-z0-9\-]{4,28}[a-z0-9]$", + description="BigQuery project_id must be 6-30 characters, lowercase letters, digits or hyphens, start with a letter, end with letter or digit.", + ) + project_number: int = Field( + ..., + description="BigQuery project_number 
must be a valid project number.", + ) + dataset_id: str = Field( + ..., + pattern=r"^[A-Za-z_][A-Za-z0-9_]{0,1023}$", + description="BigQuery dataset_id must be 1-1024 characters, start with a letter or underscore, contain only letters, numbers, or underscores.", + ) location: Optional[str] = None - service_account: str + service_account: Optional[str] = None + + +TargetTypes = Union["BigQueryTargetConfig"] class RuntimeParams(BaseModel): + """Runtime parameters for ingestion process.""" + retry_attempts: int = Field(ge=1, le=10, default=3) retry_delay_seconds: int = Field(ge=1, le=3600, default=30) chunk_size: int = Field(default=10000) -class TargetConfig(BaseModel): - name: str - type: DatabaseType - connection: ConnectionConfig - ssl_required: bool = False +class SecretProvider(str, Enum): + """Secret provider enumeration.""" + GOOGLE_SECRET_MANAGER = "gcloud" + # AWS_SECRET_MANAGER = "aws" + # AZURE_KEY_VAULT = "azure" -class SecretConfig(BaseModel): - name: str - path: str +class SecretConfig(BaseModel): + """Secret configuration.""" -class SecretsConfig(BaseModel): - provider: str - secrets: List[SecretConfig] + provider: SecretProvider = Field( + default=SecretProvider.GOOGLE_SECRET_MANAGER, description="Secret Manager Provider" + ) + name: str = Field(description="Secret name") + version: Optional[str] = Field(default="latest", description="Secret version") + secret_path: Optional[str] = Field( + default=None, + description="Secret path in the Gcloud Secret Manager(e.g projects/pipeline-forge/secrets/postgres-password)", + ) class Config(BaseModel): + """Main configuration class.""" + version: str params: RuntimeParams - secrets: SecretsConfig + secrets: List[SecretConfig] sources: Dict[str, Dict[str, SourceConfig]] # environment -> source_name -> SourceConfig - targets: Dict[str, BigQueryTarget] + targets: Dict[str, TargetTypes] # enviroument -> TargetType model_config = ConfigDict(extra="forbid", validate_assignment=True, str_strip_whitespace=True) def 
get_source_config(self, environment: str, source_name: str) -> Optional[SourceConfig]: - """Get a specific source configuration by environment and name""" + """Get a specific source configuration by environment and name.""" if environment not in self.sources: return None return self.sources[environment].get(source_name) - def get_target_config(self, environment: str) -> Optional[BigQueryTarget]: - """Get a specific target configuration by environment and name""" + def get_target_config(self, environment: str) -> Optional[TargetTypes]: + """Get a specific target configuration by environment and name.""" if environment not in self.targets: return None return self.targets.get(environment) + + def get_gcloud_secret_value(self, secret_name: str, environment: str, version: Optional[str] = "latest") -> str: + """Get a secret from the Gcloud Secret Manager.""" + if secret_name not in [secret.name for secret in self.secrets]: + raise ValueError(f"Secret {secret_name} not found in secrets") + + secret_config = next(secret for secret in self.secrets if secret.name == secret_name) + if secret_config.provider != SecretProvider.GOOGLE_SECRET_MANAGER: + raise ValueError(f"Secret {secret_name} is not a Gcloud Secret") + + if secret_config.secret_path is None: + target_config = self.targets.get(environment) + if target_config is None: + raise ValueError(f"Target config not found for environment {environment}") + + secret_path = f"projects/{target_config.project_number}/secrets/{secret_name}/versions/{version}" + else: + secret_path = secret_config.secret_path + + return get_gcloud_secret(secret_path) diff --git a/workloads/ingest/ingest/extractors/extractor.py b/workloads/ingest/ingest/extractors/extractor.py index df872f2..6c49b59 100644 --- a/workloads/ingest/ingest/extractors/extractor.py +++ b/workloads/ingest/ingest/extractors/extractor.py @@ -1,6 +1,9 @@ +"""Extractor module for database data extraction.""" + from typing import Optional, Iterator, List from sqlalchemy import 
create_engine, text, Engine from sqlalchemy.exc import SQLAlchemyError + import logging from ..core.config import Config @@ -13,12 +16,23 @@ class BaseExtractor: """Base extraction class using SQLAlchemy for database-agnostic data extraction.""" - def __init__(self, connection_string: str, config: Optional[Config] = None, **engine_kwargs): + def __init__( + self, + connection_string: str, + config: Optional[Config] = None, + retry_attempts: int = 3, + retry_delay: int = 15, + **engine_kwargs, + ): """Initialize the extractor with a database connection string. Args: connection_string: SQLAlchemy connection string + config: Optional configuration object + retry_attempts: Number of retry attempts for failed operations (default: 3) + retry_delay: Delay in seconds between retry attempts (default: 15) **engine_kwargs: Additional engine configuration options + """ if "://" not in connection_string: raise ValueError( @@ -28,8 +42,10 @@ def __init__(self, connection_string: str, config: Optional[Config] = None, **en self.connection_string = connection_string self.engine: Optional[Engine] = None self.engine_kwargs = engine_kwargs + self.retry_attempts = retry_attempts + self.retry_delay = retry_delay - @retry_on_exception(retries=5, delay=30) + @retry_on_exception() def connect(self) -> None: """Connect to the database and create the engine.""" try: @@ -53,12 +69,13 @@ def connect(self) -> None: logger.error(f"Failed to create engine: {e}") raise - @retry_on_exception(retries=5, delay=30) + @retry_on_exception() def validate_connection(self) -> bool: """Validate the database connection. 
Returns: True if connection is valid, False otherwise + """ if not self.engine: self.connect() # This will retry if it fails @@ -94,6 +111,7 @@ def extract(self, table: Table, chunk_size: int = 1000, limit: Optional[int] = N Raises: SQLAlchemyError: If extraction fails + """ if not self.engine: self.connect() diff --git a/workloads/ingest/ingest/helpers/retry.py b/workloads/ingest/ingest/helpers/retry.py index b8ac433..40412ed 100644 --- a/workloads/ingest/ingest/helpers/retry.py +++ b/workloads/ingest/ingest/helpers/retry.py @@ -7,23 +7,46 @@ logger = logging.getLogger(__name__) -def retry_on_exception(retries: int = 3, delay: int = 15): - """Retry function on exception with a delay between retries.""" +def retry_on_exception(retries: int = 3, delay: int = 15, retries_attr="retry_attempts", delay_attr="retry_delay"): + """Universal retry decorator that can use static values or instance attributes. + + This decorator can be used in two ways: + 1. Static retry: @retry_on_exception(retries=5, delay=10) + 2. 
Instance-based retry: @retry_on_exception() - reads from instance attributes + + Args: + retries: Static retry count (used if instance doesn't have retries_attr) + delay: Static delay in seconds (used if instance doesn't have delay_attr) + retries_attr: Name of instance attribute for retry count (default: 'retry_attempts') + delay_attr: Name of instance attribute for delay (default: 'retry_delay') + + """ def decorator(func): @wraps(func) def wrapper(*args, **kwargs): - for attempt in range(retries): + # Determine if this is a method (has 'self') or a function + if args and hasattr(args[0], retries_attr): + # It's a method with instance attributes - use them + self = args[0] + retries_val = getattr(self, retries_attr, retries) + delay_val = getattr(self, delay_attr, delay) + else: + # It's a function or method without instance attributes - use static values + retries_val = retries + delay_val = delay + + for attempt in range(retries_val): try: return func(*args, **kwargs) except Exception as e: attempt_num = attempt + 1 - logger.error(f"Attempt {attempt_num}/{retries} failed: {e}") - if attempt < retries - 1: - logger.info(f"Retrying in {delay} seconds...") - sleep(delay) + logger.error(f"Attempt {attempt_num}/{retries_val} failed: {e}") + if attempt < retries_val - 1: + logger.info(f"Retrying in {delay_val} seconds...") + sleep(delay_val) else: - logger.error(f"All {retries} attempts failed. Final attempt:") + logger.error(f"All {retries_val} attempts failed. 
Final attempt:") # Final attempt, let exception propagate return func(*args, **kwargs) diff --git a/workloads/ingest/ingest/helpers/secret_handler.py b/workloads/ingest/ingest/helpers/secret_handler.py new file mode 100644 index 0000000..b36db31 --- /dev/null +++ b/workloads/ingest/ingest/helpers/secret_handler.py @@ -0,0 +1,19 @@ +"""Secret Handler for the ingest workload.""" + +import logging +from google.cloud import secretmanager + +logger = logging.getLogger(__name__) + + +def get_gcloud_secret(secret_name: str) -> str: + """Get a secret from the Gcloud Secret Manager.""" + try: + client = secretmanager.SecretManagerServiceClient() + secret = client.access_secret_version(name=secret_name) + secret_data = secret.payload.data.decode("UTF-8") + if secret_data is None: + raise ValueError(f"Secret {secret_name} is empty") + return secret_data + except Exception as e: + raise ValueError(f"Failed to get secret {secret_name}: {e}") diff --git a/workloads/ingest/ingest/log.py b/workloads/ingest/ingest/log.py index 97b0f57..bd63bf9 100644 --- a/workloads/ingest/ingest/log.py +++ b/workloads/ingest/ingest/log.py @@ -1,8 +1,23 @@ +"""Logging configuration module.""" + import logging +from typing import Optional + + +def setup_logging(name: Optional[str] = None, debug: bool = False): + """Set up logging for the application. 
+ + Args: + name: Logger name (usually __package__ or __name__) + debug: Enable debug logging + Returns: + Logger instance + + """ + # Get the specific logger if name provided, otherwise root logger + logger = logging.getLogger(name) if name else logging.getLogger() -def setup_logging(debug: bool = False): - logger = logging.getLogger() if debug: logger.setLevel(logging.DEBUG) formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s") @@ -10,12 +25,15 @@ def setup_logging(debug: bool = False): logger.setLevel(logging.INFO) formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") - handler = logging.StreamHandler() - handler.setFormatter(formatter) + # Only configure if this logger doesn't have handlers + # This prevents duplicate handlers in the hierarchy if not logger.hasHandlers(): + handler = logging.StreamHandler() + handler.setFormatter(formatter) logger.addHandler(handler) - else: - # Replace existing handlers' formatters - for h in logger.handlers: - h.setFormatter(formatter) + + # Prevent propagation to root logger to avoid duplicates + if name: + logger.propagate = False + return logger diff --git a/workloads/ingest/ingest/main.py b/workloads/ingest/ingest/main.py index 48b6619..b6659d8 100644 --- a/workloads/ingest/ingest/main.py +++ b/workloads/ingest/ingest/main.py @@ -1,3 +1,6 @@ +"""Main entry point for the ingest workload.""" + +import logging from pathlib import Path import sys from typing import Any @@ -7,18 +10,21 @@ from ruamel.yaml import YAML # type: ignore from .log import setup_logging from .sources import create_source -from .core import Config, Catalog -from .targets import create_target, Target +from .core import Config, Catalog, Table +from .sources import SourceInterface +from .targets import create_target, TargetInterface + +logger = logging.getLogger(__name__) app = typer.Typer() + # Module-level constants for default values CONFIG_PATH = typer.Option(..., "--config", "-c", 
help="Path to config file") CATALOG_PATH = typer.Option(..., "--catalog", "-cat", help="Path to catalog file") DEBUG_FLAG = typer.Option(False, "--debug", "-d", help="Enable debug mode") ENV_FLAG = typer.Option("dev", "--env", "-e", help="Environment to use") - -logger = setup_logging(debug=DEBUG_FLAG) +DRY_RUN_FLAG = typer.Option(False, "--dry-run", "-dr", help="Enable dry run mode") @app.command() @@ -27,6 +33,7 @@ def ingest( catalogPath: Path = CATALOG_PATH, debug: bool = DEBUG_FLAG, env: str = ENV_FLAG, + dryRun: bool = DRY_RUN_FLAG, ) -> int: """Ingest data from a source database to a target system. @@ -35,31 +42,35 @@ def ingest( catalogPath: Path to catalog file debug: Enable debug mode env: Environment to use + dryRun: Enable dry run mode (do not load data into target) Returns: int: Exit code (0 for success, 1 for failure) + """ try: - return main(configPath, catalogPath, debug, env) + return main(configPath, catalogPath, debug, env, dryRun) except Exception as e: logger.error(f"Unexpected error in main: {e}") return 1 -def main(config_path: Path, catalog_path: Path, debug: bool, env: str) -> int: - """Main entry point for the ingestion process. +def main(config_path: Path, catalog_path: Path, debug: bool, env: str, dryRun: bool) -> int: + """Execute the main ingestion process. 
Args: config_path: Path to config file catalog_path: Path to catalog file debug: Enable debug mode env: Environment to use + dryRun: Enable dry run mode (do not load data into target) Returns: int: Exit code (0 for success, 1 for failure) + """ # Initialize logger with the debug parameter - logger = setup_logging(debug=debug) + setup_logging(__package__, debug=debug) logger.info("Starting ingestion process") if env.lower() not in ["dev", "prod"]: @@ -69,15 +80,15 @@ def main(config_path: Path, catalog_path: Path, debug: bool, env: str) -> int: config = load_yaml_model(config_path, Config) catalog = load_yaml_model(catalog_path, Catalog) - target = create_and_validate_target(config.targets[env], env, logger) + target = create_target(config.targets.get(env)) # Process each unique source sources = catalog.get_sources() for source_name in sources: try: - process_source(source_name, target, config, catalog, env, logger) + process_source(source_name, target, config, catalog, env, dryRun) except Exception as e: - logger.error(f"Failed to process source {source_name}: {e}") + logger.error(f"Failed to process source {source_name}: {e}", exc_info=True) continue # TODO: add a flag to halt the process if desired logger.info(f"Ingested data from source '{source_name}'") @@ -85,14 +96,16 @@ def main(config_path: Path, catalog_path: Path, debug: bool, env: str) -> int: return 0 except ValueError as e: - logger.error(f"Configuration error: {e}") + logger.error(f"Configuration error: {e}", exc_info=True) return 1 except Exception as e: - logger.error(f"Unexpected error: {e}") + logger.error(f"Unexpected error: {e}", exc_info=True) return 1 -def process_source(source_name: str, target: Target, config: Config, catalog: Catalog, env: str, logger) -> None: +def process_source( + source_name: str, target: TargetInterface, config: Config, catalog: Catalog, env: str, dryRun: bool +) -> None: """Process one source and all its tables. 
Args: @@ -101,27 +114,38 @@ def process_source(source_name: str, target: Target, config: Config, catalog: Ca config: Configuration object catalog: Catalog object containing table definitions env: Environment to use - logger: Logger instance + dryRun: Enable dry run mode (do not load data into target) + """ # Get all tables for this source tables = catalog.get_tables_by_source(source_name) logger.info(f"Processing source '{source_name}' with {len(tables)} tables") source_config = config.get_source_config(env, source_name) - source = create_and_validate_source(source_name, source_config, env, logger) + if source_config is None: + raise ValueError(f"No source configuration found for '{source_name}' in environment '{env}'") + + # Set the password from the secret BEFORE creating the source + source_config.password = config.get_gcloud_secret_value(source_config.password, env) + + source = create_source( + source_config, retry_attempts=config.params.retry_attempts, retry_delay=config.params.retry_delay_seconds + ) try: # Process each table from this source for table in tables: try: - process_table(source, target, table, config.params.chunk_size, logger) + process_table(source, target, table, config.params.chunk_size, dryRun) except Exception as e: - logger.error(f"Failed to process table {table.name}: {e}") + logger.error(f"Failed to process table {table.name}: {e}", exc_info=True) continue # TODO: add a flag to halt the process if desired finally: source.close() -def process_table(source, target, table, chunk_size: int, logger) -> None: +def process_table( + source: SourceInterface, target: TargetInterface, table: Table, chunk_size: int, dryRun: bool +) -> None: """Process one table's extraction and loading. 
Args: @@ -129,60 +153,22 @@ def process_table(source, target, table, chunk_size: int, logger) -> None: target: Target instance for data loading table: Table definition to process chunk_size: Size of data chunks to process - logger: Logger instance + dryRun: Enable dry run mode (do not load data into target) + """ chunk_count = 0 for chunk in source.extract(table=table, chunk_size=chunk_size): chunk_count += 1 - target.load(chunk, table.name) + if dryRun: + logger.info(f"Dry run mode: Would have loaded chunk {chunk_count} from table '{table.name}'") + continue + destination_table = f"{target.config.dataset_id}.{table.name}" + target.load(data=chunk, target_table=destination_table, write_disposition=table.replication) logger.info(f"Loaded chunk {chunk_count} from table '{table.name}'") logger.info(f"Completed table '{table.name}': {chunk_count} chunks processed") -def create_and_validate_source(source_name: str, source_config, env: str, logger): - """Create and validate a source connection. - - Args: - source_name: Name of the source - source_config: Source configuration - env: Environment to use - logger: Logger instance - - Returns: - Source instance with validated connection - - Raises: - ValueError: If source connection validation fails - """ - source = create_source(source_config, env) - if not source.validate_connection(): - raise ValueError(f"Failed to validate source connection: {source.config.name}") - logger.info(f"Connected to source: {source.config.name}") - return source - - -def create_and_validate_target(target_config, env: str, logger): - """Create and validate a target connection. 
- - Args: - target_config: Target configuration - env: Environment to use - logger: Logger instance - - Returns: - Target instance with validated connection - - Raises: - ValueError: If target connection validation fails - """ - target = create_target(target_config, env) - if not target.validate_connection(): - raise ValueError(f"Failed to validate target connection: {target.config.name}") - logger.debug(f"Targer is initialized: {target.config.name}") - return target - - def load_yaml_model(path: Path, model_cls) -> Any: """Load and validate a YAML file into a model instance. @@ -195,6 +181,7 @@ def load_yaml_model(path: Path, model_cls) -> Any: Raises: SystemExit: If file loading or validation fails + """ try: yaml_loader = YAML(typ="safe") @@ -203,7 +190,7 @@ def load_yaml_model(path: Path, model_cls) -> Any: obj = model_cls.model_validate(data) return obj except Exception as e: - logger.error("Failed to load file", file_path=str(path), error=str(e)) + logger.error(f"Failed to load file {path}: {e}") sys.exit(1) diff --git a/workloads/ingest/ingest/sources/__init__.py b/workloads/ingest/ingest/sources/__init__.py index b26188e..1105268 100644 --- a/workloads/ingest/ingest/sources/__init__.py +++ b/workloads/ingest/ingest/sources/__init__.py @@ -1,35 +1,52 @@ +"""Source package for data source implementations.""" + +import logging + from .mysql_source import MySQLSource from .postgres_source import PostgresSource from .source import SourceInterface from ..core.config import SourceConfig +logger = logging.getLogger(__name__) + -def create_source(source_config: SourceConfig, env: str) -> SourceInterface: - """Create a source instance directly based on configuration. +def create_source(source_config: SourceConfig, retry_attempts: int = 3, retry_delay: int = 15) -> SourceInterface: + """Create and validate a source instance based on configuration. 
Args: source_config: Source configuration object - env: Environment name (staging/production) + retry_attempts: Number of retry attempts for connection + retry_delay: Delay between retries in seconds Returns: - Appropriate source instance implementing SourceInterface + Validated source instance implementing SourceInterface Raises: - ValueError: If source type is not supported + ValueError: If source type is not supported or connection validation fails + """ + # Create the source instance + source: SourceInterface if source_config.type.value == "mysql": - return MySQLSource(source_config, env) + source = MySQLSource(source_config, retry_attempts=retry_attempts, retry_delay=retry_delay) elif source_config.type.value == "postgres": - return PostgresSource(source_config, env) + source = PostgresSource(source_config, retry_attempts=retry_attempts, retry_delay=retry_delay) else: raise ValueError(f"Unsupported source type: {source_config.type}") + # Validate the connection + if not source.validate_connection(): + raise ValueError(f"Failed to validate source connection: {source.config.name}") + + logger.info(f"Connected to source: {source.config.name}") + return source + __all__ = [ # Source implementations "MySQLSource", "PostgresSource", - # Direct creation function + # Factory function with validation "create_source", # Protocol interface "SourceInterface", diff --git a/workloads/ingest/ingest/sources/mysql_source.py b/workloads/ingest/ingest/sources/mysql_source.py index d3862af..4fdd9ab 100644 --- a/workloads/ingest/ingest/sources/mysql_source.py +++ b/workloads/ingest/ingest/sources/mysql_source.py @@ -1,8 +1,11 @@ -from typing import Optional, Iterator +"""MySQL source implementation.""" + +from typing import Optional, Iterator, List from .source import Source from ..extractors import BaseExtractor from ..core.config import SourceConfig from ..core.catalog import Table + import logging logger = logging.getLogger(__name__) @@ -11,14 +14,14 @@ class 
MySQLSource(Source): """MySQL data source implementation.""" - def __init__(self, config: SourceConfig, env: str): + def __init__(self, config: SourceConfig, retry_attempts: int = 3, retry_delay: int = 15): """Initialize MySQL source with validated configuration.""" - super().__init__(config=config, env=env) + super().__init__(config=config, retry_attempts=retry_attempts, retry_delay=retry_delay) logger.debug("Initialized MySQL source") # Initialize the extractor with the connection string - connection_string = self.config.connection.build_connection_string(dialect="mysql+pymysql", default_port=3306) - self.extractor = BaseExtractor(connection_string) + connection_string = self.config.build_connection_string(dialect="mysql+pymysql", default_port=3306) + self.extractor = BaseExtractor(connection_string, retry_attempts=retry_attempts, retry_delay=retry_delay) logger.debug("Initialized BaseExtractor") def connect(self) -> None: @@ -26,7 +29,7 @@ def connect(self) -> None: super().connect() logger.debug("MySQL source connected via BaseExtractor") - def extract(self, table: Table, chunk_size: int, limit: Optional[int] = None) -> Iterator[dict]: + def extract(self, table: Table, chunk_size: int, limit: Optional[int] = None) -> Iterator[List[dict]]: """Extract data from MySQL table.""" if not self.extractor: raise RuntimeError("MySQL extractor not initialized") diff --git a/workloads/ingest/ingest/sources/postgres_source.py b/workloads/ingest/ingest/sources/postgres_source.py index 1bf97a5..704d9ba 100644 --- a/workloads/ingest/ingest/sources/postgres_source.py +++ b/workloads/ingest/ingest/sources/postgres_source.py @@ -1,8 +1,11 @@ -from typing import Optional, Iterator +"""PostgreSQL source implementation.""" + +from typing import Optional, Iterator, List from .source import Source from ..extractors import BaseExtractor from ..core.config import SourceConfig from ..core.catalog import Table + import logging logger = logging.getLogger(__name__) @@ -11,16 +14,14 @@ class 
PostgresSource(Source): """PostgreSQL data source implementation.""" - def __init__(self, config: SourceConfig, env: str): + def __init__(self, config: SourceConfig, retry_attempts: int = 3, retry_delay: int = 15): """Initialize PostgreSQL source with validated configuration.""" - super().__init__(config=config, env=env) + super().__init__(config=config, retry_attempts=retry_attempts, retry_delay=retry_delay) logger.debug("Initialized PostgreSQL source") # Initialize the extractor with the connection string - connection_string = self.config.connection.build_connection_string( - dialect="postgresql+psycopg2", default_port=5432 - ) - self.extractor = BaseExtractor(connection_string) + connection_string = self.config.build_connection_string(dialect="postgresql+psycopg2", default_port=5432) + self.extractor = BaseExtractor(connection_string, retry_attempts=retry_attempts, retry_delay=retry_delay) logger.debug("Initialized BaseExtractor") def connect(self) -> None: @@ -28,7 +29,7 @@ def connect(self) -> None: super().connect() logger.debug("PostgreSQL source connected via BaseExtractor") - def extract(self, table: Table, chunk_size: int, limit: Optional[int] = None) -> Iterator[dict]: + def extract(self, table: Table, chunk_size: int, limit: Optional[int] = None) -> Iterator[List[dict]]: """Extract data from PostgreSQL table.""" if not self.extractor: raise RuntimeError("PostgreSQL extractor not initialized") diff --git a/workloads/ingest/ingest/sources/source.py b/workloads/ingest/ingest/sources/source.py index 883ba18..10cdb05 100644 --- a/workloads/ingest/ingest/sources/source.py +++ b/workloads/ingest/ingest/sources/source.py @@ -1,4 +1,6 @@ -from typing import Optional, Protocol, runtime_checkable, Iterator +"""Source interface and base implementations.""" + +from typing import Optional, Protocol, runtime_checkable, Iterator, List from pydantic import BaseModel, Field, ConfigDict from ..core.config import SourceConfig from ..core.catalog import Table @@ -13,13 
+15,23 @@ class SourceInterface(Protocol): """Protocol defining the contract for any data source.""" config: SourceConfig - env: str extractor: Optional[BaseExtractor] - def connect(self) -> None: ... - def extract(self, table: Table, chunk_size: int, limit: Optional[int] = None) -> Iterator[dict]: ... - def validate_connection(self) -> bool: ... - def close(self) -> None: ... + def connect(self) -> None: + """Connect to the data source.""" + ... + + def extract(self, table: Table, chunk_size: int, limit: Optional[int] = None) -> Iterator[List[dict]]: + """Extract data from the source.""" + ... + + def validate_connection(self) -> bool: + """Validate the connection to the source.""" + ... + + def close(self) -> None: + """Close the connection to the source.""" + ... class Source(BaseModel): @@ -30,8 +42,9 @@ class Source(BaseModel): """ config: SourceConfig = Field(..., description="Source configuration") - env: str = Field(..., description="Environment name (staging/production)") extractor: Optional[BaseExtractor] = None + retry_attempts: int = 3 + retry_delay: int = 15 model_config = ConfigDict(arbitrary_types_allowed=True) def connect(self) -> None: @@ -39,7 +52,7 @@ def connect(self) -> None: if self.extractor: self.extractor.connect() - def extract(self, table: Table, chunk_size: int, limit: Optional[int] = None) -> Iterator[dict]: + def extract(self, table: Table, chunk_size: int, limit: Optional[int] = None) -> Iterator[List[dict]]: """Extract data from the source using SQLAlchemy with streaming. 
Args: @@ -49,6 +62,7 @@ def extract(self, table: Table, chunk_size: int, limit: Optional[int] = None) -> Returns: Iterator yielding data extracted from the source + """ if not self.extractor: raise RuntimeError("Extractor not initialized") @@ -59,6 +73,7 @@ def validate_connection(self) -> bool: Returns: True if connection is valid + """ if not self.extractor: return False diff --git a/workloads/ingest/ingest/targets/__init__.py b/workloads/ingest/ingest/targets/__init__.py index c8f52ee..946483c 100644 --- a/workloads/ingest/ingest/targets/__init__.py +++ b/workloads/ingest/ingest/targets/__init__.py @@ -1,32 +1,47 @@ +"""Target package for data target implementations.""" + +import logging + from .target import Target, TargetInterface from .bigquery import BigQueryTarget -from ..core.config import BigQueryTarget as BigQueryTargetConfig +from ..core.config import BigQueryTargetConfig, TargetType + +logger = logging.getLogger(__name__) -def create_target(target_config: BigQueryTargetConfig, env: str) -> TargetInterface: - """Create a target instance directly based on configuration. +def create_target(target_config: BigQueryTargetConfig) -> TargetInterface: + """Create and validate a target instance based on configuration. 
Args: target_config: Target configuration object - env: Environment name (staging/production) Returns: - Appropriate target instance implementing TargetInterface + Validated target instance implementing TargetInterface Raises: - ValueError: If target type is not supported + ValueError: If target type is not supported or connection validation fails + """ - if target_config.type in {"bigquery", "bq", "google_bigquery"}: - return BigQueryTarget(target_config, env) + # Create the target instance + target: TargetInterface + if target_config.type == TargetType.BIGQUERY: + target = BigQueryTarget(target_config) else: raise ValueError(f"Unsupported target type: {target_config.type}") + # Validate the connection + if not target.validate_connection(): + raise ValueError(f"Failed to validate target connection: {target.config.name}") + + logger.debug(f"Target is initialized: {target.config.name}") + return target + __all__ = [ # Target implementations "Target", "BigQueryTarget", - # Direct creation function + # Factory function with validation "create_target", # Protocol interface "TargetInterface", diff --git a/workloads/ingest/ingest/targets/bigquery.py b/workloads/ingest/ingest/targets/bigquery.py index e1bd96e..07cead2 100644 --- a/workloads/ingest/ingest/targets/bigquery.py +++ b/workloads/ingest/ingest/targets/bigquery.py @@ -1,28 +1,76 @@ +"""BigQuery target implementation.""" + +from ..core.catalog import ReplicationType from .target import Target -from typing import Any, Optional -from ..core.config import BigQueryTarget as BigQueryTargetConfig +from typing import Optional +from ..core.config import BigQueryTargetConfig + import logging +from google.cloud import bigquery +from pydantic import Field logger = logging.getLogger(__name__) +map_replication_type_to_write_disposition = { + ReplicationType.TRUNCATE: bigquery.WriteDisposition.WRITE_TRUNCATE, + ReplicationType.APPEND: bigquery.WriteDisposition.WRITE_APPEND, + ReplicationType.UPSERT: 
bigquery.WriteDisposition.WRITE_APPEND, +} + class BigQueryTarget(Target): """Google BigQuery target implementation.""" - def __init__(self, config: BigQueryTargetConfig, env: str): + client: Optional[bigquery.Client] = Field(default=None, exclude=True) + + def __init__(self, config: BigQueryTargetConfig, **data): """Initialize BigQuery target with validated configuration.""" - super().__init__(config=config, env=env) - logger.debug("Initialized BigQuery target") + super().__init__(config=config) + self.client: Optional[bigquery.Client] = None + logger.debug(f"Initialized BigQuery target for project: {self.config.project_id}") - def load(self, data: Any, table: Optional[str] = None) -> None: + def _build_client(self) -> bigquery.Client: + """Build a BigQuery client.""" + try: + client = bigquery.Client(project=self.config.project_id) + except Exception as e: + logger.error(f"Failed to build BigQuery client: {e}") + raise e + return client + + def _get_write_disposition(self, write_disposition: ReplicationType) -> str: + """Get the write disposition for the data.""" + if write_disposition not in map_replication_type_to_write_disposition: + raise ValueError(f"Invalid write disposition: {write_disposition}") + return map_replication_type_to_write_disposition[write_disposition] + + def load(self, data: list[dict], target_table: str, write_disposition: ReplicationType) -> None: """Load data into BigQuery. 
Args: data: The data to be ingested - table: Optional override for the target table name + target_table: Target table name for the data + write_disposition: Write disposition for the data + """ - # TODO: Implement loading logic to BigQuery - logger.debug(f"Loading data to BigQuery table: {table or 'default'}") + # Call Target base class validation logic first + super().load(data, target_table, write_disposition) + job_config = bigquery.LoadJobConfig( + write_disposition=self._get_write_disposition(write_disposition), + ) + if not self.client: + self.client = self._build_client() + + if not self.client: + raise RuntimeError("BigQuery client not initialized") + + self.client.load_table_from_json( + json_rows=data, + destination=target_table, + job_config=job_config, + ) + + logger.debug(f"Loading data to BigQuery table: {target_table}") return None def validate_connection(self) -> bool: @@ -30,7 +78,19 @@ def validate_connection(self) -> bool: Returns: True if connection is valid + """ - # TODO: Implement connection validation to BigQuery + if not self.client: + self.client = self._build_client() + + # check if client is built + if not self.client: + return False + + try: + self.client.query("SELECT 1") + except Exception as e: + logger.error(f"Failed to validate BigQuery connection: {e}") + return False logger.debug("Validating BigQuery connection") return True diff --git a/workloads/ingest/ingest/targets/target.py b/workloads/ingest/ingest/targets/target.py index 4e9fde0..8932e0b 100644 --- a/workloads/ingest/ingest/targets/target.py +++ b/workloads/ingest/ingest/targets/target.py @@ -1,6 +1,9 @@ -from typing import Optional, Protocol, runtime_checkable, Any +"""Base implementation of the Target protocol.""" + +from typing import Protocol, runtime_checkable from pydantic import BaseModel, Field, ConfigDict -from ..core.config import BigQueryTarget as BigQueryTargetConfig +from ..core.config import TargetTypes +from ..core.catalog import ReplicationType import 
logging logger = logging.getLogger(__name__) @@ -10,11 +13,15 @@ class TargetInterface(Protocol): """Protocol defining the contract for any data target.""" - config: BigQueryTargetConfig - env: str + config: TargetTypes + + def load(self, data: list[dict], target_table: str, write_disposition: ReplicationType) -> None: + """Load data into the target.""" + ... - def load(self, data: Any, table: Optional[str] = None) -> None: ... - def validate_connection(self) -> bool: ... + def validate_connection(self) -> bool: + """Validate the connection to the target.""" + ... class Target(BaseModel): @@ -24,23 +31,42 @@ class Target(BaseModel): or used as a reference for implementing the Target protocol. """ - config: BigQueryTargetConfig = Field(..., description="Target configuration") - env: str = Field(..., description="Environment name (staging/production)") + config: TargetTypes = Field(..., description="Target configuration") model_config = ConfigDict(arbitrary_types_allowed=True) - def load(self, data: Any, table: Optional[str] = None) -> None: + def load(self, data: list[dict], target_table: str, write_disposition: ReplicationType) -> None: """Load data into the target destination. Args: - data: The data to be ingested (format may vary by implementation) - table: Optional override for the target table or collection name + data: The data to be ingested + target_table: Target table or collection name + write_disposition: How to handle existing data (TRUNCATE, APPEND, UPSERT) + + Raises: + ValueError: If data is empty or target_table is not provided + NotImplementedError: If called on base class directly + """ - raise NotImplementedError("Subclasses must implement load method") + if not data: + raise ValueError("No data to load into target") + if not target_table or not target_table.strip(): + raise ValueError("Target table name is required") + + if "." 
not in target_table or len(target_table.split(".")) != 2: + raise ValueError("Target table name must be in the format 'dataset.table'") + + if not write_disposition: + raise ValueError("Write disposition is required") def validate_connection(self) -> bool: """Validate connection to the target destination. Returns: True if connection is valid + + Note: + This is a base implementation that should be overridden by subclasses. + Each target type has different connection validation requirements. + """ raise NotImplementedError("Subclasses must implement validate_connection method") diff --git a/workloads/ingest/pyproject.toml b/workloads/ingest/pyproject.toml index c0b68b8..9c4f481 100644 --- a/workloads/ingest/pyproject.toml +++ b/workloads/ingest/pyproject.toml @@ -19,6 +19,11 @@ dependencies = [ "ruff>=0.8.0", "mypy>=1.17.1", "pytest>=8.4.1", + "pytest-mock>=3.12.0", + "opentelemetry-api>=1.36.0", + "opentelemetry-instrumentation>=0.57b0", + "google-cloud-secret-manager>=2.24.0", + "pytest-cov>=7.0.0", ] [tool.setuptools.packages.find] diff --git a/workloads/ingest/tests/__init__.py b/workloads/ingest/tests/__init__.py index d915dab..0aead7e 100644 --- a/workloads/ingest/tests/__init__.py +++ b/workloads/ingest/tests/__init__.py @@ -1 +1 @@ -"""Tests for the ingest workload""" +"""Tests for the ingest workload.""" diff --git a/workloads/ingest/tests/components/__init__.py b/workloads/ingest/tests/components/__init__.py new file mode 100644 index 0000000..9956b05 --- /dev/null +++ b/workloads/ingest/tests/components/__init__.py @@ -0,0 +1 @@ +"""Individual components and utilities tests.""" diff --git a/workloads/ingest/tests/components/test_extractors.py b/workloads/ingest/tests/components/test_extractors.py new file mode 100644 index 0000000..571705d --- /dev/null +++ b/workloads/ingest/tests/components/test_extractors.py @@ -0,0 +1,77 @@ +"""Extractor logic tests for the ingest workload. 
+ +These tests demonstrate: +- Core business logic testing +- Connection string validation +- Error handling and edge cases +- Retry logic and resilience patterns +""" + +import pytest +from unittest.mock import Mock + +from ingest.extractors import BaseExtractor + + +class TestBaseExtractor: + """Test BaseExtractor core functionality.""" + + def test_extractor_connection_string_validation(self): + """Test that invalid connection strings raise ValueError.""" + with pytest.raises(ValueError, match="Invalid connection string"): + BaseExtractor("invalid-connection-string") + + # Note: mysql://user:pass@host/db is actually valid, so test with truly invalid string + with pytest.raises(ValueError, match="Invalid connection string"): + BaseExtractor("not-a-connection-string") + + def test_extractor_valid_connection_string(self): + """Test that valid connection strings are accepted.""" + extractor = BaseExtractor("mysql+pymysql://user:pass@host:3306/db") + assert extractor.connection_string == "mysql+pymysql://user:pass@host:3306/db" + assert extractor.engine is None # Not connected yet + + def test_extractor_close(self): + """Test that close properly disposes of the engine.""" + extractor = BaseExtractor("mysql+pymysql://user:pass@host:3306/db") + extractor.engine = Mock() + + # Execute + extractor.close() + + # Verify + extractor.engine.dispose.assert_called_once() + + def test_extractor_close_without_engine(self): + """Test that close handles missing engine gracefully.""" + extractor = BaseExtractor("mysql+pymysql://user:pass@host:3306/db") + + # Execute (should not raise exception) + extractor.close() + + # Verify - no exception raised + assert extractor.engine is None + + def test_extractor_initialization_with_engine_kwargs(self): + """Test that extractor accepts engine configuration options.""" + extractor = BaseExtractor("mysql+pymysql://user:pass@host:3306/db", pool_size=5, max_overflow=10) + + assert extractor.connection_string == 
"mysql+pymysql://user:pass@host:3306/db" + assert extractor.engine_kwargs["pool_size"] == 5 + assert extractor.engine_kwargs["max_overflow"] == 10 + + def test_extractor_connection_string_with_special_characters(self): + """Test connection string with special characters in password.""" + # Test with URL-encoded password + extractor = BaseExtractor("mysql+pymysql://user:pass%40word@host:3306/db") + assert extractor.connection_string == "mysql+pymysql://user:pass%40word@host:3306/db" + + def test_extractor_connection_string_with_port(self): + """Test connection string with explicit port.""" + extractor = BaseExtractor("postgresql+psycopg2://user:pass@host:5432/db") + assert extractor.connection_string == "postgresql+psycopg2://user:pass@host:5432/db" + + def test_extractor_connection_string_without_port(self): + """Test connection string without explicit port.""" + extractor = BaseExtractor("mysql+pymysql://user:pass@host/db") + assert extractor.connection_string == "mysql+pymysql://user:pass@host/db" diff --git a/workloads/ingest/tests/components/test_factories.py b/workloads/ingest/tests/components/test_factories.py new file mode 100644 index 0000000..d852c94 --- /dev/null +++ b/workloads/ingest/tests/components/test_factories.py @@ -0,0 +1,167 @@ +"""Factory function tests for the ingest workload. 
+ +These tests demonstrate: +- Factory pattern implementation +- Dependency injection and validation +- Error handling in factory functions +- Mocking external dependencies +""" + +import pytest +from unittest.mock import Mock, patch + +from ingest.sources import create_source +from ingest.targets import create_target +from ingest.core.config import SourceConfig, BigQueryTargetConfig, DatabaseType, TargetType + + +class TestSourceFactory: + """Test create_source factory function.""" + + @patch("ingest.sources.MySQLSource") + def test_create_mysql_source_success(self, mock_mysql_source): + """Test successful MySQL source creation.""" + # Setup + mock_source_instance = Mock() + mock_source_instance.validate_connection.return_value = True + mock_mysql_source.return_value = mock_source_instance + + config = SourceConfig( + name="test_mysql", + type=DatabaseType.MYSQL, + host="localhost", + port=3306, + username="test_user", + password="test_password", + database="test_db", + ) + + # Execute + result = create_source(config) + + # Verify + mock_mysql_source.assert_called_once_with(config) + mock_source_instance.validate_connection.assert_called_once() + assert result == mock_source_instance + + @patch("ingest.sources.PostgresSource") + def test_create_postgres_source_success(self, mock_postgres_source): + """Test successful PostgreSQL source creation.""" + # Setup + mock_source_instance = Mock() + mock_source_instance.validate_connection.return_value = True + mock_postgres_source.return_value = mock_source_instance + + config = SourceConfig( + name="test_postgres", + type=DatabaseType.POSTGRES, + host="localhost", + port=5432, + username="test_user", + password="test_password", + database="test_db", + ) + + # Execute + result = create_source(config) + + # Verify + mock_postgres_source.assert_called_once_with(config) + mock_source_instance.validate_connection.assert_called_once() + assert result == mock_source_instance + + def test_create_source_unsupported_type(self): + 
"""Test that unsupported source types raise ValueError.""" + # Create a mock config that bypasses Pydantic validation + # but has the right structure for the factory function + config = Mock() + config.type.value = "unsupported_type" + + with pytest.raises(ValueError, match="Unsupported source type"): + create_source(config) + + @patch("ingest.sources.MySQLSource") + def test_create_source_connection_validation_failure(self, mock_mysql_source): + """Test that connection validation failure raises ValueError.""" + # Setup + mock_source_instance = Mock() + mock_source_instance.validate_connection.return_value = False + mock_mysql_source.return_value = mock_source_instance + + config = SourceConfig( + name="test_mysql", + type=DatabaseType.MYSQL, + host="localhost", + port=3306, + username="test_user", + password="test_password", + database="test_db", + ) + + # Execute & Verify + with pytest.raises(ValueError, match="Failed to validate source connection"): + create_source(config) + + +class TestTargetFactory: + """Test create_target factory function.""" + + @patch("ingest.targets.BigQueryTarget") + def test_create_bigquery_target_success(self, mock_bigquery_target): + """Test successful BigQuery target creation.""" + # Setup + mock_target_instance = Mock() + mock_target_instance.validate_connection.return_value = True + mock_bigquery_target.return_value = mock_target_instance + + config = BigQueryTargetConfig( + name="test_bq", + type=TargetType.BIGQUERY, + project_id="test-project", + project_number=123456789, + dataset_id="test_dataset", + ) + + # Execute + result = create_target(config) + + # Verify + mock_bigquery_target.assert_called_once_with(config) + mock_target_instance.validate_connection.assert_called_once() + assert result == mock_target_instance + + def test_create_target_unsupported_type(self): + """Test that unsupported target types raise ValueError.""" + # Create a real BigQueryTargetConfig but manually set invalid type + config = BigQueryTargetConfig( 
+ name="test_unsupported", + type=TargetType.BIGQUERY, # Start with valid type + project_id="test-project", + project_number=123456789, + dataset_id="test_dataset", + ) + # Manually change the type to simulate unsupported type + config.type = "unsupported_type" + + with pytest.raises(ValueError, match="Unsupported target type"): + create_target(config) + + @patch("ingest.targets.BigQueryTarget") + def test_create_target_connection_validation_failure(self, mock_bigquery_target): + """Test that connection validation failure raises ValueError.""" + # Setup + mock_target_instance = Mock() + mock_target_instance.validate_connection.return_value = False + mock_bigquery_target.return_value = mock_target_instance + + config = BigQueryTargetConfig( + name="test_bq", + type=TargetType.BIGQUERY, + project_id="test-project", + project_number=123456789, + dataset_id="test_dataset", + ) + + # Execute & Verify + with pytest.raises(ValueError, match="Failed to validate target connection"): + create_target(config) diff --git a/workloads/ingest/tests/components/test_secrets.py b/workloads/ingest/tests/components/test_secrets.py new file mode 100644 index 0000000..18286e3 --- /dev/null +++ b/workloads/ingest/tests/components/test_secrets.py @@ -0,0 +1,212 @@ +"""Secret handling tests for the ingest workload. 
+ +These tests demonstrate: +- Secret resolution and validation +- Production security patterns +- Error handling for secret operations +- Integration with Google Cloud Secret Manager +""" + +import pytest +from unittest.mock import Mock, patch + +from ingest.helpers.secret_handler import get_gcloud_secret +from ingest.core.config import Config, RuntimeParams, SecretConfig, SecretProvider, BigQueryTargetConfig + + +class TestSecretHandler: + """Test secret handler functionality.""" + + @patch("ingest.helpers.secret_handler.secretmanager.SecretManagerServiceClient") + def test_get_gcloud_secret_success(self, mock_client_class): + """Test successful secret retrieval.""" + # Setup + mock_client = Mock() + mock_secret = Mock() + mock_secret.payload.data.decode.return_value = "secret_value_123" + mock_client.access_secret_version.return_value = mock_secret + mock_client_class.return_value = mock_client + + # Execute + result = get_gcloud_secret("projects/123/secrets/test-secret/versions/latest") + + # Verify + assert result == "secret_value_123" + mock_client.access_secret_version.assert_called_once_with( + name="projects/123/secrets/test-secret/versions/latest" + ) + + @patch("ingest.helpers.secret_handler.secretmanager.SecretManagerServiceClient") + def test_get_gcloud_secret_client_error(self, mock_client_class): + """Test secret retrieval with client error.""" + # Setup + mock_client = Mock() + mock_client.access_secret_version.side_effect = Exception("Permission denied") + mock_client_class.return_value = mock_client + + # Execute & Verify + with pytest.raises(ValueError, match="Failed to get secret"): + get_gcloud_secret("projects/123/secrets/test-secret/versions/latest") + + @patch("ingest.helpers.secret_handler.secretmanager.SecretManagerServiceClient") + def test_get_gcloud_secret_empty_value(self, mock_client_class): + """Test secret retrieval with empty value.""" + # Setup + mock_client = Mock() + mock_secret = Mock() + 
mock_secret.payload.data.decode.return_value = None + mock_client.access_secret_version.return_value = mock_secret + mock_client_class.return_value = mock_client + + # Execute & Verify + with pytest.raises(ValueError, match="Secret .* is empty"): + get_gcloud_secret("projects/123/secrets/empty-secret/versions/latest") + + +class TestConfigSecretIntegration: + """Test secret integration with configuration.""" + + @patch("ingest.core.config.get_gcloud_secret") + def test_config_secret_resolution_with_secret_path(self, mock_get_secret): + """Test secret resolution when secret_path is provided.""" + # Setup + mock_get_secret.return_value = "resolved_password" + + config = Config( + version="1.0.0", + params=RuntimeParams(), + secrets=[ + SecretConfig( + name="test-secret", + provider=SecretProvider.GOOGLE_SECRET_MANAGER, + secret_path="projects/123/secrets/test-secret/versions/latest", + ) + ], + sources={}, + targets={}, + ) + + # Execute + result = config.get_gcloud_secret_value("test-secret", "dev") + + # Verify + assert result == "resolved_password" + mock_get_secret.assert_called_once_with("projects/123/secrets/test-secret/versions/latest") + + @patch("ingest.core.config.get_gcloud_secret") + def test_config_secret_resolution_without_secret_path(self, mock_get_secret): + """Test secret resolution when secret_path is not provided.""" + # Setup + mock_get_secret.return_value = "resolved_password" + + config = Config( + version="1.0.0", + params=RuntimeParams(), + secrets=[ + SecretConfig( + name="test-secret", + provider=SecretProvider.GOOGLE_SECRET_MANAGER, + # secret_path is None, should use target config + ) + ], + sources={}, + targets={ + "dev": BigQueryTargetConfig( + name="test_target", + project_id="test-project", + project_number=123456789, + dataset_id="test_dataset", + ) + }, + ) + + # Execute + result = config.get_gcloud_secret_value("test-secret", "dev") + + # Verify + assert result == "resolved_password" + 
mock_get_secret.assert_called_once_with("projects/123456789/secrets/test-secret/versions/latest") + + def test_config_secret_not_found_in_secrets_list(self): + """Test error when secret name not found in secrets list.""" + config = Config( + version="1.0.0", + params=RuntimeParams(), + secrets=[SecretConfig(name="existing-secret", provider=SecretProvider.GOOGLE_SECRET_MANAGER)], + sources={}, + targets={}, + ) + + # Execute & Verify + with pytest.raises(ValueError, match="Secret nonexistent-secret not found in secrets"): + config.get_gcloud_secret_value("nonexistent-secret", "dev") + + def test_config_secret_wrong_provider(self): + """Test error when secret provider is not Google Secret Manager.""" + # Create a real Config object but manually set a secret with wrong provider + config = Config( + version="1.0.0", + params=RuntimeParams(), + secrets=[ + SecretConfig( + name="test-secret", + provider=SecretProvider.GOOGLE_SECRET_MANAGER, # Start with valid provider + ) + ], + sources={}, + targets={}, + ) + + # Manually change the provider to simulate wrong provider + config.secrets[0].provider = "aws" + + # Execute & Verify + with pytest.raises(ValueError, match="Secret test-secret is not a Gcloud Secret"): + config.get_gcloud_secret_value("test-secret", "dev") + + def test_config_secret_missing_target_config(self): + """Test error when target config is missing for secret resolution.""" + config = Config( + version="1.0.0", + params=RuntimeParams(), + secrets=[ + SecretConfig( + name="test-secret", + provider=SecretProvider.GOOGLE_SECRET_MANAGER, + # secret_path is None, but no target config + ) + ], + sources={}, + targets={}, # Empty targets + ) + + # Execute & Verify + with pytest.raises(ValueError, match="Target config not found for environment"): + config.get_gcloud_secret_value("test-secret", "dev") + + @patch("ingest.core.config.get_gcloud_secret") + def test_config_secret_resolution_with_custom_version(self, mock_get_secret): + """Test secret resolution with 
custom version.""" + # Setup + mock_get_secret.return_value = "resolved_password" + + config = Config( + version="1.0.0", + params=RuntimeParams(), + secrets=[ + SecretConfig( + name="test-secret", + provider=SecretProvider.GOOGLE_SECRET_MANAGER, + secret_path="projects/123/secrets/test-secret/versions/custom", + ) + ], + sources={}, + targets={}, + ) + + # Execute + result = config.get_gcloud_secret_value("test-secret", "dev", version="custom") + + # Verify + assert result == "resolved_password" + mock_get_secret.assert_called_once_with("projects/123/secrets/test-secret/versions/custom") diff --git a/workloads/ingest/tests/core/__init__.py b/workloads/ingest/tests/core/__init__.py new file mode 100644 index 0000000..40f445b --- /dev/null +++ b/workloads/ingest/tests/core/__init__.py @@ -0,0 +1 @@ +"""Core domain models and business logic tests.""" diff --git a/workloads/ingest/tests/core/test_catalog.py b/workloads/ingest/tests/core/test_catalog.py new file mode 100644 index 0000000..76fb28b --- /dev/null +++ b/workloads/ingest/tests/core/test_catalog.py @@ -0,0 +1,253 @@ +"""Catalog validation tests for the ingest workload. 
+ +These tests demonstrate: +- Pydantic model validation for data catalog +- Table and column schema validation +- Source filtering and data organization +""" + +import pytest +from pydantic import ValidationError + +from ingest.core.catalog import Catalog, Table, Column, ReplicationType + + +class TestColumn: + """Test Column model validation.""" + + def test_valid_column(self): + """Test valid column creation.""" + column = Column(name="user_id", type="int") + + assert column.name == "user_id" + assert column.type == "int" + + def test_column_validation_constraints(self): + """Test that column enforces validation constraints.""" + # Test that empty name raises ValidationError + with pytest.raises(ValidationError): + Column(name="", type="int") + + # Test that empty type raises ValidationError + with pytest.raises(ValidationError): + Column(name="user_id", type="") + + # Test that valid column works + column = Column(name="user_id", type="int") + assert column.name == "user_id" + assert column.type == "int" + + +class TestTable: + """Test Table model validation.""" + + def test_valid_table(self): + """Test valid table creation.""" + columns = [ + Column(name="id", type="int"), + Column(name="name", type="string"), + ] + + table = Table( + name="users", + replication=ReplicationType.TRUNCATE, + columns=columns, + ) + + assert table.name == "users" + assert table.replication == ReplicationType.TRUNCATE + assert len(table.columns) == 2 + assert table.source is None # Default value + + def test_table_with_source(self): + """Test table with explicit source.""" + columns = [Column(name="id", type="int")] + + table = Table( + name="users", + source="custom_source", + replication=ReplicationType.APPEND, + columns=columns, + ) + + assert table.source == "custom_source" + assert table.replication == ReplicationType.APPEND + + def test_table_validation(self): + """Test table validation with invalid replication type and name constraints.""" + # Test that empty name raises 
ValidationError + with pytest.raises(ValidationError): + Table( + name="", # Empty name should raise ValidationError + replication=ReplicationType.TRUNCATE, + columns=[], + ) + + # Test invalid replication type + with pytest.raises(ValidationError): + Table( + name="users", + replication="INVALID_TYPE", # Invalid replication type + columns=[], + ) + + # Test valid table + table = Table( + name="users", + replication=ReplicationType.TRUNCATE, + columns=[], + ) + assert table.name == "users" + + +class TestCatalog: + """Test Catalog model validation and behavior.""" + + def test_valid_catalog(self): + """Test valid catalog creation.""" + tables = [ + Table( + name="users", + replication=ReplicationType.TRUNCATE, + columns=[Column(name="id", type="int")], + ), + Table( + name="orders", + replication=ReplicationType.APPEND, + columns=[Column(name="order_id", type="int")], + ), + ] + + catalog = Catalog( + name="test_catalog", + source="test_source", + tables=tables, + ) + + assert catalog.name == "test_catalog" + assert catalog.source == "test_source" + assert len(catalog.tables) == 2 + + def test_catalog_get_table(self): + """Test getting table by name.""" + tables = [ + Table( + name="users", + replication=ReplicationType.TRUNCATE, + columns=[Column(name="id", type="int")], + ), + ] + + catalog = Catalog( + name="test_catalog", + source="test_source", + tables=tables, + ) + + table = catalog.get_table("users") + assert table.name == "users" + + # Test non-existent table + with pytest.raises(ValueError, match="Table nonexistent not found"): + catalog.get_table("nonexistent") + + def test_catalog_get_tables_by_source(self): + """Test filtering tables by source.""" + tables = [ + Table( + name="users", + source="source1", + replication=ReplicationType.TRUNCATE, + columns=[Column(name="id", type="int")], + ), + Table( + name="orders", + source="source2", + replication=ReplicationType.APPEND, + columns=[Column(name="order_id", type="int")], + ), + Table( + 
name="products", + # No explicit source, should use catalog's default source + replication=ReplicationType.TRUNCATE, + columns=[Column(name="product_id", type="int")], + ), + ] + + catalog = Catalog( + name="test_catalog", + source="default_source", + tables=tables, + ) + + # Test explicit source + source1_tables = catalog.get_tables_by_source("source1") + assert len(source1_tables) == 1 + assert source1_tables[0].name == "users" + + # Test default source + default_tables = catalog.get_tables_by_source("default_source") + assert len(default_tables) == 1 + assert default_tables[0].name == "products" + + def test_catalog_get_sources(self): + """Test getting unique sources from catalog.""" + tables = [ + Table( + name="users", + source="source1", + replication=ReplicationType.TRUNCATE, + columns=[Column(name="id", type="int")], + ), + Table( + name="orders", + source="source2", + replication=ReplicationType.APPEND, + columns=[Column(name="order_id", type="int")], + ), + Table( + name="products", + # No explicit source, uses catalog's default + replication=ReplicationType.TRUNCATE, + columns=[Column(name="product_id", type="int")], + ), + ] + + catalog = Catalog( + name="test_catalog", + source="default_source", + tables=tables, + ) + + sources = catalog.get_sources() + assert len(sources) == 3 + assert "source1" in sources + assert "source2" in sources + assert "default_source" in sources + + def test_catalog_validation_constraints(self): + """Test that catalog enforces validation constraints.""" + # Note: Catalog model has no min_length constraints on name/source, so empty strings are allowed + catalog1 = Catalog( + name="", # Empty name is allowed (no min_length constraint) + source="test_source", + tables=[], + ) + assert catalog1.name == "" + + catalog2 = Catalog( + name="test_catalog", + source="", # Empty source is allowed (no min_length constraint) + tables=[], + ) + assert catalog2.source == "" + + def test_catalog_extra_fields_forbidden(self): + """Test that 
extra fields in Catalog raise ValidationError.""" + with pytest.raises(ValidationError): + Catalog( + name="test_catalog", + source="test_source", + tables=[], + extra_field="not_allowed", # This should cause validation error + ) diff --git a/workloads/ingest/tests/core/test_config.py b/workloads/ingest/tests/core/test_config.py new file mode 100644 index 0000000..f15a50c --- /dev/null +++ b/workloads/ingest/tests/core/test_config.py @@ -0,0 +1,330 @@ +"""Configuration validation tests for the ingest workload. + +These tests demonstrate: +- Pydantic model validation +- Type safety and data validation +- Error handling for invalid configurations +- Production-ready configuration patterns +""" + +import pytest +from pydantic import ValidationError + +from ingest.core.config import ( + SourceConfig, + BigQueryTargetConfig, + Config, + DatabaseType, + TargetType, + SecretProvider, + SecretConfig, + RuntimeParams, +) + + +class TestSourceConfig: + """Test SourceConfig validation and behavior.""" + + def test_valid_mysql_config(self): + """Test valid MySQL configuration creation.""" + config = SourceConfig( + name="test_mysql", + type=DatabaseType.MYSQL, + host="localhost", + port=3306, + username="test_user", + password="test_password", + database="test_db", + ssl_required=True, + ) + + assert config.name == "test_mysql" + assert config.type == DatabaseType.MYSQL + assert config.host == "localhost" + assert config.port == 3306 + assert config.ssl_required is True + + def test_valid_postgres_config(self): + """Test valid PostgreSQL configuration creation.""" + config = SourceConfig( + name="test_postgres", + type=DatabaseType.POSTGRES, + host="localhost", + port=5432, + username="test_user", + password="test_password", + database="test_db", + schema="public", # Use alias name + ssl_required=False, + ) + + assert config.type == DatabaseType.POSTGRES + assert config.db_schema == "public" # Access via field name + assert config.ssl_required is False + + def 
test_invalid_config_missing_required_fields(self): + """Test that missing required fields raise ValidationError.""" + with pytest.raises(ValidationError) as exc_info: + SourceConfig( + name="test", + # Missing required fields: type, host, port, username, password, database + ) + + # Verify specific validation errors + errors = exc_info.value.errors() + error_fields = [error["loc"][0] for error in errors] + assert "type" in error_fields + assert "host" in error_fields + assert "port" in error_fields + + def test_invalid_port_type(self): + """Test that invalid port types raise ValidationError.""" + with pytest.raises(ValidationError) as exc_info: + SourceConfig( + name="test", + type=DatabaseType.MYSQL, + host="localhost", + port="invalid_port", # Should be int + username="user", + password="pass", + database="db", + ) + + errors = exc_info.value.errors() + port_error = next(error for error in errors if error["loc"][0] == "port") + assert "int" in str(port_error["type"]) + + def test_connection_string_building_mysql(self): + """Test MySQL connection string building.""" + config = SourceConfig( + name="test", + type=DatabaseType.MYSQL, + host="localhost", + port=3306, + username="test_user", + password="test_password", + database="test_db", + ) + + conn_str = config.build_connection_string("mysql+pymysql") + expected = "mysql+pymysql://test_user:test_password@localhost:3306/test_db" + assert conn_str == expected + + def test_connection_string_building_postgres(self): + """Test PostgreSQL connection string building.""" + config = SourceConfig( + name="test", + type=DatabaseType.POSTGRES, + host="localhost", + port=5432, + username="test_user", + password="test_password", + database="test_db", + ) + + conn_str = config.build_connection_string("postgresql+psycopg2") + expected = "postgresql+psycopg2://test_user:test_password@localhost:5432/test_db" + assert conn_str == expected + + def test_connection_string_with_default_port(self): + """Test connection string building 
with default port.""" + config = SourceConfig( + name="test", + type=DatabaseType.MYSQL, + host="localhost", + port=3306, + username="test_user", + password="test_password", + database="test_db", + ) + + # Use default port (0) to test default port logic + conn_str = config.build_connection_string("mysql+pymysql", default_port=0) + expected = "mysql+pymysql://test_user:test_password@localhost:3306/test_db" + assert conn_str == expected + + +class TestBigQueryTargetConfig: + """Test BigQueryTargetConfig validation and behavior.""" + + def test_valid_bigquery_config(self): + """Test valid BigQuery configuration creation.""" + config = BigQueryTargetConfig( + name="test_bq", + type=TargetType.BIGQUERY, + project_id="test-project", + project_number=123456789, + dataset_id="test_dataset", + location="US", + ) + + assert config.name == "test_bq" + assert config.type == TargetType.BIGQUERY + assert config.project_id == "test-project" + assert config.project_number == 123456789 + assert config.dataset_id == "test_dataset" + assert config.location == "US" + + def test_invalid_project_id_format(self): + """Test that invalid project_id format raises ValidationError.""" + with pytest.raises(ValidationError) as exc_info: + BigQueryTargetConfig( + name="test", + project_id="Invalid_Project_ID", # Invalid format + project_number=123456789, + dataset_id="test_dataset", + ) + + errors = exc_info.value.errors() + project_id_error = next(error for error in errors if error["loc"][0] == "project_id") + assert "pattern" in str(project_id_error["type"]) + + def test_invalid_dataset_id_format(self): + """Test that invalid dataset_id format raises ValidationError.""" + with pytest.raises(ValidationError) as exc_info: + BigQueryTargetConfig( + name="test", + project_id="test-project", + project_number=123456789, + dataset_id="123invalid", # Invalid format - starts with number + ) + + errors = exc_info.value.errors() + dataset_error = next(error for error in errors if error["loc"][0] == 
"dataset_id") + assert "pattern" in str(dataset_error["type"]) + + +class TestSecretConfig: + """Test SecretConfig validation and behavior.""" + + def test_valid_secret_config(self): + """Test valid secret configuration creation.""" + config = SecretConfig( + provider=SecretProvider.GOOGLE_SECRET_MANAGER, + name="test-secret", + version="latest", + secret_path="projects/123/secrets/test-secret/versions/latest", + ) + + assert config.provider == SecretProvider.GOOGLE_SECRET_MANAGER + assert config.name == "test-secret" + assert config.version == "latest" + assert config.secret_path == "projects/123/secrets/test-secret/versions/latest" + + def test_secret_config_defaults(self): + """Test secret configuration with default values.""" + config = SecretConfig( + name="test-secret", + # provider defaults to GOOGLE_SECRET_MANAGER + # version defaults to "latest" + # secret_path defaults to None + ) + + assert config.provider == SecretProvider.GOOGLE_SECRET_MANAGER + assert config.version == "latest" + assert config.secret_path is None + + +class TestRuntimeParams: + """Test RuntimeParams validation and behavior.""" + + def test_valid_runtime_params(self): + """Test valid runtime parameters creation.""" + params = RuntimeParams( + retry_attempts=5, + retry_delay_seconds=60, + chunk_size=5000, + ) + + assert params.retry_attempts == 5 + assert params.retry_delay_seconds == 60 + assert params.chunk_size == 5000 + + def test_runtime_params_defaults(self): + """Test runtime parameters with default values.""" + params = RuntimeParams() + + assert params.retry_attempts == 3 # Default value + assert params.retry_delay_seconds == 30 # Default value + assert params.chunk_size == 10000 # Default value + + def test_invalid_retry_attempts_range(self): + """Test that retry_attempts outside valid range raises ValidationError.""" + with pytest.raises(ValidationError) as exc_info: + RuntimeParams(retry_attempts=15) # Outside 1-10 range + + errors = exc_info.value.errors() + retry_error = 
next(error for error in errors if error["loc"][0] == "retry_attempts") + assert "less_than_equal" in str(retry_error["type"]) # le=10 means less_than_equal + + def test_chunk_size_accepts_negative_values(self): + """Test that chunk_size accepts negative values (no constraint defined).""" + # Note: chunk_size has no minimum constraint, so negative values are allowed + params = RuntimeParams(chunk_size=-1) + assert params.chunk_size == -1 + + +class TestConfigIntegration: + """Test Config class integration and complex scenarios.""" + + def test_config_get_source_config(self): + """Test getting source configuration by environment and name.""" + config = Config( + version="1.0.0", + params=RuntimeParams(), + secrets=[], + sources={ + "dev": { + "test_source": SourceConfig( + name="test_source", + type=DatabaseType.MYSQL, + host="localhost", + port=3306, + username="user", + password="pass", + database="db", + ) + } + }, + targets={ + "dev": BigQueryTargetConfig( + name="test_target", + project_id="test-project", + project_number=123456789, + dataset_id="test_dataset", + ) + }, + ) + + source_config = config.get_source_config("dev", "test_source") + assert source_config is not None + assert source_config.name == "test_source" + assert source_config.type == DatabaseType.MYSQL + + def test_config_get_source_config_not_found(self): + """Test getting non-existent source configuration returns None.""" + config = Config( + version="1.0.0", + params=RuntimeParams(), + secrets=[], + sources={}, + targets={}, + ) + + source_config = config.get_source_config("dev", "nonexistent") + assert source_config is None + + def test_config_extra_fields_forbidden(self): + """Test that extra fields in Config raise ValidationError.""" + with pytest.raises(ValidationError) as exc_info: + Config( + version="1.0.0", + params=RuntimeParams(), + secrets=[], + sources={}, + targets={}, + extra_field="not_allowed", # This should cause validation error + ) + + errors = exc_info.value.errors() + 
assert any("extra_forbidden" in str(error["type"]) for error in errors) diff --git a/workloads/ingest/tests/error_handling/__init__.py b/workloads/ingest/tests/error_handling/__init__.py new file mode 100644 index 0000000..673a959 --- /dev/null +++ b/workloads/ingest/tests/error_handling/__init__.py @@ -0,0 +1 @@ +"""Error scenarios and edge cases tests.""" diff --git a/workloads/ingest/tests/error_handling/test_error_handling.py b/workloads/ingest/tests/error_handling/test_error_handling.py new file mode 100644 index 0000000..52976e2 --- /dev/null +++ b/workloads/ingest/tests/error_handling/test_error_handling.py @@ -0,0 +1,237 @@ +"""Error handling tests for the ingest workload. + +These tests demonstrate: +- Graceful failure handling +- Production-ready error scenarios +- Resilience patterns +- Error recovery strategies +""" + +import pytest +from unittest.mock import Mock, patch + +from ingest.core.config import Config, RuntimeParams, SecretConfig, SecretProvider +from ingest.core.catalog import Catalog, Table, Column, ReplicationType +from ingest.helpers.secret_handler import get_gcloud_secret + + +class TestSecretErrorHandling: + """Test secret handling error scenarios.""" + + @patch("ingest.helpers.secret_handler.secretmanager.SecretManagerServiceClient") + def test_secret_not_found_error(self, mock_client_class): + """Test handling of secret not found error.""" + # Setup + mock_client = Mock() + mock_client.access_secret_version.side_effect = Exception("Secret not found") + mock_client_class.return_value = mock_client + + # Execute & Verify + with pytest.raises(ValueError, match="Failed to get secret"): + get_gcloud_secret("projects/123/secrets/nonexistent/versions/latest") + + @patch("ingest.helpers.secret_handler.secretmanager.SecretManagerServiceClient") + def test_secret_empty_value_error(self, mock_client_class): + """Test handling of empty secret value.""" + # Setup + mock_client = Mock() + mock_secret = Mock() + 
mock_secret.payload.data.decode.return_value = None + mock_client.access_secret_version.return_value = mock_secret + mock_client_class.return_value = mock_client + + # Execute & Verify + with pytest.raises(ValueError, match="Secret .* is empty"): + get_gcloud_secret("projects/123/secrets/empty-secret/versions/latest") + + def test_config_secret_not_found_in_secrets_list(self): + """Test that config raises error when secret not found in secrets list.""" + config = Config( + version="1.0.0", + params=RuntimeParams(), + secrets=[SecretConfig(name="existing-secret", provider=SecretProvider.GOOGLE_SECRET_MANAGER)], + sources={}, + targets={}, + ) + + # Execute & Verify + with pytest.raises(ValueError, match="Secret nonexistent-secret not found in secrets"): + config.get_gcloud_secret_value("nonexistent-secret", "dev") + + +class TestCatalogErrorHandling: + """Test catalog error handling scenarios.""" + + def test_catalog_get_table_not_found(self): + """Test that getting non-existent table raises ValueError.""" + catalog = Catalog( + name="test_catalog", + source="test_source", + tables=[ + Table( + name="existing_table", + replication=ReplicationType.TRUNCATE, + columns=[Column(name="id", type="int")], + ) + ], + ) + + # Execute & Verify + with pytest.raises(ValueError, match="Table nonexistent_table not found in catalog"): + catalog.get_table("nonexistent_table") + + def test_catalog_empty_tables_list(self): + """Test catalog behavior with empty tables list.""" + catalog = Catalog( + name="test_catalog", + source="test_source", + tables=[], + ) + + # Execute + tables = catalog.get_tables_by_source("test_source") + sources = catalog.get_sources() + + # Verify + assert len(tables) == 0 + assert len(sources) == 0 + + +class TestConfigErrorHandling: + """Test configuration error handling scenarios.""" + + def test_config_get_source_config_not_found(self): + """Test that getting non-existent source config returns None.""" + # Create a proper Config instance + from 
ingest.core.config import SourceConfig, DatabaseType + + config = Config( + version="1.0.0", + params=RuntimeParams(), + secrets=[], + sources={ + "dev": { + "existing_source": SourceConfig( + name="existing_source", + type=DatabaseType.MYSQL, + host="localhost", + port=3306, + username="user", + password="pass", + database="db", + ) + } + }, + targets={}, + ) + + # Execute + result = config.get_source_config("dev", "nonexistent_source") + + # Verify + assert result is None + + def test_config_get_source_config_wrong_environment(self): + """Test that getting source config from wrong environment returns None.""" + from ingest.core.config import SourceConfig, DatabaseType + + config = Config( + version="1.0.0", + params=RuntimeParams(), + secrets=[], + sources={ + "dev": { + "test_source": SourceConfig( + name="test_source", + type=DatabaseType.MYSQL, + host="localhost", + port=3306, + username="user", + password="pass", + database="db", + ) + } + }, + targets={}, + ) + + # Execute + result = config.get_source_config("prod", "test_source") + + # Verify + assert result is None + + def test_config_get_target_config_not_found(self): + """Test that getting non-existent target config returns None.""" + from ingest.core.config import BigQueryTargetConfig, TargetType + + config = Config( + version="1.0.0", + params=RuntimeParams(), + secrets=[], + sources={}, + targets={ + "dev": BigQueryTargetConfig( + name="test_target", + type=TargetType.BIGQUERY, + project_id="test-project", + project_number=123456789, + dataset_id="test_dataset", + ) + }, + ) + + # Execute + result = config.get_target_config("prod") + + # Verify + assert result is None + + +class TestRuntimeParamsErrorHandling: + """Test runtime parameters error handling.""" + + def test_runtime_params_retry_attempts_too_high(self): + """Test that retry_attempts above maximum raises ValidationError.""" + with pytest.raises(Exception): # ValidationError from Pydantic + RuntimeParams(retry_attempts=15) # Above max of 10 
+ + def test_runtime_params_retry_attempts_too_low(self): + """Test that retry_attempts below minimum raises ValidationError.""" + with pytest.raises(Exception): # ValidationError from Pydantic + RuntimeParams(retry_attempts=0) # Below min of 1 + + def test_runtime_params_retry_delay_too_high(self): + """Test that retry_delay_seconds above maximum raises ValidationError.""" + with pytest.raises(Exception): # ValidationError from Pydantic + RuntimeParams(retry_delay_seconds=4000) # Above max of 3600 + + def test_runtime_params_retry_delay_too_low(self): + """Test that retry_delay_seconds below minimum raises ValidationError.""" + with pytest.raises(Exception): # ValidationError from Pydantic + RuntimeParams(retry_delay_seconds=0) # Below min of 1 + + +class TestTableErrorHandling: + """Test table error handling scenarios.""" + + def test_table_invalid_replication_type(self): + """Test that invalid replication type raises ValidationError.""" + with pytest.raises(Exception): # ValidationError from Pydantic + Table( + name="test_table", + replication="INVALID_TYPE", # Invalid replication type + columns=[Column(name="id", type="int")], + ) + + def test_table_empty_columns_list(self): + """Test table with empty columns list (should be allowed).""" + table = Table( + name="test_table", + replication=ReplicationType.TRUNCATE, + columns=[], # Empty columns list + ) + + # Verify - should not raise exception + assert len(table.columns) == 0 + assert table.name == "test_table" diff --git a/workloads/ingest/tests/integration/__init__.py b/workloads/ingest/tests/integration/__init__.py new file mode 100644 index 0000000..8747f91 --- /dev/null +++ b/workloads/ingest/tests/integration/__init__.py @@ -0,0 +1 @@ +"""End-to-end and integration tests.""" diff --git a/workloads/ingest/tests/integration/test_integration.py b/workloads/ingest/tests/integration/test_integration.py new file mode 100644 index 0000000..4c9e7cd --- /dev/null +++ 
b/workloads/ingest/tests/integration/test_integration.py @@ -0,0 +1,232 @@ +"""Integration test for the ingest workload. + +This test demonstrates: +- End-to-end pipeline testing +- Mocking external dependencies +- Full workflow validation +- Production-ready integration patterns +""" + +import pytest +from unittest.mock import Mock, patch + +from ingest.main import main +from ingest.core.config import Config +from ingest.core.catalog import Catalog, Table, Column, ReplicationType + + +class TestFullPipelineIntegration: + """Test the complete ingestion pipeline.""" + + @patch("ingest.main.create_target") + @patch("ingest.main.create_source") + @patch("ingest.main.load_yaml_model") + def test_full_pipeline_success(self, mock_load_yaml, mock_create_source, mock_create_target): + """Test successful end-to-end pipeline execution.""" + # Setup mock data + mock_config = Mock(spec=Config) + mock_config.targets = {"dev": Mock()} + mock_config.get_gcloud_secret_value.return_value = "resolved_password" + mock_config.params = Mock() + mock_config.params.chunk_size = 1000 + + mock_catalog = Mock(spec=Catalog) + mock_catalog.get_sources.return_value = ["test_source"] + mock_catalog.get_tables_by_source.return_value = [ + Table( + name="test_table", + replication=ReplicationType.TRUNCATE, + columns=[ + Column(name="id", type="int"), + Column(name="name", type="string"), + ], + ) + ] + + # Setup mock source + mock_source = Mock() + mock_source.extract.return_value = [ + [{"id": 1, "name": "test1"}, {"id": 2, "name": "test2"}], # First chunk + [{"id": 3, "name": "test3"}], # Second chunk + ] + mock_create_source.return_value = mock_source + + # Setup mock target + mock_target = Mock() + mock_target.config.dataset_id = "test_dataset" + mock_create_target.return_value = mock_target + + # Setup YAML loading + mock_load_yaml.side_effect = [mock_config, mock_catalog] + + # Execute + result = main( + config_path="test_config.yaml", + catalog_path="test_catalog.yaml", + debug=False, + 
env="dev", + dryRun=False, + ) + + # Verify + assert result == 0 # Success + + # Verify source was created and used + mock_create_source.assert_called_once() + mock_source.extract.assert_called_once() + mock_source.close.assert_called_once() + + # Verify target was created and used + mock_create_target.assert_called_once() + assert mock_target.load.call_count == 2 # Two chunks loaded + + # Verify load calls + load_calls = mock_target.load.call_args_list + # Check that load was called with correct arguments + assert len(load_calls) == 2 # Two chunks loaded + # Verify the calls were made (don't check exact args due to mock structure) + assert mock_target.load.call_count == 2 + + @patch("ingest.main.create_target") + @patch("ingest.main.create_source") + @patch("ingest.main.load_yaml_model") + def test_full_pipeline_dry_run(self, mock_load_yaml, mock_create_source, mock_create_target): + """Test pipeline execution in dry run mode.""" + # Setup mock data + mock_config = Mock(spec=Config) + mock_config.targets = {"dev": Mock()} + mock_config.get_gcloud_secret_value.return_value = "resolved_password" + mock_config.params = Mock() + mock_config.params.chunk_size = 1000 + + mock_catalog = Mock(spec=Catalog) + mock_catalog.get_sources.return_value = ["test_source"] + mock_catalog.get_tables_by_source.return_value = [ + Table( + name="test_table", + replication=ReplicationType.TRUNCATE, + columns=[Column(name="id", type="int")], + ) + ] + + # Setup mock source + mock_source = Mock() + mock_source.extract.return_value = [ + [{"id": 1}, {"id": 2}], # One chunk + ] + mock_create_source.return_value = mock_source + + # Setup mock target + mock_target = Mock() + mock_target.config.dataset_id = "test_dataset" + mock_create_target.return_value = mock_target + + # Setup YAML loading + mock_load_yaml.side_effect = [mock_config, mock_catalog] + + # Execute + result = main( + config_path="test_config.yaml", + catalog_path="test_catalog.yaml", + debug=False, + env="dev", + dryRun=True, 
# Dry run mode + ) + + # Verify + assert result == 0 # Success + + # Verify source was used but target was not + mock_create_source.assert_called_once() + mock_source.extract.assert_called_once() + mock_source.close.assert_called_once() + + # Verify target was created but load was never called + mock_create_target.assert_called_once() + mock_target.load.assert_not_called() + + @patch("ingest.main.create_target") + @patch("ingest.main.create_source") + @patch("ingest.main.load_yaml_model") + def test_full_pipeline_source_failure_continues(self, mock_load_yaml, mock_create_source, mock_create_target): + """Test that pipeline continues when one source fails.""" + # Setup mock data + mock_config = Mock(spec=Config) + mock_config.targets = {"dev": Mock()} + mock_config.get_gcloud_secret_value.return_value = "resolved_password" + mock_config.params = Mock() + mock_config.params.chunk_size = 1000 + + mock_catalog = Mock(spec=Catalog) + mock_catalog.get_sources.return_value = ["failing_source", "working_source"] + mock_catalog.get_tables_by_source.side_effect = [ + [Table(name="failing_table", replication=ReplicationType.TRUNCATE, columns=[])], + [Table(name="working_table", replication=ReplicationType.TRUNCATE, columns=[])], + ] + + # Setup mock sources + mock_failing_source = Mock() + mock_failing_source.extract.side_effect = Exception("Source failed") + mock_create_source.side_effect = [mock_failing_source, Mock()] + + # Setup mock target + mock_target = Mock() + mock_target.config.dataset_id = "test_dataset" + mock_create_target.return_value = mock_target + + # Setup YAML loading + mock_load_yaml.side_effect = [mock_config, mock_catalog] + + # Execute + result = main( + config_path="test_config.yaml", + catalog_path="test_catalog.yaml", + debug=False, + env="dev", + dryRun=False, + ) + + # Verify + assert result == 0 # Success despite one source failing + + # Verify both sources were attempted + assert mock_create_source.call_count == 2 + + 
@patch("ingest.main.load_yaml_model") + def test_full_pipeline_config_loading_failure(self, mock_load_yaml): + """Test pipeline failure when config loading fails.""" + # Setup YAML loading to fail + mock_load_yaml.side_effect = Exception("Config file not found") + + # Execute + result = main( + config_path="nonexistent_config.yaml", + catalog_path="test_catalog.yaml", + debug=False, + env="dev", + dryRun=False, + ) + + # Verify + assert result == 1 # Failure + + @patch("ingest.main.create_target") + @patch("ingest.main.create_source") + @patch("ingest.main.load_yaml_model") + def test_full_pipeline_invalid_environment(self, mock_load_yaml, mock_create_source, mock_create_target): + """Test pipeline failure with invalid environment.""" + # Setup mock data + mock_config = Mock(spec=Config) + mock_config.targets = {"dev": Mock()} + mock_catalog = Mock(spec=Catalog) + mock_load_yaml.side_effect = [mock_config, mock_catalog] + + # Execute with invalid environment + with pytest.raises(ValueError, match="Invalid Env flag"): + main( + config_path="test_config.yaml", + catalog_path="test_catalog.yaml", + debug=False, + env="invalid_env", # Invalid environment + dryRun=False, + ) diff --git a/workloads/ingest/tests/test_hello.py b/workloads/ingest/tests/test_hello.py deleted file mode 100644 index fb1f0d7..0000000 --- a/workloads/ingest/tests/test_hello.py +++ /dev/null @@ -1,18 +0,0 @@ -"""Simple hello world test to ensure pytest works.""" - - -def test_hello_world(): - """A basic test that always passes.""" - assert True - - -def test_simple_math(): - """Another simple test with basic math.""" - assert 1 + 1 == 2 - - -def test_string_operations(): - """Test basic string operations.""" - hello = "Hello" - world = "World" - assert f"{hello} {world}" == "Hello World" diff --git a/workloads/ingest/uv.lock b/workloads/ingest/uv.lock index 598c476..08a18e9 100644 --- a/workloads/ingest/uv.lock +++ b/workloads/ingest/uv.lock @@ -70,6 +70,37 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, ] +[[package]] +name = "coverage" +version = "7.10.6" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/14/70/025b179c993f019105b79575ac6edb5e084fb0f0e63f15cdebef4e454fb5/coverage-7.10.6.tar.gz", hash = "sha256:f644a3ae5933a552a29dbb9aa2f90c677a875f80ebea028e5a52a4f429044b90", size = 823736, upload-time = "2025-08-29T15:35:16.668Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bd/e7/917e5953ea29a28c1057729c1d5af9084ab6d9c66217523fd0e10f14d8f6/coverage-7.10.6-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:ffea0575345e9ee0144dfe5701aa17f3ba546f8c3bb48db62ae101afb740e7d6", size = 217351, upload-time = "2025-08-29T15:33:45.438Z" }, + { url = "https://files.pythonhosted.org/packages/eb/86/2e161b93a4f11d0ea93f9bebb6a53f113d5d6e416d7561ca41bb0a29996b/coverage-7.10.6-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:95d91d7317cde40a1c249d6b7382750b7e6d86fad9d8eaf4fa3f8f44cf171e80", size = 217600, upload-time = "2025-08-29T15:33:47.269Z" }, + { url = "https://files.pythonhosted.org/packages/0e/66/d03348fdd8df262b3a7fb4ee5727e6e4936e39e2f3a842e803196946f200/coverage-7.10.6-cp313-cp313-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:3e23dd5408fe71a356b41baa82892772a4cefcf758f2ca3383d2aa39e1b7a003", size = 248600, upload-time = "2025-08-29T15:33:48.953Z" }, + { url = "https://files.pythonhosted.org/packages/73/dd/508420fb47d09d904d962f123221bc249f64b5e56aa93d5f5f7603be475f/coverage-7.10.6-cp313-cp313-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:0f3f56e4cb573755e96a16501a98bf211f100463d70275759e73f3cbc00d4f27", size = 251206, upload-time = 
"2025-08-29T15:33:50.697Z" }, + { url = "https://files.pythonhosted.org/packages/e9/1f/9020135734184f439da85c70ea78194c2730e56c2d18aee6e8ff1719d50d/coverage-7.10.6-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:db4a1d897bbbe7339946ffa2fe60c10cc81c43fab8b062d3fcb84188688174a4", size = 252478, upload-time = "2025-08-29T15:33:52.303Z" }, + { url = "https://files.pythonhosted.org/packages/a4/a4/3d228f3942bb5a2051fde28c136eea23a761177dc4ff4ef54533164ce255/coverage-7.10.6-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:d8fd7879082953c156d5b13c74aa6cca37f6a6f4747b39538504c3f9c63d043d", size = 250637, upload-time = "2025-08-29T15:33:53.67Z" }, + { url = "https://files.pythonhosted.org/packages/36/e3/293dce8cdb9a83de971637afc59b7190faad60603b40e32635cbd15fbf61/coverage-7.10.6-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:28395ca3f71cd103b8c116333fa9db867f3a3e1ad6a084aa3725ae002b6583bc", size = 248529, upload-time = "2025-08-29T15:33:55.022Z" }, + { url = "https://files.pythonhosted.org/packages/90/26/64eecfa214e80dd1d101e420cab2901827de0e49631d666543d0e53cf597/coverage-7.10.6-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:61c950fc33d29c91b9e18540e1aed7d9f6787cc870a3e4032493bbbe641d12fc", size = 250143, upload-time = "2025-08-29T15:33:56.386Z" }, + { url = "https://files.pythonhosted.org/packages/3e/70/bd80588338f65ea5b0d97e424b820fb4068b9cfb9597fbd91963086e004b/coverage-7.10.6-cp313-cp313-win32.whl", hash = "sha256:160c00a5e6b6bdf4e5984b0ef21fc860bc94416c41b7df4d63f536d17c38902e", size = 219770, upload-time = "2025-08-29T15:33:58.063Z" }, + { url = "https://files.pythonhosted.org/packages/a7/14/0b831122305abcc1060c008f6c97bbdc0a913ab47d65070a01dc50293c2b/coverage-7.10.6-cp313-cp313-win_amd64.whl", hash = "sha256:628055297f3e2aa181464c3808402887643405573eb3d9de060d81531fa79d32", size = 220566, upload-time = "2025-08-29T15:33:59.766Z" }, + { url = 
"https://files.pythonhosted.org/packages/83/c6/81a83778c1f83f1a4a168ed6673eeedc205afb562d8500175292ca64b94e/coverage-7.10.6-cp313-cp313-win_arm64.whl", hash = "sha256:df4ec1f8540b0bcbe26ca7dd0f541847cc8a108b35596f9f91f59f0c060bfdd2", size = 219195, upload-time = "2025-08-29T15:34:01.191Z" }, + { url = "https://files.pythonhosted.org/packages/d7/1c/ccccf4bf116f9517275fa85047495515add43e41dfe8e0bef6e333c6b344/coverage-7.10.6-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:c9a8b7a34a4de3ed987f636f71881cd3b8339f61118b1aa311fbda12741bff0b", size = 218059, upload-time = "2025-08-29T15:34:02.91Z" }, + { url = "https://files.pythonhosted.org/packages/92/97/8a3ceff833d27c7492af4f39d5da6761e9ff624831db9e9f25b3886ddbca/coverage-7.10.6-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:8dd5af36092430c2b075cee966719898f2ae87b636cefb85a653f1d0ba5d5393", size = 218287, upload-time = "2025-08-29T15:34:05.106Z" }, + { url = "https://files.pythonhosted.org/packages/92/d8/50b4a32580cf41ff0423777a2791aaf3269ab60c840b62009aec12d3970d/coverage-7.10.6-cp313-cp313t-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:b0353b0f0850d49ada66fdd7d0c7cdb0f86b900bb9e367024fd14a60cecc1e27", size = 259625, upload-time = "2025-08-29T15:34:06.575Z" }, + { url = "https://files.pythonhosted.org/packages/7e/7e/6a7df5a6fb440a0179d94a348eb6616ed4745e7df26bf2a02bc4db72c421/coverage-7.10.6-cp313-cp313t-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:d6b9ae13d5d3e8aeca9ca94198aa7b3ebbc5acfada557d724f2a1f03d2c0b0df", size = 261801, upload-time = "2025-08-29T15:34:08.006Z" }, + { url = "https://files.pythonhosted.org/packages/3a/4c/a270a414f4ed5d196b9d3d67922968e768cd971d1b251e1b4f75e9362f75/coverage-7.10.6-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:675824a363cc05781b1527b39dc2587b8984965834a748177ee3c37b64ffeafb", size = 264027, upload-time = "2025-08-29T15:34:09.806Z" }, + { url = 
"https://files.pythonhosted.org/packages/9c/8b/3210d663d594926c12f373c5370bf1e7c5c3a427519a8afa65b561b9a55c/coverage-7.10.6-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:692d70ea725f471a547c305f0d0fc6a73480c62fb0da726370c088ab21aed282", size = 261576, upload-time = "2025-08-29T15:34:11.585Z" }, + { url = "https://files.pythonhosted.org/packages/72/d0/e1961eff67e9e1dba3fc5eb7a4caf726b35a5b03776892da8d79ec895775/coverage-7.10.6-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:851430a9a361c7a8484a36126d1d0ff8d529d97385eacc8dfdc9bfc8c2d2cbe4", size = 259341, upload-time = "2025-08-29T15:34:13.159Z" }, + { url = "https://files.pythonhosted.org/packages/3a/06/d6478d152cd189b33eac691cba27a40704990ba95de49771285f34a5861e/coverage-7.10.6-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:d9369a23186d189b2fc95cc08b8160ba242057e887d766864f7adf3c46b2df21", size = 260468, upload-time = "2025-08-29T15:34:14.571Z" }, + { url = "https://files.pythonhosted.org/packages/ed/73/737440247c914a332f0b47f7598535b29965bf305e19bbc22d4c39615d2b/coverage-7.10.6-cp313-cp313t-win32.whl", hash = "sha256:92be86fcb125e9bda0da7806afd29a3fd33fdf58fba5d60318399adf40bf37d0", size = 220429, upload-time = "2025-08-29T15:34:16.394Z" }, + { url = "https://files.pythonhosted.org/packages/bd/76/b92d3214740f2357ef4a27c75a526eb6c28f79c402e9f20a922c295c05e2/coverage-7.10.6-cp313-cp313t-win_amd64.whl", hash = "sha256:6b3039e2ca459a70c79523d39347d83b73f2f06af5624905eba7ec34d64d80b5", size = 221493, upload-time = "2025-08-29T15:34:17.835Z" }, + { url = "https://files.pythonhosted.org/packages/fc/8e/6dcb29c599c8a1f654ec6cb68d76644fe635513af16e932d2d4ad1e5ac6e/coverage-7.10.6-cp313-cp313t-win_arm64.whl", hash = "sha256:3fb99d0786fe17b228eab663d16bee2288e8724d26a199c29325aac4b0319b9b", size = 219757, upload-time = "2025-08-29T15:34:19.248Z" }, + { url = "https://files.pythonhosted.org/packages/44/0c/50db5379b615854b5cf89146f8f5bd1d5a9693d7f3a987e269693521c404/coverage-7.10.6-py3-none-any.whl", hash 
= "sha256:92c4ecf6bf11b2e85fd4d8204814dc26e6a19f0c9d938c207c5cb0eadfcabbe3", size = 208986, upload-time = "2025-08-29T15:35:14.506Z" }, +] + [[package]] name = "google-api-core" version = "2.25.1" @@ -137,6 +168,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/40/86/bda7241a8da2d28a754aad2ba0f6776e35b67e37c36ae0c45d49370f1014/google_cloud_core-2.4.3-py2.py3-none-any.whl", hash = "sha256:5130f9f4c14b4fafdff75c79448f9495cfade0d8775facf1b09c3bf67e027f6e", size = 29348, upload-time = "2025-03-10T21:05:37.785Z" }, ] +[[package]] +name = "google-cloud-secret-manager" +version = "2.24.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "google-api-core", extra = ["grpc"] }, + { name = "google-auth" }, + { name = "grpc-google-iam-v1" }, + { name = "proto-plus" }, + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/58/7a/2fa6735ec693d822fe08a76709c4d95d9b5b4c02e83e720497355039d2ee/google_cloud_secret_manager-2.24.0.tar.gz", hash = "sha256:ce573d40ffc2fb7d01719243a94ee17aa243ea642a6ae6c337501e58fbf642b5", size = 269516, upload-time = "2025-06-05T22:22:22.965Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/be/af/db1217cae1809e69a4527ee6293b82a9af2a1fb2313ad110c775e8f3c820/google_cloud_secret_manager-2.24.0-py3-none-any.whl", hash = "sha256:9bea1254827ecc14874bc86c63b899489f8f50bfe1442bfb2517530b30b3a89b", size = 218050, upload-time = "2025-06-10T02:02:19.88Z" }, +] + [[package]] name = "google-crc32c" version = "1.7.1" @@ -176,6 +223,11 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/86/f1/62a193f0227cf15a920390abe675f386dec35f7ae3ffe6da582d3ade42c7/googleapis_common_protos-1.70.0-py3-none-any.whl", hash = "sha256:b8bfcca8c25a2bb253e0e0b0adaf8c00773e5e6af6fd92397576680b807e0fd8", size = 294530, upload-time = "2025-04-14T10:17:01.271Z" }, ] +[package.optional-dependencies] +grpc = [ + { name = "grpcio" }, +] + [[package]] name = "greenlet" version = "3.2.4" @@ 
-193,6 +245,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0b/55/2321e43595e6801e105fcfdee02b34c0f996eb71e6ddffca6b10b7e1d771/greenlet-3.2.4-cp313-cp313-win_amd64.whl", hash = "sha256:554b03b6e73aaabec3745364d6239e9e012d64c68ccd0b8430c64ccc14939a8b", size = 299685, upload-time = "2025-08-07T13:24:38.824Z" }, ] +[[package]] +name = "grpc-google-iam-v1" +version = "0.14.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "googleapis-common-protos", extra = ["grpc"] }, + { name = "grpcio" }, + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b9/4e/8d0ca3b035e41fe0b3f31ebbb638356af720335e5a11154c330169b40777/grpc_google_iam_v1-0.14.2.tar.gz", hash = "sha256:b3e1fc387a1a329e41672197d0ace9de22c78dd7d215048c4c78712073f7bd20", size = 16259, upload-time = "2025-03-17T11:40:23.586Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/66/6f/dd9b178aee7835b96c2e63715aba6516a9d50f6bebbd1cc1d32c82a2a6c3/grpc_google_iam_v1-0.14.2-py3-none-any.whl", hash = "sha256:a3171468459770907926d56a440b2bb643eec1d7ba215f48f3ecece42b4d8351", size = 19242, upload-time = "2025-03-17T11:40:22.648Z" }, +] + [[package]] name = "grpcio" version = "1.74.0" @@ -234,17 +300,34 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/76/c6/c88e154df9c4e1a2a66ccf0005a88dfb2650c1dffb6f5ce603dfbd452ce3/idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3", size = 70442, upload-time = "2024-09-15T18:07:37.964Z" }, ] +[[package]] +name = "importlib-metadata" +version = "8.7.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "zipp" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/76/66/650a33bd90f786193e4de4b3ad86ea60b53c89b669a5c7be931fac31cdb0/importlib_metadata-8.7.0.tar.gz", hash = "sha256:d13b81ad223b890aa16c5471f2ac3056cf76c5f10f82d6f9292f0b415f389000", size = 56641, upload-time = 
"2025-04-27T15:29:01.736Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/20/b0/36bd937216ec521246249be3bf9855081de4c5e06a0c9b4219dbeda50373/importlib_metadata-8.7.0-py3-none-any.whl", hash = "sha256:e5dd1551894c77868a30651cef00984d50e1002d06942a7101d34870c5f02afd", size = 27656, upload-time = "2025-04-27T15:29:00.214Z" }, +] + [[package]] name = "ingest" version = "0.1.0" source = { editable = "." } dependencies = [ { name = "google-cloud-bigquery" }, + { name = "google-cloud-secret-manager" }, { name = "mypy" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, { name = "psycopg2-binary" }, { name = "pydantic" }, { name = "pymysql" }, { name = "pytest" }, + { name = "pytest-cov" }, + { name = "pytest-mock" }, { name = "ruamel-yaml" }, { name = "ruff" }, { name = "sqlalchemy" }, @@ -254,11 +337,16 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "google-cloud-bigquery", specifier = ">=3.36.0" }, + { name = "google-cloud-secret-manager", specifier = ">=2.24.0" }, { name = "mypy", specifier = ">=1.17.1" }, + { name = "opentelemetry-api", specifier = ">=1.36.0" }, + { name = "opentelemetry-instrumentation", specifier = ">=0.57b0" }, { name = "psycopg2-binary", specifier = ">=2.9.10" }, { name = "pydantic", specifier = ">=2.0.0" }, { name = "pymysql", specifier = ">=1.1.0" }, { name = "pytest", specifier = ">=8.4.1" }, + { name = "pytest-cov", specifier = ">=7.0.0" }, + { name = "pytest-mock", specifier = ">=3.12.0" }, { name = "ruamel-yaml", specifier = ">=0.18.0" }, { name = "ruff", specifier = ">=0.8.0" }, { name = "sqlalchemy", specifier = ">=2.0.0" }, @@ -324,6 +412,47 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/79/7b/2c79738432f5c924bef5071f933bcc9efd0473bac3b4aa584a6f7c1c8df8/mypy_extensions-1.1.0-py3-none-any.whl", hash = "sha256:1be4cccdb0f2482337c4743e60421de3a356cd97508abadd57d47403e94f5505", size = 4963, upload-time = "2025-04-22T14:54:22.983Z" }, ] +[[package]] +name = 
"opentelemetry-api" +version = "1.36.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "importlib-metadata" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/27/d2/c782c88b8afbf961d6972428821c302bd1e9e7bc361352172f0ca31296e2/opentelemetry_api-1.36.0.tar.gz", hash = "sha256:9a72572b9c416d004d492cbc6e61962c0501eaf945ece9b5a0f56597d8348aa0", size = 64780, upload-time = "2025-07-29T15:12:06.02Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bb/ee/6b08dde0a022c463b88f55ae81149584b125a42183407dc1045c486cc870/opentelemetry_api-1.36.0-py3-none-any.whl", hash = "sha256:02f20bcacf666e1333b6b1f04e647dc1d5111f86b8e510238fcc56d7762cda8c", size = 65564, upload-time = "2025-07-29T15:11:47.998Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation" +version = "0.57b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "packaging" }, + { name = "wrapt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/12/37/cf17cf28f945a3aca5a038cfbb45ee01317d4f7f3a0e5209920883fe9b08/opentelemetry_instrumentation-0.57b0.tar.gz", hash = "sha256:f2a30135ba77cdea2b0e1df272f4163c154e978f57214795d72f40befd4fcf05", size = 30807, upload-time = "2025-07-29T15:42:44.746Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d0/6f/f20cd1542959f43fb26a5bf9bb18cd81a1ea0700e8870c8f369bd07f5c65/opentelemetry_instrumentation-0.57b0-py3-none-any.whl", hash = "sha256:9109280f44882e07cec2850db28210b90600ae9110b42824d196de357cbddf7e", size = 32460, upload-time = "2025-07-29T15:41:40.883Z" }, +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.57b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "typing-extensions" }, +] +sdist = { url = 
"https://files.pythonhosted.org/packages/7e/31/67dfa252ee88476a29200b0255bda8dfc2cf07b56ad66dc9a6221f7dc787/opentelemetry_semantic_conventions-0.57b0.tar.gz", hash = "sha256:609a4a79c7891b4620d64c7aac6898f872d790d75f22019913a660756f27ff32", size = 124225, upload-time = "2025-07-29T15:12:17.873Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/05/75/7d591371c6c39c73de5ce5da5a2cc7b72d1d1cd3f8f4638f553c01c37b11/opentelemetry_semantic_conventions-0.57b0-py3-none-any.whl", hash = "sha256:757f7e76293294f124c827e514c2a3144f191ef175b069ce8d1211e1e38e9e78", size = 201627, upload-time = "2025-07-29T15:12:04.174Z" }, +] + [[package]] name = "packaging" version = "25.0" @@ -494,6 +623,32 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/29/16/c8a903f4c4dffe7a12843191437d7cd8e32751d5de349d45d3fe69544e87/pytest-8.4.1-py3-none-any.whl", hash = "sha256:539c70ba6fcead8e78eebbf1115e8b589e7565830d7d006a8723f19ac8a0afb7", size = 365474, upload-time = "2025-06-18T05:48:03.955Z" }, ] +[[package]] +name = "pytest-cov" +version = "7.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "coverage" }, + { name = "pluggy" }, + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5e/f7/c933acc76f5208b3b00089573cf6a2bc26dc80a8aece8f52bb7d6b1855ca/pytest_cov-7.0.0.tar.gz", hash = "sha256:33c97eda2e049a0c5298e91f519302a1334c26ac65c1a483d6206fd458361af1", size = 54328, upload-time = "2025-09-09T10:57:02.113Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time = "2025-09-09T10:57:00.695Z" }, +] + +[[package]] +name = "pytest-mock" +version = "3.15.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = 
"https://files.pythonhosted.org/packages/61/99/3323ee5c16b3637b4d941c362182d3e749c11e400bea31018c42219f3a98/pytest_mock-3.15.0.tar.gz", hash = "sha256:ab896bd190316b9d5d87b277569dfcdf718b2d049a2ccff5f7aca279c002a1cf", size = 33838, upload-time = "2025-09-04T20:57:48.679Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2b/b3/7fefc43fb706380144bcd293cc6e446e6f637ddfa8b83f48d1734156b529/pytest_mock-3.15.0-py3-none-any.whl", hash = "sha256:ef2219485fb1bd256b00e7ad7466ce26729b30eadfc7cbcdb4fa9a92ca68db6f", size = 10050, upload-time = "2025-09-04T20:57:47.274Z" }, +] + [[package]] name = "python-dateutil" version = "2.9.0.post0" @@ -684,3 +839,31 @@ sdist = { url = "https://files.pythonhosted.org/packages/15/22/9ee70a2574a4f4599 wheels = [ { url = "https://files.pythonhosted.org/packages/a7/c2/fe1e52489ae3122415c51f387e221dd0773709bad6c6cdaa599e8a2c5185/urllib3-2.5.0-py3-none-any.whl", hash = "sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc", size = 129795, upload-time = "2025-06-18T14:07:40.39Z" }, ] + +[[package]] +name = "wrapt" +version = "1.17.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/95/8f/aeb76c5b46e273670962298c23e7ddde79916cb74db802131d49a85e4b7d/wrapt-1.17.3.tar.gz", hash = "sha256:f66eb08feaa410fe4eebd17f2a2c8e2e46d3476e9f8c783daa8e09e0faa666d0", size = 55547, upload-time = "2025-08-12T05:53:21.714Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fc/f6/759ece88472157acb55fc195e5b116e06730f1b651b5b314c66291729193/wrapt-1.17.3-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:a47681378a0439215912ef542c45a783484d4dd82bac412b71e59cf9c0e1cea0", size = 54003, upload-time = "2025-08-12T05:51:48.627Z" }, + { url = "https://files.pythonhosted.org/packages/4f/a9/49940b9dc6d47027dc850c116d79b4155f15c08547d04db0f07121499347/wrapt-1.17.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = 
"sha256:54a30837587c6ee3cd1a4d1c2ec5d24e77984d44e2f34547e2323ddb4e22eb77", size = 39025, upload-time = "2025-08-12T05:51:37.156Z" }, + { url = "https://files.pythonhosted.org/packages/45/35/6a08de0f2c96dcdd7fe464d7420ddb9a7655a6561150e5fc4da9356aeaab/wrapt-1.17.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:16ecf15d6af39246fe33e507105d67e4b81d8f8d2c6598ff7e3ca1b8a37213f7", size = 39108, upload-time = "2025-08-12T05:51:58.425Z" }, + { url = "https://files.pythonhosted.org/packages/0c/37/6faf15cfa41bf1f3dba80cd3f5ccc6622dfccb660ab26ed79f0178c7497f/wrapt-1.17.3-cp313-cp313-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:6fd1ad24dc235e4ab88cda009e19bf347aabb975e44fd5c2fb22a3f6e4141277", size = 88072, upload-time = "2025-08-12T05:52:37.53Z" }, + { url = "https://files.pythonhosted.org/packages/78/f2/efe19ada4a38e4e15b6dff39c3e3f3f73f5decf901f66e6f72fe79623a06/wrapt-1.17.3-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0ed61b7c2d49cee3c027372df5809a59d60cf1b6c2f81ee980a091f3afed6a2d", size = 88214, upload-time = "2025-08-12T05:52:15.886Z" }, + { url = "https://files.pythonhosted.org/packages/40/90/ca86701e9de1622b16e09689fc24b76f69b06bb0150990f6f4e8b0eeb576/wrapt-1.17.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:423ed5420ad5f5529db9ce89eac09c8a2f97da18eb1c870237e84c5a5c2d60aa", size = 87105, upload-time = "2025-08-12T05:52:17.914Z" }, + { url = "https://files.pythonhosted.org/packages/fd/e0/d10bd257c9a3e15cbf5523025252cc14d77468e8ed644aafb2d6f54cb95d/wrapt-1.17.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:e01375f275f010fcbf7f643b4279896d04e571889b8a5b3f848423d91bf07050", size = 87766, upload-time = "2025-08-12T05:52:39.243Z" }, + { url = "https://files.pythonhosted.org/packages/e8/cf/7d848740203c7b4b27eb55dbfede11aca974a51c3d894f6cc4b865f42f58/wrapt-1.17.3-cp313-cp313-win32.whl", hash = 
"sha256:53e5e39ff71b3fc484df8a522c933ea2b7cdd0d5d15ae82e5b23fde87d44cbd8", size = 36711, upload-time = "2025-08-12T05:53:10.074Z" }, + { url = "https://files.pythonhosted.org/packages/57/54/35a84d0a4d23ea675994104e667ceff49227ce473ba6a59ba2c84f250b74/wrapt-1.17.3-cp313-cp313-win_amd64.whl", hash = "sha256:1f0b2f40cf341ee8cc1a97d51ff50dddb9fcc73241b9143ec74b30fc4f44f6cb", size = 38885, upload-time = "2025-08-12T05:53:08.695Z" }, + { url = "https://files.pythonhosted.org/packages/01/77/66e54407c59d7b02a3c4e0af3783168fff8e5d61def52cda8728439d86bc/wrapt-1.17.3-cp313-cp313-win_arm64.whl", hash = "sha256:7425ac3c54430f5fc5e7b6f41d41e704db073309acfc09305816bc6a0b26bb16", size = 36896, upload-time = "2025-08-12T05:52:55.34Z" }, + { url = "https://files.pythonhosted.org/packages/1f/f6/a933bd70f98e9cf3e08167fc5cd7aaaca49147e48411c0bd5ae701bb2194/wrapt-1.17.3-py3-none-any.whl", hash = "sha256:7171ae35d2c33d326ac19dd8facb1e82e5fd04ef8c6c0e394d7af55a55051c22", size = 23591, upload-time = "2025-08-12T05:53:20.674Z" }, +] + +[[package]] +name = "zipp" +version = "3.23.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/e3/02/0f2892c661036d50ede074e376733dca2ae7c6eb617489437771209d4180/zipp-3.23.0.tar.gz", hash = "sha256:a07157588a12518c9d4034df3fbbee09c814741a33ff63c05fa29d26a2404166", size = 25547, upload-time = "2025-06-08T17:06:39.4Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2e/54/647ade08bf0db230bfea292f893923872fd20be6ac6f53b2b936ba839d75/zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e", size = 10276, upload-time = "2025-06-08T17:06:38.034Z" }, +]