Skip to content

Commit 7fcc964

Browse files
committed
feat: introduce feature flag for the feature
1 parent da73452 commit 7fcc964

File tree

4 files changed

+176
-24
lines changed

4 files changed

+176
-24
lines changed

src/unstructured_client/_hooks/custom/form_utils.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import logging
4+
from pathlib import Path
45
from typing import TYPE_CHECKING
56
from typing_extensions import TypeAlias
67

@@ -19,6 +20,8 @@
1920
PARTITION_FORM_SPLIT_PDF_PAGE_KEY = "split_pdf_page"
2021
PARTITION_FORM_PAGE_RANGE_KEY = "split_pdf_page_range[]"
2122
PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY = "split_pdf_allow_failed"
23+
PARTITION_FORM_SPLIT_CACHE_TMP_DATA_KEY = "split_pdf_cache_tmp_data"
24+
PARTITION_FORM_SPLIT_CACHE_TMP_DATA_DIR_KEY = "split_pdf_cache_tmp_data_dir"
2225
PARTITION_FORM_STARTING_PAGE_NUMBER_KEY = "starting_page_number"
2326
PARTITION_FORM_CONCURRENCY_LEVEL_KEY = "split_pdf_concurrency_level"
2427

@@ -126,6 +129,71 @@ def get_split_pdf_allow_failed_param(
126129

127130
return allow_failed.lower() == "true"
128131

132+
def get_split_pdf_cache_tmp_data(
133+
form_data: FormData, key: str, fallback_value: bool,
134+
) -> bool:
135+
"""Retrieves the value for cache tmp data that should be used for splitting pdf.
136+
137+
In case given the value is not a correct (existing) dir (Path), it will use the
138+
default value.
139+
140+
Args:
141+
form_data: The form data containing the desired flag value.
142+
key: The key to look for in the form data.
143+
fallback_value: The default value to use in case of an error.
144+
145+
Returns:
146+
The flag value for 'cache tmp data' feature after validation.
147+
"""
148+
cache_tmp_data = form_data.get(key)
149+
150+
if not isinstance(cache_tmp_data, str):
151+
return fallback_value
152+
153+
if cache_tmp_data.lower() not in ["true", "false"]:
154+
logger.warning(
155+
"'%s' is not a valid boolean. Using default value '%s'.",
156+
key,
157+
fallback_value,
158+
)
159+
return fallback_value
160+
161+
return cache_tmp_data.lower() == "true"
162+
163+
def get_split_pdf_cache_tmp_data_dir(
164+
form_data: FormData, key: str, fallback_value: Path | str,
165+
) -> Path | str:
166+
"""Retrieves the value for cache tmp data dir that should be used for splitting pdf.
167+
168+
In case given the number is not a "false" or "true" literal, it will use the
169+
default value.
170+
171+
Args:
172+
form_data: The form data containing the desired flag value.
173+
key: The key to look for in the form data.
174+
fallback_value: The default value to use in case of an error.
175+
176+
Returns:
177+
The flag value for 'cache tmp data' feature after validation.
178+
"""
179+
cache_tmp_data_dir = form_data.get(key)
180+
181+
if not isinstance(cache_tmp_data_dir, str) and not isinstance(cache_tmp_data_dir, Path):
182+
return fallback_value
183+
184+
if isinstance(cache_tmp_data_dir, str):
185+
cache_tmp_data_dir = Path(cache_tmp_data_dir)
186+
187+
if not cache_tmp_data_dir.exists():
188+
logger.warning(
189+
"'%s' does not exist. Using default value '%s'.",
190+
key,
191+
fallback_value,
192+
)
193+
return fallback_value
194+
195+
return cache_tmp_data_dir.resolve()
196+
129197

130198
def get_split_pdf_concurrency_level_param(
131199
form_data: FormData, key: str, fallback_value: int, max_allowed: int

src/unstructured_client/_hooks/custom/request_utils.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import io
55
import json
66
import logging
7-
from typing import Tuple, Any, BinaryIO
7+
from typing import Tuple, Any, BinaryIO, cast, IO
88

99
import httpx
1010
from httpx._multipart import DataField, FileField
@@ -15,6 +15,8 @@
1515
PARTITION_FORM_FILES_KEY,
1616
PARTITION_FORM_SPLIT_PDF_PAGE_KEY,
1717
PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY,
18+
PARTITION_FORM_SPLIT_CACHE_TMP_DATA_KEY,
19+
PARTITION_FORM_SPLIT_CACHE_TMP_DATA_DIR_KEY,
1820
PARTITION_FORM_PAGE_RANGE_KEY,
1921
PARTITION_FORM_STARTING_PAGE_NUMBER_KEY,
2022
FormData,
@@ -82,6 +84,8 @@ def create_pdf_chunk_request_params(
8284
PARTITION_FORM_PAGE_RANGE_KEY,
8385
PARTITION_FORM_PAGE_RANGE_KEY.replace("[]", ""),
8486
PARTITION_FORM_STARTING_PAGE_NUMBER_KEY,
87+
PARTITION_FORM_SPLIT_CACHE_TMP_DATA_KEY,
88+
PARTITION_FORM_SPLIT_CACHE_TMP_DATA_DIR_KEY,
8589
]
8690
chunk_payload = {key: form_data[key] for key in form_data if key not in fields_to_drop}
8791
chunk_payload[PARTITION_FORM_SPLIT_PDF_PAGE_KEY] = "false"
@@ -110,9 +114,15 @@ def create_pdf_chunk_request(
110114
data = create_pdf_chunk_request_params(form_data, page_number)
111115
original_headers = prepare_request_headers(original_request.headers)
112116

117+
pdf_chunk_content = (
118+
pdf_chunk_file.getvalue()
119+
if isinstance(pdf_chunk_file, io.BytesIO)
120+
else pdf_chunk_file
121+
)
122+
113123
pdf_chunk_partition_params = shared.PartitionParameters(
114124
files=shared.Files(
115-
content=pdf_chunk_file,
125+
content=pdf_chunk_content,
116126
file_name=filename,
117127
content_type="application/pdf",
118128
),

src/unstructured_client/_hooks/custom/split_pdf_hook.py

Lines changed: 92 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import io
45
import json
56
import logging
67
import math
@@ -18,6 +19,7 @@
1819
from httpx import AsyncClient
1920
from pypdf import PdfReader, PdfWriter
2021
from requests_toolbelt.multipart.decoder import MultipartDecoder # type: ignore
22+
from unstructured.chunking.dispatch import chunk
2123

2224
from unstructured_client._hooks.custom import form_utils, pdf_utils, request_utils
2325
from unstructured_client._hooks.custom.common import UNSTRUCTURED_CLIENT_LOGGER_NAME
@@ -27,7 +29,7 @@
2729
PARTITION_FORM_PAGE_RANGE_KEY,
2830
PARTITION_FORM_SPLIT_PDF_PAGE_KEY,
2931
PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY,
30-
PARTITION_FORM_STARTING_PAGE_NUMBER_KEY,
32+
PARTITION_FORM_STARTING_PAGE_NUMBER_KEY, PARTITION_FORM_SPLIT_CACHE_TMP_DATA_KEY,
3133
)
3234
from unstructured_client._hooks.types import (
3335
AfterErrorContext,
@@ -45,6 +47,8 @@
4547
DEFAULT_STARTING_PAGE_NUMBER = 1
4648
DEFAULT_ALLOW_FAILED = False
4749
DEFAULT_CONCURRENCY_LEVEL = 10
50+
DEFAULT_CACHE_TMP_DATA = False
51+
DEFAULT_CACHE_TMP_DATA_DIR = tempfile.gettempdir()
4852
MAX_CONCURRENCY_LEVEL = 50
4953
MIN_PAGES_PER_SPLIT = 2
5054
MAX_PAGES_PER_SPLIT = 20
@@ -309,6 +313,17 @@ def before_request(
309313
)
310314
limiter = asyncio.Semaphore(concurrency_level)
311315

316+
self.cache_tmp_data_feature = form_utils.get_split_pdf_cache_tmp_data(
317+
form_data,
318+
key=PARTITION_FORM_SPLIT_CACHE_TMP_DATA_KEY,
319+
fallback_value=DEFAULT_CACHE_TMP_DATA,
320+
)
321+
322+
self.cache_tmp_data_dir = form_utils.get_split_pdf_cache_tmp_data_dir(
323+
form_data,
324+
key=PARTITION_FORM_SPLIT_CACHE_TMP_DATA_KEY,
325+
fallback_value=DEFAULT_CACHE_TMP_DATA_DIR,
326+
)
312327

313328
page_range_start, page_range_end = form_utils.get_page_range(
314329
form_data,
@@ -327,16 +342,24 @@ def before_request(
327342
if split_size >= page_count and page_count == len(pdf.pages):
328343
return request
329344

330-
pdf_chunk_paths = self._get_pdf_chunk_paths(
331-
pdf,
332-
operation_id=operation_id,
333-
split_size=split_size,
334-
page_start=page_range_start,
335-
page_end=page_range_end
336-
)
337-
# force free PDF object memory
338-
del pdf
339-
pdf_chunks = self._get_pdf_chunk_files(pdf_chunk_paths)
345+
if self.cache_tmp_data_feature:
346+
pdf_chunk_paths = self._get_pdf_chunk_paths(
347+
pdf,
348+
operation_id=operation_id,
349+
split_size=split_size,
350+
page_start=page_range_start,
351+
page_end=page_range_end
352+
)
353+
# force free PDF object memory
354+
del pdf
355+
pdf_chunks = self._get_pdf_chunk_files(pdf_chunk_paths)
356+
else:
357+
pdf_chunks = self._get_pdf_chunks_in_memory(
358+
pdf,
359+
split_size=split_size,
360+
page_start=page_range_start,
361+
page_end=page_range_end
362+
)
340363

341364
self.coroutines_to_execute[operation_id] = []
342365
set_index = 1
@@ -393,19 +416,62 @@ async def call_api_partial(
393416
del response._request # pylint: disable=protected-access
394417
response._request = None # pylint: disable=protected-access
395418

396-
# If we get 200, dump the contents to a file and return the path
397-
temp_dir = self.tempdirs[operation_id]
419+
398420
if response.status_code == 200:
399-
temp_file_name = f"{temp_dir.name}/{uuid.uuid4()}.json"
400-
async with aiofiles.open(temp_file_name, mode='wb') as temp_file:
401-
# Avoid reading the entire response into memory
402-
async for bytes_chunk in response.aiter_bytes():
403-
await temp_file.write(bytes_chunk)
404-
# we save the path in content attribute to be used in after_success
405-
response._content = temp_file_name.encode() # pylint: disable=protected-access
421+
if self.cache_tmp_data_feature:
422+
# If we get 200, dump the contents to a file and return the path
423+
temp_dir = self.tempdirs[operation_id]
424+
temp_file_name = f"{temp_dir.name}/{uuid.uuid4()}.json"
425+
async with aiofiles.open(temp_file_name, mode='wb') as temp_file:
426+
# Avoid reading the entire response into memory
427+
async for bytes_chunk in response.aiter_bytes():
428+
await temp_file.write(bytes_chunk)
429+
# we save the path in content attribute to be used in after_success
430+
response._content = temp_file_name.encode() # pylint: disable=protected-access
406431

407432
return response
408433

434+
def _get_pdf_chunks_in_memory(
435+
self,
436+
pdf: PdfReader,
437+
split_size: int = 1,
438+
page_start: int = 1,
439+
page_end: Optional[int] = None
440+
) -> Generator[Tuple[BinaryIO, int], None, None]:
441+
"""Reads given bytes of a pdf file and split it into n pdf-chunks, each
442+
with `split_size` pages. The chunks are written into temporary files in
443+
a temporary directory corresponding to the operation_id.
444+
445+
Args:
446+
file_content: Content of the PDF file.
447+
split_size: Split size, e.g. if the given file has 10 pages
448+
and this value is set to 2 it will yield 5 documents, each containing 2 pages
449+
of the original document. By default it will split each page to a separate file.
450+
page_start: Begin splitting at this page number
451+
page_end: If provided, split up to and including this page number
452+
453+
Returns:
454+
The list of temporary file paths.
455+
"""
456+
457+
offset = page_start - 1
458+
offset_end = page_end or len(pdf.pages)
459+
460+
chunk_no = 0
461+
while offset < offset_end:
462+
chunk_no += 1
463+
new_pdf = PdfWriter()
464+
chunk_buffer = io.BytesIO()
465+
466+
end = min(offset + split_size, offset_end)
467+
468+
for page in list(pdf.pages[offset:end]):
469+
new_pdf.add_page(page)
470+
new_pdf.write(chunk_buffer)
471+
chunk_buffer.seek(0)
472+
yield chunk_buffer, offset
473+
offset += split_size
474+
409475
def _get_pdf_chunk_paths(
410476
self,
411477
pdf: PdfReader,
@@ -434,7 +500,8 @@ def _get_pdf_chunk_paths(
434500
offset_end = page_end or len(pdf.pages)
435501

436502
tempdir = tempfile.TemporaryDirectory( # pylint: disable=consider-using-with
437-
suffix="unstructured_client"
503+
dir=self.cache_tmp_data_dir,
504+
prefix="unstructured_client_"
438505
)
439506
self.tempdirs[operation_id] = tempdir
440507
tempdir_path = Path(tempdir.name)
@@ -517,7 +584,10 @@ def _await_elements(
517584
response_number,
518585
)
519586
successful_responses.append(res)
520-
elements.append(load_elements_from_response(res))
587+
if self.cache_tmp_data_feature:
588+
elements.append(load_elements_from_response(res))
589+
else:
590+
elements.append(res.json())
521591
else:
522592
error_message = f"Failed to partition set {response_number}."
523593

src/unstructured_client/models/shared/partition_parameters.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ class PartitionParametersTypedDict(TypedDict):
124124
r"""This parameter determines if the PDF file should be split on the client side. It's an internal parameter for the Python client and is not sent to the backend."""
125125
split_pdf_page_range: NotRequired[List[int]]
126126
r"""When `split_pdf_page is set to `True`, this parameter selects a subset of the pdf to send to the API. The parameter is a list of 2 integers within the range [1, length_of_pdf]. A ValueError is thrown if the given range is invalid. It's an internal parameter for the Python client and is not sent to the backend."""
127+
split_pdf_cache_tmp_data: NotRequired[bool]
128+
r"""When `split_pdf_page` is set to `True`, this parameter determines if the temporary data used for splitting the PDF should be cached into disc - if enabled should save significant amount of RAM memory when processing big files. It's an internal parameter for the Python client and is not sent to the backend."""
129+
split_pdf_cache_tmp_data_dir: NotRequired[str]
130+
r"""When `split_pdf_page` is set to `True` and `split_pdf_cache_tmp_data` feature is used, this parameter specifies the directory where the temporary data used for splitting the PDF should be cached into disc. It's an internal parameter for the Python client and is not sent to the backend."""
127131
starting_page_number: NotRequired[Nullable[int]]
128132
r"""When PDF is split into pages before sending it into the API, providing this information will allow the page number to be assigned correctly. Introduced in 1.0.27."""
129133
strategy: NotRequired[Strategy]

0 commit comments

Comments
 (0)