11# server/mcp-hub/gmail/helpers.py
22
3+ import os
34import base64
45from email .mime .text import MIMEText
56from typing import Dict , Any , List
6- import httpx
7- from sentence_transformers import SentenceTransformer , util
8- from urllib .parse import quote
7+
8+ from google import genai
9+ from google .genai import types
10+ import numpy as np
11+ from dotenv import load_dotenv
912from googleapiclient .discovery import Resource
1013from googleapiclient .errors import HttpError
1114
# Pull variables from a local .env file (if any) into the process environment
# before the key is read below.
load_dotenv()
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")

# Module-wide Gemini client, shared by the helpers in this file.
client = genai.Client(api_key=GEMINI_API_KEY)
21+
1222
def _decode_body_data(data) -> str:
    """Decode one base64url-encoded body string (or bytes) into text."""
    if isinstance(data, str):
        data = data.encode("ascii")
    # Restore any stripped '=' padding so the decoder does not reject the input.
    padded = data + b"=" * (-len(data) % 4)
    # Replace undecodable bytes instead of raising, so a single part that is
    # not valid UTF-8 cannot abort extraction of the whole message.
    return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")


def extract_email_body(payload: Dict[str, Any]) -> str:
    """
    Recursively extract the text body of an email from its payload.

    Multipart payloads are walked depth-first, descending into nested
    ``parts`` lists. Only ``text/plain`` and ``text/html`` leaves are decoded;
    every other leaf (for example an attachment) is ignored. Plain text is
    preferred; HTML is returned only when no plain text was found.

    Args:
        payload: Message payload dict. Reads the ``parts``, ``mimeType`` and
            ``body.data`` keys.

    Returns:
        The decoded body text, or ``""`` when nothing decodable is present.
    """
    if "parts" not in payload:
        # Single-part message: decode its body regardless of MIME type.
        data = payload.get("body", {}).get("data")
        return _decode_body_data(data) if data else ""

    plain_chunks: List[str] = []
    html_chunks: List[str] = []

    def _walk(parts: List[Dict[str, Any]]) -> None:
        for part in parts:
            children = part.get("parts")
            if children:
                # Container part: its text lives in the children.
                _walk(children)
                continue

            mime_type = part.get("mimeType")
            if mime_type == "text/plain":
                target = plain_chunks
            elif mime_type == "text/html":
                target = html_chunks
            else:
                # Skip non-text leaves so their bytes are never decoded as text.
                continue

            data = part.get("body", {}).get("data")
            if data:
                target.append(_decode_body_data(data))

    _walk(payload["parts"])
    return "".join(plain_chunks) or "".join(html_chunks)
4145
4246
async def create_message(to: str, subject: str, message: str) -> str:
    """
    Build a plain-text MIME email and return it URL-safe base64 encoded.

    Args:
        to: Value for the ``To`` header.
        subject: Value for the ``Subject`` header.
        message: Body text of the email.

    Returns:
        The serialized MIME message, URL-safe base64 encoded, as a string.
    """
    mime = MIMEText(message)
    for header_name, header_value in (("To", to), ("Subject", subject)):
        mime[header_name] = header_value

    encoded = base64.urlsafe_b64encode(mime.as_bytes())
    return encoded.decode("utf-8")
7055
7156
async def find_best_matching_email(service: Resource, query: str) -> Dict[str, Any]:
    """
    Return the inbox email most semantically similar to ``query``.

    Lists inbox messages, fetches up to 20 of them in full, embeds the query
    together with each email's "subject body" text in a single Gemini
    ``embed_content`` call, and picks the email with the highest cosine
    similarity to the query.

    Args:
        service: Authenticated Gmail API service.
        query: Text to compare against each email's subject and body.

    Returns:
        On success ``{"status": "success", "email_details": {...}}``, where the
        details hold ``id``, ``threadId``, ``subject``, ``from``, ``to`` and
        ``body``. On any error, or when the inbox listing is empty,
        ``{"status": "failure", "error": <message>}``. No exception is
        propagated to the caller.
    """
    max_candidates = 20

    try:
        # 1) List inbox messages; only ask for as many ids as will be used.
        listing = service.users().messages().list(
            userId="me", q="in:inbox", maxResults=max_candidates
        ).execute()
        msgs = listing.get("messages", [])
        if not msgs:
            return {"status": "failure", "error": "No emails found in inbox."}

        # 2) Fetch full details for each candidate message.
        email_data: List[Dict[str, Any]] = []
        for m in msgs[:max_candidates]:
            msg = service.users().messages().get(
                userId="me", id=m["id"], format="full"
            ).execute()
            # Tolerate a missing payload so one odd message cannot abort the
            # whole search with a KeyError.
            payload = msg.get("payload", {})
            headers = {h["name"]: h["value"] for h in payload.get("headers", [])}
            email_data.append({
                "id": m["id"],
                "threadId": msg.get("threadId"),
                "subject": headers.get("Subject", ""),
                "from": headers.get("From", ""),
                "to": headers.get("To"),
                "body": extract_email_body(payload),
            })

        # 3) Embed the query and every email text in one batch; index 0 is the
        #    query, the remaining entries line up with email_data.
        docs = [f'{e["subject"]} {e["body"]}' for e in email_data]
        embed_resp = client.models.embed_content(
            model="gemini-embedding-exp-03-07",
            contents=[query] + docs,
            config=types.EmbedContentConfig(task_type="SEMANTIC_SIMILARITY"),
        )
        embeddings = [ce.values for ce in embed_resp.embeddings]
        q_emb = np.array(embeddings[0])
        doc_embs = np.array(embeddings[1:])

        # 4) Cosine similarity. A zero-length vector would give 0/0 = NaN, and
        #    argmax would then select that entry; dividing by 1 instead scores
        #    it as 0.
        norms = np.linalg.norm(doc_embs, axis=1) * np.linalg.norm(q_emb)
        safe_norms = np.where(norms == 0, 1.0, norms)
        scores = (doc_embs @ q_emb) / safe_norms
        best_idx = int(np.argmax(scores))

        # 5) Return the best matching email.
        return {
            "status": "success",
            "email_details": email_data[best_idx],
        }

    except HttpError as e:
        return {"status": "failure", "error": f"Google API Error: {e}"}
    except Exception as e:
        # Boundary handler: callers receive a failure dict rather than an exception.
        return {"status": "failure", "error": str(e)}
0 commit comments