diff --git a/airbyte_cdk/sources/file_based/exceptions.py b/airbyte_cdk/sources/file_based/exceptions.py index b0d38947f..75f7d3f83 100644 --- a/airbyte_cdk/sources/file_based/exceptions.py +++ b/airbyte_cdk/sources/file_based/exceptions.py @@ -157,3 +157,7 @@ class CustomFileBasedException(AirbyteTracedException): class FileSizeLimitError(CustomFileBasedException): pass + + +class EmptyFileSchemaInferenceError(AirbyteTracedException): + pass diff --git a/airbyte_cdk/sources/file_based/file_types/csv_parser.py b/airbyte_cdk/sources/file_based/file_types/csv_parser.py index e3010690e..edab346fe 100644 --- a/airbyte_cdk/sources/file_based/file_types/csv_parser.py +++ b/airbyte_cdk/sources/file_based/file_types/csv_parser.py @@ -22,7 +22,11 @@ InferenceType, ) from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig -from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError +from airbyte_cdk.sources.file_based.exceptions import ( + EmptyFileSchemaInferenceError, + FileBasedSourceError, + RecordParseError, +) from airbyte_cdk.sources.file_based.file_based_stream_reader import ( AbstractFileBasedStreamReader, FileReadMode, @@ -203,7 +207,7 @@ async def infer_schema( break if not type_inferrer_by_field: - raise AirbyteTracedException( + raise EmptyFileSchemaInferenceError( message=f"Could not infer schema as there are no rows in {file.uri}. If having an empty CSV file is expected, ignore this. " f"Else, please contact Airbyte.", failure_type=FailureType.config_error, diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index 0e7121325..3053a74d2 100644 --- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -9,13 +9,26 @@ from copy import deepcopy from functools import cache from os import path -from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Tuple, Union +from typing import ( + Any, + Dict, + Iterable, + List, + Mapping, + MutableMapping, + NoReturn, + Optional, + Set, + Tuple, + Union, +) from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStream, FailureType, Level from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType from airbyte_cdk.sources.file_based.exceptions import ( DuplicatedFilesError, + EmptyFileSchemaInferenceError, FileBasedSourceError, InvalidSchemaError, MissingSchemaError, @@ -230,7 +243,7 @@ def cursor_field(self) -> Union[str, List[str]]: return self.ab_last_mod_col @cache - def get_json_schema(self) -> JsonSchema: + def get_json_schema(self) -> JsonSchema: # type: ignore if self.use_file_transfer: return file_transfer_schema extra_fields = { @@ -246,12 +259,12 @@ def get_json_schema(self) -> JsonSchema: exception=AirbyteTracedException(exception=config_exception), failure_type=FailureType.config_error, ) + except EmptyFileSchemaInferenceError as exc: + self._raise_schema_inference_error(exc) except AirbyteTracedException as ate: raise ate except Exception as exc: - raise SchemaInferenceError( - FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name - ) from exc + self._raise_schema_inference_error(exc) else: return {"type": "object", "properties": {**extra_fields, **schema["properties"]}} @@ -380,17 +393,24 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]: return base_schema - async def _infer_file_schema(self, file: RemoteFile) -> SchemaType: + async def _infer_file_schema(self, file: RemoteFile) -> SchemaType: # type: ignore try: return await self.get_parser().infer_schema( self.config, file, self.stream_reader, self.logger ) + except EmptyFileSchemaInferenceError as exc: + self._raise_schema_inference_error(exc, file) except AirbyteTracedException as ate: raise ate except Exception as exc: - raise SchemaInferenceError( - FileBasedSourceError.SCHEMA_INFERENCE_ERROR, - file=file.uri, - format=str(self.config.format), - stream=self.name, - ) from exc + self._raise_schema_inference_error(exc, file) + + def _raise_schema_inference_error( + self, exc: Exception, file: Optional[RemoteFile] = None + ) -> NoReturn: + raise SchemaInferenceError( + FileBasedSourceError.SCHEMA_INFERENCE_ERROR, + file=file.uri if file else None, + format=str(self.config.format) if self.config.format else None, + stream=self.name, + ) from exc