Skip to content

Commit 9edb563

Browse files
committed
Added core logic and cli.
1 parent 8a3ab18 commit 9edb563

File tree

12 files changed

+1238
-58
lines changed

12 files changed

+1238
-58
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ repos:
99
rev: v1.17.1
1010
hooks:
1111
- id: mypy
12-
additional_dependencies: [pydantic>=2.11.7, loguru>=0.7.3]
12+
additional_dependencies: [pydantic>=2.11.7, loguru>=0.7.3, types-PyYAML]
1313
- repo: local
1414
hooks:
1515
- id: pytest-coverage

README.md

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# eoAPI Notifier
1+
# eoAPI-notifier
22

33
Message handler for eoAPI components. A middleware tool that listens to sources for messages and forwards them to output receivers.
44

@@ -14,6 +14,36 @@ Install using `uv`:
1414
uv add eoapi-notifier
1515
```
1616

17+
## Usage
18+
19+
The notifier provides a CLI tool to run the message handler with a YAML configuration file.
20+
21+
### Command Line Interface
22+
23+
Run the notifier with a configuration file:
24+
25+
```bash
26+
eoapi-notifier config.yaml
27+
```
28+
29+
Set logging level:
30+
31+
```bash
32+
eoapi-notifier --log-level DEBUG config.yaml
33+
```
34+
35+
Show help:
36+
37+
```bash
38+
eoapi-notifier --help
39+
```
40+
41+
Show version:
42+
43+
```bash
44+
eoapi-notifier --version
45+
```
46+
1747
## Testing
1848

1949
Install test dependencies and run tests with `uv`:

eoapi_notifier/__init__.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,34 @@
22

33
from importlib.metadata import version as _version
44

5-
from .logging import get_logger, logger, setup_logging
5+
from .core.main import NotifierApp, setup_logging
6+
from .core.registry import (
7+
create_output,
8+
create_source,
9+
get_available_outputs,
10+
get_available_sources,
11+
register_output,
12+
register_source,
13+
)
14+
from .logging import get_logger, logger
615

716
__version__ = _version("eoapi-notifier")
817

918

10-
__all__ = ["__version__", "version", "get_logger", "setup_logging", "logger"]
19+
__all__ = [
20+
"__version__",
21+
"version",
22+
"get_logger",
23+
"setup_logging",
24+
"logger",
25+
"NotifierApp",
26+
"create_source",
27+
"create_output",
28+
"register_source",
29+
"register_output",
30+
"get_available_sources",
31+
"get_available_outputs",
32+
]
1133

1234

1335
def version() -> None:

eoapi_notifier/cli.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
"""CLI entry point for eoapi-notifier."""
2+
3+
import asyncio
4+
import sys
5+
from pathlib import Path
6+
7+
import click
8+
from loguru import logger
9+
10+
from .core.main import NotifierApp, setup_logging
11+
12+
13+
@click.command()
14+
@click.argument(
15+
"config",
16+
type=click.Path(exists=True, dir_okay=False, path_type=Path),
17+
metavar="CONFIG_FILE",
18+
)
19+
@click.option(
20+
"--log-level",
21+
type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR"], case_sensitive=False),
22+
default="INFO",
23+
help="Set logging level (default: INFO)",
24+
)
25+
@click.version_option(prog_name="eoapi-notifier")
26+
def main(config: Path, log_level: str) -> None:
27+
"""Message handler for eoAPI components.
28+
29+
CONFIG_FILE: Path to YAML configuration file
30+
"""
31+
# Setup logging
32+
setup_logging(log_level.upper())
33+
34+
# Create and run application
35+
app = NotifierApp()
36+
37+
try:
38+
asyncio.run(app.run(config))
39+
except KeyboardInterrupt:
40+
logger.info("Application interrupted by user")
41+
except Exception as e:
42+
logger.error(f"Application failed: {e}")
43+
sys.exit(1)
44+
45+
46+
if __name__ == "__main__":
47+
main()

eoapi_notifier/core/main.py

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
"""Core application logic for eoapi-notifier."""
2+
3+
import asyncio
4+
import signal
5+
import sys
6+
from pathlib import Path
7+
from typing import Any
8+
9+
import yaml
10+
from loguru import logger
11+
12+
from .registry import create_output, create_source
13+
14+
15+
class NotifierApp:
16+
"""Main application class for the notifier."""
17+
18+
def __init__(self) -> None:
19+
"""Initialize the application."""
20+
self.sources: list[Any] = []
21+
self.outputs: list[Any] = []
22+
self._shutdown_event = asyncio.Event()
23+
self._running = False
24+
25+
def load_config(self, config_path: Path) -> dict[str, Any]:
26+
"""Load configuration from YAML file."""
27+
try:
28+
with config_path.open() as f:
29+
config = yaml.safe_load(f)
30+
31+
# Ensure config is a dictionary
32+
if not isinstance(config, dict):
33+
raise ValueError(
34+
f"Configuration file must contain a YAML object/dictionary, "
35+
f"got {type(config)}"
36+
)
37+
38+
logger.info(f"Loaded configuration from {config_path}")
39+
return config
40+
except FileNotFoundError:
41+
logger.error(f"Configuration file not found: {config_path}")
42+
raise
43+
except yaml.YAMLError as e:
44+
logger.error(f"Invalid YAML configuration: {e}")
45+
raise
46+
except Exception as e:
47+
logger.error(f"Failed to load configuration: {e}")
48+
raise
49+
50+
def create_plugins(self, config: dict[str, Any]) -> None:
51+
"""Create source and output plugins from configuration."""
52+
# Create sources
53+
sources_config = config.get("sources", [])
54+
for source_config in sources_config:
55+
source_type = source_config.get("type")
56+
if not source_type:
57+
logger.error("Source configuration missing 'type' field")
58+
continue
59+
60+
try:
61+
source = create_source(source_type, source_config.get("config", {}))
62+
self.sources.append(source)
63+
logger.info(f"Created source: {source_type}")
64+
except Exception as e:
65+
logger.error(f"Failed to create source {source_type}: {e}")
66+
67+
# Create outputs
68+
outputs_config = config.get("outputs", [])
69+
for output_config in outputs_config:
70+
output_type = output_config.get("type")
71+
if not output_type:
72+
logger.error("Output configuration missing 'type' field")
73+
continue
74+
75+
try:
76+
output = create_output(output_type, output_config.get("config", {}))
77+
self.outputs.append(output)
78+
logger.info(f"Created output: {output_type}")
79+
except Exception as e:
80+
logger.error(f"Failed to create output {output_type}: {e}")
81+
82+
async def start_plugins(self) -> None:
83+
"""Start all plugins."""
84+
# Start sources
85+
for source in self.sources:
86+
try:
87+
await source.start()
88+
logger.info(f"Started source: {source.__class__.__name__}")
89+
except Exception as e:
90+
logger.error(f"Failed to start source {source.__class__.__name__}: {e}")
91+
92+
# Start outputs
93+
for output in self.outputs:
94+
try:
95+
await output.start()
96+
logger.info(f"Started output: {output.__class__.__name__}")
97+
except Exception as e:
98+
logger.error(f"Failed to start output {output.__class__.__name__}: {e}")
99+
100+
async def stop_plugins(self) -> None:
101+
"""Stop all plugins."""
102+
# Stop sources
103+
for source in self.sources:
104+
try:
105+
await source.stop()
106+
logger.info(f"Stopped source: {source.__class__.__name__}")
107+
except Exception as e:
108+
logger.error(f"Failed to stop source {source.__class__.__name__}: {e}")
109+
110+
# Stop outputs
111+
for output in self.outputs:
112+
try:
113+
await output.stop()
114+
logger.info(f"Stopped output: {output.__class__.__name__}")
115+
except Exception as e:
116+
logger.error(f"Failed to stop output {output.__class__.__name__}: {e}")
117+
118+
async def process_events(self) -> None:
119+
"""Main event processing loop."""
120+
if not self.sources:
121+
logger.warning("No sources configured")
122+
return
123+
124+
if not self.outputs:
125+
logger.warning("No outputs configured")
126+
return
127+
128+
logger.info("Starting event processing...")
129+
self._running = True
130+
131+
# Create event processing tasks
132+
tasks = []
133+
for source in self.sources:
134+
task = asyncio.create_task(self._process_source_events(source))
135+
tasks.append(task)
136+
137+
# Wait for shutdown or task completion
138+
try:
139+
await asyncio.gather(*tasks, return_exceptions=True)
140+
except asyncio.CancelledError:
141+
logger.info("Event processing cancelled")
142+
finally:
143+
self._running = False
144+
145+
async def _process_source_events(self, source: Any) -> None:
146+
"""Process events from a single source."""
147+
source_name = source.__class__.__name__
148+
logger.debug(f"Starting event processing for source: {source_name}")
149+
150+
try:
151+
async for event in source.listen():
152+
# Check for shutdown before processing
153+
if self._shutdown_event.is_set():
154+
logger.debug(f"Shutdown requested, stopping {source_name}")
155+
break
156+
157+
logger.debug(f"Received event from {source_name}: {event}")
158+
159+
# Send event to all outputs
160+
for output in self.outputs:
161+
try:
162+
success = await output.send_event(event)
163+
if success:
164+
logger.debug(
165+
f"Successfully sent event via "
166+
f"{output.__class__.__name__}"
167+
)
168+
else:
169+
logger.warning(
170+
f"Failed to send event via {output.__class__.__name__}"
171+
)
172+
except Exception as e:
173+
logger.error(
174+
f"Error sending event via {output.__class__.__name__}: {e}"
175+
)
176+
177+
except Exception as e:
178+
logger.error(f"Error processing events from {source_name}: {e}")
179+
finally:
180+
logger.debug(f"Stopped processing events for source: {source_name}")
181+
182+
def setup_signal_handlers(self) -> None:
183+
"""Setup signal handlers for graceful shutdown."""
184+
185+
def signal_handler(signum: int, frame: Any) -> None:
186+
logger.info(f"Received signal {signum}, initiating shutdown...")
187+
self._shutdown_event.set()
188+
189+
signal.signal(signal.SIGINT, signal_handler)
190+
signal.signal(signal.SIGTERM, signal_handler)
191+
192+
async def shutdown(self) -> None:
193+
"""Initiate graceful shutdown."""
194+
logger.info("Shutting down application...")
195+
self._shutdown_event.set()
196+
197+
# Give running tasks a moment to see the shutdown event
198+
await asyncio.sleep(0.1)
199+
200+
@property
201+
def is_running(self) -> bool:
202+
"""Check if the application is running."""
203+
return self._running
204+
205+
@property
206+
def is_shutdown_requested(self) -> bool:
207+
"""Check if shutdown has been requested."""
208+
return self._shutdown_event.is_set()
209+
210+
async def run(self, config_path: Path) -> None:
211+
"""Run the application with the given configuration."""
212+
try:
213+
# Load configuration
214+
config = self.load_config(config_path)
215+
216+
# Create plugins
217+
self.create_plugins(config)
218+
219+
if not self.sources and not self.outputs:
220+
logger.error("No plugins configured")
221+
return
222+
223+
# Setup signal handlers
224+
self.setup_signal_handlers()
225+
226+
# Start plugins
227+
await self.start_plugins()
228+
logger.info("Application started successfully")
229+
230+
# Process events until shutdown
231+
await self.process_events()
232+
233+
except KeyboardInterrupt:
234+
logger.info("Application interrupted by user")
235+
except Exception as e:
236+
logger.error(f"Application error: {e}")
237+
raise
238+
finally:
239+
# Stop plugins
240+
await self.stop_plugins()
241+
logger.info("Application stopped")
242+
243+
244+
def setup_logging(level: str = "INFO") -> None:
245+
"""Configure application logging."""
246+
logger.remove()
247+
logger.add(
248+
sys.stderr,
249+
level=level,
250+
format=(
251+
"<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
252+
"<level>{level: <8}</level> | "
253+
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
254+
"<level>{message}</level>"
255+
),
256+
)

0 commit comments

Comments
 (0)