Skip to content

Commit 123718a

Browse files
authored
feat/expand error handling to fsspec connectors (#319)
* provide better error handling in s3 * reuse wrap error logic * provide better error handling in azure * provide better error handling in dropbox * provide better error handling in gcs * provide better error handling in box * bump changelog
1 parent 8a83e91 commit 123718a

File tree

10 files changed

+168
-39
lines changed

10 files changed

+168
-39
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
## 0.3.13-dev0
2+
13
## 0.3.12
24

35
### Enhancements

test/integration/connectors/test_s3.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
source_connector_validation,
1717
)
1818
from test.integration.utils import requires_env
19-
from unstructured_ingest.error import (
20-
SourceConnectionError,
21-
)
19+
from unstructured_ingest.v2.errors import UserAuthError, UserError
2220
from unstructured_ingest.v2.interfaces import FileData, SourceIdentifiers
2321
from unstructured_ingest.v2.processes.connectors.fsspec.s3 import (
2422
CONNECTOR_TYPE,
@@ -94,12 +92,19 @@ async def test_s3_source_special_char(anon_connection_config: S3ConnectionConfig
9492
)
9593

9694

97-
@pytest.mark.asyncio
9895
@pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG)
99-
async def test_s3_source_no_access(anon_connection_config: S3ConnectionConfig):
96+
def test_s3_source_no_access(anon_connection_config: S3ConnectionConfig):
10097
indexer_config = S3IndexerConfig(remote_url="s3://utic-ingest-test-fixtures/destination/")
10198
indexer = S3Indexer(connection_config=anon_connection_config, index_config=indexer_config)
102-
with pytest.raises(SourceConnectionError):
99+
with pytest.raises(UserAuthError):
100+
indexer.precheck()
101+
102+
103+
@pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG)
def test_s3_source_no_bucket(anon_connection_config: S3ConnectionConfig):
    """Precheck against the ``s3://fake-bucket`` URL must surface a ``UserError``."""
    missing_bucket_config = S3IndexerConfig(remote_url="s3://fake-bucket")
    missing_bucket_indexer = S3Indexer(
        connection_config=anon_connection_config,
        index_config=missing_bucket_config,
    )
    with pytest.raises(UserError):
        missing_bucket_indexer.precheck()
104109

105110

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.3.12" # pragma: no cover
1+
__version__ = "0.3.13-dev0" # pragma: no cover

unstructured_ingest/v2/processes/connectors/fsspec/azure.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
from pydantic import Field, Secret
99

1010
from unstructured_ingest.utils.dep_check import requires_dependencies
11+
from unstructured_ingest.v2.errors import ProviderError, UserAuthError, UserError
1112
from unstructured_ingest.v2.interfaces import FileDataSourceMetadata
13+
from unstructured_ingest.v2.logger import logger
1214
from unstructured_ingest.v2.processes.connector_registry import (
1315
DestinationRegistryEntry,
1416
SourceRegistryEntry,
@@ -98,6 +100,24 @@ def get_client(self, protocol: str) -> Generator["AzureBlobFileSystem", None, No
98100
with super().get_client(protocol=protocol) as client:
99101
yield client
100102

103+
def wrap_error(self, e: Exception) -> Exception:
    """Translate an Azure SDK exception into one of the ingest error types.

    Args:
        e: The exception caught while talking to Azure.

    Returns:
        ``UserAuthError`` for ``ClientAuthenticationError``, ``UserError`` for
        4xx responses, ``ProviderError`` for 5xx responses. Anything else is
        logged as unhandled and returned unchanged.
    """
    from azure.core.exceptions import ClientAuthenticationError, HttpResponseError

    if isinstance(e, HttpResponseError):
        # NOTE: ``reason`` can be empty when the SDK raised without an HTTP
        # response attached; fall back to the exception text so the wrapped
        # error never carries a ``None`` message.
        message = e.reason or str(e)
        if isinstance(e, ClientAuthenticationError):
            return UserAuthError(message)
        status_code = e.status_code
        if status_code is not None:
            if 400 <= status_code < 500:
                return UserError(message)
            if status_code >= 500:
                return ProviderError(message)
    logger.error(f"unhandled exception from azure ({type(e)}): {e}", exc_info=True)
    return e
120+
101121

102122
@dataclass
103123
class AzureIndexer(FsspecIndexer):

unstructured_ingest/v2/processes/connectors/fsspec/box.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
from pydantic.functional_validators import BeforeValidator
1111

1212
from unstructured_ingest.utils.dep_check import requires_dependencies
13+
from unstructured_ingest.v2.errors import ProviderError, UserAuthError, UserError
1314
from unstructured_ingest.v2.interfaces import FileDataSourceMetadata
15+
from unstructured_ingest.v2.logger import logger
1416
from unstructured_ingest.v2.processes.connector_registry import (
1517
DestinationRegistryEntry,
1618
SourceRegistryEntry,
@@ -24,7 +26,6 @@
2426
FsspecIndexerConfig,
2527
FsspecUploader,
2628
FsspecUploaderConfig,
27-
SourceConnectionError,
2829
)
2930
from unstructured_ingest.v2.processes.connectors.utils import conform_string_to_dict
3031

@@ -57,13 +58,10 @@ def get_access_config(self) -> dict[str, Any]:
5758

5859
# Create and authenticate the JWTAuth object
5960
oauth = JWTAuth.from_settings_dictionary(settings_dict)
60-
try:
61-
oauth.authenticate_instance()
62-
except Exception as e:
63-
raise SourceConnectionError(f"Failed to authenticate with Box: {e}")
61+
oauth.authenticate_instance()
6462

65-
if not oauth.access_token:
66-
raise SourceConnectionError("Authentication failed: No access token generated.")
63+
# if not oauth.access_token:
64+
# raise SourceConnectionError("Authentication failed: No access token generated.")
6765

6866
# Prepare the access configuration with the authenticated oauth
6967
access_kwargs_with_oauth: dict[str, Any] = {
@@ -75,6 +73,24 @@ def get_access_config(self) -> dict[str, Any]:
7573

7674
return access_kwargs_with_oauth
7775

76+
def wrap_error(self, e: Exception) -> Exception:
    """Translate a Box SDK exception into one of the ingest error types.

    Args:
        e: The exception caught while talking to Box.

    Returns:
        ``UserAuthError`` for ``BoxOAuthException``, ``UserError`` for 4xx
        ``BoxAPIException`` statuses, ``ProviderError`` for 5xx statuses.
        Anything else is logged as unhandled and returned unchanged.
    """
    from boxsdk.exception import BoxAPIException, BoxOAuthException

    if isinstance(e, BoxOAuthException):
        # Same fallback as the API branch below: never wrap an empty message.
        return UserAuthError(e.message or e)
    if not isinstance(e, BoxAPIException):
        logger.error(f"unhandled exception from box ({type(e)}): {e}", exc_info=True)
        return e
    message = e.message or e
    if error_code_status := e.status:
        if 400 <= error_code_status < 500:
            return UserError(message)
        if error_code_status >= 500:
            return ProviderError(message)

    logger.error(f"unhandled exception from box ({type(e)}): {e}", exc_info=True)
    return e
93+
7894
@requires_dependencies(["boxfs"], extras="box")
7995
@contextmanager
8096
def get_client(self, protocol: str) -> Generator["BoxFileSystem", None, None]:

unstructured_ingest/v2/processes/connectors/fsspec/dropbox.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,16 @@
88
from pydantic import Field, Secret
99

1010
from unstructured_ingest.utils.dep_check import requires_dependencies
11+
from unstructured_ingest.v2.errors import (
12+
ProviderError,
13+
UserAuthError,
14+
UserError,
15+
)
16+
from unstructured_ingest.v2.errors import (
17+
RateLimitError as CustomRateLimitError,
18+
)
1119
from unstructured_ingest.v2.interfaces import FileDataSourceMetadata
20+
from unstructured_ingest.v2.logger import logger
1221
from unstructured_ingest.v2.processes.connector_registry import (
1322
DestinationRegistryEntry,
1423
SourceRegistryEntry,
@@ -53,6 +62,30 @@ def get_client(self, protocol: str) -> Generator["DropboxDriveFileSystem", None,
5362
with super().get_client(protocol=protocol) as client:
5463
yield client
5564

65+
def wrap_error(self, e: Exception) -> Exception:
    """Translate a Dropbox SDK exception into one of the ingest error types.

    The wrapped exception is always *returned*; raising it is left to the
    caller.

    Args:
        e: The exception caught while talking to Dropbox.

    Returns:
        ``UserAuthError`` for ``AuthError``, the ingest ``RateLimitError`` for
        Dropbox's ``RateLimitError``, ``UserError`` for other 4xx statuses,
        ``ProviderError`` for 5xx statuses. Anything else is logged as
        unhandled and returned unchanged.
    """
    from dropbox.exceptions import AuthError, HttpError, RateLimitError

    if not isinstance(e, HttpError):
        logger.error(f"unhandled exception from dropbox ({type(e)}): {e}", exc_info=True)
        return e
    if isinstance(e, AuthError):
        return UserAuthError(e.error)
    if isinstance(e, RateLimitError):
        return CustomRateLimitError(e.error)
    status_code = e.status_code
    # Prefer the response body; when it is missing or empty fall back to the
    # exception text so the wrapped error always carries a usable message.
    message = getattr(e, "body", None) or str(e)
    if 400 <= status_code < 500:
        return UserError(message)
    if status_code >= 500:
        return ProviderError(message)
    logger.error(f"unhandled exception from dropbox ({type(e)}): {e}", exc_info=True)
    return e
88+
5689

5790
@dataclass
5891
class DropboxIndexer(FsspecIndexer):

unstructured_ingest/v2/processes/connectors/fsspec/fsspec.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,6 @@
1212

1313
from pydantic import BaseModel, Field, Secret
1414

15-
from unstructured_ingest.error import (
16-
DestinationConnectionError,
17-
SourceConnectionError,
18-
SourceConnectionNetworkError,
19-
)
2015
from unstructured_ingest.v2.interfaces import (
2116
AccessConfig,
2217
ConnectionConfig,
@@ -88,6 +83,9 @@ def get_client(self, protocol: str) -> Generator["AbstractFileSystem", None, Non
8883
)
8984
yield client
9085

86+
def wrap_error(self, e: Exception) -> Exception:
    """Return ``e`` as-is; this base implementation performs no translation."""
    unchanged = e
    return unchanged
88+
9189

9290
FsspecIndexerConfigT = TypeVar("FsspecIndexerConfigT", bound=FsspecIndexerConfig)
9391
FsspecConnectionConfigT = TypeVar("FsspecConnectionConfigT", bound=FsspecConnectionConfig)
@@ -99,6 +97,9 @@ class FsspecIndexer(Indexer):
9997
index_config: FsspecIndexerConfigT
10098
connector_type: str = Field(default=CONNECTOR_TYPE, init=False)
10199

100+
def wrap_error(self, e: Exception) -> Exception:
    """Hand ``e`` to the connection config's ``wrap_error`` and return its result."""
    translate = self.connection_config.wrap_error
    return translate(e=e)
102+
102103
def precheck(self) -> None:
103104
from fsspec import get_filesystem_class
104105

@@ -116,7 +117,7 @@ def precheck(self) -> None:
116117
client.head(path=file_to_sample)
117118
except Exception as e:
118119
logger.error(f"failed to validate connection: {e}", exc_info=True)
119-
raise SourceConnectionError(f"failed to validate connection: {e}")
120+
raise self.wrap_error(e=e)
120121

121122
def get_file_data(self) -> list[dict[str, Any]]:
122123
if not self.index_config.recursive:
@@ -230,6 +231,9 @@ def handle_directory_download(self, lpath: Path) -> None:
230231
shutil.rmtree(lpath)
231232
shutil.move(src=temp_location, dst=lpath)
232233

234+
def wrap_error(self, e: Exception) -> Exception:
    """Hand ``e`` to the connection config's ``wrap_error`` and return its result."""
    translate = self.connection_config.wrap_error
    return translate(e=e)
236+
233237
def run(self, file_data: FileData, **kwargs: Any) -> DownloadResponse:
234238
download_path = self.get_download_path(file_data=file_data)
235239
download_path.parent.mkdir(parents=True, exist_ok=True)
@@ -239,8 +243,7 @@ def run(self, file_data: FileData, **kwargs: Any) -> DownloadResponse:
239243
client.get(rpath=rpath, lpath=download_path.as_posix())
240244
self.handle_directory_download(lpath=download_path)
241245
except Exception as e:
242-
logger.error(f"failed to download file {file_data.identifier}: {e}", exc_info=True)
243-
raise SourceConnectionNetworkError(f"failed to download file {file_data.identifier}")
246+
raise self.wrap_error(e=e)
244247
return self.generate_download_response(file_data=file_data, download_path=download_path)
245248

246249
async def async_run(self, file_data: FileData, **kwargs: Any) -> DownloadResponse:
@@ -252,8 +255,7 @@ async def async_run(self, file_data: FileData, **kwargs: Any) -> DownloadRespons
252255
await client.get(rpath=rpath, lpath=download_path.as_posix())
253256
self.handle_directory_download(lpath=download_path)
254257
except Exception as e:
255-
logger.error(f"failed to download file {file_data.identifier}: {e}", exc_info=True)
256-
raise SourceConnectionNetworkError(f"failed to download file {file_data.identifier}")
258+
raise self.wrap_error(e=e)
257259
return self.generate_download_response(file_data=file_data, download_path=download_path)
258260

259261

@@ -291,6 +293,9 @@ def __post_init__(self):
291293
f"missing 1 required positional argument: 'upload_config'"
292294
)
293295

296+
def wrap_error(self, e: Exception) -> Exception:
    """Hand ``e`` to the connection config's ``wrap_error`` and return its result."""
    translate = self.connection_config.wrap_error
    return translate(e=e)
298+
294299
def precheck(self) -> None:
295300
from fsspec import get_filesystem_class
296301

@@ -301,8 +306,7 @@ def precheck(self) -> None:
301306
upload_path = Path(self.upload_config.path_without_protocol) / "_empty"
302307
fs.write_bytes(path=upload_path.as_posix(), value=b"")
303308
except Exception as e:
304-
logger.error(f"failed to validate connection: {e}", exc_info=True)
305-
raise DestinationConnectionError(f"failed to validate connection: {e}")
309+
raise self.wrap_error(e=e)
306310

307311
def get_upload_path(self, file_data: FileData) -> Path:
308312
upload_path = (

unstructured_ingest/v2/processes/connectors/fsspec/gcs.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111

1212
from unstructured_ingest.utils.dep_check import requires_dependencies
1313
from unstructured_ingest.utils.string_and_date_utils import json_to_dict
14+
from unstructured_ingest.v2.errors import ProviderError, UserError
1415
from unstructured_ingest.v2.interfaces import FileDataSourceMetadata
16+
from unstructured_ingest.v2.logger import logger
1517
from unstructured_ingest.v2.processes.connector_registry import (
1618
DestinationRegistryEntry,
1719
SourceRegistryEntry,
@@ -103,6 +105,25 @@ def get_client(self, protocol: str) -> Generator["GCSFileSystem", None, None]:
103105
with super().get_client(protocol=protocol) as client:
104106
yield client
105107

108+
def wrap_error(self, e: Exception) -> Exception:
    """Translate a GCS / gcsfs exception into one of the ingest error types.

    The wrapped exception is always *returned*; raising it is left to the
    caller.

    Args:
        e: The exception caught while talking to GCS.

    Returns:
        ``UserError`` for missing files, "Forbidden" ``OSError``s,
        "Bad Request" ``ValueError``s and 4xx ``HttpError``s;
        ``ProviderError`` for 5xx ``HttpError``s. Anything else is logged as
        unhandled and returned unchanged.
    """
    # https://github.com/fsspec/gcsfs/blob/main/gcsfs/retry.py#L79
    from gcsfs.retry import HttpError

    # FileNotFoundError is itself an OSError, so it must be checked first.
    if isinstance(e, FileNotFoundError):
        return UserError(f"File not found: {e}")
    if isinstance(e, OSError) and "Forbidden" in str(e):
        return UserError(e)
    if isinstance(e, ValueError) and "Bad Request" in str(e):
        return UserError(e)
    if isinstance(e, HttpError) and (http_error_code := e.code):
        message = e.message or e
        if 400 <= http_error_code < 500:
            return UserError(message)
        if http_error_code >= 500:
            return ProviderError(message)
    logger.error(f"unhandled exception from gcs ({type(e)}): {e}", exc_info=True)
    return e
126+
106127

107128
@dataclass
108129
class GcsIndexer(FsspecIndexer):

unstructured_ingest/v2/processes/connectors/fsspec/s3.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
from pydantic import Field, Secret
88

99
from unstructured_ingest.utils.dep_check import requires_dependencies
10+
from unstructured_ingest.v2.errors import ProviderError, UserAuthError, UserError
1011
from unstructured_ingest.v2.interfaces import (
1112
FileDataSourceMetadata,
1213
)
14+
from unstructured_ingest.v2.logger import logger
1315
from unstructured_ingest.v2.processes.connector_registry import (
1416
DestinationRegistryEntry,
1517
SourceRegistryEntry,
@@ -79,13 +81,35 @@ def get_client(self, protocol: str) -> Generator["S3FileSystem", None, None]:
7981
with super().get_client(protocol=protocol) as client:
8082
yield client
8183

84+
def wrap_error(self, e: Exception) -> Exception:
    """Translate an S3 / s3fs exception into one of the ingest error types.

    Args:
        e: The exception caught while talking to S3.

    Returns:
        ``UserAuthError`` for ``PermissionError``, ``UserError`` for
        ``FileNotFoundError`` and 4xx responses, ``ProviderError`` for 5xx
        responses. Anything else is logged as unhandled and returned
        unchanged.
    """
    # s3fs maps botocore errors into python ones using mapping here:
    # https://github.com/fsspec/s3fs/blob/main/s3fs/errors.py
    if isinstance(e, PermissionError):
        return UserAuthError(e)
    if isinstance(e, FileNotFoundError):
        return UserError(e)
    # Only inspect the cause when it actually carries a response dict; any
    # other cause must fall through to the unhandled path instead of raising
    # AttributeError/KeyError from inside the error handler.
    cause = getattr(e, "__cause__", None)
    error_response = getattr(cause, "response", None)
    if isinstance(error_response, dict):
        error_meta = error_response.get("ResponseMetadata") or {}
        http_code = error_meta.get("HTTPStatusCode")
        message = (error_response.get("Error") or {}).get("Message", str(e))
        if isinstance(http_code, int):
            if 400 <= http_code < 500:
                return UserError(message)
            if http_code >= 500:
                return ProviderError(message)
    logger.error(f"unhandled exception from s3 ({type(e)}): {e}", exc_info=True)
    return e
102+
82103

83104
@dataclass
84105
class S3Indexer(FsspecIndexer):
85106
connection_config: S3ConnectionConfig
86107
index_config: S3IndexerConfig
87108
connector_type: str = CONNECTOR_TYPE
88109

110+
def wrap_error(self, e: Exception) -> Exception:
    """Hand ``e`` to the connection config's ``wrap_error`` and return its result."""
    translate = self.connection_config.wrap_error
    return translate(e=e)
112+
89113
def get_path(self, file_data: dict) -> str:
90114
return file_data["Key"]
91115

0 commit comments

Comments
 (0)