11import os
22from typing import Dict , Any , List , Optional
3+ import datetime
34
5+ import datetime
46import re
57import asyncio
8+ import re
69
710from dotenv import load_dotenv
811from fastmcp import FastMCP , Context
912from fastmcp .prompts .prompt import Message
1013from fastmcp .utilities .logging import configure_logging , get_logger
14+ from fastmcp .exceptions import ToolError
1115from composio import Composio
1216from main .config import COMPOSIO_API_KEY
1317
@@ -62,58 +66,92 @@ async def _execute_tool(ctx: Context, action_name: str, **kwargs) -> Dict[str, A
6266 connection_id = await auth .get_composio_connection_id (user_id , "gmail" )
6367
6468 # NEW: Fetch user info including privacy filters
65- user_info = await auth .get_user_info (user_id )
66- privacy_filters = user_info .get ("privacy_filters" , {})
67- keyword_filters = privacy_filters .get ("keywords" , [])
68- email_filters = [email .lower () for email in privacy_filters .get ("emails" , [])]
69- label_filters = [label .lower () for label in privacy_filters .get ("labels" , [])]
70-
7169 # Composio's execute method is synchronous, so we use asyncio.to_thread
7270 result = await asyncio .to_thread (
7371 composio .tools .execute ,
7472 action_name ,
75- arguments = kwargs , # Changed from params to arguments
73+ arguments = kwargs ,
7674 connected_account_id = connection_id
7775 )
78-
79- # NEW: Apply privacy filters if the action is fetching emails
80- if action_name == "GMAIL_FETCH_EMAILS" :
81- if result .get ("successful" ) and result .get ("data" ):
82- emails = result ["data" ]
83- filtered_emails = []
84- for email in emails :
85- # The data from Composio is already simplified
86- subject = email .get ("subject" , "" ).lower ()
87- snippet = email .get ("snippet" , "" ).lower ()
88- content_to_check = f"{ subject } { snippet } "
89-
90- # Keyword check
91- if any (word .lower () in content_to_check for word in keyword_filters ):
92- logger .info (f"Filtering email '{ subject } ' due to keyword match." )
93- continue
94-
95- # Sender email check
96- sender_email = email .get ("sender_email" , "" ).lower ()
97- if any (blocked_email in sender_email for blocked_email in email_filters ):
98- logger .info (f"Filtering email '{ subject } ' due to sender email match." )
99- continue
100-
101- # Label check
102- email_labels = [label .lower () for label in email .get ("labels" , [])]
103- if any (blocked_label in email_labels for blocked_label in label_filters ):
104- logger .info (f"Filtering email '{ subject } ' due to label match." )
105- continue
106-
107- filtered_emails .append (email )
108-
109- result ["data" ] = filtered_emails
110- logger .info (f"Applied privacy filters. Kept { len (filtered_emails )} out of { len (emails )} emails." )
111-
112- return {"status" : "success" , "result" : result }
76+
77+ if not result .get ("successful" ):
78+ raise ToolError (f"Composio action '{ action_name } ' failed: { result .get ('error' , 'Unknown error' )} " )
79+
80+ data_payload = result .get ("data" )
81+
82+ # Apply privacy filters and simplify if the action is fetching emails
83+ if action_name == "GMAIL_FETCH_EMAILS" and isinstance (data_payload , list ):
84+ user_info = await auth .get_user_info (user_id )
85+ privacy_filters = user_info .get ("privacy_filters" , {})
86+ keyword_filters = privacy_filters .get ("keywords" , [])
87+ email_filters = [email .lower () for email in privacy_filters .get ("emails" , [])]
88+ label_filters = [label .lower () for label in privacy_filters .get ("labels" , [])]
89+ logger .info (f"Applying privacy filters for user { user_id } : Keywords={ len (keyword_filters )} , Emails={ len (email_filters )} , Labels={ len (label_filters )} " )
90+
91+ emails = data_payload
92+ filtered_emails = []
93+ for email in emails :
94+ if not isinstance (email , dict ):
95+ logger .warning (f"Skipping non-dictionary item in email list: { type (email )} " )
96+ continue
97+
98+ # Correctly extract fields for filtering based on the sample response
99+ subject = email .get ("subject" , "" )
100+ body = email .get ("messageText" , "" ) # Use the full text body
101+ content_to_check = f"{ subject } { body } " .lower ()
102+
103+ if any (word .lower () in content_to_check for word in keyword_filters ):
104+ logger .info (f"Filtering email '{ subject } ' due to keyword match." )
105+ continue
106+
107+ sender_email = _extract_email_from_sender (email .get ("sender" , "" ))
108+ if any (blocked_email in sender_email for blocked_email in email_filters ):
109+ logger .info (f"Filtering email '{ subject } ' due to sender email match." )
110+ continue
111+
112+ email_labels = [label .lower () for label in email .get ("labelIds" , [])]
113+ if any (blocked_label in email_labels for blocked_label in label_filters ):
114+ logger .info (f"Filtering email '{ subject } ' due to label match." )
115+ continue
116+
117+ filtered_emails .append (email )
118+
119+ logger .info (f"Applied privacy filters. Kept { len (filtered_emails )} out of { len (emails )} emails." )
120+
121+ # Now, simplify the filtered emails to provide a clean, useful structure to the LLM
122+ simplified_emails = [
123+ {
124+ # Keys requested by the user
125+ "attachmentList" : email .get ('attachmentList' , []),
126+ "labelIds" : email .get ('labelIds' , []),
127+ "messageId" : email .get ('messageId' ),
128+ "messageText" : email .get ('messageText' , '' ),
129+ "messageTimestamp" : email .get ('messageTimestamp' ),
130+ # Also include a few other highly relevant fields for context
131+ "sender" : email .get ('sender' ),
132+ "subject" : email .get ('subject' , '' ),
133+ "snippet" : email .get ('preview' , {}).get ('body' , '' ),
134+ "threadId" : email .get ('threadId' ),
135+ } for email in filtered_emails
136+ ]
137+
138+ # The final result for the agent should be a dictionary containing the list of simplified emails
139+ return {"status" : "success" , "result" : {"messages" : simplified_emails }}
140+
141+ # For all other actions, just return the data payload
142+ return {"status" : "success" , "result" : data_payload }
113143 except Exception as e :
114144 logger .error (f"Tool execution failed for action '{ action_name } ': { e } " , exc_info = True )
115145 return {"status" : "failure" , "error" : str (e )}
116146
147+ def _extract_email_from_sender (sender_string : str ) -> str :
148+ """Extracts email from 'Name <[email protected] >' format.""" 149+ if not isinstance (sender_string , str ):
150+ return ""
151+ match = re .search (r'<(.+?)>' , sender_string )
152+ if match :
153+ return match .group (1 ).lower ()
154+ return sender_string .lower ()
117155
118156# --- Async Tool Definitions ---
119157
@@ -131,16 +169,20 @@ async def replyToEmail(ctx: Context, message_id: str, body: str, reply_all: bool
131169 return {"status" : "failure" , "error" : "Replying directly by message_id is not supported. Please find the thread_id and use that." }
132170
133171@mcp .tool ()
134- async def getLatestEmails (ctx : Context , max_results : int = 10 ) -> Dict [str , Any ]:
135- """Retrieve the most recent email messages from your inbox, sorted by date received."""
136- logger .info (f"Executing tool: getLatestEmails with max_results={ max_results } " )
137- return await _execute_tool (ctx , "GMAIL_FETCH_EMAILS" , query = "in:inbox" , max_results = max_results )
172+ async def getLatestEmails (ctx : Context , max_results : int = 10 , inbox_type : str = "primary" ) -> Dict [str , Any ]:
173+ """Retrieve the most recent email messages from your inbox, sorted by date received. Can specify inbox_type: 'primary', 'social', 'promotions', 'updates', 'forums'."""
174+ logger .info (f"Executing tool: getLatestEmails with max_results={ max_results } , inbox_type='{ inbox_type } '" )
175+ timestamp_48h_ago = int ((datetime .datetime .now (datetime .timezone .utc ) - datetime .timedelta (hours = 48 )).timestamp ())
176+ query = f"in:inbox category:{ inbox_type } after:{ timestamp_48h_ago } "
177+ return await _execute_tool (ctx , "GMAIL_FETCH_EMAILS" , query = query , max_results = max_results )
138178
139179@mcp .tool ()
140- async def getUnreadEmails (ctx : Context , max_results : int = 10 ) -> Dict [str , Any ]:
141- """Retrieve unread email messages from your inbox."""
142- logger .info (f"Executing tool: getUnreadEmails with max_results={ max_results } " )
143- return await _execute_tool (ctx , "GMAIL_FETCH_EMAILS" , query = "is:unread in:inbox" , max_results = max_results )
180+ async def getUnreadEmails (ctx : Context , max_results : int = 10 , inbox_type : str = "primary" ) -> Dict [str , Any ]:
181+ """Retrieve unread email messages from your inbox. Can specify inbox_type: 'primary', 'social', 'promotions', 'updates', 'forums'."""
182+ logger .info (f"Executing tool: getUnreadEmails with max_results={ max_results } , inbox_type='{ inbox_type } '" )
183+ timestamp_48h_ago = int ((datetime .datetime .now (datetime .timezone .utc ) - datetime .timedelta (hours = 48 )).timestamp ())
184+ query = f"is:unread in:inbox category:{ inbox_type } after:{ timestamp_48h_ago } "
185+ return await _execute_tool (ctx , "GMAIL_FETCH_EMAILS" , query = query , max_results = max_results )
144186
145187@mcp .tool ()
146188async def createLabel (ctx : Context , name : str ) -> Dict [str , Any ]:
@@ -196,29 +238,19 @@ async def searchInFolder(ctx: Context, folder_name: str, max_results: int = 10)
196238 logger .info (f"Executing tool: searchInFolder for folder='{ folder_name } '" )
197239 return await _execute_tool (ctx , "GMAIL_FETCH_EMAILS" , query = f"in:{ folder_name } " , max_results = max_results )
198240
199- @mcp .tool ()
200- async def createFilter (ctx : Context , from_email : Optional [str ] = None , to_email : Optional [str ] = None , subject : Optional [str ] = None , add_label_id : Optional [str ] = None , remove_label_ids : Optional [List [str ]] = None ) -> Dict [str , Any ]:
201- """Create a new Gmail filter that automatically applies actions to incoming messages."""
202- logger .info (f"Executing tool: createFilter" )
203- return {"status" : "failure" , "error" : "Creating filters is not currently supported via this interface." }
204-
205- @mcp .tool ()
206- async def deleteFilter (ctx : Context , filter_id : str ) -> Dict [str , Any ]:
207- """Delete a Gmail filter."""
208- logger .info (f"Executing tool: deleteFilter with filter_id='{ filter_id } '" )
209- return {"status" : "failure" , "error" : "Deleting filters is not currently supported." }
210-
211241@mcp .tool ()
212242async def cancelScheduled (ctx : Context , message_id : str ) -> Dict [str , Any ]:
213243 """Cancel a scheduled email. This is done by moving the email to trash."""
214244 logger .info (f"Executing tool: cancelScheduled for message_id='{ message_id } '" )
215245 return await _execute_tool (ctx , "GMAIL_MOVE_TO_TRASH" , message_id = message_id )
216246
217247@mcp .tool ()
218- async def catchup (ctx : Context ) -> Dict [str , Any ]:
219- """Get a quick compact summary of all unread emails from your primary inbox."""
220- logger .info ("Executing tool: catchup" )
221- return await _execute_tool (ctx , "GMAIL_FETCH_EMAILS" , query = "is:unread in:inbox" , max_results = 20 )
248+ async def catchup (ctx : Context , inbox_type : str = "primary" ) -> Dict [str , Any ]:
249+ """Get a quick compact summary of all unread emails from your inbox. Can specify inbox_type: 'primary', 'social', 'promotions', 'updates', 'forums'."""
250+ logger .info (f"Executing tool: catchup for inbox_type='{ inbox_type } '" )
251+ timestamp_48h_ago = int ((datetime .datetime .now (datetime .timezone .utc ) - datetime .timedelta (hours = 48 )).timestamp ())
252+ query = f"is:unread in:inbox category:{ inbox_type } after:{ timestamp_48h_ago } "
253+ return await _execute_tool (ctx , "GMAIL_FETCH_EMAILS" , query = query , max_results = 20 )
222254
223255@mcp .tool ()
224256async def readEmail (ctx : Context , message_id : str ) -> Dict [str , Any ]:
@@ -256,12 +288,6 @@ async def removeLabels(ctx: Context, message_id: str, label_ids: List[str]) -> D
256288 logger .info (f"Executing tool: removeLabels from message_id='{ message_id } '" )
257289 return await _execute_tool (ctx , "GMAIL_ADD_LABEL_TO_EMAIL" , message_id = message_id , remove_label_ids = label_ids )
258290
259- @mcp .tool ()
260- async def updateDraft (ctx : Context , draft_id : str , to : Optional [str ] = None , subject : Optional [str ] = None , body : Optional [str ] = None ) -> Dict [str , Any ]:
261- """Update an existing draft email with new content."""
262- logger .info (f"Executing tool: updateDraft for draft_id='{ draft_id } '" )
263- return {"status" : "failure" , "error" : "Updating drafts is not currently supported." }
264-
265291@mcp .tool ()
266292async def deleteDraft (ctx : Context , draft_id : str ) -> Dict [str , Any ]:
267293 """Delete a saved draft email."""
@@ -296,30 +322,6 @@ async def searchBySize(ctx: Context, size_mb: int, comparison: str = "larger", m
296322 logger .info (f"Executing tool: searchBySize with size_mb={ size_mb } " )
297323 return await _execute_tool (ctx , "GMAIL_FETCH_EMAILS" , query = f"size:{ size_mb } m" , max_results = max_results )
298324
299- @mcp .tool ()
300- async def forwardEmail (ctx : Context , message_id : str , to : str ) -> Dict [str , Any ]:
301- """Forward an existing email message to new recipients."""
302- logger .info (f"Executing tool: forwardEmail for message_id='{ message_id } ' to='{ to } '" )
303- return {"status" : "failure" , "error" : "Forwarding emails is not currently supported." }
304-
305- @mcp .tool ()
306- async def listFilters (ctx : Context ) -> Dict [str , Any ]:
307- """List all Gmail filters in the user's account."""
308- logger .info ("Executing tool: listFilters" )
309- return {"status" : "failure" , "error" : "Listing filters is not currently supported." }
310-
311- @mcp .tool ()
312- async def scheduleEmail (ctx : Context , to : str , subject : str , body : str , send_at_iso : str ) -> Dict [str , Any ]:
313- """Create an email to be sent at a specified future time (ISO 8601 format)."""
314- logger .info (f"Executing tool: scheduleEmail to='{ to } ' at '{ send_at_iso } '" )
315- return {"status" : "failure" , "error" : "Scheduling emails is not currently supported." }
316-
317- @mcp .tool ()
318- async def listScheduled (ctx : Context ) -> Dict [str , Any ]:
319- """List all scheduled emails. (Simulated)"""
320- logger .info ("Executing tool: listScheduled" )
321- return {"status" : "failure" , "error" : "Listing scheduled emails is not currently supported." }
322-
323325# --- Server Execution ---
324326if __name__ == "__main__" :
325327 host = os .getenv ("MCP_SERVER_HOST" , "127.0.0.1" )
0 commit comments