
Commit 61cfdd8

aleskalfas and jezekra1 committed
feat(server): add json format to text extraction
Co-authored-by: Radek Ježek <[email protected]>
Signed-off-by: Aleš Kalfas <[email protected]>
Signed-off-by: Radek Ježek <[email protected]>
1 parent e5c8a05 commit 61cfdd8

File tree

23 files changed: +853 -200 lines changed

.vscode/settings.json

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+{
+    "python.terminal.activateEnvironment": false
+}

agents/rag/.vscode/launch.json

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@
     "version": "0.2.0",
     "configurations": [
         {
-            "name": "agent-form",
+            "name": "agent-rag",
             "type": "debugpy",
             "request": "launch",
             "program": "${workspaceFolder}/src/rag/agent.py",

agents/rag/src/rag/tools/files/file_reader.py

Lines changed: 1 addition & 2 deletions
@@ -3,13 +3,13 @@
 
 from typing import List, Literal
 
+from agentstack_sdk.platform import File
 from beeai_framework.emitter import Emitter
 from beeai_framework.tools import (
     JSONToolOutput,
     Tool,
     ToolRunOptions,
 )
-from agentstack_sdk.platform import File
 from pydantic import BaseModel, Field, create_model
 
 from rag.tools.files.utils import File, format_size
@@ -116,7 +116,6 @@ async def _run(self, input: FileReadInputBase, options, context) -> FileReaderTo
         # pull the first (only) MessagePart from the async-generator
         async with file.load_text_content() as loaded_file:
             content = loaded_file.text
-            content_type = loaded_file.content_type
 
         if content is None:
             raise ValueError(f"File content is None for {filename}.")

agentstack.code-workspace

Lines changed: 0 additions & 4 deletions
@@ -28,10 +28,6 @@
             "name": "agent-chat",
             "path": "agents/chat"
         },
-        {
-            "name": "agent-form",
-            "path": "agents/form"
-        },
         {
             "name": "agent-rag",
             "path": "agents/rag"

apps/agentstack-sdk-py/src/agentstack_sdk/platform/file.py

Lines changed: 45 additions & 1 deletion
@@ -16,11 +16,20 @@
 from agentstack_sdk.util.file import LoadedFile, LoadedFileWithUri, PlatformFileUrl
 from agentstack_sdk.util.utils import filter_dict
 
+ExtractionFormatLiteral = typing.Literal["markdown", "vendor_specific_json"]
+
+
+class ExtractedFileInfo(pydantic.BaseModel):
+    """Information about an extracted file."""
+
+    file_id: str
+    format: ExtractionFormatLiteral | None
+
 
 class Extraction(pydantic.BaseModel):
     id: str
     file_id: str
-    extracted_file_id: str | None = None
+    extracted_files: list[ExtractedFileInfo] = pydantic.Field(default_factory=list)
     status: typing.Literal["pending", "in_progress", "completed", "failed", "cancelled"] = "pending"
     job_id: str | None = None
     error_message: str | None = None
@@ -152,9 +161,43 @@ async def load_text_content(
                     await response.aread()
                 yield LoadedFileWithUri(response=response, content_type=file.content_type, filename=file.filename)
 
+    @asynccontextmanager
+    async def load_json_content(
+        self: File | str,
+        *,
+        stream: bool = False,
+        client: PlatformClient | None = None,
+        context_id: str | None | Literal["auto"] = "auto",
+    ) -> AsyncIterator[LoadedFile]:
+        # `self` has a weird type so that you can call both `instance.load_json_content()` to create an extraction for an instance, or `File.load_json_content("123")`
+        file_id = self if isinstance(self, str) else self.id
+        async with client or get_platform_client() as platform_client:
+            context_id = platform_client.context_id if context_id == "auto" else context_id
+
+            file = await File.get(file_id, client=client, context_id=context_id) if isinstance(self, str) else self
+            extraction = await file.get_extraction(client=client, context_id=context_id)
+
+            for extracted_file_info in extraction.extracted_files:
+                if extracted_file_info.format != "vendor_specific_json":
+                    continue
+                extracted_json_file_id = extracted_file_info.file_id
+                async with platform_client.stream(
+                    "GET",
+                    url=f"/api/v1/files/{extracted_json_file_id}/content",
+                    params=context_id and {"context_id": context_id},
+                ) as response:
+                    response.raise_for_status()
+                    if not stream:
+                        await response.aread()
+                    yield LoadedFileWithUri(response=response, content_type=file.content_type, filename=file.filename)
+                return
+
+            raise ValueError("No extracted JSON content available for this file.")
+
     async def create_extraction(
         self: File | str,
         *,
+        formats: list[ExtractionFormatLiteral] | None = None,
         client: PlatformClient | None = None,
         context_id: str | None | Literal["auto"] = "auto",
     ) -> Extraction:
@@ -167,6 +210,7 @@ async def create_extraction(
                 await platform_client.post(
                     url=f"/api/v1/files/{file_id}/extraction",
                     params=context_id and {"context_id": context_id},
+                    json={"settings": {"formats": formats}} if formats else None,
                 )
             )
             .raise_for_status()
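
For orientation, a minimal usage sketch of the new SDK surface (not part of this commit). It assumes the extraction for the file has already completed with a "vendor_specific_json" output, and that the yielded handle exposes the same .text property used with load_text_content elsewhere in this commit; if no JSON output exists yet, load_json_content raises ValueError.

import json

from agentstack_sdk.platform import File


async def read_vendor_json(file_id: str):
    # Opt in to both output formats when requesting the extraction.
    await File.create_extraction(file_id, formats=["markdown", "vendor_specific_json"])

    # Once the extraction has completed, stream back the vendor JSON artifact.
    async with File.load_json_content(file_id) as loaded:
        return json.loads(loaded.text)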

apps/agentstack-server/pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -48,6 +48,7 @@ dependencies = [
     "opentelemetry-instrumentation-httpx>=0.59b0",
     "opentelemetry-instrumentation-fastapi>=0.59b0",
     "limits[async-redis]>=5.3.0",
+    "ijson>=3.4.0.post0",
 ]
 
 [dependency-groups]
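
The new ijson dependency suggests the server parses large extraction JSON incrementally rather than loading it whole; that reasoning is an assumption. A small sketch using ijson's standard API, with a hypothetical "pages" array as the prefix:

import io

import ijson


def iter_pages(raw_json: bytes):
    # Yield items under the top-level "pages" array one at a time,
    # without materializing the whole document in memory.
    yield from ijson.items(io.BytesIO(raw_json), "pages.item")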

apps/agentstack-server/src/agentstack_server/api/routes/files.py

Lines changed: 42 additions & 5 deletions
@@ -1,5 +1,6 @@
 # Copyright 2025 © BeeAI a Series of LF Projects, LLC
 # SPDX-License-Identifier: Apache-2.0
+
 import logging
 from contextlib import AsyncExitStack
 from typing import Annotated
@@ -14,9 +15,17 @@
     RequiresContextPermissions,
 )
 from agentstack_server.api.schema.common import EntityModel
-from agentstack_server.api.schema.files import FileListQuery
+from agentstack_server.api.schema.files import FileListQuery, TextExtractionRequest
 from agentstack_server.domain.models.common import PaginatedResult
-from agentstack_server.domain.models.file import AsyncFile, ExtractionStatus, File, TextExtraction
+from agentstack_server.domain.models.file import (
+    AsyncFile,
+    Backend,
+    ExtractionFormat,
+    ExtractionStatus,
+    File,
+    TextExtraction,
+    TextExtractionSettings,
+)
 from agentstack_server.domain.models.permissions import AuthorizedUser
 from agentstack_server.service_layer.services.files import FileService
 
@@ -92,12 +101,32 @@ async def get_text_file_content(
     user: Annotated[AuthorizedUser, Depends(RequiresContextPermissions(files={"read"}))],
 ) -> StreamingResponse:
     extraction = await file_service.get_extraction(file_id=file_id, user=user.user, context_id=user.context_id)
-    if not extraction.status == ExtractionStatus.COMPLETED or not extraction.extracted_file_id:
+    if not extraction.status == ExtractionStatus.COMPLETED or not extraction.extracted_files:
         raise HTTPException(
             status_code=status.HTTP_400_BAD_REQUEST,
             detail=f"Extraction is not completed (status {extraction.status})",
         )
-    return await _stream_file(file_service=file_service, user=user, file_id=extraction.extracted_file_id)
+
+    if extraction.extraction_metadata is not None and extraction.extraction_metadata.backend == Backend.IN_PLACE:
+        # Fallback to the original file for in-place extraction
+        original_file_id = extraction.find_file_by_format(format=None)
+        if not original_file_id:
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail="Original file not found in extraction results",
+            )
+        file_to_stream_id = original_file_id
+    else:
+        # Find the markdown file from extracted files
+        markdown_file_id = extraction.find_file_by_format(format=ExtractionFormat.MARKDOWN)
+        if not markdown_file_id:
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail="Markdown file not found in extraction results",
+            )
+        file_to_stream_id = markdown_file_id
+
+    return await _stream_file(file_service=file_service, user=user, file_id=file_to_stream_id)
 
 
 @router.delete("/{file_id}", status_code=fastapi.status.HTTP_204_NO_CONTENT)
@@ -114,6 +143,7 @@ async def create_text_extraction(
     file_id: UUID,
     file_service: FileServiceDependency,
    user: Annotated[AuthorizedUser, Depends(RequiresContextPermissions(files={"write", "extract"}))],
+    request: TextExtractionRequest | None = None,
 ) -> EntityModel[TextExtraction]:
     """Create or return text extraction for a file.
 
@@ -122,8 +152,15 @@
     - If extraction is pending/in-progress, returns current status
     - If no extraction exists, creates a new one
     """
+    if request is None:
+        request = TextExtractionRequest()
+
+    settings = request.settings if request.settings is not None else TextExtractionSettings()
+
     return EntityModel(
-        await file_service.create_extraction(file_id=file_id, user=user.user, context_id=user.context_id)
+        await file_service.create_extraction(
+            file_id=file_id, user=user.user, context_id=user.context_id, settings=settings
+        )
     )
apps/agentstack-server/src/agentstack_server/api/schema/files.py

Lines changed: 10 additions & 0 deletions
@@ -6,6 +6,7 @@
 from pydantic import BaseModel, Field
 
 from agentstack_server.api.schema.common import PaginationQuery
+from agentstack_server.domain.models.file import TextExtractionSettings
 
 
 class FileResponse(BaseModel):
@@ -36,3 +37,12 @@ class FileListQuery(PaginationQuery):
         description="Case-insensitive partial match search on filename (e.g., 'doc' matches 'my_document.pdf')",
     )
     order_by: str = Field(default_factory=lambda: "created_at", pattern="^created_at|filename|file_size_bytes$")
+
+
+class TextExtractionRequest(BaseModel):
+    """Request schema for text extraction."""
+
+    settings: TextExtractionSettings | None = Field(
+        default=None,
+        description="Additional options for text extraction",
+    )
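
A small validation sketch for the new request schema, assuming pydantic v2 (model_validate); field names are taken from the diff above.

from agentstack_server.api.schema.files import TextExtractionRequest

# Settings are optional; omitting them lets the route fall back to the
# TextExtractionSettings defaults (markdown + vendor_specific_json).
req = TextExtractionRequest.model_validate({"settings": {"formats": ["markdown"]}})
assert req.settings is not None and req.settings.formats == ["markdown"]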

apps/agentstack-server/src/agentstack_server/domain/models/file.py

Lines changed: 80 additions & 9 deletions
@@ -1,8 +1,9 @@
 # Copyright 2025 © BeeAI a Series of LF Projects, LLC
 # SPDX-License-Identifier: Apache-2.0
 
-from collections.abc import Awaitable, Callable
+from collections.abc import AsyncIterator, Awaitable, Callable
 from enum import StrEnum
+from typing import Self
 from uuid import UUID, uuid4
 
 from pydantic import AwareDatetime, BaseModel, Field
@@ -23,8 +24,24 @@ class ExtractionStatus(StrEnum):
     CANCELLED = "cancelled"
 
 
+class ExtractionFormat(StrEnum):
+    MARKDOWN = "markdown"
+    VENDOR_SPECIFIC_JSON = "vendor_specific_json"
+
+
+class TextExtractionSettings(BaseModel):
+    formats: list[ExtractionFormat] = Field(
+        default_factory=lambda: [ExtractionFormat.MARKDOWN, ExtractionFormat.VENDOR_SPECIFIC_JSON]
+    )
+
+
+class Backend(StrEnum):
+    IN_PLACE = "in-place"
+
+
 class ExtractionMetadata(BaseModel, extra="allow"):
-    backend: str
+    backend: str | None = None
+    settings: TextExtractionSettings | None = None
 
 
 class FileMetadata(BaseModel, extra="allow"):
@@ -39,6 +56,36 @@ class AsyncFile(BaseModel):
     read: Callable[[int], Awaitable[bytes]]
     size: int | None = None
 
+    @classmethod
+    def from_async_iterator(cls, iterator: AsyncIterator[bytes], filename: str, content_type: str) -> Self:
+        buffer = b""
+
+        async def read(size: int = 8192) -> bytes:
+            nonlocal buffer
+            while len(buffer) < size:
+                try:
+                    buffer += await anext(iterator)
+                except StopAsyncIteration:
+                    break
+
+            result = buffer[:size]
+            buffer = buffer[size:]
+            return result
+
+        return cls(filename=filename, content_type=content_type, read=read)
+
+    @classmethod
+    def from_bytes(cls, content: bytes, filename: str, content_type: str) -> Self:
+        pos = 0
+
+        async def read(size: int = 8192) -> bytes:
+            nonlocal pos
+            result = content[pos : pos + size]
+            pos += len(result)
+            return result
+
+        return cls(filename=filename, content_type=content_type, read=read, size=len(content))
+
 
 class File(BaseModel):
     id: UUID = Field(default_factory=uuid4)
@@ -52,10 +99,17 @@ class File(BaseModel):
     context_id: UUID | None = None
 
 
+class ExtractedFileInfo(BaseModel):
+    """Information about an extracted file."""
+
+    file_id: UUID
+    format: ExtractionFormat | None = None
+
+
 class TextExtraction(BaseModel):
     id: UUID = Field(default_factory=uuid4)
     file_id: UUID
-    extracted_file_id: UUID | None = None
+    extracted_files: list[ExtractedFileInfo] = Field(default_factory=list)
     status: ExtractionStatus = ExtractionStatus.PENDING
     job_id: str | None = None
     error_message: str | None = None
@@ -64,20 +118,30 @@ class TextExtraction(BaseModel):
     finished_at: AwareDatetime | None = None
     created_at: AwareDatetime = Field(default_factory=utc_now)
 
-    def set_started(self, job_id: str) -> None:
-        """Mark extraction as started with job ID."""
+    def set_started(self, job_id: str, backend: str) -> None:
+        """Mark extraction as started with job ID and backend name."""
         self.status = ExtractionStatus.IN_PROGRESS
         self.job_id = job_id
         self.started_at = utc_now()
         self.error_message = None
 
-    def set_completed(self, extracted_file_id: UUID, metadata: ExtractionMetadata | None = None) -> None:
-        """Mark extraction as completed with extracted file ID."""
+        # Create extraction_metadata if it doesn't exist
+        if self.extraction_metadata is None:
+            self.extraction_metadata = ExtractionMetadata()
+
+        # Set the backend name
+        self.extraction_metadata.backend = backend
+
+    def set_completed(
+        self, extracted_files: list[ExtractedFileInfo], metadata: ExtractionMetadata | None = None
+    ) -> None:
+        """Mark extraction as completed with extracted files and their formats."""
         self.status = ExtractionStatus.COMPLETED
-        self.extracted_file_id = extracted_file_id
+        self.extracted_files = extracted_files
         self.finished_at = utc_now()
-        self.extraction_metadata = metadata
         self.error_message = None
+        if metadata is not None:
+            self.extraction_metadata = metadata
 
     def set_failed(self, error_message: str) -> None:
         """Mark extraction as failed with error message."""
@@ -97,3 +161,10 @@ def reset_for_retry(self) -> None:
         self.started_at = None
         self.finished_at = None
         self.job_id = None
+
+    def find_file_by_format(self, format: ExtractionFormat | None) -> UUID | None:
+        """Find an extracted file by format from the extracted files list."""
+        for extracted_file_info in self.extracted_files:
+            if extracted_file_info.format == format:
+                return extracted_file_info.file_id
+        return None
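
A sketch of how these domain models fit together, based only on the signatures in the diff above; the backend name is a placeholder, not a value defined by this commit.

from uuid import uuid4

from agentstack_server.domain.models.file import (
    AsyncFile,
    ExtractedFileInfo,
    ExtractionFormat,
    TextExtraction,
)

# Wrap an in-memory payload so the storage layer can read it in chunks.
async_file = AsyncFile.from_bytes(b'{"pages": []}', filename="doc.json", content_type="application/json")

# Record both outputs of a finished extraction and look one up by format.
extraction = TextExtraction(file_id=uuid4())
extraction.set_started(job_id="job-123", backend="example-backend")  # placeholder backend name
extraction.set_completed(
    extracted_files=[
        ExtractedFileInfo(file_id=uuid4(), format=ExtractionFormat.MARKDOWN),
        ExtractedFileInfo(file_id=uuid4(), format=ExtractionFormat.VENDOR_SPECIFIC_JSON),
    ]
)
markdown_id = extraction.find_file_by_format(format=ExtractionFormat.MARKDOWN)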
