Skip to content

Commit ab601dd

Browse files
committed
feat (multi-tenancy): activity based tiered polling
1 parent f9e2111 commit ab601dd

File tree

4 files changed

+412
-320
lines changed

4 files changed

+412
-320
lines changed

src/server/app/app.py

Lines changed: 81 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -733,117 +733,121 @@ async def gdrive_tool(tool_call_input: dict) -> Dict[str, Any]:
733733
agent_task_processor_instance: Optional[AgentTaskProcessor] = None
734734

735735
async def polling_scheduler_loop():
736-
print(f"[POLLING_SCHEDULER] Starting polling scheduler loop (interval: {POLLING_SCHEDULER_INTERVAL_SECONDS}s)")
736+
print(f"[POLLING_SCHEDULER] Starting loop (interval: {POLLING_SCHEDULER_INTERVAL_SECONDS}s)")
737+
if not mongo_manager:
738+
print("[POLLING_SCHEDULER_ERROR] MongoManager not initialized. Scheduler cannot run.")
739+
return
740+
741+
await mongo_manager.reset_stale_polling_locks()
742+
737743
while True:
738744
try:
739-
print(f"[POLLING_SCHEDULER] Checking for due polling tasks at {datetime.now(timezone.utc).isoformat()}")
740-
due_tasks_states = await mongo_manager.get_due_polling_tasks()
745+
# print(f"[POLLING_SCHEDULER] Checking for due polling tasks at {datetime.now(timezone.utc).isoformat()}")
746+
due_tasks_states = await mongo_manager.get_due_polling_tasks()
741747

742748
if not due_tasks_states:
743749
# print(f"[POLLING_SCHEDULER] No tasks due at this time.")
744750
pass
745751
else:
746-
print(f"[POLLING_SCHEDULER] Found {len(due_tasks_states)} due tasks.")
752+
print(f"[POLLING_SCHEDULER] Found {len(due_tasks_states)} due polling tasks.")
747753

748754
for task_state in due_tasks_states:
749755
user_id = task_state["user_id"]
750-
engine_category = task_state["engine_category"]
756+
service_name = task_state["service_name"]
751757

752-
print(f"[POLLING_SCHEDULER] Attempting to process task for {user_id}/{engine_category}")
758+
print(f"[POLLING_SCHEDULER] Attempting to process task for {user_id}/{service_name}")
753759

754-
# Try to acquire lock and get the task. If successful, proceed.
755-
locked_task_state = await mongo_manager.set_polling_status_and_get(user_id, engine_category)
760+
locked_task_state = await mongo_manager.set_polling_status_and_get(user_id, service_name)
756761

757762
if locked_task_state:
758-
print(f"[POLLING_SCHEDULER] Acquired lock for {user_id}/{engine_category}. Triggering poll.")
759-
engine_instance = active_context_engines.get(user_id, {}).get(engine_category)
763+
print(f"[POLLING_SCHEDULER] Acquired lock for {user_id}/{service_name}. Triggering poll.")
764+
765+
engine_instance = active_context_engines.get(user_id, {}).get(service_name)
760766

761767
if not engine_instance:
762-
engine_class = DATA_SOURCES_CONFIG.get(engine_category, {}).get("engine_class")
763-
if engine_class:
764-
print(f"[POLLING_SCHEDULER] Creating new instance for {user_id}/{engine_category}")
768+
engine_config = DATA_SOURCES_CONFIG.get(service_name)
769+
if engine_config and engine_config.get("engine_class"):
770+
engine_class = engine_config["engine_class"]
771+
print(f"[POLLING_SCHEDULER] Creating new {engine_class.__name__} instance for {user_id}/{service_name}")
772+
# Ensure all dependencies for engine_class are passed correctly
765773
engine_instance = engine_class(
766774
user_id=user_id,
767-
task_queue=task_queue, # global
768-
memory_backend=memory_backend, # global
769-
websocket_manager=manager, # global websocket_manager
770-
mongo_manager_instance=mongo_manager # global
775+
task_queue=task_queue, # Pass your global/initialized instances
776+
memory_backend=memory_backend,
777+
websocket_manager=manager, # Global websocket_manager
778+
mongo_manager_instance=mongo_manager # Global mongo_manager
771779
)
772780
if user_id not in active_context_engines:
773781
active_context_engines[user_id] = {}
774-
active_context_engines[user_id][engine_category] = engine_instance
775-
# No need to call engine_instance.initialize_polling_state() here,
776-
# run_poll_cycle will handle it if state is missing or needs reset.
782+
active_context_engines[user_id][service_name] = engine_instance
777783
else:
778-
print(f"[POLLING_SCHEDULER_ERROR] No engine class configured for category: {engine_category}")
779-
# Release lock if instance can't be created
780-
await mongo_manager.update_polling_state(user_id, engine_category, {"is_currently_polling": False})
784+
print(f"[POLLING_SCHEDULER_ERROR] No engine class configured for service: {service_name}")
785+
await mongo_manager.update_polling_state(user_id, service_name, {"is_currently_polling": False})
781786
continue
782787

783-
# Run the poll cycle in a new task so the scheduler doesn't block
784-
asyncio.create_task(engine_instance.run_poll_cycle())
788+
asyncio.create_task(engine_instance.run_poll_cycle())
785789
else:
786-
print(f"[POLLING_SCHEDULER] Could not acquire lock for {user_id}/{engine_category} (already processing or not due).")
790+
print(f"[POLLING_SCHEDULER] Could not acquire lock for {user_id}/{service_name} (already processing or no longer due).")
787791

788792
except Exception as e:
789793
print(f"[POLLING_SCHEDULER_ERROR] Error in scheduler loop: {e}")
790-
traceback.print_exc()
794+
traceback.print_exc()
791795

792796
await asyncio.sleep(POLLING_SCHEDULER_INTERVAL_SECONDS)
793797

794-
795798
async def start_user_context_engines(user_id: str):
796799
"""Ensures polling state exists for all enabled services for a user."""
797-
if user_id not in active_context_engines: # This dict might be less critical now for *running* engines
800+
print(f"[CONTEXT_ENGINE_MGR] Starting/Ensuring context engines and polling states for user {user_id}.")
801+
if user_id not in active_context_engines:
798802
active_context_engines[user_id] = {}
799803

804+
if not mongo_manager:
805+
print(f"[CONTEXT_ENGINE_MGR_ERROR] MongoManager not initialized. Cannot start engines for user {user_id}.")
806+
return
807+
800808
user_profile = await mongo_manager.get_user_profile(user_id)
801809
user_settings = user_profile.get("userData", {}) if user_profile else {}
802810

803811
for service_name, config in DATA_SOURCES_CONFIG.items():
804-
# Determine if the service is enabled for the user
805-
# This could come from user_profile's userData, or a default.
806-
# Example: is_service_enabled = user_settings.get(f"{service_name}_polling_enabled", config["enabled_by_default"])
807-
# For now, let's assume we check a specific field or default
808-
809-
is_service_enabled_in_db = False
810-
polling_state_doc = await mongo_manager.get_polling_state(user_id, service_name)
811-
if polling_state_doc:
812-
is_service_enabled_in_db = polling_state_doc.get("is_enabled", False) # Default to False if key missing
813-
else: # No state yet, use default from config
814-
is_service_enabled_in_db = config.get("enabled_by_default", True)
812+
# Default to enabled_by_default if the specific setting isn't in user_profile
813+
# This user-specific setting in user_profile would be set by /utils/set_data_source_enabled
814+
is_service_explicitly_enabled_in_profile = user_settings.get(f"{service_name}_polling_enabled")
815815

816+
is_service_considered_enabled = False
817+
if is_service_explicitly_enabled_in_profile is not None:
818+
is_service_considered_enabled = is_service_explicitly_enabled_in_profile
819+
else: # Not set in profile, use default from config
820+
is_service_considered_enabled = config.get("enabled_by_default", True)
816821

817-
if is_service_enabled_in_db:
818-
# Check if an engine instance exists (less critical now, but can be kept for potential direct calls)
822+
823+
if is_service_considered_enabled:
819824
engine_instance = active_context_engines.get(user_id, {}).get(service_name)
820-
if not engine_instance:
825+
if not engine_instance: # Engine instance not in memory, create one for initialization
821826
engine_class = config["engine_class"]
822827
print(f"[CONTEXT_ENGINE_MGR] Creating transient {engine_class.__name__} instance for {user_id}/{service_name} for state init.")
823828
engine_instance = engine_class(
824829
user_id=user_id,
825-
task_queue=task_queue, # Pass your global/initialized instances
830+
task_queue=task_queue,
826831
memory_backend=memory_backend,
827-
websocket_manager=manager,
828-
mongo_manager_instance=mongo_manager
832+
websocket_manager=manager, # global websocket_manager
833+
mongo_manager_instance=mongo_manager # global mongo_manager
829834
)
830-
# active_context_engines[user_id][service_name] = engine_instance # Optional to store
835+
# We might not need to store it in active_context_engines if scheduler recreates on demand
836+
# However, initialize_polling_state needs an instance.
831837

832-
# Crucially, ensure the polling state is initialized in the DB
833-
# This will set it up for the central scheduler if it's new or enable it.
834838
await engine_instance.initialize_polling_state()
835839
print(f"[CONTEXT_ENGINE_MGR] Polling state ensured for {service_name} for user {user_id}.")
836840
else:
837841
print(f"[CONTEXT_ENGINE_MGR] Service {service_name} is disabled for user {user_id}. Skipping engine start/state init.")
842+
# If disabling, also ensure the polling state in DB is marked as is_enabled=False
843+
await mongo_manager.update_polling_state(user_id, service_name, {"is_enabled": False})
844+
838845

839846
@app.on_event("startup")
840847
async def startup_event():
841-
# ... (your existing startup code: STT, TTS, DB init, agent_task_processor, memory_backend) ...
842-
print(f"[FASTAPI_LIFECYCLE] App startup.")
843-
global stt_model, tts_model, agent_task_processor_instance, polling_scheduler_task_handle
844-
845-
agent_task_processor_instance = agent_task_processor
846-
848+
print(f"[FASTAPI_LIFECYCLE] App startup at {datetime.now(timezone.utc).isoformat()}")
849+
global agent_task_processor_instance, polling_scheduler_task_handle, mongo_manager, task_queue, memory_backend, manager
850+
# ... (STT, TTS init as before) ...
847851
print("[LIFECYCLE] Loading STT...")
848852
try:
849853
stt_model = FasterWhisperSTT(model_size="base", device="cpu", compute_type="int8")
@@ -870,11 +874,29 @@ async def startup_event():
870874
except Exception as e:
871875
print(f"[ERROR] TTS model failed to load. Voice features will be unavailable. Details: {e}")
872876
tts_model = None
877+
878+
mongo_manager = MongoManager()
879+
await mongo_manager.initialize_db()
873880

874-
await task_queue.initialize_db()
875-
await mongo_manager.initialize_db() # This now includes polling state indexes
876-
await memory_backend.memory_queue.initialize_db()
881+
# Assuming task_queue and memory_backend are initialized globally or here using mongo_manager
882+
# task_queue = TaskQueue() # If it uses its own DB connection
883+
# memory_backend = MemoryBackend(mongo_manager=mongo_manager) # Pass the initialized manager
877884

885+
# Re-initialize these if they depend on mongo_manager being set
886+
if 'task_queue' not in globals() or task_queue is None:
887+
task_queue = TaskQueue() # Initialize if not already
888+
await task_queue.initialize_db()
889+
890+
if 'memory_backend' not in globals() or memory_backend is None:
891+
memory_backend = MemoryBackend(mongo_manager=mongo_manager)
892+
await memory_backend.memory_queue.initialize_db()
893+
894+
if 'manager' not in globals() or manager is None: # Assuming 'manager' is your WebSocketManager
895+
manager = WebSocketManager()
896+
897+
898+
agent_task_processor_instance = agent_task_processor
899+
878900
if agent_task_processor_instance:
879901
asyncio.create_task(agent_task_processor_instance.process_queue())
880902
asyncio.create_task(agent_task_processor_instance.cleanup_tasks_periodically())
@@ -883,11 +905,10 @@ async def startup_event():
883905

884906
asyncio.create_task(memory_backend.process_memory_operations())
885907

886-
# Start the central polling scheduler
887908
polling_scheduler_task_handle = asyncio.create_task(polling_scheduler_loop())
888909
print(f"[FASTAPI_LIFECYCLE] Central polling scheduler started.")
889910

890-
print(f"[FASTAPI_LIFECYCLE] App startup complete.")
911+
print(f"[FASTAPI_LIFECYCLE] App startup complete at {datetime.now(timezone.utc).isoformat()}")
891912

892913

893914
@app.on_event("shutdown")

0 commit comments

Comments
 (0)