Skip to content

Commit b46f5c6

Browse files
authored
Conversations migrate firebase and typesense collection (#2150)
part of #1973 - [x] Use conversations collection for typesense - [x] Use conversations collection for db - [x] Add backwards compatibility for plugins_results (replaced by apps_results in conversations collection) - [x] Test with old endpoints - [x] Update models in app - [x] Deprecate unused processing memories Deployment Plan: - Deploy backend Typesense <img width="656" alt="Screenshot 2025-04-04 at 6 08 44 PM" src="https://github.com/user-attachments/assets/77062a72-56cd-4b4f-b552-d2ed550ea4ac" /> Firestore conversations <img width="540" alt="Screenshot 2025-04-05 at 1 12 33 AM" src="https://github.com/user-attachments/assets/8d9fc0bc-639b-49cd-8c37-874a4354accc" />
2 parents 85ea82b + 61b43d4 commit b46f5c6

File tree

15 files changed

+180
-249
lines changed

15 files changed

+180
-249
lines changed

app/lib/backend/schema/conversation.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ class ServerConversation {
132132
.map((segment) => TranscriptSegment.fromJson(segment))
133133
.toList(),
134134
appResults:
135-
((json['plugins_results'] ?? []) as List<dynamic>).map((result) => AppResponse.fromJson(result)).toList(),
135+
((json['apps_results'] ?? []) as List<dynamic>).map((result) => AppResponse.fromJson(result)).toList(),
136136
geolocation: json['geolocation'] != null ? Geolocation.fromJson(json['geolocation']) : null,
137137
photos: json['photos'] != null
138138
? ((json['photos'] ?? []) as List<dynamic>).map((photo) => ConversationPhoto.fromJson(photo)).toList()

app/lib/backend/schema/structured.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,10 @@ class AppResponse {
116116

117117
AppResponse(this.content, {this.id = 0, this.appId});
118118

119-
toJson() => {'pluginId': appId, 'content': content};
119+
toJson() => {'appId': appId, 'content': content};
120120

121121
factory AppResponse.fromJson(Map<String, dynamic> json) {
122-
return AppResponse(json['content'], appId: json['pluginId'] ?? json['plugin_id']);
122+
return AppResponse(json['content'], appId: json['appId'] ?? json['app_id']);
123123
}
124124
}
125125

backend/database/chat.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def get_plugin_messages(uid: str, plugin_id: str, limit: int = 20, offset: int =
7676

7777
# Fetch all conversations at once
7878
conversations = {}
79-
conversations_ref = user_ref.collection('memories')
79+
conversations_ref = user_ref.collection('conversations')
8080
doc_refs = [conversations_ref.document(str(conversation_id)) for conversation_id in conversations_id]
8181
docs = db.get_all(doc_refs)
8282
for doc in docs:
@@ -129,7 +129,7 @@ def get_messages(
129129

130130
# Fetch all conversations at once
131131
conversations = {}
132-
conversations_ref = user_ref.collection('memories')
132+
conversations_ref = user_ref.collection('conversations')
133133
doc_refs = [conversations_ref.document(str(conversation_id)) for conversation_id in conversations_id]
134134
docs = db.get_all(doc_refs)
135135
for doc in docs:

backend/database/conversations.py

Lines changed: 27 additions & 118 deletions
Large diffs are not rendered by default.
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
# DEPRECATED: This file has been deprecated long ago
2+
#
3+
# This file is deprecated and should be removed. The code is not used anymore and is not referenced in any other file.
4+
# The only files that references this file are routers/processing_memories.py and utils/processing_conversations.py, which are also deprecated.
5+
6+
from datetime import datetime
7+
from typing import List
8+
9+
from google.cloud import firestore
10+
from google.cloud.firestore_v1 import FieldFilter
11+
12+
from ._client import db
13+
14+
15+
def upsert_processing_conversation(uid: str, processing_conversation_data: dict):
16+
user_ref = db.collection('users').document(uid)
17+
processing_conversation_ref = user_ref.collection('processing_memories').document(processing_conversation_data['id'])
18+
processing_conversation_ref.set(processing_conversation_data)
19+
20+
21+
def update_processing_conversation(uid: str, processing_conversation_id: str, memoy_data: dict):
22+
user_ref = db.collection('users').document(uid)
23+
processing_conversation_ref = user_ref.collection('processing_memories').document(processing_conversation_id)
24+
processing_conversation_ref.update(memoy_data)
25+
26+
27+
def delete_processing_conversation(uid, processing_conversation_id):
28+
user_ref = db.collection('users').document(uid)
29+
processing_conversation_ref = user_ref.collection('processing_memories').document(processing_conversation_id)
30+
processing_conversation_ref.update({'deleted': True})
31+
32+
33+
def get_processing_conversations_by_id(uid, processing_conversation_ids):
34+
user_ref = db.collection('users').document(uid)
35+
conversations_ref = user_ref.collection('processing_memories')
36+
37+
doc_refs = [conversations_ref.document(str(processing_conversation_id)) for processing_conversation_id in processing_conversation_ids]
38+
docs = db.get_all(doc_refs)
39+
40+
conversations = []
41+
for doc in docs:
42+
if doc.exists:
43+
conversations.append(doc.to_dict())
44+
return conversations
45+
46+
47+
def get_processing_conversation_by_id(uid, processing_conversation_id):
48+
conversation_ref = db.collection('users').document(uid).collection('processing_memories').document(processing_conversation_id)
49+
return conversation_ref.get().to_dict()
50+
51+
52+
def get_processing_conversations(uid: str, statuses: [str] = [], filter_ids: [str] = [], limit: int = 5):
53+
processing_conversations_ref = (
54+
db.collection('users').document(uid).collection('processing_memories')
55+
)
56+
if len(statuses) > 0:
57+
processing_conversations_ref = processing_conversations_ref.where(filter=FieldFilter('status', 'in', statuses))
58+
if len(filter_ids) > 0:
59+
processing_conversations_ref = processing_conversations_ref.where(filter=FieldFilter('id', 'in', filter_ids))
60+
processing_conversations_ref = processing_conversations_ref.order_by('created_at', direction=firestore.Query.DESCENDING)
61+
processing_conversations_ref = processing_conversations_ref.limit(limit)
62+
return [doc.to_dict() for doc in processing_conversations_ref.stream()]
63+
64+
65+
def update_processing_conversation_segments(uid: str, id: str, segments: List[dict], capturing_to: datetime):
66+
user_ref = db.collection('users').document(uid)
67+
conversation_ref = user_ref.collection('processing_memories').document(id)
68+
conversation_ref.update({
69+
'transcript_segments': segments,
70+
'capturing_to': capturing_to,
71+
})
72+
73+
74+
def update_processing_conversation_status(uid: str, id: str, status: str):
75+
user_ref = db.collection('users').document(uid)
76+
conversation_ref = user_ref.collection('processing_memories').document(id)
77+
conversation_ref.update({
78+
'status': status,
79+
})
80+
81+
82+
def update_audio_url(uid: str, id: str, audio_url: str):
83+
user_ref = db.collection('users').document(uid)
84+
conversation_ref = user_ref.collection('processing_memories').document(id)
85+
conversation_ref.update({
86+
'audio_url': audio_url,
87+
})
88+
89+
90+
def get_last(uid: str):
91+
processing_conversations_ref = (
92+
db.collection('users').document(uid).collection('processing_memories')
93+
)
94+
processing_conversations_ref = processing_conversations_ref.order_by('created_at', direction=firestore.Query.DESCENDING)
95+
processing_conversations_ref = processing_conversations_ref.limit(1)
96+
docs = [doc.to_dict() for doc in processing_conversations_ref.stream()]
97+
if len(docs) > 0:
98+
return docs[0]
99+
return None

backend/database/processing_memories.py

Lines changed: 0 additions & 94 deletions
This file was deleted.

backend/main.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from modal import Image, App, asgi_app, Secret
88
from routers import workflow, chat, firmware, plugins, memories_deprecated, transcribe, notifications, \
9-
speech_profile, agents, facts_deprecated, users, processing_memories, trends, sdcard, sync, apps, custom_auth, \
9+
speech_profile, agents, facts_deprecated, users, processing_conversations, trends, sdcard, sync, apps, custom_auth, \
1010
payment, integration, conversations, memories
1111

1212
from utils.other.timeout import TimeoutMiddleware
@@ -33,7 +33,7 @@
3333
app.include_router(integration.router)
3434
app.include_router(agents.router)
3535
app.include_router(users.router)
36-
app.include_router(processing_memories.router)
36+
app.include_router(processing_conversations.router)
3737
app.include_router(trends.router)
3838

3939
app.include_router(firmware.router)

backend/models/conversation.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,17 @@ class ConversationPhoto(BaseModel):
5454
base64: str
5555
description: str
5656

57-
57+
# TODO: remove this class when the app is updated to use apps_results
5858
class PluginResult(BaseModel):
5959
plugin_id: Optional[str]
6060
content: str
6161

6262

63+
class AppResult(BaseModel):
64+
app_id: Optional[str]
65+
content: str
66+
67+
6368
class ActionItem(BaseModel):
6469
description: str = Field(description="The action item to be completed")
6570
completed: bool = False # IGNORE ME from the model parser
@@ -164,14 +169,17 @@ class Conversation(BaseModel):
164169
started_at: Optional[datetime]
165170
finished_at: Optional[datetime]
166171

167-
source: Optional[ConversationSource] = ConversationSource.omi # TODO: once released migrate db to include this field
172+
source: Optional[ConversationSource] = ConversationSource.omi
168173
language: Optional[str] = None # applies only to Friend # TODO: once released migrate db to default 'en'
169174

170175
structured: Structured
171176
transcript_segments: List[TranscriptSegment] = []
172177
geolocation: Optional[Geolocation] = None
173178
photos: List[ConversationPhoto] = []
174179

180+
apps_results: List[AppResult] = []
181+
182+
# TODO: plugins_results for backward compatibility with the old memories routes and app
175183
plugins_results: List[PluginResult] = []
176184

177185
external_data: Optional[Dict] = None
@@ -181,10 +189,19 @@ class Conversation(BaseModel):
181189
deleted: bool = False
182190
visibility: ConversationVisibility = ConversationVisibility.private
183191

184-
# processing_memory_id should be migrated to processing_conversation_id once the memories routes are deprecated
192+
# TODO: processing_memory_id for backward compatibility with the old memories routes and app
185193
processing_memory_id: Optional[str] = None
194+
195+
processing_conversation_id: Optional[str] = None
196+
186197
status: Optional[ConversationStatus] = ConversationStatus.completed
187198

199+
def __init__(self, **data):
200+
super().__init__(**data)
201+
# Update plugins_results based on apps_results
202+
self.plugins_results = [PluginResult(plugin_id=app.app_id, content=app.content) for app in self.apps_results]
203+
self.processing_memory_id = self.processing_conversation_id
204+
188205
@staticmethod
189206
def conversations_to_string(conversations: List['Conversation'], use_transcript: bool = False) -> str:
190207
result = []

backend/routers/processing_memories.py renamed to backend/routers/processing_conversations.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,10 @@ def list_processing_conversation(uid: str = Depends(auth.get_current_user_uid),
7777
:param uid: user id.
7878
:return: The list of processing_memories.
7979
"""
80-
processing_memories = processing_conversation_utils.get_processing_memories(
80+
processing_conversations = processing_conversation_utils.get_processing_memories(
8181
uid, filter_ids=filter_ids.split(",") if filter_ids else [], limit=5
8282
)
83-
if not processing_memories or len(processing_memories) == 0:
83+
if not processing_conversations or len(processing_conversations) == 0:
8484
return DetailProcessingConversationsResponse(result=[])
8585

86-
return DetailProcessingConversationsResponse(result=list(processing_memories))
86+
return DetailProcessingConversationsResponse(result=list(processing_conversations))

backend/routers/transcribe.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,16 +195,16 @@ async def _create_conversation(conversation: dict):
195195

196196
_send_message_event(ConversationEvent(event_type="memory_created", memory=conversation, messages=messages))
197197

198-
async def finalize_processing_memories(processing: List[dict]):
198+
async def finalize_processing_conversations(processing: List[dict]):
199199
# handle edge case of conversation was actually processing? maybe later, doesn't hurt really anyway.
200200
# also fix from getMemories endpoint?
201-
print('finalize_processing_memories len(processing):', len(processing), uid)
201+
print('finalize_processing_conversations len(processing):', len(processing), uid)
202202
for conversation in processing:
203203
await _create_conversation(conversation)
204204

205205
# Process processing conversations
206206
processing = conversations_db.get_processing_conversations(uid)
207-
asyncio.create_task(finalize_processing_memories(processing))
207+
asyncio.create_task(finalize_processing_conversations(processing))
208208

209209
# Send last completed conversation to client
210210
async def send_last_conversation():

0 commit comments

Comments
 (0)