Skip to content

Commit 6c48588

Browse files
committed
feat(server): add json format to docling extraction
Signed-off-by: Aleš Kalfas <kalfas.ales@gmail.com>
1 parent fa33379 commit 6c48588

File tree

10 files changed

+361
-17
lines changed

10 files changed

+361
-17
lines changed

apps/agentstack-server/src/agentstack_server/api/routes/files.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
RequiresContextPermissions,
1515
)
1616
from agentstack_server.api.schema.common import EntityModel
17-
from agentstack_server.api.schema.files import FileListQuery
17+
from agentstack_server.api.schema.files import FileListQuery, TextExtractionRequest
1818
from agentstack_server.domain.models.common import PaginatedResult
1919
from agentstack_server.domain.models.file import AsyncFile, ExtractionStatus, File, TextExtraction
2020
from agentstack_server.domain.models.permissions import AuthorizedUser
@@ -114,6 +114,7 @@ async def create_text_extraction(
114114
file_id: UUID,
115115
file_service: FileServiceDependency,
116116
user: Annotated[AuthorizedUser, Depends(RequiresContextPermissions(files={"write", "extract"}))],
117+
request: TextExtractionRequest | None = None,
117118
) -> EntityModel[TextExtraction]:
118119
"""Create or return text extraction for a file.
119120
@@ -122,8 +123,13 @@ async def create_text_extraction(
122123
- If extraction is pending/in-progress, returns current status
123124
- If no extraction exists, creates a new one
124125
"""
126+
if request is None:
127+
request = TextExtractionRequest()
128+
125129
return EntityModel(
126-
await file_service.create_extraction(file_id=file_id, user=user.user, context_id=user.context_id)
130+
await file_service.create_extraction(
131+
file_id=file_id, user=user.user, context_id=user.context_id, extras=request.extras
132+
)
127133
)
128134

129135

apps/agentstack-server/src/agentstack_server/api/schema/files.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,12 @@ class FileListQuery(PaginationQuery):
3636
description="Case-insensitive partial match search on filename (e.g., 'doc' matches 'my_document.pdf')",
3737
)
3838
order_by: str = Field(default_factory=lambda: "created_at", pattern="^created_at|filename|file_size_bytes$")
39+
40+
41+
class TextExtractionRequest(BaseModel):
    """Request schema for text extraction.

    Optional JSON body of the create-extraction endpoint; when omitted, the
    route substitutes a default instance (all fields ``None``).
    """

    # Free-form, backend-specific extraction options persisted alongside the
    # extraction record and forwarded verbatim to the extraction backend.
    # The Docling backend recognizes {'json_format': True} to switch its
    # output from markdown to JSON; other keys are currently ignored.
    extras: dict | None = Field(
        default=None,
        description="Additional options for text extraction (e.g., {'json_format': True})",
    )

apps/agentstack-server/src/agentstack_server/domain/models/file.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ class TextExtraction(BaseModel):
6060
job_id: str | None = None
6161
error_message: str | None = None
6262
extraction_metadata: ExtractionMetadata | None = None
63+
extras: dict | None = None
6364
started_at: AwareDatetime | None = None
6465
finished_at: AwareDatetime | None = None
6566
created_at: AwareDatetime = Field(default_factory=utc_now)

apps/agentstack-server/src/agentstack_server/domain/repositories/file.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,5 +70,11 @@ async def get_file_metadata(self, *, file_id: UUID) -> FileMetadata: ...
7070
@runtime_checkable
class ITextExtractionBackend(Protocol):
    """Structural interface for services that extract text content from a file.

    Implementations (e.g. the Docling backend) stream the extracted result
    back as an ``AsyncFile`` scoped to the context-manager's lifetime.
    """

    @asynccontextmanager
    async def extract_text(
        self,
        *,
        file_url: AnyUrl,  # location of the source file to extract from
        timeout: timedelta | None = None,  # noqa: ASYNC109  # per-extraction deadline; implementation-defined default
        extras: dict | None = None,  # backend-specific options (e.g. {'json_format': True} for Docling)
    ) -> AsyncIterator[AsyncFile]:
        # Protocol stub: yields a placeholder so the signature type-checks as
        # an async context manager producing an AsyncFile.
        yield ...  # pyright: ignore [reportReturnType]
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""add extras column to text_extractions table
5+
6+
Revision ID: 90f8cd7d6bef
7+
Revises: 214ed3790c6d
8+
Create Date: 2025-11-26 17:44:56.490088
9+
10+
"""
11+
12+
from collections.abc import Sequence
13+
14+
import sqlalchemy as sa
15+
from alembic import op
16+
17+
# revision identifiers, used by Alembic.
18+
revision: str = "90f8cd7d6bef"
19+
down_revision: str | None = "214ed3790c6d"
20+
branch_labels: str | Sequence[str] | None = None
21+
depends_on: str | Sequence[str] | None = None
22+
23+
24+
def upgrade() -> None:
    """Upgrade schema: add a nullable JSON ``extras`` column to ``text_extractions``.

    Stores per-extraction backend options (e.g. {'json_format': True});
    nullable so existing rows need no backfill.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column("text_extractions", sa.Column("extras", sa.JSON(), nullable=True))
    # ### end Alembic commands ###
29+
30+
31+
def downgrade() -> None:
    """Downgrade schema: drop the ``extras`` column added by this revision.

    Any stored extraction options are lost on downgrade.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_column("text_extractions", "extras")
    # ### end Alembic commands ###

apps/agentstack-server/src/agentstack_server/infrastructure/persistence/repositories/file.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
Column("job_id", String(255), nullable=True),
5555
Column("error_message", Text, nullable=True),
5656
Column("extraction_metadata", JSON, nullable=True),
57+
Column("extras", JSON, nullable=True),
5758
Column("started_at", DateTime(timezone=True), nullable=True),
5859
Column("finished_at", DateTime(timezone=True), nullable=True),
5960
Column("created_at", DateTime(timezone=True), nullable=False),
@@ -211,6 +212,7 @@ def _to_text_extraction(self, row: Row) -> TextExtraction:
211212
"job_id": row.job_id,
212213
"error_message": row.error_message,
213214
"extraction_metadata": row.extraction_metadata,
215+
"extras": row.extras,
214216
"started_at": row.started_at,
215217
"finished_at": row.finished_at,
216218
"created_at": row.created_at,
@@ -227,6 +229,7 @@ async def create_extraction(self, *, extraction: TextExtraction) -> None:
227229
job_id=extraction.job_id,
228230
error_message=extraction.error_message,
229231
extraction_metadata=extraction_metadata and extraction_metadata.model_dump(mode="json"),
232+
extras=extraction.extras,
230233
started_at=extraction.started_at,
231234
finished_at=extraction.finished_at,
232235
created_at=extraction.created_at,
Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
22
# SPDX-License-Identifier: Apache-2.0
33

4+
import logging
45
from collections.abc import AsyncIterator
56
from contextlib import asynccontextmanager
67
from datetime import timedelta
@@ -11,7 +12,9 @@
1112
from agentstack_server.configuration import DoclingExtractionConfiguration
1213
from agentstack_server.domain.models.file import AsyncFile
1314
from agentstack_server.domain.repositories.file import ITextExtractionBackend
14-
from agentstack_server.utils.utils import extract_string_value_stream
15+
from agentstack_server.utils.utils import extract_object_value_stream, extract_string_value_stream
16+
17+
logger = logging.getLogger(__name__)
1518

1619

1720
class DoclingTextExtractionBackend(ITextExtractionBackend):
@@ -20,12 +23,22 @@ def __init__(self, config: DoclingExtractionConfiguration):
2023
self._enabled = config.enabled
2124

2225
@asynccontextmanager
23-
async def extract_text(self, *, file_url: AnyUrl, timeout: timedelta | None = None) -> AsyncIterator[AsyncFile]: # noqa: ASYNC109
26+
async def extract_text(
27+
self,
28+
*,
29+
file_url: AnyUrl,
30+
timeout: timedelta | None = None, # noqa: ASYNC109
31+
extras: dict | None = None,
32+
) -> AsyncIterator[AsyncFile]:
2433
if not self._enabled:
2534
raise RuntimeError(
2635
"Docling extraction backend is not enabled, please check the documentation how to enable it"
2736
)
2837

38+
# Switch to json output formats if specified in extras
39+
is_json_format = extras and extras.get("json_format") is True
40+
to_formats = ["json"] if is_json_format else ["md"]
41+
2942
timeout = timeout or timedelta(minutes=5)
3043
async with (
3144
AsyncClient(base_url=str(self._config.docling_service_url), timeout=timeout.seconds) as client,
@@ -34,7 +47,7 @@ async def extract_text(self, *, file_url: AnyUrl, timeout: timedelta | None = No
3447
"/v1/convert/source",
3548
json={
3649
"options": {
37-
"to_formats": ["md"],
50+
"to_formats": to_formats,
3851
"document_timeout": timeout.total_seconds(),
3952
"image_export_mode": "placeholder",
4053
},
@@ -44,19 +57,22 @@ async def extract_text(self, *, file_url: AnyUrl, timeout: timedelta | None = No
4457
):
4558
response.raise_for_status()
4659

47-
md_stream = None
60+
resp_stream = None
4861

4962
async def read(chunk_size: int = 1024) -> bytes:
50-
nonlocal md_stream
51-
if not md_stream:
52-
md_stream = extract_string_value_stream(response.aiter_text, "md_content", chunk_size)
53-
async for text_chunk in md_stream:
63+
nonlocal resp_stream
64+
if not resp_stream:
65+
if is_json_format:
66+
resp_stream = extract_object_value_stream(response.aiter_text, "json_content", chunk_size)
67+
else:
68+
resp_stream = extract_string_value_stream(response.aiter_text, "md_content", chunk_size)
69+
async for text_chunk in resp_stream:
5470
return text_chunk.encode("utf-8")
5571
return b""
5672

5773
yield AsyncFile(
58-
filename="extracted_text.md",
59-
content_type="text/markdown",
74+
filename="extracted_response.json" if is_json_format else "extracted_response.md",
75+
content_type="application/json" if is_json_format else "text/markdown",
6076
read=read,
6177
size=None, # size is unknown beforehand
6278
)

apps/agentstack-server/src/agentstack_server/service_layer/services/files.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ async def extract_text(self, file_id: UUID, job_id: str):
6060
try:
6161
file_url = await self._object_storage.get_file_url(file_id=file_id)
6262
error_log.append(f"file url: {file_url}")
63-
async with self._extraction_backend.extract_text(file_url=file_url) as extracted_file:
63+
async with self._extraction_backend.extract_text(
64+
file_url=file_url, extras=extraction.extras
65+
) as extracted_file:
6466
extracted_db_file = await self.upload_file(
6567
file=extracted_file,
6668
user=user,
@@ -157,7 +159,9 @@ async def delete(self, *, file_id: UUID, user: User, context_id: UUID | None = N
157159
await self._object_storage.delete_files(file_ids=[file_id])
158160
await uow.commit()
159161

160-
async def create_extraction(self, *, file_id: UUID, user: User, context_id: UUID | None = None) -> TextExtraction:
162+
async def create_extraction(
163+
self, *, file_id: UUID, user: User, context_id: UUID | None = None, extras: dict | None = None
164+
) -> TextExtraction:
161165
async with self._uow() as uow:
162166
# Check user permissions
163167
await uow.files.get(file_id=file_id, user_id=user.id, context_id=context_id, file_type=FileType.USER_UPLOAD)
@@ -174,7 +178,7 @@ async def create_extraction(self, *, file_id: UUID, user: User, context_id: UUID
174178
raise TypeError(f"Unknown extraction status: {extraction.status}")
175179
except EntityNotFoundError:
176180
file_metadata = await self._object_storage.get_file_metadata(file_id=file_id)
177-
extraction = TextExtraction(file_id=file_id)
181+
extraction = TextExtraction(file_id=file_id, extras=extras)
178182
if file_metadata.content_type in {"text/plain", "text/markdown"}:
179183
extraction.set_completed(
180184
extracted_file_id=file_id, # Point to itself since it's already text

apps/agentstack-server/src/agentstack_server/utils/utils.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,106 @@ async def extract_string_value_stream(
106106
raise EOFError("Unterminated string value in JSON input")
107107
else:
108108
raise KeyError(f"Key {key} not found in JSON input")
109+
110+
111+
async def extract_object_value_stream(
    async_stream: Callable[[int], AsyncIterable[str]], key: str, chunk_size: int = 1024
) -> AsyncIterable[str]:
    """
    Extract a JSON object or array value from a streaming JSON response.

    Parses a JSON stream incrementally to find a specific key and extract its
    associated object or array value. Yields the content in chunks as it's read,
    without waiting for the entire stream to complete. Properly handles nested
    objects, arrays, and escaped characters within string values.

    Args:
        async_stream: Async function that yields text chunks
        key: The JSON key to extract the object from
        chunk_size: Size of chunks to read from the stream

    Yields:
        String chunks containing the JSON object content

    Raises:
        KeyError: If the key is not found in the JSON
        EOFError: If the JSON object is unterminated
    """
    buffer = ""
    # Enough to retain a key pattern like `"key" : {` split across chunk
    # boundaries when trimming the search buffer. NOTE(review): an unusually
    # long whitespace run between key and value could still be trimmed away.
    max_buffer_size = len(key) * 4 + 20
    # Escape the key so regex metacharacters in key names cannot corrupt the
    # search, and accept any JSON whitespace (not just spaces) around the colon.
    key_pattern = re.compile(rf'"{re.escape(key)}"\s*:\s*([{{[])')
    state = "outside"
    brace_depth = 0
    bracket_depth = 0
    in_string = False
    escape_next = False  # carries escape state across chunk boundaries

    async for chunk in async_stream(chunk_size):
        if state == "outside":
            buffer += chunk
            if match := key_pattern.search(buffer):
                # Begin scanning at the value's opening brace/bracket; the
                # search buffer is no longer needed (avoids unbounded growth).
                pending = buffer[match.end() - 1 :]
                buffer = ""
                state = "inside"
            elif len(buffer) > max_buffer_size:
                # Keep only the tail — enough to match a key split across chunks.
                buffer = buffer[-max_buffer_size:]
                continue
            else:
                continue
        else:
            # Already inside the value: scan each chunk directly, never buffering.
            pending = chunk

        # state == "inside": track string/escape state and nesting depth.
        out: list[str] = []
        for char in pending:
            if escape_next:
                escape_next = False
                out.append(char)
                continue
            if char == "\\":
                escape_next = True
                out.append(char)
                continue
            if char == '"':
                in_string = not in_string
                out.append(char)
                continue
            if not in_string:
                if char == "{":
                    brace_depth += 1
                elif char == "}":
                    brace_depth -= 1
                elif char == "[":
                    bracket_depth += 1
                elif char == "]":
                    bracket_depth -= 1
            out.append(char)
            # All nesting closed outside a string => the value is complete.
            if not in_string and brace_depth == 0 and bracket_depth == 0:
                yield "".join(out)
                return
        # Flush everything scanned from this chunk before awaiting the next one.
        if out:
            yield "".join(out)

    if state == "inside":
        raise EOFError("Unterminated JSON object in input")
    raise KeyError(f"Key {key} not found in JSON input")

0 commit comments

Comments (0)