Skip to content

Commit b4d8c54

Browse files
authored
CLI and Translate (#228)
Signed-off-by: Mihai Criveti <[email protected]>
1 parent 664541c commit b4d8c54

File tree

2 files changed

+168
-13
lines changed

2 files changed

+168
-13
lines changed

mcpgateway/cli.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def main() -> None: # noqa: D401 – imperative mood is fine here
122122

123123
# Uvicorn's `main()` uses sys.argv – patch it in and run.
124124
sys.argv = ["mcpgateway", *uvicorn_argv]
125-
uvicorn.main()
125+
uvicorn.main() # pylint: disable=no-value-for-parameter
126126

127127

128128
if __name__ == "__main__": # pragma: no cover – executed only when run directly

mcpgateway/translate.py

Lines changed: 167 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,19 @@
6060
from sse_starlette.sse import EventSourceResponse
6161
import uvicorn
6262

63+
# Conditional imports
64+
try:
65+
# Third-Party
66+
from fastapi.middleware.cors import CORSMiddleware
67+
except ImportError:
68+
CORSMiddleware = None
69+
70+
try:
71+
# Third-Party
72+
import httpx
73+
except ImportError:
74+
httpx = None
75+
6376
LOGGER = logging.getLogger("mcpgateway.translate")
6477
KEEP_ALIVE_INTERVAL = 30 # seconds - matches the reference implementation
6578
__all__ = ["main"] # for console-script entry-point
@@ -75,6 +88,14 @@ def __init__(self) -> None:
7588
self._subscribers: List[asyncio.Queue[str]] = []
7689

7790
async def publish(self, data: str) -> None:
91+
"""Publish data to all subscribers.
92+
93+
Args:
94+
data: The data string to publish to all subscribers.
95+
96+
Returns:
97+
None
98+
"""
7899
dead: List[asyncio.Queue[str]] = []
79100
for q in self._subscribers:
80101
try:
@@ -86,11 +107,24 @@ async def publish(self, data: str) -> None:
86107
self._subscribers.remove(q)
87108

88109
def subscribe(self) -> "asyncio.Queue[str]":
110+
"""Subscribe to published data.
111+
112+
Returns:
113+
asyncio.Queue[str]: A queue that will receive published data.
114+
"""
89115
q: asyncio.Queue[str] = asyncio.Queue(maxsize=1024)
90116
self._subscribers.append(q)
91117
return q
92118

93119
def unsubscribe(self, q: "asyncio.Queue[str]") -> None:
120+
"""Unsubscribe from published data.
121+
122+
Args:
123+
q: The queue to unsubscribe from published data.
124+
125+
Returns:
126+
None
127+
"""
94128
with suppress(ValueError):
95129
self._subscribers.remove(q)
96130

@@ -109,6 +143,16 @@ def __init__(self, cmd: str, pubsub: _PubSub) -> None:
109143
self._pump_task: Optional[asyncio.Task[None]] = None
110144

111145
async def start(self) -> None:
146+
"""Start the stdio subprocess.
147+
148+
Creates the subprocess and starts the stdout pump task.
149+
150+
Returns:
151+
None
152+
153+
Raises:
154+
OSError: If the subprocess cannot be started.
155+
"""
112156
LOGGER.info("Starting stdio subprocess: %s", self._cmd)
113157
self._proc = await asyncio.create_subprocess_exec(
114158
*shlex.split(self._cmd),
@@ -121,6 +165,13 @@ async def start(self) -> None:
121165
self._pump_task = asyncio.create_task(self._pump_stdout())
122166

123167
async def stop(self) -> None:
168+
"""Stop the stdio subprocess.
169+
170+
Terminates the subprocess and cancels the pump task.
171+
172+
Returns:
173+
None
174+
"""
124175
if self._proc is None:
125176
return
126177
LOGGER.info("Stopping subprocess (pid=%s)", self._proc.pid)
@@ -131,13 +182,35 @@ async def stop(self) -> None:
131182
self._pump_task.cancel()
132183

133184
async def send(self, raw: str) -> None:
185+
"""Send data to the subprocess stdin.
186+
187+
Args:
188+
raw: The raw data string to send to the subprocess.
189+
190+
Returns:
191+
None
192+
193+
Raises:
194+
RuntimeError: If the stdio endpoint is not started.
195+
"""
134196
if not self._stdin:
135197
raise RuntimeError("stdio endpoint not started")
136198
LOGGER.debug("→ stdio: %s", raw.strip())
137199
self._stdin.write(raw.encode())
138200
await self._stdin.drain()
139201

140202
async def _pump_stdout(self) -> None:
203+
"""Pump stdout from subprocess to pubsub.
204+
205+
Continuously reads lines from the subprocess stdout and publishes them
206+
to the pubsub system.
207+
208+
Returns:
209+
None
210+
211+
Raises:
212+
Exception: If the stdout pump encounters an error.
213+
"""
141214
assert self._proc and self._proc.stdout
142215
reader = self._proc.stdout
143216
try:
@@ -168,13 +241,23 @@ def _build_fastapi(
168241
message_path: str = "/message",
169242
cors_origins: Optional[List[str]] = None,
170243
) -> FastAPI:
244+
"""Build FastAPI application with SSE and message endpoints.
245+
246+
Args:
247+
pubsub: The publish/subscribe system for message routing.
248+
stdio: The stdio endpoint for subprocess communication.
249+
keep_alive: Interval in seconds for keepalive messages. Defaults to KEEP_ALIVE_INTERVAL.
250+
sse_path: Path for the SSE endpoint. Defaults to "/sse".
251+
message_path: Path for the message endpoint. Defaults to "/message".
252+
cors_origins: Optional list of CORS allowed origins.
253+
254+
Returns:
255+
FastAPI: The configured FastAPI application.
256+
"""
171257
app = FastAPI()
172258

173259
# Add CORS middleware if origins specified
174-
if cors_origins:
175-
# Third-Party
176-
from fastapi.middleware.cors import CORSMiddleware
177-
260+
if cors_origins and CORSMiddleware:
178261
app.add_middleware(
179262
CORSMiddleware,
180263
allow_origins=cors_origins,
@@ -254,6 +337,7 @@ async def post_message(raw: Request, session_id: str | None = None) -> Response:
254337
Response: ``202 Accepted`` if the payload is forwarded successfully,
255338
or ``400 Bad Request`` when the body is not valid JSON.
256339
"""
340+
_ = session_id # Unused but required for API compatibility
257341
payload = await raw.body()
258342
try:
259343
json.loads(payload) # validate
@@ -268,6 +352,11 @@ async def post_message(raw: Request, session_id: str | None = None) -> Response:
268352
# ----- Liveness ---------------------------------------------------------#
269353
@app.get("/healthz")
270354
async def health() -> Response: # noqa: D401
355+
"""Health check endpoint.
356+
357+
Returns:
358+
Response: A plain text response with "ok" status.
359+
"""
271360
return PlainTextResponse("ok")
272361

273362
return app
@@ -279,6 +368,17 @@ async def health() -> Response: # noqa: D401
279368

280369

281370
def _parse_args(argv: Sequence[str]) -> argparse.Namespace:
371+
"""Parse command line arguments.
372+
373+
Args:
374+
argv: Sequence of command line arguments.
375+
376+
Returns:
377+
argparse.Namespace: Parsed command line arguments.
378+
379+
Raises:
380+
NotImplementedError: If streamableHttp option is specified.
381+
"""
282382
p = argparse.ArgumentParser(
283383
prog="mcpgateway.translate",
284384
description="Bridges stdio JSON-RPC to SSE or SSE to stdio.",
@@ -312,6 +412,17 @@ def _parse_args(argv: Sequence[str]) -> argparse.Namespace:
312412

313413

314414
async def _run_stdio_to_sse(cmd: str, port: int, log_level: str = "info", cors: Optional[List[str]] = None) -> None:
415+
"""Run stdio to SSE bridge.
416+
417+
Args:
418+
cmd: The command to run as a stdio subprocess.
419+
port: The port to bind the HTTP server to.
420+
log_level: The logging level to use. Defaults to "info".
421+
cors: Optional list of CORS allowed origins.
422+
423+
Returns:
424+
None
425+
"""
315426
pubsub = _PubSub()
316427
stdio = StdIOEndpoint(cmd, pubsub)
317428
await stdio.start()
@@ -346,9 +457,21 @@ async def _shutdown() -> None:
346457
await _shutdown() # final cleanup
347458

348459

349-
async def _run_sse_to_stdio(url: str, oauth2_bearer: Optional[str], log_level: str = "info") -> None:
350-
# Third-Party
351-
import httpx
460+
async def _run_sse_to_stdio(url: str, oauth2_bearer: Optional[str]) -> None:
461+
"""Run SSE to stdio bridge.
462+
463+
Args:
464+
url: The SSE endpoint URL to connect to.
465+
oauth2_bearer: Optional OAuth2 bearer token for authentication.
466+
467+
Returns:
468+
None
469+
470+
Raises:
471+
ImportError: If httpx package is not available.
472+
"""
473+
if not httpx:
474+
raise ImportError("httpx package is required for SSE to stdio bridging")
352475

353476
headers = {}
354477
if oauth2_bearer:
@@ -384,24 +507,56 @@ async def pump_sse_to_stdio():
384507

385508

386509
def start_stdio(cmd, port, log_level, cors):
510+
"""Start stdio bridge.
511+
512+
Args:
513+
cmd: The command to run as a stdio subprocess.
514+
port: The port to bind the HTTP server to.
515+
log_level: The logging level to use.
516+
cors: Optional list of CORS allowed origins.
517+
518+
Returns:
519+
None
520+
"""
387521
return asyncio.run(_run_stdio_to_sse(cmd, port, log_level, cors))
388522

389523

390-
def start_sse(url, bearer, log_level):
391-
return asyncio.run(_run_sse_to_stdio(url, bearer, log_level))
524+
def start_sse(url, bearer):
525+
"""Start SSE bridge.
526+
527+
Args:
528+
url: The SSE endpoint URL to connect to.
529+
bearer: Optional OAuth2 bearer token for authentication.
530+
531+
Returns:
532+
None
533+
"""
534+
return asyncio.run(_run_sse_to_stdio(url, bearer))
535+
536+
537+
def main(argv: Optional[Sequence[str]] | None = None) -> None:
538+
"""Entry point for the translate module.
539+
540+
Args:
541+
argv: Optional sequence of command line arguments. If None, uses sys.argv[1:].
392542
543+
Returns:
544+
None
393545
394-
def main(argv: Optional[Sequence[str]] | None = None) -> None: # entry-point
546+
Raises:
547+
NotImplementedError: If an unsupported option is specified.
548+
KeyboardInterrupt: If the user interrupts the process.
549+
"""
395550
args = _parse_args(argv or sys.argv[1:])
396551
logging.basicConfig(
397552
level=getattr(logging, args.logLevel.upper(), logging.INFO),
398553
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
399554
)
400555
try:
401556
if args.stdio:
402-
return start_stdio(args.stdio, args.port, args.logLevel, args.cors)
557+
start_stdio(args.stdio, args.port, args.logLevel, args.cors)
403558
elif args.sse:
404-
return start_sse(args.sse, args.oauth2Bearer, args.logLevel)
559+
start_sse(args.sse, args.oauth2Bearer)
405560
except KeyboardInterrupt:
406561
print("") # restore shell prompt
407562
sys.exit(0)

0 commit comments

Comments
 (0)