Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions backend/app/agents/devrel/nodes/gather_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@
from datetime import datetime
from typing import Dict, Any
from app.agents.state import AgentState
from app.database.supabase.services import ensure_user_exists, get_conversation_context

logger = logging.getLogger(__name__)

async def gather_context_node(state: AgentState) -> Dict[str, Any]:
"""Gather additional context for the user and their request"""
logger.info(f"Gathering context for session {state.session_id}")

# TODO: Add context gathering from databases
# Currently, context is simple
# In production, query databases for user history, etc.

original_message = state.context.get("original_message", "")
author_info = state.context.get("author", {})

# Ensure user exists in database
user_uuid = await ensure_user_exists(
user_id=state.user_id,
platform=state.platform,
username=author_info.get("username"),
display_name=author_info.get("display_name"),
avatar_url=author_info.get("avatar_url")
)

new_message = {
"role": "user",
Expand All @@ -24,9 +31,36 @@ async def gather_context_node(state: AgentState) -> Dict[str, Any]:
context_data = {
"user_profile": {"user_id": state.user_id, "platform": state.platform},
"conversation_context": len(state.messages) + 1, # +1 for the new message
"session_info": {"session_id": state.session_id}
"session_info": {"session_id": state.session_id},
"user_uuid": user_uuid
}

# Only retrieve from database if we don't have conversation context already
should_fetch_from_db = not state.conversation_summary and not state.key_topics

if user_uuid and should_fetch_from_db:
logger.info(f"No existing context in state, fetching from database for user {user_uuid}")
prev_context = await get_conversation_context(user_uuid)
if prev_context:
logger.info(f"Retrieved previous conversation context from database")
context_data["previous_conversation"] = prev_context

# Populate state with previous conversation summary and topics
return {
"messages": [new_message],
"context": {**state.context, **context_data},
"conversation_summary": prev_context.get("conversation_summary"),
"key_topics": prev_context.get("key_topics", []),
"current_task": "context_gathered",
"last_interaction_time": datetime.now()
}
else:
logger.info(f"No previous conversation context found in database")
else:
if not should_fetch_from_db:
logger.info(
f"Using existing context from state (conversation_summary: {bool(state.conversation_summary)}, key_topics: {len(state.key_topics)})")

updated_context = {**state.context, **context_data}

return {
Expand Down
43 changes: 43 additions & 0 deletions backend/app/agents/devrel/nodes/generate_response.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import logging
import json
from typing import Dict, Any
from datetime import datetime
from app.agents.state import AgentState
from langchain_core.messages import HumanMessage
from ..prompts.response_prompt import RESPONSE_PROMPT
from app.database.supabase.services import store_interaction

logger = logging.getLogger(__name__)

Expand All @@ -16,6 +18,9 @@ async def generate_response_node(state: AgentState, llm) -> Dict[str, Any]:
try:
final_response = await _create_response(state, llm)

# Store interaction to database
await _store_interaction_to_db(state, final_response)

return {
"final_response": final_response,
"current_task": "response_generated"
Expand Down Expand Up @@ -99,3 +104,41 @@ def _get_latest_message(state: AgentState) -> str:
if state.messages:
return state.messages[-1].get("content", "")
return state.context.get("original_message", "")

async def _store_interaction_to_db(state: AgentState, final_response: str) -> None:
"""Store the interaction to database"""
try:
user_uuid = state.context.get("user_uuid")

if not user_uuid:
logger.warning(f"No user_uuid in context, skipping interaction storage for session {state.session_id}")
return

# Get the latest user message content
latest_message = _get_latest_message(state)

# Extract classification data
classification = state.context.get("classification", {})
intent = classification.get("intent")

# Store the interaction
await store_interaction(
user_uuid=user_uuid,
platform=state.platform,
platform_specific_id=f"{state.session_id}_{datetime.now().timestamp()}",
channel_id=state.channel_id,
thread_id=state.thread_id,
content=latest_message,
interaction_type="message",
intent_classification=intent,
topics_discussed=state.key_topics if state.key_topics else None,
metadata={
"session_id": state.session_id,
"response": final_response[:500] if final_response else None,
"tools_used": state.tools_used,
"classification": classification
}
)

except Exception as e:
logger.error(f"Error storing interaction to database: {str(e)}")
24 changes: 14 additions & 10 deletions backend/app/agents/devrel/nodes/summarization.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,22 @@ async def store_summary_to_database(state: AgentState) -> None:
logger.error(f"Missing required fields: user_id={state.user_id}, platform={state.platform}")
return

platform_id = state.user_id
platform_column = f"{state.platform}_id"
user_uuid = state.context.get("user_uuid")

# Fetch the user's UUID from the 'users' table
user_response = await supabase.table("users").select("id").eq(platform_column, platform_id).limit(1).execute()
if not user_uuid:
platform_id = state.user_id
platform_column = f"{state.platform}_id"

if not user_response.data:
logger.error(f"User with {platform_column} '{platform_id}' not found in users table.")
return
user_response = await supabase.table("users").select("id").eq(platform_column, platform_id).limit(1).execute()

if not user_response.data:
logger.error(f"User with {platform_column} '{platform_id}' not found in users table.")
return

user_uuid = user_response.data[0]['id']
logger.info(f"Found user UUID: {user_uuid} for {platform_column}: {platform_id}")
user_uuid = user_response.data[0]['id']
logger.info(f"Found user UUID: {user_uuid} for {platform_column}: {platform_id}")
else:
logger.info(f"Using cached user UUID from context: {user_uuid}")

# Record to insert/update
record = {
Expand All @@ -178,4 +182,4 @@ async def store_summary_to_database(state: AgentState) -> None:
logger.error(f"❌ Supabase upsert failed for session {state.session_id}: {response}")

except Exception as e:
logger.error(f"Unexpected error while storing summary: {str(e)}")
logger.error(f"Unexpected error while storing summary: {str(e)}")
190 changes: 190 additions & 0 deletions backend/app/database/supabase/services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import logging
from typing import Dict, Any, Optional
from datetime import datetime
import uuid
from app.database.supabase.client import get_supabase_client

logger = logging.getLogger(__name__)
supabase = get_supabase_client()


async def ensure_user_exists(
user_id: str,
platform: str,
username: Optional[str] = None,
display_name: Optional[str] = None,
avatar_url: Optional[str] = None
) -> Optional[str]:
"""
Ensure a user exists in the database. If not, create them.
Returns the user's UUID, or None if an error occurs.

Args:
user_id: Platform-specific user ID (e.g., discord_id, slack_id)
platform: Platform name (discord, slack, github)
username: Platform username
display_name: Display name for the user
avatar_url: Avatar URL

Returns:
User UUID as string, or None on error
"""
try:
platform_id_column = f"{platform}_id"
platform_username_column = f"{platform}_username"

# Check if user exists
response = await supabase.table("users").select("id").eq(platform_id_column, user_id).limit(1).execute()

if response.data:
user_uuid = response.data[0]['id']
logger.info(f"User found: {user_uuid} for {platform_id_column}: {user_id}")

# Update last_active timestamp
last_active_column = f"last_active_{platform}"
await supabase.table("users").update({
last_active_column: datetime.now().isoformat()
}).eq("id", user_uuid).execute()

return user_uuid

# User doesn't exist, create new user
logger.info(f"Creating new user for {platform_id_column}: {user_id}")

new_user = {
"id": str(uuid.uuid4()),
platform_id_column: user_id,
"display_name": display_name or username or f"{platform}_user_{user_id[:8]}",
}

if username:
new_user[platform_username_column] = username
if avatar_url:
new_user["avatar_url"] = avatar_url

# Set last_active timestamp
last_active_column = f"last_active_{platform}"
new_user[last_active_column] = datetime.now().isoformat()

insert_response = await supabase.table("users").insert(new_user).execute()

if insert_response.data:
user_uuid = insert_response.data[0]['id']
logger.info(f"User created successfully: {user_uuid}")
return user_uuid
else:
logger.error(f"Failed to create user: {insert_response}")
return None

except Exception as e:
logger.error(f"Error ensuring user exists: {str(e)}")
return None


async def store_interaction(
user_uuid: str,
platform: str,
platform_specific_id: str,
channel_id: Optional[str] = None,
thread_id: Optional[str] = None,
content: Optional[str] = None,
interaction_type: Optional[str] = None,
intent_classification: Optional[str] = None,
topics_discussed: Optional[list] = None,
metadata: Optional[Dict[str, Any]] = None
) -> bool:
"""
Store an interaction in the database.

Args:
user_uuid: User's UUID from users table
platform: Platform name (discord, slack, github)
platform_specific_id: Platform-specific message/interaction ID
channel_id: Channel ID where interaction occurred
thread_id: Thread ID where interaction occurred
content: Content of the interaction
interaction_type: Type of interaction (message, comment, pr, etc.)
intent_classification: Classification of user intent
topics_discussed: List of topics discussed
metadata: Additional metadata

Returns:
True if successful, False otherwise
"""
try:
interaction_data = {
"id": str(uuid.uuid4()),
"user_id": user_uuid,
"platform": platform,
"platform_specific_id": platform_specific_id,
}

if channel_id:
interaction_data["channel_id"] = channel_id
if thread_id:
interaction_data["thread_id"] = thread_id
if content:
interaction_data["content"] = content
if interaction_type:
interaction_data["interaction_type"] = interaction_type
if intent_classification:
interaction_data["intent_classification"] = intent_classification
if topics_discussed:
interaction_data["topics_discussed"] = topics_discussed
if metadata:
interaction_data["metadata"] = metadata

response = await supabase.table("interactions").insert(interaction_data).execute()

if response.data:
logger.info(f"Interaction stored successfully for user {user_uuid}")

# Increment user's total_interactions_count
# First get the current count
user_response = await supabase.table("users").select("total_interactions_count").eq("id", user_uuid).limit(1).execute()
if user_response.data:
current_count = user_response.data[0].get("total_interactions_count", 0)
await supabase.table("users").update({
"total_interactions_count": current_count + 1
}).eq("id", user_uuid).execute()

return True
else:
logger.error(f"Failed to store interaction: {response}")
return False

except Exception as e:
logger.error(f"Error storing interaction: {str(e)}")
return False


async def get_conversation_context(user_uuid: str) -> Optional[Dict[str, Any]]:
"""
Retrieve conversation context for a user.

Args:
user_uuid: User's UUID from users table

Returns:
Dictionary containing conversation context, or None if not found
"""
try:
response = await supabase.table("conversation_context").select("*").eq("user_id", user_uuid).limit(1).execute()

if response.data:
context = response.data[0]
logger.info(f"Retrieved conversation context for user {user_uuid}")
return {
"conversation_summary": context.get("conversation_summary"),
"key_topics": context.get("key_topics", []),
"total_interactions": context.get("total_interactions", 0),
"session_start_time": context.get("session_start_time"),
"session_end_time": context.get("session_end_time"),
}
else:
logger.info(f"No conversation context found for user {user_uuid}")
return None

except Exception as e:
logger.error(f"Error retrieving conversation context: {str(e)}")
return None
Loading