99# MongoDB Configuration
1010MONGO_URI = os .getenv ("MONGO_URI" , "mongodb://localhost:27017" )
1111MONGO_DB_NAME = os .getenv ("MONGO_DB_NAME" , "sentient_agent_db" )
12- POLLING_DB_NAME = os .getenv ("MONGO_DB_NAME" , "sentient_polling_db" )
12+ # POLLING_DB_NAME is no longer needed as we'll use MONGO_DB_NAME for all collections
13+
1314USER_PROFILES_COLLECTION = "user_profiles"
1415CHAT_HISTORY_COLLECTION = "chat_history"
1516NOTIFICATIONS_COLLECTION = "notifications"
1617MEMORY_COLLECTION_NAME = "memory_operations"
1718
18- # New Collections for Polling
19+ # Collections for Polling (will now be in the main DB)
1920POLLING_STATE_COLLECTION = "polling_state_store"
2021PROCESSED_ITEMS_COLLECTION = "processed_items_log"
2122
2223class MongoManager :
2324 def __init__ (self ):
2425 self .client = motor .motor_asyncio .AsyncIOMotorClient (MONGO_URI )
25- self .main_db = self .client [MONGO_DB_NAME ]
26- self .user_profiles_collection = self .main_db [USER_PROFILES_COLLECTION ]
27- self .chat_history_collection = self .main_db [CHAT_HISTORY_COLLECTION ]
28- self .notifications_collection = self .main_db [NOTIFICATIONS_COLLECTION ]
29- self .memory_collection = self .main_db [MEMORY_COLLECTION_NAME ]
30-
31- self .polling_db = self .client [POLLING_DB_NAME ]
32- self .polling_state_collection = self .polling_db [POLLING_STATE_COLLECTION ]
33- self .processed_items_collection = self .polling_db [PROCESSED_ITEMS_COLLECTION ]
26+ self .db = self .client [MONGO_DB_NAME ]
27+
28+ # Main application collections
29+ self .user_profiles_collection = self .db [USER_PROFILES_COLLECTION ]
30+ self .chat_history_collection = self .db [CHAT_HISTORY_COLLECTION ]
31+ self .notifications_collection = self .db [NOTIFICATIONS_COLLECTION ]
32+ self .memory_collection = self .db [MEMORY_COLLECTION_NAME ]
33+
34+ # Polling related collections, now part of the db
35+ self .polling_state_collection = self .db [POLLING_STATE_COLLECTION ]
36+ self .processed_items_collection = self .db [PROCESSED_ITEMS_COLLECTION ]
3437
35- print (f"[MongoManager] Initialized. Main DB : { MONGO_DB_NAME } , Polling DB: { POLLING_DB_NAME } " )
38+ print (f"[MongoManager] Initialized. Database : { MONGO_DB_NAME } " )
3639
3740
3841 async def initialize_db (self ):
@@ -75,8 +78,8 @@ async def initialize_db(self):
7578 IndexModel ([
7679 ("is_enabled" , ASCENDING ),
7780 ("next_scheduled_poll_time" , ASCENDING ),
78- ("error_backoff_until_timestamp" , ASCENDING ), # Added error_backoff_until_timestamp
79- ("is_currently_polling" , ASCENDING ) # Added is_currently_polling
81+ ("error_backoff_until_timestamp" , ASCENDING ),
82+ ("is_currently_polling" , ASCENDING )
8083 ], name = "polling_due_tasks_idx" ),
8184 IndexModel ([("is_currently_polling" , ASCENDING ), ("last_attempted_poll_timestamp" , ASCENDING )], name = "polling_stale_locks_idx" )
8285 ]
@@ -165,34 +168,34 @@ async def update_user_last_active(self, user_id: str) -> bool:
165168 return result .matched_count > 0 or result .upserted_id is not None
166169
167170 async def get_user_activity_and_timezone (self , user_id : str ) -> Dict [str , Any ]:
168- # Ensure user_profile is awaited
169171 profile = await self .get_user_profile (user_id )
170172 user_data = profile .get ("userData" , {}) if profile else {}
171173
172174 last_active_ts_val = user_data .get ("last_active_timestamp" )
173- # Ensure last_active_timestamp is a datetime object if it exists
174175 if last_active_ts_val and isinstance (last_active_ts_val , str ):
175176 try :
176- last_active_ts_val = datetime .datetime .fromisoformat (last_active_ts_val .replace ("Z" , "+00:00" ))
177+ # Handle both "Z" and "+00:00" for UTC
178+ if last_active_ts_val .endswith ("Z" ):
179+ last_active_ts_val = last_active_ts_val [:- 1 ] + "+00:00"
180+ last_active_ts_val = datetime .datetime .fromisoformat (last_active_ts_val )
177181 except ValueError :
178182 print (f"Could not parse last_active_timestamp string: { last_active_ts_val } " )
179- last_active_ts_val = None # Or handle error appropriately
183+ last_active_ts_val = None
180184 elif not isinstance (last_active_ts_val , datetime .datetime ):
181185 last_active_ts_val = None
182186
183-
184187 return {
185188 "last_active_timestamp" : last_active_ts_val ,
186189 "timezone" : user_data .get ("personalInfo" , {}).get ("timezone" )
187190 }
188191
189- async def get_polling_state (self , user_id : str , service_name : str ) -> Optional [Dict [str , Any ]]: # Changed from engine_category
192+ async def get_polling_state (self , user_id : str , service_name : str ) -> Optional [Dict [str , Any ]]:
190193 if not user_id or not service_name : return None
191194 return await self .polling_state_collection .find_one (
192195 {"user_id" : user_id , "service_name" : service_name }
193196 )
194197
195- async def update_polling_state (self , user_id : str , service_name : str , state_data : Dict [str , Any ]) -> bool : # Changed
198+ async def update_polling_state (self , user_id : str , service_name : str , state_data : Dict [str , Any ]) -> bool :
196199 if not user_id or not service_name or state_data is None : return False
197200
198201 for key , value in state_data .items ():
@@ -208,8 +211,8 @@ async def update_polling_state(self, user_id: str, service_name: str, state_data
208211 state_data ["last_updated_at" ] = datetime .datetime .now (datetime .timezone .utc )
209212
210213 result = await self .polling_state_collection .update_one (
211- {"user_id" : user_id , "service_name" : service_name }, # Changed
212- {"$set" : state_data , "$setOnInsert" : {"created_at" : datetime .datetime .now (datetime .timezone .utc ), "service_name" : service_name }}, # Added service_name to $setOnInsert
214+ {"user_id" : user_id , "service_name" : service_name },
215+ {"$set" : state_data , "$setOnInsert" : {"created_at" : datetime .datetime .now (datetime .timezone .utc ), "user_id" : user_id , " service_name" : service_name }},
213216 upsert = True
214217 )
215218 return result .matched_count > 0 or result .upserted_id is not None
@@ -228,12 +231,12 @@ async def get_due_polling_tasks(self) -> List[Dict[str, Any]]:
228231 cursor = self .polling_state_collection .find (query ).sort ("next_scheduled_poll_time" , ASCENDING )
229232 return await cursor .to_list (length = None )
230233
231- async def set_polling_status_and_get (self , user_id : str , service_name : str ) -> Optional [Dict [str , Any ]]: # Changed
234+ async def set_polling_status_and_get (self , user_id : str , service_name : str ) -> Optional [Dict [str , Any ]]:
232235 now_utc = datetime .datetime .now (datetime .timezone .utc )
233236 doc = await self .polling_state_collection .find_one_and_update (
234237 {
235238 "user_id" : user_id ,
236- "service_name" : service_name , # Changed
239+ "service_name" : service_name ,
237240 "is_enabled" : True ,
238241 "next_scheduled_poll_time" : {"$lte" : now_utc },
239242 "is_currently_polling" : False ,
@@ -258,8 +261,7 @@ async def reset_stale_polling_locks(self, timeout_minutes: int = 30):
258261 "$set" : {
259262 "is_currently_polling" : False ,
260263 "last_successful_poll_status_message" : "Reset stale lock by scheduler." ,
261- # Optionally, schedule an immediate retry or a short delay
262- "next_scheduled_poll_time" : datetime .datetime .now (timezone .utc ) + timedelta (seconds = 60 )
264+ "next_scheduled_poll_time" : datetime .datetime .now (datetime .timezone .utc ) + datetime .timedelta (seconds = 60 )
263265 }
264266 }
265267 )
@@ -277,7 +279,7 @@ async def log_processed_item(self, user_id: str, service_name: str, item_id: str
277279 "processing_timestamp" : datetime .datetime .now (datetime .timezone .utc )
278280 })
279281 return True
280- except motor .motor_asyncio .DuplicateKeyError :
282+ except motor .motor_asyncio .DuplicateKeyError : # Changed to use the specific DuplicateKeyError from motor
281283 print (f"[ProcessedItemsLog] Item { user_id } /{ service_name } /{ item_id } already processed." )
282284 return True
283285 except Exception as e :
@@ -303,11 +305,17 @@ async def add_chat_message(self, user_id: str, chat_id: str, message_data: Dict)
303305 result = await self .chat_history_collection .update_one (
304306 {"user_id" : user_id , "chat_id" : chat_id },
305307 {"$push" : {"messages" : message_data },
306- "$set" : {"last_updated" : datetime .datetime .now (datetime .timezone .utc )}},
308+ "$set" : {"last_updated" : datetime .datetime .now (datetime .timezone .utc )},
309+ "$setOnInsert" : {"user_id" : user_id , "chat_id" : chat_id , "created_at" : datetime .datetime .now (datetime .timezone .utc )} # Added $setOnInsert
310+ },
307311 upsert = True
308312 )
309313 if result .modified_count == 0 and result .upserted_id is None :
310- raise Exception ("Failed to add chat message." )
314+ # This condition might be too strict if an upsert happens but no existing document was modified.
315+ # Check result.upserted_id as well for successful operations
316+ chat_exists = await self .chat_history_collection .count_documents ({"user_id" : user_id , "chat_id" : chat_id }) > 0
317+ if not chat_exists : # If no upsert and no match, then it's a failure
318+ raise Exception ("Failed to add chat message: No document modified or upserted." )
311319 return message_id
312320
313321 async def get_chat_history (self , user_id : str , chat_id : str ) -> Optional [List [Dict ]]:
@@ -342,7 +350,9 @@ async def add_notification(self, user_id: str, notification_data: Dict) -> bool:
342350 notification_data ["id" ] = str (uuid .uuid4 ())
343351 result = await self .notifications_collection .update_one (
344352 {"user_id" : user_id },
345- {"$push" : {"notifications" : {"$each" : [notification_data ], "$slice" : - 50 }}},
353+ {"$push" : {"notifications" : {"$each" : [notification_data ], "$slice" : - 50 }},
354+ "$setOnInsert" : {"user_id" : user_id , "created_at" : datetime .datetime .now (datetime .timezone .utc )} # Added $setOnInsert
355+ },
346356 upsert = True
347357 )
348358 return result .modified_count > 0 or result .upserted_id is not None
@@ -353,14 +363,20 @@ async def clear_notifications(self, user_id: str) -> bool:
353363 {"user_id" : user_id },
354364 {"$set" : {"notifications" : []}}
355365 )
366+ # Check if the document existed or was upserted (though upserting an empty list is less common here)
356367 return result .matched_count > 0 or result .upserted_id is not None
357368
358369 async def get_collection (self , collection_name : str ):
359- if collection_name in [POLLING_STATE_COLLECTION , PROCESSED_ITEMS_COLLECTION ]:
360- db_to_use = self .polling_db
361- else :
362- db_to_use = self .main_db
370+ # All collections are now in self.db
371+ db_to_use = self .db
363372
364- if not hasattr (db_to_use , collection_name ):
365- raise ValueError (f"Collection '{ collection_name } ' not found in database '{ db_to_use .name } '." )
373+ if not hasattr (db_to_use , collection_name ) and collection_name not in [
374+ USER_PROFILES_COLLECTION , CHAT_HISTORY_COLLECTION , NOTIFICATIONS_COLLECTION ,
375+ MEMORY_COLLECTION_NAME , POLLING_STATE_COLLECTION , PROCESSED_ITEMS_COLLECTION
376+ ]: # Check against known collection names as direct hasattr might not work for pymongo collection attributes
377+ try :
378+ # Try to access it to see if it's a valid collection name that pymongo can handle
379+ _ = db_to_use [collection_name ]
380+ except Exception : # Or more specific pymongo exceptions if known
381+ raise ValueError (f"Collection '{ collection_name } ' not found or accessible in database '{ db_to_use .name } '." )
366382 return db_to_use [collection_name ]
0 commit comments