Merged
25 changes: 0 additions & 25 deletions .devcontainer/devcontainer.json

This file was deleted.

23 changes: 0 additions & 23 deletions .devcontainer/post-install.sh

This file was deleted.

8 changes: 8 additions & 0 deletions .gitignore
@@ -23,6 +23,12 @@ coverage.*
go.work
go.work.sum

# Terraform
.terraform/
.terraform.lock.hcl
terraform.tfstate
terraform.tfstate.backup

# ========================================
# Kubebuilder / Controller Tools
# ========================================
@@ -80,6 +86,8 @@ Thumbs.db
.cursor/
.cursorignore
.ruff_cache/
.coverage
.htmlcov/

# JetBrains & VSCode specific settings
*.iml
8 changes: 8 additions & 0 deletions Makefile
@@ -28,6 +28,14 @@ dev-cleanup: ## Clean up development environment (containers, volumes, networks)
docker volume ls | grep -E "(dev_|pipeline-forge)" | awk '{print $$2}' | xargs -r docker volume rm
docker ps -a | grep -E "(pipeline-forge|mysql|postgres)" | awk '{print $$1}' | xargs -r docker rm -f
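The cleanup recipe above relies on a `grep | awk | xargs -r` pipeline; `xargs -r` (GNU) skips the command entirely when the filter matches nothing, so an empty listing doesn't error out. A standalone sketch of the pattern, with made-up names standing in for real `docker volume ls` output:

```shell
# Simulate filtered listing output and feed it through the same pattern
# dev-cleanup uses. `xargs -r` runs nothing if no lines survive the filter.
printf 'dev_cache\npipeline-forge-data\nunrelated\n' \
  | grep -E '(dev_|pipeline-forge)' \
  | awk '{print $1}' \
  | xargs -r echo would-remove
# → would-remove dev_cache pipeline-forge-data
```

(In the real recipe, `awk '{print $$2}'` picks column 2 because `docker volume ls` prints the volume name there; `$$` escapes `$` inside a Makefile.)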

.PHONY: apply-sandbox-tf
apply-sandbox-tf: ## Apply the development infrastructure
terraform -chdir=infrastructure/gcloud/sandbox init
terraform -chdir=infrastructure/gcloud/sandbox apply

.PHONY: destroy-sandbox-tf
destroy-sandbox-tf: ## Destroy the development infrastructure
terraform -chdir=infrastructure/gcloud/sandbox destroy

##@ Operator Development

215 changes: 44 additions & 171 deletions README.md
@@ -8,207 +8,84 @@

> A Kubernetes-native platform for building modern, declarative data pipelines with clear boundaries between ingestion and transformation.

## 📋 Quick Navigation
## 🚀 Quick Navigation

- [Overview](#-overview)
- [Key Features](#-key-features)
- [Quick Start](#-quick-start)
- [Architecture](#️-architecture)
- [Documentation](#-documentation)
- [Contributing](#-contributing)
- **[🎛️ Kubernetes Operator](operator/README.md)** - Go-based CRD management and pipeline orchestration
- **[📥 Ingest Workload](workloads/ingest/README.md)** - Type-safe data ingestion (Python)
- **[🔄 Transform Workload](workloads/transform/README.md)** - dbt-based data transformation
- **[⚡ Trigger Workload](workloads/trigger/README.md)** - Event-driven pipeline activation (Go)
- **[📋 Examples](docs/examples.md)** - Comprehensive YAML examples and use cases

---

## 🚀 Overview

Pipeline Forge is a complete solution for orchestrating data pipelines in Kubernetes environments. It combines a powerful Kubernetes operator with specialized workloads to provide a declarative, event-driven approach to data pipeline management.

### 🎯 The Problem

Modern data teams face challenges with:

- **Complex Orchestration**: Managing dependencies between data ingestion and transformation
- **Event-Driven Requirements**: Responding to file drops, database changes, and streaming events
- **Infrastructure Complexity**: Deploying and scaling data processing workloads
- **Observability Gaps**: Tracking pipeline health and data lineage
- **Team Coordination**: Coordinating between data engineering and platform teams
- **Resilience and Lifecycle Management**: Connecting pipeline steps in a clear lifecycle, so that when one step fails, dependent steps don't run, preventing cascading errors

### 💡 The Solution

Pipeline Forge provides a Kubernetes-native platform that:

- **Declaratively Orchestrates** data pipelines using Custom Resource Definitions (CRDs)
- **Flexibly Ingests** data via event-driven triggers (e.g., GCS file drops, Pub/Sub messages, BigQuery updates) or on a schedule (CronJob-based)
- **Clear Separation** between data ingestion and transformation phases
- **Built-in Observability** with comprehensive status tracking and monitoring
- **GitOps Ready** configuration that fits modern deployment practices

## ✨ Key Features

- **Unified Pipeline Lifecycle**: Connect ingestion with staging models in a single application lifecycle: if ingestion fails, the entire staging fails, preventing orphaned transformations
- **Native Kubernetes Resources**: Each step runs on 100% native K8s resources (Transform → Job, Ingest → CronJob/Job/Trigger)
- **Event-Driven Orchestration**: React to file drops, Pub/Sub messages, and BigQuery updates with intelligent retry policies
- **Built-in Observability**: Comprehensive status tracking with detailed execution history and failure analysis
- **Flexible Ingestion**: Reference existing CronJobs during ingestion, or let the operator create and manage new ones, with full type safety
- **Custom Image Support**: Use your own image for each step, or use pre-built Docker images from the Pipeline Forge repository

### ⚡ Event-Driven Orchestration

- **GCS Triggers**: Monitor bucket changes and trigger pipelines
- **Pub/Sub Triggers**: React to real-time messages with optional filtering
- **BigQuery Triggers**: Watch for table updates and data freshness
- **Retry & Cooldown**: Configurable retry policies with intelligent intervals
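
The trigger options above can be sketched as a resource manifest. The field names below (`source`, `retryPolicy`, `cooldownSeconds`) are illustrative only, not the confirmed CRD schema; see the [operator documentation](operator/README.md) for the actual spec:

```yaml
apiVersion: core.pipeline-forge.io/v1alpha1
kind: Trigger
metadata:
  name: user-events-trigger
spec:
  # Illustrative GCS source: fire when new objects land in the bucket.
  source:
    type: gcs
    bucket: raw-user-events
  # Illustrative retry/cooldown knobs.
  retryPolicy:
    maxAttempts: 3
    intervalSeconds: 60
  cooldownSeconds: 300
```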

### ☸️ Kubernetes-Native

- **CRD-Based**: Native Kubernetes resources for pipeline definition
- **RBAC Integration**: Fine-grained access control for teams
- **Resource Management**: CPU, memory, and storage allocation
- **Independent Scaling**: Each step scales independently as native K8s resources

### 📊 Built-in Observability

- **Rich Status Tracking**: Comprehensive pipeline health monitoring with detailed execution history
- **Lifecycle Management**: Real-time phase tracking (Pending, Running, Completed, Failed)
- **Execution Insights**: Track attempt counts, success/failure rates, and timing metrics
- **Failure Analysis**: Detailed error messages and retry attempt tracking
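
Taken together, the status a Staging resource reports might look like the sketch below; the field names are illustrative, not the operator's actual status schema:

```yaml
status:
  phase: Failed            # Pending | Running | Completed | Failed
  attempts: 3
  lastTransitionTime: "2024-01-01T12:00:00Z"
  message: "ingest job exited with code 1; giving up after max retry attempts"
```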

### 🔄 Example

```yaml
apiVersion: core.pipeline-forge.io/v1alpha1
kind: Staging
metadata:
name: user-events-pipeline
namespace: staging-events
spec:
ingest:
mode: reference
type: trigger
name: user-events-trigger
transform:
name: user-events-transform
project: analytics
target: prod
image: gcr.io/org/dbt-core:latest
models:
- stg_user_events
```

📖 **[View comprehensive examples →](docs/examples.md)**

## 🚀 Quick Start

### Prerequisites

- Kubernetes cluster (v1.11.3+)
- kubectl configured for your cluster
- Docker or container runtime
- Access to container registry
## 🎯 What is Pipeline Forge?

### Installation
A complete solution for orchestrating data pipelines in Kubernetes environments. Combines a powerful Kubernetes operator with specialized workloads to provide a declarative, event-driven approach to data pipeline management.

1. **Clone and Deploy**
### Key Benefits

```bash
# Clone the repository
git clone https://github.com/your-org/pipeline-forge.git
cd pipeline-forge/operator
- **Unified Pipeline Lifecycle** - Connect ingestion with transformation in a single application lifecycle
- **Native Kubernetes Resources** - Each step runs on 100% native K8s resources
- **Event-Driven Orchestration** - React to file drops, Pub/Sub messages, and BigQuery updates
- **Built-in Observability** - Comprehensive status tracking and monitoring

# Deploy the operator
make deploy IMG=your-registry/pipeline-forge-operator:latest
```
## 🏗️ Architecture Overview

2. **Deploy Sample Pipelines**
Pipeline Forge consists of two main components:

```bash
# Apply example configurations
kubectl apply -k operator/config/samples/
```
### 🎛️ [Kubernetes Operator](operator/README.md)

3. **Monitor Pipeline Status**
```bash
# Check pipeline health
kubectl get staging
kubectl describe staging user-events-pipeline
```
**Go-based CRD management and pipeline orchestration**

## 🏗️ Architecture
- Custom Resource Definitions (CRDs) for pipeline definition
- Automatic reconciliation and lifecycle management
- RBAC integration and resource management
- Event-driven trigger management

Pipeline Forge consists of two main components that work together seamlessly:
### 🔧 [Specialized Workloads](workloads/README.md)

### 🎛️ Kubernetes Operator
**Production-ready data processing components**

The operator manages the lifecycle of data pipeline stages through:
- **[Ingest](workloads/ingest/README.md)** - Type-safe data ingestion from MySQL, PostgreSQL to BigQuery
- **[Transform](workloads/transform/README.md)** - dbt-based data transformation with version control
- **[Trigger](workloads/trigger/README.md)** - Event processing for GCS, Pub/Sub, and BigQuery

- **Staging Resources**: Complete pipeline steps that coordinate ingestion and transformation
- **Trigger Resources**: Event-driven activation for pipelines
- **Ingestion Management**: Supports ingestion via both event-driven triggers and CronJobs
- **Automatic Reconciliation**: Ensures pipeline state matches desired configuration
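
As a counterpart to the trigger-referencing example earlier, a CronJob-based Staging might look like the following sketch; the `mode`, `type`, and `schedule` values are illustrative, not the confirmed CRD schema:

```yaml
apiVersion: core.pipeline-forge.io/v1alpha1
kind: Staging
metadata:
  name: orders-pipeline
spec:
  ingest:
    # Illustrative managed CronJob ingestion (vs. referencing an existing trigger).
    mode: managed
    type: cronjob
    schedule: "0 * * * *"
  transform:
    name: orders-transform
    models:
      - stg_orders
```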

### 🔧 Specialized Workloads

Pre-built, production-ready data processing components:
## 🛠️ Technology Stack

- **Ingest Workloads**: Type-safe data ingestion from MySQL, PostgreSQL, and more
- **Transform Workloads**: dbt-based data transformation with version control
- **Trigger Workloads**: Event processing for GCS, Pub/Sub, and BigQuery
| Component | Technology | Purpose |
| ------------- | ----------------------------- | ----------------------------------------- |
| **Operator** | Go, Kubernetes, Kubebuilder | Pipeline orchestration and CRD management |
| **Ingest** | Python 3.13+, Pydantic, Typer | Type-safe data ingestion with validation |
| **Transform** | dbt Core, BigQuery | Data transformation and analytics |
| **Triggers** | Go, Google Cloud APIs | Event-driven pipeline activation |

## 📚 Documentation
## 🚀 Quick Start

- **[📋 Examples](docs/examples.md)** - Comprehensive YAML examples and use cases
- **[🎛️ Operator Guide](operator/README.md)** - Detailed operator documentation
- **[🔧 Workloads](workloads/README.md)** - Data processing components
```bash
git clone https://github.com/DanielBlei/pipeline-forge.git

## 🛠️ Technology Stack
# Run the operator (safety check ensures you're on kind/minikube cluster)
make run-operator

| Component | Technology | Purpose |
| ------------------ | ----------------------------- | ---------------------------------------------------- |
| **Operator** | Go, Kubernetes, Kubebuilder | Pipeline orchestration and CRD management |
| **Ingest** | Python 3.13+, Pydantic, Typer | Type-safe data ingestion with validation |
| **Transform** | dbt Core, BigQuery | Data transformation and analytics |
| **Triggers** | GCS, Pub/Sub, BigQuery APIs | Event-driven pipeline activation with retry policies |
| **Infrastructure** | Kubernetes, Docker | Container orchestration and deployment |
# Deploy the sample Kubernetes resources
make apply-samples
```

## 📁 Project Structure

```
pipeline-forge/
├── operator/ # Kubernetes operator (Go)
│ ├── api/ # CRD definitions
│ ├── controllers/ # Reconciliation logic
│ └── config/ # Deployment manifests
├── workloads/ # Data processing components
├── workloads/ # Data processing components
│ ├── ingest/ # Type-safe ingestion (Python)
│ ├── transform/ # dbt transformations
│ └── trigger/ # Event processing
│ └── trigger/ # Event processing (Go)
└── docs/ # Documentation
└── examples.md # Comprehensive examples
```

## 🤝 Contributing

We welcome contributions to Pipeline Forge! Whether you're interested in:

- **Operator Development**: Kubernetes controller logic and CRDs
- **Workload Development**: Data processing components
- **Documentation**: User guides and examples
- **Testing**: End-to-end pipeline validation

Please see our contributing guidelines and development setup instructions in the respective component directories.

### 🛠️ Development Setup

```bash
# Set up development environment
cd operator
make manifests generate
make test

# Run locally
make run
```
We welcome contributions! See individual component READMEs for development setup and guidelines.

## 📄 License

@@ -225,7 +102,3 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
4 changes: 2 additions & 2 deletions dev/.env
@@ -2,10 +2,10 @@
MYSQL_ROOT_PASSWORD=root_password_dev
MYSQL_DATABASE=pipeline_forge
MYSQL_USER=pipeline_user
MYSQL_PASSWORD=pipeline_password_dev
MYSQL_PASSWORD=pipeline_forge_sandbox

# PostgreSQL Configuration
POSTGRES_DB=pipeline_forge
POSTGRES_USER=pipeline_user
POSTGRES_PASSWORD=pipeline_password_dev
POSTGRES_PASSWORD=pipeline_forge_sandbox

14 changes: 7 additions & 7 deletions dev/docker-compose.yml
@@ -6,10 +6,10 @@ services:
env_file:
- .env
environment:
MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-rootpassword}
MYSQL_DATABASE: ${MYSQL_DATABASE:-pipeline_forge_dev}
MYSQL_USER: ${MYSQL_USER:-pipeline_user}
MYSQL_PASSWORD: ${MYSQL_PASSWORD:-pipeline_password}
MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD}
MYSQL_DATABASE: ${MYSQL_DATABASE}
MYSQL_USER: ${MYSQL_USER}
MYSQL_PASSWORD: ${MYSQL_PASSWORD}
ports:
- "3306:3306"
volumes:
@@ -25,9 +25,9 @@ services:
env_file:
- .env
environment:
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-pipeline_password}
POSTGRES_USER: ${POSTGRES_USER:-pipeline_user}
POSTGRES_DB: ${POSTGRES_DB:-pipeline_forge_dev}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_DB: ${POSTGRES_DB}
ports:
- "5432:5432"
volumes: