Picture this: It's 3 AM, and Sarah, a data engineer at a fast-growing fintech startup, is frantically debugging a production pipeline that's been failing for the past six hours. The issue? A dependency conflict between her local development environment and the production Kubernetes cluster. Sound familiar?
This scenario plays out thousands of times daily across data teams worldwide. According to recent industry surveys, data engineers spend 60-70% of their time on infrastructure and environment management rather than actual data engineering. It's like being a chef who spends most of their time fixing the stove instead of cooking.
Why does this matter to you right now?
The modern data landscape is experiencing a seismic shift. Companies that can move fast with their data win. Those that can't get left behind. The traditional approach of building complex data platforms with Docker, Kubernetes, and a dozen different tools is creating a bottleneck that's strangling innovation .
flowchart TD
A[Traditional Data Stack] --> B[Complex Infrastructure]
B --> C[Environment Inconsistencies]
C --> D[Debug Hell]
D --> E[Delayed Insights]
E --> F[Lost Competitive Advantage]
G[Tower.dev Approach] --> H[Serverless Python Runtime]
H --> I[Consistent Environments]
I --> J[Rapid Deployment]
J --> K[Faster Insights]
K --> L[Competitive Edge]
%% Traditional stack styling (warning colors)
style A fill:#ffebee,stroke:#d32f2f,stroke-width:2px,color:#b71c1c
style B fill:#fce4ec,stroke:#c2185b,stroke-width:2px,color:#880e4f
style C fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px,color:#4a148c
style D fill:#ede7f6,stroke:#512da8,stroke-width:2px,color:#311b92
style E fill:#e8eaf6,stroke:#303f9f,stroke-width:2px,color:#1a237e
style F fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#0d47a1
%% Tower.dev approach styling (success colors)
style G fill:#e8f5e8,stroke:#388e3c,stroke-width:3px,color:#1b5e20
style H fill:#e0f2f1,stroke:#00796b,stroke-width:2px,color:#004d40
style I fill:#e0f7fa,stroke:#0097a7,stroke-width:2px,color:#006064
style J fill:#e1f5fe,stroke:#0288d1,stroke-width:2px,color:#01579b
style K fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px,color:#4a148c
style L fill:#fff3e0,stroke:#f57c00,stroke-width:3px,color:#e65100
Tower.dev emerges as the solution to this chaos. Think of it as "Docker for Data" – a platform that solves the fundamental problem of environment portability in data engineering . Just as Docker revolutionized application deployment by containerizing environments, Tower.dev revolutionizes data engineering by providing a portable Python runtime that works identically across local development, testing, and production environments .
What makes Tower.dev different?
Tower.dev is a serverless orchestrator and portable Python runtime specifically designed for ETL and AI inference workloads . It's not trying to be everything to everyone – it's laser-focused on solving the specific pain points that data scientists and engineers face daily:
- Environment Consistency: Your code runs the same way locally and in production
- Zero Infrastructure Management: No Docker files, no Kubernetes manifests, no server provisioning
- Modern Python Stack: Native support for dltHub, Polars, DataFusion, and other cutting-edge frameworks
- Seamless Iceberg Integration: Direct access to Apache Iceberg tables for lakehouse architectures
Pause and Reflect: Think about your last data pipeline deployment. How many hours did you spend on environment setup versus actual data logic? Write down that number – we'll come back to it.
Before diving into the technical details, let me share a story that illustrates Tower.dev's transformative power. Taktile, a leading decision intelligence platform for FinTechs, had a common problem: a two-person data engineering team struggling to keep up with requests for new data sources .
Their head of data platform, Simon Rosenberger, described their transformation: "Tower is like Docker, Dagster, and Jenkins having a baby 🐳+ 🐙+ 🤵🏼♂️ = 💜" . By implementing Tower.dev with dltHub, they went from a team of 2 data engineers capable of making platform changes to "pretty much everyone who knows a bit of Python" being able to contribute to production data pipelines .
The result? A 10x increase in their data team's effective capacity without hiring additional engineers. That's the power of removing friction from the development-to-production workflow.
To understand Tower.dev's power, we need to grasp its architectural philosophy. Traditional data platforms follow a complex, multi-layered approach with separate systems for orchestration, compute, storage, and monitoring. Tower.dev simplifies this into what they call a "Simple but Powerful 2-Layer Data Stack" .
graph TB
subgraph "Traditional Data Stack"
T1[Application Layer]
T2[Orchestration Layer]
T3[Compute Layer]
T4[Storage Layer]
T5[Monitoring Layer]
T6[Security Layer]
T1 --> T2 --> T3 --> T4
T2 --> T5
T3 --> T6
end
subgraph "Tower.dev 2-Layer Stack"
A[Apps & Logic Layer]
B[Serverless Runtime Layer]
A --> B
B --> C[Open Lakehouse Storage]
end
%% Traditional stack styling
style T1 fill:#ffebee,stroke:#d32f2f,stroke-width:2px,color:#b71c1c
style T2 fill:#fce4ec,stroke:#c2185b,stroke-width:2px,color:#880e4f
style T3 fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px,color:#4a148c
style T4 fill:#ede7f6,stroke:#512da8,stroke-width:2px,color:#311b92
style T5 fill:#e8eaf6,stroke:#303f9f,stroke-width:2px,color:#1a237e
style T6 fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#0d47a1
%% Tower.dev stack styling
style A fill:#e8f5e8,stroke:#2e7d32,stroke-width:3px,color:#1b5e20
style B fill:#e0f2f1,stroke:#00695c,stroke-width:3px,color:#004d40
style C fill:#e0f7fa,stroke:#0277bd,stroke-width:2px,color:#01579b
Layer 1: Apps and Logic This is where your Python code lives. Tower.dev can run any Python code – there's no special framework to learn, no APIs to integrate with . You write standard Python using your favorite libraries like Polars, dltHub, or pandas, and Tower.dev handles the rest.
Layer 2: Serverless Runtime This is Tower.dev's secret sauce. The platform provides a serverless runtime environment that automatically handles scaling, dependencies, and execution . It's like having a dedicated ops team managing your infrastructure, but it's all automated and invisible.
Tower.dev uses a "runner model" to execute applications . Here's how it works:
- Package Creation: When you deploy code, Tower.dev creates an encrypted package containing your Python code, dependencies, and configuration
- Runner Provisioning: When execution is needed, Tower.dev spins up a runner (think of it as a specialized container)
- Code Execution: The runner downloads your encrypted package, decrypts it, and executes your code
- Cleanup: After execution, the runner is destroyed, ensuring no data persists
sequenceDiagram
participant Dev as 👨💻 Developer
participant CLI as 🖥️ Tower CLI
participant Platform as ☁️ Tower Platform
participant Runner as 🏃♂️ Tower Runner
participant Storage as 🗄️ Data Storage
rect rgb(232, 245, 233)
Note over Dev,CLI: Development Phase
Dev->>CLI: tower deploy
CLI->>Platform: Upload encrypted package
Platform->>Platform: Store package securely
end
rect rgb(227, 242, 253)
Note over Dev,Storage: Execution Phase
Dev->>CLI: tower run
CLI->>Platform: Request execution
Platform->>Runner: Provision runner
Runner->>Platform: Download package
Runner->>Runner: Decrypt & execute
Runner->>Storage: Process data
Runner->>Platform: Return results
Platform->>CLI: Execution complete
CLI->>Dev: Show results
end
Pro Tip: The encryption happens end-to-end, meaning Tower.dev employees cannot access your code. This is crucial for enterprise customers with sensitive data processing requirements .
Every Tower.dev application is defined by a Towerfile – a simple configuration file that tells Tower.dev how to run your app . Think of it as a recipe that describes your app's ingredients and cooking instructions.
[app]
name = "my-data-pipeline"
script = "./pipeline.py"
source = [
"./**/*.py",
"./requirements.txt"
]
[[parameters]]
name = "source_bucket"
description = "S3 bucket containing source data"
default = "my-company-data"
[[parameters]]
name = "target_table"
description = "Destination table name"
default = "processed_data"What's happening here?
name: Your app's identifier in Tower.devscript: The entry point Python filesource: Which files to include in your deployment packageparameters: Runtime configuration options
One of Tower.dev's most powerful features is its secrets management system . Instead of hardcoding database credentials or API keys, you define them as secrets that are securely shared with your applications at runtime.
# Set a secret
tower secrets create --name DATABASE_URL --value "postgresql://user:pass@host:5432/db"
# Use in your Python code
import os
database_url = os.environ['DATABASE_URL']Secrets are encrypted end-to-end and can be shared across team members and environments . This solves the common problem of credential management in data teams.
Environments in Tower.dev allow you to define different configurations for different execution contexts . You might have a dev environment that writes to a local file system and a prod environment that writes to your production data warehouse.
Quick Quiz: What are the three main components of a Tower.dev application?
- Towerfile, Secrets, Environments
- Docker, Kubernetes, Helm
- Code, Tests, Documentation
Answer: 1. Towerfile, Secrets, Environments
Let's start with something more useful than printing "Hello World" to the console. We'll build a simple data quality checker that validates CSV files – something every data engineer needs.
Why this example? This example demonstrates core Tower.dev concepts while solving a real problem. You'll learn about parameters, file handling, and output generation in a practical context.
First, let's get Tower.dev installed and configured:
# Install Tower CLI
pip install tower
# Login to Tower (this opens a browser window)
tower login
# Create your first app
tower apps create --name="data-quality-checker"Create a new directory and the core files:
mkdir data-quality-checker
cd data-quality-checkerCreate the Towerfile:
[app]
name = "data-quality-checker"
script = "./quality_check.py"
source = [
"./quality_check.py",
"./requirements.txt",
"./sample_data.csv"
]
[[parameters]]
name = "file_path"
description = "Path to the CSV file to check"
default = "sample_data.csv"
[[parameters]]
name = "required_columns"
description = "Comma-separated list of required columns"
default = "id,name,email"Create requirements.txt:
pandas>=1.5.0Now, create the main application file quality_check.py:
import pandas as pd
import sys
import os
from typing import List, Dict, Any
def validate_csv_file(file_path: str, required_columns: List[str]) -> Dict[str, Any]:
"""
Validate a CSV file and return quality metrics.
This function demonstrates Tower.dev's ability to handle real data tasks
while showcasing parameter usage and error handling.
"""
results = {
'file_path': file_path,
'status': 'unknown',
'row_count': 0,
'column_count': 0,
'missing_columns': [],
'null_percentages': {},
'issues': []
}
try:
# Load the CSV file
df = pd.read_csv(file_path)
# Basic metrics
results['row_count'] = len(df)
results['column_count'] = len(df.columns)
# Check for required columns
missing_columns = [col for col in required_columns if col not in df.columns]
results['missing_columns'] = missing_columns
# Calculate null percentages
null_percentages = {}
for column in df.columns:
null_pct = (df[column].isnull().sum() / len(df)) * 100
null_percentages[column] = round(null_pct, 2)
results['null_percentages'] = null_percentages
# Identify issues
issues = []
if missing_columns:
issues.append(f"Missing required columns: {', '.join(missing_columns)}")
high_null_columns = [col for col, pct in null_percentages.items() if pct > 50]
if high_null_columns:
issues.append(f"High null percentage in columns: {', '.join(high_null_columns)}")
results['issues'] = issues
results['status'] = 'success' if not issues else 'warning'
except FileNotFoundError:
results['status'] = 'error'
results['issues'] = [f"File not found: {file_path}"]
except Exception as e:
results['status'] = 'error'
results['issues'] = [f"Error processing file: {str(e)}"]
return results
def main():
# Get parameters from Tower.dev environment
file_path = os.environ.get('file_path', 'sample_data.csv')
required_columns_str = os.environ.get('required_columns', 'id,name,email')
required_columns = [col.strip() for col in required_columns_str.split(',')]
print(f"🔍 Analyzing file: {file_path}")
print(f"📋 Required columns: {required_columns}")
print("-" * 50)
# Perform validation
results = validate_csv_file(file_path, required_columns)
# Display results
print(f"📊 Results for {results['file_path']}:")
print(f" Status: {results['status'].upper()}")
print(f" Rows: {results['row_count']:,}")
print(f" Columns: {results['column_count']}")
if results['missing_columns']:
print(f" ❌ Missing columns: {', '.join(results['missing_columns'])}")
if results['null_percentages']:
print(" 📈 Null percentages:")
for col, pct in results['null_percentages'].items():
icon = "⚠️" if pct > 10 else "✅"
print(f" {icon} {col}: {pct}%")
if results['issues']:
print(" 🚨 Issues found:")
for issue in results['issues']:
print(f" - {issue}")
else:
print(" ✅ No issues found!")
# Exit with appropriate code
sys.exit(0 if results['status'] != 'error' else 1)
if __name__ == "__main__":
main()Tower.dev's beauty is that you can test your app locally before deploying:
# Create a sample CSV file for testing
cat > sample_data.csv << EOF
id,name,email,age
1,John Doe,john@example.com,30
2,Jane Smith,jane@example.com,25
3,Bob Johnson,,35
EOF
# Run locally
python quality_check.pyYou should see output like:
🔍 Analyzing file: sample_data.csv
📋 Required columns: ['id', 'name', 'email']
--------------------------------------------------
📊 Results for sample_data.csv:
Status: WARNING
Rows: 3
Columns: 4
📈 Null percentages:
✅ id: 0.0%
✅ name: 0.0%
⚠️ email: 33.33%
✅ age: 0.0%
🚨 Issues found:
- High null percentage in columns: email
Now let's deploy our application to Tower.dev:
# Deploy the application
tower deployYou'll see output like:
✔ Building package... Done!
Deploying to Tower... [████████████████████] 100%
Success! Version v1 of your code has been deployed to Tower!
Execute your application in Tower.dev's serverless environment:
# Run with default parameters
tower run
# Run with custom parameters
tower run --param file_path="production_data.csv" --param required_columns="user_id,timestamp,event_type"What just happened?
You created a real data quality checking tool that:
- Accepts parameters for flexibility
- Processes actual data files
- Provides meaningful output
- Runs identically locally and in the cloud
- Handles errors gracefully
Pro Tip: The
tower runcommand with parameters is perfect for ad-hoc analysis. You can quickly check different files without modifying code.
Tower.dev's workflow is designed to minimize friction between development and production. Here's the cycle you'll use for all your applications:
graph LR
A[📝 Write Code] --> B[🧪 Test Locally]
B --> C[🚀 Deploy to Tower]
C --> D[☁️ Run in Cloud]
D --> E[📊 Monitor Results]
E --> F[🔄 Iterate]
F --> A
style A fill:#e8f5e8,stroke:#2e7d32,stroke-width:3px,color:#1b5e20
style B fill:#fff3e0,stroke:#f57c00,stroke-width:2px,color:#e65100
style C fill:#f3e5f5,stroke:#7b1fa2,stroke-width:3px,color:#4a148c
style D fill:#e8f5e8,stroke:#388e3c,stroke-width:3px,color:#1b5e20
style E fill:#e0f7fa,stroke:#0277bd,stroke-width:2px,color:#01579b
style F fill:#fce4ec,stroke:#c2185b,stroke-width:2px,color:#880e4f
- Write Code: Standard Python development in your favorite IDE
- Test Locally: Run your code with the same environment variables Tower.dev will use
- Deploy to Tower: Package and upload your code (encrypted and secure)
- Run in Cloud: Execute with managed infrastructure and automatic scaling
- Monitor Results: Check logs, metrics, and outputs
- Iterate: Make changes and repeat the cycle
This workflow eliminates the traditional "works on my machine" problem because the local and cloud environments are identical .
Pause and Reflect: Compare this workflow to your current data pipeline deployment process. How many steps does Tower.dev eliminate?
Now we'll build something more substantial – a complete ETL pipeline that extracts data from an API, transforms it, and loads it into a data warehouse. This example showcases Tower.dev's integration with dltHub, one of the most powerful modern ETL frameworks .
The Scenario: You're working for an e-commerce company that needs to extract daily sales data from their API and load it into Snowflake for analytics.
dltHub provides a modular framework for extraction, loading, and transformation tasks . Tower.dev serves as the perfect runtime environment for dltHub pipelines, handling infrastructure, orchestration, and observability .
flowchart LR
subgraph "📊 Data Sources"
A[🛒 Sales API]
B[👥 Customer API]
C[📦 Product API]
end
subgraph "🏗️ Tower.dev + dltHub"
D[📥 Extract]
E[🔄 Transform]
F[📤 Load]
end
subgraph "🎯 Destinations"
G[❄️ Snowflake]
H[🪣 S3 Data Lake]
I[📈 Analytics Dashboard]
end
A --> D
B --> D
C --> D
D --> E
E --> F
F --> G
F --> H
G --> I
H --> I
%% Data Sources styling
style A fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px,color:#1b5e20
style B fill:#e0f2f1,stroke:#00695c,stroke-width:2px,color:#004d40
style C fill:#e0f7fa,stroke:#0277bd,stroke-width:2px,color:#01579b
%% Processing styling
style D fill:#fff3e0,stroke:#f57c00,stroke-width:3px,color:#e65100
style E fill:#f3e5f5,stroke:#7b1fa2,stroke-width:3px,color:#4a148c
style F fill:#ffebee,stroke:#d32f2f,stroke-width:3px,color:#b71c1c
%% Destinations styling
style G fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#0d47a1
style H fill:#e8eaf6,stroke:#303f9f,stroke-width:2px,color:#1a237e
style I fill:#fce4ec,stroke:#c2185b,stroke-width:2px,color:#880e4f
Create a new project:
mkdir ecommerce-etl
cd ecommerce-etlCreate the Towerfile:
[app]
name = "ecommerce-etl"
script = "./pipeline.py"
source = [
"./**/*.py",
"./requirements.txt",
"./schemas/"
]
[[parameters]]
name = "api_base_url"
description = "Base URL for the e-commerce API"
default = "https://api.example-shop.com"
[[parameters]]
name = "date_range"
description = "Date range to process (YYYY-MM-DD:YYYY-MM-DD)"
default = "2024-01-01:2024-01-31"
[[parameters]]
name = "destination_table"
description = "Target table in Snowflake"
default = "sales_data"Create requirements.txt:
dlt[snowflake]>=0.4.0
requests>=2.31.0
pandas>=1.5.0Now, create the main pipeline file pipeline.py:
import dlt
import requests
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Any, Iterator
import os
# Define the data schema for dlt
@dlt.resource(name="sales_data", write_disposition="append")
def extract_sales_data(api_base_url: str, start_date: str, end_date: str) -> Iterator[Dict[str, Any]]:
"""
Extract sales data from the e-commerce API.
This function demonstrates how to create a dlt resource that
extracts data incrementally and handles pagination.
"""
current_date = datetime.strptime(start_date, "%Y-%m-%d")
end_date_obj = datetime.strptime(end_date, "%Y-%m-%d")
while current_date <= end_date_obj:
date_str = current_date.strftime("%Y-%m-%d")
print(f"📅 Processing sales data for {date_str}")
# Simulate API call (replace with actual API logic)
try:
url = f"{api_base_url}/sales"
params = {
"date": date_str,
"limit": 1000,
"offset": 0
}
# In a real scenario, you'd make actual API calls
# For this example, we'll generate sample data
sample_data = generate_sample_sales_data(date_str)
for record in sample_data:
yield record
except Exception as e:
print(f"❌ Error extracting data for {date_str}: {e}")
current_date += timedelta(days=1)
def generate_sample_sales_data(date_str: str) -> List[Dict[str, Any]]:
"""Generate sample sales data for demonstration."""
import random
products = ["Laptop", "Phone", "Tablet", "Headphones", "Watch"]
customers = [f"customer_{i}" for i in range(1, 101)]
data = []
for i in range(random.randint(50, 200)): # Random number of sales per day
record = {
"sale_id": f"sale_{date_str}_{i}",
"date": date_str,
"customer_id": random.choice(customers),
"product": random.choice(products),
"quantity": random.randint(1, 5),
"unit_price": round(random.uniform(10.0, 1000.0), 2),
"total_amount": 0, # Will be calculated in transformation
"currency": "USD",
"sales_rep": f"rep_{random.randint(1, 10)}"
}
record["total_amount"] = round(record["quantity"] * record["unit_price"], 2)
data.append(record)
return data
@dlt.transformer(name="enriched_sales_data", write_disposition="append")
def transform_sales_data(sales_data) -> Iterator[Dict[str, Any]]:
"""
Transform and enrich sales data.
This transformer adds calculated fields and data quality checks.
"""
for record in sales_data:
# Add calculated fields
record["revenue_category"] = categorize_revenue(record["total_amount"])
record["processed_at"] = datetime.utcnow().isoformat()
# Data quality checks
if record["total_amount"] > 0 and record["quantity"] > 0:
record["data_quality_score"] = 1.0
else:
record["data_quality_score"] = 0.0
record["quality_issues"] = "Invalid amount or quantity"
yield record
def categorize_revenue(amount: float) -> str:
"""Categorize revenue into buckets."""
if amount < 50:
return "Low"
elif amount < 200:
return "Medium"
elif amount < 500:
return "High"
else:
return "Premium"
def create_pipeline() -> dlt.Pipeline:
"""Create and configure the dlt pipeline."""
# Create pipeline with Snowflake destination
pipeline = dlt.pipeline(
pipeline_name="ecommerce_etl",
destination="snowflake",
dataset_name="ecommerce_data"
)
return pipeline
def main():
"""Main pipeline execution function."""
# Get parameters from environment
api_base_url = os.environ.get('api_base_url', 'https://api.example-shop.com')
date_range = os.environ.get('date_range', '2024-01-01:2024-01-31')
destination_table = os.environ.get('destination_table', 'sales_data')
# Parse date range
start_date, end_date = date_range.split(':')
print(f"🚀 Starting ETL pipeline")
print(f" API: {api_base_url}")
print(f" Date Range: {start_date} to {end_date}")
print(f" Destination: {destination_table}")
print("-" * 50)
try:
# Create pipeline
pipeline = create_pipeline()
# Extract data
sales_resource = extract_sales_data(api_base_url, start_date, end_date)
# Transform data
transformed_data = transform_sales_data(sales_resource)
# Load data
print("💾 Loading data to Snowflake...")
load_info = pipeline.run(transformed_data, table_name=destination_table)
# Print results
print(f"✅ Pipeline completed successfully!")
print(f" Records processed: {load_info.loads.jobs.job_file_info.records_count}")
print(f" Load ID: {load_info.loads.load_id}")
# Print some statistics
print("\n📊 Pipeline Statistics:")
for job in load_info.loads.jobs:
print(f" Table: {job.job_file_info.table_name}")
print(f" Records: {job.job_file_info.records_count}")
print(f" Status: {job.status}")
except Exception as e:
print(f"❌ Pipeline failed: {e}")
raise
if __name__ == "__main__":
main()Before running the pipeline, you need to configure Snowflake credentials as secrets:
# Set Snowflake connection details
tower secrets create --name SNOWFLAKE_ACCOUNT --value "your-account.snowflakecomputing.com"
tower secrets create --name SNOWFLAKE_USER --value "your-username"
tower secrets create --name SNOWFLAKE_PASSWORD --value "your-password"
tower secrets create --name SNOWFLAKE_WAREHOUSE --value "your-warehouse"
tower secrets create --name SNOWFLAKE_DATABASE --value "your-database"
tower secrets create --name SNOWFLAKE_SCHEMA --value "your-schema"# Test locally (will use sample data)
python pipeline.py
# Deploy to Tower
tower deploy
# Run in production
tower run --param date_range="2024-06-01:2024-06-07"What makes this example powerful?
- Real ETL Logic: Handles extraction, transformation, and loading with proper error handling
- Incremental Processing: Processes data day by day, making it suitable for large datasets
- Data Quality: Includes data validation and quality scoring
- Scalable Architecture: dltHub handles the heavy lifting while Tower.dev manages the infrastructure
- Production Ready: Includes proper logging, error handling, and monitoring
Pro Tip: Tower.dev's integration with dltHub means your pipeline automatically gets schema evolution, data lineage tracking, and destination management. It's like having a data engineering team built into your runtime.
Our next example demonstrates Tower.dev's native integration with Apache Iceberg tables, showcasing the modern lakehouse architecture that's revolutionizing data storage .
The Scenario: You're building a real-time analytics system that needs to process stock market data, store it in an Iceberg table, and provide both batch and real-time query capabilities.
The lakehouse architecture combines the best of data lakes and data warehouses . With Apache Iceberg as the table format, you get:
- ACID Transactions: Ensuring data consistency
- Schema Evolution: Add/modify columns without breaking existing queries
- Time Travel: Query data as it existed at any point in time
- Efficient Querying: Optimized for both batch and interactive queries
graph TB
subgraph "📊 Data Sources"
A[📈 Stock Price API]
B[📰 News Feed]
C[📊 Trading Volume]
end
subgraph "🏗️ Tower.dev Processing"
D[⚡ Real-time Ingestion]
E[✅ Data Validation]
F[🔧 Enrichment]
end
subgraph "🏔️ Iceberg Lakehouse"
G[📋 Raw Data Table]
H[⚙️ Processed Data Table]
I[📊 Aggregated Views]
end
subgraph "📈 Analytics Layer"
J[🐻❄️ Polars Analysis]
K[🤖 LLM Insights]
L[📊 Dashboards]
end
A --> D
B --> D
C --> D
D --> E
E --> F
F --> G
G --> H
H --> I
I --> J
I --> K
J --> L
K --> L
%% Data Sources styling
style A fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px,color:#1b5e20
style B fill:#e0f2f1,stroke:#00695c,stroke-width:2px,color:#004d40
style C fill:#e0f7fa,stroke:#0277bd,stroke-width:2px,color:#01579b
%% Processing styling
style D fill:#fff3e0,stroke:#f57c00,stroke-width:3px,color:#e65100
style E fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px,color:#4a148c
style F fill:#ffebee,stroke:#d32f2f,stroke-width:2px,color:#b71c1c
%% Lakehouse styling
style G fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#0d47a1
style H fill:#e8eaf6,stroke:#303f9f,stroke-width:2px,color:#1a237e
style I fill:#ede7f6,stroke:#512da8,stroke-width:2px,color:#311b92
%% Analytics styling
style J fill:#fce4ec,stroke:#c2185b,stroke-width:2px,color:#880e4f
style K fill:#f9fbe7,stroke:#689f38,stroke-width:2px,color:#33691e
style L fill:#fff8e1,stroke:#ffa000,stroke-width:2px,color:#ff6f00
Create a new project:
mkdir stock-analytics
cd stock-analyticsCreate the Towerfile:
[app]
name = "stock-analytics"
script = "./stock_pipeline.py"
source = [
"./**/*.py",
"./requirements.txt"
]
[[parameters]]
name = "symbols"
description = "Comma-separated list of stock symbols"
default = "AAPL,GOOGL,MSFT,TSLA"
[[parameters]]
name = "analysis_date"
description = "Date to analyze (YYYY-MM-DD)"
default = "2024-06-14"
[[parameters]]
name = "enable_llm_analysis"
description = "Enable LLM-powered analysis"
default = "true"Create requirements.txt:
tower[iceberg,ai]>=0.3.0
polars>=0.20.0
pyarrow>=10.0.0
requests>=2.31.0
pandas>=1.5.0Create the main pipeline file stock_pipeline.py:
import polars as pl
import pandas as pd
from datetime import datetime, timedelta
import requests
import os
import json
from typing import List, Dict, Any
import tower
# Note: This example uses Tower.dev's correct API patterns:
# - tower.tables() for Iceberg table operations
# - External LLM inference classes for AI functionality
def fetch_stock_data(symbols: List[str], date: str) -> List[Dict[str, Any]]:
"""
Fetch stock data for given symbols and date.
In a real implementation, this would connect to a stock API like Alpha Vantage,
IEX Cloud, or Yahoo Finance. For this example, we'll generate realistic sample data.
"""
print(f"📈 Fetching stock data for {len(symbols)} symbols on {date}")
stock_data = []
for symbol in symbols:
# Generate realistic sample data
# In production, replace this with actual API calls
base_price = {
"AAPL": 150.0,
"GOOGL": 2800.0,
"MSFT": 300.0,
"TSLA": 200.0
}.get(symbol, 100.0)
# Simulate intraday price movements
import random
for hour in range(9, 17): # Market hours 9 AM to 4 PM
for minute in [0, 15, 30, 45]: # 15-minute intervals
timestamp = f"{date} {hour:02d}:{minute:02d}:00"
# Add some realistic price movement
price_change = random.uniform(-0.02, 0.02) # ±2% change
current_price = base_price * (1 + price_change)
volume = random.randint(100000, 1000000)
record = {
"symbol": symbol,
"timestamp": timestamp,
"open": round(current_price * 0.999, 2),
"high": round(current_price * 1.001, 2),
"low": round(current_price * 0.998, 2),
"close": round(current_price, 2),
"volume": volume,
"date": date
}
stock_data.append(record)
print(f"✅ Generated {len(stock_data)} stock price records")
return stock_data
def write_to_iceberg_table(data: List[Dict[str, Any]], table_name: str) -> None:
"""
Write data to an Apache Iceberg table using Tower.dev's native support.
"""
print(f"💾 Writing {len(data)} records to Iceberg table: {table_name}")
# Convert to Polars DataFrame for efficient processing
df = pl.DataFrame(data)
# Add processing metadata
df = df.with_columns([
pl.lit(datetime.utcnow().isoformat()).alias("ingested_at"),
pl.lit("tower-stock-pipeline").alias("source_system")
])
# Use Tower.dev's Iceberg integration
try:
# Create table schema
schema = df.to_arrow().schema
# Create or get table reference
table = tower.tables(table_name).create_if_not_exists(schema)
# Insert data
table.insert(df.to_arrow())
print(f"✅ Successfully wrote data to {table_name}")
except Exception as e:
print(f"❌ Error writing to Iceberg table: {e}")
raise
def analyze_stock_performance(table_name: str, symbols: List[str], date: str) -> Dict[str, Any]:
"""
Analyze stock performance using Polars for fast computation.
"""
print(f"📊 Analyzing stock performance for {date}")
# Load data from Iceberg table
df = tower.load_table(table_name)
# Filter for the specific date and symbols
df = df.filter(
(pl.col("date") == date) &
(pl.col("symbol").is_in(symbols))
)
# Calculate daily statistics
daily_stats = df.group_by("symbol").agg([
pl.col("open").first().alias("day_open"),
pl.col("close").last().alias("day_close"),
pl.col("high").max().alias("day_high"),
pl.col("low").min().alias("day_low"),
pl.col("volume").sum().alias("total_volume"),
pl.col("close").count().alias("data_points")
])
# Calculate additional metrics
daily_stats = daily_stats.with_columns([
((pl.col("day_close") - pl.col("day_open")) / pl.col("day_open") * 100).alias("daily_return_pct"),
((pl.col("day_high") - pl.col("day_low")) / pl.col("day_open") * 100).alias("volatility_pct")
])
# Calculate moving averages (using last 20 data points as proxy for 20-day MA)
ma_df = df.group_by("symbol").agg([
pl.col("close").tail(20).mean().alias("ma_20"),
pl.col("close").tail(5).mean().alias("ma_5")
])
# Join the dataframes
analysis_df = daily_stats.join(ma_df, on="symbol")
# Add trend analysis
analysis_df = analysis_df.with_columns([
pl.when(pl.col("day_close") > pl.col("ma_20"))
.then(pl.lit("Bullish"))
.when(pl.col("day_close") < pl.col("ma_20"))
.then(pl.lit("Bearish"))
.otherwise(pl.lit("Neutral"))
.alias("trend")
])
# Convert to dict for easy handling
analysis_results = analysis_df.to_dicts()
print("📈 Analysis Summary:")
for result in analysis_results:
symbol = result["symbol"]
daily_return = result["daily_return_pct"]
trend = result["trend"]
print(f" {symbol}: {daily_return:+.2f}% ({trend})")
return {
"analysis_date": date,
"symbols_analyzed": symbols,
"results": analysis_results,
"summary": {
"best_performer": max(analysis_results, key=lambda x: x["daily_return_pct"])["symbol"],
"worst_performer": min(analysis_results, key=lambda x: x["daily_return_pct"])["symbol"],
"total_volume": sum(r["total_volume"] for r in analysis_results)
}
}
def generate_llm_insights(analysis_results: Dict[str, Any]) -> str:
"""
Generate investment insights using Tower.dev's LLM integration.
"""
print("🤖 Generating AI-powered investment insights...")
# Prepare context for the LLM
context = f"""
Stock Analysis Summary for {analysis_results['analysis_date']}:
Performance Results:
"""
for result in analysis_results['results']:
context += f"""
{result['symbol']}:
- Daily Return: {result['daily_return_pct']:.2f}%
- Volatility: {result['volatility_pct']:.2f}%
- Trend: {result['trend']}
- Volume: {result['total_volume']:,}
"""
context += f"""
Market Summary:
- Best Performer: {analysis_results['summary']['best_performer']}
- Worst Performer: {analysis_results['summary']['worst_performer']}
- Total Volume: {analysis_results['summary']['total_volume']:,}
"""
# Use external LLM service for inference
# Note: Tower.dev supports LLM inference through external providers
try:
# Example using custom inference class (as shown in Tower.dev examples)
from core.inference.huggingface import InferWithHuggingfaceHub
messages = [
{"role": "system", "content": "You are a financial analyst providing investment insights."},
{"role": "user", "content": f"""
Based on this stock analysis data, provide a brief investment insight summary:
{context}
Please provide:
1. Key observations about market performance
2. Notable trends or patterns
3. Brief risk assessment
4. General market sentiment
Keep the response concise and actionable for investors.
"""}
]
# Get API key from environment (set as Tower.dev secret)
HF_TOKEN = os.environ.get("HF_TOKEN")
# Use Hugging Face Hub for inference
infer_with_hf_hub = InferWithHuggingfaceHub(
provider="together",
api_key=HF_TOKEN
)
llm_response = infer_with_hf_hub(
inputs=messages,
model="deepseek-ai/DeepSeek-R1",
max_tokens=500
)
print("✅ Generated LLM insights")
return llm_response
except Exception as e:
print(f"⚠️ LLM analysis unavailable: {e}")
return "LLM analysis temporarily unavailable. Please refer to numerical analysis above."
def main():
"""Main pipeline execution."""
# Get parameters
symbols_str = os.environ.get('symbols', 'AAPL,GOOGL,MSFT,TSLA')
symbols = [s.strip() for s in symbols_str.split(',')]
analysis_date = os.environ.get('analysis_date', '2024-06-14')
enable_llm = os.environ.get('enable_llm_analysis', 'true').lower() == 'true'
print(f"🚀 Starting Stock Analytics Pipeline")
print(f" Symbols: {symbols}")
print(f" Date: {analysis_date}")
print(f" LLM Analysis: {'Enabled' if enable_llm else 'Disabled'}")
print("-" * 60)
try:
# Step 1: Fetch stock data
stock_data = fetch_stock_data(symbols, analysis_date)
# Step 2: Write to Iceberg table
table_name = "stock_prices_raw"
write_to_iceberg_table(stock_data, table_name)
# Step 3: Analyze performance
analysis_results = analyze_stock_performance(table_name, symbols, analysis_date)
# Step 4: Generate LLM insights (if enabled)
if enable_llm:
llm_insights = generate_llm_insights(analysis_results)
print("\n🤖 AI Investment Insights:")
print(llm_insights)
# Step 5: Save analysis results to another Iceberg table
analysis_table = "stock_analysis_results"
analysis_data = [{
"analysis_date": analysis_date,
"analysis_timestamp": datetime.utcnow().isoformat(),
"symbols": json.dumps(symbols),
"results": json.dumps(analysis_results),
"llm_insights": llm_insights if enable_llm else None
}]
write_to_iceberg_table(analysis_data, analysis_table)
print(f"\n✅ Pipeline completed successfully!")
print(f" Raw data table: {table_name}")
print(f" Analysis table: {analysis_table}")
print(f" Records processed: {len(stock_data)}")
except Exception as e:
print(f"❌ Pipeline failed: {e}")
raise
if __name__ == "__main__":
main()# Deploy to Tower
tower deploy
# Run analysis for today
tower run
# Run analysis for specific date and symbols
tower run --param symbols="AAPL,TSLA" --param analysis_date="2024-06-01"
# Run without LLM analysis for faster execution
tower run --param enable_llm_analysis="false"Key Features Demonstrated:
- Iceberg Integration: Native support for Apache Iceberg tables with ACID transactions
- High-Performance Analytics: Using Polars for fast data processing
- LLM Integration: AI-powered insights using Tower.dev's built-in LLM support
- Flexible Architecture: Easy to extend with additional data sources or analysis methods
- Production Ready: Proper error handling, logging, and data validation
Pro Tip: Tower.dev's Iceberg integration automatically handles schema evolution and partition management. You can query your data using SQL engines like Spark, Trino, or Dremio while still benefiting from Tower.dev's managed compute layer.
Our final comprehensive example demonstrates Tower.dev's AI capabilities, showing how to build a pipeline that processes documents using both local and serverless LLM models .
The Scenario: You're building a document processing system that summarizes GitHub issues, categorizes them by priority, and suggests action items using AI models.
mkdir github-issue-processor
cd github-issue-processorCreate the Towerfile:
[app]
name = "github-issue-processor"
script = "./issue_processor.py"
source = [
"./**/*.py",
"./requirements.txt"
]
[[parameters]]
name = "github_repo"
description = "GitHub repository (owner/repo)"
default = "microsoft/vscode"
[[parameters]]
name = "issue_count"
description = "Number of recent issues to process"
default = "10"
[[parameters]]
name = "model_mode"
description = "Model execution mode: local or serverless"
default = "local"Create the main pipeline:
import tower
import requests
import json
from datetime import datetime
from typing import List, Dict, Any
import os
def fetch_github_issues(repo: str, count: int) -> List[Dict[str, Any]]:
"""Fetch recent GitHub issues."""
print(f"📥 Fetching {count} recent issues from {repo}")
url = f"https://api.github.com/repos/{repo}/issues"
params = {
"state": "open",
"sort": "created",
"direction": "desc",
"per_page": count
}
response = requests.get(url, params=params)
issues = response.json()
processed_issues = []
for issue in issues:
processed_issue = {
"id": issue["id"],
"number": issue["number"],
"title": issue["title"],
"body": issue["body"] or "",
"state": issue["state"],
"created_at": issue["created_at"],
"updated_at": issue["updated_at"],
"labels": [label["name"] for label in issue["labels"]],
"user": issue["user"]["login"]
}
processed_issues.append(processed_issue)
print(f"✅ Fetched {len(processed_issues)} issues")
return processed_issues
def process_with_ai(issues: List[Dict[str, Any]], model_mode: str) -> List[Dict[str, Any]]:
"""Process issues with AI analysis."""
print(f"🤖 Processing issues with AI ({model_mode} mode)")
processed_issues = []
for issue in issues:
print(f" Processing issue #{issue['number']}: {issue['title'][:50]}...")
# Create context for the AI model
context = f"""
Title: {issue['title']}
Body: {issue['body'][:1000]}...
Labels: {', '.join(issue['labels'])}
"""
# Use different model configurations based on mode
if model_mode == "local":
# Use local inference with Ollama
from core.inference.ollama import InferWithOllamaChat
messages = [
{"role": "system", "content": "You are a technical project manager analyzing GitHub issues."},
{"role": "user", "content": f"""
Analyze this GitHub issue and provide:
1. Priority level (High/Medium/Low)
2. Category (Bug/Feature/Documentation/Other)
3. Estimated effort (Small/Medium/Large)
4. Brief summary of the issue
5. Suggested action items
Issue details:
{context}
Please respond in JSON format.
"""}
]
infer_with_ollama = InferWithOllamaChat()
analysis = infer_with_ollama(model="deepseek-r1:14b", messages=messages)
else:
# Use serverless inference via Hugging Face Hub
from core.inference.huggingface import InferWithHuggingfaceHub
messages = [
{"role": "system", "content": "You are a technical project manager analyzing GitHub issues."},
{"role": "user", "content": f"""
Analyze this GitHub issue and provide:
1. A brief summary (2-3 sentences)
2. Priority level (High/Medium/Low)
3. Category (Bug/Feature/Documentation/etc.)
4. Suggested action items
Issue: {context}
Format your response as JSON.
"""}
]
HF_TOKEN = os.environ.get("HF_TOKEN")
infer_with_hf_hub = InferWithHuggingfaceHub(
provider="together",
api_key=HF_TOKEN
)
analysis = infer_with_hf_hub(
inputs=messages,
model="deepseek-ai/DeepSeek-R1",
max_tokens=300
)
# Parse AI response
try:
ai_analysis = json.loads(analysis)
except:
ai_analysis = {
"summary": "AI analysis parsing failed",
"priority": "Medium",
"category": "Unknown",
"action_items": ["Manual review required"]
}
# Add AI analysis to issue
enhanced_issue = {
**issue,
"ai_analysis": ai_analysis,
"processed_at": datetime.utcnow().isoformat(),
"model_mode": model_mode
}
processed_issues.append(enhanced_issue)
print(f"✅ Processed {len(processed_issues)} issues with AI")
return processed_issues
def main():
"""Main processing pipeline."""
# Get parameters
github_repo = os.environ.get('github_repo', 'microsoft/vscode')
issue_count = int(os.environ.get('issue_count', '10'))
model_mode = os.environ.get('model_mode', 'local')
print(f"🚀 GitHub Issue Processor")
print(f" Repository: {github_repo}")
print(f" Issue Count: {issue_count}")
print(f" Model Mode: {model_mode}")
print("-" * 50)
try:
# Step 1: Fetch issues
issues = fetch_github_issues(github_repo, issue_count)
# Step 2: Process with AI
processed_issues = process_with_ai(issues, model_mode)
# Step 3: Display results
print("\n📊 Processing Results:")
priority_counts = {}
category_counts = {}
for issue in processed_issues:
analysis = issue["ai_analysis"]
priority = analysis.get("priority", "Unknown")
category = analysis.get("category", "Unknown")
priority_counts[priority] = priority_counts.get(priority, 0) + 1
category_counts[category] = category_counts.get(category, 0) + 1
print(f"\n#{issue['number']}: {issue['title']}")
print(f" Priority: {priority}")
print(f" Category: {category}")
print(f" Summary: {analysis.get('summary', 'No summary')}")
print(f"\n📈 Summary Statistics:")
print(f" Priority Distribution: {priority_counts}")
print(f" Category Distribution: {category_counts}")
# Step 4: Save to Iceberg table
import pyarrow as pa
table_name = "github_issues_processed"
# Convert to Arrow format
df = pd.DataFrame(processed_issues)
arrow_table = pa.Table.from_pandas(df)
# Create table with proper schema
table = tower.tables(table_name).create_if_not_exists(arrow_table.schema)
table.insert(arrow_table)
)
print(f"\n✅ Results saved to table: {table_name}")
except Exception as e:
print(f"❌ Processing failed: {e}")
raise
if __name__ == "__main__":
main()This example demonstrates Tower.dev's flexibility in handling both local and serverless AI models, making it perfect for organizations that need to balance cost, performance, and data privacy requirements.
Pause and Reflect: We've now built three complete data pipelines: ETL with dltHub, Analytics with Iceberg, and AI Processing with LLMs. Notice how Tower.dev's consistent environment and deployment model makes each pipeline follow the same pattern, reducing cognitive load and increasing productivity.
As your data pipelines grow in complexity, you'll need to orchestrate multiple processes with dependencies. Tower.dev handles this elegantly through its app composition model.
The Scenario: You're building a complete data platform that processes customer data, generates analytics, and updates ML models – all with proper dependency management.
graph TD
A[📊 Customer Data Ingestion] --> B[✅ Data Quality Validation]
B --> C[👥 Customer Segmentation]
B --> D[📈 Sales Analytics]
C --> E[🤖 ML Model Training]
D --> E
E --> F[🚀 Model Deployment]
F --> G[🔮 Prediction Pipeline]
H[⏰ Daily Scheduler] --> A
I[🕐 Hourly Trigger] --> G
%% Styling with pastel colors and good contrast
style A fill:#e8f5e8,stroke:#2e7d32,stroke-width:3px,color:#1b5e20
style B fill:#fff3e0,stroke:#f57c00,stroke-width:2px,color:#e65100
style C fill:#e0f2f1,stroke:#00695c,stroke-width:2px,color:#004d40
style D fill:#e0f7fa,stroke:#0277bd,stroke-width:2px,color:#01579b
style E fill:#f3e5f5,stroke:#7b1fa2,stroke-width:3px,color:#4a148c
style F fill:#ffebee,stroke:#d32f2f,stroke-width:2px,color:#b71c1c
style G fill:#e3f2fd,stroke:#1976d2,stroke-width:3px,color:#0d47a1
style H fill:#fce4ec,stroke:#c2185b,stroke-width:2px,color:#880e4f
style I fill:#f9fbe7,stroke:#689f38,stroke-width:2px,color:#33691e
Create a master orchestration app:
import tower
import asyncio
from datetime import datetime
import json
class DataPlatformOrchestrator:
"""
Master orchestrator for complex data workflows.
This demonstrates Tower.dev's ability to coordinate multiple
applications with proper dependency management and error handling.
"""
def __init__(self):
self.execution_log = []
self.failed_jobs = []
async def run_app_with_dependencies(self, app_name: str, dependencies: List[str] = None,
params: Dict[str, Any] = None) -> Dict[str, Any]:
"""Run a Tower app with dependency checking."""
# Check if dependencies completed successfully
if dependencies:
for dep in dependencies:
if not self.check_dependency_success(dep):
raise Exception(f"Dependency {dep} failed or not completed")
print(f"🚀 Running {app_name}...")
try:
# Run the Tower app
result = await tower.apps.run_async(
app_name=app_name,
parameters=params or {}
)
execution_record = {
"app_name": app_name,
"status": "success",
"started_at": datetime.utcnow().isoformat(),
"completed_at": datetime.utcnow().isoformat(),
"result": result
}
self.execution_log.append(execution_record)
print(f"✅ {app_name} completed successfully")
return execution_record
except Exception as e:
execution_record = {
"app_name": app_name,
"status": "failed",
"started_at": datetime.utcnow().isoformat(),
"error": str(e)
}
self.execution_log.append(execution_record)
self.failed_jobs.append(app_name)
print(f"❌ {app_name} failed: {e}")
raise
def check_dependency_success(self, app_name: str) -> bool:
"""Check if a dependency completed successfully."""
for record in self.execution_log:
if record["app_name"] == app_name and record["status"] == "success":
return True
return False
async def run_data_platform_workflow(self, date: str):
"""Execute the complete data platform workflow."""
print(f"🏭 Starting Data Platform Workflow for {date}")
print("-" * 60)
try:
# Phase 1: Data Ingestion
await self.run_app_with_dependencies(
app_name="customer-data-ingestion",
params={"processing_date": date}
)
# Phase 2: Data Quality (depends on ingestion)
await self.run_app_with_dependencies(
app_name="data-quality-validation",
dependencies=["customer-data-ingestion"],
params={"processing_date": date}
)
# Phase 3: Parallel Analytics (both depend on quality validation)
segmentation_task = self.run_app_with_dependencies(
app_name="customer-segmentation",
dependencies=["data-quality-validation"],
params={"processing_date": date}
)
analytics_task = self.run_app_with_dependencies(
app_name="sales-analytics",
dependencies=["data-quality-validation"],
params={"processing_date": date}
)
# Run analytics tasks in parallel
await asyncio.gather(segmentation_task, analytics_task)
# Phase 4: ML Model Training (depends on both analytics tasks)
await self.run_app_with_dependencies(
app_name="ml-model-training",
dependencies=["customer-segmentation", "sales-analytics"],
params={"processing_date": date}
)
# Phase 5: Model Deployment
await self.run_app_with_dependencies(
app_name="model-deployment",
dependencies=["ml-model-training"]
)
print(f"\n🎉 Data Platform Workflow completed successfully!")
self.print_execution_summary()
except Exception as e:
print(f"\n💥 Workflow failed: {e}")
self.print_execution_summary()
self.handle_failure_recovery()
raise
def print_execution_summary(self):
"""Print a summary of the execution."""
print(f"\n📊 Execution Summary:")
print(f" Total Apps: {len(self.execution_log)}")
print(f" Successful: {len([r for r in self.execution_log if r['status'] == 'success'])}")
print(f" Failed: {len(self.failed_jobs)}")
if self.failed_jobs:
print(f" Failed Apps: {', '.join(self.failed_jobs)}")
def handle_failure_recovery(self):
"""Handle failure recovery options."""
print(f"\n🔧 Failure Recovery Options:")
print(f" 1. Retry failed jobs: {', '.join(self.failed_jobs)}")
print(f" 2. Skip failed jobs and continue downstream")
print(f" 3. Rollback to previous successful state")
# Usage example
async def main():
orchestrator = DataPlatformOrchestrator()
await orchestrator.run_data_platform_workflow("2024-06-14")
if __name__ == "__main__":
asyncio.run(main())One of Tower.dev's most powerful features is its ability to facilitate team collaboration through shared environments and secrets . Here's how to set up a collaborative data engineering environment:
# Create different environments for different stages
tower environments create --name="development"
tower environments create --name="staging"
tower environments create --name="production"
# Set environment-specific secrets
tower secrets create --name DATABASE_URL --value "dev-database-url" --environment="development"
tower secrets create --name DATABASE_URL --value "staging-database-url" --environment="staging"
tower secrets create --name DATABASE_URL --value "prod-database-url" --environment="production"
# Share environments with team members
tower environments share --name="development" --user="teammate@company.com"
tower environments share --name="staging" --user="teammate@company.com"
# Production access typically restricted to senior team membersCreate applications that automatically adapt to their environment:
import os
import tower
def get_environment_config():
"""Get configuration based on current environment."""
env_name = os.environ.get('TOWER_ENVIRONMENT', 'development')
configs = {
'development': {
'database_url': os.environ.get('DATABASE_URL'),
'batch_size': 1000,
'enable_debug': True,
'sample_data': True
},
'staging': {
'database_url': os.environ.get('DATABASE_URL'),
'batch_size': 5000,
'enable_debug': True,
'sample_data': False
},
'production': {
'database_url': os.environ.get('DATABASE_URL'),
'batch_size': 10000,
'enable_debug': False,
'sample_data': False
}
}
config = configs.get(env_name, configs['development'])
config['environment'] = env_name
return config
def main():
config = get_environment_config()
print(f"🌍 Running in {config['environment']} environment")
print(f" Batch Size: {config['batch_size']}")
print(f" Debug Mode: {config['enable_debug']}")
print(f" Sample Data: {config['sample_data']}")
# Your application logic here, using environment-specific config
if __name__ == "__main__":
main()import tower
from datetime import datetime
import json
class ProductionMonitor:
"""
Production monitoring and alerting for Tower.dev applications.
"""
def __init__(self, app_name: str):
self.app_name = app_name
self.metrics = []
def log_metric(self, metric_name: str, value: float, tags: Dict[str, str] = None):
"""Log a custom metric."""
metric = {
"app_name": self.app_name,
"metric_name": metric_name,
"value": value,
"tags": tags or {},
"timestamp": datetime.utcnow().isoformat()
}
self.metrics.append(metric)
# In production, you'd send this to your monitoring system
print(f"📊 METRIC: {metric_name} = {value}")
def check_data_quality(self, df, table_name: str):
"""Monitor data quality metrics."""
row_count = len(df)
null_count = df.isnull().sum().sum()
null_percentage = (null_count / (row_count * len(df.columns))) * 100
self.log_metric("row_count", row_count, {"table": table_name})
self.log_metric("null_percentage", null_percentage, {"table": table_name})
# Alert on data quality issues
if null_percentage > 10:
self.send_alert(
"HIGH_NULL_PERCENTAGE",
f"Table {table_name} has {null_percentage:.2f}% null values"
)
if row_count == 0:
self.send_alert(
"EMPTY_TABLE",
f"Table {table_name} is empty"
)
def send_alert(self, alert_type: str, message: str):
"""Send an alert (implement with your alerting system)."""
alert = {
"app_name": self.app_name,
"alert_type": alert_type,
"message": message,
"timestamp": datetime.utcnow().isoformat()
}
print(f"🚨 ALERT: {alert_type} - {message}")
# In production, integrate with:
# - Slack notifications
# - PagerDuty
# - Email alerts
# - Custom webhook
# Usage in your applications
def main():
monitor = ProductionMonitor("my-etl-pipeline")
# Your data processing logic
df = load_data()
# Monitor data quality
monitor.check_data_quality(df, "customer_data")
# Log business metrics
monitor.log_metric("customers_processed", len(df))
monitor.log_metric("processing_time_seconds", 45.2)Pro Tip: Tower.dev automatically provides execution logs and basic metrics through the CLI and web interface. Use
tower apps show <app-name>to see recent execution history and performance metrics.
One of Tower.dev's most powerful features is environment consistency. Here's how to set up your local development to perfectly mirror production:
# Create a local environment that mirrors production
export TOWER_ENVIRONMENT=local
export DATABASE_URL="your-local-db-url"
export API_KEY="your-dev-api-key"
# Run your app locally with the same parameters as production
python your_app.py
# This ensures zero surprises when deploying to Tower.devUse parameter files for complex configurations:
# In your Towerfile
[app]
name = "advanced-pipeline"
script = "./pipeline.py"
parameter_files = ["./configs/base.json", "./configs/prod.json"]
[[parameters]]
name = "config_override"
description = "JSON string to override specific config values"
default = "{}"# In your Python code
import json
import os
def load_config():
"""Advanced configuration loading with overrides."""
# Load base configuration
with open("./configs/base.json") as f:
config = json.load(f)
# Apply environment-specific overrides
env_config_path = f"./configs/{os.environ.get('TOWER_ENVIRONMENT', 'dev')}.json"
if os.path.exists(env_config_path):
with open(env_config_path) as f:
env_config = json.load(f)
config.update(env_config)
# Apply runtime parameter overrides
override_str = os.environ.get('config_override', '{}')
if override_str:
overrides = json.loads(override_str)
config.update(overrides)
return configPattern 1: Incremental Processing with State Management
import tower
import json
from datetime import datetime, timedelta
class IncrementalProcessor:
"""Handle incremental data processing with state management."""
def __init__(self, state_table: str = "pipeline_state"):
self.state_table = state_table
def get_last_processed_timestamp(self, pipeline_name: str) -> str:
"""Get the last processed timestamp for a pipeline."""
try:
# Load state from Iceberg table
state_df = tower.load_table(self.state_table)
# Filter for this pipeline
pipeline_state = state_df.filter(
state_df["pipeline_name"] == pipeline_name
).sort("updated_at", reverse=True).limit(1)
if len(pipeline_state) > 0:
return pipeline_state["last_processed_timestamp"]
else:
# Default to 30 days ago for new pipelines
default_date = datetime.utcnow() - timedelta(days=30)
return default_date.isoformat()
except Exception as e:
print(f"⚠️ Could not load state: {e}")
# Return safe default
default_date = datetime.utcnow() - timedelta(days=1)
return default_date.isoformat()
def update_processed_timestamp(self, pipeline_name: str, timestamp: str):
"""Update the last processed timestamp."""
state_record = {
"pipeline_name": pipeline_name,
"last_processed_timestamp": timestamp,
"updated_at": datetime.utcnow().isoformat()
}
# Save state to Iceberg table
import pyarrow as pa
# Convert to Arrow format
df = pd.DataFrame([state_record])
arrow_table = pa.Table.from_pandas(df)
# Create or get table reference
table = tower.tables(self.state_table).create_if_not_exists(arrow_table.schema)
table.insert(arrow_table)
print(f"💾 Updated state: {pipeline_name} -> {timestamp}")
# Usage example
def process_incremental_data():
processor = IncrementalProcessor()
# Get last processed timestamp
last_processed = processor.get_last_processed_timestamp("sales_pipeline")
# Process data since last timestamp
new_data = extract_data_since(last_processed)
if new_data:
process_data(new_data)
# Update state with latest timestamp
latest_timestamp = max(record["timestamp"] for record in new_data)
processor.update_processed_timestamp("sales_pipeline", latest_timestamp)
else:
print("📭 No new data to process")Pattern 2: Dynamic Schema Handling
import polars as pl
from typing import Dict, Any, List
class SchemaEvolutionHandler:
"""Handle schema evolution in data pipelines."""
def __init__(self, table_name: str):
self.table_name = table_name
self.expected_schema = self.get_expected_schema()
def get_expected_schema(self) -> Dict[str, str]:
"""Get the expected schema for the table."""
try:
# Get schema from existing Iceberg table
existing_df = tower.load_table(self.table_name, limit=0) # Just schema
return {col: str(dtype) for col, dtype in zip(existing_df.columns, existing_df.dtypes)}
except:
# Return default schema for new table
return {
"id": "String",
"timestamp": "Datetime",
"data": "String",
"created_at": "Datetime"
}
def adapt_dataframe_schema(self, df: pl.DataFrame) -> pl.DataFrame:
"""Adapt incoming DataFrame to match expected schema."""
print(f"🔄 Adapting schema for {len(df)} records")
# Add missing columns with null values
for col, dtype in self.expected_schema.items():
if col not in df.columns:
print(f" Adding missing column: {col} ({dtype})")
df = df.with_columns(pl.lit(None).alias(col))
# Remove unexpected columns (optional - or you could keep them)
expected_columns = list(self.expected_schema.keys())
df = df.select(expected_columns)
# Handle type conversions
for col, expected_type in self.expected_schema.items():
if col in df.columns:
current_type = str(df[col].dtype)
if current_type != expected_type:
print(f" Converting {col}: {current_type} -> {expected_type}")
df = self.convert_column_type(df, col, expected_type)
return df
def convert_column_type(self, df: pl.DataFrame, col: str, target_type: str) -> pl.DataFrame:
"""Convert column to target type with error handling."""
try:
if target_type == "Datetime":
df = df.with_columns(pl.col(col).str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S"))
elif target_type == "Float64":
df = df.with_columns(pl.col(col).cast(pl.Float64))
elif target_type == "Int64":
df = df.with_columns(pl.col(col).cast(pl.Int64))
# Add more conversions as needed
except Exception as e:
print(f"⚠️ Could not convert {col} to {target_type}: {e}")
# Keep original type or set to null
return df
# Usage
def process_data_with_schema_evolution(raw_data: List[Dict[str, Any]]):
schema_handler = SchemaEvolutionHandler("my_table")
# Convert to DataFrame
df = pl.DataFrame(raw_data)
# Adapt schema
df = schema_handler.adapt_dataframe_schema(df)
# Save to Iceberg table
import pyarrow as pa
# Convert to Arrow format
arrow_table = df.to_arrow()
# Create or get table reference
table = tower.tables("my_table").create_if_not_exists(arrow_table.schema)
table.insert(arrow_table)import time
import random
from functools import wraps
from typing import Callable, Any
def retry_with_backoff(max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 60.0):
"""Decorator for retrying functions with exponential backoff."""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
last_exception = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt == max_retries:
print(f"❌ Function {func.__name__} failed after {max_retries + 1} attempts")
raise e
# Calculate delay with exponential backoff and jitter
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay * 0.1) # Add 10% jitter
total_delay = delay + jitter
print(f"⚠️ Attempt {attempt + 1} failed: {e}")
print(f" Retrying in {total_delay:.2f} seconds...")
time.sleep(total_delay)
raise last_exception
return wrapper
return decorator
# Usage examples
@retry_with_backoff(max_retries=3, base_delay=2.0)
def call_external_api(url: str) -> Dict[str, Any]:
"""API call with automatic retry logic."""
response = requests.get(url, timeout=30)
response.raise_for_status()
return response.json()
@retry_with_backoff(max_retries=5, base_delay=1.0)
def write_to_database(data: List[Dict[str, Any]]):
"""Database write with retry logic."""
# Your database write logic here
pass❌ Wrong Way:
# Hardcoded values
DATABASE_URL = "postgresql://user:pass@localhost/db"
API_KEY = "sk-1234567890"✅ Right Way:
import os
# Use environment variables with defaults
DATABASE_URL = os.environ.get('DATABASE_URL', 'postgresql://localhost/dev_db')
API_KEY = os.environ.get('API_KEY')
if not API_KEY:
raise ValueError("API_KEY environment variable is required")❌ Wrong Way:
# Just process data without validation
df = pd.read_csv(input_file)
df.to_sql("output_table", connection)✅ Right Way:
def validate_data_quality(df: pd.DataFrame, table_name: str) -> bool:
"""Validate data quality before processing."""
issues = []
# Check for empty DataFrame
if len(df) == 0:
issues.append("DataFrame is empty")
# Check for required columns
required_columns = ["id", "timestamp", "value"]
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
issues.append(f"Missing required columns: {missing_columns}")
# Check for excessive null values
null_percentages = df.isnull().sum() / len(df) * 100
high_null_columns = null_percentages[null_percentages > 50].index.tolist()
if high_null_columns:
issues.append(f"High null percentage in columns: {high_null_columns}")
if issues:
print(f"❌ Data quality issues in {table_name}:")
for issue in issues:
print(f" - {issue}")
return False
print(f"✅ Data quality check passed for {table_name}")
return True
# Usage
df = pd.read_csv(input_file)
if validate_data_quality(df, "customer_data"):
df.to_sql("output_table", connection)
else:
raise Exception("Data quality validation failed")❌ Wrong Way:
def process_large_dataset():
# Load entire dataset into memory
df = pd.read_csv("huge_file.csv") # This might crash
return df.groupby("category").sum()✅ Right Way:
def process_large_dataset():
"""Process large datasets efficiently with cleanup."""
try:
# Use chunked processing
chunk_size = 10000
results = []
for chunk in pd.read_csv("huge_file.csv", chunksize=chunk_size):
# Process chunk
chunk_result = chunk.groupby("category").sum()
results.append(chunk_result)
# Explicitly free memory
del chunk
# Combine results
final_result = pd.concat(results).groupby("category").sum()
# Clean up intermediate results
del results
return final_result
except Exception as e:
print(f"❌ Error processing dataset: {e}")
# Cleanup code here
raise❌ Wrong Way:
# Secrets in code or config files
config = {
"database_password": "my_secret_password",
"api_key": "sk-1234567890"
}✅ Right Way:
import os
def get_secure_config():
"""Get configuration with proper secret management."""
# Use Tower.dev secrets (set via tower secrets create)
config = {
"database_url": os.environ.get('DATABASE_URL'),
"api_key": os.environ.get('API_KEY'),
"redis_url": os.environ.get('REDIS_URL')
}
# Validate required secrets
required_secrets = ["DATABASE_URL", "API_KEY"]
missing_secrets = [key for key in required_secrets if not config.get(key.lower())]
if missing_secrets:
raise ValueError(f"Missing required secrets: {missing_secrets}")
return configQuick Quiz: Which of these is the most important production best practice?
- Using retry logic for external API calls
- Validating data quality before processing
- Proper secret management
- All of the above
Answer: 4. All of the above - production systems need comprehensive error handling, data validation, and security.
Tower.dev is designed to complement, not replace, your existing data infrastructure. Here's how to integrate it strategically:
graph TB
subgraph "🏢 Existing Infrastructure"
A[📊 Data Sources]
B[🏪 Data Warehouse]
C[📈 BI Tools]
D[🤖 ML Platform]
end
subgraph "🏗️ Tower.dev Layer"
E[🔄 ETL Apps]
F[✅ Data Quality Apps]
G[⚙️ Feature Engineering]
H[🔮 Model Inference]
end
A --> E
E --> B
B --> F
F --> G
G --> D
H --> C
%% Existing infrastructure styling
style A fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px,color:#1b5e20
style B fill:#e0f2f1,stroke:#00695c,stroke-width:2px,color:#004d40
style C fill:#e0f7fa,stroke:#0277bd,stroke-width:2px,color:#01579b
style D fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#0d47a1
%% Tower.dev layer styling - highlighted
style E fill:#fff3e0,stroke:#f57c00,stroke-width:3px,color:#e65100
style F fill:#f3e5f5,stroke:#7b1fa2,stroke-width:3px,color:#4a148c
style G fill:#ffebee,stroke:#d32f2f,stroke-width:3px,color:#b71c1c
style H fill:#fce4ec,stroke:#c2185b,stroke-width:3px,color:#880e4f
Example Integration Code:
import tower
import snowflake.connector
import pandas as pd
from typing import Dict, Any
class DataStackIntegrator:
"""
Integrate Tower.dev with existing data infrastructure.
This class demonstrates how Tower.dev can serve as a compute layer
while leveraging existing data storage and visualization tools.
"""
def __init__(self):
self.snowflake_conn = self.get_snowflake_connection()
self.monitoring_enabled = True
def get_snowflake_connection(self):
"""Connect to existing Snowflake warehouse."""
return snowflake.connector.connect(
user=os.environ.get('SNOWFLAKE_USER'),
password=os.environ.get('SNOWFLAKE_PASSWORD'),
account=os.environ.get('SNOWFLAKE_ACCOUNT'),
warehouse=os.environ.get('SNOWFLAKE_WAREHOUSE'),
database=os.environ.get('SNOWFLAKE_DATABASE'),
schema=os.environ.get('SNOWFLAKE_SCHEMA')
)
def extract_from_warehouse(self, query: str) -> pd.DataFrame:
"""Extract data from existing warehouse for processing."""
print(f"🏭 Extracting data from Snowflake warehouse")
with self.snowflake_conn.cursor() as cursor:
cursor.execute(query)
results = cursor.fetchall()
columns = [desc for desc in cursor.description]
df = pd.DataFrame(results, columns=columns)
print(f"✅ Extracted {len(df)} rows from warehouse")
return df
def process_with_tower(self, df: pd.DataFrame, processing_logic: str) -> pd.DataFrame:
"""Process data using Tower.dev's compute capabilities."""
print(f"⚡ Processing data with Tower.dev compute")
# Use Tower.dev's high-performance processing
# This could be Polars, Spark, or custom Python logic
if processing_logic == "feature_engineering":
processed_df = self.feature_engineering(df)
elif processing_logic == "data_quality":
processed_df = self.data_quality_checks(df)
elif processing_logic == "aggregation":
processed_df = self.advanced_aggregation(df)
else:
processed_df = df
return processed_df
def load_to_warehouse(self, df: pd.DataFrame, table_name: str):
"""Load processed data back to warehouse."""
print(f"💾 Loading {len(df)} rows to {table_name}")
# Use Snowflake's pandas integration
df.to_sql(
table_name,
self.snowflake_conn,
if_exists='append',
index=False,
method='multi'
)
print(f"✅ Successfully loaded data to {table_name}")
def run_etl_job(self, source_query: str, target_table: str, processing_logic: str):
"""Run complete ETL job using hybrid architecture."""
try:
# Extract from existing warehouse
source_df = self.extract_from_warehouse(source_query)
# Process with Tower.dev
processed_df = self.process_with_tower(source_df, processing_logic)
# Store processed data in Iceberg for analytics
import pyarrow as pa
# Convert to Arrow format
arrow_table = processed_df.to_arrow()
# Create or get table reference
table = tower.tables(f"iceberg_{target_table}").create_if_not_exists(arrow_table.schema)
table.upsert(arrow_table) # Use upsert for overwrite behavior
# Also load to warehouse for BI tools
self.load_to_warehouse(processed_df, target_table)
print(f"🎉 ETL job completed successfully")
except Exception as e:
print(f"❌ ETL job failed: {e}")
self.send_failure_notification(str(e))
raise
# Usage example
def main():
integrator = DataStackIntegrator()
# Run daily customer analytics job
integrator.run_etl_job(
source_query="""
SELECT customer_id, order_date, order_amount, product_category
FROM orders
WHERE order_date >= CURRENT_DATE - 1
""",
target_table="daily_customer_metrics",
processing_logic="feature_engineering"
)Rather than replacing your entire data stack, gradually migrate workloads to Tower.dev:
class MigrationManager:
"""
Manage gradual migration of data workloads to Tower.dev.
"""
def __init__(self):
self.migration_registry = {}
self.rollback_enabled = True
def register_migration(self, job_name: str, legacy_system: str, tower_app: str):
"""Register a job for migration."""
self.migration_registry[job_name] = {
"legacy_system": legacy_system,
"tower_app": tower_app,
"status": "registered",
"migration_date": None,
"performance_comparison": {}
}
def run_parallel_execution(self, job_name: str, inputs: Dict[str, Any]):
"""Run job on both legacy and Tower.dev systems for comparison."""
print(f"🔄 Running parallel execution for {job_name}")
# Run on legacy system
legacy_start = time.time()
legacy_result = self.run_legacy_job(job_name, inputs)
legacy_duration = time.time() - legacy_start
# Run on Tower.dev
tower_start = time.time()
tower_result = self.run_tower_job(job_name, inputs)
tower_duration = time.time() - tower_start
# Compare results
comparison = self.compare_results(legacy_result, tower_result)
# Update registry
self.migration_registry[job_name]["performance_comparison"] = {
"legacy_duration": legacy_duration,
"tower_duration": tower_duration,
"results_match": comparison["match"],
"performance_improvement": (legacy_duration - tower_duration) / legacy_duration * 100
}
print(f"📊 Performance comparison:")
print(f" Legacy: {legacy_duration:.2f}s")
print(f" Tower.dev: {tower_duration:.2f}s")
print(f" Improvement: {self.migration_registry[job_name]['performance_comparison']['performance_improvement']:.1f}%")
print(f" Results match: {comparison['match']}")
return tower_result if comparison["match"] else legacy_resultimport tower
from datetime import datetime, timedelta
import polars as pl
from typing import Dict, List, Any
class RealTimeFeatureStore:
"""
Implement a real-time feature store using Tower.dev and Iceberg.
"""
def __init__(self, feature_store_table: str = "feature_store"):
self.feature_store_table = feature_store_table
self.feature_definitions = self.load_feature_definitions()
def compute_features(self, entity_id: str, entity_type: str) -> Dict[str, Any]:
"""Compute real-time features for an entity."""
print(f"🔢 Computing features for {entity_type}:{entity_id}")
features = {}
# Get historical data
historical_data = self.get_historical_data(entity_id, entity_type)
# Compute various feature types
features.update(self.compute_aggregation_features(historical_data))
features.update(self.compute_time_series_features(historical_data))
features.update(self.compute_ml_features(historical_data))
# Add metadata
features.update({
"entity_id": entity_id,
"entity_type": entity_type,
"computed_at": datetime.utcnow().isoformat(),
"feature_version": "v1.0"
})
# Store in feature store
self.store_features(features)
return features
def compute_aggregation_features(self, data: pl.DataFrame) -> Dict[str, Any]:
"""Compute aggregation-based features."""
if len(data) == 0:
return {}
# 30-day aggregations
thirty_days_ago = datetime.utcnow() - timedelta(days=30)
recent_data = data.filter(pl.col("timestamp") >= thirty_days_ago.isoformat())
if len(recent_data) == 0:
return {}
features = {
"total_transactions_30d": len(recent_data),
"avg_transaction_amount_30d": recent_data["amount"].mean(),
"max_transaction_amount_30d": recent_data["amount"].max(),
"transaction_frequency_30d": len(recent_data) / 30,
"unique_categories_30d": recent_data["category"].n_unique()
}
return features
def compute_time_series_features(self, data: pl.DataFrame) -> Dict[str, Any]:
"""Compute time-series based features."""
if len(data) < 2:
return {}
# Sort by timestamp
data = data.sort("timestamp")
# Calculate trends
recent_week = data.tail(7)
previous_week = data.slice(-14, 7)
if len(recent_week) > 0 and len(previous_week) > 0:
recent_avg = recent_week["amount"].mean()
previous_avg = previous_week["amount"].mean()
trend = (recent_avg - previous_avg) / previous_avg * 100 if previous_avg > 0 else 0
else:
trend = 0
features = {
"amount_trend_7d_pct": trend,
"days_since_last_transaction": (datetime.utcnow() - datetime.fromisoformat(data["timestamp"][-1])).days,
"transaction_velocity": len(data) / max((datetime.utcnow() - datetime.fromisoformat(data["timestamp"])).days, 1)
}
return features
def serve_features(self, entity_id: str, entity_type: str, feature_names: List[str] = None) -> Dict[str, Any]:
"""Serve features for real-time inference."""
# Load from feature store
feature_df = tower.load_table(self.feature_store_table)
# Filter for entity
entity_features = feature_df.filter(
(pl.col("entity_id") == entity_id) &
(pl.col("entity_type") == entity_type)
).sort("computed_at", reverse=True).limit(1)
if len(entity_features) == 0:
# Compute features on demand
return self.compute_features(entity_id, entity_type)
# Return cached features
features = entity_features.to_dicts()
# Filter requested features
if feature_names:
features = {k: v for k, v in features.items() if k in feature_names or k in ["entity_id", "entity_type", "computed_at"]}
return features
# Usage in a real-time ML pipeline
def real_time_prediction_pipeline():
"""Example of using the feature store in a real-time ML pipeline."""
feature_store = RealTimeFeatureStore()
# Serve features for real-time inference
features = feature_store.serve_features(
entity_id="customer_12345",
entity_type="customer",
feature_names=["total_transactions_30d", "avg_transaction_amount_30d", "amount_trend_7d_pct"]
)
# Use features for ML inference
prediction = ml_model.predict(features)
return predictionclass DataMeshCoordinator:
"""
Implement data mesh principles using Tower.dev.
"""
def __init__(self):
self.domain_registry = {}
self.data_products = {}
def register_data_domain(self, domain_name: str, owner: str, apps: List[str]):
"""Register a data domain with its apps."""
self.domain_registry[domain_name] = {
"owner": owner,
"apps": apps,
"data_products": [],
"quality_score": 0.0,
"last_updated": datetime.utcnow().isoformat()
}
print(f"🌐 Registered data domain: {domain_name}")
def create_data_product(self, domain: str, product_name: str,
source_apps: List[str], schema: Dict[str, str]):
"""Create a data product within a domain."""
product_id = f"{domain}.{product_name}"
self.data_products[product_id] = {
"domain": domain,
"product_name": product_name,
"source_apps": source_apps,
"schema": schema,
"consumers": [],
"sla": {
"freshness_hours": 24,
"availability_pct": 99.9,
"quality_threshold": 0.95
}
}
# Add to domain registry
self.domain_registry[domain]["data_products"].append(product_id)
print(f"📦 Created data product: {product_id}")
def publish_data_product(self, product_id: str, data: Any):
"""Publish data product to the mesh."""
if product_id not in self.data_products:
raise ValueError(f"Data product {product_id} not found")
product = self.data_products[product_id]
table_name = f"data_product_{product_id.replace('.', '_')}"
# Store in Iceberg with governance metadata
import pyarrow as pa
import pandas as pd
# Convert data to Arrow format
if isinstance(data, pd.DataFrame):
arrow_table = pa.Table.from_pandas(data)
elif isinstance(data, list):
df = pd.DataFrame(data)
arrow_table = pa.Table.from_pandas(df)
else:
arrow_table = data
# Create or get table reference
table = tower.tables(table_name).create_if_not_exists(arrow_table.schema)
table.insert(arrow_table)
mode="overwrite",
metadata={
"data_product_id": product_id,
"domain": product["domain"],
"schema_version": "1.0",
"published_at": datetime.utcnow().isoformat(),
"quality_score": self.calculate_quality_score(data)
}
)
print(f"📡 Published data product: {product_id}")
def consume_data_product(self, product_id: str, consumer_app: str):
"""Consume a data product from the mesh."""
if product_id not in self.data_products:
raise ValueError(f"Data product {product_id} not found")
# Add consumer tracking
self.data_products[product_id]["consumers"].append({
"app": consumer_app,
"first_access": datetime.utcnow().isoformat()
})
# Load data product
table_name = f"data_product_{product_id.replace('.', '_')}"
data = tower.load_table(table_name)
print(f"🔌 Data product {product_id} consumed by {consumer_app}")
return dataPause and Reflect: We've now covered advanced integration patterns and use cases. Tower.dev's flexibility allows it to serve as anything from a simple compute layer to the foundation of a complete data mesh architecture. The key is starting simple and evolving your usage as your needs grow.
Congratulations! You've just completed a comprehensive journey through Tower.dev, from basic concepts to advanced production patterns. You now have the knowledge to build sophisticated data pipelines, implement modern data architectures, and integrate Tower.dev into complex data ecosystems.
In this article, you've learned:
- The Fundamentals: Understanding Tower.dev's architecture and core concepts
- Practical Implementation: Building real ETL pipelines, analytics workflows, and AI applications
- Production Excellence: Monitoring, error handling, and team collaboration patterns
- Advanced Techniques: Schema evolution, incremental processing, and integration strategies
- Strategic Thinking: How to position Tower.dev within your existing data infrastructure
Remember Simon Rosenberger's quote from Taktile: "Tower is like Docker, Dagster, and Jenkins having a baby 🐳+ 🐙+ 🤵🏼♂️ = 💜" . Just as Docker revolutionized application deployment, Tower.dev is revolutionizing data engineering by solving the fundamental problem of environment consistency and deployment complexity.
The traditional data engineering workflow that once took hours or days to deploy now takes minutes. The environment inconsistencies that plagued data teams are eliminated. The infrastructure complexity that required dedicated DevOps resources is abstracted away.
Companies using Tower.dev report dramatic productivity improvements:
- 10x faster deployment cycles compared to traditional containerized approaches
- 50-80% reduction in environment-related debugging time
- Team scaling from 2 dedicated data engineers to "everyone who knows Python" contributing to production pipelines
These aren't just incremental improvements – they're transformational changes that provide sustainable competitive advantages in data-driven markets.
Here's your challenge to cement your Tower.dev knowledge and join the ranks of practitioners seeing these productivity gains:
Build and Deploy a Complete Data Pipeline in 24 Hours
Your Mission: Create a production-ready data pipeline that:
- Extracts data from a public API (try the GitHub API, weather data, or stock prices)
- Transforms the data with quality validation
- Loads it into an Iceberg table
- Generates a simple analysis or insight
- Runs on a schedule (simulate with manual triggers)
Success Criteria:
- ✅ Code runs identically locally and in Tower.dev
- ✅ Includes proper error handling and logging
- ✅ Uses secrets management for any API keys
- ✅ Validates data quality before processing
- ✅ Stores results in a queryable format
- ✅ Generates actionable output (dashboard, report, or alert)
Bonus Points:
- 🌟 Add LLM-powered insights using Tower.dev's AI capabilities
- 🌟 Implement incremental processing with state management
- 🌟 Create a monitoring dashboard for your pipeline
- 🌟 Share your results with the Tower.dev community
- Sign up for Tower.dev: Visit https://tower.dev and get your beta access
- Clone the examples:
git clone https://github.com/tower/tower-examples - Start with Hello World: Follow the quickstart guide to deploy your first app
- Build your challenge project: Use the patterns from this article
- Share your results: Join the Tower.dev community and share your experience
Tower.dev represents a fundamental shift in how we think about data infrastructure. Just as the cloud eliminated the need for companies to manage physical servers, Tower.dev eliminates the need for data teams to manage complex deployment infrastructure.
The future belongs to data teams that can iterate quickly, deploy reliably, and focus on business value rather than infrastructure complexity. Tower.dev is your gateway to that future.
The modern data stack is evolving rapidly. Apache Iceberg is becoming the standard for data storage . Python frameworks like Polars and dltHub are replacing older technologies . AI is being integrated into every data workflow .
Tower.dev positions you at the forefront of all these trends with a single, cohesive platform. You're not just learning a new tool – you're mastering the future of data engineering.
The question isn't whether you'll eventually need a platform like Tower.dev. The question is whether you'll be ahead of the curve or catching up.
Your 24-hour challenge starts now. Build something amazing.
"The best time to plant a tree was 20 years ago. The second best time is now." - Chinese Proverb
The best time to transform your data engineering workflow was when Tower.dev launched. The second best time is right now.
Start building. The data revolution is waiting for you.