Commit 8018649

Author: Matthias Zimmermann

fix: adjust for unit testing against kaolin
Parent: d3602c8

File tree

6 files changed: +215 −32 lines changed

src/arkiv/contract.py

Lines changed: 1 addition & 1 deletion
@@ -188,7 +188,7 @@
             mungers=[default_root_munger],
         ),
         "query": Method(
-            json_rpc_method=RPCEndpoint("golembase_query"),
+            json_rpc_method=RPCEndpoint("arkiv_query"),
             mungers=[default_root_munger],
         ),
     }
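For readers unfamiliar with this plumbing: the change renames the JSON-RPC method that web3.py dispatches when the module's query method is called. A minimal sketch of how such a custom method is registered in web3.py (the module class, endpoint URL, and attachment name here are illustrative, not the SDK's actual code):

# Sketch: wiring a custom JSON-RPC method in web3.py.
# ArkivModule and the endpoint URL are illustrative placeholders.
from web3 import Web3
from web3.method import Method, default_root_munger
from web3.module import Module
from web3.types import RPCEndpoint


class ArkivModule(Module):
    # Calling w3.arkiv.query(...) now issues an "arkiv_query" request
    # instead of the old "golembase_query" one.
    query = Method(
        json_rpc_method=RPCEndpoint("arkiv_query"),
        mungers=[default_root_munger],
    )


w3 = Web3(Web3.HTTPProvider("http://localhost:8545"))  # placeholder endpoint
w3.attach_modules({"arkiv": ArkivModule})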

src/arkiv/events.py

Lines changed: 106 additions & 17 deletions
@@ -1,4 +1,20 @@
-"""Event filtering and monitoring for Arkiv entities."""
+"""Event filtering and monitoring for Arkiv entities.
+
+This module provides event watching functionality for Arkiv smart contract events.
+It supports two polling strategies:
+
+1. Filter-based polling (eth_newFilter + eth_getFilterChanges):
+   - More efficient, uses persistent filters on the node
+   - Works with local nodes and some RPC providers
+   - Automatically tried first
+
+2. Log polling (eth_getLogs):
+   - Maximum compatibility, works with all RPC providers
+   - Used as fallback when filter creation fails (e.g., Kaolin testnet blocks eth_newFilter)
+   - Polls for new logs on each interval
+
+The implementation automatically detects which method to use based on provider capabilities.
+"""
 
 from __future__ import annotations
 
@@ -42,7 +58,8 @@ class EventFilter(EventFilterBase[SyncCallback]):
     """
     Handle for watching entity events using HTTP polling.
 
-    Uses polling-based filter with get_new_entries() for event monitoring.
+    Uses direct log polling via eth_getLogs for maximum compatibility.
+    This approach works with RPC providers that don't support eth_newFilter.
     WebSocket providers are not supported by the sync Arkiv client.
 
     Inherits shared event parsing logic from EventFilterBase.
@@ -73,6 +90,12 @@ def __init__(
         self._thread: threading.Thread | None = None
         self._filter: LogFilter | None = None
 
+        # Track last processed block for log polling
+        self._last_block: int | None = None
+
+        # Try filter-based approach first (fallback to log polling if it fails)
+        self._use_filter = True
+
         if auto_start:
             self.start()
 
@@ -86,13 +109,24 @@ def start(self) -> None:
 
         logger.info(f"Starting event filter for {self.event_type}")
 
-        # Create the Web3 filter using base class helper
-        # Note: base class returns LogFilter | AsyncLogFilter, but for sync we only use LogFilter
-        filter_result = self._create_filter()
-        assert isinstance(filter_result, LogFilter), (
-            "Expected LogFilter for sync client"
-        )
-        self._filter = filter_result
+        # Try to create a Web3 filter first (works with most local nodes)
+        # If it fails (403 Forbidden from Kaolin), we'll use log polling instead
+        try:
+            filter_result = self._create_filter()
+            assert isinstance(filter_result, LogFilter), (
+                "Expected LogFilter for sync client"
+            )
+            self._filter = filter_result
+            self._use_filter = True
+            logger.info(f"Using filter-based polling for {self.event_type}")
+        except Exception as e:
+            logger.warning(f"Filter creation failed ({e}), falling back to log polling")
+            self._use_filter = False
+            # Initialize last_block for log polling
+            if self.from_block == "latest":
+                self._last_block = self.contract.w3.eth.block_number
+            else:
+                self._last_block = int(self.from_block) - 1
 
         # Start polling thread
         self._running = True
@@ -138,14 +172,20 @@ def _poll_loop(self) -> None:
 
         while self._running:
             try:
-                # Get new entries from filter
-                if self._filter:
-                    logs: list[LogReceipt] = self._filter.get_new_entries()
-                    for log in logs:
-                        try:
-                            self._process_log(log)
-                        except Exception as e:
-                            logger.error(f"Error processing event: {e}", exc_info=True)
+                if self._use_filter:
+                    # Filter-based approach (works with local nodes)
+                    if self._filter:
+                        logs: list[LogReceipt] = self._filter.get_new_entries()
+                        for log in logs:
+                            try:
+                                self._process_log(log)
+                            except Exception as e:
+                                logger.error(
+                                    f"Error processing event: {e}", exc_info=True
+                                )
+                else:
+                    # Log polling approach (works with Kaolin and other restricted RPC providers)
+                    self._poll_logs()
 
                 # Sleep before next poll
                 time.sleep(self._poll_interval)
@@ -156,6 +196,55 @@ def _poll_loop(self) -> None:
 
         logger.debug(f"Poll loop ended for {self.event_type}")
 
+    def _poll_logs(self) -> None:
+        """
+        Poll for logs using eth_getLogs.
+
+        This is a fallback method for RPC providers that don't support eth_newFilter.
+        """
+        try:
+            current_block = self.contract.w3.eth.block_number
+
+            # Only query if there are new blocks
+            if self._last_block is not None and current_block <= self._last_block:
+                return
+
+            # Get the contract event
+            event_name = self._get_contract_event_name()
+            contract_event = self.contract.events[event_name]
+
+            # Calculate from_block for this poll
+            from_block = (
+                (self._last_block + 1)
+                if self._last_block is not None
+                else current_block
+            )
+
+            # Get logs for new blocks
+            logs = contract_event.get_logs(
+                from_block=from_block,
+                to_block=current_block,
+            )
+
+            # Process each log
+            for log in logs:
+                try:
+                    self._process_log(log)
+                except Exception as e:
+                    logger.error(f"Error processing event: {e}", exc_info=True)
+
+            # Update last processed block
+            self._last_block = current_block
+
+            if logs:
+                logger.debug(
+                    f"Processed {len(logs)} {self.event_type} events "
+                    f"from blocks {from_block} to {current_block}"
+                )
+
+        except Exception as e:
+            logger.error(f"Error polling logs: {e}", exc_info=True)
+
     def _process_log(self, log: LogReceipt) -> None:
         """
         Process a single log receipt and trigger sync callback.
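Outside the SDK, the same try-filter-then-fall-back pattern can be sketched with plain web3.py. This is a simplified illustration, not the commit's code; the RPC URL, contract address, ABI, and event name are placeholders:

# Standalone sketch of the fallback pattern introduced above (plain web3.py).
# RPC_URL, CONTRACT_ADDRESS, ABI, and EntityCreated are placeholders.
import time

from web3 import Web3

RPC_URL = "http://localhost:8545"
CONTRACT_ADDRESS = "0x0000000000000000000000000000000000000000"
ABI: list = []  # the real event ABI goes here

w3 = Web3(Web3.HTTPProvider(RPC_URL))
contract = w3.eth.contract(address=CONTRACT_ADDRESS, abi=ABI)
event = contract.events.EntityCreated  # hypothetical event name

try:
    # Preferred path: persistent node-side filter (eth_newFilter).
    log_filter = event.create_filter(from_block="latest")
    use_filter = True
except Exception:
    # Fallback path: providers like Kaolin may reject eth_newFilter (e.g., 403).
    use_filter = False
    last_block = w3.eth.block_number

while True:
    if use_filter:
        # eth_getFilterChanges: the node returns only entries since the last call.
        logs = log_filter.get_new_entries()
    else:
        # eth_getLogs: explicit block range, tracked client-side.
        current = w3.eth.block_number
        logs = []
        if current > last_block:
            logs = event.get_logs(from_block=last_block + 1, to_block=current)
            last_block = current
    for log in logs:
        print(log)
    time.sleep(1.0)  # matches the SDK's 1-second poll interval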

src/arkiv/events_async.py

Lines changed: 102 additions & 12 deletions
@@ -1,4 +1,20 @@
-"""Async event filtering and monitoring for Arkiv entities."""
+"""Async event filtering and monitoring for Arkiv entities.
+
+This module provides async event watching functionality for Arkiv smart contract events.
+It supports two polling strategies:
+
+1. Filter-based polling (eth_newFilter + eth_getFilterChanges):
+   - More efficient, uses persistent filters on the node
+   - Works with local nodes and some RPC providers
+   - Automatically tried first
+
+2. Log polling (eth_getLogs):
+   - Maximum compatibility, works with all RPC providers
+   - Used as fallback when filter creation fails (e.g., Kaolin testnet blocks eth_newFilter)
+   - Polls for new logs on each interval
+
+The implementation automatically detects which method to use based on provider capabilities.
+"""
 
 from __future__ import annotations
 
@@ -41,7 +57,8 @@ class AsyncEventFilter(EventFilterBase[AsyncCallback]):
     """
    Handle for watching entity events using async HTTP polling.
 
-    Uses async polling-based filter with get_new_entries() for event monitoring.
+    Uses direct log polling via eth_getLogs for maximum compatibility.
+    This approach works with RPC providers that don't support eth_newFilter.
     WebSocket providers are not supported yet (future enhancement).
 
     Inherits shared event parsing logic from EventFilterBase.
@@ -74,6 +91,12 @@ def __init__(
         self._task: asyncio.Task[None] | None = None
         self._filter: LogFilter | None = None
 
+        # Track last processed block for log polling
+        self._last_block: int | None = None
+
+        # Try filter-based approach first (fallback to log polling if it fails)
+        self._use_filter = True
+
     async def _create_filter(self) -> Any:  # type: ignore[override]
         """
         Create a Web3 contract event filter for async HTTP polling.
@@ -102,8 +125,20 @@ async def start(self) -> None:
 
         logger.info(f"Starting async event filter for {self.event_type}")
 
-        # Create the Web3 filter using async helper
-        self._filter = await self._create_filter()
+        # Try to create a Web3 filter first (works with most local nodes)
+        # If it fails (403 Forbidden from Kaolin), we'll use log polling instead
+        try:
+            self._filter = await self._create_filter()
+            self._use_filter = True
+            logger.info(f"Using filter-based polling for {self.event_type}")
+        except Exception as e:
+            logger.warning(f"Filter creation failed ({e}), falling back to log polling")
+            self._use_filter = False
+            # Initialize last_block for log polling
+            if self.from_block == "latest":
+                self._last_block = await self.contract.w3.eth.block_number  # type: ignore[misc]
+            else:
+                self._last_block = int(self.from_block) - 1
 
         # Start async polling task
         self._running = True
@@ -152,14 +187,20 @@ async def _poll_loop(self) -> None:
 
         while self._running:
             try:
-                # Get new entries from filter
-                if self._filter:
-                    logs: list[LogReceipt] = await self._filter.get_new_entries()  # type: ignore[misc]
-                    for log in logs:
-                        try:
-                            await self._process_log(log)
-                        except Exception as e:
-                            logger.error(f"Error processing event: {e}", exc_info=True)
+                if self._use_filter:
+                    # Filter-based approach (works with local nodes)
+                    if self._filter:
+                        logs: list[LogReceipt] = await self._filter.get_new_entries()  # type: ignore[misc]
+                        for log in logs:
+                            try:
+                                await self._process_log(log)
+                            except Exception as e:
+                                logger.error(
+                                    f"Error processing event: {e}", exc_info=True
+                                )
+                else:
+                    # Log polling approach (works with Kaolin and other restricted RPC providers)
+                    await self._poll_logs()
 
                 # Async sleep before next poll
                 await asyncio.sleep(self._poll_interval)
@@ -173,6 +214,55 @@ async def _poll_loop(self) -> None:
 
         logger.debug(f"Async poll loop ended for {self.event_type}")
 
+    async def _poll_logs(self) -> None:
+        """
+        Poll for logs using eth_getLogs (async version).
+
+        This is a fallback method for RPC providers that don't support eth_newFilter.
+        """
+        try:
+            current_block = await self.contract.w3.eth.block_number  # type: ignore[misc]
+
+            # Only query if there are new blocks
+            if self._last_block is not None and current_block <= self._last_block:
+                return
+
+            # Get the contract event
+            event_name = self._get_contract_event_name()
+            contract_event = self.contract.events[event_name]
+
+            # Calculate from_block for this poll
+            from_block = (
+                (self._last_block + 1)
+                if self._last_block is not None
+                else current_block
+            )
+
+            # Get logs for new blocks
+            logs = await contract_event.get_logs(
+                from_block=from_block,
+                to_block=current_block,
+            )
+
+            # Process each log
+            for log in logs:
+                try:
+                    await self._process_log(log)
+                except Exception as e:
+                    logger.error(f"Error processing event: {e}", exc_info=True)
+
+            # Update last processed block
+            self._last_block = current_block
+
+            if logs:
+                logger.debug(
+                    f"Processed {len(logs)} {self.event_type} events "
+                    f"from blocks {from_block} to {current_block}"
+                )
+
+        except Exception as e:
+            logger.error(f"Error polling logs: {e}", exc_info=True)
+
     async def _process_log(self, log: LogReceipt) -> None:
         """
         Process a single event and trigger async callback.
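The async counterpart of the fallback loop can be sketched with AsyncWeb3; same placeholder caveats as the sync sketch above, and only the eth_getLogs branch is shown:

# Async sketch of eth_getLogs polling with AsyncWeb3.
# RPC_URL, CONTRACT_ADDRESS, ABI, and EntityCreated are placeholders.
import asyncio

from web3 import AsyncWeb3

RPC_URL = "http://localhost:8545"
CONTRACT_ADDRESS = "0x0000000000000000000000000000000000000000"
ABI: list = []  # the real event ABI goes here


async def watch() -> None:
    w3 = AsyncWeb3(AsyncWeb3.AsyncHTTPProvider(RPC_URL))
    contract = w3.eth.contract(address=CONTRACT_ADDRESS, abi=ABI)
    event = contract.events.EntityCreated  # hypothetical event name

    last_block = await w3.eth.block_number
    while True:
        current = await w3.eth.block_number
        if current > last_block:
            # Query only the blocks produced since the last poll.
            logs = await event.get_logs(from_block=last_block + 1, to_block=current)
            for log in logs:
                print(log)
            last_block = current
        await asyncio.sleep(1.0)


asyncio.run(watch())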

src/arkiv/events_base.py

Lines changed: 1 addition & 1 deletion
@@ -65,7 +65,7 @@ def __init__(
         self.callback: CallbackT = callback
         self.from_block: str | int = from_block
         self._running: bool = False
-        self._poll_interval: float = 2.0  # seconds
+        self._poll_interval: float = 1.0  # seconds
 
     @property
     def is_running(self) -> bool:

src/arkiv/node.py

Lines changed: 1 addition & 1 deletion
@@ -397,7 +397,7 @@ def _get_command(self) -> str:
         return (
             "--dev "
             "--http "
-            "--http.api 'eth,web3,net,debug,golembase' "
+            "--http.api 'eth,web3,net,debug,golembase,arkiv' "
             f"--http.port {self._http_port} "
             "--http.addr '0.0.0.0' "
             "--http.corsdomain '*' "

tests/test_arkiv_fixture.py

Lines changed: 4 additions & 0 deletions
@@ -2,6 +2,8 @@
 
 import logging
 
+import pytest
+
 from arkiv import Arkiv
 from arkiv.account import NamedAccount
 from arkiv.module import ArkivModule
@@ -83,6 +85,7 @@ def test_arkiv_accounts_are_funded(
     logger.info("Arkiv fixture accounts are funded (balances > 0)")
 
 
+@pytest.mark.skip("Skipping ETH transfer not (yet) supported on db chains.")
 def test_arkiv_transfer_eth_account(
     arkiv_client_http: Arkiv, account_1: NamedAccount, account_2: NamedAccount
 ) -> None:
@@ -106,6 +109,7 @@ def test_arkiv_transfer_eth_account(
     logger.info("Arkiv ETH transfer between accounts succeeded (to: NamedAccount)")
 
 
+@pytest.mark.skip("Skipping ETH transfer not (yet) supported on db chains.")
 def test_arkiv_transfer_eth_address(
     arkiv_client_http: Arkiv, account_1: NamedAccount, account_2: NamedAccount
 ) -> None:
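A side note on the markers: pytest.mark.skip disables these tests unconditionally. If transfer support later varies by chain, a conditional skip is the usual next step (sketch; the capability probe is hypothetical, the SDK does not currently expose such a check):

import pytest


def chain_supports_eth_transfer() -> bool:
    # Hypothetical probe; replace with a real capability check if one exists.
    return False


@pytest.mark.skipif(
    not chain_supports_eth_transfer(),
    reason="ETH transfer not (yet) supported on db chains.",
)
def test_arkiv_transfer_eth_account() -> None:
    ...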
