Skip to content

Commit 657d279

Browse files
feat: add event filter base class
1 parent 1915fbb commit 657d279

File tree

2 files changed

+198
-1
lines changed

2 files changed

+198
-1
lines changed

pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,10 @@ target-version = "py312"
9292

9393
[tool.ruff.lint]
9494
select = ["E", "W", "F", "I", "B", "C4", "UP", "RUF"]
95-
ignore = ["E501"]
95+
ignore = [
96+
"E501", # Line too long (handled by formatter)
97+
"UP046", # Generic class uses Generic subclass (need Python 3.10 compatibility)
98+
]
9699

97100
[tool.ruff.format]
98101
quote-style = "double"

src/arkiv/events_base.py

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
"""Base class for event filters with shared logic."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
from abc import ABC, abstractmethod
7+
from typing import TYPE_CHECKING, Generic, TypeVar
8+
9+
from eth_typing import HexStr
10+
from web3.types import EventData
11+
12+
from .contract import EVENTS
13+
from .types import (
14+
CreateEvent,
15+
DeleteEvent,
16+
EventType,
17+
ExtendEvent,
18+
TxHash,
19+
UpdateEvent,
20+
)
21+
from .utils import to_entity_key
22+
23+
if TYPE_CHECKING:
24+
from web3.contract import Contract
25+
26+
logger = logging.getLogger(__name__)
27+
28+
# Type variable for callback types
29+
# This allows subclasses to specify their own callback type (sync or async)
30+
CallbackT = TypeVar("CallbackT")
31+
32+
# Union type for all event objects
33+
EventObject = CreateEvent | UpdateEvent | ExtendEvent | DeleteEvent
34+
35+
36+
class EventFilterBase(ABC, Generic[CallbackT]):
37+
"""
38+
Abstract base class for event filters.
39+
40+
Provides shared logic for parsing events and extracting data.
41+
Subclasses implement sync or async execution strategies.
42+
43+
Type Parameters:
44+
CallbackT: The callback type (sync callbacks for EventFilter,
45+
async callbacks for AsyncEventFilter)
46+
"""
47+
48+
def __init__(
49+
self,
50+
contract: Contract,
51+
event_type: EventType,
52+
callback: CallbackT,
53+
from_block: str | int = "latest",
54+
auto_start: bool = True,
55+
) -> None:
56+
"""
57+
Initialize event filter base.
58+
59+
Args:
60+
contract: Web3 contract instance
61+
event_type: Type of event to watch ("created", "updated", "extended", "deleted")
62+
callback: Callback function (sync or async depending on subclass)
63+
from_block: Starting block for the filter ("latest" or block number)
64+
auto_start: If True, starts monitoring immediately (handled by subclass)
65+
"""
66+
self.contract: Contract = contract
67+
self.event_type: EventType = event_type
68+
self.callback: CallbackT = callback
69+
self.from_block: str | int = from_block
70+
self._running: bool = False
71+
self._poll_interval: float = 2.0 # seconds
72+
73+
@property
74+
def is_running(self) -> bool:
75+
"""
76+
Check if the filter is currently running.
77+
78+
Returns:
79+
True if the filter's monitoring is active, False otherwise
80+
"""
81+
return self._running
82+
83+
def _get_contract_event_name(self) -> str:
84+
"""
85+
Get the Web3 contract event name for this event type.
86+
87+
Returns:
88+
Contract event name (e.g., "GolemBaseStorageEntityCreated")
89+
90+
Raises:
91+
NotImplementedError: If event type is not supported
92+
"""
93+
if self.event_type not in EVENTS:
94+
raise NotImplementedError(
95+
f"Event type {self.event_type} not yet implemented"
96+
)
97+
return EVENTS[self.event_type]
98+
99+
def _extract_tx_hash(self, event_data: EventData) -> TxHash:
100+
"""
101+
Extract and normalize transaction hash from event data.
102+
103+
Args:
104+
event_data: Event data from Web3 filter
105+
106+
Returns:
107+
Transaction hash with 0x prefix
108+
"""
109+
tx_hash_hex = event_data["transactionHash"].hex()
110+
if not tx_hash_hex.startswith("0x"):
111+
tx_hash_hex = f"0x{tx_hash_hex}"
112+
return TxHash(HexStr(tx_hash_hex))
113+
114+
def _parse_event_data(self, event_data: EventData) -> tuple[EventObject, TxHash]:
115+
"""
116+
Parse event data and create appropriate event object.
117+
118+
This method contains the shared logic for parsing events from Web3.
119+
It does NOT trigger the callback - that's done by subclasses to allow
120+
for sync vs async callback invocation.
121+
122+
Args:
123+
event_data: Event data from Web3 filter
124+
125+
Returns:
126+
Tuple of (event_object, tx_hash)
127+
128+
Raises:
129+
ValueError: If event_type is unknown
130+
"""
131+
logger.info(f"Parsing event: {event_data}")
132+
133+
# Extract common data
134+
entity_key = to_entity_key(event_data["args"]["entityKey"])
135+
tx_hash = self._extract_tx_hash(event_data)
136+
137+
# Create event object based on type
138+
event: EventObject
139+
if self.event_type == "created":
140+
event = CreateEvent(
141+
entity_key=entity_key,
142+
expiration_block=event_data["args"]["expirationBlock"],
143+
)
144+
elif self.event_type == "updated":
145+
event = UpdateEvent(
146+
entity_key=entity_key,
147+
expiration_block=event_data["args"]["expirationBlock"],
148+
)
149+
elif self.event_type == "extended":
150+
event = ExtendEvent(
151+
entity_key=entity_key,
152+
old_expiration_block=event_data["args"]["oldExpirationBlock"],
153+
new_expiration_block=event_data["args"]["newExpirationBlock"],
154+
)
155+
elif self.event_type == "deleted":
156+
event = DeleteEvent(entity_key=entity_key)
157+
else:
158+
raise ValueError(f"Unknown event type: {self.event_type}")
159+
160+
return event, tx_hash
161+
162+
# Abstract methods that subclasses must implement
163+
@abstractmethod
164+
def start(self) -> None:
165+
"""
166+
Start monitoring for events.
167+
168+
Subclasses implement this as either:
169+
- Sync method that starts a polling thread (EventFilter)
170+
- Async method that starts an asyncio task (AsyncEventFilter)
171+
"""
172+
...
173+
174+
@abstractmethod
175+
def stop(self) -> None:
176+
"""
177+
Stop monitoring for events.
178+
179+
Subclasses implement this as either:
180+
- Sync method that stops the polling thread (EventFilter)
181+
- Async method that cancels the asyncio task (AsyncEventFilter)
182+
"""
183+
...
184+
185+
@abstractmethod
186+
def uninstall(self) -> None:
187+
"""
188+
Uninstall the filter and cleanup resources.
189+
190+
Subclasses implement this as either:
191+
- Sync method that cleans up thread and filter (EventFilter)
192+
- Async method that cleans up task and filter (AsyncEventFilter)
193+
"""
194+
...

0 commit comments

Comments
 (0)