1+ import os
2+ import json
3+ import base64
4+ from typing import Dict , Optional
5+
6+ from cryptography .hazmat .primitives .ciphers import Cipher , algorithms , modes
7+ from cryptography .hazmat .primitives import padding
8+ from cryptography .hazmat .backends import default_backend
9+ import motor .motor_asyncio
10+ from google .oauth2 .credentials import Credentials
11+ from google .oauth2 import service_account
12+
13+ from fastmcp import Context
14+ from fastmcp .exceptions import ToolError
15+ from dotenv import load_dotenv
16+
17+ dotenv_path = os .path .join (os .path .dirname (__file__ ), '..' , '..' , '.env' )
18+ load_dotenv (dotenv_path = dotenv_path )
19+
20+ # --- Config ---
21+ MONGO_URI = os .getenv ("MONGO_URI" )
22+ MONGO_DB_NAME = os .getenv ("MONGO_DB_NAME" )
23+ AES_SECRET_KEY_HEX = os .getenv ("AES_SECRET_KEY" )
24+ AES_IV_HEX = os .getenv ("AES_IV" )
25+ # Default keys from the main .env, which the shopping search will use
26+ DEFAULT_GOOGLE_API_KEY = os .getenv ("GOOGLE_API_KEY" )
27+ DEFAULT_GOOGLE_CSE_ID = os .getenv ("GOOGLE_CSE_ID" )
28+
29+ AES_SECRET_KEY : Optional [bytes ] = bytes .fromhex (AES_SECRET_KEY_HEX ) if AES_SECRET_KEY_HEX and len (AES_SECRET_KEY_HEX ) == 64 else None
30+ AES_IV : Optional [bytes ] = bytes .fromhex (AES_IV_HEX ) if AES_IV_HEX and len (AES_IV_HEX ) == 32 else None
31+
32+ client = motor .motor_asyncio .AsyncIOMotorClient (MONGO_URI )
33+ db = client [MONGO_DB_NAME ]
34+ users_collection = db ["user_profiles" ]
35+
36+ def aes_decrypt (encrypted_data : str ) -> str :
37+ if not AES_SECRET_KEY or not AES_IV :
38+ raise ValueError ("AES encryption keys are not configured." )
39+ backend = default_backend ()
40+ cipher = Cipher (algorithms .AES (AES_SECRET_KEY ), modes .CBC (AES_IV ), backend = backend )
41+ decryptor = cipher .decryptor ()
42+ encrypted_bytes = base64 .b64decode (encrypted_data )
43+ decrypted = decryptor .update (encrypted_bytes ) + decryptor .finalize ()
44+ unpadder = padding .PKCS7 (algorithms .AES .block_size ).unpadder ()
45+ unpadded_data = unpadder .update (decrypted ) + unpadder .finalize ()
46+ return unpadded_data .decode ()
47+
48+ def get_user_id_from_context (ctx : Context ) -> str :
49+ http_request = ctx .get_http_request ()
50+ if not http_request :
51+ raise ToolError ("HTTP request context is not available." )
52+ user_id = http_request .headers .get ("X-User-ID" )
53+ if not user_id :
54+ raise ToolError ("Authentication failed: 'X-User-ID' header is missing." )
55+ return user_id
56+
57+ async def get_google_api_keys (user_id : str ) -> Dict [str , str ]:
58+ """
59+ Retrieves Google API keys. For Shopping, we rely on the Custom Search API keys
60+ which are currently stored globally in the .env file, not per user.
61+ This function validates the user exists before returning the global keys.
62+ """
63+ user_doc = await users_collection .find_one ({"user_id" : user_id })
64+ if not user_doc :
65+ raise ToolError (f"User profile not found for user_id: { user_id } ." )
66+
67+ if not DEFAULT_GOOGLE_API_KEY or not DEFAULT_GOOGLE_CSE_ID :
68+ raise ToolError ("Google Custom Search API Key or CSE ID is not configured on the server." )
69+
70+ return {"api_key" : DEFAULT_GOOGLE_API_KEY , "cse_id" : DEFAULT_GOOGLE_CSE_ID }
0 commit comments