
Commit 7c66375

Streaming client improvements and Snowflake loader features (#15)
* loaders: Add label management system for CSV-based enrichment (see the pyarrow sketch below, after the resilience section)
  - Load labels from CSV files with automatic type detection
  - Support hex string to binary conversion for Ethereum addresses
  - Thread-safe label storage and retrieval
  - Add LabelJoinConfig type for configuring joins

* streaming: Add unified stream state management for resume and dedup
  - StreamStateStore interface with in-memory, null, and DB-backed implementations
  - Block range tracking with gap detection
  - Reorg invalidation support

  Key features:
  - Resume from last processed position after crashes
  - Exactly-once semantics via batch deduplication
  - Gap detection and intelligent backfill
  - Support for multiple networks and tables

* streaming: Add resilience features (a backoff sketch follows this section)
  - Exponential backoff with jitter for transient failures
  - Adaptive rate limiting with automatic adjustment
  - Backpressure detection and mitigation
  - Error classification (transient vs. permanent)
  - Configurable retry policies

  Features:
  - Auto-detects rate limits and slows down requests
  - Detects timeouts and adjusts batch sizes
  - Production-tested configurations included
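A minimal sketch of exponential backoff with full jitter, in the spirit of the retry policy above. It is illustrative only, not the package's actual API: TransientError, retry_with_backoff, and load_batch are hypothetical names standing in for whatever the error classifier and loader expose.

import random
import time


class TransientError(Exception):
    """Stand-in for errors classified as retryable (timeouts, rate limits)."""


def retry_with_backoff(fn, max_attempts=5, base_delay=0.5, max_delay=30.0):
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except TransientError:
            if attempt == max_attempts:
                raise
            # Exponential backoff capped at max_delay, with full jitter so
            # parallel workers do not retry in lockstep.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            time.sleep(random.uniform(0, delay))


# Example: wrap a flaky batch load.
# retry_with_backoff(lambda: load_batch(block_range))

Full jitter spreads retries from many workers over the whole backoff window, which also gives the adaptive rate limiter room to settle.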
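For the CSV label enrichment described in the first section of this message, the pyarrow sketch below shows the general shape of loading a labels file, converting hex addresses to binary, and left-joining onto a streamed batch. Everything concrete here (column names, the inline CSV, the toy batch) is invented for illustration and is not the LabelManager API.

import io

import pyarrow as pa
import pyarrow.csv as pacsv

# Tiny in-memory CSV standing in for a mounted labels file.
csv_bytes = io.BytesIO(
    b"address,label\n"
    b"0xdac17f958d2ee523a2206206994597c13d831ec7,Tether: USDT\n"
)
labels = pacsv.read_csv(csv_bytes)

# Convert 0x-prefixed hex strings to raw 20-byte values so they match the
# binary address column produced by the stream.
addr_bin = pa.array(
    [bytes.fromhex(a[2:]) for a in labels["address"].to_pylist()],
    type=pa.binary(),
)
labels = labels.set_column(0, "address", addr_bin)

# A toy batch standing in for streamed transfer rows.
batch = pa.table({
    "to": pa.array([addr_bin[0].as_py(), b"\x00" * 20], type=pa.binary()),
    "value": pa.array([1_000_000, 42], type=pa.int64()),
})

# Left outer join keeps unlabeled rows and attaches "label" where it matches.
enriched = batch.join(labels, keys="to", right_keys="address", join_type="left outer")
print(enriched.to_pydict())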
* *: Major base loader improvements for streaming and resilience
  - Integrate state management for resume and deduplication
  - Add label joining support with automatic type conversion
  - Implement resilience features (retry, backpressure, rate limiting)
  - Add metadata columns (_amp_batch_id) for reorg handling
  - Support streaming with block ranges and reorg detection
  - Separate _try_load_batch() for better error handling

* streaming: Enhance parallel execution, resumability, and gap detection
  - Add resume optimization that adjusts min_block based on persistent state
  - Implement gap-aware partitioning for intelligent backfill
  - Add pre-flight table creation to avoid locking issues
  - Improve error handling and logging for state operations
  - Support label joining in parallel workers

  Key features:
  - Auto-detects processed ranges and skips already-loaded partitions
  - Prioritizes gap filling before processing new data
  - Efficient partition creation that avoids redundant work
  - Visible logging for resume operations and adjustments

  Resume workflow (sketched after this section):
  1. Query the state store for the max processed block
  2. Adjust min_block to skip processed ranges
  3. Detect gaps in processed data
  4. Create partitions, prioritizing gaps first
  5. Process remaining historical data
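A minimal sketch of that resume workflow, assuming the state store can return the processed (start, end) block ranges; the function and parameter names are illustrative, not the actual StreamStateStore interface.

def plan_partitions(processed_ranges, min_block, max_block, partition_size):
    processed = sorted(processed_ranges)

    # 1-2. Resume: skip everything at or below the highest processed block.
    resume_from = max((end for _, end in processed), default=min_block - 1) + 1
    resume_from = max(resume_from, min_block)

    # 3. Detect gaps between contiguous processed ranges.
    gaps, cursor = [], min_block
    for start, end in processed:
        if start > cursor:
            gaps.append((cursor, start - 1))
        cursor = max(cursor, end + 1)

    # 4-5. Emit gap partitions first, then the remaining historical tail.
    def split(start, end):
        while start <= end:
            yield (start, min(start + partition_size - 1, end))
            start += partition_size

    partitions = [p for g in gaps for p in split(*g)]
    partitions += list(split(resume_from, max_block))
    return partitions


print(plan_partitions([(0, 999), (2000, 2999)], 0, 4999, 1000))
# -> gap (1000, 1999) first, then (3000, 3999), (4000, 4999)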
* client: Integrate label manager into Client for enriched streaming
  Add label management to the Client class:
  - Initialize LabelManager with a configurable label directory
  - Support loading labels from CSV files
  - Pass label_manager to all loader instances
  - Enable label joining in streaming queries via the load() method

  Updates:
  - Client now supports label enrichment out of the box
  - Loaders inherit label_manager from the client
  - Add pyarrow.csv dependency for label loading

* loaders: Update all implementations for the new base class interface
  - PostgreSQL: Add reorg support with DELETE/UPDATE, metadata columns
  - Redis: Add streaming metadata and batch ID support
  - DeltaLake: Support new metadata columns
  - Iceberg: Update for base class changes
  - LMDB: Add metadata column support

  All loaders now support:
  - State-backed resume and deduplication
  - Label joining via the base class
  - Resilience features (retry, backpressure)
  - Reorg-aware streaming with metadata tracking

* test: Add comprehensive unit tests for streaming features
  Add unit tests for all new streaming features:
  - test_label_joining.py - Label enrichment with type conversion
  - test_label_manager.py - CSV loading and label storage
  - test_resilience.py - Retry, backoff, rate limiting
  - test_resume_optimization.py - Resume position calculation
  - test_stream_state.py - State store implementations
  - test_streaming_helpers.py - Utility functions and batch ID generation
  - test_streaming_types.py - BlockRange, ResumeWatermark types

* snowflake_loader: Major improvements with state management
  - Add Snowflake-backed persistent state store (amp_stream_state table)
  - Implement SnowflakeStreamStateStore with overlap detection
  - Support multiple loading methods: stage, insert, pandas, snowpipe_streaming
  - Add connection pooling for parallel workers
  - Implement reorg history tracking with a simplified schema
  - Support Parquet stage loading for better performance

  State management features:
  - Block-level overlap detection for different partition sizes
  - MERGE-based upsert to prevent duplicate state entries
  - Resume position calculation with gap detection
  - Deduplication across runs

  Performance improvements:
  - Parallel stage loading with a connection pool
  - Optimized Parquet format for stage loads
  - Efficient batch processing with metadata columns

* apps: Add Snowflake parallel loading applications
  Add comprehensive demo applications for Snowflake loading:
  1. snowflake_parallel_loader.py - Full-featured parallel loader
     - Configurable block ranges, workers, and partition sizes
     - Label joining with CSV files
     - State management with resume capability
     - Support for all Snowflake loading methods
     - Reorg history tracking
     - Clean formatted output with progress indicators
  2. test_erc20_parallel_load.py - Simple ERC20 transfer loader
     - Basic parallel loading example
     - Good starting point for new users
  3. test_erc20_labeled_parallel.py - Label-enriched example
     - Demonstrates label joining with token metadata
     - Shows how to enrich blockchain data
  4. Query templates in apps/queries/
     - erc20_transfers.sql - Decode ERC20 Transfer events
     - README.md - Query documentation

* test: Add integration tests for loaders and streaming features
  New tests:
  - test_resilient_streaming.py - Resilience with real databases
  - Enhanced Snowflake loader tests with state management
  - Enhanced PostgreSQL tests with reorg handling
  - Updated Redis, DeltaLake, Iceberg, LMDB loader tests

  Integration test features:
  - Real database containers (PostgreSQL, Redis, Snowflake)
  - State persistence and resume testing
  - Label joining with actual data
  - Reorg detection and invalidation
  - Parallel loading with multiple workers
  - Error injection and recovery

  Tests require Docker for database containers.

* infra: Add Docker and Kubernetes deployment configurations
  Add containerization and orchestration support:
  - General-purpose Dockerfile for amp-python
  - Snowflake-specific Dockerfile with the parallel loader
  - GitHub Actions workflow for automated Docker publishing to ghcr.io
  - Kubernetes deployment manifest for GKE with resource limits
  - Comprehensive .dockerignore and .gitignore

  Docker images:
  - amp-python: Base image with all loaders
  - amp-snowflake: Optimized for Snowflake parallel loading
    - Includes snowflake_parallel_loader.py as entrypoint
    - Pre-configured with the Snowflake connector and dependencies

* docs: Add comprehensive documentation for new features
  - Comparison of all loading methods (stage, insert, pandas, streaming)
  - State management and resume capability
  - Label joining for data enrichment
  - Performance tuning and optimization
  - Parallel loading configuration
  - Reorg handling strategies
  - Troubleshooting common issues

* data: Save performance benchmarks

* Formatting

* Linting fixes

* label manager: Remove data directory and document how to add label files
  Users should now mount label CSV files at runtime using volume mounts (Docker) or init containers with cloud storage (Kubernetes).

  Changes:
  - Removed the COPY data/ line from both Dockerfiles
  - The /data directory is still created (mkdir -p /app /data) but left empty
  - Updated .gitignore to ignore the entire data/ directory
  - Removed the data/** trigger from the docker-publish workflow
  - Added comprehensive docs/label_manager.md with:
    - Docker volume mount examples
    - Kubernetes init container pattern (recommended for large files)
    - ConfigMap examples (for small files <1MB)
    - PersistentVolume examples (for shared access)
    - Performance considerations and troubleshooting

* redis loader: Fix reorg handling when using the string data structure
  When data_structure='string', batch IDs are stored inside JSON values rather than as hash fields. The reorg handler now checks the data structure and uses GET plus a JSON parse for strings, and HGET for hashes (see the sketch below).
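A sketch of that lookup using redis-py. Only the GET-plus-JSON-parse versus HGET distinction and the _amp_batch_id metadata column come from the change itself; the helper name and key format are illustrative assumptions.

import json

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)


def get_batch_id(key: str, data_structure: str):
    """Return the _amp_batch_id stored at `key`, regardless of data structure."""
    if data_structure == "string":
        # Whole row serialized as JSON in a plain string value.
        raw = r.get(key)
        return json.loads(raw).get("_amp_batch_id") if raw else None
    # Hash structure: the batch id lives in its own field.
    return r.hget(key, "_amp_batch_id")


# During reorg handling, rows whose batch id falls in an invalidated range
# would then be deleted, e.g. get_batch_id("erc20:0xabc...:18000001", "string").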
1 parent 68e1d60 commit 7c66375


50 files changed (+10207 additions, -1060 deletions)

.dockerignore

Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
*.egg
*.egg-info/
dist/
build/
.eggs/

# Virtual environments
.venv/
venv/
ENV/
env/

# IDE
.vscode/
.idea/
*.swp
*.swo
*~
.DS_Store

# Testing
.pytest_cache/
.coverage
.coverage.*
htmlcov/
.tox/
*.cover

# Notebooks
notebooks/
*.ipynb
.ipynb_checkpoints

# Documentation
docs/
*.md
!README.md
!DOCKER_DEPLOY.md

# Git
.git/
.gitignore
.gitattributes

# CI/CD
.github/
.gitlab-ci.yml

# Local test data and logs
tests/
*.log
/tmp/
.test.env

# UV/pip cache
.uv/
uv.lock

# Docker
Dockerfile*
docker-compose*.yml
.dockerignore

.github/workflows/docker-publish.yml

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
name: Build and Push Docker Images

on:
  push:
    branches:
      - main
    paths:
      - 'src/**'
      - 'apps/**'
      - 'Dockerfile*'
      - 'pyproject.toml'
      - '.github/workflows/docker-publish.yml'
  pull_request:
    branches:
      - main
  workflow_dispatch: # Allow manual trigger
    inputs:
      tag:
        description: 'Docker image tag suffix (default: latest)'
        required: false
        default: 'latest'

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  build-and-push:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write

    strategy:
      matrix:
        include:
          - dockerfile: Dockerfile
            suffix: ""
            description: "Full image with all loader dependencies"
          - dockerfile: Dockerfile.snowflake
            suffix: "-snowflake"
            description: "Snowflake-only image (minimal dependencies)"

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Log in to GitHub Container Registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract metadata for Docker
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          flavor: |
            suffix=${{ matrix.suffix }},onlatest=true
          tags: |
            type=ref,event=branch
            type=ref,event=pr
            type=semver,pattern={{version}}
            type=semver,pattern={{major}}.{{minor}}
            type=sha,prefix=sha-
            type=raw,value=latest,enable={{is_default_branch}}

      - name: Build and push Docker image (${{ matrix.description }})
        uses: docker/build-push-action@v5
        with:
          context: .
          file: ./${{ matrix.dockerfile }}
          push: ${{ github.event_name != 'pull_request' }}
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha,scope=${{ matrix.dockerfile }}
          cache-to: type=gha,mode=max,scope=${{ matrix.dockerfile }}
          platforms: linux/amd64,linux/arm64

      - name: Image digest
        run: |
          echo "### ${{ matrix.description }}" >> $GITHUB_STEP_SUMMARY
          echo "Digest: ${{ steps.meta.outputs.digest }}" >> $GITHUB_STEP_SUMMARY
          echo "Tags: ${{ steps.meta.outputs.tags }}" >> $GITHUB_STEP_SUMMARY

.gitignore

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
# Environment files
.env
.test.env
*.env

# Kubernetes secrets (NEVER commit these!)
k8s/secret.yaml
k8s/secrets.yaml

# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
*.egg
*.egg-info/
dist/
build/
.eggs/

# Virtual environments
.venv/
venv/
ENV/
env/

# IDE
.vscode/
.idea/
*.swp
*.swo
*~
.DS_Store

# Testing
.pytest_cache/
.coverage
.coverage.*
htmlcov/
.tox/
*.cover
.hypothesis/

# Notebooks
.ipynb_checkpoints/

# Logs
*.log
/tmp/

# UV/pip cache
.uv/
uv.lock

# Data directories (local development)
# Large datasets should be downloaded on-demand or mounted via ConfigMaps
data/

# Build artifacts
*.tar.gz
*.zip

Dockerfile

Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
# Multi-stage build for optimized image size
# Stage 1: Build dependencies
FROM python:3.12-slim AS builder

# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install UV for fast dependency management
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv

# Set working directory
WORKDIR /app

# Copy dependency files
COPY pyproject.toml README.md ./

# Install dependencies using UV (much faster than pip)
# Install ALL dependencies including all loader dependencies
# This ensures optional dependencies don't cause import errors
RUN uv pip install --system --no-cache \
    'pandas>=2.3.1' \
    'pyarrow>=20.0.0' \
    'typer>=0.15.2' \
    'adbc-driver-manager>=1.5.0' \
    'adbc-driver-postgresql>=1.5.0' \
    'protobuf>=4.21.0' \
    'base58>=2.1.1' \
    'eth-hash[pysha3]>=0.7.1' \
    'eth-utils>=5.2.0' \
    'google-cloud-bigquery>=3.30.0' \
    'google-cloud-storage>=3.1.0' \
    'arro3-core>=0.5.1' \
    'arro3-compute>=0.5.1' \
    'psycopg2-binary>=2.9.0' \
    'redis>=4.5.0' \
    'deltalake>=1.0.2' \
    'pyiceberg[sql-sqlite]>=0.10.0' \
    'pydantic>=2.0,<2.12' \
    'snowflake-connector-python>=4.0.0' \
    'snowpipe-streaming>=1.0.0' \
    'lmdb>=1.4.0'

# Stage 2: Runtime image
FROM python:3.12-slim

# Install runtime dependencies only
RUN apt-get update && apt-get install -y --no-install-recommends \
    libpq5 \
    && rm -rf /var/lib/apt/lists/*

# Create non-root user for security
RUN useradd -m -u 1000 amp && \
    mkdir -p /app /data && \
    chown -R amp:amp /app /data

# Set working directory
WORKDIR /app

# Copy Python packages from builder
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages

# Copy UV from builder for package installation
COPY --from=builder /usr/local/bin/uv /usr/local/bin/uv

# Copy application code
COPY --chown=amp:amp src/ ./src/
COPY --chown=amp:amp apps/ ./apps/
COPY --chown=amp:amp pyproject.toml README.md ./

# Note: /data directory is created but empty by default
# Mount data files at runtime using Kubernetes ConfigMaps or volumes

# Install the amp package in the system Python (NOT editable for Docker)
RUN uv pip install --system --no-cache .

# Switch to non-root user
USER amp

# Set Python path
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import sys; sys.exit(0)"

# Default command - run ERC20 loader
# Can be overridden with docker run arguments
ENTRYPOINT ["python", "apps/test_erc20_labeled_parallel.py"]
CMD ["--blocks", "100000", "--workers", "8", "--flush-interval", "0.5"]
