Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/file_based/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,7 @@ class CustomFileBasedException(AirbyteTracedException):

class FileSizeLimitError(CustomFileBasedException):
    """Marker exception that adds no behavior to ``CustomFileBasedException``.

    NOTE(review): presumably raised when a file exceeds a size limit; the
    raise site is not visible here -- confirm against callers.
    """


class EmptyFileSchemaInferenceError(AirbyteTracedException):
    """Raised when a schema cannot be inferred because a file has no rows.

    The CSV parser raises this from ``infer_schema`` when no field types were
    collected for a file. The default file-based stream catches it ahead of
    the generic ``AirbyteTracedException`` handler and re-raises it as a
    ``SchemaInferenceError``.
    """
8 changes: 6 additions & 2 deletions airbyte_cdk/sources/file_based/file_types/csv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
InferenceType,
)
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
from airbyte_cdk.sources.file_based.exceptions import (
EmptyFileSchemaInferenceError,
FileBasedSourceError,
RecordParseError,
)
from airbyte_cdk.sources.file_based.file_based_stream_reader import (
AbstractFileBasedStreamReader,
FileReadMode,
Expand Down Expand Up @@ -203,7 +207,7 @@ async def infer_schema(
break

if not type_inferrer_by_field:
raise AirbyteTracedException(
raise EmptyFileSchemaInferenceError(
message=f"Could not infer schema as there are no rows in {file.uri}. If having an empty CSV file is expected, ignore this. "
f"Else, please contact Airbyte.",
failure_type=FailureType.config_error,
Expand Down
26 changes: 17 additions & 9 deletions airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.exceptions import (
DuplicatedFilesError,
EmptyFileSchemaInferenceError,
FileBasedSourceError,
InvalidSchemaError,
MissingSchemaError,
Expand Down Expand Up @@ -246,12 +247,12 @@ def get_json_schema(self) -> JsonSchema:
exception=AirbyteTracedException(exception=config_exception),
failure_type=FailureType.config_error,
)
except EmptyFileSchemaInferenceError as exc:
self._raise_schema_inference_error(exc)
except AirbyteTracedException as ate:
raise ate
except Exception as exc:
raise SchemaInferenceError(
FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name
) from exc
self._raise_schema_inference_error(exc)
else:
return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}

Expand Down Expand Up @@ -385,12 +386,19 @@ async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:
return await self.get_parser().infer_schema(
self.config, file, self.stream_reader, self.logger
)
except EmptyFileSchemaInferenceError as exc:
self._raise_schema_inference_error(exc, file)
except AirbyteTracedException as ate:
raise ate
except Exception as exc:
raise SchemaInferenceError(
FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
file=file.uri,
format=str(self.config.format),
stream=self.name,
) from exc
self._raise_schema_inference_error(exc, file)

def _raise_schema_inference_error(
    self, exc: Exception, file: Optional[RemoteFile] = None
) -> None:
    """Wrap ``exc`` in a ``SchemaInferenceError`` for this stream and raise it.

    Args:
        exc: The underlying failure; chained as the cause via ``raise ... from``.
        file: The file being processed when the failure occurred, if any. Its
            ``uri`` is forwarded to the error; ``None`` is forwarded otherwise.

    Raises:
        SchemaInferenceError: Always; this method never returns normally.
    """
    # NOTE(review): every path raises, so ``NoReturn`` would describe the
    # return type more precisely than ``None`` (requires a ``typing`` import).
    details = {
        "file": file.uri if file else None,
        # A falsy format is reported as None rather than its string form.
        "format": str(self.config.format) if self.config.format else None,
        "stream": self.name,
    }
    raise SchemaInferenceError(
        FileBasedSourceError.SCHEMA_INFERENCE_ERROR, **details
    ) from exc
Loading