Skip to content

Commit 389bb69

Browse files
committed
get_logs now requests logs individually by block hash for unstable blocks (by default, the last 2 blocks). Requesting a filter with a toBlock higher than the latest block the node knows of does not fail; the node silently ignores the missing blocks, which results in missed events.
1 parent f30ea7f commit 389bb69

File tree

2 files changed

+58
-40
lines changed

2 files changed

+58
-40
lines changed

IceCreamSwapWeb3/EthAdvanced.py

Lines changed: 55 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -125,17 +125,18 @@ def get_logs(
125125
p_bar=None,
126126
no_retry: bool = False
127127
) -> list[LogReceipt]:
128-
if not self.w3.should_retry:
129-
no_retry = True
128+
filter_block_range = self.w3.filter_block_range
129+
if filter_block_range == 0:
130+
raise Exception("RPC does not support eth_getLogs")
130131

131-
# getting the respective block numbers, could be block hashes or strings like "latest"
132-
from_block = filter_params["fromBlock"]
133-
to_block = filter_params["toBlock"]
134-
if not isinstance(from_block, int):
135-
from_block = self.get_block(from_block)["number"]
136-
if not isinstance(to_block, int):
137-
to_block = self.get_block(to_block)["number"]
132+
# getting logs for a single block defined by its block hash. No drama
133+
if "blockHash" in filter_params:
134+
assert "fromBlock" not in filter_params and "toBlock" not in filter_params
135+
return self.get_logs_inner(filter_params, no_retry=no_retry)
138136

137+
# sanitizing block numbers, could be strings like "latest"
138+
filter_params["fromBlock"] = from_block = self.get_block_number_from_identifier(filter_params["fromBlock"])
139+
filter_params["toBlock"] = to_block = self.get_block_number_from_identifier(filter_params["toBlock"])
139140
assert to_block >= from_block, f"{from_block=}, {to_block=}"
140141

141142
# note: fromBlock and toBlock are both inclusive. e.g. 5 to 6 are 2 blocks
@@ -147,51 +148,66 @@ def get_logs(
147148
from tqdm import tqdm
148149
p_bar = tqdm(total=num_blocks)
149150

151+
kwargs = dict(
152+
show_progress_bar=show_progress_bar,
153+
p_bar=p_bar,
154+
no_retry=no_retry,
155+
)
156+
157+
# the latest blocks might be available on some nodes but not at others.
158+
# setting toBlock to a block bigger than the latest known block of the node
159+
# simply ignores logs from the missing block
160+
# to prevent this, we get the latest blocks individually by their hashes
161+
unstable_blocks = self.w3.unstable_blocks
162+
if to_block > self.w3.latest_seen_block - unstable_blocks and to_block > self.block_number - unstable_blocks:
163+
results = []
164+
while to_block > self.w3.latest_seen_block - unstable_blocks and to_block >= from_block:
165+
single_hash_filter = {**filter_params, "blockHash": self.get_block(to_block)["hash"]}
166+
del single_hash_filter["fromBlock"]
167+
del single_hash_filter["toBlock"]
168+
results += self.get_logs_inner(single_hash_filter, no_retry=no_retry)
169+
to_block -= 1
170+
if p_bar is not None:
171+
p_bar.update(1)
172+
if to_block >= from_block:
173+
results += self.get_logs({**filter_params, "toBlock": to_block}, **kwargs)
174+
return results
175+
176+
# getting logs for a single block, which is not at the chain head. No drama
177+
if num_blocks == 1:
178+
return self.get_logs_inner(filter_params, no_retry=no_retry)
179+
150180
# if we already know that the filter range is too large, split it
151-
filter_block_range = self.w3.filter_block_range
152-
if filter_block_range == 0:
153-
raise Exception("RPC does not support eth_getLogs")
154181
if num_blocks > filter_block_range:
155182
results = []
156183
for filter_start in range(from_block, to_block + 1, filter_block_range):
157-
filter_end = min(filter_start + filter_block_range - 1, to_block)
158-
partial_filter = filter_params.copy()
159-
partial_filter["fromBlock"] = filter_start
160-
partial_filter["toBlock"] = filter_end
161-
results += self.get_logs(partial_filter, show_progress_bar=show_progress_bar, p_bar=p_bar)
184+
results += self.get_logs({
185+
**filter_params,
186+
"fromBlock": filter_start,
187+
"toBlock": min(filter_start + filter_block_range - 1, to_block),
188+
}, **kwargs)
162189
return results
163190

164191
# get logs
165192
try:
166193
events = self._get_logs(filter_params)
167-
except Exception:
168-
# if errors should not be retried, still do splitting but not retry if it can not be split further
169-
if no_retry and num_blocks == 1:
170-
raise
171-
else:
172194
if p_bar is not None:
173195
p_bar.update(num_blocks)
174196
return events
175-
176-
# if directly getting logs did not work, split the filter range and try again
177-
if num_blocks > 1:
197+
except Exception:
198+
# split the filter range and try again
178199
mid_block = (from_block + to_block) // 2
179-
left_filter = filter_params.copy()
180-
left_filter["toBlock"] = mid_block
181-
right_filter = filter_params.copy()
182-
right_filter["fromBlock"] = mid_block + 1
200+
left_filter = {**filter_params, "toBlock": mid_block}
201+
right_filter = {**filter_params, "fromBlock": mid_block + 1}
202+
return self.get_logs(left_filter, **kwargs) + self.get_logs(right_filter, **kwargs)
183203

184-
results = []
185-
results += self.get_logs(left_filter, show_progress_bar=show_progress_bar, p_bar=p_bar)
186-
results += self.get_logs(right_filter, show_progress_bar=show_progress_bar, p_bar=p_bar)
187-
return results
204+
def get_logs_inner(self, filter_params: FilterParams, no_retry: bool = False):
205+
if not self.w3.should_retry:
206+
no_retry = True
207+
return exponential_retry(func_name="get_logs")(self._get_logs)(filter_params, no_retry=no_retry)
188208

189-
# filter is trying to get a single block, retrying till it works
190-
assert from_block == to_block and num_blocks == 1, f"{from_block=}, {to_block=}, {num_blocks=}"
191-
events = exponential_retry(func_name="get_logs")(self._get_logs)(filter_params)
192-
if p_bar is not None:
193-
p_bar.update(num_blocks)
194-
return events
209+
def get_block_number_from_identifier(self, block_identifier: BlockIdentifier) -> BlockNumber:
210+
return block_identifier if isinstance(block_identifier, int) else self.get_block(block_identifier)["number"]
195211

196212
def _chain_id(self):
197213
# usually this causes an RPC call and is used in every eth_call. Getting it once in the init and then not again.

IceCreamSwapWeb3/Web3Advanced.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@ class Web3Advanced(Web3):
3434
def __init__(
3535
self,
3636
node_url: str,
37-
should_retry=True,
37+
should_retry: bool = True,
38+
unstable_blocks: int = 2, # not all nodes might have latest n blocks, these are seen as unstable
3839
):
3940
patch_error_formatters()
4041
self.node_url = node_url
4142
self.should_retry = should_retry
43+
self.unstable_blocks = unstable_blocks
4244

4345
provider = self._construct_provider(node_url=self.node_url)
4446

0 commit comments

Comments
 (0)