Skip to content

Commit 37fcc50

Browse files
committed
[owl] Validate file column URI (#829)
Backend — owl (API server): add file column URI validation; log file size and URI in Docling log messages; log row data validation failures.
1 parent d96e1c4 commit 37fcc50

File tree

6 files changed

+112
-52
lines changed

6 files changed

+112
-52
lines changed

services/api/src/owl/db/gen_executor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,9 @@ async def _setup_tasks(self) -> None:
520520
# Process inputs and dependencies
521521
if self._regen_strategy is None:
522522
_body: RowAdd = self.body
523-
self._column_dict = {k: v for k, v in _body.data.items() if k in self._col_map}
523+
self._column_dict = {
524+
k: v for k, v in _body.data.items() if k in self._col_map and not k.endswith("_")
525+
}
524526
else:
525527
_body: RowRegen = self.body
526528
_row = await self.table.get_row(self._row_id)
@@ -1387,7 +1389,8 @@ async def _load_uri_as_base64(uri: str | None) -> str | AudioContent | ImageCont
13871389
f"{', '.join(DOCUMENT_FILE_EXTENSIONS + AUDIO_FILE_EXTENSIONS + IMAGE_FILE_EXTENSIONS)}"
13881390
)
13891391
)
1390-
except BadInputError:
1392+
except BadInputError as e:
1393+
logger.warning(f'Failed to parse file "{uri}" due to error: {repr(e)}')
13911394
raise
13921395
except Exception as e:
13931396
logger.warning(f'Failed to parse file "{uri}" due to error: {repr(e)}')

services/api/src/owl/db/gen_table.py

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
json_loads,
109109
open_uri_async,
110110
s3_upload,
111+
validate_url,
111112
)
112113
from owl.version import __version__ as owl_version
113114

@@ -1417,27 +1418,25 @@ def create_vector_validator(col: ColumnMetadata):
14171418
@classmethod
14181419
def vector_validator(cls, v: np.ndarray | None) -> np.ndarray | None:
14191420
if v is not None and len(v) != col.vlen:
1420-
raise ValueError(
1421-
f"Array input for column {col.column_id} must have length {col.vlen}"
1422-
)
1421+
raise ValueError(f"Array input must have length {col.vlen}")
14231422
return v
14241423

14251424
return vector_validator
14261425

14271426
validators[f"validate_{col.column_id}"] = create_vector_validator(col)
14281427
field_definitions[col.column_id] = (NumpyArray | None, Field(default=None))
14291428
else:
1430-
# if col.is_file_column:
1431-
# # Create URL validator
1432-
# def create_url_validator(col: ColumnMetadata):
1433-
# @field_validator(col.column_id, mode="after")
1434-
# @classmethod
1435-
# def url_validator(cls, v: str | None) -> str | None:
1436-
# return validate_url(v) if v else None
1429+
if col.is_file_column:
1430+
# Create URL validator
1431+
def create_url_validator(col: ColumnMetadata):
1432+
@field_validator(col.column_id, mode="after")
1433+
@classmethod
1434+
def url_validator(cls, v: str | None) -> str | None:
1435+
return validate_url(v, error_cls=ValueError) if v else None
14371436

1438-
# return url_validator
1437+
return url_validator
14391438

1440-
# validators[f"validate_{col.column_id}"] = create_url_validator(col)
1439+
validators[f"validate_{col.column_id}"] = create_url_validator(col)
14411440
# Get the Python type from ColumnDtype
14421441
py_type = col.dtype.to_python_type()
14431442
field_definitions[col.column_id] = (py_type | None, Field(default=None))
@@ -3175,14 +3174,33 @@ async def drop_columns(
31753174
def _jsonify(x: Any) -> Any:
31763175
return x.tolist() if isinstance(x, np.ndarray) else x
31773176

3177+
def validate_row_data(self, data: dict[str, Any]):
3178+
try:
3179+
self.data_table_model.model_validate(data, strict=False)
3180+
except ValidationError as e:
3181+
# Set invalid value to None, and save original value to state
3182+
msg = ""
3183+
for error in e.errors():
3184+
if len(error["loc"]) != 1:
3185+
logger.warning(
3186+
f"Cannot handle row data validation error with nested loc: {repr(e)}"
3187+
)
3188+
continue
3189+
col = error["loc"][0]
3190+
msg += f'Column "{col}": {error.get("msg", "")}. '
3191+
raise BadInputError(f"Row data contains errors. {msg}") from e
3192+
31783193
def _validate_row_data(self, data: dict[str, Any]) -> DataTableRow:
31793194
try:
31803195
row = self.data_table_model.model_validate(data, strict=False)
31813196
except ValidationError as e:
31823197
# Set invalid value to None, and save original value to state
31833198
for error in e.errors():
3184-
if len(error["loc"]) > 1:
3185-
raise BadInputError(f"Input data contains errors: {e}") from e
3199+
if len(error["loc"]) != 1:
3200+
logger.warning(
3201+
f"Cannot handle row data validation error with nested loc: {repr(e)}"
3202+
)
3203+
continue
31863204
col = error["loc"][0]
31873205
state = data.get(f"{col}_", {})
31883206
data[col], data[f"{col}_"] = (
@@ -3193,7 +3211,7 @@ def _validate_row_data(self, data: dict[str, Any]) -> DataTableRow:
31933211
try:
31943212
row = self.data_table_model.model_validate(data, strict=False)
31953213
except ValidationError as e:
3196-
raise BadInputError(f"Input data contains errors: {e}") from e
3214+
raise BadInputError(f"Row data contains errors: {e}") from e
31973215
return row
31983216

31993217
# Row Create Ops

services/api/src/owl/docparse.py

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,11 @@ def split_chunks(
137137
for d in text_splitter.split_documents([Document(page_content=chunk.text)])
138138
]
139139
logger.info(
140-
f"{_id} - {len(request.chunks):,d} chunks split into {len(chunks):,d} chunks.",
140+
f"{len(request.chunks):,d} chunks split into {len(chunks):,d} chunks. ({_id})",
141141
)
142142
return chunks
143143
except Exception as e:
144-
logger.exception(f"{_id} - Failed to split chunks.")
144+
logger.exception(f"Failed to split chunks. ({_id})")
145145
raise BadInputError("Failed to split chunks.") from e
146146

147147

@@ -216,7 +216,7 @@ async def load_document(
216216
ext = splitext(file_name)[1].lower()
217217
if ext in [".pdf", ".docx", ".pptx", ".xlsx", ".html"]:
218218
doc_loader = DoclingLoader(self.request_id)
219-
md = await doc_loader.convert_document_to_markdown(
219+
md = await doc_loader.document_to_markdown(
220220
file_name=file_name, content=content
221221
)
222222
elif ext in [".md", ".txt"]:
@@ -320,7 +320,7 @@ async def load_document_chunks(
320320
)
321321
else:
322322
doc_loader = DoclingLoader(self.request_id, page_break_placeholder=None)
323-
chunks = await doc_loader.convert_document_to_chunks(
323+
chunks = await doc_loader.document_to_chunks(
324324
file_name=file_name,
325325
content=content,
326326
chunk_size=chunk_size,
@@ -438,13 +438,13 @@ def __init__(
438438
)
439439
self.page_break_placeholder = page_break_placeholder
440440

441-
async def retrieve_document_content(
441+
async def _parse_document(
442442
self,
443443
file_name: str,
444444
content: bytes,
445-
) -> dict: # Expecting JSON response from docling-serve
445+
) -> dict:
446446
"""
447-
Retrieves the content of a document file using Docling-Serve API (async pattern).
447+
Parse the document using Docling-Serve API (async pattern).
448448
449449
Args:
450450
file_path (str): Path to the document file to be parsed (local temp path).
@@ -458,7 +458,10 @@ async def retrieve_document_content(
458458
Raises:
459459
HTTPException: If the document conversion fails via docling-serve.
460460
"""
461-
logger.info(f'{self.request_id} - Calling Docling-Serve for file "{file_name}".')
461+
size_mb = get_bytes_size_mb(content)
462+
logger.info(
463+
f'Calling Docling-Serve for file "{file_name}" with size {size_mb:.3f} MiB. ({self.request_id})'
464+
)
462465

463466
files = {"files": (file_name, content, "application/octet-stream")}
464467
data = {
@@ -507,7 +510,10 @@ async def retrieve_document_content(
507510
elif task_status in ("failure", "revoked"):
508511
error_info = status_data.get("task_result", {}).get("error", "Unknown error")
509512
logger.error(
510-
f'Docling-Serve task "{task_id}" for document "{file_name}" failed: {error_info}'
513+
(
514+
f'Docling-Serve task "{task_id}" for document "{file_name}" '
515+
f"with size {size_mb:.3f} MiB failed: {error_info}. ({self.request_id})"
516+
)
511517
)
512518
raise BadInputError(f'Your document "{file_name}" cannot be parsed.')
513519
# If not success, failure, or revoked, it's still processing or in another state
@@ -516,7 +522,7 @@ async def retrieve_document_content(
516522
else: # Executed if the while loop completes without a 'break'
517523
logger.error(
518524
(
519-
f'Docling-Serve task "{task_id}" for document "{file_name}" '
525+
f'Docling-Serve task "{task_id}" for document "{file_name}" with size {size_mb:.3f} MiB '
520526
f"timed out after polling for {time_slept} seconds. ({self.request_id})"
521527
)
522528
)
@@ -537,24 +543,20 @@ async def retrieve_document_content(
537543
except Exception as e:
538544
raise UnexpectedError(f"Docling-Serve API error: {e}") from e
539545

540-
async def convert_document_to_markdown(self, file_name: str, content: bytes) -> str:
546+
async def document_to_markdown(self, file_name: str, content: bytes) -> str:
541547
"""
542548
Converts a document to Markdown format using Docling-Serve.
543549
"""
544-
docling_response = await self.retrieve_document_content(file_name, content)
545-
logger.info(
546-
f"Converted `{file_name}` to Markdown in {docling_response.get('processing_time', '0'):.3f} seconds, "
547-
f"{get_bytes_size_mb(content):.3f} MB."
548-
)
550+
docling_response = await self._parse_document(file_name, content)
549551
return docling_response.get("document", {}).get("md_content", "")
550552

551-
async def convert_document_to_chunks(
553+
async def document_to_chunks(
552554
self, file_name: str, content: bytes, chunk_size: int, chunk_overlap: int
553555
) -> list[Chunk]:
554556
"""
555557
Converts a document to chunks, respecting page and table boundaries, using Docling-Serve.
556558
"""
557-
docling_response = await self.retrieve_document_content(file_name, content)
559+
docling_response = await self._parse_document(file_name, content)
558560
md_content = docling_response.get("document", {}).get("md_content", "")
559561

560562
documents = [Document(page_content=md_content, metadata={"page": 1})]

services/api/src/owl/routers/gen_table.py

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,19 @@ async def add_rows(
552552
billing.has_gen_table_quota(table)
553553
billing.has_db_storage_quota()
554554
billing.has_egress_quota()
555+
# Validate data
556+
try:
557+
[table.validate_row_data(d) for d in body.data]
558+
except Exception as e:
559+
logger.info(
560+
(
561+
"Row data validation failed. "
562+
f'Table={table.schema_id}."{table.table_metadata.short_id}" '
563+
f"Org={org.id} "
564+
f"User={user.id} "
565+
f"Error={repr(e)}"
566+
)
567+
)
555568
executor = MultiRowGenExecutor(
556569
request=request,
557570
table=table,
@@ -840,6 +853,19 @@ async def update_rows(
840853
billing.has_gen_table_quota(table)
841854
billing.has_db_storage_quota()
842855
billing.has_egress_quota()
856+
# Validate data
857+
try:
858+
{row_id: table.validate_row_data(d) for row_id, d in body.data.items()}
859+
except Exception as e:
860+
logger.info(
861+
(
862+
"Row data validation failed. "
863+
f'Table={table.schema_id}."{table.table_metadata.short_id}" '
864+
f"Org={org.id} "
865+
f"User={user.id} "
866+
f"Error={repr(e)}"
867+
)
868+
)
843869
await table.update_rows(body.data)
844870
return OkResponse()
845871

@@ -935,14 +961,24 @@ async def embed_file(
935961
content_type=mime,
936962
filename=file_name,
937963
)
938-
# if overwrite:
939-
# file_table.delete_file(file_name=file_name)
940964
# --- Add into Knowledge Table --- #
941965
logger.info(f'{request_id} - Parsing file "{file_name}".')
942966
doc_parser = GeneralDocLoader(request_id=request_id)
943-
chunks = await doc_parser.load_document_chunks(
944-
file_name, file_content, data.chunk_size, data.chunk_overlap
945-
)
967+
try:
968+
chunks = await doc_parser.load_document_chunks(
969+
file_name, file_content, data.chunk_size, data.chunk_overlap
970+
)
971+
except BadInputError as e:
972+
logger.warning(f'Failed to parse file "{file_uri}" due to error: {repr(e)}')
973+
raise
974+
except Exception as e:
975+
logger.warning(f'Failed to parse file "{file_uri}" due to error: {repr(e)}')
976+
raise BadInputError(
977+
(
978+
f'Sorry we encountered an issue while processing your file "{file_name}". '
979+
"Please ensure the file is not corrupted and is in a supported format."
980+
)
981+
) from e
946982
logger.info(f'{request_id} - Embedding file "{file_name}" with {len(chunks):,d} chunks.')
947983

948984
# --- Extract title --- #

services/api/src/owl/utils/io.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -106,22 +106,25 @@ def _is_private_or_local_ip(ip: str) -> bool:
106106
return addr.is_private or addr.is_loopback or addr.is_link_local
107107

108108

109-
def validate_url(url: str) -> str:
110-
parsed = urlparse(url)
109+
def validate_url(url: str, *, error_cls: type[Exception] = BadInputError) -> str:
110+
try:
111+
parsed = urlparse(url)
112+
except Exception as e:
113+
raise error_cls(f'URL "{url}" is invalid: {e}') from e
111114
if parsed.scheme == "s3":
112115
return url
113116
if parsed.scheme != "https":
114-
raise BadInputError(f"Unsupported scheme: {parsed.scheme}")
117+
raise error_cls(f'URL "{url}" is invalid: Scheme is not "https".')
115118
if not parsed.hostname:
116-
raise BadInputError("URL must contain hostname.")
119+
raise error_cls(f'URL "{url}" is invalid: Missing hostname.')
117120
try:
118121
ips = {info[4][0] for info in socket.getaddrinfo(parsed.hostname, None)}
119122
except socket.gaierror as e:
120-
raise BadInputError("Failed to resolve hostname.") from e
123+
raise error_cls(f'URL "{url}" is invalid: {e}') from e
121124
if not ips:
122-
raise BadInputError("Failed to resolve hostname.")
125+
raise error_cls(f'URL "{url}" is invalid: Failed to resolve hostname.')
123126
if any(_is_private_or_local_ip(ip) for ip in ips):
124-
raise BadInputError(f"Target '{url}' resolves to private or local IP.")
127+
raise error_cls(f'URL "{url}" is invalid: Hostname resolves to private or local IP.')
125128
return url
126129

127130

@@ -168,14 +171,14 @@ async def open_uri_async(uri: str) -> AsyncGenerator[tuple[AsyncResponse, str],
168171

169172
def get_bytes_size_mb(bytes_content: bytes, decimal_places: int = 3) -> float:
170173
"""
171-
Convert bytes to megabytes (MB).
174+
Convert bytes to Mebibyte (MiB).
172175
173176
Args:
174177
bytes_content (bytes): The content in bytes to be calculated.
175178
decimal_places (int, optional): Number of decimal places to round to. Defaults to 3.
176179
177180
Returns:
178-
float: The converted value in megabytes (MB)
181+
float: The converted value in Mebibyte (MiB)
179182
"""
180183
mb_value = len(bytes_content) / (1024 * 1024) # 1 MB = 1024 KB = 1024 * 1024 bytes
181184
return round(mb_value, decimal_places)

services/api/tests/test_docparse.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,7 @@ async def test_convert_pdf_document_to_markdown(doc_path: str):
5656
with open(doc_path, "rb") as f:
5757
doc_content_bytes = f.read()
5858

59-
api_response_data = await loader.retrieve_document_content(
60-
basename(doc_path), doc_content_bytes
61-
)
59+
api_response_data = await loader._parse_document(basename(doc_path), doc_content_bytes)
6260

6361
api_document_content = api_response_data.get("document", {})
6462

0 commit comments

Comments (0)