Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""add columns to collection job and documents table

Revision ID: 050
Revises: 049
Create Date: 2026-03-25 10:09:47.318575

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "050"
down_revision = "049"
branch_labels = None
depends_on = None


def upgrade():
    """Add size/count tracking columns to collection_jobs and document tables.

    Adds nullable integer columns so existing rows remain valid:
      - collection_jobs.docs_num: number of documents in the job
      - collection_jobs.total_size: combined size of the job's documents
      - document.file_size: size of the individual document in bytes
    """
    op.add_column(
        "collection_jobs",
        sa.Column(
            "docs_num",
            sa.Integer(),
            nullable=True,
            comment="Total number of documents to be processed in this job",
        ),
    )
    op.add_column(
        "collection_jobs",
        sa.Column(
            "total_size",
            sa.Integer(),
            nullable=True,
            comment="Total size of documents being uploaded to collection",
        ),
    )
    op.add_column(
        "document",
        sa.Column(
            "file_size",
            sa.Integer(),
            nullable=True,
            comment="Size of the document in bytes",
        ),
    )


def downgrade():
    """Drop the size/count tracking columns, reversing this revision."""
    # Remove in reverse order of creation.
    for table, column in (
        ("document", "file_size"),
        ("collection_jobs", "total_size"),
        ("collection_jobs", "docs_num"),
    ):
        op.drop_column(table, column)
7 changes: 4 additions & 3 deletions backend/app/api/docs/collections/create.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ pipeline:

* Create a vector store from the document IDs you received after uploading your
documents through the Documents module.
* The `batch_size` parameter controls how many documents are sent to OpenAI in a
single transaction when creating the vector store. This helps optimize the upload
process for large document sets. If not specified, the default value is **10**.
* Documents are automatically batched when creating the vector store to optimize
the upload process for large document sets. A new batch is started when either
the cumulative document size reaches the configured size limit for a vector store
upload or the document count reaches the configured per-batch file limit, whichever limit is hit first.
* [Deprecated] Attach the Vector Store to an OpenAI
[Assistant](https://platform.openai.com/docs/api-reference/assistants). Use
parameters in the request body relevant to an Assistant to flesh out
Expand Down
14 changes: 14 additions & 0 deletions backend/app/api/routes/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
CollectionCrud,
CollectionJobCrud,
DocumentCollectionCrud,
DocumentCrud,
)
from app.core.cloud import get_cloud_storage
from app.models import (
Expand Down Expand Up @@ -95,12 +96,25 @@ def create_collection(
if request.name:
ensure_unique_name(session, current_user.project_.id, request.name)

# Calculate total size of all documents
document_crud = DocumentCrud(session, current_user.project_.id)
total_size = 0
for doc_id in request.documents:
doc = document_crud.read_one(doc_id)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from the looks of it seems we are reading files multiple time

  1. collections.py:102
  2. helpers.py:79
  3. create_collection.py:202

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all three are being calculated differently for different things: in collections.py we are calculating the total size of the documents given and storing it in the db; then in the helper we are counting and adding up document sizes until they reach the limit we have set and making batches; and in create_collection.py, after the collection is created, I am just taking the total size I calculated and saved in the db in routes/collections.py, converting it to a readable format (in KBs), and returning it in the logs

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what he is suggesting is - can we not read the file once from database and use it everywhere instead of doing a relatively heavy operation (db read) 3 times for the same thing. Having read the documents the first time, try and pass them around as params instead of doing a db fetch every time.

Copy link
Copy Markdown
Collaborator Author

@nishika26 nishika26 Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed this now so that instead of three separate db calls, we make one db call (in create_collection.py) and use whatever is returned for all three things - calculating the total size of docs being uploaded, using the docs in batch document helper function and using the info for logs

total_size += doc.file_size or 0

logger.info(
f"[create_collection] Calculated total size | {{'total_documents': {len(request.documents)}, 'total_size_bytes': {total_size}, 'total_size_mb': {round(total_size / (1024 * 1024), 2)}}}"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we calculate the size one time and the use it across instead of using MB logic again in logs

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saving total size in mb only now and directly fetching that for logs

)

collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
CollectionJobCreate(
action_type=CollectionActionType.CREATE,
project_id=current_user.project_.id,
status=CollectionJobStatus.PENDING,
docs_num=len(request.documents),
total_size=total_size,
)
)

Expand Down
4 changes: 4 additions & 0 deletions backend/app/api/routes/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from app.core.cloud import get_cloud_storage
from app.services.collections.helpers import pick_service_for_documennt
from app.services.documents.helpers import (
calculate_file_size,
schedule_transformation,
pre_transform_validation,
build_document_schema,
Expand Down Expand Up @@ -129,6 +130,8 @@ async def upload_doc(
transformer=transformer,
)

file_size = await calculate_file_size(src)

storage = get_cloud_storage(session=session, project_id=current_user.project_.id)
document_id = uuid4()
object_store_url = storage.put(src, Path(str(document_id)))
Expand All @@ -137,6 +140,7 @@ async def upload_doc(
document = Document(
id=document_id,
fname=src.filename,
file_size=file_size,
object_store_url=str(object_store_url),
)
source_document = crud.update(document)
Expand Down
23 changes: 5 additions & 18 deletions backend/app/crud/rag/open_ai.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,24 +143,11 @@ def update(
f"[OpenAIVectorStoreCrud.update] File upload completed | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}"
)
if req.file_counts.completed != req.file_counts.total:
view = {x.fname: x for x in docs}
for i in self.read(vector_store_id):
if i.last_error is None:
fname = self.client.files.retrieve(i.id)
view.pop(fname)

error = {
"error": "OpenAI document processing error",
"documents": list(view.values()),
}
try:
raise InterruptedError(json.dumps(error, cls=BaseModelEncoder))
except InterruptedError as err:
logger.error(
f"[OpenAIVectorStoreCrud.update] Document processing error | {{'vector_store_id': '{vector_store_id}', 'error': '{error['error']}', 'failed_documents': {len(error['documents'])}}}",
exc_info=True,
)
raise
error_msg = f"OpenAI document processing error: {req.file_counts.completed}/{req.file_counts.total} files completed"
logger.error(
f"[OpenAIVectorStoreCrud.update] Document processing error | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}"
)
raise InterruptedError(error_msg)

while files:
f_obj = files.pop()
Expand Down
1 change: 1 addition & 0 deletions backend/app/models/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class CollectionOptions(SQLModel):
batch_size: int = Field(
default=10,
description=(
"**[Deprecated]** "
"Number of documents to send to OpenAI in a single "
"transaction. See the `file_ids` parameter in the "
"vector store [create batch](https://platform.openai.com/docs/api-reference/vector-stores-file-batches/createBatch)."
Expand Down
16 changes: 16 additions & 0 deletions backend/app/models/collection_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,20 @@ class CollectionJob(SQLModel, table=True):
description="Tracing ID for correlating logs and traces.",
sa_column_kwargs={"comment": "Tracing ID for correlating logs and traces"},
)
docs_num: int | None = Field(
default=None,
description="Total number of documents to be processed in this job",
sa_column_kwargs={
"comment": "Total number of documents to be processed in this job"
},
)
total_size: int | None = Field(
default=None,
description="Total size of documents being uploaded to collection",
sa_column_kwargs={
"comment": "Total size of documents being uploaded to collection"
},
)
error_message: str | None = Field(
default=None,
sa_column=Column(
Expand Down Expand Up @@ -106,6 +120,8 @@ class CollectionJobCreate(SQLModel):
collection_id: UUID | None = None
status: CollectionJobStatus
action_type: CollectionActionType
docs_num: int | None = None
total_size: int | None = None
project_id: int


Expand Down
10 changes: 7 additions & 3 deletions backend/app/models/document.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from datetime import datetime
from typing import Any
from uuid import UUID, uuid4

from pydantic import model_serializer
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need Any and model_serializer?
seems like we are not using them

from sqlmodel import Field, SQLModel

from app.core.util import now
Expand Down Expand Up @@ -41,6 +43,11 @@ class Document(DocumentBase, table=True):
default=False,
sa_column_kwargs={"comment": "Soft delete flag"},
)
file_size: int | None = Field(
default=None,
description="The size of the document in bytes",
sa_column_kwargs={"comment": "Size of the document in bytes"},
)

# Foreign keys
source_document_id: UUID | None = Field(
Expand Down Expand Up @@ -80,9 +87,6 @@ class DocumentPublic(DocumentBase):
updated_at: datetime = Field(
description="The timestamp when the document was last updated"
)
signed_url: str | None = Field(
default=None, description="A signed URL for accessing the document"
)


class TransformedDocumentPublic(DocumentPublic):
Expand Down
10 changes: 5 additions & 5 deletions backend/app/services/collections/create_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,8 @@ def execute_job(
flat_docs = document_crud.read_each(creation_request.documents)

file_exts = {doc.fname.split(".")[-1] for doc in flat_docs if "." in doc.fname}
file_sizes_kb = [
storage.get_file_size_kb(doc.object_store_url) for doc in flat_docs
]
total_size_bytes = collection_job.total_size or 0
total_size_mb = round(total_size_bytes / (1024 * 1024), 2)

with Session(engine) as session:
collection_crud = CollectionCrud(session, project_id)
Expand Down Expand Up @@ -240,11 +239,12 @@ def execute_job(

elapsed = time.time() - start_time
logger.info(
"[create_collection.execute_job] Collection created: %s | Time: %.2fs | Files: %d | Sizes: %s KB | Types: %s",
"[create_collection.execute_job] Collection created: %s | Time: %.2fs | Files: %d | Total Size: %s MB (%s bytes) | Types: %s",
collection_id,
elapsed,
len(flat_docs),
file_sizes_kb,
total_size_mb,
total_size_bytes,
list(file_exts),
)

Expand Down
59 changes: 43 additions & 16 deletions backend/app/services/collections/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from app.crud import DocumentCrud, CollectionCrud
from app.api.deps import SessionDep
from app.models import DocumentCollection, Collection, CollectionPublic
from app.models import DocumentCollection, Collection, CollectionPublic, Document


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -56,24 +56,51 @@ def extract_error_message(err: Exception) -> str:


def batch_documents(
    document_crud: DocumentCrud, documents: List[UUID]
) -> List[List[Document]]:
    """
    Batch documents dynamically based on size and count limits.

    A new batch is started when adding the next document would either:
      - push the cumulative batch size past 30 MB (31,457,280 bytes), or
      - exceed 200 documents in the batch.

    Args:
        document_crud: CRUD accessor used to load the documents.
        documents: IDs of the documents to batch, in upload order.

    Returns:
        List of document batches, preserving the input order.
    """

    MAX_BATCH_SIZE_BYTES = 30 * 1024 * 1024  # 30 MB in bytes
    MAX_BATCH_COUNT = 200  # Maximum documents per batch

    # Load every document in one DB round trip instead of a read_one()
    # call per ID (flagged in review).
    # NOTE(review): assumes read_each returns documents in the order of
    # `documents` — confirm against DocumentCrud.read_each.
    docs = document_crud.read_each(documents)

    docs_batches: List[List[Document]] = []
    current_batch: List[Document] = []
    current_batch_size = 0

    for doc in docs:
        # Tolerate legacy rows that predate the file_size column.
        doc_size = doc.file_size or 0

        would_exceed_size = (current_batch_size + doc_size) > MAX_BATCH_SIZE_BYTES
        would_exceed_count = len(current_batch) >= MAX_BATCH_COUNT

        if current_batch and (would_exceed_size or would_exceed_count):
            logger.info(
                f"[batch_documents] Batch completed | {{'batch_num': {len(docs_batches) + 1}, 'doc_count': {len(current_batch)}, 'batch_size_bytes': {current_batch_size}, 'batch_size_mb': {round(current_batch_size / (1024 * 1024), 2)}}}"
            )
            docs_batches.append(current_batch)
            current_batch = []
            current_batch_size = 0

        current_batch.append(doc)
        current_batch_size += doc_size

    # Flush the trailing partial batch, if any.
    if current_batch:
        docs_batches.append(current_batch)

    logger.info(
        f"[batch_documents] Batching complete | {{'total_batches': {len(docs_batches)}, 'total_documents': {len(documents)}}}"
    )

    return docs_batches


Expand Down
8 changes: 1 addition & 7 deletions backend/app/services/collections/providers/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,7 @@ def create(
Create OpenAI vector store with documents and optionally an assistant.
"""
try:
# Use user-provided batch_size, default to 10 if not set
batch_size = collection_request.batch_size or 10
docs_batches = batch_documents(
document_crud,
collection_request.documents,
batch_size,
)
docs_batches = batch_documents(document_crud, collection_request.documents)

vector_store_crud = OpenAIVectorStoreCrud(self.client)
vector_store = vector_store_crud.create()
Expand Down
24 changes: 23 additions & 1 deletion backend/app/services/documents/helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Optional, Tuple, Iterable, Union
from uuid import UUID

from fastapi import HTTPException
from fastapi import HTTPException, UploadFile

from app.services.doctransform.registry import (
get_available_transformers,
Expand All @@ -23,6 +23,28 @@
)


async def calculate_file_size(file: UploadFile) -> int:
    """
    Calculate the size of an uploaded file in bytes.

    Args:
        file: The uploaded file from FastAPI

    Returns:
        The size of the file in bytes
    """
    # `size` may legitimately be 0 for an empty upload; only fall back to
    # reading the stream when the attribute is genuinely unavailable.
    # (The original `if file.size:` re-read empty files unnecessarily.)
    if file.size is not None:
        return file.size

    # Size not reported: measure by reading the whole stream, then rewind
    # so subsequent consumers (e.g. cloud storage upload) see full content.
    await file.seek(0)
    content = await file.read()
    size_bytes = len(content)
    await file.seek(0)  # Reset to beginning for subsequent operations

    return size_bytes


def pre_transform_validation(
*,
src_filename: str,
Expand Down
Loading
Loading