Skip to content

Commit 0cb4429

Browse files
committed
Add example query with specific schema
1 parent c32ac14 commit 0cb4429

File tree

2 files changed

+68
-4
lines changed

2 files changed

+68
-4
lines changed

apps/kafka_streaming_loader.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,9 @@ def main(
7373
if start_block is None:
7474
start_block = get_latest_block(client, raw_dataset)
7575

76-
print(f'Starting from block {start_block}')
77-
7876
resume_watermark = create_watermark(client, raw_dataset, network, start_block) if start_block > 0 else None
79-
if resume_watermark:
80-
print(f'Watermark: {resume_watermark.to_json()}')
8177

78+
print(f'Starting query from block {start_block}')
8279
print(f'Streaming to Kafka: {kafka_brokers} -> {topic}\n')
8380

8481
batch_count = 0
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
-- ERC20 Transfer Events Query (Activity Schema)
2+
--
3+
-- This query decodes ERC20 Transfer events from raw Ethereum logs
4+
-- and formats them according to the activity schema for Kafka streaming.
5+
--
6+
-- Output Schema:
7+
-- - version: Schema version (1.0)
8+
-- - chain: Chain identifier (e.g., 1 for Ethereum mainnet)
9+
-- - block_num: Block number
10+
-- - block_hash: Block hash (0x prefixed hex)
11+
-- - transaction: Transaction hash (0x prefixed hex)
12+
-- - activity_identity: Activity identity type (e.g., 'wallet')
13+
-- - activity_type: Activity type (e.g., 'payment')
14+
-- - activity_address: Primary address involved in activity
15+
-- - activity_operations: Array of operations with debited/credited amounts
16+
-- - token_address: ERC20 token contract address (0x prefixed hex)
17+
-- - token_symbol: Token symbol (from label join)
18+
-- - token_name: Token name (from label join)
19+
-- - token_decimals: Token decimals (from label join)
20+
--
21+
-- Required columns for parallel loading:
22+
-- - block_num: Used for partitioning across workers
23+
--
24+
-- Label join column (if using --label-csv):
25+
-- - token_address: Binary address of the ERC20 token contract
26+
--
27+
-- Example usage:
28+
-- uv apps/kafka_streaming_loader.py \
29+
-- --query-file apps/queries/erc20_transfers_activity.sql \
30+
-- --topic erc20_transfers \
31+
-- --label-csv data/eth_mainnet_token_metadata.csv \
32+
-- --raw-dataset eth_firehose \
33+
-- --network ethereum
34+
35+
select
36+
1.0 as version,
37+
1 as chain,
38+
l.block_num,
39+
l.block_hash,
40+
l.tx_hash as transaction,
41+
'wallet' as activity_identity,
42+
'payment' as activity_type,
43+
evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'Transfer(address indexed from, address indexed to, uint256 value)')['from'] as activity_address,
44+
[
45+
struct(
46+
concat('log:', cast(l.block_num as string), ':', cast(l.log_index as string)) as id,
47+
l.address as token,
48+
evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'Transfer(address indexed from, address indexed to, uint256 value)')['from'] as address,
49+
-cast(evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'Transfer(address indexed from, address indexed to, uint256 value)')['value'] as double) as amount,
50+
'debited' as type
51+
),
52+
struct(
53+
'' as id,
54+
l.address as token,
55+
evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'Transfer(address indexed from, address indexed to, uint256 value)')['to'] as address,
56+
cast(evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'Transfer(address indexed from, address indexed to, uint256 value)')['value'] as double) as amount,
57+
'credited' as type
58+
)
59+
] as activity_operations,
60+
l.address as token_address,
61+
cast(null as string) as token_symbol,
62+
cast(null as string) as token_name,
63+
cast(null as int) as token_decimals
64+
from eth_firehose.logs l
65+
where
66+
l.topic0 = evm_topic('Transfer(address indexed from, address indexed to, uint256 value)') and
67+
l.topic3 IS NULL

0 commit comments

Comments
 (0)