Skip to content

Commit b569b18

Browse files
committed
Add evm block indexer example
1 parent 88879dd commit b569b18

File tree

3 files changed

+677
-236
lines changed

3 files changed

+677
-236
lines changed

examples/evm_block_indexer.py

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
import argparse
import json
import os
from datetime import datetime, timezone
from typing import Any

import lz4.frame
import msgpack


def decompress_lz4(input_file, output_file):
    """Decompress an lz4-framed file at *input_file* and write the raw bytes to *output_file*."""
    with open(input_file, "rb") as src:
        payload = src.read()

    with open(output_file, "wb") as dst:
        dst.write(lz4.frame.decompress(payload))


class BytesEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``bytes`` values as 0x-prefixed hex strings."""

    def default(self, obj):
        # Only bytes get special treatment; defer everything else to the base
        # class, which raises TypeError for unserializable objects.
        if not isinstance(obj, bytes):
            return super().default(obj)
        return "0x" + obj.hex()


class EthBlockIndexer:
    """Decode Reth-formatted EVM blocks into JSON-friendly dicts.

    Byte fields arrive either as raw ``bytes`` or as node.js-style Buffer
    objects (``{"type": "Buffer", "data": [ints]}``); both are normalized to
    0x-prefixed hex strings, or to big-endian integers for numeric fields.
    """

    def __init__(self) -> None:
        # Processed blocks, in the order the files were ingested.
        self.blocks: list[dict[str, Any]] = []

    # convert a Buffer object to hex string
    def _convert_buffer(self, buffer_obj: Any) -> str:
        """Render a Buffer object as a 0x-prefixed hex string; str() anything else."""
        if isinstance(buffer_obj, dict) and buffer_obj.get("type") == "Buffer":
            return "0x" + "".join(f"{x:02x}" for x in buffer_obj["data"])
        return str(buffer_obj)

    # recursively process nested Buffer objects
    def _process_nested_buffers(self, data: Any) -> Any:
        """Walk dicts/lists, replacing Buffer objects and raw bytes with hex strings."""
        if isinstance(data, dict):
            if data.get("type") == "Buffer":
                return self._convert_buffer(data)
            return {k: self._process_nested_buffers(v) for k, v in data.items()}
        elif isinstance(data, list):
            return [self._process_nested_buffers(item) for item in data]
        elif isinstance(data, bytes):
            return "0x" + data.hex()
        return data

    def _bytes_to_int(self, value: Any) -> int:
        """Interpret a Buffer object or bytes as a big-endian unsigned integer.

        Anything else (including ``None`` for a missing field) decodes to 0,
        so callers can pass ``mapping.get(key)`` directly.
        """
        if isinstance(value, dict) and value.get("type") == "Buffer":
            raw_bytes = bytes(value["data"])
            return int.from_bytes(raw_bytes, byteorder="big")
        elif isinstance(value, bytes):
            return int.from_bytes(value, byteorder="big")
        return 0

    def _process_transaction(self, tx: dict[str, Any]) -> dict[str, Any]:
        """Normalize one transaction envelope; returns {} for an empty envelope."""
        if not tx.get("transaction"):
            return {}

        tx_data = tx["transaction"]
        tx_type = next(iter(tx_data.keys()))  # Either 'Legacy' or 'Eip1559'
        tx_content = tx_data[tx_type]

        # _bytes_to_int maps a missing key (None) to 0, so no explicit
        # empty-Buffer defaults are needed for the numeric fields.
        processed = {
            "type": tx_type,
            "chainId": self._bytes_to_int(tx_content.get("chainId")),
            "nonce": self._bytes_to_int(tx_content.get("nonce")),
            "gas": self._bytes_to_int(tx_content.get("gas")),
            "to": self._process_nested_buffers(tx_content.get("to")),
            "value": self._bytes_to_int(tx_content.get("value")),
            "input": self._process_nested_buffers(tx_content.get("input")),
            "signature": [self._process_nested_buffers(sig) for sig in tx.get("signature", [])],
        }

        # Fee fields differ by transaction envelope type.
        if tx_type == "Legacy":
            processed["gasPrice"] = self._bytes_to_int(tx_content.get("gasPrice"))
        elif tx_type == "Eip1559":
            processed.update(
                {
                    "maxFeePerGas": self._bytes_to_int(tx_content.get("maxFeePerGas")),
                    "maxPriorityFeePerGas": self._bytes_to_int(tx_content.get("maxPriorityFeePerGas")),
                    "accessList": self._process_nested_buffers(tx_content.get("accessList", [])),
                }
            )

        return processed

    def _process_block(self, block_data: dict[str, Any]) -> dict[str, Any]:
        """Flatten one decoded block into a JSON-friendly dict.

        Raises:
            ValueError: if ``block_data`` lacks the expected "block" envelope.
        """
        if not isinstance(block_data, dict) or "block" not in block_data:
            raise ValueError("invalid block format")

        reth_block = block_data["block"]["Reth115"]
        header = reth_block.get("header", {}).get("header", {})

        processed_block = {
            "hash": self._process_nested_buffers(reth_block["header"].get("hash")),
            "parentHash": self._process_nested_buffers(header.get("parentHash")),
            "sha3Uncles": self._process_nested_buffers(header.get("sha3Uncles")),
            "miner": self._process_nested_buffers(header.get("miner")),
            "stateRoot": self._process_nested_buffers(header.get("stateRoot")),
            "transactionsRoot": self._process_nested_buffers(header.get("transactionsRoot")),
            "receiptsRoot": self._process_nested_buffers(header.get("receiptsRoot")),
            "number": self._bytes_to_int(header.get("number")),
            "gasLimit": self._bytes_to_int(header.get("gasLimit")),
            "gasUsed": self._bytes_to_int(header.get("gasUsed")),
            "timestamp": self._bytes_to_int(header.get("timestamp")),
            "extraData": self._process_nested_buffers(header.get("extraData")),
            "baseFeePerGas": self._bytes_to_int(header.get("baseFeePerGas")),
            "transactions": [
                self._process_transaction(tx) for tx in reth_block.get("body", {}).get("transactions", [])
            ],
        }

        # Block timestamps are Unix times; render them in UTC so the output
        # does not depend on the local timezone of the machine running this.
        if processed_block["timestamp"]:
            processed_block["datetime"] = datetime.fromtimestamp(
                processed_block["timestamp"], tz=timezone.utc
            ).isoformat()
        else:
            processed_block["datetime"] = None

        return processed_block

    def process_msgpack_file(self, filename: str) -> None:
        """Load a msgpack block file and append its processed block(s) to self.blocks."""
        with open(filename, "rb") as f:
            data = msgpack.load(f)
        # A file may contain a single block or a list of blocks.
        if isinstance(data, list):
            for block_data in data:
                self.blocks.append(self._process_block(block_data))
        else:
            self.blocks.append(self._process_block(data))

    def save_to_json(self, output_filename: str) -> None:
        """Write all processed blocks plus summary counts to a JSON file."""
        with open(output_filename, "w") as f:
            json.dump(
                {
                    "blocks": self.blocks,
                    "totalBlocks": len(self.blocks),
                    "totalTransactions": sum(len(block["transactions"]) for block in self.blocks),
                },
                f,
                indent=2,
                cls=BytesEncoder,  # any residual bytes values serialize as 0x hex
            )

    def summarize_blocks(self) -> dict[str, Any]:
        """Return aggregate statistics over the processed blocks."""
        if not self.blocks:
            return {"error": "no blocks processed"}

        total_gas_used = sum(block["gasUsed"] for block in self.blocks)
        total_txs = sum(len(block["transactions"]) for block in self.blocks)

        return {
            "totalBlocks": len(self.blocks),
            "totalTransactions": total_txs,
            # self.blocks is non-empty here (guarded above), so the division is safe.
            "averageGasUsed": total_gas_used / len(self.blocks),
            "blockNumbers": [block["number"] for block in self.blocks],
            "timeRange": {
                "first": next((b["datetime"] for b in self.blocks if b["datetime"]), None),
                "last": next((b["datetime"] for b in reversed(self.blocks) if b["datetime"]), None),
            },
        }


if __name__ == "__main__":
    # Download ethereum block files from s3://hl-[testnet|mainnet]-evm-blocks
    # and input them into the indexer
    parser = argparse.ArgumentParser(description="index evm blocks")
    parser.add_argument("--data-dir", type=str, required=True, help="directory containing <height>.rmp.lz4 block files")
    parser.add_argument("--start-height", type=int, required=True, help="first block height to index (inclusive)")
    parser.add_argument("--end-height", type=int, required=True, help="last block height to index (inclusive)")
    args = parser.parse_args()

    data_dir = args.data_dir
    start_height = args.start_height
    end_height = args.end_height

    # Decompress every block file in the requested range up front so a gap in
    # the range fails fast, before any indexing work is done.
    mp_flns = []
    for height in range(start_height, end_height + 1):
        lz4_fln = f"{data_dir}/{height}.rmp.lz4"
        if not os.path.exists(lz4_fln):
            # FileNotFoundError is the precise exception for a missing file;
            # the message carries the remediation hint.
            raise FileNotFoundError(
                f"block with height {height} not found - download missing block file(s) using 'aws s3 cp s3://hl-[testnet | mainnet]-evm-blocks/<block_object_path> --request-payer requester'"
            )
        mp_fln = f"{data_dir}/{height}.rmp"
        decompress_lz4(lz4_fln, mp_fln)
        mp_flns.append(mp_fln)

    indexer = EthBlockIndexer()
    for mp_fln in mp_flns:
        indexer.process_msgpack_file(mp_fln)
    print(indexer.summarize_blocks())
    indexer.save_to_json(f"{data_dir}/processed_blocks.json")

0 commit comments

Comments
 (0)