Skip to content

Commit 71d6ac8

Browse files
feat: add AsyncEventFilter and async HTTP based filtering
1 parent 12a9bee commit 71d6ac8

File tree

7 files changed

+880
-17
lines changed

7 files changed

+880
-17
lines changed

src/arkiv/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from .account import NamedAccount
66
from .client import Arkiv, AsyncArkiv
77
from .events import EventFilter
8+
from .events_async import AsyncEventFilter
89
from .node import ArkivNode
910
from .query import QueryIterator
1011
from .types import (
@@ -25,6 +26,7 @@
2526
"Arkiv",
2627
"ArkivNode",
2728
"AsyncArkiv",
29+
"AsyncEventFilter",
2830
"CreateEvent",
2931
"DeleteEvent",
3032
"EventFilter",

src/arkiv/events.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
from web3._utils.filters import LogFilter
1111
from web3.contract import Contract
12-
from web3.contract.contract import ContractEvent
1312
from web3.types import EventData, LogReceipt
1413

1514
from .events_base import EventFilterBase
@@ -79,9 +78,7 @@ def start(self) -> None:
7978
logger.info(f"Starting event filter for {self.event_type}")
8079

8180
# Create the Web3 filter using base class helper
82-
event_name = self._get_contract_event_name()
83-
contract_event: ContractEvent = self.contract.events[event_name]
84-
self._filter = contract_event.create_filter(from_block=self.from_block)
81+
self._filter = self._create_filter()
8582

8683
# Start polling thread
8784
self._running = True

src/arkiv/events_async.py

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
"""Async event filtering and monitoring for Arkiv entities."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
import logging
7+
from typing import TYPE_CHECKING, Any, cast
8+
9+
from web3._utils.filters import LogFilter
10+
from web3.contract import Contract
11+
from web3.types import EventData, LogReceipt
12+
13+
from .events_base import EventFilterBase
14+
from .types import (
15+
AsyncCreateCallback,
16+
AsyncDeleteCallback,
17+
AsyncExtendCallback,
18+
AsyncUpdateCallback,
19+
EventType,
20+
)
21+
22+
if TYPE_CHECKING:
23+
pass
24+
25+
logger = logging.getLogger(__name__)
26+
27+
# Union of all async callback types
28+
AsyncCallback = (
29+
AsyncCreateCallback
30+
| AsyncUpdateCallback
31+
| AsyncExtendCallback
32+
| AsyncDeleteCallback
33+
)
34+
35+
36+
class AsyncEventFilter(EventFilterBase[AsyncCallback]):
37+
"""
38+
Handle for watching entity events using async HTTP polling.
39+
40+
Uses async polling-based filter with get_new_entries() for event monitoring.
41+
WebSocket providers are not supported yet (future enhancement).
42+
43+
Inherits shared event parsing logic from EventFilterBase.
44+
"""
45+
46+
def __init__(
47+
self,
48+
contract: Contract,
49+
event_type: EventType,
50+
callback: AsyncCallback,
51+
from_block: str | int = "latest",
52+
) -> None:
53+
"""
54+
Initialize async event filter for HTTP polling.
55+
56+
Args:
57+
contract: Web3 contract instance
58+
event_type: Type of event to watch
59+
callback: Async callback function for the event
60+
from_block: Starting block for the filter
61+
62+
Note:
63+
Unlike the sync EventFilter, AsyncEventFilter does not support auto_start
64+
since we cannot await in __init__. Caller must explicitly await start().
65+
"""
66+
# Initialize base class (never auto-start since we need async context)
67+
super().__init__(contract, event_type, callback, from_block, auto_start=False)
68+
69+
# Async-specific state for HTTP polling
70+
self._task: asyncio.Task[None] | None = None
71+
self._filter: LogFilter | None = None
72+
73+
async def _create_filter(self) -> Any:
74+
"""
75+
Create a Web3 contract event filter for async HTTP polling.
76+
77+
Overrides the base class method to handle async create_filter calls.
78+
For async providers, contract_event.create_filter() returns a coroutine
79+
that must be awaited.
80+
81+
Returns:
82+
LogFilter for async HTTP providers
83+
"""
84+
event_name = self._get_contract_event_name()
85+
contract_event = self.contract.events[event_name]
86+
return await contract_event.create_filter(from_block=self.from_block)
87+
88+
async def start(self) -> None:
89+
"""
90+
Start async HTTP polling for events.
91+
"""
92+
if self._running:
93+
logger.warning(f"Filter for {self.event_type} is already running")
94+
return
95+
96+
logger.info(f"Starting async event filter for {self.event_type}")
97+
98+
# Create the Web3 filter using async helper
99+
self._filter = await self._create_filter()
100+
101+
# Start async polling task
102+
self._running = True
103+
self._task = asyncio.create_task(self._poll_loop())
104+
105+
logger.info(f"Async event filter for {self.event_type} started")
106+
107+
async def stop(self) -> None:
108+
"""
109+
Stop async polling for events.
110+
"""
111+
if not self._running:
112+
logger.warning(f"Filter for {self.event_type} is not running")
113+
return
114+
115+
logger.info(f"Stopping async event filter for {self.event_type}")
116+
self._running = False
117+
118+
# Cancel and wait for task to finish
119+
if self._task and not self._task.done():
120+
self._task.cancel()
121+
try:
122+
await self._task
123+
except asyncio.CancelledError:
124+
pass
125+
self._task = None
126+
127+
logger.info(f"Async event filter for {self.event_type} stopped")
128+
129+
async def uninstall(self) -> None:
130+
"""Uninstall the filter and cleanup resources."""
131+
logger.info(f"Uninstalling async event filter for {self.event_type}")
132+
133+
# Stop polling if running
134+
if self._running:
135+
await self.stop()
136+
137+
# Clear filter reference (Web3 filters don't have uninstall method)
138+
self._filter = None
139+
140+
logger.info(f"Async event filter for {self.event_type} uninstalled")
141+
142+
async def _poll_loop(self) -> None:
143+
"""Background async polling loop for HTTP provider events."""
144+
logger.debug(f"Async poll loop started for {self.event_type}")
145+
146+
while self._running:
147+
try:
148+
# Get new entries from filter
149+
if self._filter:
150+
# For async providers, get_new_entries() returns a coroutine
151+
# Type system doesn't reflect this, so we need to ignore the type error
152+
new_entries: list[LogReceipt] = await self._filter.get_new_entries() # type: ignore[misc]
153+
154+
for entry in new_entries:
155+
try:
156+
# LogFilter from contract event has log_entry_formatter that
157+
# converts LogReceipt to EventData, but type system shows LogReceipt
158+
await self._process_event(cast(EventData, entry))
159+
except Exception as e:
160+
logger.error(f"Error processing event: {e}", exc_info=True)
161+
162+
# Async sleep before next poll
163+
await asyncio.sleep(self._poll_interval)
164+
165+
except asyncio.CancelledError:
166+
logger.debug(f"Async poll loop cancelled for {self.event_type}")
167+
break
168+
except Exception as e:
169+
logger.error(f"Error in async poll loop: {e}", exc_info=True)
170+
await asyncio.sleep(self._poll_interval)
171+
172+
logger.debug(f"Async poll loop ended for {self.event_type}")
173+
174+
async def _process_event(self, event_data: EventData) -> None:
175+
"""
176+
Process a single event and trigger async callback.
177+
178+
Args:
179+
event_data: Event data from Web3 filter
180+
"""
181+
# Use base class to parse event data
182+
event, tx_hash = self._parse_event_data(event_data)
183+
184+
# Trigger async callback with error handling
185+
try:
186+
await self.callback(event, tx_hash) # type: ignore[arg-type]
187+
except Exception as e:
188+
logger.error(f"Error in async callback: {e}", exc_info=True)

src/arkiv/events_base.py

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import logging
66
from abc import ABC, abstractmethod
7-
from typing import TYPE_CHECKING, Generic, TypeVar
7+
from typing import TYPE_CHECKING, Any, Generic, TypeVar
88

99
from eth_typing import HexStr
1010
from web3.types import EventData
@@ -96,6 +96,25 @@ def _get_contract_event_name(self) -> str:
9696
)
9797
return EVENTS[self.event_type]
9898

99+
def _create_filter(self) -> Any:
100+
"""
101+
Create a Web3 contract event filter for HTTP polling.
102+
103+
This method creates a LogFilter using the contract's create_filter method.
104+
Subclasses that use different subscription mechanisms (e.g., WebSocket)
105+
should override this method to return an appropriate subscription handle.
106+
107+
Returns:
108+
LogFilter for HTTP providers, or subscription handle for WebSocket providers
109+
110+
Note:
111+
Default implementation is for HTTP polling. WebSocket subclasses should
112+
override to create subscription handles via provider-specific APIs.
113+
"""
114+
event_name = self._get_contract_event_name()
115+
contract_event = self.contract.events[event_name]
116+
return contract_event.create_filter(from_block=self.from_block)
117+
99118
def _extract_tx_hash(self, event_data: EventData) -> TxHash:
100119
"""
101120
Extract and normalize transaction hash from event data.
@@ -161,34 +180,34 @@ def _parse_event_data(self, event_data: EventData) -> tuple[EventObject, TxHash]
161180

162181
# Abstract methods that subclasses must implement
163182
@abstractmethod
164-
def start(self) -> None:
183+
def start(self) -> Any:
165184
"""
166185
Start monitoring for events.
167186
168187
Subclasses implement this as either:
169-
- Sync method that starts a polling thread (EventFilter)
170-
- Async method that starts an asyncio task (AsyncEventFilter)
188+
- Sync method that starts a polling thread (EventFilter) -> None
189+
- Async method that starts an asyncio task (AsyncEventFilter) -> Awaitable[None]
171190
"""
172191
...
173192

174193
@abstractmethod
175-
def stop(self) -> None:
194+
def stop(self) -> Any:
176195
"""
177196
Stop monitoring for events.
178197
179198
Subclasses implement this as either:
180-
- Sync method that stops the polling thread (EventFilter)
181-
- Async method that cancels the asyncio task (AsyncEventFilter)
199+
- Sync method that stops the polling thread (EventFilter) -> None
200+
- Async method that cancels the asyncio task (AsyncEventFilter) -> Awaitable[None]
182201
"""
183202
...
184203

185204
@abstractmethod
186-
def uninstall(self) -> None:
205+
def uninstall(self) -> Any:
187206
"""
188207
Uninstall the filter and cleanup resources.
189208
190209
Subclasses implement this as either:
191-
- Sync method that cleans up thread and filter (EventFilter)
192-
- Async method that cleans up task and filter (AsyncEventFilter)
210+
- Sync method that cleans up thread and filter (EventFilter) -> None
211+
- Async method that cleans up task and filter (AsyncEventFilter) -> Awaitable[None]
193212
"""
194213
...

0 commit comments

Comments
 (0)