1212import multiprocessing
1313import traceback # For detailed error printing
1414import json # Keep json for websocket messages
15+ from server .context .base import BaseContextEngine , POLLING_INTERVALS # Import POLLING_INTERVALS
16+ from server .context .gmail import GmailContextEngine
17+ from server .context .gcalendar import GCalendarContextEngine
18+ from server .context .internet import InternetSearchContextEngine
19+ from datetime import timezone , timedelta # For scheduling
1520
1621# --- MongoDB Manager Import ---
1722from server .db import MongoManager
@@ -382,6 +387,13 @@ async def broadcast_json(self, data: dict):
382387mongo_manager = MongoManager ()
383388print (f"[INIT] { datetime .datetime .now ()} : MongoManager initialized." )
384389
390+ # Dictionary to hold active context engine instances per user
391+ # Structure: { "user_id1": {"gmail": GmailEngineInstance, "gcalendar": GCalendarEngineInstance}, ... }
392+ active_context_engines : Dict [str , Dict [str , BaseContextEngine ]] = {}
393+ polling_scheduler_task_handle : Optional [asyncio .Task ] = None
394+
395+ POLLING_SCHEDULER_INTERVAL_SECONDS = 30
396+
385397# --- Helper Functions (User-Specific DB Operations) ---
386398async def load_user_profile (user_id : str ) -> Dict [str , Any ]:
387399 profile = await mongo_manager .get_user_profile (user_id )
@@ -638,10 +650,110 @@ async def gdrive_tool(tool_call_input: dict) -> Dict[str, Any]:
638650
639651agent_task_processor_instance : Optional [AgentTaskProcessor ] = None
640652
653+ async def polling_scheduler_loop ():
654+ print (f"[POLLING_SCHEDULER] Starting polling scheduler loop (interval: { POLLING_SCHEDULER_INTERVAL_SECONDS } s)" )
655+ while True :
656+ try :
657+ print (f"[POLLING_SCHEDULER] Checking for due polling tasks at { datetime .now (timezone .utc ).isoformat ()} " )
658+ due_tasks_states = await mongo_manager .get_due_polling_tasks ()
659+
660+ if not due_tasks_states :
661+ # print(f"[POLLING_SCHEDULER] No tasks due at this time.")
662+ pass
663+ else :
664+ print (f"[POLLING_SCHEDULER] Found { len (due_tasks_states )} due tasks." )
665+
666+ for task_state in due_tasks_states :
667+ user_id = task_state ["user_id" ]
668+ engine_category = task_state ["engine_category" ]
669+
670+ print (f"[POLLING_SCHEDULER] Attempting to process task for { user_id } /{ engine_category } " )
671+
672+ # Try to acquire lock and get the task. If successful, proceed.
673+ locked_task_state = await mongo_manager .set_polling_status_and_get (user_id , engine_category )
674+
675+ if locked_task_state :
676+ print (f"[POLLING_SCHEDULER] Acquired lock for { user_id } /{ engine_category } . Triggering poll." )
677+ engine_instance = active_context_engines .get (user_id , {}).get (engine_category )
678+
679+ if not engine_instance :
680+ engine_class = DATA_SOURCES_CONFIG .get (engine_category , {}).get ("engine_class" )
681+ if engine_class :
682+ print (f"[POLLING_SCHEDULER] Creating new instance for { user_id } /{ engine_category } " )
683+ engine_instance = engine_class (
684+ user_id = user_id ,
685+ task_queue = task_queue , # global
686+ memory_backend = memory_backend , # global
687+ websocket_manager = manager , # global websocket_manager
688+ mongo_manager_instance = mongo_manager # global
689+ )
690+ if user_id not in active_context_engines :
691+ active_context_engines [user_id ] = {}
692+ active_context_engines [user_id ][engine_category ] = engine_instance
693+ # No need to call engine_instance.initialize_polling_state() here,
694+ # run_poll_cycle will handle it if state is missing or needs reset.
695+ else :
696+ print (f"[POLLING_SCHEDULER_ERROR] No engine class configured for category: { engine_category } " )
697+ # Release lock if instance can't be created
698+ await mongo_manager .update_polling_state (user_id , engine_category , {"is_currently_polling" : False })
699+ continue
700+
701+ # Run the poll cycle in a new task so the scheduler doesn't block
702+ asyncio .create_task (engine_instance .run_poll_cycle ())
703+ else :
704+ print (f"[POLLING_SCHEDULER] Could not acquire lock for { user_id } /{ engine_category } (already processing or not due)." )
705+
706+ except Exception as e :
707+ print (f"[POLLING_SCHEDULER_ERROR] Error in scheduler loop: { e } " )
708+ traceback .print_exc ()
709+
710+ await asyncio .sleep (POLLING_SCHEDULER_INTERVAL_SECONDS )
711+
712+
713+ async def start_user_context_engines (user_id : str ):
714+ """Starts context engines for a given user if not already running and initializes polling state."""
715+ if user_id not in active_context_engines :
716+ active_context_engines [user_id ] = {}
717+
718+ user_profile = await load_user_profile (user_id ) # load_user_profile is your existing helper
719+ user_settings = user_profile .get ("userData" , {})
720+
721+ for source_name , config in DATA_SOURCES_CONFIG .items ():
722+ is_enabled = user_settings .get (f"{ source_name } Enabled" , config ["enabled_by_default" ])
723+
724+ if is_enabled :
725+ if source_name not in active_context_engines [user_id ]:
726+ print (f"[CONTEXT_ENGINE_MGR] Starting { source_name } engine for user { user_id } ..." )
727+ engine_class = config ["engine_class" ]
728+ engine_instance = engine_class (
729+ user_id = user_id ,
730+ task_queue = task_queue ,
731+ memory_backend = memory_backend ,
732+ websocket_manager = manager ,
733+ mongo_manager_instance = mongo_manager
734+ )
735+ active_context_engines [user_id ][source_name ] = engine_instance
736+ # Initialize polling state (will set next_poll_at to now if new)
737+ await engine_instance .initialize_polling_state ()
738+ print (f"[CONTEXT_ENGINE_MGR] { source_name } engine started and polling state initialized for user { user_id } ." )
739+ else :
740+ # Engine already active, ensure its polling state is initialized (e.g. if server restarted)
741+ await active_context_engines [user_id ][source_name ].initialize_polling_state ()
742+ print (f"[CONTEXT_ENGINE_MGR] { source_name } engine for user { user_id } already active. Ensured polling state." )
743+
744+ elif not is_enabled and source_name in active_context_engines [user_id ]:
745+ print (f"[CONTEXT_ENGINE_MGR] { source_name } engine for user { user_id } is disabled. Stopping (if implemented) and removing." )
746+ # Add engine stop logic if available:
747+ # if hasattr(active_context_engines[user_id][source_name], 'stop'):
748+ # await active_context_engines[user_id][source_name].stop()
749+ del active_context_engines [user_id ][source_name ]
750+
751+
641752@app .on_event ("startup" )
642753async def startup_event ():
754+ # ... (your existing startup code: STT, TTS, DB init, agent_task_processor, memory_backend) ...
643755 print (f"[FASTAPI_LIFECYCLE] App startup." )
644- global stt_model , tts_model , agent_task_processor_instance
756+ global stt_model , tts_model , agent_task_processor_instance , polling_scheduler_task_handle
645757
646758 agent_task_processor_instance = agent_task_processor
647759
@@ -665,15 +777,15 @@ async def startup_event():
665777 elif TTS_PROVIDER == "ELEVENLABS" :
666778 tts_model = ElevenLabsTTS ()
667779 print ("[LIFECYCLE] ElevenLabs TTS loaded (Production Mode)." )
668- else : # Default to ORPHEUS
780+ else :
669781 tts_model = OrpheusTTS (verbose = False , default_voice_id = SELECTED_TTS_VOICE )
670782 print (f"[WARN] Unknown TTS_PROVIDER '{ TTS_PROVIDER } '. Defaulting to Orpheus TTS." )
671783 except Exception as e :
672784 print (f"[ERROR] TTS model failed to load. Voice features will be unavailable. Details: { e } " )
673785 tts_model = None
674786
675787 await task_queue .initialize_db ()
676- await mongo_manager .initialize_db ()
788+ await mongo_manager .initialize_db () # This now includes polling state indexes
677789 await memory_backend .memory_queue .initialize_db ()
678790
679791 if agent_task_processor_instance :
@@ -684,18 +796,35 @@ async def startup_event():
684796
685797 asyncio .create_task (memory_backend .process_memory_operations ())
686798
799+ # Start the central polling scheduler
800+ polling_scheduler_task_handle = asyncio .create_task (polling_scheduler_loop ())
801+ print (f"[FASTAPI_LIFECYCLE] Central polling scheduler started." )
802+
687803 print (f"[FASTAPI_LIFECYCLE] App startup complete." )
688804
805+
689806@app .on_event ("shutdown" )
690807async def shutdown_event ():
808+ # ... (your existing shutdown code) ...
809+ global polling_scheduler_task_handle
810+ if polling_scheduler_task_handle and not polling_scheduler_task_handle .done ():
811+ print ("[FASTAPI_LIFECYCLE] Cancelling polling scheduler task..." )
812+ polling_scheduler_task_handle .cancel ()
813+ try :
814+ await polling_scheduler_task_handle
815+ except asyncio .CancelledError :
816+ print ("[FASTAPI_LIFECYCLE] Polling scheduler task cancelled." )
817+ except Exception as e :
818+ print (f"[FASTAPI_LIFECYCLE_ERROR] Error during polling scheduler shutdown: { e } " )
819+ # ... (rest of your shutdown) ...
691820 print (f"[FASTAPI_LIFECYCLE] App shutdown." )
692821 if mongo_manager .client :
693822 mongo_manager .client .close ()
694823 print ("[FASTAPI_LIFECYCLE] MongoManager client closed." )
695- if task_queue .client : # TaskQueue now has its own client
824+ if task_queue .client :
696825 task_queue .client .close ()
697826 print ("[FASTAPI_LIFECYCLE] TaskQueue MongoDB client closed." )
698- if memory_backend .memory_queue .client : # MemoryQueue now has its own client
827+ if memory_backend .memory_queue .client :
699828 memory_backend .memory_queue .client .close ()
700829 print ("[FASTAPI_LIFECYCLE] MemoryQueue MongoDB client closed." )
701830 print (f"[FASTAPI_LIFECYCLE] All MongoDB clients known to app.py closed." )
0 commit comments