diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 000000000..677e64ca8 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "python.terminal.activateEnvironment": false +} \ No newline at end of file diff --git a/agents/rag/.vscode/launch.json b/agents/rag/.vscode/launch.json index c72fb3562..ea5414cb1 100644 --- a/agents/rag/.vscode/launch.json +++ b/agents/rag/.vscode/launch.json @@ -5,7 +5,7 @@ "version": "0.2.0", "configurations": [ { - "name": "agent-form", + "name": "agent-rag", "type": "debugpy", "request": "launch", "program": "${workspaceFolder}/src/rag/agent.py", diff --git a/agents/rag/src/rag/tools/files/file_reader.py b/agents/rag/src/rag/tools/files/file_reader.py index 21e8b0159..d03c76fd9 100644 --- a/agents/rag/src/rag/tools/files/file_reader.py +++ b/agents/rag/src/rag/tools/files/file_reader.py @@ -3,13 +3,13 @@ from typing import List, Literal +from agentstack_sdk.platform import File from beeai_framework.emitter import Emitter from beeai_framework.tools import ( JSONToolOutput, Tool, ToolRunOptions, ) -from agentstack_sdk.platform import File from pydantic import BaseModel, Field, create_model from rag.tools.files.utils import File, format_size @@ -116,7 +116,6 @@ async def _run(self, input: FileReadInputBase, options, context) -> FileReaderTo # pull the first (only) MessagePart from the async-generator async with file.load_text_content() as loaded_file: content = loaded_file.text - content_type = loaded_file.content_type if content is None: raise ValueError(f"File content is None for {filename}.") diff --git a/agentstack.code-workspace b/agentstack.code-workspace index 811fb4445..9b54c63ec 100644 --- a/agentstack.code-workspace +++ b/agentstack.code-workspace @@ -28,10 +28,6 @@ "name": "agent-chat", "path": "agents/chat" }, - { - "name": "agent-form", - "path": "agents/form" - }, { "name": "agent-rag", "path": "agents/rag" diff --git a/apps/agentstack-sdk-py/src/agentstack_sdk/platform/file.py b/apps/agentstack-sdk-py/src/agentstack_sdk/platform/file.py index e07118e35..bd1debc39 100644 --- a/apps/agentstack-sdk-py/src/agentstack_sdk/platform/file.py +++ b/apps/agentstack-sdk-py/src/agentstack_sdk/platform/file.py @@ -16,11 +16,20 @@ from agentstack_sdk.util.file import LoadedFile, LoadedFileWithUri, PlatformFileUrl from agentstack_sdk.util.utils import filter_dict +ExtractionFormatLiteral = typing.Literal["markdown", "vendor_specific_json"] + + +class ExtractedFileInfo(pydantic.BaseModel): + """Information about an extracted file.""" + + file_id: str + format: ExtractionFormatLiteral | None + class Extraction(pydantic.BaseModel): id: str file_id: str - extracted_file_id: str | None = None + extracted_files: list[ExtractedFileInfo] = pydantic.Field(default_factory=list) status: typing.Literal["pending", "in_progress", "completed", "failed", "cancelled"] = "pending" job_id: str | None = None error_message: str | None = None @@ -152,9 +161,43 @@ async def load_text_content( await response.aread() yield LoadedFileWithUri(response=response, content_type=file.content_type, filename=file.filename) + @asynccontextmanager + async def load_json_content( + self: File | str, + *, + stream: bool = False, + client: PlatformClient | None = None, + context_id: str | None | Literal["auto"] = "auto", + ) -> AsyncIterator[LoadedFile]: + # `self` has a weird type so that you can call both `instance.load_json_content()` to create an extraction for an instance, or `File.load_json_content("123")` + file_id = self if isinstance(self, str) else 
self.id + async with client or get_platform_client() as platform_client: + context_id = platform_client.context_id if context_id == "auto" else context_id + + file = await File.get(file_id, client=client, context_id=context_id) if isinstance(self, str) else self + extraction = await file.get_extraction(client=client, context_id=context_id) + + for extracted_file_info in extraction.extracted_files: + if extracted_file_info.format != "vendor_specific_json": + continue + extracted_json_file_id = extracted_file_info.file_id + async with platform_client.stream( + "GET", + url=f"/api/v1/files/{extracted_json_file_id}/content", + params=context_id and {"context_id": context_id}, + ) as response: + response.raise_for_status() + if not stream: + await response.aread() + yield LoadedFileWithUri(response=response, content_type=file.content_type, filename=file.filename) + return + + raise ValueError("No extracted JSON content available for this file.") + async def create_extraction( self: File | str, *, + formats: list[ExtractionFormatLiteral] | None = None, client: PlatformClient | None = None, context_id: str | None | Literal["auto"] = "auto", ) -> Extraction: @@ -167,6 +210,7 @@ async def create_extraction( await platform_client.post( url=f"/api/v1/files/{file_id}/extraction", params=context_id and {"context_id": context_id}, + json={"settings": {"formats": formats}} if formats else None, ) ) .raise_for_status() diff --git a/apps/agentstack-server/pyproject.toml b/apps/agentstack-server/pyproject.toml index c9d4e519e..88b9f680b 100644 --- a/apps/agentstack-server/pyproject.toml +++ b/apps/agentstack-server/pyproject.toml @@ -48,6 +48,7 @@ dependencies = [ "opentelemetry-instrumentation-httpx>=0.59b0", "opentelemetry-instrumentation-fastapi>=0.59b0", "limits[async-redis]>=5.3.0", + "ijson>=3.4.0.post0", ] [dependency-groups] diff --git a/apps/agentstack-server/src/agentstack_server/api/routes/files.py b/apps/agentstack-server/src/agentstack_server/api/routes/files.py index d11a02084..e198c9547 100644 --- a/apps/agentstack-server/src/agentstack_server/api/routes/files.py +++ b/apps/agentstack-server/src/agentstack_server/api/routes/files.py @@ -1,5 +1,6 @@ # Copyright 2025 © BeeAI a Series of LF Projects, LLC # SPDX-License-Identifier: Apache-2.0 + import logging from contextlib import AsyncExitStack from typing import Annotated @@ -14,9 +15,17 @@ RequiresContextPermissions, ) from agentstack_server.api.schema.common import EntityModel -from agentstack_server.api.schema.files import FileListQuery +from agentstack_server.api.schema.files import FileListQuery, TextExtractionRequest from agentstack_server.domain.models.common import PaginatedResult -from agentstack_server.domain.models.file import AsyncFile, ExtractionStatus, File, TextExtraction +from agentstack_server.domain.models.file import ( + AsyncFile, + Backend, + ExtractionFormat, + ExtractionStatus, + File, + TextExtraction, + TextExtractionSettings, +) from agentstack_server.domain.models.permissions import AuthorizedUser from agentstack_server.service_layer.services.files import FileService @@ -92,12 +101,32 @@ async def get_text_file_content( user: Annotated[AuthorizedUser, Depends(RequiresContextPermissions(files={"read"}))], ) -> StreamingResponse: extraction = await file_service.get_extraction(file_id=file_id, user=user.user, context_id=user.context_id) - if not extraction.status == ExtractionStatus.COMPLETED or not extraction.extracted_file_id: + if not extraction.status == ExtractionStatus.COMPLETED or not extraction.extracted_files: 
raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Extraction is not completed (status {extraction.status})", ) - return await _stream_file(file_service=file_service, user=user, file_id=extraction.extracted_file_id) + + if extraction.extraction_metadata is not None and extraction.extraction_metadata.backend == Backend.IN_PLACE: + # Fallback to the original file for in-place extraction + original_file_id = extraction.find_file_by_format(format=None) + if not original_file_id: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Original file not found in extraction results", + ) + file_to_stream_id = original_file_id + else: + # Find the markdown file from extracted files + markdown_file_id = extraction.find_file_by_format(format=ExtractionFormat.MARKDOWN) + if not markdown_file_id: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Markdown file not found in extraction results", + ) + file_to_stream_id = markdown_file_id + + return await _stream_file(file_service=file_service, user=user, file_id=file_to_stream_id) @router.delete("/{file_id}", status_code=fastapi.status.HTTP_204_NO_CONTENT) @@ -114,6 +143,7 @@ async def create_text_extraction( file_id: UUID, file_service: FileServiceDependency, user: Annotated[AuthorizedUser, Depends(RequiresContextPermissions(files={"write", "extract"}))], + request: TextExtractionRequest | None = None, ) -> EntityModel[TextExtraction]: """Create or return text extraction for a file. @@ -122,8 +152,15 @@ async def create_text_extraction( - If extraction is pending/in-progress, returns current status - If no extraction exists, creates a new one """ + if request is None: + request = TextExtractionRequest() + + settings = request.settings if request.settings is not None else TextExtractionSettings() + return EntityModel( - await file_service.create_extraction(file_id=file_id, user=user.user, context_id=user.context_id) + await file_service.create_extraction( + file_id=file_id, user=user.user, context_id=user.context_id, settings=settings + ) ) diff --git a/apps/agentstack-server/src/agentstack_server/api/schema/files.py b/apps/agentstack-server/src/agentstack_server/api/schema/files.py index 5bcb595a9..f20f2ebca 100644 --- a/apps/agentstack-server/src/agentstack_server/api/schema/files.py +++ b/apps/agentstack-server/src/agentstack_server/api/schema/files.py @@ -6,6 +6,7 @@ from pydantic import BaseModel, Field from agentstack_server.api.schema.common import PaginationQuery +from agentstack_server.domain.models.file import TextExtractionSettings class FileResponse(BaseModel): @@ -36,3 +37,12 @@ class FileListQuery(PaginationQuery): description="Case-insensitive partial match search on filename (e.g., 'doc' matches 'my_document.pdf')", ) order_by: str = Field(default_factory=lambda: "created_at", pattern="^created_at|filename|file_size_bytes$") + + +class TextExtractionRequest(BaseModel): + """Request schema for text extraction.""" + + settings: TextExtractionSettings | None = Field( + default=None, + description="Additional options for text extraction", + ) diff --git a/apps/agentstack-server/src/agentstack_server/domain/models/file.py b/apps/agentstack-server/src/agentstack_server/domain/models/file.py index 0f3da4475..880059c1c 100644 --- a/apps/agentstack-server/src/agentstack_server/domain/models/file.py +++ b/apps/agentstack-server/src/agentstack_server/domain/models/file.py @@ -1,8 +1,9 @@ # Copyright 2025 © BeeAI a Series of LF Projects, LLC # SPDX-License-Identifier: Apache-2.0 -from 
collections.abc import Awaitable, Callable +from collections.abc import AsyncIterator, Awaitable, Callable from enum import StrEnum +from typing import Self from uuid import UUID, uuid4 from pydantic import AwareDatetime, BaseModel, Field @@ -23,8 +24,24 @@ class ExtractionStatus(StrEnum): CANCELLED = "cancelled" +class ExtractionFormat(StrEnum): + MARKDOWN = "markdown" + VENDOR_SPECIFIC_JSON = "vendor_specific_json" + + +class TextExtractionSettings(BaseModel): + formats: list[ExtractionFormat] = Field( + default_factory=lambda: [ExtractionFormat.MARKDOWN, ExtractionFormat.VENDOR_SPECIFIC_JSON] + ) + + +class Backend(StrEnum): + IN_PLACE = "in-place" + + class ExtractionMetadata(BaseModel, extra="allow"): - backend: str + backend: str | None = None + settings: TextExtractionSettings | None = None class FileMetadata(BaseModel, extra="allow"): @@ -39,6 +56,36 @@ class AsyncFile(BaseModel): read: Callable[[int], Awaitable[bytes]] size: int | None = None + @classmethod + def from_async_iterator(cls, iterator: AsyncIterator[bytes], filename: str, content_type: str) -> Self: + buffer = b"" + + async def read(size: int = 8192) -> bytes: + nonlocal buffer + while len(buffer) < size: + try: + buffer += await anext(iterator) + except StopAsyncIteration: + break + + result = buffer[:size] + buffer = buffer[size:] + return result + + return cls(filename=filename, content_type=content_type, read=read) + + @classmethod + def from_bytes(cls, content: bytes, filename: str, content_type: str) -> Self: + pos = 0 + + async def read(size: int = 8192) -> bytes: + nonlocal pos + result = content[pos : pos + size] + pos += len(result) + return result + + return cls(filename=filename, content_type=content_type, read=read, size=len(content)) + class File(BaseModel): id: UUID = Field(default_factory=uuid4) @@ -52,10 +99,17 @@ class File(BaseModel): context_id: UUID | None = None +class ExtractedFileInfo(BaseModel): + """Information about an extracted file.""" + + file_id: UUID + format: ExtractionFormat | None = None + + class TextExtraction(BaseModel): id: UUID = Field(default_factory=uuid4) file_id: UUID - extracted_file_id: UUID | None = None + extracted_files: list[ExtractedFileInfo] = Field(default_factory=list) status: ExtractionStatus = ExtractionStatus.PENDING job_id: str | None = None error_message: str | None = None @@ -64,20 +118,30 @@ class TextExtraction(BaseModel): finished_at: AwareDatetime | None = None created_at: AwareDatetime = Field(default_factory=utc_now) - def set_started(self, job_id: str) -> None: - """Mark extraction as started with job ID.""" + def set_started(self, job_id: str, backend: str) -> None: + """Mark extraction as started with job ID and backend name.""" self.status = ExtractionStatus.IN_PROGRESS self.job_id = job_id self.started_at = utc_now() self.error_message = None - def set_completed(self, extracted_file_id: UUID, metadata: ExtractionMetadata | None = None) -> None: - """Mark extraction as completed with extracted file ID.""" + # Create extraction_metadata if it doesn't exist + if self.extraction_metadata is None: + self.extraction_metadata = ExtractionMetadata() + + # Set the backend name + self.extraction_metadata.backend = backend + + def set_completed( + self, extracted_files: list[ExtractedFileInfo], metadata: ExtractionMetadata | None = None + ) -> None: + """Mark extraction as completed with extracted files and their formats.""" self.status = ExtractionStatus.COMPLETED - self.extracted_file_id = extracted_file_id + self.extracted_files = extracted_files 
self.finished_at = utc_now() - self.extraction_metadata = metadata self.error_message = None + if metadata is not None: + self.extraction_metadata = metadata def set_failed(self, error_message: str) -> None: """Mark extraction as failed with error message.""" @@ -97,3 +161,10 @@ def reset_for_retry(self) -> None: self.started_at = None self.finished_at = None self.job_id = None + + def find_file_by_format(self, format: ExtractionFormat | None) -> UUID | None: + """Find an extracted file by format from the extracted files list.""" + for extracted_file_info in self.extracted_files: + if extracted_file_info.format == format: + return extracted_file_info.file_id + return None diff --git a/apps/agentstack-server/src/agentstack_server/domain/repositories/file.py b/apps/agentstack-server/src/agentstack_server/domain/repositories/file.py index 215ced126..a80b83e2a 100644 --- a/apps/agentstack-server/src/agentstack_server/domain/repositories/file.py +++ b/apps/agentstack-server/src/agentstack_server/domain/repositories/file.py @@ -10,7 +10,15 @@ from pydantic import AnyUrl, HttpUrl from agentstack_server.domain.models.common import PaginatedResult -from agentstack_server.domain.models.file import AsyncFile, File, FileMetadata, FileType, TextExtraction +from agentstack_server.domain.models.file import ( + AsyncFile, + ExtractionFormat, + File, + FileMetadata, + FileType, + TextExtraction, + TextExtractionSettings, +) class IFileRepository(Protocol): @@ -69,6 +77,17 @@ async def get_file_metadata(self, *, file_id: UUID) -> FileMetadata: ... @runtime_checkable class ITextExtractionBackend(Protocol): + @property + def backend_name(self) -> str: + """Return the name of the extraction backend.""" + ... + @asynccontextmanager - async def extract_text(self, *, file_url: AnyUrl, timeout: timedelta | None = None) -> AsyncIterator[AsyncFile]: # noqa: ASYNC109 + async def extract_text( + self, + *, + file_url: AnyUrl, + timeout: timedelta | None = None, # noqa: ASYNC109 + settings: TextExtractionSettings | None = None, + ) -> AsyncIterator[AsyncIterator[tuple[AsyncFile, ExtractionFormat]]]: yield ... # pyright: ignore [reportReturnType] diff --git a/apps/agentstack-server/src/agentstack_server/infrastructure/persistence/migrations/alembic/versions/ef8769062e65_.py b/apps/agentstack-server/src/agentstack_server/infrastructure/persistence/migrations/alembic/versions/ef8769062e65_.py new file mode 100644 index 000000000..b1e3afa07 --- /dev/null +++ b/apps/agentstack-server/src/agentstack_server/infrastructure/persistence/migrations/alembic/versions/ef8769062e65_.py @@ -0,0 +1,107 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +"""migrate extracted_file_id to junction table + +Revision ID: ef8769062e65 +Revises: 15e0c8efe77f +Create Date: 2025-11-30 21:18:30.961482 + +""" + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. 
+revision: str = "ef8769062e65" +down_revision: str | None = "15e0c8efe77f" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Upgrade schema.""" + # Create junction table for extraction_files with format column + op.create_table( + "extraction_files", + sa.Column("extraction_id", sa.UUID(), nullable=False), + sa.Column("file_id", sa.UUID(), nullable=False), + sa.Column("format", sa.String(length=50), nullable=True), + sa.ForeignKeyConstraint(["extraction_id"], ["text_extractions.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["file_id"], ["files.id"], ondelete="CASCADE"), + sa.UniqueConstraint("extraction_id", "format", name="uq_extraction_files_extraction_id_format"), + ) + + # Migrate existing data from extracted_file_id to junction table + # Infer format from content_type: markdown for text/markdown, vendor_specific_json for application/json + # Only migrate non-null extracted_file_id values + op.execute(""" + INSERT INTO extraction_files (extraction_id, file_id, format) + SELECT + te.id, + te.extracted_file_id, + CASE + WHEN f.content_type = 'text/markdown' THEN 'markdown' + WHEN f.content_type = 'application/json' THEN 'vendor_specific_json' + ELSE NULL + END + FROM text_extractions te + JOIN files f ON te.extracted_file_id = f.id + WHERE te.extracted_file_id IS NOT NULL + """) + + # Drop the old extracted_file_id column + op.drop_constraint(op.f("text_extractions_extracted_file_id_fkey"), "text_extractions", type_="foreignkey") + op.drop_column("text_extractions", "extracted_file_id") + + +def downgrade() -> None: + """Downgrade schema.""" + # Add back the extracted_file_id column + op.add_column( + "text_extractions", + sa.Column("extracted_file_id", sa.UUID(), autoincrement=False, nullable=True), + ) + + # Migrate data back from junction table to extracted_file_id + # Only keep markdown files, delete other files + op.execute(""" + UPDATE text_extractions te + SET extracted_file_id = ef.file_id + FROM ( + SELECT DISTINCT ON (ef.extraction_id) ef.extraction_id, ef.file_id + FROM extraction_files ef + JOIN files f ON ef.file_id = f.id + WHERE f.content_type = 'text/markdown' + ORDER BY ef.extraction_id + ) ef + WHERE te.id = ef.extraction_id + """) + + # Delete non-markdown extracted files (except those with "in-place" backend) + op.execute(""" + DELETE FROM files + WHERE id IN ( + SELECT ef.file_id + FROM extraction_files ef + JOIN files f ON ef.file_id = f.id + JOIN text_extractions te ON ef.extraction_id = te.id + WHERE f.content_type != 'text/markdown' + AND (te.extraction_metadata->>'backend') != 'in-place' + ) + """) + + # Add the foreign key constraint + op.create_foreign_key( + op.f("text_extractions_extracted_file_id_fkey"), + "text_extractions", + "files", + ["extracted_file_id"], + ["id"], + ondelete="SET NULL", + ) + + # Drop the junction table + op.drop_table("extraction_files") diff --git a/apps/agentstack-server/src/agentstack_server/infrastructure/persistence/repositories/file.py b/apps/agentstack-server/src/agentstack_server/infrastructure/persistence/repositories/file.py index ca445b896..7184eff56 100644 --- a/apps/agentstack-server/src/agentstack_server/infrastructure/persistence/repositories/file.py +++ b/apps/agentstack-server/src/agentstack_server/infrastructure/persistence/repositories/file.py @@ -1,6 +1,8 @@ # Copyright 2025 © BeeAI a Series of LF Projects, LLC # SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + from collections.abc import AsyncIterator 
from typing import Any, cast from uuid import UUID @@ -16,6 +18,7 @@ String, Table, Text, + UniqueConstraint, func, select, ) @@ -23,7 +26,14 @@ from sqlalchemy.ext.asyncio import AsyncConnection from agentstack_server.domain.models.common import PaginatedResult -from agentstack_server.domain.models.file import ExtractionStatus, File, FileType, TextExtraction +from agentstack_server.domain.models.file import ( + ExtractedFileInfo, + ExtractionFormat, + ExtractionStatus, + File, + FileType, + TextExtraction, +) from agentstack_server.domain.repositories.file import IFileRepository from agentstack_server.exceptions import EntityNotFoundError from agentstack_server.infrastructure.persistence.repositories.context import cursor_paginate @@ -49,7 +59,6 @@ metadata, Column("id", SQL_UUID, primary_key=True), Column("file_id", ForeignKey("files.id", ondelete="CASCADE"), nullable=False, unique=True), - Column("extracted_file_id", ForeignKey("files.id", ondelete="SET NULL"), nullable=True), Column("status", sql_enum(ExtractionStatus, name="extraction_status"), nullable=False), Column("job_id", String(255), nullable=True), Column("error_message", Text, nullable=True), @@ -59,6 +68,16 @@ Column("created_at", DateTime(timezone=True), nullable=False), ) +extraction_files_table = Table( + "extraction_files", + metadata, + Column("extraction_id", ForeignKey("text_extractions.id", ondelete="CASCADE"), nullable=False), + Column("file_id", ForeignKey("files.id", ondelete="CASCADE"), nullable=False), + Column("format", String(50), nullable=True), + # Ensure only one file per format per extraction + UniqueConstraint("extraction_id", "format", name="uq_extraction_files_extraction_id_format"), +) + @inject class SqlAlchemyFileRepository(IFileRepository): @@ -201,12 +220,12 @@ async def list_paginated( has_more=result.has_more, ) - def _to_text_extraction(self, row: Row) -> TextExtraction: + def _to_text_extraction(self, row: Row, extracted_files: list[ExtractedFileInfo] | None = None) -> TextExtraction: return TextExtraction.model_validate( { "id": row.id, "file_id": row.file_id, - "extracted_file_id": row.extracted_file_id, + "extracted_files": extracted_files or [], "status": row.status, "job_id": row.job_id, "error_message": row.error_message, @@ -222,7 +241,6 @@ async def create_extraction(self, *, extraction: TextExtraction) -> None: query = text_extractions_table.insert().values( id=extraction.id, file_id=extraction.file_id, - extracted_file_id=extraction.extracted_file_id, status=extraction.status, job_id=extraction.job_id, error_message=extraction.error_message, @@ -233,6 +251,20 @@ async def create_extraction(self, *, extraction: TextExtraction) -> None: ) await self.connection.execute(query) + # Insert extracted files with format into junction table + if extraction.extracted_files: + await self.connection.execute( + extraction_files_table.insert(), + [ + { + "extraction_id": extraction.id, + "file_id": ef.file_id, + "format": ef.format.value if ef.format else None, + } + for ef in extraction.extracted_files + ], + ) + async def get_extraction_by_file_id( self, *, file_id: UUID, user_id: UUID | None = None, context_id: UUID | None = None ) -> TextExtraction: @@ -247,24 +279,75 @@ async def get_extraction_by_file_id( result = await self.connection.execute(query) if not (row := result.fetchone()): raise EntityNotFoundError(entity="text_extraction", id=file_id, attribute="file_id") - return self._to_text_extraction(row) + + # Load extracted files with format from junction table + extraction_files_query = 
extraction_files_table.select().where(extraction_files_table.c.extraction_id == row.id) + extraction_files_result = await self.connection.execute(extraction_files_query) + extracted_files = [ + ExtractedFileInfo(file_id=ef_row.file_id, format=ExtractionFormat(ef_row.format) if ef_row.format else None) + for ef_row in extraction_files_result.fetchall() + ] + + return self._to_text_extraction(row, extracted_files) async def update_extraction(self, *, extraction: TextExtraction) -> None: query = ( text_extractions_table.update() .where(text_extractions_table.c.file_id == extraction.file_id) .values( - extracted_file_id=extraction.extracted_file_id, status=extraction.status, job_id=extraction.job_id, error_message=extraction.error_message, - extraction_metadata=extraction.extraction_metadata, + extraction_metadata=extraction.extraction_metadata.model_dump(mode="json") + if extraction.extraction_metadata + else None, started_at=extraction.started_at, finished_at=extraction.finished_at, ) ) await self.connection.execute(query) + # Get currently stored files + current_files_query = extraction_files_table.select().where( + extraction_files_table.c.extraction_id == extraction.id + ) + current_files_result = await self.connection.execute(current_files_query) + current_files = {(row.file_id, row.format) for row in current_files_result.fetchall()} + + # Only add new extracted files (never delete existing ones) + if extraction.extracted_files: + new_files = {(ef.file_id, ef.format.value if ef.format else None) for ef in extraction.extracted_files} + + # Check if any existing files are being removed + if files_being_removed := current_files - new_files: + raise ValueError( + f"Cannot remove extracted files. Attempting to remove {len(files_being_removed)} file(s). " + "Extracted files can only be added, not removed." + ) + + # Only insert files that don't already exist + files_to_insert = [ + { + "extraction_id": extraction.id, + "file_id": ef.file_id, + "format": ef.format.value if ef.format else None, + } + for ef in extraction.extracted_files + if (ef.file_id, ef.format.value if ef.format else None) not in current_files + ] + + if files_to_insert: + await self.connection.execute( + extraction_files_table.insert(), + files_to_insert, + ) + elif current_files: + # If there are current files but extraction.extracted_files is empty, that's an attempt to remove all files + raise ValueError( + f"Cannot remove extracted files. Attempting to remove all {len(current_files)} file(s). " + "Extracted files can only be added, not removed." 
+ ) + async def delete_extraction(self, *, extraction_id: UUID) -> int: query = text_extractions_table.delete().where(text_extractions_table.c.id == extraction_id) result = await self.connection.execute(query) diff --git a/apps/agentstack-server/src/agentstack_server/infrastructure/text_extraction/docling.py b/apps/agentstack-server/src/agentstack_server/infrastructure/text_extraction/docling.py index 6ae3d904c..45e578c82 100644 --- a/apps/agentstack-server/src/agentstack_server/infrastructure/text_extraction/docling.py +++ b/apps/agentstack-server/src/agentstack_server/infrastructure/text_extraction/docling.py @@ -1,40 +1,108 @@ # Copyright 2025 © BeeAI a Series of LF Projects, LLC # SPDX-License-Identifier: Apache-2.0 +import logging from collections.abc import AsyncIterator from contextlib import asynccontextmanager from datetime import timedelta +from decimal import Decimal +from typing import NamedTuple, cast -from httpx import AsyncClient +import ijson +import orjson +from httpx import AsyncClient as HttpxAsyncClient from pydantic import AnyUrl from agentstack_server.configuration import DoclingExtractionConfiguration -from agentstack_server.domain.models.file import AsyncFile +from agentstack_server.domain.models.file import AsyncFile, ExtractionFormat, TextExtractionSettings from agentstack_server.domain.repositories.file import ITextExtractionBackend -from agentstack_server.utils.utils import extract_string_value_stream + +logger = logging.getLogger(__name__) + + +class DoclingFormatInfo(NamedTuple): + api_option: str + file_extension: str + response_field_key: str + content_type: str + + +_DOCLING_FORMAT_INFO: dict[ExtractionFormat, DoclingFormatInfo] = { + ExtractionFormat.MARKDOWN: DoclingFormatInfo("md", "md", "md_content", "text/markdown"), + ExtractionFormat.VENDOR_SPECIFIC_JSON: DoclingFormatInfo("json", "json", "json_content", "application/json"), +} + + +async def _process_docling_stream( + async_file: AsyncFile, formats: list[ExtractionFormat] +) -> AsyncIterator[tuple[AsyncFile, ExtractionFormat]]: + key_map = {info.response_field_key: (fmt, info) for fmt, info in _DOCLING_FORMAT_INFO.items() if fmt in formats} + + def serialize(obj): + if isinstance(obj, Decimal): + return float(obj) + raise TypeError + + async for k, v in ijson.kvitems_async(async_file, "document", use_float=False): # pyright: ignore[reportAny] + if k in key_map: + fmt, info = key_map[k] + + content = v.encode("utf-8") if isinstance(v, str) else cast(bytes, orjson.dumps(v, default=serialize)) # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue] + + async_file = AsyncFile.from_bytes( + filename=f"extracted_response.{info.file_extension}", + content_type=info.content_type, + content=content, + ) + + yield (async_file, fmt) class DoclingTextExtractionBackend(ITextExtractionBackend): + """ + # TODO: [DISCLAIMER] this is loading entire field value to memory (which in case of docling is entire document) + # Implementing a streaming parser has been deemed too complex, instead the text extraction worker should be + # scaled independently with memory bounded by concurrency and max_single_file_size configuration + """ + def __init__(self, config: DoclingExtractionConfiguration): self._config = config self._enabled = config.enabled + @property + def backend_name(self) -> str: + """Return the name of the extraction backend.""" + return "docling" + @asynccontextmanager - async def extract_text(self, *, file_url: AnyUrl, timeout: timedelta | None = None) -> AsyncIterator[AsyncFile]: # noqa: 
ASYNC109 + async def extract_text( + self, + *, + file_url: AnyUrl, + timeout: timedelta | None = None, # noqa: ASYNC109 + settings: TextExtractionSettings | None = None, + ) -> AsyncIterator[AsyncIterator[tuple[AsyncFile, ExtractionFormat]]]: + """ + Extract text from a file using the Docling service. + Streams the response and yields files as they are parsed. + """ if not self._enabled: raise RuntimeError( "Docling extraction backend is not enabled, please check the documentation how to enable it" ) + formats = settings.formats if settings else [ExtractionFormat.MARKDOWN, ExtractionFormat.VENDOR_SPECIFIC_JSON] + timeout = timeout or timedelta(minutes=5) + async with ( - AsyncClient(base_url=str(self._config.docling_service_url), timeout=timeout.seconds) as client, + HttpxAsyncClient(base_url=str(self._config.docling_service_url), timeout=timeout.seconds) as client, client.stream( "POST", "/v1/convert/source", json={ "options": { - "to_formats": ["md"], + "to_formats": [_DOCLING_FORMAT_INFO[fmt].api_option for fmt in formats], "document_timeout": timeout.total_seconds(), "image_export_mode": "placeholder", }, @@ -42,21 +110,6 @@ async def extract_text(self, *, file_url: AnyUrl, timeout: timedelta | None = No }, ) as response, ): - response.raise_for_status() - - md_stream = None - - async def read(chunk_size: int = 1024) -> bytes: - nonlocal md_stream - if not md_stream: - md_stream = extract_string_value_stream(response.aiter_text, "md_content", chunk_size) - async for text_chunk in md_stream: - return text_chunk.encode("utf-8") - return b"" - - yield AsyncFile( - filename="extracted_text.md", - content_type="text/markdown", - read=read, - size=None, # size is unknown beforehand - ) + response.raise_for_status() # pyright: ignore[reportUnusedCallResult] + async_file = AsyncFile.from_async_iterator(response.aiter_bytes(), "tmp", "application/json") + yield _process_docling_stream(async_file, formats) diff --git a/apps/agentstack-server/src/agentstack_server/service_layer/services/files.py b/apps/agentstack-server/src/agentstack_server/service_layer/services/files.py index 2d0eb923a..34088f468 100644 --- a/apps/agentstack-server/src/agentstack_server/service_layer/services/files.py +++ b/apps/agentstack-server/src/agentstack_server/service_layer/services/files.py @@ -15,11 +15,14 @@ from agentstack_server.domain.models.common import PaginatedResult from agentstack_server.domain.models.file import ( AsyncFile, + Backend, + ExtractedFileInfo, ExtractionMetadata, ExtractionStatus, File, FileType, TextExtraction, + TextExtractionSettings, ) from agentstack_server.domain.models.user import User from agentstack_server.domain.repositories.file import IObjectStorageRepository, ITextExtractionBackend @@ -48,42 +51,80 @@ def __init__( self._extraction_backend = extraction_backend async def extract_text(self, file_id: UUID, job_id: str): - error_log = [] - async with self._uow() as uow: - extraction = await uow.files.get_extraction_by_file_id(file_id=file_id) - file = await uow.files.get(file_id=file_id) - error_log.append(file.model_dump()) - user = await uow.users.get(user_id=file.created_by) - extraction.set_started(job_id=job_id) - await uow.files.update_extraction(extraction=extraction) - await uow.commit() + """ + Extract text from a file using the configured extraction backend. + + This method coordinates the entire extraction process: fetching the file, + extracting text, and uploading the extracted content. 
+ + Args: + file_id: ID of the file to extract text from + job_id: Background job ID for tracking + + Raises: + CancelledError: If the job is cancelled + Exception: For any extraction or upload errors + """ + error_log: list[str] = [] + extraction: TextExtraction | None = None + file: File | None = None + user: User | None = None + uploaded_file_ids: list[UUID] = [] + try: - file_url = await self._object_storage.get_file_url(file_id=file_id) - error_log.append(f"file url: {file_url}") - async with self._extraction_backend.extract_text(file_url=file_url) as extracted_file: - extracted_db_file = await self.upload_file( - file=extracted_file, - user=user, - file_type=FileType.EXTRACTED_TEXT, - context_id=file.context_id, - parent_file_id=file_id, - ) - extraction.set_completed(extracted_file_id=extracted_db_file.id) async with self._uow() as uow: + extraction = await uow.files.get_extraction_by_file_id(file_id=file_id) + file = await uow.files.get(file_id=file_id) + error_log.append(file.model_dump()) + user = await uow.users.get(user_id=file.created_by) + + extraction.set_started( + job_id=job_id, + backend=self._extraction_backend.backend_name, + ) await uow.files.update_extraction(extraction=extraction) await uow.commit() - except CancelledError: + + file_url = await self._object_storage.get_file_url(file_id=file_id) + error_log.append(f"file url: {file_url}") + + async with self._extraction_backend.extract_text( + file_url=file_url, + settings=extraction.extraction_metadata.settings if extraction.extraction_metadata else None, + ) as extracted_files_iterator: + extracted_files = [] + async for async_file, extraction_format in extracted_files_iterator: + extracted_db_file = await self.upload_file( + file=async_file, + user=user, + file_type=FileType.EXTRACTED_TEXT, + context_id=file.context_id, + parent_file_id=file_id, + ) + uploaded_file_ids.append(extracted_db_file.id) + extracted_files.append(ExtractedFileInfo(file_id=extracted_db_file.id, format=extraction_format)) + + extraction.set_completed(extracted_files=extracted_files) async with self._uow() as uow: - extraction.set_cancelled() await uow.files.update_extraction(extraction=extraction) await uow.commit() + uploaded_file_ids.clear() + except CancelledError: + await self._cleanup_extracted_files(uploaded_file_ids) + if extraction: + async with self._uow() as uow: + extraction.set_cancelled() + await uow.files.update_extraction(extraction=extraction) + await uow.commit() raise except Exception as ex: error_log.append(str(ex)) - async with self._uow() as uow: - extraction.set_failed("\n".join(str(e) for e in error_log)) - await uow.files.update_extraction(extraction=extraction) - await uow.commit() + await self._cleanup_extracted_files(uploaded_file_ids) + if extraction: + async with self._uow() as uow: + extraction.set_failed("\n".join(str(e) for e in error_log)) + await uow.files.update_extraction(extraction=extraction) + await uow.commit() raise async def upload_file( @@ -144,20 +185,61 @@ async def get_content( async with self._uow() as uow: # check if the user owns the file await uow.files.get(file_id=file_id, user_id=user.id, context_id=context_id) - async with self._object_storage.get_file(file_id=file_id) as file: - yield file + + async with self._object_storage.get_file(file_id=file_id) as file: + yield file async def get_extraction(self, *, file_id: UUID, user: User, context_id: UUID | None = None) -> TextExtraction: async with self._uow() as uow: return await uow.files.get_extraction_by_file_id(file_id=file_id, 
user_id=user.id, context_id=context_id) async def delete(self, *, file_id: UUID, user: User, context_id: UUID | None = None) -> None: + file_ids_to_delete = [file_id] + deleted = False + async with self._uow() as uow: - if await uow.files.delete(file_id=file_id, user_id=user.id, context_id=context_id): - await self._object_storage.delete_files(file_ids=[file_id]) + # Find all extractions for this file and collect their extracted files + try: + extraction = await uow.files.get_extraction_by_file_id( + file_id=file_id, user_id=user.id, context_id=context_id + ) + # Add all extracted file IDs to deletion list + file_ids_to_delete.extend([ef.file_id for ef in extraction.extracted_files]) + except EntityNotFoundError: + # No extraction exists for this file, which is fine + pass + + # Delete from database first (this will cascade delete extractions and extraction_files) + deleted = await uow.files.delete(file_id=file_id, user_id=user.id, context_id=context_id) + await uow.commit() + + if deleted: + await self._object_storage.delete_files(file_ids=file_ids_to_delete) + + async def _cleanup_extracted_files(self, file_ids: list[UUID]) -> None: + """Best-effort cleanup for partially uploaded extracted files.""" + if not file_ids: + return + + unique_ids = list(dict.fromkeys(file_ids)) + + with suppress(Exception): + await self._object_storage.delete_files(file_ids=unique_ids) + + with suppress(Exception): + async with self._uow() as uow: + for extracted_file_id in unique_ids: + await uow.files.delete(file_id=extracted_file_id) await uow.commit() - async def create_extraction(self, *, file_id: UUID, user: User, context_id: UUID | None = None) -> TextExtraction: + async def create_extraction( + self, + *, + file_id: UUID, + user: User, + context_id: UUID | None = None, + settings: TextExtractionSettings, + ) -> TextExtraction: async with self._uow() as uow: # Check user permissions await uow.files.get(file_id=file_id, user_id=user.id, context_id=context_id, file_type=FileType.USER_UPLOAD) @@ -174,11 +256,18 @@ async def create_extraction(self, *, file_id: UUID, user: User, context_id: UUID raise TypeError(f"Unknown extraction status: {extraction.status}") except EntityNotFoundError: file_metadata = await self._object_storage.get_file_metadata(file_id=file_id) - extraction = TextExtraction(file_id=file_id) - if file_metadata.content_type in {"text/plain", "text/markdown"}: + extraction = TextExtraction(file_id=file_id, extraction_metadata=ExtractionMetadata(settings=settings)) + + # Docling doesn't support plain text nor markdown content-type, so we treat them as in-place extractions + in_place_extraction = file_metadata.content_type in {"text/plain", "text/markdown"} + if in_place_extraction: extraction.set_completed( - extracted_file_id=file_id, # Point to itself since it's already text - metadata=ExtractionMetadata(backend="in-place"), + extracted_files=[ + ExtractedFileInfo(file_id=file_id, format=None) # Point to itself since it's already text + ], + metadata=ExtractionMetadata( + backend=Backend.IN_PLACE, settings=None + ), # Settings are ignored for in-place extraction ) await uow.files.create_extraction(extraction=extraction) if extraction.status == ExtractionStatus.PENDING: @@ -195,9 +284,13 @@ async def delete_extraction(self, *, file_id: UUID, user: User, context_id: UUID file_id=file_id, user_id=user.id, context_id=context_id ) - if extraction.extracted_file_id and extraction.extracted_file_id != file_id: - await self._object_storage.delete_files(file_ids=[extraction.extracted_file_id]) - 
await uow.files.delete(file_id=extraction.extracted_file_id) + file_ids_to_delete = [ef.file_id for ef in extraction.extracted_files if ef.file_id != file_id] + if file_ids_to_delete: + await self._object_storage.delete_files(file_ids=file_ids_to_delete) + + async with self._uow() as uow: + for fid in file_ids_to_delete: + await uow.files.delete(file_id=fid) await uow.files.delete_extraction(extraction_id=extraction.id) await uow.commit() diff --git a/apps/agentstack-server/src/agentstack_server/utils/utils.py b/apps/agentstack-server/src/agentstack_server/utils/utils.py index c79492d2c..b358a1b88 100644 --- a/apps/agentstack-server/src/agentstack_server/utils/utils.py +++ b/apps/agentstack-server/src/agentstack_server/utils/utils.py @@ -4,10 +4,8 @@ import asyncio import concurrent.futures import functools -import json -import re from asyncio import CancelledError -from collections.abc import AsyncIterable, Callable, Iterable +from collections.abc import Callable, Iterable from contextlib import suppress from datetime import UTC, datetime from typing import Any, cast @@ -84,45 +82,3 @@ def wrapped_fn(*args, **kwargs): return future.result() return wrapped_fn - - -async def extract_string_value_stream( - async_stream: Callable[[int], AsyncIterable[str]], key: str, chunk_size: int = 1024 -) -> AsyncIterable[str]: - buffer = "" - max_buffer_size = len(key) * 2 - state = "outside" - if chunk_size < max_buffer_size: - raise ValueError("Chunk size too small") - - async for chunk in async_stream(chunk_size): - buffer += chunk - if state == "outside": - if match := re.search(rf'"{key}" *: *"', buffer): - buffer = buffer[match.end() :] - state = "inside" - else: - buffer = buffer[-max_buffer_size:] - if state == "inside": - backslash_count = 0 - for idx, char in enumerate(buffer): - if char == "\\": - backslash_count += 1 - elif char == '"': - if backslash_count % 2 == 0: - yield json.loads(f'"{buffer[:idx]}"') - return - backslash_count = 0 - else: - backslash_count = 0 - if backslash_count % 2 == 0: - yield json.loads(f'"{buffer}"') - buffer = "" - else: - yield json.loads(f'"{buffer[:-1]}"') - buffer = "\\" - - if state == "inside": - raise EOFError("Unterminated string value in JSON input") - else: - raise KeyError(f"Key {key} not found in JSON input") diff --git a/apps/agentstack-server/tests/conftest.py b/apps/agentstack-server/tests/conftest.py index 40def63f9..0036df008 100644 --- a/apps/agentstack-server/tests/conftest.py +++ b/apps/agentstack-server/tests/conftest.py @@ -1,7 +1,6 @@ # Copyright 2025 © BeeAI a Series of LF Projects, LLC # SPDX-License-Identifier: Apache-2.0 -import asyncio import os import re from contextlib import contextmanager @@ -72,11 +71,6 @@ async def _get_kr8s_client(): return api -def pytest_sessionstart(session): - """Validate that tests are running against the test VM""" - asyncio.run(_get_kr8s_client()) - - @pytest.fixture() async def kr8s_client(): return await _get_kr8s_client() diff --git a/apps/agentstack-server/tests/e2e/routes/test_files.py b/apps/agentstack-server/tests/e2e/routes/test_files.py index 8fee18998..2f96c35bf 100644 --- a/apps/agentstack-server/tests/e2e/routes/test_files.py +++ b/apps/agentstack-server/tests/e2e/routes/test_files.py @@ -1,5 +1,6 @@ # Copyright 2025 © BeeAI a Series of LF Projects, LLC # SPDX-License-Identifier: Apache-2.0 +import json from collections.abc import Callable from datetime import timedelta from io import BytesIO @@ -13,6 +14,8 @@ from httpx import AsyncClient from tenacity import AsyncRetrying, stop_after_delay, 
wait_fixed +from agentstack_server.domain.models.file import ExtractionFormat + pytestmark = pytest.mark.e2e @@ -97,9 +100,30 @@ async def test_text_extraction_pdf_workflow(subtests, test_configuration, test_p raise ValueError("not completed") assert final_status == "completed", f"Expected completed status, got {final_status}: {extraction.error_message}" - assert extraction.extracted_file_id is not None assert extraction.finished_at is not None + extracted_files = {f.format: f for f in extraction.extracted_files} + assert set(extracted_files.keys()) == { + ExtractionFormat.MARKDOWN, + ExtractionFormat.VENDOR_SPECIFIC_JSON, + }, "Unexpected extracted file formats" + + with subtests.test("verify extracted files"): + for extracted_file in extracted_files.values(): + if extracted_file.format == ExtractionFormat.MARKDOWN: + async with File.load_content(extracted_file.file_id) as markdown_file: + assert markdown_file.text is not None, "Markdown file has no content" + + elif extracted_file.format == ExtractionFormat.VENDOR_SPECIFIC_JSON: + async with File.load_content(extracted_file.file_id) as json_file: + assert json_file.text is not None, "JSON file has no content" + json_value = json.loads(json_file.text) + + assert "schema_name" in json_value, "Missing 'schema_name' key" + assert json_value["schema_name"] == "DoclingDocument", ( + f"Expected 'DoclingDocument', got '{json_value['schema_name']}'" + ) + with subtests.test("verify extracted text content"): async with file.load_text_content() as text_content: # Check that we get some text content back @@ -115,10 +139,24 @@ async def test_text_extraction_pdf_workflow(subtests, test_configuration, test_p ): _ = await file.get_extraction() + with ( + subtests.test("verify markdown file deleted"), + pytest.raises(httpx.HTTPStatusError, match="404 Not Found"), + ): + async with File.load_content(extracted_files[ExtractionFormat.MARKDOWN].file_id): + ... + + with ( + subtests.test("verify vendor specific json file deleted"), + pytest.raises(httpx.HTTPStatusError, match="404 Not Found"), + ): + async with File.load_content(extracted_files[ExtractionFormat.VENDOR_SPECIFIC_JSON].file_id): + ... + @pytest.mark.usefixtures("clean_up", "setup_real_llm", "setup_platform_client") async def test_text_extraction_plain_text_workflow(subtests): - """Test text extraction for plain text files (should be immediate)""" + """Test text extraction for plain text files also generate json and markdown outputs.""" text_content = "This is a sample text document with some content for testing text extraction." 
with subtests.test("upload text file"): @@ -128,25 +166,38 @@ async def test_text_extraction_plain_text_workflow(subtests): content_type="text/plain", ) assert file.filename == "test_document.txt" + assert file.file_type == "user_upload" with subtests.test("create text extraction for plain text"): extraction = await file.create_extraction() assert extraction.file_id == file.id - # Plain text files should be completed immediately - assert extraction.status == "completed" + assert extraction.status in ["pending", "in_progress", "completed"] + + with subtests.test("check extraction status"): + extraction = await file.get_extraction() + assert extraction.file_id == file.id + + async for attempt in AsyncRetrying(stop=stop_after_delay(timedelta(seconds=40)), wait=wait_fixed(1)): + with attempt: + extraction = await file.get_extraction() + final_status = extraction.status + if final_status not in ["completed", "failed"]: + raise ValueError("not completed") - with subtests.test("verify immediate text content access"): - async with file.load_text_content() as loaded_text_content: - assert loaded_text_content.text == text_content + assert final_status == "completed", f"Expected completed status, got {final_status}: {extraction.error_message}" + assert extraction.finished_at is not None - with subtests.test("delete extraction"): - await file.delete_extraction() + [extracted_file] = extraction.extracted_files + assert extracted_file.format is None + with subtests.test("delete file should also delete extractions"): + await file.delete() with ( - subtests.test("verify extraction deleted"), + subtests.test("verify file deleted"), pytest.raises(httpx.HTTPStatusError, match="404 Not Found"), ): - _ = await file.get_extraction() + async with file.load_content(): + ... @pytest.mark.usefixtures("clean_up", "setup_platform_client") diff --git a/apps/agentstack-server/tests/unit/domain/models/test_file.py b/apps/agentstack-server/tests/unit/domain/models/test_file.py new file mode 100644 index 000000000..c969b1d80 --- /dev/null +++ b/apps/agentstack-server/tests/unit/domain/models/test_file.py @@ -0,0 +1,119 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +from collections.abc import AsyncIterator + +import pytest + +from agentstack_server.domain.models.file import AsyncFile + +pytestmark = pytest.mark.unit + + +async def mock_iterator() -> AsyncIterator[bytes]: + yield b"hello" + yield b" " + yield b"world" + yield b"!" + + +@pytest.mark.asyncio +async def test_async_file_read_chunking(): + """Test that AsyncFile.from_async_iterator correctly chunks data.""" + af = AsyncFile.from_async_iterator(mock_iterator(), "test.txt", "text/plain") + + # Read 3 bytes + chunk1 = await af.read(3) + assert chunk1 == b"hel" + + # Read 3 bytes (should get 'lo ') + chunk2 = await af.read(3) + assert chunk2 == b"lo " + + # Read rest (larger than remaining) + chunk3 = await af.read(100) + assert chunk3 == b"world!" 
+ + # Read EOF + chunk4 = await af.read(100) + assert chunk4 == b"" + + +@pytest.mark.asyncio +async def test_async_file_read_exact_size(): + """Test AsyncFile.from_async_iterator reading exact size of chunks.""" + + async def iterator() -> AsyncIterator[bytes]: + yield b"123456" + + af = AsyncFile.from_async_iterator(iterator(), "test.txt", "text/plain") + + chunk1 = await af.read(3) + assert chunk1 == b"123" + + chunk2 = await af.read(3) + assert chunk2 == b"456" + + chunk3 = await af.read(3) + assert chunk3 == b"" + + +@pytest.mark.asyncio +async def test_async_file_read_across_chunks(): + """Test AsyncFile.from_async_iterator reading spanning multiple chunks.""" + + async def iterator() -> AsyncIterator[bytes]: + yield b"ab" + yield b"cd" + yield b"ef" + + af = AsyncFile.from_async_iterator(iterator(), "test.txt", "text/plain") + + # Read 3 bytes (ab + c) + chunk1 = await af.read(3) + assert chunk1 == b"abc" + + # Read 3 bytes (d + ef) + chunk2 = await af.read(3) + assert chunk2 == b"def" + + +@pytest.mark.asyncio +async def test_async_file_from_content(): + """Test AsyncFile.from_content.""" + content = b"hello world from buffer" + af = AsyncFile.from_bytes(content, "mem_test.txt", "text/plain") + + assert af.size == len(content) + + # Read 5 bytes + chunk1 = await af.read(5) + assert chunk1 == b"hello" + + # Read 6 bytes + chunk2 = await af.read(6) + assert chunk2 == b" world" + + # Read 100 bytes (remaining) + chunk3 = await af.read(100) + assert chunk3 == b" from buffer" + + # Read EOF + chunk4 = await af.read(100) + assert chunk4 == b"" + + +@pytest.mark.asyncio +async def test_async_file_from_content_exact_chunks(): + """Test AsyncFile.from_content with exact chunk sizes.""" + content = b"123456" + af = AsyncFile.from_bytes(content, "mem_test.txt", "text/plain") + + chunk1 = await af.read(3) + assert chunk1 == b"123" + + chunk2 = await af.read(3) + assert chunk2 == b"456" + + chunk3 = await af.read(3) + assert chunk3 == b"" diff --git a/apps/agentstack-server/tests/unit/utils/test_utils.py b/apps/agentstack-server/tests/unit/utils/test_utils.py index 352bd3ef6..10b5e7052 100644 --- a/apps/agentstack-server/tests/unit/utils/test_utils.py +++ b/apps/agentstack-server/tests/unit/utils/test_utils.py @@ -7,10 +7,7 @@ import pytest -from agentstack_server.utils.utils import ( - extract_string_value_stream, - filter_json_recursively, -) +from agentstack_server.utils.utils import filter_json_recursively def async_json_reader(obj: dict[str, Any] | str): @@ -23,43 +20,6 @@ async def read(size: int): return read -@pytest.mark.unit -@pytest.mark.parametrize( - "obj", - [ - {"text": "abcde" * 100}, - {"text": "abcde" * 100, "other_key": 42}, - {"first_key": '"text": "haha"', "text": "abcde" * 100, "other_key": 666}, - {"text": 'escape "hell\\"\\' * 1000}, - {"text": 'escape "hell2\\n\t\r\\d"\\' * 1000}, - ], -) -async def test_extract_string_value_stream(obj): - reader = async_json_reader(obj) - - result = [] - async for chunk in extract_string_value_stream(reader, "text", chunk_size=128): - result.append(chunk) - - assert "".join(result) == obj["text"] - - -@pytest.mark.unit -@pytest.mark.parametrize( - "obj, error", - [ - ({"txt": "aa"}, KeyError), - ('{"text": "aaaa ', EOFError), - ], -) -async def test_extract_string_value_stream_key_in_between_chunks(obj, error): - reader = async_json_reader(obj) - - with pytest.raises(error): - async for _chunk in extract_string_value_stream(reader, "text"): - ... 
- - @pytest.mark.unit def test_filter_json_recursively(): data = { diff --git a/apps/agentstack-server/uv.lock b/apps/agentstack-server/uv.lock index 944b1efae..a95090c05 100644 --- a/apps/agentstack-server/uv.lock +++ b/apps/agentstack-server/uv.lock @@ -90,6 +90,7 @@ dependencies = [ { name = "fastapi", extra = ["standard"] }, { name = "httpx" }, { name = "ibm-watsonx-ai" }, + { name = "ijson" }, { name = "kink" }, { name = "kr8s" }, { name = "limits", extra = ["async-redis"] }, @@ -145,6 +146,7 @@ requires-dist = [ { name = "fastapi", extras = ["standard"], specifier = ">=0.115.7" }, { name = "httpx", specifier = ">=0.28.1" }, { name = "ibm-watsonx-ai", specifier = ">=1.3.28" }, + { name = "ijson", specifier = ">=3.4.0.post0" }, { name = "kink", specifier = ">=0.8.1" }, { name = "kr8s", specifier = ">=0.20.7" }, { name = "limits", extras = ["async-redis"], specifier = ">=5.3.0" }, @@ -970,6 +972,36 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0e/61/66938bbb5fc52dbdf84594873d5b51fb1f7c7794e9c0f5bd885f30bc507b/idna-3.11-py3-none-any.whl", hash = "sha256:771a87f49d9defaf64091e6e6fe9c18d4833f140bd19464795bc32d966ca37ea", size = 71008, upload-time = "2025-10-12T14:55:18.883Z" }, ] +[[package]] +name = "ijson" +version = "3.4.0.post0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/2d/30/7ab4b9e88e7946f6beef419f74edcc541df3ea562c7882257b4eaa82417d/ijson-3.4.0.post0.tar.gz", hash = "sha256:9aa02dc70bb245670a6ca7fba737b992aeeb4895360980622f7e568dbf23e41e", size = 67216, upload-time = "2025-10-10T05:29:25.62Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1b/20/aaec6977f9d538bbadd760c7fa0f6a0937742abdcc920ec6478a8576e55f/ijson-3.4.0.post0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:114ed248166ac06377e87a245a158d6b98019d2bdd3bb93995718e0bd996154f", size = 87863, upload-time = "2025-10-10T05:28:20.786Z" }, + { url = "https://files.pythonhosted.org/packages/5b/29/06bf56a866e2fe21453a1ad8f3a5d7bca3c723f73d96329656dfee969783/ijson-3.4.0.post0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:ffb21203736b08fe27cb30df6a4f802fafb9ef7646c5ff7ef79569b63ea76c57", size = 59806, upload-time = "2025-10-10T05:28:21.596Z" }, + { url = "https://files.pythonhosted.org/packages/ba/ae/e1d0fda91ba7a444b75f0d60cb845fdb1f55d3111351529dcbf4b1c276fe/ijson-3.4.0.post0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:07f20ecd748602ac7f18c617637e53bd73ded7f3b22260bba3abe401a7fc284e", size = 59643, upload-time = "2025-10-10T05:28:22.45Z" }, + { url = "https://files.pythonhosted.org/packages/4d/24/5a24533be2726396cc1724dc237bada09b19715b5bfb0e7b9400db0901ad/ijson-3.4.0.post0-cp313-cp313-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:27aa193d47ffc6bc4e45453896ad98fb089a367e8283b973f1fe5c0198b60b4e", size = 138082, upload-time = "2025-10-10T05:28:23.319Z" }, + { url = "https://files.pythonhosted.org/packages/05/60/026c3efcec23c329657e878cbc0a9a25b42e7eb3971e8c2377cb3284e2b7/ijson-3.4.0.post0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ccddb2894eb7af162ba43b9475ac5825d15d568832f82eb8783036e5d2aebd42", size = 149145, upload-time = "2025-10-10T05:28:24.279Z" }, + { url = "https://files.pythonhosted.org/packages/ed/c2/036499909b7a1bc0bcd85305e4348ad171aeb9df57581287533bdb3497e9/ijson-3.4.0.post0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = 
"sha256:61ab0b8c5bf707201dc67e02c116f4b6545c4afd7feb2264b989d242d9c4348a", size = 149046, upload-time = "2025-10-10T05:28:25.186Z" }, + { url = "https://files.pythonhosted.org/packages/ba/75/e7736073ad96867c129f9e799e3e65086badd89dbf3911f76d9b3bf8a115/ijson-3.4.0.post0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:254cfb8c124af68327a0e7a49b50bbdacafd87c4690a3d62c96eb01020a685ef", size = 150356, upload-time = "2025-10-10T05:28:26.135Z" }, + { url = "https://files.pythonhosted.org/packages/9d/1b/1c1575d2cda136985561fcf774fe6c54412cd0fa08005342015af0403193/ijson-3.4.0.post0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:04ac9ca54db20f82aeda6379b5f4f6112fdb150d09ebce04affeab98a17b4ed3", size = 142322, upload-time = "2025-10-10T05:28:27.125Z" }, + { url = "https://files.pythonhosted.org/packages/28/4d/aba9871feb624df8494435d1a9ddc7b6a4f782c6044bfc0d770a4b59f145/ijson-3.4.0.post0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:a603d7474bf35e7b3a8e49c8dabfc4751841931301adff3f3318171c4e407f32", size = 151386, upload-time = "2025-10-10T05:28:28.274Z" }, + { url = "https://files.pythonhosted.org/packages/3f/9a/791baa83895fb6e492bce2c7a0ea6427b6a41fe854349e62a37d0c9deaf0/ijson-3.4.0.post0-cp313-cp313-win32.whl", hash = "sha256:ec5bb1520cb212ebead7dba048bb9b70552c3440584f83b01b0abc96862e2a09", size = 52352, upload-time = "2025-10-10T05:28:29.191Z" }, + { url = "https://files.pythonhosted.org/packages/a9/0c/061f51493e1da21116d74ee8f6a6b9ae06ca5fa2eb53c3b38b64f9a9a5ae/ijson-3.4.0.post0-cp313-cp313-win_amd64.whl", hash = "sha256:3505dff18bdeb8b171eb28af6df34857e2be80dc01e2e3b624e77215ad58897f", size = 54783, upload-time = "2025-10-10T05:28:30.048Z" }, + { url = "https://files.pythonhosted.org/packages/c7/89/4344e176f2c5f5ef3251c9bfa4ddd5b4cf3f9601fd6ec3f677a3ba0b9c71/ijson-3.4.0.post0-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:45a0b1c833ed2620eaf8da958f06ac8351c59e5e470e078400d23814670ed708", size = 92342, upload-time = "2025-10-10T05:28:31.389Z" }, + { url = "https://files.pythonhosted.org/packages/d4/b1/85012c586a6645f9fb8bfa3ef62ed2f303c8d73fc7c2f705111582925980/ijson-3.4.0.post0-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:7809ec8c8f40228edaaa089f33e811dff4c5b8509702652870d3f286c9682e27", size = 62028, upload-time = "2025-10-10T05:28:32.849Z" }, + { url = "https://files.pythonhosted.org/packages/65/ea/7b7e2815c101d78b33e74d64ddb70cccc377afccd5dda76e566ed3fcb56f/ijson-3.4.0.post0-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:cf4a34c2cfe852aee75c89c05b0a4531c49dc0be27eeed221afd6fbf9c3e149c", size = 61773, upload-time = "2025-10-10T05:28:34.016Z" }, + { url = "https://files.pythonhosted.org/packages/59/7d/2175e599cb77a64f528629bad3ce95dfdf2aa6171d313c1fc00bbfaf0d22/ijson-3.4.0.post0-cp313-cp313t-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:a39d5d36067604b26b78de70b8951c90e9272450642661fe531a8f7a6936a7fa", size = 198562, upload-time = "2025-10-10T05:28:34.878Z" }, + { url = "https://files.pythonhosted.org/packages/13/97/82247c501c92405bb2fc44ab5efb497335bcb9cf0f5d3a0b04a800737bd8/ijson-3.4.0.post0-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:83fc738d81c9ea686b452996110b8a6678296c481e0546857db24785bff8da92", size = 216212, upload-time = "2025-10-10T05:28:36.208Z" }, + { url = 
"https://files.pythonhosted.org/packages/95/ca/b956f507bb02e05ce109fd11ab6a2c054f8b686cc5affe41afe50630984d/ijson-3.4.0.post0-cp313-cp313t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:b2a81aee91633868f5b40280e2523f7c5392e920a5082f47c5e991e516b483f6", size = 206618, upload-time = "2025-10-10T05:28:37.243Z" }, + { url = "https://files.pythonhosted.org/packages/3e/12/e827840ab81d86a9882e499097934df53294f05155f1acfcb9a211ac1142/ijson-3.4.0.post0-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:56169e298c5a2e7196aaa55da78ddc2415876a74fe6304f81b1eb0d3273346f7", size = 210689, upload-time = "2025-10-10T05:28:38.252Z" }, + { url = "https://files.pythonhosted.org/packages/1b/3b/59238d9422c31a4aefa22ebeb8e599e706158a0ab03669ef623be77a499a/ijson-3.4.0.post0-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:eeb9540f0b1a575cbb5968166706946458f98c16e7accc6f2fe71efa29864241", size = 199927, upload-time = "2025-10-10T05:28:39.233Z" }, + { url = "https://files.pythonhosted.org/packages/b6/0f/ec01c36c128c37edb8a5ae8f3de3256009f886338d459210dfe121ee4ba9/ijson-3.4.0.post0-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:ba3478ff0bb49d7ba88783f491a99b6e3fa929c930ab062d2bb7837e6a38fe88", size = 204455, upload-time = "2025-10-10T05:28:40.644Z" }, + { url = "https://files.pythonhosted.org/packages/c8/cf/5560e1db96c6d10a5313be76bf5a1754266cbfb5cc13ff64d107829e07b1/ijson-3.4.0.post0-cp313-cp313t-win32.whl", hash = "sha256:b005ce84e82f28b00bf777a464833465dfe3efa43a0a26c77b5ac40723e1a728", size = 54566, upload-time = "2025-10-10T05:28:41.663Z" }, + { url = "https://files.pythonhosted.org/packages/22/5a/cbb69144c3b25dd56f5421ff7dc0cf3051355579062024772518e4f4b3c5/ijson-3.4.0.post0-cp313-cp313t-win_amd64.whl", hash = "sha256:fe9c84c9b1c8798afa407be1cea1603401d99bfc7c34497e19f4f5e5ddc9b441", size = 57298, upload-time = "2025-10-10T05:28:42.881Z" }, +] + [[package]] name = "importlib-metadata" version = "8.7.0" diff --git a/docs/agent-development/rag.mdx b/docs/agent-development/rag.mdx index c120f87ba..73de13509 100644 --- a/docs/agent-development/rag.mdx +++ b/docs/agent-development/rag.mdx @@ -8,17 +8,19 @@ information from a knowledge base relevant to a specific user query and provide a complex topic with many variants. We will focus on the fundamental building blocks that any RAG pipeline needs. The document processing pipeline: + 1. **text extraction** - process complex document formats (PDF, CSV, etc.) 2. **text splitting** - create meaningful chunks out of long pages of text 3. **embedding** - vectorize chunks (extract semantic meaning) 4. **store** - insert chunks to a specialized database Retrieval: + 1. **embedding** - vectorize user query 2. **search** - retrieve the document chunks most similar to the user query - ## Building blocks + Let's break down how each step can be implemented with the Agent Stack API, but first, make sure you have the Platform API extension enabled in your agent: @@ -78,9 +80,10 @@ agent can accept, use the `default_input_modes` parameter in the agent decorator First, let's build a set of functions to process the documents which we will then use in the agent. ### Text Extraction + To extract text from a `File` uploaded to the Platform API, simply use `file.create_extraction()` and wait for the result. After extraction is completed, the `extraction` object will contain -`extracted_file_id`, which is an ID of a new file containing the extracted text in Markdown. 
+`extracted_files`, which is a list of extracted files in different formats. ```python from agentstack_sdk.platform import File, Extraction @@ -91,11 +94,22 @@ async def extract_file(file: File): while extraction.status in {"pending", "in_progress"}: await asyncio.sleep(1) extraction = await file.get_extraction() - if extraction.status != "completed" or not extraction.extracted_file_id: + if extraction.status != "completed": raise ValueError(f"Extraction failed with status: {extraction.status}") ``` +#### Extraction Formats + +Text extraction can produce two formats, and you can request any subset of them by passing `formats` to `create_extraction` (e.g., `["markdown"]` if you only need plain text): + +- **markdown**: The extracted text formatted as Markdown (`file.load_text_content()`) +- **vendor_specific_json**: The Docling-specific JSON format containing document structure (`file.load_json_content()`) + +> **WARNING**: +> The `vendor_specific_json` format is not generated for plain text or markdown files, as Docling does not support these formats as input. + ### Text Splitting + In this example we will use `MarkdownTextSplitter` from the [langchain-text-splitters](https://reference.langchain.com/python/langchain_text_splitters/) package. This will split a long document into reasonably sized chunks based on the Markdown header structure. @@ -108,6 +122,7 @@ def chunk_markdown(markdown_text: str) -> list[str]: ``` ### Embedding + Now we need to embed each chunk using the embedding service. Similarly to LLMs, Agent Stack implements an OpenAI-compatible embedding API. You can use any preferred client; in this example we will use the embedding extension to create an `AsyncOpenAI` client: @@ -134,6 +149,7 @@ def get_embedding_client( ``` + Now we can use this client to embed our chunks and create vector store items: ```python @@ -164,6 +180,7 @@ async def embed_chunks( ``` ### Store + Finally, to insert the prepared items, we need a function to create a vector store. For this we will need to know the dimension of the embeddings and model_id. Because the model is chosen by the embedding extension and we don't know it in advance, we will create a test embedding request to calculate the dimension: @@ -184,9 +201,11 @@ async def create_vector_store(embedding_client: AsyncOpenAI, embedding_model: st model_id=embedding_model, ) ``` + We can then add the prepared items using `vector_store.add_documents`; this will become clear in the final example. ### Query vector store + Assuming we have our knowledge base of documents prepared, we can now easily search the store according to the user query. The following function will retrieve five document chunks most similar to the query embedding: @@ -273,6 +292,7 @@ Instead of simply returning the output of the vector store, you would typically agentic framework. ### Conversational agent + Having a new vector store for each message is not really a good practice. Typically, you would want to search through all documents uploaded in the conversation. Below is a version of the agent which will reuse the vector store across messages so you can ask multiple queries or upload additional documents later on.
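Tying the extraction pieces above together, one possible end-to-end helper requests both formats, waits for completion, and loads each output. The helper name and the `.text` attribute on the loaded handles are assumptions made for this sketch, and error handling is kept minimal:

```python
import asyncio
import json

from agentstack_sdk.platform import File


async def extract_markdown_and_structure(file: File) -> tuple[str, dict]:
    # Request both formats; pass formats=["markdown"] if the Docling JSON is not needed.
    extraction = await file.create_extraction(formats=["markdown", "vendor_specific_json"])
    while extraction.status in {"pending", "in_progress"}:
        await asyncio.sleep(1)
        extraction = await file.get_extraction()
    if extraction.status != "completed":
        raise ValueError(f"Extraction failed with status: {extraction.status}")

    # Markdown output, suitable for chunking and embedding.
    async with file.load_text_content() as loaded:
        markdown = loaded.text  # assumed accessor for the loaded content

    # Docling JSON output (not produced for plain text or markdown inputs).
    async with file.load_json_content() as loaded:
        structure = json.loads(loaded.text)

    return markdown, structure
```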
@@ -344,8 +364,10 @@ async def rag_agent( ``` ### Next steps + To further improve the agent, learn how to use other parts of the platform such as LLMs, file uploads and conversations: + - [LLM extension](/extensions/llm-proxy-service) - [Multi-turn conversations](/guides/multi-turn) - [File handling](/guides/files) diff --git a/helm/templates/docling/docling-serve.yaml b/helm/templates/docling/docling-serve.yaml index f65a3cb65..a20606081 100644 --- a/helm/templates/docling/docling-serve.yaml +++ b/helm/templates/docling/docling-serve.yaml @@ -69,6 +69,8 @@ spec: value: {{ .Values.docling.ui.enabled | quote }} - name: UVICORN_PORT value: "15001" + - name: DOCLING_SERVE_MAX_SYNC_WAIT + value: {{ .Values.docling.maxSyncWait | quote }} ports: - name: http containerPort: 15001 diff --git a/helm/values.yaml b/helm/values.yaml index 54c5c7631..db0bd0cab 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -657,6 +657,7 @@ docling: service: port: 15001 type: ClusterIP + maxSyncWait: 300 # 5 minutes in seconds imagePullPolicy: IfNotPresent image: "ghcr.io/docling-project/docling-serve-cpu:v1.5.0"
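A related operational note: `maxSyncWait` only bounds how long docling-serve itself waits synchronously; from the agent's point of view the extraction stays asynchronous, so polling code may want its own upper bound as well. The helper below is illustrative and not part of the SDK; its 300-second default simply mirrors the Helm value above:

```python
import asyncio

from agentstack_sdk.platform import File


async def wait_for_extraction(file: File, timeout: float = 300.0, poll_interval: float = 1.0):
    """Poll the file's extraction until it finishes or `timeout` elapses (illustrative helper)."""
    async with asyncio.timeout(timeout):  # raises TimeoutError when the budget is exceeded (Python 3.11+)
        extraction = await file.get_extraction()
        while extraction.status in {"pending", "in_progress"}:
            await asyncio.sleep(poll_interval)
            extraction = await file.get_extraction()
    if extraction.status != "completed":
        raise ValueError(f"Extraction ended with status: {extraction.status}")
    return extraction
```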