diff --git a/README.md b/README.md index 280301e..544b3ac 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ Right now, this MCP server supports Gmail and Calendar integration with the foll + Custom timezone support + Notification preferences * Delete calendar events +* List calendar colors Example prompts you can try: @@ -121,7 +122,7 @@ On Windows: `%APPDATA%/Claude/claude_desktop_config.json`
Development/Unpublished Servers Configuration - + ```json { @@ -166,7 +167,7 @@ Note: You can also use the `uv run mcp-gsuite --accounts-file /path/to/custom/.a
Published Servers Configuration - + ```json { diff --git a/src/mcp_gsuite/calendar.py b/src/mcp_gsuite/calendar.py index 4548831..18c9887 100644 --- a/src/mcp_gsuite/calendar.py +++ b/src/mcp_gsuite/calendar.py @@ -189,4 +189,20 @@ def delete_event(self, event_id: str, send_notifications: bool = True, calendar_ except Exception as e: logging.error(f"Error deleting calendar event {event_id}: {str(e)}") logging.error(traceback.format_exc()) - return False \ No newline at end of file + return False + + def list_colors(self) -> dict | None: + """ + Lists the available colors for calendar and events. + + Returns: + dict: A dictionary containing 'event' and 'calendar' color definitions if successful. + None: If an error occurred. + """ + try: + colors = self.service.colors().get().execute() + return colors + except Exception as e: + logging.error("Error while listing colors:") + logging.error(traceback.format_exc()) + return None \ No newline at end of file diff --git a/src/mcp_gsuite/gauth.py b/src/mcp_gsuite/gauth.py index 693470e..52ed646 100644 --- a/src/mcp_gsuite/gauth.py +++ b/src/mcp_gsuite/gauth.py @@ -12,6 +12,7 @@ import pydantic import json import argparse +from urllib.parse import urlparse def get_gauth_file() -> str: @@ -28,7 +29,32 @@ def get_gauth_file() -> str: CLIENTSECRETS_LOCATION = get_gauth_file() -REDIRECT_URI = 'http://localhost:4100/code' +def get_redirect_uri() -> str: + with open(get_gauth_file(), "r") as f: + data = json.load(f) + return data["web"]["redirect_uris"][0] + +def extract_redirect_uri_port() -> int: + try: + redirect_uri = get_redirect_uri() + if not redirect_uri: + logging.warning("Redirect URI is empty. Defaulting to port 4100.") + return 4100 + + parsed = urlparse(redirect_uri) + if parsed.scheme not in ("http", "https"): + logging.warning(f"Unexpected URI scheme: {parsed.scheme}. Defaulting to port 4100.") + return 4100 + + if parsed.port: + return parsed.port + + except Exception as e: + logging.error(f"Failed to extract port from redirect URI: {e}") + + return 4100 + +REDIRECT_URI = get_redirect_uri() SCOPES = [ "openid", "https://www.googleapis.com/auth/userinfo.email", @@ -140,7 +166,7 @@ def store_credentials(credentials: OAuth2Credentials, user_id: str): """Store OAuth 2.0 credentials in the specified directory.""" cred_file_path = _get_credential_filename(user_id=user_id) os.makedirs(os.path.dirname(cred_file_path), exist_ok=True) - + data = credentials.to_json() with open(cred_file_path, "w") as f: f.write(data) @@ -236,7 +262,7 @@ def get_credentials(authorization_code, state): import json logging.error(f"user_info: {json.dumps(user_info)}") email_address = user_info.get('email') - + if credentials.refresh_token is not None: store_credentials(credentials, user_id=email_address) return credentials @@ -256,4 +282,3 @@ def get_credentials(authorization_code, state): # No refresh token has been retrieved. authorization_url = get_authorization_url(email_address, state) raise NoRefreshTokenException(authorization_url) - diff --git a/src/mcp_gsuite/server.py b/src/mcp_gsuite/server.py index c217a3e..2f97e4f 100644 --- a/src/mcp_gsuite/server.py +++ b/src/mcp_gsuite/server.py @@ -51,6 +51,7 @@ def do_GET(self): + load_dotenv() from . import tools_gmail @@ -72,8 +73,7 @@ def start_auth_flow(user_id: str): import webbrowser webbrowser.open(auth_url) - # start server for code callback - server_address = ('', 4100) + server_address = ('', gauth.extract_redirect_uri_port()) server = HTTPServer(server_address, OauthListener) server.serve_forever() @@ -109,7 +109,7 @@ def add_tool_handler(tool_class: toolhandler.ToolHandler): def get_tool_handler(name: str) -> toolhandler.ToolHandler | None: if name not in tool_handlers: return None - + return tool_handlers[name] add_tool_handler(tools_gmail.QueryEmailsToolHandler()) @@ -125,6 +125,7 @@ def get_tool_handler(name: str) -> toolhandler.ToolHandler | None: add_tool_handler(tools_calendar.GetCalendarEventsToolHandler()) add_tool_handler(tools_calendar.CreateCalendarEventToolHandler()) add_tool_handler(tools_calendar.DeleteCalendarEventToolHandler()) +add_tool_handler(tools_calendar.ListCalendarEventColorToolHandler()) @app.list_tools() async def list_tools() -> list[Tool]: @@ -135,10 +136,10 @@ async def list_tools() -> list[Tool]: @app.call_tool() async def call_tool(name: str, arguments: Any) -> Sequence[TextContent | ImageContent | EmbeddedResource]: - try: + try: if not isinstance(arguments, dict): raise RuntimeError("arguments must be dictionary") - + if toolhandler.USER_ID_ARG not in arguments: raise RuntimeError("user_id argument is missing in dictionary.") @@ -156,7 +157,7 @@ async def call_tool(name: str, arguments: Any) -> Sequence[TextContent | ImageCo async def main(): - print(sys.platform) + logging.info(f"Platform: {sys.platform}") accounts = gauth.get_account_info() for account in accounts: creds = gauth.get_stored_credentials(user_id=account.email) @@ -170,4 +171,4 @@ async def main(): read_stream, write_stream, app.create_initialization_options() - ) \ No newline at end of file + ) diff --git a/src/mcp_gsuite/tools_calendar.py b/src/mcp_gsuite/tools_calendar.py index 1a63551..8daa839 100644 --- a/src/mcp_gsuite/tools_calendar.py +++ b/src/mcp_gsuite/tools_calendar.py @@ -17,8 +17,8 @@ def get_calendar_id_arg_schema() -> dict[str, str]: return { "type": "string", "description": """Optional ID of the specific agenda for which you are executing this action. - If not provided, the default calendar is being used. - If not known, the specific calendar id can be retrieved with the list_calendars tool""", + If not provided, the default calendar is being used. + If not known, the specific calendar id can be retrieved with the list_calendars tool""", "default": "primary" } @@ -30,7 +30,7 @@ def __init__(self): def get_tool_description(self) -> Tool: return Tool( name=self.name, - description="""Lists all calendars accessible by the user. + description="""Lists all calendars accessible by the user. Call it before any other tool whenever the user specifies a particular agenda (Family, Holidays, etc.).""", inputSchema={ "type": "object", @@ -74,7 +74,7 @@ def get_tool_description(self) -> Tool: "description": "Start time in RFC3339 format (e.g. 2024-12-01T00:00:00Z). Defaults to current time if not specified." }, "time_max": { - "type": "string", + "type": "string", "description": "End time in RFC3339 format (e.g. 2024-12-31T23:59:59Z). Optional." }, "max_results": { @@ -95,11 +95,11 @@ def get_tool_description(self) -> Tool: ) def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]: - + user_id = args.get(toolhandler.USER_ID_ARG) if not user_id: raise RuntimeError(f"Missing required argument: {toolhandler.USER_ID_ARG}") - + calendar_service = calendar.CalendarService(user_id=user_id) events = calendar_service.get_events( time_min=args.get('time_min'), @@ -199,7 +199,7 @@ def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | Embedded text=json.dumps(event, indent=2) ) ] - + class DeleteCalendarEventToolHandler(toolhandler.ToolHandler): def __init__(self): super().__init__("delete_calendar_event") @@ -230,7 +230,7 @@ def get_tool_description(self) -> Tool: def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]: if "event_id" not in args: raise RuntimeError("Missing required argument: event_id") - + user_id = args.get(toolhandler.USER_ID_ARG) if not user_id: raise RuntimeError(f"Missing required argument: {toolhandler.USER_ID_ARG}") @@ -250,4 +250,37 @@ def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | Embedded "message": "Event successfully deleted" if success else "Failed to delete event" }, indent=2) ) + ] + +class ListCalendarEventColorToolHandler(toolhandler.ToolHandler): + def __init__(self): + super().__init__("list_calendar_event_color") + + def get_tool_description(self) -> Tool: + return Tool( + name=self.name, + description="Lists available event colors from the user's Google Calendar by its event ID.", + inputSchema={ + "type": "object", + "properties": { + "__user_id__": self.get_user_id_arg_schema() + }, + "required": [toolhandler.USER_ID_ARG] + } + ) + + def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]: + user_id = args.get(toolhandler.USER_ID_ARG) + + if not user_id: + raise RuntimeError(f"Missing required argument: {toolhandler.USER_ID_ARG}") + + calendar_service = calendar.CalendarService(user_id=user_id) + colors = calendar_service.list_colors() + + return [ + TextContent( + type="text", + text=json.dumps(colors, indent=2) + ) ] \ No newline at end of file