Skip to content

Commit 5d3822e

Browse files
committed
v0.2.0 Overhauled eth.get_logs implementation to use batch requests for consistency instead of getting most recent logs via block hashes.
1 parent 16cb412 commit 5d3822e

File tree

5 files changed

+108
-326
lines changed

5 files changed

+108
-326
lines changed

IceCreamSwapWeb3/BatchRetryMiddleware.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ def request_wrapper(method, params):
4646
for method, params in requests_info
4747
]
4848

49+
no_splitting = len(requests_info) == 2 and requests_info[0][0] == "eth_getLogs" and requests_info[1][0] == "eth_getBlockByNumber"
50+
4951
try:
5052
response = make_batch_request(requests_info)
5153
except Exception as e:
@@ -76,12 +78,17 @@ def request_wrapper(method, params):
7678

7779
print(f"{len(requests_retry)}/{len(requests_info)} requests in batch failed, retrying. Example response: {response[request_indexes[0][0]]}")
7880

79-
if len(requests_retry) != len(requests_info): # if some requests succeeded, retry failed requests
81+
if len(requests_retry) != len(requests_info) and not no_splitting: # if some requests succeeded, retry failed requests
8082
response_new = middleware(requests_retry)
8183
for old_idx, new_idx in request_indexes:
8284
response[old_idx] = response_new[new_idx]
8385
return response
8486

87+
if no_splitting:
88+
# retry request. This being recursive is not optimal, but should be ok
89+
sleep(0.1)
90+
return middleware(requests_info)
91+
8592
assert len(requests_info) > 1
8693
middle = len(requests_info) // 2
8794
sleep(0.1)

IceCreamSwapWeb3/EthAdvanced.py

Lines changed: 99 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
import os
2-
from functools import lru_cache
32
from time import sleep
43
from typing import Optional, TypedDict, Sequence
54

65
from eth_typing import BlockNumber, Address, ChecksumAddress
6+
from eth_typing.evm import BlockParams
77
from hexbytes import HexBytes
8-
from web3.datastructures import AttributeDict
98
from web3.eth import Eth
109
from web3.exceptions import ContractLogicError
1110
from web3.types import FilterParams, LogReceipt, StateOverride, BlockIdentifier, TxParams, BlockData, _Hash32
@@ -19,10 +18,22 @@ class ForkedBlock(Exception):
1918

2019

2120
class FilterParamsExtended(TypedDict, total=False):
21+
address: ChecksumAddress | list[ChecksumAddress]
22+
blockHash: HexBytes
23+
fromBlock: BlockParams | BlockNumber | int
24+
fromBlockParentHash: _Hash32
25+
toBlock: BlockParams | BlockNumber | int
26+
toBlockHash: _Hash32
27+
topics: Sequence[_Hash32 | Sequence[_Hash32] | None]
28+
29+
30+
class FilterParamsSanitized(TypedDict, total=False):
2231
address: Address | ChecksumAddress | list[Address] | list[ChecksumAddress]
2332
blockHash: HexBytes
24-
fromBlock: BlockIdentifier | BlockData
25-
toBlock: BlockIdentifier | BlockData
33+
fromBlock: int
34+
fromBlockParentHash: str
35+
toBlock: int
36+
toBlockHash: str
2637
topics: Sequence[_Hash32 | Sequence[_Hash32] | None]
2738

2839

@@ -139,32 +150,60 @@ def get_block(
139150

140151
def get_logs(
141152
self,
142-
filter_params: FilterParamsExtended,
153+
_filter_params: FilterParamsExtended,
143154
show_progress_bar: bool = False,
144155
p_bar=None,
145156
no_retry: bool = False,
146-
use_subsquid: bool = os.getenv("NO_SUBSQUID_LOGS") is None,
147-
get_logs_by_block_hash: bool = False
157+
use_subsquid: bool = os.getenv("NO_SUBSQUID_LOGS") is None
148158
) -> list[LogReceipt]:
149159
filter_block_range = self.w3.filter_block_range
150160
if filter_block_range == 0:
151161
raise Exception("RPC does not support eth_getLogs")
152162

153163
# getting logs for a single block defined by its block hash. No drama
154-
if "blockHash" in filter_params:
155-
assert "fromBlock" not in filter_params and "toBlock" not in filter_params
156-
return self.get_logs_inner(filter_params, no_retry=no_retry)
164+
if "blockHash" in _filter_params:
165+
assert "fromBlock" not in _filter_params and "toBlock" not in _filter_params
166+
return self.get_logs_inner(_filter_params, no_retry=no_retry)
167+
168+
filter_params: FilterParamsSanitized = {**_filter_params}
169+
170+
if "fromBlockParentHash" in filter_params:
171+
if filter_params["fromBlockParentHash"] is None:
172+
del filter_params["fromBlockParentHash"]
173+
elif not isinstance(filter_params["fromBlockParentHash"], str):
174+
filter_params["fromBlockParentHash"] = filter_params["fromBlockParentHash"].to_0x_hex()
175+
176+
if "toBlockHash" in filter_params:
177+
if filter_params["toBlockHash"] is None:
178+
del filter_params["toBlockHash"]
179+
elif not isinstance(filter_params["toBlockHash"], str):
180+
filter_params["toBlockHash"] = filter_params["toBlockHash"].to_0x_hex()
181+
182+
if "fromBlock" not in filter_params or not isinstance(filter_params["fromBlock"], int):
183+
assert "fromBlockParentHash" not in filter_params, "can not specify fromBlockParentHash without fromBlock number"
184+
filter_params["fromBlock"] = self.get_block(filter_params.get("fromBlock", "earliest"))["number"]
185+
186+
if "toBlock" not in filter_params or not isinstance(filter_params["toBlock"], int):
187+
assert "toBlockHash" not in filter_params, "can not specify toBlockHash without toBlock number"
188+
filter_params["toBlock"] = self.get_block(filter_params.get("toBlock", "latest"))["number"]
157189

158-
from_block, from_block_body = self.sanitize_block(filter_params.get("fromBlock", "latest"))
159-
to_block, to_block_body = self.sanitize_block(filter_params.get("toBlock", "latest"))
160-
filter_params = {**filter_params, "fromBlock": from_block, "toBlock": to_block}
190+
kwargs = dict(
191+
show_progress_bar=show_progress_bar,
192+
p_bar=p_bar,
193+
no_retry=no_retry,
194+
use_subsquid=use_subsquid,
195+
)
196+
197+
from_block = filter_params["fromBlock"]
198+
to_block = filter_params["toBlock"]
199+
from_block_parent_hash = filter_params.get("fromBlockParentHash")
200+
to_block_hash = filter_params.get("toBlockHash")
161201

162-
assert to_block >= from_block, f"{from_block=}, {to_block=}"
202+
assert to_block >= from_block, f"from block after to block, {from_block=}, {to_block=}"
163203

164204
# if logs for a single block are queried, and we know the block hash, query by it
165-
if from_block == to_block and (from_block_body or to_block_body):
166-
block_body = from_block_body if from_block_body else to_block_body
167-
single_hash_filter = {**filter_params, "blockHash": block_body["hash"]}
205+
if from_block == to_block and to_block_hash is not None:
206+
single_hash_filter = {**filter_params, "blockHash": to_block_hash}
168207
del single_hash_filter["fromBlock"]
169208
del single_hash_filter["toBlock"]
170209
return self.get_logs_inner(single_hash_filter, no_retry=no_retry)
@@ -177,78 +216,26 @@ def get_logs(
177216
# local import as tqdm is an optional dependency of this package
178217
from tqdm import tqdm
179218
p_bar = tqdm(total=num_blocks)
219+
kwargs["p_bar"] = p_bar
180220

181-
kwargs = dict(
182-
show_progress_bar=show_progress_bar,
183-
p_bar=p_bar,
184-
no_retry=no_retry,
185-
use_subsquid=use_subsquid,
186-
)
187-
188-
if use_subsquid and self.w3.subsquid_available and from_block < self.w3.latest_seen_block - self.w3.unstable_blocks:
221+
if use_subsquid and self.w3.subsquid_available and from_block < self.w3.latest_seen_block - 1_000:
189222
kwargs["use_subsquid"] = False # make sure we only try once with Subsquid
190223
try:
191224
# trying to get logs from SubSquid
192-
till_block, results = get_filter(
225+
next_block, results = get_filter(
193226
chain_id=self.chain_id,
194-
filter_params={**filter_params, "toBlock": min(to_block, self.w3.latest_seen_block - self.w3.unstable_blocks)},
227+
filter_params=filter_params,
195228
partial_allowed=True,
196229
p_bar=p_bar,
197230
)
198-
if till_block >= to_block:
199-
return results
200-
return results + self.get_logs({**filter_params, "fromBlock": till_block + 1}, **kwargs)
201231
except Exception as e:
202232
if not isinstance(e, ValueError) or "Subsquid only has indexed till block " not in str(e):
203233
print(f"Getting logs from SubSquid threw exception {repr(e)}, falling back to RPC")
204-
205-
last_stable_block = self.w3.latest_seen_block - self.w3.unstable_blocks
206-
if get_logs_by_block_hash or to_block > last_stable_block:
207-
# getting logs via from and to block range sometimes drops logs.
208-
# This does not happen when getting them individually for each block by their block hash
209-
# get all block hashes and ensure they build upon each other
210-
211-
# if only unstable blocks need to be gathered by hash, gather stable blocks as log range
212-
results: list[LogReceipt] = []
213-
if not get_logs_by_block_hash and from_block < last_stable_block:
214-
results += self.get_logs({**filter_params, "toBlock": last_stable_block}, **kwargs)
215-
from_block = last_stable_block + 1
216-
assert to_block >= from_block
217-
num_blocks = to_block - from_block + 1
218-
219-
with self.w3.batch_requests() as batch:
220-
batch.add_mapping({
221-
self.w3.eth._get_block: list(range(from_block, to_block + 1))
222-
})
223-
blocks: list[BlockData] = batch.execute()
224-
assert len(blocks) == num_blocks
225-
226-
# make sure chain of blocks is consistent with each block building on the previous one
227-
for i, block_number in enumerate(range(from_block, to_block + 1)):
228-
block = blocks[i]
229-
if i != 0:
230-
if block["parentHash"] != blocks[i-1]["hash"]:
231-
raise ForkedBlock(f"expected={blocks[i-1]['hash'].to_0x_hex()}, actual={block['parentHash'].to_0x_hex()}")
232-
if from_block_body is not None and from_block_body["number"] == block_number:
233-
if block["hash"] != from_block_body["hash"]:
234-
raise ForkedBlock(f"expected={from_block_body['hash'].to_0x_hex()}, actual={block['hash'].to_0x_hex()}")
235-
if to_block_body is not None and to_block_body["number"] == block_number:
236-
if block["hash"] != to_block_body["hash"]:
237-
raise ForkedBlock(f"expected={to_block_body['hash'].to_0x_hex()}, actual={block['hash'].to_0x_hex()}")
238-
239-
single_hash_filter = filter_params.copy()
240-
del single_hash_filter["fromBlock"]
241-
del single_hash_filter["toBlock"]
242-
with self.w3.batch_requests() as batch:
243-
batch.add_mapping({
244-
self.w3.eth._get_logs: [{**single_hash_filter, "blockHash": block["hash"]} for block in blocks]
245-
})
246-
results_per_block: list[list[LogReceipt]] = batch.execute()
247-
assert len(results_per_block) == num_blocks
248-
if p_bar is not None:
249-
p_bar.update(len(blocks))
250-
results += sum(results_per_block, [])
251-
return results
234+
else:
235+
assert next_block <= to_block + 1, "SubSquid returned logs for more blocks than specified"
236+
if next_block == to_block + 1:
237+
return results
238+
return results + self.get_logs({**filter_params, "fromBlock": next_block}, **kwargs)
252239

253240
# getting logs for a single block, which is not at the chain head. No drama
254241
if num_blocks == 1:
@@ -258,16 +245,42 @@ def get_logs(
258245
if num_blocks > filter_block_range:
259246
results = []
260247
for filter_start in range(from_block, to_block + 1, filter_block_range):
261-
results += self.get_logs({
248+
filter_end = min(filter_start + filter_block_range - 1, to_block)
249+
partial_filter = {
262250
**filter_params,
263251
"fromBlock": filter_start,
264-
"toBlock": min(filter_start + filter_block_range - 1, to_block),
265-
}, **kwargs)
252+
"toBlock": filter_end,
253+
}
254+
if to_block_hash is not None and filter_end != to_block:
255+
del partial_filter["toBlockHash"]
256+
if from_block_parent_hash is not None and filter_start != from_block:
257+
del partial_filter["fromBlockParentHash"]
258+
results += self.get_logs(partial_filter, **kwargs)
266259
return results
267260

268261
# get logs and split on exception
269262
try:
270-
events = self._get_logs(filter_params)
263+
with self.w3.batch_requests() as batch:
264+
if from_block_parent_hash is not None:
265+
batch.add(self._get_block(from_block))
266+
batch.add(self._get_logs(filter_params))
267+
batch.add(self._get_block(to_block))
268+
events: list[LogReceipt]
269+
to_block_body: BlockData
270+
batch_results = batch.execute()
271+
if from_block_parent_hash is not None:
272+
events, to_block_body = batch_results
273+
else:
274+
from_block_body: BlockData
275+
events, to_block_body, from_block_body = batch_results
276+
assert from_block_body["number"] == from_block, "eth_getLogs RPC returned unexpected from block number"
277+
if from_block_body["parentHash"].to_0x_hex() != from_block_parent_hash:
278+
raise ForkedBlock(f"expected={from_block_parent_hash}, actual={from_block_body['parentHash'].to_0x_hex()}")
279+
280+
assert to_block_body["number"] == to_block, "eth_getLogs RPC returned unexpected to block number"
281+
if to_block_hash is not None and to_block_body["hash"].to_0x_hex() != to_block_hash:
282+
raise ForkedBlock(f"expected={to_block_hash}, actual={to_block_body['hash'].to_0x_hex()}")
283+
271284
if p_bar is not None:
272285
p_bar.update(num_blocks)
273286
return events
@@ -277,21 +290,12 @@ def get_logs(
277290
mid_block = (from_block + to_block) // 2
278291
left_filter = {**filter_params, "toBlock": mid_block}
279292
right_filter = {**filter_params, "fromBlock": mid_block + 1}
293+
if "toBlockHash" in left_filter:
294+
del left_filter["toBlockHash"]
295+
if "fromBlockParentHash" in right_filter:
296+
del right_filter["fromBlockParentHash"]
280297
return self.get_logs(left_filter, **kwargs) + self.get_logs(right_filter, **kwargs)
281298

282-
283-
def sanitize_block(self, block: BlockIdentifier | BlockData) -> tuple[int, BlockData | None]:
284-
if isinstance(block, int):
285-
block_body = None
286-
block_number = block
287-
elif isinstance(block, AttributeDict) or isinstance(block, dict):
288-
block_body = block
289-
block_number = block["number"]
290-
else:
291-
block_body = self.get_block(block)
292-
block_number = block_body["number"]
293-
return block_number, block_body
294-
295299
def get_logs_inner(self, filter_params: FilterParams, no_retry: bool = False):
296300
if not self.w3.should_retry:
297301
no_retry = True

0 commit comments

Comments
 (0)