Skip to content

Commit 7aabba8

Browse files
feat: Update registry main application and server service
1 parent f267704 commit 7aabba8

File tree

2 files changed

+79
-1
lines changed

2 files changed

+79
-1
lines changed

registry/main.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@
99

1010
import logging
1111
from contextlib import asynccontextmanager
12+
from typing import Annotated
1213

13-
from fastapi import FastAPI
14+
from fastapi import FastAPI, Cookie
1415
from fastapi.staticfiles import StaticFiles
1516
from fastapi.templating import Jinja2Templates
17+
from fastapi.responses import RedirectResponse
1618

1719
# Import domain routers
1820
from registry.auth.routes import router as auth_router
@@ -145,6 +147,9 @@ async def health_check():
145147
return {"status": "healthy", "service": "mcp-gateway-registry"}
146148

147149

150+
151+
152+
148153
if __name__ == "__main__":
149154
import uvicorn
150155
uvicorn.run(

registry/services/server_service.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,79 @@ def get_all_servers(self) -> Dict[str, Dict[str, Any]]:
221221
"""Get all registered servers."""
222222
return self.registered_servers.copy()
223223

224+
def get_filtered_servers(self, accessible_servers: List[str]) -> Dict[str, Dict[str, Any]]:
225+
"""
226+
Get servers filtered by user's accessible servers list.
227+
228+
Args:
229+
accessible_servers: List of server names the user can access
230+
231+
Returns:
232+
Dict of servers the user is authorized to see
233+
"""
234+
if not accessible_servers:
235+
logger.debug("User has no accessible servers, returning empty dict")
236+
return {}
237+
238+
logger.info(f"DEBUG: get_filtered_servers called with accessible_servers: {accessible_servers}")
239+
logger.info(f"DEBUG: Available registered servers paths: {list(self.registered_servers.keys())}")
240+
241+
filtered_servers = {}
242+
for path, server_info in self.registered_servers.items():
243+
server_name = server_info.get("server_name", "")
244+
# Extract technical name from path (remove leading slash)
245+
technical_name = path.lstrip('/')
246+
logger.info(f"DEBUG: Checking server path='{path}', server_name='{server_name}', technical_name='{technical_name}' against accessible_servers")
247+
248+
# Check if user has access to this server using technical name
249+
if technical_name in accessible_servers:
250+
filtered_servers[path] = server_info
251+
logger.info(f"DEBUG: ✓ User has access to server: {technical_name} ({server_name})")
252+
else:
253+
logger.info(f"DEBUG: ✗ User does not have access to server: {technical_name} ({server_name})")
254+
255+
logger.info(f"Filtered {len(filtered_servers)} servers from {len(self.registered_servers)} total servers")
256+
return filtered_servers
257+
258+
def get_all_servers_with_permissions(self, accessible_servers: Optional[List[str]] = None) -> Dict[str, Dict[str, Any]]:
259+
"""
260+
Get servers with optional filtering based on user permissions.
261+
262+
Args:
263+
accessible_servers: Optional list of server names the user can access.
264+
If None, returns all servers (admin access).
265+
266+
Returns:
267+
Dict of servers the user is authorized to see
268+
"""
269+
if accessible_servers is None:
270+
# Admin access - return all servers
271+
logger.debug("Admin access - returning all servers")
272+
return self.get_all_servers()
273+
else:
274+
# Filtered access - return only accessible servers
275+
logger.debug(f"Filtered access - returning servers accessible to user: {accessible_servers}")
276+
return self.get_filtered_servers(accessible_servers)
277+
278+
def user_can_access_server_path(self, path: str, accessible_servers: List[str]) -> bool:
279+
"""
280+
Check if user can access a specific server by path.
281+
282+
Args:
283+
path: Server path to check
284+
accessible_servers: List of server names the user can access
285+
286+
Returns:
287+
True if user can access the server, False otherwise
288+
"""
289+
server_info = self.get_server_info(path)
290+
if not server_info:
291+
return False
292+
293+
# Extract technical name from path (remove leading slash)
294+
technical_name = path.lstrip('/')
295+
return technical_name in accessible_servers
296+
224297
def is_service_enabled(self, path: str) -> bool:
225298
"""Check if a service is enabled."""
226299
result = self.service_state.get(path, False)

0 commit comments

Comments
 (0)