Skip to content

Commit 0410784

Browse files
authored
fix/ensure that split pdf requests are retried (#160)
We discovered that when a pdf is split into smaller chunks, those requests are not being retried. Now that we have `allow_failed=False`, this results in the whole document failing as soon as any of the child requests hit a transient error. The fix is to reuse the `utils.Retry` logic that the main code path uses. Copying the retry config in the hook logic is not great, and we can work with Speakeasy to make the internal logic more modular so we can reuse more. But for now, this will address the current failures while we work on a better implementation. Testing: See the added unit test. The existing retry logic works for the final split page, everything else needs to use the new logic. To test this, I mocked a response from the server to return 502 for a low `starting_page_number`, which we know will have to be handled by the hooks. Other changes: Remove the "Not splitting" log. When the final split page is retried, it triggers all the hooks again. We need to force `split_pdf_page=False` in this request, and we don't need additional logging when this code is hit again.
1 parent 2f3c098 commit 0410784

File tree

3 files changed

+124
-3
lines changed

3 files changed

+124
-3
lines changed

_test_unstructured_client/integration/test_decorators.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
1+
import httpx
2+
import json
13
import pytest
24
import requests
35
from deepdiff import DeepDiff
6+
from httpx import Response
7+
8+
from requests_toolbelt.multipart.decoder import MultipartDecoder # type: ignore
9+
410
from unstructured_client import UnstructuredClient
511
from unstructured_client.models import shared, operations
612
from unstructured_client.models.errors import HTTPValidationError
13+
from unstructured_client.utils.retries import BackoffStrategy, RetryConfig
14+
from unstructured_client._hooks.custom import form_utils
15+
from unstructured_client._hooks.custom import split_pdf_hook
716

817
FAKE_KEY = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
918

@@ -275,3 +284,80 @@ def test_integration_split_pdf_strict_mode(
275284
],
276285
)
277286
assert len(diff) == 0
287+
288+
289+
@pytest.mark.asyncio
290+
async def test_split_pdf_requests_do_retry(monkeypatch):
291+
"""
292+
Test that when we split a pdf, the split requests will honor retryable errors.
293+
"""
294+
number_of_split_502s = 2
295+
number_of_last_page_502s = 2
296+
297+
async def mock_send(_, request):
298+
"""
299+
Return a predefined number of 502s for requests with certain starting_page_number values.
300+
301+
This is because N-1 splits are sent off in the hook logic. These need explicit retry handling.
302+
The final split is returned to the SDK and gets the built in retry code.
303+
304+
We want to make sure both code paths are retried.
305+
"""
306+
request_body = request.read()
307+
decoded_body = MultipartDecoder(request_body, request.headers.get("Content-Type"))
308+
form_data = form_utils.parse_form_data(decoded_body)
309+
310+
nonlocal number_of_split_502s
311+
nonlocal number_of_last_page_502s
312+
313+
if number_of_split_502s > 0:
314+
if "starting_page_number" in form_data and int(form_data["starting_page_number"]) < 3:
315+
number_of_split_502s -= 1
316+
return Response(502, request=request)
317+
318+
if number_of_last_page_502s > 0:
319+
if "starting_page_number" in form_data and int(form_data["starting_page_number"]) > 12:
320+
number_of_last_page_502s -= 1
321+
return Response(502, request=request)
322+
323+
mock_return_data = [{
324+
"type": "Title",
325+
"text": "Hello",
326+
}]
327+
328+
return Response(
329+
200,
330+
request=request,
331+
content=json.dumps(mock_return_data),
332+
headers={"Content-Type": "application/json"},
333+
)
334+
335+
monkeypatch.setattr(split_pdf_hook.httpx.AsyncClient, "send", mock_send)
336+
337+
sdk = UnstructuredClient(
338+
api_key_auth=FAKE_KEY,
339+
server_url="localhost:8000",
340+
retry_config=RetryConfig("backoff", BackoffStrategy(200, 1000, 1.5, 1000), False),
341+
)
342+
343+
filename = "_sample_docs/layout-parser-paper.pdf"
344+
with open(filename, "rb") as f:
345+
files = shared.Files(
346+
content=f.read(),
347+
file_name=filename,
348+
)
349+
350+
req = operations.PartitionRequest(
351+
shared.PartitionParameters(
352+
files=files,
353+
split_pdf_page=True,
354+
split_pdf_allow_failed=False,
355+
strategy="fast",
356+
)
357+
)
358+
359+
res = await sdk.general.partition_async(request=req)
360+
361+
assert number_of_split_502s == 0
362+
assert number_of_last_page_502s == 0
363+
assert res.status_code == 200

src/unstructured_client/_hooks/custom/request_utils.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
PARTITION_FORM_STARTING_PAGE_NUMBER_KEY,
2020
FormData,
2121
)
22+
import unstructured_client.utils as utils
2223

2324
logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME)
2425

@@ -69,10 +70,42 @@ async def call_api_async(
6970
)
7071

7172
async with limiter:
72-
response = await client.send(new_request)
73+
response = await send_request_async_with_retries(client, new_request)
7374
return response
7475

7576

77+
async def send_request_async_with_retries(client: httpx.AsyncClient, request: httpx.Request):
78+
# Hardcode the retry config until we can
79+
# properly reuse the SDK logic
80+
# (Values are in ms)
81+
retry_config = utils.RetryConfig(
82+
"backoff",
83+
utils.BackoffStrategy(
84+
initial_interval=2000,
85+
max_interval=60000,
86+
exponent=1.5,
87+
max_elapsed_time=1000 * 60 * 5 # 5 minutes
88+
),
89+
retry_connection_errors=True
90+
)
91+
92+
retryable_codes = [
93+
"502",
94+
"503",
95+
"504"
96+
]
97+
98+
async def do_request():
99+
return await client.send(request)
100+
101+
response = await utils.retry_async(
102+
do_request,
103+
utils.Retries(retry_config, retryable_codes)
104+
)
105+
106+
return response
107+
108+
76109
def prepare_request_headers(
77110
headers: httpx.Headers,
78111
) -> httpx.Headers:

src/unstructured_client/_hooks/custom/split_pdf_hook.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,6 @@ def before_request(
168168
form_data = form_utils.parse_form_data(decoded_body)
169169
split_pdf_page = form_data.get(PARTITION_FORM_SPLIT_PDF_PAGE_KEY)
170170
if split_pdf_page is None or split_pdf_page == "false":
171-
logger.info("Partitioning without split.")
172171
return request
173172

174173
logger.info("Preparing to split document for partition.")
@@ -287,6 +286,8 @@ async def call_api_partial(page):
287286
# `before_request` method needs to return a request so we skip sending the last page in parallel
288287
# and return that last page at the end of this method
289288

289+
# Need to make sure the final page does not trigger splitting again
290+
form_data[PARTITION_FORM_SPLIT_PDF_PAGE_KEY] = "false"
290291
body = request_utils.create_request_body(
291292
form_data, last_page_content, file.file_name, last_page_number
292293
)
@@ -413,12 +414,13 @@ def after_error(
413414
If requests were run in parallel, and at least one was successful, a combined
414415
response object; otherwise, the original response and exception.
415416
"""
417+
operation_id = hook_ctx.operation_id
416418

417419
# if fails are disallowed - return response and error objects immediately
418420
if not self.allow_failed:
421+
self._clear_operation(operation_id)
419422
return (response, error)
420423

421-
operation_id = hook_ctx.operation_id
422424
# We know that this request failed so we pass a failed or empty response to `_await_elements` method
423425
# where it checks if at least on of the other requests succeeded
424426
elements = self._await_elements(operation_id, response or httpx.Response(status_code=200))

0 commit comments

Comments
 (0)