diff --git a/backend/app/agents/devrel/nodes/gather_context.py b/backend/app/agents/devrel/nodes/gather_context.py index dc2f9e7a..6d68dbfb 100644 --- a/backend/app/agents/devrel/nodes/gather_context.py +++ b/backend/app/agents/devrel/nodes/gather_context.py @@ -2,6 +2,7 @@ from datetime import datetime from typing import Dict, Any from app.agents.state import AgentState +from app.database.supabase.services import ensure_user_exists, get_conversation_context logger = logging.getLogger(__name__) @@ -9,11 +10,17 @@ async def gather_context_node(state: AgentState) -> Dict[str, Any]: """Gather additional context for the user and their request""" logger.info(f"Gathering context for session {state.session_id}") - # TODO: Add context gathering from databases - # Currently, context is simple - # In production, query databases for user history, etc. - original_message = state.context.get("original_message", "") + author_info = state.context.get("author", {}) + + # Ensure user exists in database + user_uuid = await ensure_user_exists( + user_id=state.user_id, + platform=state.platform, + username=author_info.get("username"), + display_name=author_info.get("display_name"), + avatar_url=author_info.get("avatar_url") + ) new_message = { "role": "user", @@ -24,9 +31,36 @@ async def gather_context_node(state: AgentState) -> Dict[str, Any]: context_data = { "user_profile": {"user_id": state.user_id, "platform": state.platform}, "conversation_context": len(state.messages) + 1, # +1 for the new message - "session_info": {"session_id": state.session_id} + "session_info": {"session_id": state.session_id}, + "user_uuid": user_uuid } + # Only retrieve from database if we don't have conversation context already + should_fetch_from_db = not state.conversation_summary and not state.key_topics + + if user_uuid and should_fetch_from_db: + logger.info(f"No existing context in state, fetching from database for user {user_uuid}") + prev_context = await get_conversation_context(user_uuid) + if prev_context: + logger.info(f"Retrieved previous conversation context from database") + context_data["previous_conversation"] = prev_context + + # Populate state with previous conversation summary and topics + return { + "messages": [new_message], + "context": {**state.context, **context_data}, + "conversation_summary": prev_context.get("conversation_summary"), + "key_topics": prev_context.get("key_topics", []), + "current_task": "context_gathered", + "last_interaction_time": datetime.now() + } + else: + logger.info(f"No previous conversation context found in database") + else: + if not should_fetch_from_db: + logger.info( + f"Using existing context from state (conversation_summary: {bool(state.conversation_summary)}, key_topics: {len(state.key_topics)})") + updated_context = {**state.context, **context_data} return { diff --git a/backend/app/agents/devrel/nodes/generate_response.py b/backend/app/agents/devrel/nodes/generate_response.py index 5796877c..6bc382e3 100644 --- a/backend/app/agents/devrel/nodes/generate_response.py +++ b/backend/app/agents/devrel/nodes/generate_response.py @@ -1,9 +1,11 @@ import logging import json from typing import Dict, Any +from datetime import datetime from app.agents.state import AgentState from langchain_core.messages import HumanMessage from ..prompts.response_prompt import RESPONSE_PROMPT +from app.database.supabase.services import store_interaction logger = logging.getLogger(__name__) @@ -16,6 +18,9 @@ async def generate_response_node(state: AgentState, llm) -> Dict[str, Any]: try: final_response = await _create_response(state, llm) + # Store interaction to database + await _store_interaction_to_db(state, final_response) + return { "final_response": final_response, "current_task": "response_generated" @@ -99,3 +104,43 @@ def _get_latest_message(state: AgentState) -> str: if state.messages: return state.messages[-1].get("content", "") return state.context.get("original_message", "") + +async def _store_interaction_to_db(state: AgentState, final_response: str) -> None: + """Store the interaction to database""" + try: + user_uuid = state.context.get("user_uuid") + + if not user_uuid: + logger.warning(f"No user_uuid in context, skipping interaction storage for session {state.session_id}") + return + + # Get the latest user message content + latest_message = _get_latest_message(state) + + # Extract classification data + classification = state.context.get("classification", {}) + # TODO: intent key not present in classification schema (contains: needs_devrel, priority, reasoning) + # Modify prompt to include intent key + intent = classification.get("reasoning") # Fallback to reasoning for intent + + # Store the interaction + await store_interaction( + user_uuid=user_uuid, + platform=state.platform, + platform_specific_id=f"{state.session_id}_{datetime.now().timestamp()}", + channel_id=state.channel_id, + thread_id=state.thread_id, + content=latest_message, + interaction_type="message", + intent_classification=intent, + topics_discussed=state.key_topics if state.key_topics else None, + metadata={ + "session_id": state.session_id, + "response": final_response[:500] if final_response else None, + "tools_used": state.tools_used, + "classification": classification + } + ) + + except Exception as e: + logger.error(f"Error storing interaction to database: {str(e)}") diff --git a/backend/app/agents/devrel/nodes/summarization.py b/backend/app/agents/devrel/nodes/summarization.py index 334414ef..0a4ae61f 100644 --- a/backend/app/agents/devrel/nodes/summarization.py +++ b/backend/app/agents/devrel/nodes/summarization.py @@ -145,18 +145,22 @@ async def store_summary_to_database(state: AgentState) -> None: logger.error(f"Missing required fields: user_id={state.user_id}, platform={state.platform}") return - platform_id = state.user_id - platform_column = f"{state.platform}_id" + user_uuid = state.context.get("user_uuid") - # Fetch the user's UUID from the 'users' table - user_response = await supabase.table("users").select("id").eq(platform_column, platform_id).limit(1).execute() + if not user_uuid: + platform_id = state.user_id + platform_column = f"{state.platform}_id" - if not user_response.data: - logger.error(f"User with {platform_column} '{platform_id}' not found in users table.") - return + user_response = await supabase.table("users").select("id").eq(platform_column, platform_id).limit(1).execute() + + if not user_response.data: + logger.error(f"User with {platform_column} '{platform_id}' not found in users table.") + return - user_uuid = user_response.data[0]['id'] - logger.info(f"Found user UUID: {user_uuid} for {platform_column}: {platform_id}") + user_uuid = user_response.data[0]['id'] + logger.info(f"Found user UUID: {user_uuid} for {platform_column}: {platform_id}") + else: + logger.info(f"Using cached user UUID from context: {user_uuid}") # Record to insert/update record = { @@ -178,4 +182,4 @@ async def store_summary_to_database(state: AgentState) -> None: logger.error(f"❌ Supabase upsert failed for session {state.session_id}: {response}") except Exception as e: - logger.error(f"Unexpected error while storing summary: {str(e)}") \ No newline at end of file + logger.error(f"Unexpected error while storing summary: {str(e)}") diff --git a/backend/app/database/supabase/scripts/create_db.sql b/backend/app/database/supabase/scripts/create_db.sql index c4205821..9693bbd1 100644 --- a/backend/app/database/supabase/scripts/create_db.sql +++ b/backend/app/database/supabase/scripts/create_db.sql @@ -116,4 +116,27 @@ $$ language 'plpgsql'; CREATE TRIGGER update_conversation_context_updated_at BEFORE UPDATE ON conversation_context FOR EACH ROW -EXECUTE FUNCTION update_updated_at_column(); \ No newline at end of file +EXECUTE FUNCTION update_updated_at_column(); + +-- Migration: Add atomic increment function for user interaction count +-- This function safely increments the total_interactions_count for a user + +CREATE OR REPLACE FUNCTION increment_user_interaction_count(user_uuid UUID) +RETURNS INTEGER AS $$ +DECLARE + new_count INTEGER; +BEGIN + -- Atomically increment the counter and return the new value + UPDATE users + SET total_interactions_count = total_interactions_count + 1 + WHERE id = user_uuid + RETURNING total_interactions_count INTO new_count; + + -- Return the new count (NULL if user not found) + RETURN new_count; +END; +$$ LANGUAGE plpgsql; + +-- Optional: Add a comment for documentation +COMMENT ON FUNCTION increment_user_interaction_count(UUID) IS +'Atomically increments the total_interactions_count for a user. Returns the new count or NULL if user not found.'; \ No newline at end of file diff --git a/backend/app/database/supabase/services.py b/backend/app/database/supabase/services.py new file mode 100644 index 00000000..44755ba0 --- /dev/null +++ b/backend/app/database/supabase/services.py @@ -0,0 +1,189 @@ +import logging +from typing import Dict, Any, Optional +from datetime import datetime +import uuid +from app.database.supabase.client import get_supabase_client + +logger = logging.getLogger(__name__) +supabase = get_supabase_client() + + +async def ensure_user_exists( + user_id: str, + platform: str, + username: Optional[str] = None, + display_name: Optional[str] = None, + avatar_url: Optional[str] = None +) -> Optional[str]: + """ + Ensure a user exists in the database. If not, create them. + Returns the user's UUID, or None if an error occurs. + + Args: + user_id: Platform-specific user ID (e.g., discord_id, slack_id) + platform: Platform name (discord, slack, github) + username: Platform username + display_name: Display name for the user + avatar_url: Avatar URL + + Returns: + User UUID as string, or None on error + """ + try: + platform_id_column = f"{platform}_id" + platform_username_column = f"{platform}_username" + + # Check if user exists + response = await supabase.table("users").select("id").eq(platform_id_column, user_id).limit(1).execute() + + if response.data: + user_uuid = response.data[0]['id'] + logger.info(f"User found: {user_uuid} for {platform_id_column}: {user_id}") + + # Update last_active timestamp + last_active_column = f"last_active_{platform}" + await supabase.table("users").update({ + last_active_column: datetime.now().isoformat() + }).eq("id", user_uuid).execute() + + return user_uuid + + # User doesn't exist, create new user + logger.info(f"Creating new user for {platform_id_column}: {user_id}") + + new_user = { + "id": str(uuid.uuid4()), + platform_id_column: user_id, + "display_name": display_name or username or f"{platform}_user_{user_id[:8]}", + } + + if username: + new_user[platform_username_column] = username + if avatar_url: + new_user["avatar_url"] = avatar_url + + # Set last_active timestamp + last_active_column = f"last_active_{platform}" + new_user[last_active_column] = datetime.now().isoformat() + + insert_response = await supabase.table("users").insert(new_user).execute() + + if insert_response.data: + user_uuid = insert_response.data[0]['id'] + logger.info(f"User created successfully: {user_uuid}") + return user_uuid + else: + logger.error(f"Failed to create user: {insert_response}") + return None + + except Exception as e: + logger.error(f"Error ensuring user exists: {str(e)}") + return None + + +async def store_interaction( + user_uuid: str, + platform: str, + platform_specific_id: str, + channel_id: Optional[str] = None, + thread_id: Optional[str] = None, + content: Optional[str] = None, + interaction_type: Optional[str] = None, + intent_classification: Optional[str] = None, + topics_discussed: Optional[list] = None, + metadata: Optional[Dict[str, Any]] = None +) -> bool: + """ + Store an interaction in the database. + + Args: + user_uuid: User's UUID from users table + platform: Platform name (discord, slack, github) + platform_specific_id: Platform-specific message/interaction ID + channel_id: Channel ID where interaction occurred + thread_id: Thread ID where interaction occurred + content: Content of the interaction + interaction_type: Type of interaction (message, comment, pr, etc.) + intent_classification: Classification of user intent + topics_discussed: List of topics discussed + metadata: Additional metadata + + Returns: + True if successful, False otherwise + """ + try: + interaction_data = { + "id": str(uuid.uuid4()), + "user_id": user_uuid, + "platform": platform, + "platform_specific_id": platform_specific_id, + } + + if channel_id: + interaction_data["channel_id"] = channel_id + if thread_id: + interaction_data["thread_id"] = thread_id + if content: + interaction_data["content"] = content + if interaction_type: + interaction_data["interaction_type"] = interaction_type + if intent_classification: + interaction_data["intent_classification"] = intent_classification + if topics_discussed: + interaction_data["topics_discussed"] = topics_discussed + if metadata: + interaction_data["metadata"] = metadata + + response = await supabase.table("interactions").insert(interaction_data).execute() + + if response.data: + logger.info(f"Interaction stored successfully for user {user_uuid}") + + # Atomically increment user's total_interactions_count + try: + rpc_response = await supabase.rpc("increment_user_interaction_count", {"user_uuid": user_uuid}).execute() + if rpc_response.data is not None: + logger.debug(f"Updated interaction count for user {user_uuid}: {rpc_response.data}") + else: + logger.warning(f"User {user_uuid} not found when incrementing interaction count") + except Exception as e: + logger.exception("Error incrementing user interaction count") + + # Not failing the entire operation if incrementing the interaction count fails + return True + + except Exception as e: + logger.error(f"Error storing interaction: {str(e)}") + return False + + +async def get_conversation_context(user_uuid: str) -> Optional[Dict[str, Any]]: + """ + Retrieve conversation context for a user. + + Args: + user_uuid: User's UUID from users table + + Returns: + Dictionary containing conversation context, or None if not found + """ + try: + response = await supabase.table("conversation_context").select("*").eq("user_id", user_uuid).limit(1).execute() + + if response.data: + context = response.data[0] + logger.info(f"Retrieved conversation context for user {user_uuid}") + return { + "conversation_summary": context.get("conversation_summary"), + "key_topics": context.get("key_topics", []), + "total_interactions": context.get("total_interactions", 0), + "session_start_time": context.get("session_start_time"), + "session_end_time": context.get("session_end_time"), + } + else: + logger.info(f"No conversation context found for user {user_uuid}") + return None + + except Exception as e: + logger.error(f"Error retrieving conversation context: {str(e)}") + return None diff --git a/backend/integrations/discord/bot.py b/backend/integrations/discord/bot.py index 26ad8dbc..a55f66b4 100644 --- a/backend/integrations/discord/bot.py +++ b/backend/integrations/discord/bot.py @@ -65,13 +65,13 @@ async def on_message(self, message): except Exception as e: logger.error(f"Error processing message: {str(e)}") - + async def _handle_devrel_message(self, message, triage_result: Dict[str, Any]): """This now handles both new requests and follow-ups in threads.""" try: user_id = str(message.author.id) thread_id = await self._get_or_create_thread(message, user_id) - + agent_message = { "type": "devrel_request", "id": f"discord_{message.id}", @@ -81,17 +81,19 @@ async def _handle_devrel_message(self, message, triage_result: Dict[str, Any]): "memory_thread_id": user_id, "content": message.content, "triage": triage_result, + "classification": triage_result, "platform": "discord", "timestamp": message.created_at.isoformat(), "author": { "username": message.author.name, - "display_name": message.author.display_name + "display_name": message.author.display_name, + "avatar_url": str(message.author.avatar.url) if message.author.avatar else None } } priority_map = {"high": QueuePriority.HIGH, "medium": QueuePriority.MEDIUM, "low": QueuePriority.LOW - } + } priority = priority_map.get(triage_result.get("priority"), QueuePriority.MEDIUM) await self.queue_manager.enqueue(agent_message, priority) @@ -101,7 +103,7 @@ async def _handle_devrel_message(self, message, triage_result: Dict[str, Any]): if thread: await thread.send("I'm processing your request, please hold on...") # ------------------------------------ - + except Exception as e: logger.error(f"Error handling DevRel message: {str(e)}") @@ -114,7 +116,7 @@ async def _get_or_create_thread(self, message, user_id: str) -> Optional[str]: return thread_id else: del self.active_threads[user_id] - + # This part only runs if it's not a follow-up message in an active thread. if isinstance(message.channel, discord.TextChannel): thread_name = f"DevRel Chat - {message.author.display_name}" @@ -139,4 +141,4 @@ async def _handle_agent_response(self, response_data: Dict[str, Any]): else: logger.error(f"Thread {thread_id} not found for agent response") except Exception as e: - logger.error(f"Error handling agent response: {str(e)}") \ No newline at end of file + logger.error(f"Error handling agent response: {str(e)}")