Skip to content

Commit 91bc344

Browse files
committed
feat: Added celery tasks to populate blocknote_document for existing documents
1 parent e33b42f commit 91bc344

File tree

6 files changed

+231
-12
lines changed

6 files changed

+231
-12
lines changed

surfsense_backend/alembic/versions/38_add_blocknote_fields_to_documents.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@
2020

2121

2222
def upgrade() -> None:
23-
"""Upgrade schema - Add BlockNote fields only."""
23+
"""Upgrade schema - Add BlockNote fields and trigger population task."""
2424

25+
# Add the columns
2526
op.add_column(
2627
"documents",
2728
sa.Column(
@@ -42,6 +43,21 @@ def upgrade() -> None:
4243
sa.Column("last_edited_at", sa.TIMESTAMP(timezone=True), nullable=True),
4344
)
4445

46+
# Trigger the Celery task to populate blocknote_document for existing documents
47+
try:
48+
from app.tasks.celery_tasks.blocknote_migration_tasks import (
49+
populate_blocknote_for_documents_task,
50+
)
51+
52+
# Queue the task to run asynchronously
53+
populate_blocknote_for_documents_task.apply_async()
54+
print("✓ Queued Celery task to populate blocknote_document for existing documents")
55+
except Exception as e:
56+
# If Celery is not available or task queueing fails, log but don't fail the migration
57+
print(f"⚠ Warning: Could not queue blocknote population task: {e}")
58+
print(" You can manually trigger it later with:")
59+
print(" celery -A app.celery_app call app.tasks.celery_tasks.blocknote_migration_tasks.populate_blocknote_for_documents_task")
60+
4561

4662
def downgrade() -> None:
4763
"""Downgrade schema - Remove BlockNote fields."""

surfsense_backend/app/celery_app.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ def parse_schedule_interval(interval: str) -> dict:
6363
"app.tasks.celery_tasks.podcast_tasks",
6464
"app.tasks.celery_tasks.connector_tasks",
6565
"app.tasks.celery_tasks.schedule_checker_task",
66+
"app.tasks.celery_tasks.blocknote_migration_tasks",
6667
],
6768
)
6869

surfsense_backend/app/routes/editor_routes.py

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@ async def get_editor_content(
3030
Get document content for editing.
3131
3232
Returns BlockNote JSON document. If blocknote_document is NULL,
33-
attempts to convert from `content` - though this won't work well
34-
for old documents that only have summaries.
33+
attempts to generate it from chunks (lazy migration).
3534
"""
35+
from sqlalchemy.orm import selectinload
36+
3637
result = await session.execute(
3738
select(Document)
39+
.options(selectinload(Document.chunks))
3840
.join(SearchSpace)
3941
.filter(Document.id == document_id, SearchSpace.user_id == user.id)
4042
)
@@ -54,12 +56,47 @@ async def get_editor_content(
5456
else None,
5557
}
5658

57-
# For old documents without blocknote_document, return error
58-
# (Can't convert summary back to full document)
59-
raise HTTPException(
60-
status_code=400,
61-
detail="This document was uploaded before editing was enabled. Please re-upload to enable editing.",
62-
)
59+
# Lazy migration: Try to generate blocknote_document from chunks
60+
from app.utils.blocknote_converter import convert_markdown_to_blocknote
61+
62+
chunks = sorted(document.chunks, key=lambda c: c.id)
63+
64+
if not chunks:
65+
raise HTTPException(
66+
status_code=400,
67+
detail="This document has no chunks and cannot be edited. Please re-upload to enable editing.",
68+
)
69+
70+
# Reconstruct markdown from chunks
71+
markdown_content = "\n\n".join(chunk.content for chunk in chunks)
72+
73+
if not markdown_content.strip():
74+
raise HTTPException(
75+
status_code=400,
76+
detail="This document has empty content and cannot be edited.",
77+
)
78+
79+
# Convert to BlockNote
80+
blocknote_json = await convert_markdown_to_blocknote(markdown_content)
81+
82+
if not blocknote_json:
83+
raise HTTPException(
84+
status_code=500,
85+
detail="Failed to convert document to editable format. Please try again later.",
86+
)
87+
88+
# Save the generated blocknote_document (lazy migration)
89+
document.blocknote_document = blocknote_json
90+
document.content_needs_reindexing = False
91+
document.last_edited_at = None
92+
await session.commit()
93+
94+
return {
95+
"document_id": document.id,
96+
"title": document.title,
97+
"blocknote_document": blocknote_json,
98+
"last_edited_at": None,
99+
}
63100

64101

65102
@router.put("/documents/{document_id}/blocknote-content")
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
"""Celery tasks for populating blocknote_document for existing documents."""
2+
3+
import logging
4+
from typing import Any
5+
6+
from sqlalchemy import select
7+
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
8+
from sqlalchemy.orm import selectinload
9+
from sqlalchemy.pool import NullPool
10+
11+
from app.celery_app import celery_app
12+
from app.config import config
13+
from app.db import Chunk, Document
14+
from app.utils.blocknote_converter import convert_markdown_to_blocknote
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
def get_celery_session_maker():
    """
    Build an async session maker backed by a worker-local engine.

    Celery tasks run inside their own, freshly created event loop, so the
    main application's session maker (bound to the app's loop) cannot be
    reused here. A dedicated engine with ``NullPool`` avoids sharing
    pooled connections across event loops.
    """
    return async_sessionmaker(
        create_async_engine(config.DATABASE_URL, poolclass=NullPool, echo=False),
        expire_on_commit=False,
    )
31+
32+
33+
@celery_app.task(name="populate_blocknote_for_documents", bind=True)
def populate_blocknote_for_documents_task(
    self, document_ids: list[int] | None = None, batch_size: int = 50
):
    """
    Celery task to populate blocknote_document for existing documents.

    Celery workers invoke tasks from a plain (non-async) context, so the
    async migration coroutine is driven with ``asyncio.run``, which
    creates a fresh event loop, runs the coroutine, shuts down async
    generators, and closes the loop — replacing the manual
    ``new_event_loop()`` / ``set_event_loop()`` / ``close()`` sequence,
    which skipped async-generator shutdown.

    Args:
        document_ids: Optional list of specific document IDs to process.
            If None, processes all documents with blocknote_document IS NULL.
        batch_size: Number of documents to process in each batch (default: 50)
    """
    import asyncio

    asyncio.run(_populate_blocknote_for_documents(document_ids, batch_size))
56+
57+
58+
async def _populate_blocknote_for_documents(
    document_ids: list[int] | None = None, batch_size: int = 50
):
    """
    Populate ``blocknote_document`` for documents that are missing it.

    Loads all candidate documents (chunks eagerly fetched), reconstructs
    each document's markdown by concatenating its chunks in id order,
    converts that markdown to BlockNote JSON, and commits exactly once per
    batch of ``batch_size`` documents.

    Fixes over the previous version:
    - Removed the redundant mid-loop ``processed % batch_size == 0`` commit;
      it duplicated the per-batch commit and, once any document was skipped,
      fired at unpredictable points inside a batch.
    - Documents with no chunks or empty content are now counted as
      ``skipped`` rather than ``failed``, so the summary log is accurate.

    Args:
        document_ids: Optional list of specific document IDs to process.
            If None, all documents with blocknote_document IS NULL are processed.
        batch_size: Number of documents committed per batch.

    Raises:
        Exception: Re-raises any error outside the per-document handling
            (e.g. query or commit failures) after rolling back the session.
    """
    async with get_celery_session_maker()() as session:
        try:
            # Only documents that still lack a BlockNote representation.
            query = select(Document).where(Document.blocknote_document.is_(None))

            # If specific document IDs were provided, restrict to them.
            if document_ids:
                query = query.where(Document.id.in_(document_ids))

            # Eager-load chunks to avoid an N+1 query per document.
            query = query.options(selectinload(Document.chunks))

            result = await session.execute(query)
            documents = result.scalars().all()

            total_documents = len(documents)
            logger.info("Found %s documents to process", total_documents)

            if total_documents == 0:
                logger.info("No documents to process")
                return

            processed = 0
            skipped = 0
            failed = 0

            # Process in batches so each transaction stays short.
            for i in range(0, total_documents, batch_size):
                batch = documents[i : i + batch_size]
                logger.info(
                    "Processing batch %s: documents %s-%s",
                    i // batch_size + 1,
                    i + 1,
                    min(i + batch_size, total_documents),
                )

                for document in batch:
                    try:
                        # Chunks were preloaded by selectinload; sort by id to
                        # restore the original document order.
                        chunks = sorted(document.chunks, key=lambda c: c.id)

                        if not chunks:
                            logger.warning(
                                "Document %s (%s) has no chunks, skipping",
                                document.id,
                                document.title,
                            )
                            skipped += 1
                            continue

                        # Reconstruct markdown by concatenating chunk contents.
                        markdown_content = "\n\n".join(
                            chunk.content for chunk in chunks
                        )

                        if not markdown_content.strip():
                            logger.warning(
                                "Document %s (%s) has empty markdown content, skipping",
                                document.id,
                                document.title,
                            )
                            skipped += 1
                            continue

                        # Convert markdown to BlockNote JSON.
                        blocknote_json = await convert_markdown_to_blocknote(
                            markdown_content
                        )

                        if not blocknote_json:
                            logger.warning(
                                "Failed to convert markdown to BlockNote for document %s (%s)",
                                document.id,
                                document.title,
                            )
                            failed += 1
                            continue

                        # Other BlockNote-related fields already carry correct
                        # defaults; only the document JSON needs populating.
                        document.blocknote_document = blocknote_json
                        processed += 1

                    except Exception as e:
                        # Log and continue with the next document instead of
                        # failing the entire batch.
                        logger.error(
                            "Error processing document %s (%s): %s",
                            document.id,
                            document.title,
                            e,
                            exc_info=True,
                        )
                        failed += 1
                        continue

                # Single commit per batch keeps transactions bounded.
                await session.commit()
                logger.info("Completed batch %s", i // batch_size + 1)

            logger.info(
                "Migration complete: %s documents processed, %s skipped, %s failed",
                processed,
                skipped,
                failed,
            )

        except Exception as e:
            await session.rollback()
            logger.error("Error in blocknote migration task: %s", e, exc_info=True)
            raise

surfsense_backend/app/tasks/document_processors/file_processors.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,9 @@ async def add_received_file_document_using_docling(
396396
"ETL_SERVICE": "DOCLING",
397397
}
398398
existing_document.chunks = chunks
399-
existing_document.blocknote_document = blocknote_json
399+
existing_document.blocknote_document = None
400+
existing_document.content_needs_reindexing = False
401+
existing_document.last_edited_at = None
400402

401403
await session.commit()
402404
await session.refresh(existing_document)
@@ -416,7 +418,9 @@ async def add_received_file_document_using_docling(
416418
chunks=chunks,
417419
content_hash=content_hash,
418420
unique_identifier_hash=unique_identifier_hash,
419-
blocknote_document=blocknote_json,
421+
blocknote_document=None,
422+
content_needs_reindexing=False,
423+
last_edited_at=None,
420424
)
421425

422426
session.add(document)

surfsense_web/app/dashboard/[search_space_id]/editor/[documentId]/page.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ export default function EditorPage() {
9696
}
9797
}, [editorContent, document]);
9898

99-
// TODO: Auto-save every 30 seconds - DIRECT CALL TO FASTAPI
99+
// TODO: Maybe add Auto-save every 30 seconds - DIRECT CALL TO FASTAPI
100100

101101
// Save and exit - DIRECT CALL TO FASTAPI
102102
const handleSave = async () => {

0 commit comments

Comments
 (0)