Skip to content

Commit 8a3ab18

Browse files
committed
Added plugin registry system.
1 parent ebf917e commit 8a3ab18

File tree

7 files changed

+1512
-4
lines changed

7 files changed

+1512
-4
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,6 @@ coverage.xml
4747

4848
# uv
4949
uv.lock
50+
51+
# Claude
52+
CLAUDE.md

eoapi_notifier/core/plugin.py

Lines changed: 346 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,346 @@
1+
"""
2+
Plugin system.
3+
4+
Provides protocol-based plugin architecture with type safety, structured metadata,
5+
and async context management for notification sources and outputs.
6+
"""
7+
8+
import logging
9+
from abc import ABC, abstractmethod
10+
from collections.abc import AsyncIterator
11+
from contextlib import asynccontextmanager
12+
from dataclasses import dataclass, field
13+
from typing import Any, Generic, Protocol, TypeVar, runtime_checkable
14+
15+
from pydantic import BaseModel, ConfigDict
16+
17+
from .event import NotificationEvent
18+
19+
# Type variables for generic plugin system
20+
T = TypeVar("T", bound="BasePlugin[Any]")
21+
C = TypeVar("C", bound="BasePluginConfig")
22+
23+
24+
@dataclass(frozen=True)
25+
class PluginMetadata:
26+
"""Structured metadata for plugins."""
27+
28+
name: str
29+
description: str
30+
version: str = "1.0.0"
31+
tags: list[str] = field(default_factory=list)
32+
category: str = "unknown"
33+
priority: int = 0
34+
35+
36+
@runtime_checkable
37+
class PluginConfigProtocol(Protocol):
38+
"""Protocol for plugin configurations with required interface methods."""
39+
40+
@classmethod
41+
def get_sample_config(cls) -> dict[str, Any]:
42+
"""Get sample configuration for this plugin type."""
43+
...
44+
45+
@classmethod
46+
def get_metadata(cls) -> PluginMetadata:
47+
"""Get structured metadata for this plugin type."""
48+
...
49+
50+
def get_connection_info(self) -> str:
51+
"""Get connection info string for display."""
52+
...
53+
54+
def get_status_info(self) -> dict[str, Any]:
55+
"""Get detailed status information for display."""
56+
...
57+
58+
59+
class BasePluginConfig(BaseModel):
60+
"""
61+
Base configuration class for plugins implementing the protocol.
62+
63+
Provides common configuration methods and Pydantic validation.
64+
"""
65+
66+
model_config = ConfigDict(extra="forbid")
67+
68+
@classmethod
69+
def get_sample_config(cls) -> dict[str, Any]:
70+
"""Default implementation - subclasses should override."""
71+
return {}
72+
73+
@classmethod
74+
def get_metadata(cls) -> PluginMetadata:
75+
"""Default implementation - subclasses should override."""
76+
return PluginMetadata(
77+
name=cls.__name__, description=f"Configuration for {cls.__name__}"
78+
)
79+
80+
def get_connection_info(self) -> str:
81+
"""Default implementation - subclasses should override."""
82+
return f"Connection: {self.__class__.__name__}"
83+
84+
def get_status_info(self) -> dict[str, Any]:
85+
"""Default implementation - subclasses should override."""
86+
return {"config_type": self.__class__.__name__}
87+
88+
89+
class BasePlugin(ABC, Generic[C]):
90+
"""
91+
Abstract base class for all plugins with lifecycle management and async context
92+
support.
93+
94+
Provides common functionality for sources and outputs with type-safe configuration.
95+
"""
96+
97+
def __init__(self, config: C) -> None:
98+
"""Initialize base plugin."""
99+
if not isinstance(config, PluginConfigProtocol):
100+
raise TypeError(
101+
f"Config must implement PluginConfigProtocol, got {type(config)}"
102+
)
103+
104+
self.config: C = config
105+
self.logger: logging.Logger = logging.getLogger(self.__class__.__module__)
106+
self._running: bool = False
107+
self._started: bool = False
108+
109+
@abstractmethod
110+
async def start(self) -> None:
111+
"""
112+
Start the plugin.
113+
114+
Should set _running to True and initialize any connections.
115+
Must be implemented by subclasses.
116+
"""
117+
if self._started:
118+
raise RuntimeError(f"Plugin {self.__class__.__name__} is already started")
119+
self._running = True
120+
self._started = True
121+
122+
@abstractmethod
123+
async def stop(self) -> None:
124+
"""
125+
Stop the plugin.
126+
127+
Should set _running to False and clean up any resources.
128+
Must be implemented by subclasses.
129+
"""
130+
if not self._started:
131+
return # Already stopped or never started
132+
self._running = False
133+
self._started = False
134+
135+
async def restart(self) -> None:
136+
"""Restart the plugin."""
137+
if self._started:
138+
await self.stop()
139+
await self.start()
140+
141+
@property
142+
def is_running(self) -> bool:
143+
"""Check if plugin is currently running."""
144+
return self._running
145+
146+
@property
147+
def is_started(self) -> bool:
148+
"""Check if plugin has been started (even if not currently running)."""
149+
return self._started
150+
151+
def get_status(self) -> dict[str, Any]:
152+
"""Get plugin status information."""
153+
metadata = self.config.get_metadata()
154+
return {
155+
"name": metadata.name,
156+
"type": self.__class__.__name__,
157+
"running": self._running,
158+
"started": self._started,
159+
"config_type": self.config.__class__.__name__,
160+
"metadata": {
161+
"description": metadata.description,
162+
"version": metadata.version,
163+
"tags": metadata.tags,
164+
"category": metadata.category,
165+
},
166+
}
167+
168+
async def __aenter__(self: T) -> T:
169+
"""Async context manager entry."""
170+
await self.start()
171+
return self
172+
173+
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
174+
"""Async context manager exit."""
175+
await self.stop()
176+
177+
178+
class BaseSource(BasePlugin[C]):
179+
"""
180+
Abstract base class for notification sources.
181+
182+
Sources listen for events from external systems and yield NotificationEvent objects.
183+
"""
184+
185+
@abstractmethod
186+
def listen(self) -> AsyncIterator[NotificationEvent]:
187+
"""
188+
Listen for events from the source.
189+
190+
Yields:
191+
NotificationEvent objects as they are received
192+
193+
Should run continuously while _running is True.
194+
Must be implemented by subclasses.
195+
"""
196+
pass
197+
198+
@asynccontextmanager
199+
async def event_stream(self) -> AsyncIterator[AsyncIterator[NotificationEvent]]:
200+
"""Async context manager for event streaming."""
201+
async with self:
202+
yield self.listen()
203+
204+
205+
class BaseOutput(BasePlugin[C]):
206+
"""
207+
Abstract base class for notification outputs.
208+
209+
Outputs send NotificationEvent objects to external systems.
210+
"""
211+
212+
@abstractmethod
213+
async def send_event(self, event: NotificationEvent) -> bool:
214+
"""
215+
Send a single notification event.
216+
217+
Args:
218+
event: Event to send
219+
220+
Returns:
221+
True if sent successfully, False otherwise
222+
223+
Must be implemented by subclasses.
224+
"""
225+
pass
226+
227+
async def send_batch(self, events: list[NotificationEvent]) -> int:
228+
"""
229+
Send multiple events in batch.
230+
231+
Default implementation sends events one by one.
232+
Subclasses can override for more efficient batch processing.
233+
234+
Args:
235+
events: List of events to send
236+
237+
Returns:
238+
Number of events successfully sent
239+
"""
240+
success_count = 0
241+
for event in events:
242+
try:
243+
if await self.send_event(event):
244+
success_count += 1
245+
except Exception as e:
246+
self.logger.error(f"Failed to send event: {e}")
247+
248+
return success_count
249+
250+
async def health_check(self) -> bool:
251+
"""
252+
Perform health check on the output.
253+
254+
Default implementation checks if running.
255+
Subclasses can override for more sophisticated health checks.
256+
257+
Returns:
258+
True if healthy, False otherwise
259+
"""
260+
return self._running
261+
262+
async def send_with_retry(
263+
self,
264+
event: NotificationEvent,
265+
max_retries: int = 3,
266+
backoff_factor: float = 1.0,
267+
) -> bool:
268+
"""
269+
Send event with retry logic.
270+
271+
Args:
272+
event: Event to send
273+
max_retries: Maximum number of retry attempts
274+
backoff_factor: Backoff multiplier for retry delays
275+
276+
Returns:
277+
True if sent successfully, False otherwise
278+
"""
279+
import asyncio
280+
281+
for attempt in range(max_retries + 1):
282+
try:
283+
return await self.send_event(event)
284+
except Exception as e:
285+
if attempt == max_retries:
286+
self.logger.error(
287+
f"Failed to send event after {max_retries} retries: {e}"
288+
)
289+
return False
290+
291+
delay = backoff_factor * (2**attempt)
292+
self.logger.warning(
293+
f"Send attempt {attempt + 1} failed, retrying in {delay}s: {e}"
294+
)
295+
await asyncio.sleep(delay)
296+
297+
return False
298+
299+
300+
class PluginError(Exception):
301+
"""Base exception for plugin-related errors."""
302+
303+
def __init__(
304+
self,
305+
plugin_name: str,
306+
message: str,
307+
cause: Exception | None = None,
308+
metadata: dict[str, Any] | None = None,
309+
):
310+
"""
311+
Initialize plugin error.
312+
313+
Args:
314+
plugin_name: Name of the plugin that caused the error
315+
message: Error message
316+
cause: Optional underlying exception
317+
metadata: Optional additional error context
318+
"""
319+
self.plugin_name = plugin_name
320+
self.cause = cause
321+
self.metadata = metadata or {}
322+
super().__init__(f"Plugin '{plugin_name}': {message}")
323+
324+
325+
class PluginConfigError(PluginError):
326+
"""Error in plugin configuration."""
327+
328+
pass
329+
330+
331+
class PluginConnectionError(PluginError):
332+
"""Error connecting to external system."""
333+
334+
pass
335+
336+
337+
class PluginValidationError(PluginError):
338+
"""Error during plugin validation."""
339+
340+
pass
341+
342+
343+
class PluginLifecycleError(PluginError):
344+
"""Error during plugin lifecycle operations."""
345+
346+
pass

0 commit comments

Comments
 (0)