forked from IBM/mcp-context-forge
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsmoketest.py
More file actions
executable file
Β·1062 lines (868 loc) Β· 42.2 KB
/
smoketest.py
File metadata and controls
executable file
Β·1062 lines (868 loc) Β· 42.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# Author : Mihai Criveti
# Description: π οΈ MCP Gateway Smoke-Test Utility
This script verifies a full install + runtime setup of the MCP Gateway:
- Creates a virtual environment and installs dependencies.
- Builds and runs the Docker HTTPS container.
- Starts the MCP Time Server via mcpgateway.translate.
- Verifies /health, /ready, /version before registering the gateway.
- Federates the time server as a gateway, verifies its tool list.
- Invokes the remote tool via /rpc and checks the result.
- Tests resource management, prompts, virtual servers, and error handling.
- Cleans up all created entities (gateway, process, container).
- Streams logs live with --tail and prints step timings.
Usage:
./smoketest.py Run full test
./smoketest.py --start-step 6 Resume from step 6
./smoketest.py --cleanup-only Just run cleanup
./smoketest.py -v Verbose (shows full logs)
"""
# Future
from __future__ import annotations
# Standard
import argparse
from collections import deque
import itertools
import json
import logging
import os
import shlex
import signal
import socket
import subprocess
import sys
import threading
import time
from types import SimpleNamespace
from typing import Callable, Dict, List, Tuple
import uuid
from urllib.parse import urlencode
# First-Party
from mcpgateway.config import settings
# βββββββββββββββββββββββββ Ports / constants ββββββββββββββββββββββββββββ
PORT_GATEWAY = 4444 # HTTPS container
PORT_TIME_SERVER = 8002 # mcpgateway.translate
DOCKER_CONTAINER = "mcpgateway"
MAKE_VENV_CMD = ["make", "venv", "install", "install-dev"]
MAKE_DOCKER_BUILD = ["make", "docker"]
MAKE_DOCKER_RUN = ["make", "docker-run-ssl-host"]
MAKE_DOCKER_STOP = ["make", "docker-stop"]
TRANSLATE_CMD = [
"python3",
"-m",
"mcpgateway.translate",
"--stdio",
"uvx mcp-server-time --local-timezone=Europe/Dublin",
"--port",
str(PORT_TIME_SERVER),
]
# βββββββββββββββββββββββββ Test State Tracking βββββββββββββββββββββββββ
class TestContext:
"""Track all created entities for proper cleanup"""
def __init__(self):
self.gateways: List[int] = []
self.resources: List[str] = []
self.prompts: List[str] = []
self.tools: List[str] = []
self.virtual_servers: List[str] = []
self.test_results: Dict[str, bool] = {}
self.error_messages: Dict[str, str] = {}
def add_gateway(self, gid: int):
self.gateways.append(gid)
def add_resource(self, uri: str):
self.resources.append(uri)
def add_prompt(self, name: str):
self.prompts.append(name)
def add_tool(self, tool_id: str):
self.tools.append(tool_id)
def add_virtual_server(self, server_id: str):
self.virtual_servers.append(server_id)
def record_test(self, name: str, success: bool, error: str = ""):
self.test_results[name] = success
if error:
self.error_messages[name] = error
def summary(self) -> str:
total = len(self.test_results)
passed = sum(1 for v in self.test_results.values() if v)
failed = total - passed
return f"Tests: {total} | Passed: {passed} | Failed: {failed}"
# Global test context
test_ctx = TestContext()
# βββββββββββββββββββββββ Helper: pretty sections ββββββββββββββββββββββββ
def log_section(title: str, emoji: str = "βοΈ"):
logging.info("\n%s %s\n%s", emoji, title, "β" * (len(title) + 4))
# βββββββββββββββββββββββββ Tail-N streaming runner βββββββββββββββββββββββ
_spinner_cycle = itertools.cycle("β β β Ήβ Έβ Όβ ΄β ¦β §β β ")
def run_shell(
cmd: List[str] | str,
desc: str,
*,
tail: int,
verbose: bool,
check: bool = True,
) -> subprocess.CompletedProcess[str]:
"""Run *cmd*; show rolling tail N lines refreshed in place."""
log_section(desc, "π")
logging.debug("CMD: %s", cmd if isinstance(cmd, str) else " ".join(shlex.quote(c) for c in cmd))
proc = subprocess.Popen(
cmd,
shell=isinstance(cmd, str),
text=True,
bufsize=1,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
full_buf: list[str] = []
tail_buf: deque[str] = deque(maxlen=tail)
done = threading.Event()
def pump():
assert proc.stdout
for raw in proc.stdout:
line = raw.rstrip("\n")
full_buf.append(line)
tail_buf.append(line)
if verbose:
print(line)
done.set()
threading.Thread(target=pump, daemon=True).start()
start = time.time()
try:
while not done.is_set():
time.sleep(0.2)
if verbose:
continue
spinner = next(_spinner_cycle)
header = f"{spinner} {desc} (elapsed {time.time() - start:4.0f}s)"
pane_lines = list(tail_buf)
pane_height = len(pane_lines) + 2
sys.stdout.write(f"\x1b[{pane_height}F\x1b[J") # rewind & clear
print(header)
for l in pane_lines:
print(l[:120])
print()
sys.stdout.flush()
except KeyboardInterrupt:
proc.terminate()
raise
finally:
proc.wait()
if not verbose: # clear final pane
sys.stdout.write(f"\x1b[{min(len(tail_buf) + 2, tail + 2)}F\x1b[J")
sys.stdout.flush()
globals()["_PREV_CMD_OUTPUT"] = "\n".join(full_buf) # for show_last()
status = "β
PASS" if proc.returncode == 0 else "β FAIL"
logging.info("%s - %s", status, desc)
if proc.returncode and check:
logging.error("β³ Last %d lines:\n%s", tail, "\n".join(tail_buf))
raise subprocess.CalledProcessError(proc.returncode, cmd, output="\n".join(full_buf))
return subprocess.CompletedProcess(cmd, proc.returncode, "\n".join(full_buf), "")
def show_last(lines: int = 30):
txt = globals().get("_PREV_CMD_OUTPUT", "")
print("\n".join(txt.splitlines()[-lines:]))
# βββββββββββββββββββββββββββββ Networking utils ββββββββββββββββββββββββββ
def port_open(port: int, host="127.0.0.1", timeout=1.0) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
return s.connect_ex((host, port)) == 0
def wait_http_ok(url: str, timeout: int = 30, *, headers: dict | None = None) -> bool:
# Third-Party
import requests
end = time.time() + timeout
while time.time() < end:
try:
if requests.get(url, timeout=2, verify=False, headers=headers).status_code == 200:
return True
except requests.RequestException:
pass
time.sleep(1)
return False
# Third-Party
# βββββββββββββββββββββββββββββ Requests wrapper ββββββββββββββββββββββββββ
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def generate_jwt() -> str:
"""
Create a short-lived admin JWT that matches the gateway's settings.
Resolution order β environment-variable override, then package defaults.
"""
# Use email format for new authentication system
user = os.getenv("PLATFORM_ADMIN_EMAIL", "admin@example.com")
secret = os.getenv("JWT_SECRET_KEY", "my-test-key")
expiry = os.getenv("TOKEN_EXPIRY", "300") # seconds
cmd = [
"docker",
"exec",
DOCKER_CONTAINER,
"python3",
"-m",
"mcpgateway.utils.create_jwt_token",
"--username",
user,
"--exp",
expiry,
"--secret",
secret,
]
return subprocess.check_output(cmd, text=True).strip().strip('"')
def request(method: str, path: str, *, json_data=None, form_data=None, **kw):
# Third-Party
import requests
token = generate_jwt()
kw.setdefault("headers", {})["Authorization"] = f"Bearer {token}"
kw["verify"] = False
url = f"https://localhost:{PORT_GATEWAY}{path}"
t0 = time.time()
content_type = os.getenv("FORGE_CONTENT_TYPE", "application/json")
kw["headers"]["Content-Type"] = content_type
if content_type == "application/x-www-form-urlencoded" and form_data is not None:
resp = requests.request(method, url, data=urlencode(form_data), **kw)
else:
resp = requests.request(method, url, json=json_data, **kw)
ms = (time.time() - t0) * 1000
logging.info("β %s %s %s %.0f ms", method.upper(), path, resp.status_code, ms)
logging.debug(" β³ response: %s", resp.text[:400])
return resp
# βββββββββββββββββββββββββββββ Cleanup logic βββββββββββββββββββββββββββββ
_translate_proc: subprocess.Popen | None = None
_translate_log_file = None
def cleanup():
log_section("Cleanup", "π§Ή")
global _translate_proc, _translate_log_file
# Clean up all created entities
cleanup_errors = []
# Delete virtual servers
for server_id in test_ctx.virtual_servers:
try:
logging.info("ποΈ Deleting virtual server: %s", server_id)
request("DELETE", f"/servers/{server_id}")
except Exception as e:
cleanup_errors.append(f"Failed to delete server {server_id}: {e}")
# Delete tools
for tool_id in test_ctx.tools:
try:
logging.info("ποΈ Deleting tool: %s", tool_id)
request("DELETE", f"/tools/{tool_id}")
except Exception as e:
cleanup_errors.append(f"Failed to delete tool {tool_id}: {e}")
# Delete prompts
for prompt_name in test_ctx.prompts:
try:
logging.info("ποΈ Deleting prompt: %s", prompt_name)
request("DELETE", f"/prompts/{prompt_name}")
except Exception as e:
cleanup_errors.append(f"Failed to delete prompt {prompt_name}: {e}")
# Delete resources
for resource_uri in test_ctx.resources:
try:
logging.info("ποΈ Deleting resource: %s", resource_uri)
request("DELETE", f"/resources/{resource_uri}")
except Exception as e:
cleanup_errors.append(f"Failed to delete resource {resource_uri}: {e}")
# Delete gateways
for gid in test_ctx.gateways:
try:
logging.info("ποΈ Deleting gateway ID: %s", gid)
request("DELETE", f"/gateways/{gid}")
except Exception as e:
cleanup_errors.append(f"Failed to delete gateway {gid}: {e}")
# Clean up the translate process
if _translate_proc and _translate_proc.poll() is None:
logging.info("π Terminating mcpgateway.translate process (PID: %d)", _translate_proc.pid)
_translate_proc.terminate()
try:
_translate_proc.wait(timeout=5)
logging.info("β
mcpgateway.translate process terminated cleanly")
except subprocess.TimeoutExpired:
logging.warning("β οΈ mcpgateway.translate didn't terminate in time, killing it")
_translate_proc.kill()
_translate_proc.wait()
# Close log file if open
if _translate_log_file:
_translate_log_file.close()
_translate_log_file = None
# Stop docker container
logging.info("π Stopping Docker container")
subprocess.run(MAKE_DOCKER_STOP, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if cleanup_errors:
logging.warning("β οΈ Cleanup completed with errors:")
for err in cleanup_errors:
logging.warning(" - %s", err)
else:
logging.info("β
Cleanup complete")
# βββββββββββββββββββββββββββββ Test steps ββββββββββββββββββββββββββββββββ
cfg = SimpleNamespace(tail=10, verbose=False) # populated in main()
def sh(cmd, desc): # shorthand
return run_shell(cmd, desc, tail=cfg.tail, verbose=cfg.verbose)
def step_1_setup_venv():
sh(MAKE_VENV_CMD, "1οΈβ£ Create venv + install deps")
def step_2_pip_install():
sh(["pip", "install", "."], "2οΈβ£ pip install .")
def step_3_docker_build():
sh(MAKE_DOCKER_BUILD, "3οΈβ£ Build Docker image")
def step_4_docker_run():
sh(MAKE_DOCKER_RUN, "4οΈβ£ Run Docker container (HTTPS)")
# Build one token and reuse it for the health probes below.
token = generate_jwt()
auth_headers = {"Authorization": f"Bearer {token}"}
# Probe endpoints until they respond with 200.
for ep in ("/health", "/ready", "/version"):
full = f"https://localhost:{PORT_GATEWAY}{ep}"
need_auth = os.getenv("AUTH_REQUIRED", "true").lower() == "true"
headers = auth_headers if (ep == "/version" or need_auth) else None
logging.info("π Waiting for endpoint %s (auth: %s)", ep, bool(headers))
if not wait_http_ok(full, 45, headers=headers):
raise RuntimeError(f"Gateway endpoint {ep} not ready")
logging.info("β
Endpoint %s is ready", ep)
logging.info("β
Gateway /health /ready /version all OK")
def step_5_start_time_server(restart=False):
global _translate_proc, _translate_log_file
# Check if uvx is available
try:
uvx_check = subprocess.run(["uvx", "--version"], capture_output=True, text=True)
if uvx_check.returncode == 0:
logging.info("π Found uvx version: %s", uvx_check.stdout.strip())
else:
logging.warning("β οΈ uvx not found or not working. This may cause issues.")
except FileNotFoundError:
logging.warning("β οΈ uvx not found. Please install uv (pip install uv) if the time server fails.")
if port_open(PORT_TIME_SERVER):
if restart:
logging.info("π Restarting process on port %d", PORT_TIME_SERVER)
try:
pid = int(subprocess.check_output(["lsof", "-ti", f"TCP:{PORT_TIME_SERVER}"], text=True).strip())
logging.info("π Found existing process PID: %d", pid)
os.kill(pid, signal.SIGTERM)
time.sleep(2)
except Exception as e:
logging.warning("Could not stop existing server: %s", e)
else:
logging.info("βΉοΈ Re-using MCP-Time-Server on port %d", PORT_TIME_SERVER)
return
if not port_open(PORT_TIME_SERVER):
log_section("Launching MCP-Time-Server", "β°")
logging.info("π Command: %s", " ".join(shlex.quote(c) for c in TRANSLATE_CMD))
# Create a log file for the time server output
log_filename = f"translate_{int(time.time())}.log"
_translate_log_file = open(log_filename, "w")
logging.info("π Logging mcpgateway.translate output to: %s", log_filename)
# Start the process directly
_translate_proc = subprocess.Popen(TRANSLATE_CMD, stdout=_translate_log_file, stderr=subprocess.STDOUT, text=True, bufsize=1)
logging.info("π Started mcpgateway.translate process with PID: %d", _translate_proc.pid)
# Wait for the server to start
start_time = time.time()
timeout = 30
check_interval = 0.5
while time.time() - start_time < timeout:
# Check if process is still running
exit_code = _translate_proc.poll()
if exit_code is not None:
# Process exited, read the log file
_translate_log_file.close()
with open(log_filename, "r") as f:
output = f.read()
logging.error("β Time-Server process exited with code %d", exit_code)
logging.error("π Process output:\n%s", output)
raise RuntimeError(f"Time-Server exited with code {exit_code}. Check the logs above.")
# Check if port is open
if port_open(PORT_TIME_SERVER):
elapsed = time.time() - start_time
logging.info("β
Time-Server is listening on port %d (took %.1fs)", PORT_TIME_SERVER, elapsed)
# Give it a moment to fully initialize
time.sleep(1)
# Double-check it's still running
if _translate_proc.poll() is None:
return
else:
raise RuntimeError("Time-Server started but then immediately exited")
# Log progress
if int(time.time() - start_time) % 5 == 0:
logging.info("β³ Still waiting for Time-Server to start... (%.0fs elapsed)", time.time() - start_time)
time.sleep(check_interval)
# Timeout reached
if _translate_proc.poll() is None:
_translate_proc.terminate()
_translate_proc.wait()
_translate_log_file.close()
with open(log_filename, "r") as f:
output = f.read()
logging.error("π Process output:\n%s", output)
raise RuntimeError(f"Time-Server failed to start within {timeout}s")
def step_6_register_gateway() -> int:
log_section("Registering gateway", "π")
payload = {"name": "smoketest_time_server", "url": f"http://localhost:{PORT_TIME_SERVER}/sse"}
logging.info("π€ Registering gateway with payload: %s", json.dumps(payload, indent=2))
r = request("POST", "/gateways", json_data=payload)
if r.status_code in (200, 201):
gid = r.json()["id"]
logging.info("β
Gateway ID %s registered", gid)
test_ctx.add_gateway(gid)
return gid
# 409 conflict β find existing
if r.status_code == 409:
logging.info("β οΈ Gateway already exists, fetching existing one")
gateways = request("GET", "/gateways").json()
gw = next((g for g in gateways if g["name"] == payload["name"]), None)
if gw:
logging.info("βΉοΈ Gateway already present - using ID %s", gw["id"])
test_ctx.add_gateway(gw["id"])
return gw["id"]
else:
raise RuntimeError("Gateway conflict but not found in list")
# other error
msg = r.text
try:
msg = json.loads(msg)
except Exception:
pass
raise RuntimeError(f"Gateway registration failed {r.status_code}: {msg}")
def step_7_verify_tools():
logging.info("π Fetching tool list")
tools = request("GET", "/tools").json()
tool_names = [t["name"] for t in tools]
expected_tool = f"smoketest-time-server{settings.gateway_tool_name_separator}get-current-time"
logging.info("π Found %d tools total", len(tool_names))
logging.debug("π All tools: %s", json.dumps(tool_names, indent=2))
if expected_tool not in tool_names:
# Log similar tools to help debug
similar = [t for t in tool_names if "time" in t.lower() or "smoketest" in t.lower()]
if similar:
logging.error("β Expected tool not found. Similar tools: %s", similar)
raise AssertionError(f"{expected_tool} not found in tools list")
logging.info("β
Tool '%s' visible in /tools", expected_tool)
def step_8_invoke_tool():
log_section("Invoking remote tool", "π§")
body = {"jsonrpc": "2.0", "id": 1, "method": f"smoketest-time-server{settings.gateway_tool_name_separator}get-current-time", "params": {"timezone": "Europe/Dublin"}}
logging.info("π€ RPC request: %s", json.dumps(body, indent=2))
j = request("POST", "/rpc", json_data=body).json()
logging.info("π₯ RPC response: %s", json.dumps(j, indent=2))
if "error" in j:
raise RuntimeError(f"RPC error: {j['error']}")
result = j.get("result", j)
if "content" not in result:
raise RuntimeError(f"Missing 'content' in tool response. Got: {result}")
content = result["content"]
if not content or not isinstance(content, list):
raise RuntimeError(f"Invalid content format. Expected list, got: {type(content)}")
text = content[0].get("text", "")
if not text:
raise RuntimeError(f"No text in content. Content: {content}")
if "datetime" not in text:
raise RuntimeError(f"Expected 'datetime' in response, got: {text}")
logging.info("β
Tool invocation returned time: %s", text[:100])
def step_9_version_health():
log_section("Final health check", "π₯")
health_resp = request("GET", "/health").json()
logging.info("π₯ Health response: %s", json.dumps(health_resp, indent=2))
health = health_resp.get("status", "").lower()
assert health in ("ok", "healthy"), f"Unexpected health status: {health}"
ver_resp = request("GET", "/version").json()
logging.info("π₯ Version response: %s", json.dumps(ver_resp, indent=2))
ver = ver_resp.get("app", {}).get("name", "Unknown")
logging.info("β
Health OK - app %s", ver)
def step_10_cleanup_gateway(gid: int | None = None):
log_section("Cleanup gateway registration", "π§Ή")
if gid is None:
logging.warning("π§Ή No gateway ID; nothing to delete")
return
logging.info("ποΈ Deleting gateway ID: %s", gid)
request("DELETE", f"/gateways/{gid}")
# Verify it's gone
gateways = request("GET", "/gateways").json()
if any(g["id"] == gid for g in gateways):
raise RuntimeError(f"Gateway {gid} still exists after deletion")
logging.info("β
Gateway deleted successfully")
# ===== NEW PHASE 1 TEST STEPS =====
def step_11_enhanced_tool_testing():
"""Enhanced tool testing with multiple scenarios"""
log_section("Enhanced Tool Testing", "π§")
# Test 1: Multiple tool invocations in sequence
logging.info("π Test: Multiple tool invocations in sequence")
for tz in ["Europe/London", "America/New_York", "Asia/Tokyo"]:
body = {"jsonrpc": "2.0", "id": f"seq-{tz}", "method": f"smoketest-time-server{settings.gateway_tool_name_separator}get-current-time", "params": {"timezone": tz}}
r = request("POST", "/rpc", json_data=body)
assert r.status_code == 200, f"Failed to get time for {tz}"
test_ctx.record_test(f"tool_invoke_{tz}", True)
# Test 2: Tool with invalid parameters
logging.info("π Test: Tool with invalid parameters")
body = {"jsonrpc": "2.0", "id": "invalid-params", "method": f"smoketest-time-server{settings.gateway_tool_name_separator}get-current-time", "params": {"invalid_param": "test"}}
r = request("POST", "/rpc", json_data=body)
if r.status_code != 200:
test_ctx.record_test("tool_invalid_params", True)
else:
# Check if response contains error
resp = r.json()
test_ctx.record_test("tool_invalid_params", "error" in resp)
# Test 3: Tool discovery filtering
logging.info("π Test: Tool discovery with filtering")
tools = request("GET", "/tools").json()
time_tools = [t for t in tools if "time" in t["name"].lower()]
test_ctx.record_test("tool_discovery_filter", len(time_tools) > 0)
# Test 4: Get specific tool details
logging.info("π Test: Get specific tool details")
if tools:
tool_id = tools[0]["id"]
r = request("GET", f"/tools/{tool_id}")
test_ctx.record_test("tool_details", r.status_code == 200)
if r.status_code == 200:
details = r.json()
# Verify tool has required fields
required_fields = ["id", "name", "description"]
has_fields = all(field in details for field in required_fields)
test_ctx.record_test("tool_schema_validation", has_fields)
logging.info("β
Enhanced tool testing completed")
def step_12_resource_management():
"""Test resource creation, retrieval, update, and deletion"""
log_section("Resource Management Testing", "π")
# Test 1: Create Markdown resource
logging.info("π Test: Create Markdown resource")
md_resource = {
"uri": f"test/readme_{uuid.uuid4().hex[:8]}",
"name": "Test README",
"description": "Test markdown resource",
"mimeType": "text/markdown",
"content": "# Test Resource\n\nThis is a test markdown resource.\n\n## Features\n- Test item 1\n- Test item 2",
}
r = request("POST", "/resources", json_data=md_resource)
if r.status_code in (200, 201):
test_ctx.add_resource(md_resource["uri"])
test_ctx.record_test("resource_create_markdown", True)
else:
test_ctx.record_test("resource_create_markdown", False, r.text)
# Test 2: Create JSON resource
logging.info("π Test: Create JSON resource")
json_resource = {
"uri": f"config/test_{uuid.uuid4().hex[:8]}",
"name": "Test Config",
"description": "Test JSON configuration",
"mimeType": "application/json",
"content": json.dumps({"version": "1.0.0", "debug": True, "features": ["test1", "test2"]}),
}
r = request("POST", "/resources", json_data=json_resource)
if r.status_code in (200, 201):
test_ctx.add_resource(json_resource["uri"])
test_ctx.record_test("resource_create_json", True)
else:
test_ctx.record_test("resource_create_json", False, r.text)
# Test 3: Create plain text resource
logging.info("π Test: Create plain text resource")
text_resource = {"uri": f"docs/notes_{uuid.uuid4().hex[:8]}", "name": "Test Notes", "description": "Plain text notes", "mimeType": "text/plain", "content": "These are test notes.\nLine 2\nLine 3"}
r = request("POST", "/resources", json_data=text_resource)
if r.status_code in (200, 201):
test_ctx.add_resource(text_resource["uri"])
test_ctx.record_test("resource_create_text", True)
else:
test_ctx.record_test("resource_create_text", False, r.text)
# Test 4: List resources
logging.info("π Test: List resources")
r = request("GET", "/resources")
if r.status_code == 200:
resources = r.json()
test_ctx.record_test("resource_list", len(resources) >= 3)
else:
test_ctx.record_test("resource_list", False, r.text)
# Test 5: Get resource by URI (content)
if test_ctx.resources:
logging.info("π Test: Get resource content")
# Note: The endpoint might be /resources/{uri}/content or similar
# Adjust based on actual API
test_uri = test_ctx.resources[0]
r = request("GET", f"/resources/{test_uri}")
test_ctx.record_test("resource_get_content", r.status_code == 200)
# Test 6: Update resource
if test_ctx.resources:
logging.info("π Test: Update resource")
update_data = {"content": "# Updated Content\n\nThis content has been updated."}
r = request("PUT", f"/resources/{test_ctx.resources[0]}", json_data=update_data)
test_ctx.record_test("resource_update", r.status_code in (200, 204))
# Test 7: Delete resource
if len(test_ctx.resources) > 1:
logging.info("π Test: Delete resource")
delete_uri = test_ctx.resources.pop() # Remove from tracking
r = request("DELETE", f"/resources/{delete_uri}")
test_ctx.record_test("resource_delete", r.status_code in (200, 204))
logging.info("β
Resource management testing completed")
def step_13_prompt_management():
"""Test prompt creation with and without arguments"""
log_section("Prompt Management Testing", "π¬")
# Test 1: Create simple prompt without arguments
logging.info("π Test: Create simple prompt")
simple_prompt = {"name": f"greeting_{uuid.uuid4().hex[:8]}", "description": "Simple greeting prompt", "template": "Hello! Welcome to the MCP Gateway. How can I help you today?", "arguments": []}
r = request("POST", "/prompts", json_data=simple_prompt)
if r.status_code in (200, 201):
test_ctx.add_prompt(simple_prompt["name"])
test_ctx.record_test("prompt_create_simple", True)
else:
test_ctx.record_test("prompt_create_simple", False, r.text)
# Test 2: Create prompt with arguments
logging.info("π Test: Create prompt with arguments")
template_prompt = {
"name": f"code_review_{uuid.uuid4().hex[:8]}",
"description": "Code review prompt with parameters",
"template": "Please review the following {{ language }} code:\n\n```{{ language }}\n{{ code }}\n```\n\nFocus areas: {{ focus_areas }}",
"arguments": [
{"name": "language", "description": "Programming language", "required": True},
{"name": "code", "description": "Code to review", "required": True},
{"name": "focus_areas", "description": "Areas to focus on", "required": False},
],
}
r = request("POST", "/prompts", json_data=template_prompt)
if r.status_code in (200, 201):
test_ctx.add_prompt(template_prompt["name"])
test_ctx.record_test("prompt_create_template", True)
else:
test_ctx.record_test("prompt_create_template", False, r.text)
# Test 3: List prompts
logging.info("π Test: List prompts")
r = request("GET", "/prompts")
if r.status_code == 200:
prompts = r.json()
test_ctx.record_test("prompt_list", len(prompts) >= 2)
else:
test_ctx.record_test("prompt_list", False, r.text)
# Test 4: Execute prompt with parameters
if len(test_ctx.prompts) > 1:
logging.info("π Test: Execute prompt with parameters")
prompt_name = test_ctx.prompts[1] # Use template prompt
params = {"language": "python", "code": "def hello():\n print('Hello, World!')", "focus_areas": "code style and best practices"}
r = request("POST", f"/prompts/{prompt_name}", json_data=params)
if r.status_code == 200:
result = r.json()
# Check if messages array exists
test_ctx.record_test("prompt_execute", "messages" in result)
else:
test_ctx.record_test("prompt_execute", False, r.text)
# Test 5: Execute prompt without parameters
if test_ctx.prompts:
logging.info("π Test: Execute simple prompt")
r = request("POST", f"/prompts/{test_ctx.prompts[0]}", json_data={})
test_ctx.record_test("prompt_execute_simple", r.status_code == 200)
# Test 6: Delete prompt
if len(test_ctx.prompts) > 1:
logging.info("π Test: Delete prompt")
delete_name = test_ctx.prompts.pop()
r = request("DELETE", f"/prompts/{delete_name}")
test_ctx.record_test("prompt_delete", r.status_code in (200, 204))
logging.info("β
Prompt management testing completed")
def step_14_error_handling_validation():
"""Test error handling and input validation"""
log_section("Error Handling & Validation Testing", "π‘οΈ")
# Test 1: XSS in tool name
logging.info("π Test: XSS prevention in tool names")
xss_tool = {"name": "<script>alert('xss')</script>", "url": "https://example.com/api", "description": "Test XSS", "integrationType": "REST", "requestType": "GET"}
r = request("POST", "/tools", json_data=xss_tool)
test_ctx.record_test("validation_xss_tool_name", r.status_code in (400, 422))
# Test 2: SQL injection pattern
logging.info("π Test: SQL injection prevention")
sql_inject = {"name": "tool'; DROP TABLE tools; --", "url": "https://example.com", "description": "Test SQL injection", "integrationType": "REST", "requestType": "GET"}
r = request("POST", "/tools", json_data=sql_inject)
test_ctx.record_test("validation_sql_injection", r.status_code in (400, 422))
# Test 3: Invalid URL scheme
logging.info("π Test: Invalid URL scheme")
invalid_url = {"name": f"test_tool_{uuid.uuid4().hex[:8]}", "url": "javascript:alert(1)", "description": "Test invalid URL", "integrationType": "REST", "requestType": "GET"}
r = request("POST", "/tools", json_data=invalid_url)
test_ctx.record_test("validation_invalid_url", r.status_code in (400, 422))
# Test 4: Directory traversal in resource URI
logging.info("π Test: Directory traversal prevention")
traversal_resource = {"uri": "../../etc/passwd", "name": "Test traversal", "content": "test"}
r = request("POST", "/resources", json_data=traversal_resource)
test_ctx.record_test("validation_directory_traversal", r.status_code in (400, 422, 500))
# Test 5: Name too long (255+ chars)
logging.info("π Test: Name length validation")
long_name = {"name": "a" * 300, "url": "https://example.com", "description": "Test long name", "integrationType": "REST", "requestType": "GET"}
r = request("POST", "/tools", json_data=long_name)
test_ctx.record_test("validation_name_too_long", r.status_code in (400, 422))
# Test 6: Empty required fields
logging.info("π Test: Empty required fields")
empty_fields = {"name": "", "url": "https://example.com"}
r = request("POST", "/tools", json_data=empty_fields)
test_ctx.record_test("validation_empty_required", r.status_code in (400, 422))
# Test 7: Whitespace only in name
logging.info("π Test: Whitespace-only validation")
whitespace_only = {"name": " ", "url": "https://example.com", "description": "Test whitespace"}
r = request("POST", "/tools", json_data=whitespace_only)
test_ctx.record_test("validation_whitespace_only", r.status_code in (400, 422))
# Test 8: Invalid JSON-RPC request
logging.info("π Test: Malformed JSON-RPC request")
malformed_rpc = {"jsonrpc": "1.0", "method": "test", "id": "test"} # Wrong version
r = request("POST", "/rpc", json_data=malformed_rpc)
test_ctx.record_test("validation_invalid_jsonrpc", r.status_code != 200 or "error" in r.json())
# Test 9: Tool not found
logging.info("π Test: Tool not found error")
r = request("GET", f"/tools/{uuid.uuid4()}")
test_ctx.record_test("error_tool_not_found", r.status_code == 404)
# Test 10: Gateway not found
logging.info("π Test: Gateway not found error")
r = request("GET", "/gateways/99999")
test_ctx.record_test("error_gateway_not_found", r.status_code == 404)
logging.info("β
Error handling & validation testing completed")
def step_15_virtual_server_management():
"""Test virtual server creation and management"""
log_section("Virtual Server Management", "π₯οΈ")
# Get available tools first
tools = request("GET", "/tools").json()
if not tools:
logging.warning("β οΈ No tools available for virtual server testing")
test_ctx.record_test("virtual_server_skipped", False, "No tools available")
return
# Select time-related tools
time_tools = [t for t in tools if "time" in t["name"].lower()]
if not time_tools:
time_tools = tools[:2] # Just take first 2 tools
tool_ids = [t["id"] for t in time_tools[:3]] # Max 3 tools
# Test 1: Create virtual server
logging.info("π Test: Create virtual server")
virtual_server = {"name": f"time_utils_{uuid.uuid4().hex[:8]}", "description": "Time utilities virtual server", "associatedTools": tool_ids}
r = request("POST", "/servers", json_data=virtual_server)
if r.status_code in (200, 201):
server_data = r.json()
server_id = server_data["id"]
test_ctx.add_virtual_server(server_id)
test_ctx.record_test("virtual_server_create", True)
# Test 2: List virtual servers
logging.info("π Test: List virtual servers")
r = request("GET", "/servers")
if r.status_code == 200:
servers = r.json()
test_ctx.record_test("virtual_server_list", len(servers) >= 1)
else:
test_ctx.record_test("virtual_server_list", False, r.text)
# Test 3: Get specific virtual server
logging.info("π Test: Get virtual server details")
r = request("GET", f"/servers/{server_id}")
test_ctx.record_test("virtual_server_get", r.status_code == 200)
# Test 4: Test SSE endpoint (brief connection test)
logging.info("π Test: Virtual server SSE endpoint")
try:
# Just test that the endpoint exists and responds
# Third-Party
import requests
token = generate_jwt()
url = f"https://localhost:{PORT_GATEWAY}/servers/{server_id}/sse"
# Use stream=True to test SSE connection
with requests.get(url, headers={"Authorization": f"Bearer {token}"}, verify=False, stream=True, timeout=2) as r:
test_ctx.record_test("virtual_server_sse", r.status_code == 200)
except requests.Timeout:
# Timeout is OK for SSE - it means connection was established
test_ctx.record_test("virtual_server_sse", True)
except Exception as e:
test_ctx.record_test("virtual_server_sse", False, str(e))
# Test 5: Update virtual server
logging.info("π Test: Update virtual server")
update_data = {"description": "Updated time utilities server"}
r = request("PUT", f"/servers/{server_id}", json_data=update_data)
test_ctx.record_test("virtual_server_update", r.status_code in (200, 204))
else:
test_ctx.record_test("virtual_server_create", False, r.text)
logging.info("β
Virtual server management testing completed")
def step_16_test_summary():
"""Print test summary"""
log_section("Test Summary", "π")
summary = test_ctx.summary()
logging.info(summary)
# Show failed tests
failed_tests = [name for name, passed in test_ctx.test_results.items() if not passed]
if failed_tests:
logging.warning("\nβ Failed tests:")
for test_name in failed_tests:
error = test_ctx.error_messages.get(test_name, "No error message")
logging.warning(" - %s: %s", test_name, error)
else:
logging.info("\nβ
All additional tests passed!")
# Show entity counts
logging.info("\nπ¦ Created entities:")
logging.info(" - Gateways: %d", len(test_ctx.gateways))
logging.info(" - Resources: %d", len(test_ctx.resources))
logging.info(" - Prompts: %d", len(test_ctx.prompts))
logging.info(" - Tools: %d", len(test_ctx.tools))
logging.info(" - Virtual Servers: %d", len(test_ctx.virtual_servers))
# βββββββββββββββββββββββββββββ Step registry βββββββββββββββββββββββββββββ
StepFunc = Callable[..., None]
STEPS: List[Tuple[str, StepFunc]] = [
("setup_venv", step_1_setup_venv),
("pip_install", step_2_pip_install),
("docker_build", step_3_docker_build),
("docker_run", step_4_docker_run),
("start_time_server", step_5_start_time_server),
("register_gateway", step_6_register_gateway),
("verify_tools", step_7_verify_tools),
("invoke_tool", step_8_invoke_tool),
("version_health", step_9_version_health),
# New Phase 1 tests
("enhanced_tool_testing", step_11_enhanced_tool_testing),
("resource_management", step_12_resource_management),
("prompt_management", step_13_prompt_management),
("error_handling_validation", step_14_error_handling_validation),
("virtual_server_management", step_15_virtual_server_management),
("test_summary", step_16_test_summary),
("cleanup_gateway", step_10_cleanup_gateway),
]
# ββββββββββββββββββββββββββββββββ Main βββββββββββββββββββββββββββββββββββ
def main():
ap = argparse.ArgumentParser(description="MCP Gateway smoke-test")
ap.add_argument("-v", "--verbose", action="store_true")
ap.add_argument("--tail", type=int, default=10, help="Tail window (default 10)")
ap.add_argument("--start-step", type=int, default=1)
ap.add_argument("--end-step", type=int)
ap.add_argument("--only-steps", help="Comma separated indices (1-based)")
ap.add_argument("--cleanup-only", action="store_true")
ap.add_argument("--restart-time-server", action="store_true")
args = ap.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%H:%M:%S",