Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions backend/app/agents/devrel/nodes/gather_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@
from datetime import datetime
from typing import Dict, Any
from app.agents.state import AgentState
from app.database.supabase.services import ensure_user_exists, get_conversation_context

logger = logging.getLogger(__name__)

async def gather_context_node(state: AgentState) -> Dict[str, Any]:
"""Gather additional context for the user and their request"""
logger.info(f"Gathering context for session {state.session_id}")

# TODO: Add context gathering from databases
# Currently, context is simple
# In production, query databases for user history, etc.

original_message = state.context.get("original_message", "")
author_info = state.context.get("author", {})

# Ensure user exists in database
user_uuid = await ensure_user_exists(
user_id=state.user_id,
platform=state.platform,
username=author_info.get("username"),
display_name=author_info.get("display_name"),
avatar_url=author_info.get("avatar_url")
)

new_message = {
"role": "user",
Expand All @@ -24,9 +31,36 @@ async def gather_context_node(state: AgentState) -> Dict[str, Any]:
context_data = {
"user_profile": {"user_id": state.user_id, "platform": state.platform},
"conversation_context": len(state.messages) + 1, # +1 for the new message
"session_info": {"session_id": state.session_id}
"session_info": {"session_id": state.session_id},
"user_uuid": user_uuid
}

# Only retrieve from database if we don't have conversation context already
should_fetch_from_db = not state.conversation_summary and not state.key_topics

if user_uuid and should_fetch_from_db:
logger.info(f"No existing context in state, fetching from database for user {user_uuid}")
prev_context = await get_conversation_context(user_uuid)
if prev_context:
logger.info(f"Retrieved previous conversation context from database")
context_data["previous_conversation"] = prev_context

# Populate state with previous conversation summary and topics
return {
"messages": [new_message],
"context": {**state.context, **context_data},
"conversation_summary": prev_context.get("conversation_summary"),
"key_topics": prev_context.get("key_topics", []),
"current_task": "context_gathered",
"last_interaction_time": datetime.now()
}
else:
logger.info(f"No previous conversation context found in database")
else:
if not should_fetch_from_db:
logger.info(
f"Using existing context from state (conversation_summary: {bool(state.conversation_summary)}, key_topics: {len(state.key_topics)})")

updated_context = {**state.context, **context_data}

return {
Expand Down
45 changes: 45 additions & 0 deletions backend/app/agents/devrel/nodes/generate_response.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import logging
import json
from typing import Dict, Any
from datetime import datetime
from app.agents.state import AgentState
from langchain_core.messages import HumanMessage
from ..prompts.response_prompt import RESPONSE_PROMPT
from app.database.supabase.services import store_interaction

logger = logging.getLogger(__name__)

Expand All @@ -16,6 +18,9 @@ async def generate_response_node(state: AgentState, llm) -> Dict[str, Any]:
try:
final_response = await _create_response(state, llm)

# Store interaction to database
await _store_interaction_to_db(state, final_response)

return {
"final_response": final_response,
"current_task": "response_generated"
Expand Down Expand Up @@ -99,3 +104,43 @@ def _get_latest_message(state: AgentState) -> str:
if state.messages:
return state.messages[-1].get("content", "")
return state.context.get("original_message", "")

async def _store_interaction_to_db(state: AgentState, final_response: str) -> None:
"""Store the interaction to database"""
try:
user_uuid = state.context.get("user_uuid")

if not user_uuid:
logger.warning(f"No user_uuid in context, skipping interaction storage for session {state.session_id}")
return

# Get the latest user message content
latest_message = _get_latest_message(state)

# Extract classification data
classification = state.context.get("classification", {})
# TODO: intent key not present in classification schema (contains: needs_devrel, priority, reasoning)
# Modify prompt to include intent key
intent = classification.get("reasoning") # Fallback to reasoning for intent

# Store the interaction
await store_interaction(
user_uuid=user_uuid,
platform=state.platform,
platform_specific_id=f"{state.session_id}_{datetime.now().timestamp()}",
channel_id=state.channel_id,
thread_id=state.thread_id,
content=latest_message,
interaction_type="message",
intent_classification=intent,
topics_discussed=state.key_topics if state.key_topics else None,
metadata={
"session_id": state.session_id,
"response": final_response[:500] if final_response else None,
"tools_used": state.tools_used,
"classification": classification
}
)

except Exception as e:
logger.error(f"Error storing interaction to database: {str(e)}")
24 changes: 14 additions & 10 deletions backend/app/agents/devrel/nodes/summarization.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,22 @@ async def store_summary_to_database(state: AgentState) -> None:
logger.error(f"Missing required fields: user_id={state.user_id}, platform={state.platform}")
return

platform_id = state.user_id
platform_column = f"{state.platform}_id"
user_uuid = state.context.get("user_uuid")

# Fetch the user's UUID from the 'users' table
user_response = await supabase.table("users").select("id").eq(platform_column, platform_id).limit(1).execute()
if not user_uuid:
platform_id = state.user_id
platform_column = f"{state.platform}_id"

if not user_response.data:
logger.error(f"User with {platform_column} '{platform_id}' not found in users table.")
return
user_response = await supabase.table("users").select("id").eq(platform_column, platform_id).limit(1).execute()

if not user_response.data:
logger.error(f"User with {platform_column} '{platform_id}' not found in users table.")
return

user_uuid = user_response.data[0]['id']
logger.info(f"Found user UUID: {user_uuid} for {platform_column}: {platform_id}")
user_uuid = user_response.data[0]['id']
logger.info(f"Found user UUID: {user_uuid} for {platform_column}: {platform_id}")
else:
logger.info(f"Using cached user UUID from context: {user_uuid}")

# Record to insert/update
record = {
Expand All @@ -178,4 +182,4 @@ async def store_summary_to_database(state: AgentState) -> None:
logger.error(f"❌ Supabase upsert failed for session {state.session_id}: {response}")

except Exception as e:
logger.error(f"Unexpected error while storing summary: {str(e)}")
logger.error(f"Unexpected error while storing summary: {str(e)}")
25 changes: 24 additions & 1 deletion backend/app/database/supabase/scripts/create_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,27 @@ $$ language 'plpgsql';
CREATE TRIGGER update_conversation_context_updated_at
BEFORE UPDATE ON conversation_context
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
EXECUTE FUNCTION update_updated_at_column();

-- Migration: Add atomic increment function for user interaction count
-- This function safely increments the total_interactions_count for a user

CREATE OR REPLACE FUNCTION increment_user_interaction_count(user_uuid UUID)
RETURNS INTEGER AS $$
DECLARE
new_count INTEGER;
BEGIN
-- Atomically increment the counter and return the new value
UPDATE users
SET total_interactions_count = total_interactions_count + 1
WHERE id = user_uuid
RETURNING total_interactions_count INTO new_count;

-- Return the new count (NULL if user not found)
RETURN new_count;
END;
$$ LANGUAGE plpgsql;

-- Optional: Add a comment for documentation
COMMENT ON FUNCTION increment_user_interaction_count(UUID) IS
'Atomically increments the total_interactions_count for a user. Returns the new count or NULL if user not found.';
189 changes: 189 additions & 0 deletions backend/app/database/supabase/services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import logging
from typing import Dict, Any, Optional
from datetime import datetime
import uuid
from app.database.supabase.client import get_supabase_client

logger = logging.getLogger(__name__)
supabase = get_supabase_client()


async def ensure_user_exists(
user_id: str,
platform: str,
username: Optional[str] = None,
display_name: Optional[str] = None,
avatar_url: Optional[str] = None
) -> Optional[str]:
"""
Ensure a user exists in the database. If not, create them.
Returns the user's UUID, or None if an error occurs.

Args:
user_id: Platform-specific user ID (e.g., discord_id, slack_id)
platform: Platform name (discord, slack, github)
username: Platform username
display_name: Display name for the user
avatar_url: Avatar URL

Returns:
User UUID as string, or None on error
"""
try:
platform_id_column = f"{platform}_id"
platform_username_column = f"{platform}_username"

# Check if user exists
response = await supabase.table("users").select("id").eq(platform_id_column, user_id).limit(1).execute()

if response.data:
user_uuid = response.data[0]['id']
logger.info(f"User found: {user_uuid} for {platform_id_column}: {user_id}")

# Update last_active timestamp
last_active_column = f"last_active_{platform}"
await supabase.table("users").update({
last_active_column: datetime.now().isoformat()
}).eq("id", user_uuid).execute()

return user_uuid

# User doesn't exist, create new user
logger.info(f"Creating new user for {platform_id_column}: {user_id}")

new_user = {
"id": str(uuid.uuid4()),
platform_id_column: user_id,
"display_name": display_name or username or f"{platform}_user_{user_id[:8]}",
}

if username:
new_user[platform_username_column] = username
if avatar_url:
new_user["avatar_url"] = avatar_url

# Set last_active timestamp
last_active_column = f"last_active_{platform}"
new_user[last_active_column] = datetime.now().isoformat()

insert_response = await supabase.table("users").insert(new_user).execute()

if insert_response.data:
user_uuid = insert_response.data[0]['id']
logger.info(f"User created successfully: {user_uuid}")
return user_uuid
else:
logger.error(f"Failed to create user: {insert_response}")
return None

except Exception as e:
logger.error(f"Error ensuring user exists: {str(e)}")
return None


async def store_interaction(
user_uuid: str,
platform: str,
platform_specific_id: str,
channel_id: Optional[str] = None,
thread_id: Optional[str] = None,
content: Optional[str] = None,
interaction_type: Optional[str] = None,
intent_classification: Optional[str] = None,
topics_discussed: Optional[list] = None,
metadata: Optional[Dict[str, Any]] = None
) -> bool:
"""
Store an interaction in the database.

Args:
user_uuid: User's UUID from users table
platform: Platform name (discord, slack, github)
platform_specific_id: Platform-specific message/interaction ID
channel_id: Channel ID where interaction occurred
thread_id: Thread ID where interaction occurred
content: Content of the interaction
interaction_type: Type of interaction (message, comment, pr, etc.)
intent_classification: Classification of user intent
topics_discussed: List of topics discussed
metadata: Additional metadata

Returns:
True if successful, False otherwise
"""
try:
interaction_data = {
"id": str(uuid.uuid4()),
"user_id": user_uuid,
"platform": platform,
"platform_specific_id": platform_specific_id,
}

if channel_id:
interaction_data["channel_id"] = channel_id
if thread_id:
interaction_data["thread_id"] = thread_id
if content:
interaction_data["content"] = content
if interaction_type:
interaction_data["interaction_type"] = interaction_type
if intent_classification:
interaction_data["intent_classification"] = intent_classification
if topics_discussed:
interaction_data["topics_discussed"] = topics_discussed
if metadata:
interaction_data["metadata"] = metadata

response = await supabase.table("interactions").insert(interaction_data).execute()

if response.data:
logger.info(f"Interaction stored successfully for user {user_uuid}")

# Atomically increment user's total_interactions_count
try:
rpc_response = await supabase.rpc("increment_user_interaction_count", {"user_uuid": user_uuid}).execute()
if rpc_response.data is not None:
logger.debug(f"Updated interaction count for user {user_uuid}: {rpc_response.data}")
else:
logger.warning(f"User {user_uuid} not found when incrementing interaction count")
except Exception as e:
logger.exception("Error incrementing user interaction count")

# Not failing the entire operation if incrementing the interaction count fails
return True

except Exception as e:
logger.error(f"Error storing interaction: {str(e)}")
return False


async def get_conversation_context(user_uuid: str) -> Optional[Dict[str, Any]]:
"""
Retrieve conversation context for a user.

Args:
user_uuid: User's UUID from users table

Returns:
Dictionary containing conversation context, or None if not found
"""
try:
response = await supabase.table("conversation_context").select("*").eq("user_id", user_uuid).limit(1).execute()

if response.data:
context = response.data[0]
logger.info(f"Retrieved conversation context for user {user_uuid}")
return {
"conversation_summary": context.get("conversation_summary"),
"key_topics": context.get("key_topics", []),
"total_interactions": context.get("total_interactions", 0),
"session_start_time": context.get("session_start_time"),
"session_end_time": context.get("session_end_time"),
}
else:
logger.info(f"No conversation context found for user {user_uuid}")
return None

except Exception as e:
logger.error(f"Error retrieving conversation context: {str(e)}")
return None
Loading