Skip to content

Commit 806cdff

Browse files
handle schema inference error for empty file
1 parent 041c201 commit 806cdff

File tree

3 files changed

+22
-12
lines changed

3 files changed

+22
-12
lines changed

airbyte_cdk/sources/file_based/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -157,3 +157,7 @@ class CustomFileBasedException(AirbyteTracedException):
157157

158158
class FileSizeLimitError(CustomFileBasedException):
159159
pass
160+
161+
162+
class EmptyFileSchemaInferenceError(AirbyteTracedException):
163+
pass

airbyte_cdk/sources/file_based/file_types/csv_parser.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,8 @@
2222
InferenceType,
2323
)
2424
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
25-
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
25+
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError, \
26+
EmptyFileSchemaInferenceError
2627
from airbyte_cdk.sources.file_based.file_based_stream_reader import (
2728
AbstractFileBasedStreamReader,
2829
FileReadMode,
@@ -203,7 +204,7 @@ async def infer_schema(
203204
break
204205

205206
if not type_inferrer_by_field:
206-
raise AirbyteTracedException(
207+
raise EmptyFileSchemaInferenceError(
207208
message=f"Could not infer schema as there are no rows in {file.uri}. If having an empty CSV file is expected, ignore this. "
208209
f"Else, please contact Airbyte.",
209210
failure_type=FailureType.config_error,

airbyte_cdk/sources/file_based/stream/default_file_based_stream.py

Lines changed: 15 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,7 @@
2121
MissingSchemaError,
2222
RecordParseError,
2323
SchemaInferenceError,
24-
StopSyncPerValidationPolicy,
24+
StopSyncPerValidationPolicy, EmptyFileSchemaInferenceError,
2525
)
2626
from airbyte_cdk.sources.file_based.file_types import FileTransfer
2727
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
@@ -246,12 +246,12 @@ def get_json_schema(self) -> JsonSchema:
246246
exception=AirbyteTracedException(exception=config_exception),
247247
failure_type=FailureType.config_error,
248248
)
249+
except EmptyFileSchemaInferenceError as exc:
250+
self._raise_schema_inference_error(exc)
249251
except AirbyteTracedException as ate:
250252
raise ate
251253
except Exception as exc:
252-
raise SchemaInferenceError(
253-
FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name
254-
) from exc
254+
self._raise_schema_inference_error(exc)
255255
else:
256256
return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}
257257

@@ -385,12 +385,17 @@ async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:
385385
return await self.get_parser().infer_schema(
386386
self.config, file, self.stream_reader, self.logger
387387
)
388+
except EmptyFileSchemaInferenceError as exc:
389+
self._raise_schema_inference_error(exc, file)
388390
except AirbyteTracedException as ate:
389391
raise ate
390392
except Exception as exc:
391-
raise SchemaInferenceError(
392-
FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
393-
file=file.uri,
394-
format=str(self.config.format),
395-
stream=self.name,
396-
) from exc
393+
self._raise_schema_inference_error(exc, file)
394+
395+
def _raise_schema_inference_error(self, exc: Exception, file: Optional[RemoteFile] = None) -> None:
396+
raise SchemaInferenceError(
397+
FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
398+
file=file.uri if file else None,
399+
format=str(self.config.format) if self.config.format else None,
400+
stream=self.name,
401+
) from exc

0 commit comments

Comments
 (0)