Commit 68e1d60

Parallel streaming: Support concurrent streams from Amp Server, partitioned by block_range (#10)

* streaming: Setup primitives for parallel streaming
* loader: Wire up parallel streaming
* docs: Parallel streaming implementation plan
* tests: Unit and integration tests for parallel streaming
  - Integration tests require an Amp server
* docs, README: Document parallel streams usage
* postgresql_loader: Use Numeric type for Arrow UINT64 columns
* parallel streaming: various improvements
  - Configurable reorg buffer
  - Create table ahead of spinning up parallel workers to ensure it's ready for all of them and avoid complexity of thread locking
  - SQL variables for string replacement
  - Better docs, including limitations

1 parent 61d1537 commit 68e1d60
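
The commit title describes concurrent streams "partitioned by block_range". The diff hunks shown on this page do not include the partitioning code itself, so the following is only a minimal sketch of one way to split an inclusive block range into contiguous, non-overlapping partitions. The names `BlockRange` and `partition_block_range` are hypothetical and are not taken from the commit.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BlockRange:
    """Inclusive block range [start, end]. Hypothetical type, not from the commit."""

    start: int
    end: int


def partition_block_range(start: int, end: int, num_partitions: int) -> list[BlockRange]:
    """Split [start, end] into at most num_partitions contiguous ranges.

    Assumption: blocks are spread as evenly as possible, with any
    remainder going to the earliest partitions.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if end < start:
        raise ValueError("end must be greater than or equal to start")

    total_blocks = end - start + 1
    # Never create more partitions than there are blocks.
    count = min(num_partitions, total_blocks)
    base_size, remainder = divmod(total_blocks, count)

    ranges = []
    cursor = start
    for index in range(count):
        size = base_size + (1 if index < remainder else 0)
        ranges.append(BlockRange(cursor, cursor + size - 1))
        cursor += size
    return ranges
```

Each resulting range could then be handed to its own worker. Because the ranges are contiguous and do not overlap, every block in the requested span is covered exactly once.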

16 files changed: +2586 -70 lines changed

CLAUDE.md

Lines changed: 2 additions & 2 deletions
@@ -72,8 +72,8 @@ When implementing new loaders:
 5. Follow existing patterns from PostgreSQL and Redis loaders
 
 ### Testing Strategy
-- **Unit tests**: Mock external dependencies, test business logic
-- **Integration tests**: Use testcontainers for real database testing
+- **Unit tests**: Test pure logic and data structures WITHOUT mocking. Unit tests should be simple, fast, and test isolated components (dataclasses, utility functions, partitioning logic, etc.). Do NOT add tests that require mocking to `tests/unit/`.
+- **Integration tests**: Use testcontainers for real database testing. Tests that require external dependencies (databases, Flight SQL server, etc.) belong in `tests/integration/`.
 - **Performance tests**: Benchmark data loading operations
 - Tests can be filtered using pytest markers (e.g., `-m unit` for unit tests only)
 

Makefile

Lines changed: 19 additions & 13 deletions
@@ -44,6 +44,11 @@ test-lmdb:
 	@echo "⚡ Running LMDB tests..."
 	$(PYTHON) pytest tests/ -m "lmdb" -v --log-cli-level=ERROR
 
+# Parallel streaming integration tests
+test-parallel-streaming:
+	@echo "⚡ Running parallel streaming integration tests..."
+	$(PYTHON) pytest tests/integration/test_parallel_streaming.py -v -s --log-cli-level=INFO
+
 # Performance tests
 test-performance:
 	@echo "🏇 Running performance tests..."
@@ -109,16 +114,17 @@ clean:
 # Show available commands
 help:
 	@echo "Available commands:"
-	@echo " make setup            - Setup development environment"
-	@echo " make test-unit        - Run unit tests (fast)"
-	@echo " make test-integration - Run integration tests"
-	@echo " make test-all         - Run all tests with coverage"
-	@echo " make test-postgresql  - Run PostgreSQL tests"
-	@echo " make test-redis       - Run Redis tests"
-	@echo " make test-snowflake   - Run Snowflake tests"
-	@echo " make test-performance - Run performance tests"
-	@echo " make lint             - Lint code with ruff"
-	@echo " make format           - Format code with ruff"
-	@echo " make test-setup       - Start test databases"
-	@echo " make test-cleanup     - Stop test databases"
-	@echo " make clean            - Clean test artifacts"
+	@echo " make setup                   - Setup development environment"
+	@echo " make test-unit               - Run unit tests (fast)"
+	@echo " make test-integration        - Run integration tests"
+	@echo " make test-parallel-streaming - Run parallel streaming integration tests"
+	@echo " make test-all                - Run all tests with coverage"
+	@echo " make test-postgresql         - Run PostgreSQL tests"
+	@echo " make test-redis              - Run Redis tests"
+	@echo " make test-snowflake          - Run Snowflake tests"
+	@echo " make test-performance        - Run performance tests"
+	@echo " make lint                    - Lint code with ruff"
+	@echo " make format                  - Format code with ruff"
+	@echo " make test-setup              - Start test databases"
+	@echo " make test-cleanup            - Stop test databases"
+	@echo " make clean                   - Clean test artifacts"

README.md

Lines changed: 21 additions & 0 deletions
@@ -41,6 +41,14 @@ and the `amp` package. For example, you can run the `execute_query` app with the
 uv run apps/execute_query.py
 ```
 
+## Documentation
+
+### Features
+- **[Parallel Streaming Usage Guide](docs/parallel_streaming_usage.md)** - User guide for high-throughput parallel data loading
+- **[Parallel Streaming Design](docs/parallel_streaming.md)** - Technical design documentation for parallel streaming architecture
+- **[Reorganization Handling](docs/reorg_handling.md)** - Guide for handling blockchain reorganizations
+- **[Implementing Data Loaders](docs/implementing_data_loaders.md)** - Guide for creating custom data loaders
+
 # Self-hosted Amp server
 
 In order to operate a local Amp server you will need to have the files
@@ -133,6 +141,19 @@ make test-iceberg # Iceberg tests
 make test-lmdb # LMDB tests
 ```
 
+## Feature-Specific Tests
+
+Run tests for specific features:
+```bash
+make test-parallel-streaming # Parallel streaming integration tests (requires Amp server)
+```
+
+**Note**: Parallel streaming tests require an Amp server. Configure using environment variables in `.test.env`:
+- `AMP_SERVER_URL` - Amp server URL (e.g., `grpc://your-server:80`)
+- `AMP_TEST_TABLE` - Source table name (e.g., `eth_firehose.blocks`)
+- `AMP_TEST_BLOCK_COLUMN` - Block column name (default: `block_num`)
+- `AMP_TEST_MAX_BLOCK` - Max block for testing (default: `1000`)
+
 # Linting and formatting
 
 Ruff is configured to be used for linting and formatting of this project.
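
The README note in this hunk names four environment variables and gives defaults for two of them. How the test suite actually loads `.test.env` is not visible in this diff, so the sketch below only illustrates reading those variables with the documented defaults. `AmpTestConfig` and `load_amp_test_config` are hypothetical names, and the sketch assumes the variables have already been exported into the process environment.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class AmpTestConfig:
    """Hypothetical container for the settings listed in the README note."""

    server_url: Optional[str]    # AMP_SERVER_URL, no documented default
    source_table: Optional[str]  # AMP_TEST_TABLE, no documented default
    block_column: str            # AMP_TEST_BLOCK_COLUMN, default "block_num"
    max_block: int               # AMP_TEST_MAX_BLOCK, default 1000


def load_amp_test_config(env: Mapping[str, str] = os.environ) -> AmpTestConfig:
    """Read the documented variables from a mapping (the process environment by default)."""
    return AmpTestConfig(
        server_url=env.get("AMP_SERVER_URL"),
        source_table=env.get("AMP_TEST_TABLE"),
        block_column=env.get("AMP_TEST_BLOCK_COLUMN", "block_num"),
        max_block=int(env.get("AMP_TEST_MAX_BLOCK", "1000")),
    )
```

Taking the environment as a parameter keeps the function testable without touching the real process environment. A test could, for instance, skip itself when `server_url` is `None`.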
