Skip to content

Commit 7742764

Browse files
authored
fix: stream data export to prevent 504 timeouts (BasedHardware#5117)
## Summary - Converts `/v1/users/export` endpoint from sync to async `StreamingResponse`, fetching conversations in batches of 200 using `get_conversations_without_photos` (no photo loading) and streaming JSON incrementally — bypasses the 120s TimeoutMiddleware - Updates the Next.js API proxy to pass `Content-Disposition` responses through as a `ReadableStream` instead of buffering with `response.json()` - Changes frontend `exportAllData()` to use `response.blob()` instead of parsing the full JSON into memory - Adds an export-in-progress banner ("Exporting your data... Please don't close this tab") visible at the top of the settings page during export ## Test plan - [x] Export data for a user with 1400+ conversations — should complete without 504 - [x] Verify the downloaded `omi-export.json` is valid JSON with profile, conversations, memories, people, action_items, chat_messages - [ ] Verify export for a user with 0 conversations returns valid JSON with empty arrays - [x] Verify the banner appears during export and disappears on completion - [ ] Verify error toast appears if export fails 🤖 Generated with [Claude Code](https://claude.com/claude-code)
2 parents f0c22a9 + 11db5e4 commit 7742764

File tree

6 files changed

+140
-39
lines changed

6 files changed

+140
-39
lines changed

backend/database/chat.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,25 @@ def get_messages(
240240
return messages
241241

242242

243+
def iter_all_messages(uid: str, batch_size: int = 1000):
    """Yield all chat messages for a user, decrypted, in batches. Used for streaming data export.

    Pages with a query cursor (``start_after`` on the last document snapshot)
    instead of ``offset()``: Firestore's ``offset()`` still reads — and bills —
    every skipped document, so offset pagination over the whole collection is
    O(n^2) document reads. A snapshot cursor resumes in O(1) per batch and the
    implicit ``__name__`` tie-break keeps ordering stable even when several
    messages share the same ``created_at``.

    Each batch is fully buffered before being yielded so the Firestore stream
    stays short-lived while the (possibly slow) consumer drains the batch.

    Args:
        uid: The user's document id.
        batch_size: Number of messages fetched per Firestore query.

    Yields:
        dict: One message per iteration, newest first, with ``id`` set to the
        document id and decrypted via ``_prepare_message_for_read``.
    """
    user_ref = db.collection('users').document(uid)
    msgs_ref = user_ref.collection('messages').order_by('created_at', direction=firestore.Query.DESCENDING)
    last_doc = None  # snapshot cursor; None on the first page
    while True:
        batch_ref = msgs_ref.limit(batch_size)
        if last_doc is not None:
            batch_ref = batch_ref.start_after(last_doc)
        batch = []
        for doc in batch_ref.stream():
            last_doc = doc
            msg = doc.to_dict()
            msg['id'] = doc.id
            # _prepare_message_for_read may return None; fall back to the raw dict
            msg = _prepare_message_for_read(msg, uid) or msg
            batch.append(msg)
        yield from batch
        if len(batch) < batch_size:  # short page => collection exhausted
            break
260+
261+
243262
def get_message(uid: str, message_id: str) -> tuple[Message, str] | None:
244263
user_ref = db.collection('users').document(uid)
245264
message_ref = user_ref.collection('messages').where('id', '==', message_id).limit(1).stream()

backend/database/conversations.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,26 @@ def get_conversations_without_photos(
259259
return conversations
260260

261261

262+
def iter_all_conversations(uid: str, batch_size: int = 400, include_discarded: bool = True):
    """Yield all conversations for a user, decrypted, in batches. Used for streaming data export.

    Pages with a query cursor (``start_after`` on the last document snapshot)
    instead of ``offset()``: Firestore's ``offset()`` still reads — and bills —
    every skipped document, so offset pagination over the whole collection is
    O(n^2) document reads. A snapshot cursor resumes in O(1) per batch and the
    implicit ``__name__`` tie-break keeps ordering stable even when several
    conversations share the same ``created_at``.

    Each batch is fully buffered before being yielded so the Firestore stream
    stays short-lived while the (possibly slow) consumer drains the batch.

    Args:
        uid: The user's document id.
        batch_size: Number of conversations fetched per Firestore query.
        include_discarded: When False, filters out documents with ``discarded == True``.

    Yields:
        dict: One conversation per iteration, newest first, decrypted via
        ``_prepare_conversation_for_read``.
    """
    conversations_ref = db.collection('users').document(uid).collection(conversations_collection)
    if not include_discarded:
        conversations_ref = conversations_ref.where(filter=FieldFilter('discarded', '==', False))
    conversations_ref = conversations_ref.order_by('created_at', direction=firestore.Query.DESCENDING)
    last_doc = None  # snapshot cursor; None on the first page
    while True:
        batch_ref = conversations_ref.limit(batch_size)
        if last_doc is not None:
            batch_ref = batch_ref.start_after(last_doc)
        batch = []
        for doc in batch_ref.stream():
            last_doc = doc
            conv = doc.to_dict()
            # _prepare_conversation_for_read may return None; fall back to the raw dict
            conv = _prepare_conversation_for_read(conv, uid) or conv
            batch.append(conv)
        yield from batch
        if len(batch) < batch_size:  # short page => collection exhausted
            break
280+
281+
262282
def update_conversation(uid: str, conversation_id: str, update_data: dict):
263283
doc_ref = db.collection('users').document(uid).collection(conversations_collection).document(conversation_id)
264284
doc_snapshot = doc_ref.get()

backend/routers/users.py

Lines changed: 51 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import threading
23
import uuid
34
from typing import List, Dict, Any, Union, Optional
@@ -6,6 +7,7 @@
67

78
import pytz
89
from fastapi import APIRouter, Depends, HTTPException, Query
10+
from fastapi.responses import StreamingResponse
911
from pydantic import BaseModel
1012

1113
from database import (
@@ -1137,38 +1139,54 @@ def get_llm_top_features(
11371139
return llm_usage_db.get_top_features(uid, days=days, limit=limit)
11381140

11391141

1140-
@router.get('/v1/users/export', tags=['v1'])
1141-
def export_all_user_data(uid: str = Depends(auth.get_current_user_uid)):
1142-
"""Export all user data for GDPR/CCPA compliance."""
1143-
profile = get_user_profile(uid)
1142+
def _json_default(obj):
1143+
if isinstance(obj, datetime):
1144+
return obj.isoformat()
1145+
raise TypeError(f"Type {type(obj)} not serializable")
11441146

1145-
conversations = conversations_db.get_conversations(uid, limit=10000, offset=0, include_discarded=True)
11461147

1147-
memories_list = memories_db.get_memories(uid, limit=10000, offset=0)
1148-
1149-
people = get_people(uid)
1150-
1151-
action_items = get_standalone_action_items(uid, limit=10000, offset=0)
1152-
1153-
# Get chat messages (all, no app_id filter)
1154-
chat_messages = []
1155-
try:
1156-
user_ref = chat_db.db.collection('users').document(uid)
1157-
msgs_ref = user_ref.collection('messages').order_by(
1158-
'created_at', direction=cloud_firestore.Query.DESCENDING
1159-
).limit(10000)
1160-
for doc in msgs_ref.stream():
1161-
msg = doc.to_dict()
1162-
msg['id'] = doc.id
1163-
chat_messages.append(msg)
1164-
except Exception as e:
1165-
logger.warning(f'export_all_user_data: failed to fetch chat messages: {e}')
1166-
1167-
return {
1168-
'profile': profile if profile else {},
1169-
'conversations': conversations,
1170-
'memories': memories_list,
1171-
'people': people,
1172-
'action_items': action_items,
1173-
'chat_messages': chat_messages,
1174-
}
1148+
@router.get('/v1/users/export', tags=['v1'])
async def export_all_user_data(uid: str = Depends(auth.get_current_user_uid)):
    """Export all user data for GDPR/CCPA compliance. Streams response to avoid timeouts."""

    def _array_items(items):
        # Emit the elements of a JSON array one at a time, comma-separated,
        # so large collections are never materialized in memory at once.
        first = True
        for item in items:
            if not first:
                yield ',\n'
            first = False
            yield '    ' + json.dumps(item, default=_json_default, indent=4)

    def generate():
        # Small collections are fetched eagerly; the big ones (conversations,
        # chat messages) are streamed through batched generators below.
        profile = get_user_profile(uid)
        memories_list = memories_db.get_memories(uid, limit=10000, offset=0)
        people = get_people(uid)
        action_items = get_standalone_action_items(uid, limit=10000, offset=0)

        # Hand-assemble pretty-printed JSON so chunks flush incrementally.
        yield '{\n'
        yield '  "profile": ' + json.dumps(profile if profile else {}, default=_json_default, indent=2) + ',\n'

        yield '  "conversations": [\n'
        yield from _array_items(conversations_db.iter_all_conversations(uid, include_discarded=True))
        yield '\n  ],\n'

        yield '  "memories": ' + json.dumps(memories_list, default=_json_default, indent=2) + ',\n'
        yield '  "people": ' + json.dumps(people, default=_json_default, indent=2) + ',\n'
        yield '  "action_items": ' + json.dumps(action_items, default=_json_default, indent=2) + ',\n'

        yield '  "chat_messages": [\n'
        yield from _array_items(chat_db.iter_all_messages(uid))
        yield '\n  ]\n'

        yield '}\n'

    # Starlette iterates the sync generator in a threadpool; streaming keeps
    # each chunk under the 120s TimeoutMiddleware window.
    return StreamingResponse(
        generate(),
        media_type='application/json',
        headers={'Content-Disposition': 'attachment; filename="omi-export.json"'},
    )

web/app/src/app/api/proxy/[...path]/route.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,18 @@ async function handleRequest(
117117
});
118118
}
119119

120+
// Handle download/streaming responses (e.g., data export) — pass body through without buffering
121+
const contentDisposition = response.headers.get('content-disposition');
122+
if (contentDisposition) {
123+
return new NextResponse(response.body, {
124+
status: response.status,
125+
headers: {
126+
'Content-Type': responseContentType || 'application/octet-stream',
127+
'Content-Disposition': contentDisposition,
128+
},
129+
});
130+
}
131+
120132
// Handle JSON responses
121133
if (responseContentType?.includes('application/json')) {
122134
const data = await response.json();

web/app/src/components/settings/SettingsPage.tsx

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2848,9 +2848,7 @@ export function SettingsPage() {
28482848
if (isExporting) return;
28492849
setIsExporting(true);
28502850
try {
2851-
const data = await exportAllData();
2852-
const json = JSON.stringify(data, null, 2);
2853-
const blob = new Blob([json], { type: 'application/json' });
2851+
const blob = await exportAllData();
28542852
const url = URL.createObjectURL(blob);
28552853
const a = document.createElement('a');
28562854
a.href = url;
@@ -3029,6 +3027,30 @@ export function SettingsPage() {
30293027

30303028
return (
30313029
<div className="h-full flex flex-col">
3030+
{/* Export in-progress dialog */}
3031+
{isExporting && (
3032+
<div className="fixed inset-0 z-50 flex items-center justify-center">
3033+
<div className="absolute inset-0 bg-black/60" />
3034+
<div className="relative bg-bg-secondary rounded-2xl p-6 max-w-sm w-full mx-4 shadow-2xl border border-white/[0.06]">
3035+
<div className="flex flex-col items-center text-center gap-4">
3036+
<div className="p-3 rounded-full bg-purple-500/10">
3037+
<Loader2 className="w-8 h-8 text-purple-400 animate-spin" />
3038+
</div>
3039+
<div>
3040+
<h3 className="text-lg font-semibold text-text-primary">Exporting Your Data</h3>
3041+
<p className="text-text-secondary mt-2 text-sm">
3042+
This may take a moment depending on the amount of data in your account.
3043+
</p>
3044+
</div>
3045+
<div className="flex items-center gap-2 bg-yellow-500/10 rounded-xl px-4 py-2">
3046+
<AlertTriangle className="w-4 h-4 text-yellow-400 flex-shrink-0" />
3047+
<span className="text-xs text-yellow-400">Please don&apos;t close this tab</span>
3048+
</div>
3049+
</div>
3050+
</div>
3051+
</div>
3052+
)}
3053+
30323054
{/* Page Header */}
30333055
<PageHeader title={sectionInfo.title} icon={Settings} showBackButton />
30343056

web/app/src/lib/api.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1741,10 +1741,20 @@ export async function deleteMcpApiKey(keyId: string): Promise<void> {
17411741
// ============================================================================
17421742

17431743
/**
1744-
* Export all conversations as JSON
1744+
* Export all user data as a downloadable JSON blob (streamed from backend).
17451745
*/
1746-
export async function exportAllData(): Promise<Record<string, unknown>> {
1747-
return fetchWithAuth<Record<string, unknown>>('/v1/users/export');
1746+
export async function exportAllData(): Promise<Blob> {
1747+
const token = await getIdToken();
1748+
if (!token) {
1749+
throw new Error('Not authenticated');
1750+
}
1751+
const response = await fetch(`${API_BASE_URL}/v1/users/export`, {
1752+
headers: { Authorization: `Bearer ${token}` },
1753+
});
1754+
if (!response.ok) {
1755+
throw new Error(`Export failed: ${response.status} ${response.statusText}`);
1756+
}
1757+
return response.blob();
17481758
}
17491759

17501760
/**

0 commit comments

Comments
 (0)