33
44Auth: Firebase ID token in Authorization header (Bearer <token>) during WS upgrade.
55Flow: validate token → fetch VM from Firestore → connect to VM WS → bidirectional pump.
6+ History: fetches last 10 agent messages from Firestore and prepends to prompt.
67"""
78
89import asyncio
10+ import json
911import logging
1012import os
13+ import uuid
14+ from datetime import datetime , timezone
1115
1216import firebase_admin
1317import websockets
1418from fastapi import FastAPI , WebSocket , WebSocketDisconnect
1519from firebase_admin import auth , credentials , firestore
20+ from google .cloud .firestore_v1 import Query
1621
1722logging .basicConfig (level = logging .INFO )
1823logger = logging .getLogger (__name__ )
2833
2934app = FastAPI ()
3035
36+ AGENT_PLUGIN_ID = '__agent__'
37+ HISTORY_LIMIT = 10
38+
3139
3240@app .get ("/health" )
3341def health ():
@@ -41,6 +49,59 @@ def _get_agent_vm(uid: str) -> dict | None:
4149 return None
4250
4351
52+ def _fetch_chat_history (uid : str ) -> list :
53+ """Fetch last N agent messages from Firestore, returned oldest-first."""
54+ messages_ref = (
55+ db .collection ('users' )
56+ .document (uid )
57+ .collection ('messages' )
58+ .where ('plugin_id' , '==' , AGENT_PLUGIN_ID )
59+ .order_by ('created_at' , direction = Query .DESCENDING )
60+ .limit (HISTORY_LIMIT )
61+ )
62+ messages = []
63+ for doc in messages_ref .stream ():
64+ data = doc .to_dict ()
65+ messages .append (
66+ {
67+ 'sender' : data .get ('sender' , '' ),
68+ 'text' : data .get ('text' , '' ),
69+ }
70+ )
71+ return list (reversed (messages ))
72+
73+
74+ def _save_message (uid : str , text : str , sender : str ):
75+ """Save a message to Firestore under the agent plugin_id."""
76+ msg_data = {
77+ 'id' : str (uuid .uuid4 ()),
78+ 'text' : text ,
79+ 'created_at' : datetime .now (timezone .utc ),
80+ 'sender' : sender ,
81+ 'plugin_id' : AGENT_PLUGIN_ID ,
82+ 'type' : 'text' ,
83+ 'from_external_integration' : False ,
84+ 'memories_id' : [],
85+ 'files_id' : [],
86+ }
87+ db .collection ('users' ).document (uid ).collection ('messages' ).add (msg_data )
88+
89+
90+ def _build_prompt_with_history (prompt : str , history : list ) -> str :
91+ """Prepend conversation history to the current prompt."""
92+ if not history :
93+ return prompt
94+
95+ lines = ["<conversation_history>" ]
96+ for msg in history :
97+ role = "Human" if msg ['sender' ] == 'human' else "Assistant"
98+ lines .append (f"{ role } : { msg ['text' ]} " )
99+ lines .append ("</conversation_history>" )
100+ lines .append ("" )
101+ lines .append (prompt )
102+ return "\n " .join (lines )
103+
104+
44105@app .websocket ("/v1/agent/ws" )
45106async def agent_ws (websocket : WebSocket ):
46107 # Validate Firebase token from Authorization header
@@ -76,17 +137,50 @@ async def agent_ws(websocket: WebSocket):
76137 async def phone_to_vm ():
77138 try :
78139 async for msg in websocket .iter_text ():
140+ try :
141+ data = json .loads (msg )
142+ if data .get ('type' ) == 'query' :
143+ prompt = data .get ('prompt' , '' )
144+ # Fetch history before saving new message
145+ history = await asyncio .to_thread (_fetch_chat_history , uid )
146+ data ['prompt' ] = _build_prompt_with_history (prompt , history )
147+ msg = json .dumps (data )
148+ # Save user message
149+ await asyncio .to_thread (_save_message , uid , prompt , 'human' )
150+ logger .info (f"[agent-proxy] uid={ uid } query with { len (history )} history messages" )
151+ except (json .JSONDecodeError , Exception ) as e :
152+ logger .warning (f"[agent-proxy] failed to process message: { e } " )
79153 await vm_ws .send (msg )
80154 except (WebSocketDisconnect , Exception ):
81155 pass
82156
83157 async def vm_to_phone ():
158+ response_text = ''
84159 try :
85160 async for msg in vm_ws :
86161 text = msg if isinstance (msg , str ) else msg .decode ()
87162 await websocket .send_text (text )
163+ # Collect response for saving
164+ try :
165+ event = json .loads (text )
166+ evt_type = event .get ('type' )
167+ evt_text = event .get ('text' , '' ) or event .get ('content' , '' ) or ''
168+ if evt_type == 'text_delta' :
169+ response_text += evt_text
170+ elif evt_type == 'result' and evt_text and not response_text :
171+ response_text = evt_text
172+ except json .JSONDecodeError :
173+ pass
88174 except Exception :
89175 pass
176+ finally :
177+ # Save AI response
178+ if response_text .strip ():
179+ try :
180+ await asyncio .to_thread (_save_message , uid , response_text .strip (), 'ai' )
181+ logger .info (f"[agent-proxy] uid={ uid } saved AI response ({ len (response_text )} chars)" )
182+ except Exception as e :
183+ logger .warning (f"[agent-proxy] failed to save AI response: { e } " )
90184
91185 t1 = asyncio .create_task (phone_to_vm ())
92186 t2 = asyncio .create_task (vm_to_phone ())
0 commit comments