Commit 98028ec

fix: Address some issues in the split pdf logic (#165)
We've encountered some bugs in the split pdf code. For one, these requests are not retried. With the new `split_pdf_allow_failed=False` behavior, this means one transient network error can interrupt the whole doc. We've also had some asyncio warnings such as `... was never awaited`. This PR adds retries, cleans up the logic, and gives us a much better base for the V2 client release.

# Changes

## Return a "dummy request" in the split BeforeRequestHook

When the BeforeRequestHook is called, we would split up the doc into N requests, issue coroutines for N-1 requests, and return the last one for the SDK to run. This adds two paths for recombining the results. Instead, the BeforeRequestHook can return a dummy request that will get a 200. This takes us straight to the AfterSuccessHook, which awaits all of the splits and builds the response.

## Add retries to the split requests

This is a copy of the autogenerated code in `retry.py`, adapted to work for the async calls. At some point, we should be able to reuse the SDK for this so we aren't hardcoding the retry config values here. Need to work with Speakeasy on this.

## Clean up error handling

When the retries fail and we do have to bubble up an error, we pass it to `create_failure_response` before returning to the SDK. This puts a 500 status code into the response, so that the SDK does not see a 502/503/504 and proceed to retry the entire doc.

## Set an httpx timeout

Many of the failing requests right now are hi_res calls. This is because the default httpx client timeout is 5 seconds, and we immediately throw a ReadTimeout. For now, set this timeout to 10 minutes. This should be sufficient in the splitting code, where page size per request will be controlled. This is another hardcoded value that should go away once we're able to send our splits back into `sdk.general.partition`.

# Testing

Any pipelines that have failed consistently should work now.

For more fine-grained testing, I tend to mock up my local server to return a retryable error for specific pages, a certain number of times. In the `general_partition` function, I add something like

```
global num_bounces # Initialize this somewhere up above
page_num = form_params.starting_page_number or 1
if num_bounces > 0 and page_num == 3:
    num_bounces -= 1
    logger.info(page_num)
    raise HTTPException(status_code=502, detail="BOUNCE")
```

Then, send an SDK request to your local server and verify that the split request for page 3 of your doc is retrying up to the number of times you want.

Also, setting the max concurrency to 15 should reproduce the issue. Choose some 50+ page pdf and try the following with the current 0.25.5 branch. It will likely fail with `ServerError: {}`. Then try a local pip install off this branch.

```
s = UnstructuredClient(
    api_key_auth="my-api-key",
)

filename = "some-large-pdf"
with open(filename, "rb") as f:
    files=shared.Files(
        content=f.read(),
        file_name=filename,
    )

req = operations.PartitionRequest(
    shared.PartitionParameters(
        files=files,
        split_pdf_page=True,
        strategy="hi_res",
        split_pdf_allow_failed=False,
        split_pdf_concurrency_level=15,
    ),
)

resp = s.general.partition(req)

if num_elements := len(resp.elements):
    print(f"Succeeded with {num_elements}")
```
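The dummy-request flow described in the commit message can be sketched with plain asyncio. This is only an illustration under stated assumptions, not the SDK's hook code: `fake_partition_page`, `before_request`, `after_success`, and `run_flow` are hypothetical names, and the real hooks pass requests/httpx objects rather than dicts.

```python
import asyncio


# Hypothetical stand-in for one split request; the real code sends an httpx POST.
async def fake_partition_page(page_number: int) -> list:
    await asyncio.sleep(0)  # yield control, as a network call would
    return [{"page": page_number, "text": f"element from page {page_number}"}]


def before_request(page_numbers: list) -> tuple:
    """Create one coroutine per split and return a placeholder request.

    Nothing is awaited here. The placeholder exists only so the caller
    proceeds straight to the success path.
    """
    pending = [fake_partition_page(n) for n in page_numbers]
    dummy_request = {"method": "GET", "url": "dummy"}  # assumed shape, illustration only
    return dummy_request, pending


async def after_success(pending: list) -> list:
    """Await every split in one place and merge the elements in page order."""
    results = await asyncio.gather(*pending)  # gather preserves input order
    return [element for page_elements in results for element in page_elements]


def run_flow(page_numbers: list) -> list:
    _dummy, pending = before_request(page_numbers)
    return asyncio.run(after_success(pending))
```

Because every split is awaited in a single place, there is only one path for recombining results, which is the simplification the commit message argues for.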
1 parent ce02dd8 commit 98028ec

4 files changed: +154 -95 lines changed


_test_unstructured_client/unit/test_split_pdf_hook.py

Lines changed: 1 addition & 10 deletions
@@ -105,21 +105,12 @@ def test_unit_prepare_request_headers():
 def test_unit_create_response():
     """Test create response method properly overrides body elements and Content-Length header."""
     test_elements = [{"key": "value"}, {"key_2": "value"}]
-    test_response = requests.Response()
-    test_response.status_code = 200
-    test_response._content = b'[{"key_2": "value"}]'
-    test_response.headers = requests.structures.CaseInsensitiveDict(
-        {
-            "Content-Type": "application/json",
-            "Content-Length": len(test_response._content),
-        }
-    )
 
     expected_status_code = 200
     expected_content = b'[{"key": "value"}, {"key_2": "value"}]'
     expected_content_length = "38"
 
-    response = request_utils.create_response(test_response, test_elements)
+    response = request_utils.create_response(test_elements)
 
     assert response.status_code, expected_status_code
     assert response._content, expected_content
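A side note on the assertions in this test: in Python, `assert x, y` checks only that `x` is truthy and uses `y` as the failure message, so the expected values above are not actually compared. A minimal sketch of equality-based checks follows, assuming a hypothetical `build_body` helper that mirrors the `json.dumps(...).encode()` step of `create_response`; it is not part of this commit.

```python
import json


def build_body(elements: list) -> tuple:
    """Serialize elements as create_response does and return (content, length string)."""
    content = json.dumps(elements).encode()
    return content, str(len(content))


test_elements = [{"key": "value"}, {"key_2": "value"}]
content, content_length = build_body(test_elements)

# Comparing with == enforces the expectation; a comma would only attach a message.
assert content == b'[{"key": "value"}, {"key_2": "value"}]'
assert content_length == "38"
```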

src/unstructured_client/_hooks/custom/logger_hook.py

Lines changed: 5 additions & 1 deletion
@@ -55,7 +55,11 @@ def after_success(
     ) -> Union[requests.Response, Exception]:
         self.retries_counter.pop(hook_ctx.operation_id, None)
         # NOTE: In case of split page partition this means - at least one of the splits was partitioned successfully
-        logger.info("Successfully partitioned the document.")
+        # Note(austin) - pdf splitting returns a mock request
+        # so we always reach the AfterSuccessHook
+        # This doesn't mean the splits succeeded
+        # Need to revisit our logging strategy
+        # logger.info("Successfully partitioned the document.")
         return response
 
     def after_error(

src/unstructured_client/_hooks/custom/request_utils.py

Lines changed: 106 additions & 12 deletions
@@ -5,6 +5,8 @@
 import io
 import json
 import logging
+import random
+import time
 from typing import Optional, Tuple, Any
 
 import httpx
@@ -76,24 +78,85 @@ def create_request(
     )
 
 
+async def retry_with_backoff_async(
+    request_func,
+    page_number,
+    initial_interval,
+    max_interval,
+    exponent,
+    max_elapsed_time,
+):
+    """
+    A copy of the autogenerated backoff code adapted for asyncio
+    Call func()
+    """
+    start = round(time.time() * 1000)
+    retries = 0
+
+    retry_status_codes = [502, 503, 504]
+
+    while True:
+        try:
+            response = await request_func()
+
+            if response.status_code not in retry_status_codes:
+                return response
+
+            logger.error("Request (page %d) failed with status code %d. Waiting to retry.", page_number, response.status_code)
+
+            # Is it time to get out of the loop?
+            now = round(time.time() * 1000)
+            if now - start > max_elapsed_time:
+                return response
+        except Exception as e:
+            logger.error("Request (page %d) failed (%s). Waiting to retry.", page_number, repr(e))
+
+            # Is it time to get out of the loop?
+            now = round(time.time() * 1000)
+            if now - start > max_elapsed_time:
+                raise
+
+        # Otherwise go back to sleep
+        sleep = (initial_interval / 1000) * exponent**retries + random.uniform(0, 1)
+        sleep = min(sleep, max_interval / 1000)
+        await asyncio.sleep(sleep)
+        retries += 1
+
+
 async def call_api_async(
     client: httpx.AsyncClient,
     page: Tuple[io.BytesIO, int],
     original_request: requests.Request,
     form_data: FormData,
     filename: str,
     limiter: asyncio.Semaphore,
-) -> tuple[int, dict]:
+) -> httpx.Response:
+    """
+    Issue a httpx POST using a copy of the original requests.Request
+    Wrap the call in a retry loop. These values are copied from the API spec,
+    and will not be auto updated. Long term solution is to reuse SDK logic.
+    We'll need the hook context to have access to the rest of the SDK.
+    """
     page_content, page_number = page
     body = create_request_body(form_data, page_content, filename, page_number)
     new_request = create_httpx_request(original_request, body)
+
+    one_second = 1000
+    one_minute = 1000 * 60
+    retry_values = {
+        "initial_interval": one_second * 3,
+        "max_interval": one_minute * 12,
+        "max_elapsed_time": one_minute * 30,
+        "exponent": 1.88,
+    }
+
+    async def do_request():
+        return await client.send(new_request)
+
     async with limiter:
-        try:
-            response = await client.send(new_request)
-            return response.status_code, response.json()
-        except Exception:
-            logger.error("Failed to send request for page %d", page_number)
-            return 500, {}
+        response = await retry_with_backoff_async(do_request, page_number=page_number, **retry_values)
+
+        return response
 
 
 def call_api(
@@ -157,9 +220,13 @@ def prepare_request_payload(form_data: FormData) -> FormData:
     return payload
 
 
-def create_response(response: requests.Response, elements: list) -> requests.Response:
+def create_failure_response(response: requests.Response) -> requests.Response:
     """
-    Creates a modified response object with updated content.
+    Convert the status code on the given response to a 500
+    This is because the split logic catches and retries 502, 503, etc
+    If a failure is passed back to the SDK, we shouldn't trigger
+    another layer of retries, we just want to print the error. 500 is
+    non retryable up above.
 
     Args:
         response: The original response object.
@@ -170,11 +237,38 @@ def create_response(response: requests.Response, elements: list) -> requests.Res
         The modified response object with updated content.
     """
     response_copy = copy.deepcopy(response)
+
+    response_copy.status_code = 500
+
+    # Some server errors return a lower case content-type
+    # The SDK error parsing expects Content-Type
+    if content_type := response_copy.headers.get("content-type"):
+        response_copy.headers["Content-Type"] = content_type
+
+    return response_copy
+
+
+def create_response(elements: list) -> requests.Response:
+    """
+    Creates a requests.Response object with the list of elements.
+
+    Args:
+        response: The original response object.
+        elements: The list of elements to be serialized and added to
+            the response.
+
+    Returns:
+        The modified response object with updated content.
+    """
+    response = requests.Response()
+
     content = json.dumps(elements).encode()
     content_length = str(len(content))
-    response_copy.headers.update({"Content-Length": content_length})
-    setattr(response_copy, "_content", content)
-    return response_copy
+
+    response.headers.update({"Content-Length": content_length, "Content-Type": "application/json"})
+    response.status_code = 200
+    setattr(response, "_content", content)
+    return response
 
 
 def log_after_split_response(status_code: int, split_number: int):
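To get a feel for the retry values hardcoded in `call_api_async`, the sleep formula from `retry_with_backoff_async` can be evaluated on its own. The sketch below is an illustration only: `backoff_delay` is a hypothetical helper that does not exist in the SDK, and the jitter is passed in as a parameter so the schedule can be inspected deterministically.

```python
import random


def backoff_delay(retries: int, initial_interval: int, max_interval: int,
                  exponent: float, jitter: float = 0.0) -> float:
    """Return the sleep in seconds before retry number `retries` (0-based).

    Intervals are in milliseconds, matching the values in the diff.
    """
    sleep = (initial_interval / 1000) * exponent**retries + jitter
    return min(sleep, max_interval / 1000)


# Values copied from call_api_async in the diff.
retry_values = {
    "initial_interval": 1000 * 3,
    "max_interval": 1000 * 60 * 12,
    "exponent": 1.88,
}

# Jitter-free schedule, to make the exponential growth visible.
schedule = [backoff_delay(n, **retry_values) for n in range(11)]

# Same schedule with jitter drawn as in the diff: random.uniform(0, 1).
jittered = [
    backoff_delay(n, jitter=random.uniform(0, 1), **retry_values)
    for n in range(11)
]
```

With these values the jitter-free delay starts at 3 seconds and first reaches the 12 minute cap at index 9. The loop in the diff also stops once `max_elapsed_time` (30 minutes) has passed, which this sketch does not model.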
