Skip to content

Commit c2aaf94

Browse files
fix: ibm watsonx s3 retry connection on error (#474)
* IBM watsonx s3 destination retry on connection error
* update version and changelog; ibm retry on connection error fix
1 parent 8bc5535 commit c2aaf94

File tree

4 files changed

+56
-10
lines changed

4 files changed

+56
-10
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
## 1.0.12-dev0
1+
## 1.0.13
22

33
### Fixes
44

55
* **Fix Notion connector database property missing 'description' attribute error**
6+
* **Retry IBM watsonx S3 upload on connection error**
67

78
## 1.0.12
89

test/unit/connectors/ibm_watsonx/test_ibm_watsonx_s3.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def connection_config(access_config: IbmWatsonxAccessConfig):
4646
object_storage_endpoint="test_object_storage_endpoint/",
4747
object_storage_region="test_region",
4848
catalog="test_catalog",
49+
max_retries_connection=2,
4950
)
5051

5152

@@ -248,7 +249,7 @@ def test_ibm_watsonx_connection_config_get_catalog_success(
248249
def test_ibm_watsonx_connection_config_get_catalog_failure(
249250
mocker: MockerFixture, connection_config: IbmWatsonxConnectionConfig
250251
):
251-
mocker.patch(
252+
mock_load_catalog = mocker.patch(
252253
"pyiceberg.catalog.load_catalog",
253254
side_effect=Exception("Connection error"),
254255
)
@@ -258,7 +259,23 @@ def test_ibm_watsonx_connection_config_get_catalog_failure(
258259
new="test_bearer_token",
259260
)
260261
with pytest.raises(ProviderError), connection_config.get_catalog():
261-
pass
262+
mock_load_catalog.assert_called_once()
263+
264+
265+
def test_ibm_watsonx_connection_config_get_catalog_failure_retries(
266+
mocker: MockerFixture, connection_config: IbmWatsonxConnectionConfig
267+
):
268+
mock_load_catalog = mocker.patch(
269+
"pyiceberg.catalog.load_catalog",
270+
side_effect=RESTError("Connection error"),
271+
)
272+
mocker.patch.object(
273+
IbmWatsonxConnectionConfig,
274+
"bearer_token",
275+
new="test_bearer_token",
276+
)
277+
with pytest.raises(ProviderError), connection_config.get_catalog():
278+
assert mock_load_catalog.call_count == 2
262279

263280

264281
def test_ibm_watsonx_uploader_precheck_namespace_exists_table_exists(

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "1.0.12-dev0" # pragma: no cover
1+
__version__ = "1.0.13" # pragma: no cover

unstructured_ingest/processes/connectors/ibm_watsonx/ibm_watsonx_s3.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ class IbmWatsonxConnectionConfig(ConnectionConfig):
5656
object_storage_endpoint: str = Field(description="Cloud Object Storage public endpoint")
5757
object_storage_region: str = Field(description="Cloud Object Storage region")
5858
catalog: str = Field(description="Catalog name")
59+
max_retries_connection: int = Field(
60+
default=10,
61+
description="Maximum number of retries in case of a connection error (RESTError)",
62+
ge=2,
63+
le=100,
64+
)
5965

6066
_bearer_token: Optional[dict[str, Any]] = None
6167

@@ -145,10 +151,29 @@ def get_catalog_config(self) -> dict[str, Any]:
145151
@contextmanager
146152
def get_catalog(self) -> Generator["RestCatalog", None, None]:
147153
from pyiceberg.catalog import load_catalog
154+
from pyiceberg.exceptions import RESTError
155+
from tenacity import (
156+
before_log,
157+
retry,
158+
retry_if_exception_type,
159+
stop_after_attempt,
160+
wait_exponential,
161+
)
162+
163+
# Retry connection in case of a connection error
164+
@retry(
165+
stop=stop_after_attempt(self.max_retries_connection),
166+
wait=wait_exponential(exp_base=2, multiplier=1, min=2, max=10),
167+
retry=retry_if_exception_type(RESTError),
168+
before=before_log(logger, logging.DEBUG),
169+
reraise=True,
170+
)
171+
def _get_catalog(catalog_config: dict[str, Any]) -> "RestCatalog":
172+
return load_catalog(**catalog_config)
148173

149174
try:
150175
catalog_config = self.get_catalog_config()
151-
catalog = load_catalog(**catalog_config)
176+
catalog = _get_catalog(catalog_config)
152177
except Exception as e:
153178
logger.error(f"Failed to connect to catalog '{self.catalog}': {e}", exc_info=True)
154179
raise ProviderError(f"Failed to connect to catalog '{self.catalog}': {e}")
@@ -172,7 +197,10 @@ class IbmWatsonxUploaderConfig(UploaderConfig):
172197
namespace: str = Field(description="Namespace name")
173198
table: str = Field(description="Table name")
174199
max_retries: int = Field(
175-
default=5, description="Maximum number of retries to upload data", ge=2, le=500
200+
default=5,
201+
description="Maximum number of retries to upload data (CommitFailedException)",
202+
ge=2,
203+
le=500,
176204
)
177205
record_id_key: str = Field(
178206
default=RECORD_ID_LABEL,
@@ -287,15 +315,15 @@ def upload_dataframe(self, df: "DataFrame", file_data: FileData) -> None:
287315
retry,
288316
retry_if_exception_type,
289317
stop_after_attempt,
290-
wait_random,
318+
wait_exponential,
291319
)
292320

293321
data_table = self._df_to_arrow_table(df)
294322

295-
# Retry connection in case of connection error
323+
# Retry connection in case of a connection error or token expiration
296324
@retry(
297-
stop=stop_after_attempt(2),
298-
wait=wait_random(),
325+
stop=stop_after_attempt(self.connection_config.max_retries_connection),
326+
wait=wait_exponential(exp_base=2, multiplier=1, min=2, max=10),
299327
retry=retry_if_exception_type(RESTError),
300328
before=before_log(logger, logging.DEBUG),
301329
reraise=True,

0 commit comments

Comments
 (0)