Skip to content

Commit d844f79

Browse files
committed
refactor: separate file watcher from router
1 parent 5580809 commit d844f79

File tree

3 files changed

+185
-1
lines changed

3 files changed

+185
-1
lines changed

src/mcpm/core/router/notifier.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import logging
2+
from typing import Any, Awaitable, Callable
3+
4+
from mcpm.core.schema import ConfigType
5+
6+
logger = logging.getLogger(__name__)
7+
8+
CallableT = Callable[[ConfigType], Awaitable[Any]]
9+
10+
class ConfigUpdateNotifier:
11+
12+
_instance = None
13+
14+
@classmethod
15+
def get_instance(cls):
16+
if cls._instance is None:
17+
cls._instance = cls()
18+
return cls._instance
19+
20+
21+
def __init__(self) -> None:
22+
"""
23+
Initialize the ConfigUpdateNotifier singleton.
24+
25+
This class implements the observer pattern to notify subscribers when configuration changes occur.
26+
Subscribers can register callbacks that will be executed when configuration updates are detected.
27+
"""
28+
self._subscribers: list[CallableT] = []
29+
30+
def subscribe(self, callback: CallableT):
31+
if callback not in self._subscribers:
32+
self._subscribers.append(callback)
33+
return lambda: self.unsubscribe(callback)
34+
35+
def unsubscribe(self, callback: CallableT):
36+
if callback in self._subscribers:
37+
self._subscribers.remove(callback)
38+
39+
async def notify_update(self, config_type: ConfigType):
40+
""" Notify all subscribers about the update """
41+
for subscriber in self._subscribers:
42+
try:
43+
await subscriber(config_type)
44+
except Exception as e:
45+
logger.error(f"Failed to notify subscriber due to error: {e}")

src/mcpm/core/router/watcher.py

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
"""
2+
Configuration watchers for monitoring changes from different sources.
3+
"""
4+
import asyncio
5+
import json
6+
import logging
7+
from abc import ABC, abstractmethod
8+
from pathlib import Path
9+
from typing import Any, Optional
10+
11+
from watchfiles import Change, awatch
12+
13+
from mcpm.core.router.notifier import ConfigUpdateNotifier
14+
from mcpm.core.schema import ConfigType
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
class BaseConfigWatcher(ABC):
20+
"""
21+
A base class for configuration watcher
22+
"""
23+
24+
def __init__(self, source_id: Any) -> None:
25+
self.source_id = source_id
26+
self.running = False
27+
self.watch_task: Optional[asyncio.Task] = None
28+
self.notifier: ConfigUpdateNotifier = ConfigUpdateNotifier.get_instance()
29+
30+
async def start(self) -> Optional[asyncio.Task]:
31+
if not self.running:
32+
self.running = True
33+
self.watch_task = asyncio.create_task(self._watch_config())
34+
logger.info(f"Started watching config source: {self.source_id}")
35+
return self.watch_task
36+
return self.watch_task
37+
38+
async def stop(self):
39+
if self.running:
40+
self.running = False
41+
if self.watch_task and not self.watch_task.done():
42+
self.watch_task.cancel()
43+
try:
44+
await self.watch_task
45+
logger.info("Watcher stopped")
46+
except asyncio.CancelledError:
47+
pass
48+
49+
50+
@abstractmethod
51+
async def _watch_config(self):
52+
pass
53+
54+
55+
async def notify_update(self, config_type: ConfigType):
56+
await self.notifier.notify_update(config_type)
57+
58+
59+
class FileConfigWatcher(BaseConfigWatcher):
60+
61+
def __init__(self, config_path: str) -> None:
62+
"""
63+
FileConfigWatcher watches for changes in a local config file.
64+
65+
Args:
66+
config_path: Path to the config file to watch
67+
"""
68+
super().__init__(source_id=config_path)
69+
self.config_path = Path(config_path)
70+
71+
async def _watch_config(self):
72+
try:
73+
async for changes in awatch(self.config_path):
74+
if not self.running:
75+
break
76+
77+
for change_type, file_path in changes:
78+
if Path(file_path) == self.config_path:
79+
if change_type in (Change.modified, Change.added):
80+
await self._reload()
81+
82+
except asyncio.CancelledError:
83+
pass
84+
except Exception as e:
85+
logger.error(f"Error watching config file: {e}")
86+
87+
async def _reload(self):
88+
updated = self._validate_config()
89+
if updated:
90+
logger.info("Config file has been modified, notifying subscribers...")
91+
await self.notify_update(ConfigType.FILE)
92+
93+
def _validate_config(self):
94+
"""Validate the config file format."""
95+
try:
96+
with open(self.config_path, "r", encoding="utf-8") as f:
97+
_ = json.load(f)
98+
except json.JSONDecodeError:
99+
logger.error(f"Error parsing config file: {self.config_path}")
100+
return False
101+
else:
102+
return True
103+
104+
105+
class CloudConfigWatcher(BaseConfigWatcher):
106+
107+
def __init__(self, api_endpoint: str, poll_interval_ms: int = 1000) -> None:
108+
"""
109+
CloudConfigWatcher watches for changes in a remote config file.
110+
111+
Args:
112+
api_endpoint: API endpoint for polling remote config
113+
poll_interval_ms: Polling interval in milliseconds
114+
"""
115+
super().__init__(source_id=api_endpoint)
116+
self.api_endpoint = api_endpoint
117+
self.poll_interval_ms = poll_interval_ms
118+
self.last_config_hash = None
119+
120+
async def _watch_config(self):
121+
try:
122+
while self.running:
123+
config_data = await self._poll_remote_config()
124+
if config_data:
125+
current_hash = hash(str(config_data))
126+
if (self.last_config_hash is None) or (current_hash != self.last_config_hash):
127+
self.last_config_hash = current_hash
128+
await self.notify_update(ConfigType.CLOUD)
129+
130+
await asyncio.sleep(self.poll_interval_ms)
131+
except asyncio.CancelledError:
132+
pass
133+
134+
async def _poll_remote_config(self):
135+
return json.dumps({})

src/mcpm/core/schema.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from typing import Any, Dict, List, Optional, Union
2-
2+
from enum import Enum
33
from pydantic import BaseModel
44

55

@@ -82,3 +82,7 @@ class Profile(BaseModel):
8282
name: str
8383
api_key: Optional[str]
8484
servers: list[ServerConfig]
85+
86+
class ConfigType(str, Enum):
87+
FILE = "file"
88+
CLOUD = "cloud"

0 commit comments

Comments
 (0)