
Commit b8ffea3

feat: Add Snowflake parallel loading applications
Add comprehensive demo applications for Snowflake loading:

1. snowflake_parallel_loader.py - Full-featured parallel loader
   - Configurable block ranges, workers, and partition sizes
   - Label joining with CSV files
   - State management with resume capability
   - Support for all Snowflake loading methods
   - Reorg history tracking
   - Clean formatted output with progress indicators
2. test_erc20_parallel_load.py - Simple ERC20 transfer loader
   - Basic parallel loading example
   - Good starting point for new users
3. test_erc20_labeled_parallel.py - Label-enriched example
   - Demonstrates label joining with token metadata
   - Shows how to enrich blockchain data
4. Query templates in apps/queries/
   - erc20_transfers.sql - Decode ERC20 Transfer events
   - README.md - Query documentation

1 parent cf223d4 commit b8ffea3

File tree

5 files changed: +1076 −0 lines


apps/queries/README.md

Lines changed: 128 additions & 0 deletions
@@ -0,0 +1,128 @@
# SQL Query Examples for Snowflake Parallel Loader

This directory contains example SQL queries that can be used with `snowflake_parallel_loader.py`.

## Query Requirements

### Required Columns

Your query **must** include:

- **`block_num`** (or specify a different column with `--block-column`)
  - Used for partitioning data across parallel workers
  - Should be an integer column representing block numbers
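
Conceptually, each parallel worker runs your query restricted to its own slice of the block range, which is why `block_num` must be present and integer-valued. The sketch below is illustrative only: the bounds are hypothetical placeholders, and the loader determines the actual partition boundaries.

```sql
-- Illustrative only: one worker's partition of a query, restricted to
-- a contiguous block_num range (the bounds shown are hypothetical).
select q.*
from (
    select block_num, tx_hash, address, data
    from eth_firehose.logs
) q
where q.block_num >= 19000000  -- hypothetical partition start
  and q.block_num <  19010000  -- hypothetical partition end
```
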
### Optional Columns for Label Joining

If you plan to use `--label-csv` for enrichment:

- Include a column that matches your label key (e.g., `token_address`)
- The column can be in binary or string format
- The loader will auto-convert binary addresses to hex strings for matching
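
On the query side, the only requirement is that the label-key column is selected under the name you will pass to `--stream-key`. A minimal sketch (see `erc20_transfers.sql` for the full version):

```sql
-- Minimal sketch: expose the log's contract address under the name
-- expected by --stream-key so it can be matched against the label CSV.
select
    block_num,
    address as token_address
from eth_firehose.logs
```
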
### Best Practices

1. **Filter early**: Apply WHERE clauses in your query to reduce data volume
2. **Select specific columns**: Avoid `SELECT *` for better performance
3. **Use event decoding**: Use `evm_decode()` and `evm_topic()` for Ethereum events
4. **Include metadata**: Carry useful columns such as `block_hash`, `timestamp`, and `tx_hash`
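
Put together, a query following these practices might look like the sketch below. It is illustrative only, reusing the `eth_firehose.logs` schema and the `evm_decode()`/`evm_topic()` helpers from the examples in this file, applied here to the standard ERC20 `Approval` event.

```sql
-- Illustrative sketch combining the practices above: filter early
-- (block range + topic0), select specific columns, decode the event,
-- and keep useful metadata columns.
select
    l.block_num,
    l.block_hash,
    l.timestamp,
    l.tx_hash,
    l.log_index,
    l.address as token_address,
    evm_decode(
        l.topic1, l.topic2, l.topic3, l.data,
        'Approval(address indexed owner, address indexed spender, uint256 value)'
    ) as approval
from eth_firehose.logs l
where
    l.block_num >= 19000000 and
    l.topic0 = evm_topic('Approval(address indexed owner, address indexed spender, uint256 value)')
```
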
## Example Queries

### ERC20 Transfers (with labels)

See `erc20_transfers.sql` for a complete example that:
- Decodes Transfer events from raw logs
- Filters for standard ERC20 transfers (topic3 IS NULL)
- Includes `token_address` for label joining
- Can be enriched with token metadata (symbol, name, decimals)

Usage:
```bash
python apps/snowflake_parallel_loader.py \
  --query-file apps/queries/erc20_transfers.sql \
  --table-name erc20_transfers \
  --label-csv data/eth_mainnet_token_metadata.csv \
  --label-name tokens \
  --label-key token_address \
  --stream-key token_address \
  --blocks 50000
```
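
Once the rows are enriched, the raw `value` can be scaled by the token's `decimals` downstream. A hedged sketch of a post-load query in Snowflake, assuming the target table is named `erc20_transfers` and the label join carried the `symbol` and `decimals` columns through under those names:

```sql
-- Hypothetical post-load query against the enriched Snowflake table.
-- Cast value as needed if it is stored as a string or variant.
select
    token_address,
    symbol,
    value / power(10, decimals) as amount
from erc20_transfers
limit 100;
```
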
### Simple Log Query (without labels)

```sql
-- Basic logs query - no decoding
select
    block_num,
    block_hash,
    timestamp,
    tx_hash,
    log_index,
    address,
    topic0,
    data
from eth_firehose.logs
where block_num >= 19000000
```

Usage:
```bash
python apps/snowflake_parallel_loader.py \
  --query-file my_logs.sql \
  --table-name raw_logs \
  --min-block 19000000 \
  --max-block 19100000
```

### Custom Event Decoding

```sql
-- Decode Uniswap V2 Swap events
select
    l.block_num,
    l.timestamp,
    l.address as pool_address,
    evm_decode(
        l.topic1, l.topic2, l.topic3, l.data,
        'Swap(address indexed sender, uint amount0In, uint amount1In, uint amount0Out, uint amount1Out, address indexed to)'
    ) as swap_data
from eth_firehose.logs l
where l.topic0 = evm_topic('Swap(address indexed sender, uint amount0In, uint amount1In, uint amount0Out, uint amount1Out, address indexed to)')
```
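
Individual fields can then be pulled out of the decoded struct with the same bracket access used in `erc20_transfers.sql` (`dec['from']`, `dec['to']`, `dec['value']`). A sketch, assuming `swap_data` supports the same access pattern:

```sql
-- Sketch: extract fields from the decoded Swap struct, mirroring the
-- subquery + bracket-access pattern used in erc20_transfers.sql.
select
    s.block_num,
    s.pool_address,
    s.swap_data['sender']     as sender,
    s.swap_data['amount0In']  as amount0_in,
    s.swap_data['amount1Out'] as amount1_out
from (
    select
        l.block_num,
        l.address as pool_address,
        evm_decode(
            l.topic1, l.topic2, l.topic3, l.data,
            'Swap(address indexed sender, uint amount0In, uint amount1In, uint amount0Out, uint amount1Out, address indexed to)'
        ) as swap_data
    from eth_firehose.logs l
    where l.topic0 = evm_topic('Swap(address indexed sender, uint amount0In, uint amount1In, uint amount0Out, uint amount1Out, address indexed to)')
) s
```
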
## Testing Your Query

Before running a full parallel load, test your query with a small block range:

```bash
# Test with just 1000 blocks
python apps/snowflake_parallel_loader.py \
  --query-file your_query.sql \
  --table-name test_table \
  --blocks 1000 \
  --workers 2
```

## Query Performance Tips

1. **Partition size**: The default partition size is optimized for `block_num` ranges
2. **Worker count**: More workers mean smaller partitions; start with 4-8 workers
3. **Block range**: Larger ranges take longer but are more efficient per block
4. **Event filtering**: Use `topic0` filters to reduce the data scanned
5. **Label joins**: Inner joins reduce output rows to only matching records

## Troubleshooting

**Error: "No blocks found"**
- Check that your query's source table contains data
- Verify `--source-table` matches your query's FROM clause

**Error: "Column not found: block_num"**
- Your query must include a `block_num` column
- Or specify a different column with `--block-column`

**Label join not working**
- Ensure the `--stream-key` column exists in your query
- Check that column types match between the query and the CSV
- Verify the CSV file has a header row containing the `--label-key` column

apps/queries/erc20_transfers.sql

Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
-- ERC20 Transfer Events Query
--
-- This query decodes ERC20 Transfer events from raw Ethereum logs.
--
-- Required columns for parallel loading:
--   - block_num: Used for partitioning across workers
--
-- Label join column (if using --label-csv):
--   - token_address: Binary address of the ERC20 token contract
--
-- Example usage:
--   python apps/snowflake_parallel_loader.py \
--     --query-file apps/queries/erc20_transfers.sql \
--     --table-name erc20_transfers \
--     --label-csv data/eth_mainnet_token_metadata.csv \
--     --label-name token_metadata \
--     --label-key token_address \
--     --stream-key token_address \
--     --blocks 100000

select
    pc.block_num,
    pc.block_hash,
    pc.timestamp,
    pc.tx_hash,
    pc.tx_index,
    pc.log_index,
    pc.address as token_address,
    pc.dec['from'] as from_address,
    pc.dec['to'] as to_address,
    pc.dec['value'] as value
from (
    select
        l.block_num,
        l.block_hash,
        l.tx_hash,
        l.tx_index,
        l.log_index,
        l.timestamp,
        l.address,
        evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'Transfer(address indexed from, address indexed to, uint256 value)') as dec
    from eth_firehose.logs l
    where
        l.topic0 = evm_topic('Transfer(address indexed from, address indexed to, uint256 value)') and
        -- topic3 IS NULL excludes ERC721 Transfer events, which carry an indexed tokenId in topic3
        l.topic3 IS NULL
) pc
