Skip to content

Commit dbc85eb

Browse files
retry with wait when throttling error happen in sharepoint (#588)
1 parent abc27a2 commit dbc85eb

File tree

6 files changed

+185
-24
lines changed

6 files changed

+185
-24
lines changed

CHANGELOG.md

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
## 1.2.12-dev1
1+
## 1.2.12
22

3+
* **Fix: retry with wait when throttling error happens in Sharepoint connector**
34
* **Fix: fix Milvus stager to use correct exception**
4-
5-
## 1.2.12-dev0
6-
75
* **Fix: confluence integration test to use new link and credential**
86

97
## 1.2.11

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ test = [
143143
"pyarrow",
144144
"networkx",
145145
"htmlbuilder",
146+
"office365-rest-python-client",
146147
]
147148
# Add constraints needed for CI
148149
ci = [
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
from unittest.mock import Mock
2+
3+
import pytest
4+
5+
from unstructured_ingest.data_types.file_data import FileData, SourceIdentifiers
6+
from unstructured_ingest.error import SourceConnectionError
7+
from unstructured_ingest.processes.connectors.sharepoint import (
8+
SharepointConnectionConfig,
9+
SharepointDownloader,
10+
SharepointDownloaderConfig,
11+
)
12+
13+
14+
@pytest.fixture
15+
def mock_client():
16+
return Mock()
17+
18+
19+
@pytest.fixture
20+
def mock_site():
21+
return Mock()
22+
23+
24+
@pytest.fixture
25+
def mock_drive_item():
26+
return Mock()
27+
28+
29+
@pytest.fixture
30+
def mock_file():
31+
return Mock()
32+
33+
34+
@pytest.fixture
35+
def mock_connection_config(mock_client, mock_drive_item):
36+
config = Mock(spec=SharepointConnectionConfig)
37+
config.site = "https://test.sharepoint.com/sites/test"
38+
config.get_client.return_value = mock_client
39+
config._get_drive_item.return_value = mock_drive_item
40+
return config
41+
42+
43+
@pytest.fixture
44+
def mock_download_config():
45+
config = Mock(spec=SharepointDownloaderConfig)
46+
config.max_retries = 3
47+
return config
48+
49+
50+
@pytest.fixture
51+
def sharepoint_downloader(mock_connection_config, mock_download_config):
52+
downloader = SharepointDownloader(
53+
connection_config=mock_connection_config, download_config=mock_download_config
54+
)
55+
return downloader
56+
57+
58+
@pytest.fixture
59+
def file_data():
60+
return FileData(
61+
source_identifiers=SourceIdentifiers(
62+
filename="test.docx", fullpath="/sites/test/Shared Documents/test.docx"
63+
),
64+
connector_type="sharepoint",
65+
identifier="test-id",
66+
)
67+
68+
69+
def test_fetch_file(
70+
mock_client, mock_drive_item, mock_site, mock_file, sharepoint_downloader, file_data
71+
):
72+
"""Test successful file fetch without any errors"""
73+
mock_client.sites.get_by_url.return_value.get.return_value.execute_query.return_value = (
74+
mock_site
75+
)
76+
mock_drive_item.get_by_path.return_value.get.return_value.execute_query.return_value = mock_file
77+
result = sharepoint_downloader._fetch_file(file_data)
78+
79+
assert result == mock_file
80+
assert mock_client.sites.get_by_url.return_value.get.return_value.execute_query.call_count == 1
81+
assert mock_drive_item.get_by_path.return_value.get.return_value.execute_query.call_count == 1
82+
mock_drive_item.get_by_path.assert_called_with("/sites/test/Shared Documents/test.docx")
83+
84+
85+
def test_fetch_file_retries_on_429_error(
86+
mock_client, mock_drive_item, mock_site, sharepoint_downloader, file_data
87+
):
88+
"""Test that _fetch_file retries when encountering 429 errors"""
89+
mock_client.sites.get_by_url.return_value.get.return_value.execute_query.return_value = (
90+
mock_site
91+
)
92+
mock_drive_item.get_by_path.return_value.get.return_value.execute_query.side_effect = [
93+
Exception("429 Client Error"),
94+
Exception("Request has been throttled"),
95+
mock_file,
96+
]
97+
98+
result = sharepoint_downloader._fetch_file(file_data)
99+
assert result == mock_file
100+
assert mock_drive_item.get_by_path.return_value.get.return_value.execute_query.call_count == 3
101+
102+
103+
def test_fetch_file_fails_after_max_retries(
104+
mock_client, mock_drive_item, mock_site, sharepoint_downloader, file_data
105+
):
106+
"""Test that _fetch_file fails after exhausting max retries"""
107+
mock_client.sites.get_by_url.return_value.get.return_value.execute_query.return_value = (
108+
mock_site
109+
)
110+
mock_drive_item.get_by_path.return_value.get.return_value.execute_query.side_effect = Exception(
111+
"429 Client Error"
112+
)
113+
114+
with pytest.raises(Exception, match="429"):
115+
sharepoint_downloader._fetch_file(file_data)
116+
117+
expected_calls = sharepoint_downloader.download_config.max_retries
118+
assert (
119+
mock_drive_item.get_by_path.return_value.get.return_value.execute_query.call_count
120+
== expected_calls
121+
)
122+
123+
124+
def test_fetch_file_handles_site_not_found_immediately(
125+
mock_client, sharepoint_downloader, file_data
126+
):
127+
"""Test that site not found errors are not retried"""
128+
mock_client.sites.get_by_url.return_value.get.return_value.execute_query.side_effect = (
129+
Exception("Site not found")
130+
)
131+
132+
with pytest.raises(SourceConnectionError, match="Site not found"):
133+
sharepoint_downloader._fetch_file(file_data)
134+
135+
assert mock_client.sites.get_by_url.return_value.get.return_value.execute_query.call_count == 1

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "1.2.12-dev1" # pragma: no cover
1+
__version__ = "1.2.12" # pragma: no cover

unstructured_ingest/processes/connectors/sharepoint.py

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import logging
45
from dataclasses import dataclass
56
from typing import TYPE_CHECKING, Any, AsyncIterator, Optional
67

@@ -210,7 +211,7 @@ async def run_async(self, **kwargs: Any) -> AsyncIterator[FileData]:
210211

211212

212213
class SharepointDownloaderConfig(OnedriveDownloaderConfig):
213-
pass
214+
max_retries: int = 10
214215

215216

216217
@dataclass
@@ -219,10 +220,22 @@ class SharepointDownloader(OnedriveDownloader):
219220
download_config: SharepointDownloaderConfig
220221
connector_type: str = CONNECTOR_TYPE
221222

223+
@staticmethod
224+
def retry_on_status_code(exc):
225+
error_msg = str(exc).lower()
226+
return "429" in error_msg or "activitylimitreached" in error_msg or "throttled" in error_msg
227+
222228
@SourceConnectionNetworkError.wrap
223229
@requires_dependencies(["office365"], extras="sharepoint")
224230
def _fetch_file(self, file_data: FileData) -> DriveItem:
225231
from office365.runtime.client_request_exception import ClientRequestException
232+
from tenacity import (
233+
before_log,
234+
retry,
235+
retry_if_exception,
236+
stop_after_attempt,
237+
wait_exponential,
238+
)
226239

227240
if file_data.source_identifiers is None or not file_data.source_identifiers.fullpath:
228241
raise ValueError(
@@ -233,13 +246,27 @@ def _fetch_file(self, file_data: FileData) -> DriveItem:
233246
server_relative_path = file_data.source_identifiers.fullpath
234247
client = self.connection_config.get_client()
235248

236-
try:
237-
client_site = client.sites.get_by_url(self.connection_config.site).get().execute_query()
238-
site_drive_item = self.connection_config._get_drive_item(client_site)
239-
except ClientRequestException:
240-
logger.info("Site not found")
241-
raise SourceConnectionError(f"Site not found: {self.connection_config.site}")
242-
file = site_drive_item.get_by_path(server_relative_path).get().execute_query()
249+
@retry(
250+
stop=stop_after_attempt(self.download_config.max_retries),
251+
wait=wait_exponential(exp_base=2, multiplier=1, min=2, max=10),
252+
retry=retry_if_exception(self.retry_on_status_code),
253+
before=before_log(logger, logging.DEBUG),
254+
reraise=True,
255+
)
256+
def _get_item_by_path() -> DriveItem:
257+
try:
258+
client_site = (
259+
client.sites.get_by_url(self.connection_config.site).get().execute_query()
260+
)
261+
site_drive_item = self.connection_config._get_drive_item(client_site)
262+
except ClientRequestException:
263+
logger.info(f"Site not found: {self.connection_config.site}")
264+
raise SourceConnectionError(f"Site not found: {self.connection_config.site}")
265+
file = site_drive_item.get_by_path(server_relative_path).get().execute_query()
266+
return file
267+
268+
# Call the retry-wrapped function
269+
file = _get_item_by_path()
243270

244271
if not file:
245272
raise NotFoundError(f"file not found: {server_relative_path}")

uv.lock

Lines changed: 11 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)