Skip to content

Commit 35141e2

Browse files
authored
add router config management and improve error handling in shutdown (#46)
* Add router config management and improve error handling for broken pipe errors * clean up session resources when connection error occurs during message send --------- Co-authored-by: calmini <[email protected]>
1 parent e958756 commit 35141e2

File tree

3 files changed

+116
-16
lines changed

3 files changed

+116
-16
lines changed

src/mcpm/commands/router.py

Lines changed: 100 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import psutil
1313
from rich.console import Console
1414

15+
from mcpm.utils.config import ConfigManager
1516
from mcpm.utils.platform import get_log_directory, get_pid_directory
1617

1718
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
@@ -25,6 +26,55 @@
2526
LOG_DIR = get_log_directory("mcpm")
2627
LOG_DIR.mkdir(parents=True, exist_ok=True)
2728

29+
# default config
30+
DEFAULT_HOST = "localhost"
31+
DEFAULT_PORT = 6276 # 6276 represents MCPM on a T9 keypad (6=M, 2=C, 7=P, 6=M)
32+
33+
34+
def get_router_config():
35+
"""get router configuration from config file, if not exists, flush default config"""
36+
config_manager = ConfigManager()
37+
config = config_manager.get_config()
38+
39+
# check if router config exists
40+
if "router" not in config:
41+
# create default config and save
42+
router_config = {"host": DEFAULT_HOST, "port": DEFAULT_PORT}
43+
config_manager.set_config("router", router_config)
44+
return router_config
45+
46+
# get existing config
47+
router_config = config.get("router", {})
48+
49+
# check if host and port exist, if not, set default values and update config
50+
# user may only set a customized port while leave host undefined
51+
updated = False
52+
if "host" not in router_config:
53+
router_config["host"] = DEFAULT_HOST
54+
updated = True
55+
if "port" not in router_config:
56+
router_config["port"] = DEFAULT_PORT
57+
updated = True
58+
59+
# save config if updated
60+
if updated:
61+
config_manager.set_config("router", router_config)
62+
63+
return router_config
64+
65+
66+
def save_router_config(host, port):
67+
"""save router configuration to config file"""
68+
config_manager = ConfigManager()
69+
router_config = config_manager.get_config().get("router", {})
70+
71+
# update config
72+
router_config["host"] = host
73+
router_config["port"] = port
74+
75+
# save config
76+
return config_manager.set_config("router", router_config)
77+
2878

2979
def is_process_running(pid):
3080
"""check if the process is running"""
@@ -56,7 +106,7 @@ def write_pid_file(pid):
56106
"""write the process id to the pid file"""
57107
try:
58108
PID_FILE.write_text(str(pid))
59-
logger.info(f"PID {pid} written to {PID_FILE}")
109+
logger.debug(f"PID {pid} written to {PID_FILE}")
60110
except IOError as e:
61111
logger.error(f"Error writing PID file: {e}")
62112
sys.exit(1)
@@ -77,16 +127,11 @@ def router():
77127

78128

79129
@router.command(name="on")
80-
@click.option("--host", type=str, default="0.0.0.0", help="Host to bind the SSE server to")
81-
@click.option("--port", type=int, default=8080, help="Port to bind the SSE server to")
82-
@click.option("--cors", type=str, help="Comma-separated list of allowed origins for CORS")
83-
def start_router(host, port, cors):
130+
def start_router():
84131
"""Start MCPRouter as a daemon process.
85132
86133
Example:
87134
mcpm router on
88-
mcpm router on --port 8888
89-
mcpm router on --host 0.0.0.0 --port 9000
90135
"""
91136
# check if there is a router already running
92137
existing_pid = read_pid_file()
@@ -95,10 +140,10 @@ def start_router(host, port, cors):
95140
console.print("Use 'mcpm router off' to stop the running instance.")
96141
return
97142

98-
# prepare environment variables
99-
env = os.environ.copy()
100-
if cors:
101-
env["MCPM_ROUTER_CORS"] = cors
143+
# get router config
144+
config = get_router_config()
145+
host = config["host"]
146+
port = config["port"]
102147

103148
# prepare uvicorn command
104149
uvicorn_cmd = [
@@ -126,7 +171,7 @@ def start_router(host, port, cors):
126171
uvicorn_cmd,
127172
stdout=log,
128173
stderr=log,
129-
env=env,
174+
env=os.environ.copy(),
130175
start_new_session=True, # create new session, so the process won't be affected by terminal closing
131176
)
132177

@@ -142,6 +187,42 @@ def start_router(host, port, cors):
142187
console.print(f"[bold red]Error:[/] Failed to start MCPRouter: {e}")
143188

144189

190+
@router.command(name="set")
191+
@click.option("-H", "--host", type=str, help="Host to bind the SSE server to")
192+
@click.option("-p", "--port", type=int, help="Port to bind the SSE server to")
193+
def set_router_config(host, port):
194+
"""Set MCPRouter global configuration.
195+
196+
Example:
197+
mcpm router set -H localhost -p 8888
198+
mcpm router set --host 127.0.0.1 --port 9000
199+
"""
200+
if not host and not port:
201+
console.print("[yellow]No changes were made. Please specify at least one option (--host or --port)[/]")
202+
return
203+
204+
# get current config, make sure all field are filled by default value if not exists
205+
current_config = get_router_config()
206+
207+
# if user does not specify a host, use current config
208+
host = host or current_config["host"]
209+
port = port or current_config["port"]
210+
211+
# save config
212+
if save_router_config(host, port):
213+
console.print(f"[bold green]Router configuration updated:[/] host={host}, port={port}")
214+
console.print("The new configuration will be used next time you start the router.")
215+
216+
# if router is running, prompt user to restart
217+
pid = read_pid_file()
218+
if pid:
219+
console.print("[yellow]Note: Router is currently running. Restart it to apply new settings:[/]")
220+
console.print(" mcpm router off")
221+
console.print(" mcpm router on")
222+
else:
223+
console.print("[bold red]Error:[/] Failed to save router configuration.")
224+
225+
145226
@router.command(name="off")
146227
def stop_router():
147228
"""Stop the running MCPRouter daemon process.
@@ -178,8 +259,14 @@ def router_status():
178259
Example:
179260
mcpm router status
180261
"""
262+
# get router config
263+
config = get_router_config()
264+
host = config["host"]
265+
port = config["port"]
266+
267+
# check process status
181268
pid = read_pid_file()
182269
if pid:
183-
console.print(f"[bold green]MCPRouter is running[/] (PID: {pid})")
270+
console.print(f"[bold green]MCPRouter is running[/] at http://{host}:{port} (PID: {pid})")
184271
else:
185272
console.print("[yellow]MCPRouter is not running.[/]")

src/mcpm/router/app.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import logging
22
import os
33
from contextlib import asynccontextmanager
4-
from pathlib import Path
54

65
from starlette.applications import Starlette
76
from starlette.middleware import Middleware

src/mcpm/router/transport.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,24 @@ async def handle_post_message(self, scope: Scope, receive: Receive, send: Send):
158158
logger.error(f"Failed to parse message: {err}")
159159
response = Response("Could not parse message", status_code=400)
160160
await response(scope, receive, send)
161-
await writer.send(err)
161+
try:
162+
await writer.send(err)
163+
except (BrokenPipeError, ConnectionError, OSError) as pipe_err:
164+
logger.warning(f"Failed to send error due to pipe issue: {pipe_err}")
162165
return
163166

164167
logger.debug(f"Sending message to writer: {message}")
165168
response = Response("Accepted", status_code=202)
166169
await response(scope, receive, send)
167-
await writer.send(message)
170+
171+
# add error handling, catch possible pipe errors
172+
try:
173+
await writer.send(message)
174+
except (BrokenPipeError, ConnectionError, OSError) as e:
175+
# if it's EPIPE error or other connection error, log it but don't throw an exception
176+
if isinstance(e, OSError) and e.errno == 32: # EPIPE
177+
logger.warning(f"EPIPE error when sending message to session {session_id}, connection may be closing")
178+
else:
179+
logger.warning(f"Connection error when sending message to session {session_id}: {e}")
180+
self._read_stream_writers.pop(session_id, None)
181+
self._session_id_to_profile.pop(session_id, None)

0 commit comments

Comments
 (0)