Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/file_based/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,7 @@ class CustomFileBasedException(AirbyteTracedException):

class FileSizeLimitError(CustomFileBasedException):
    """Custom file-based exception for file size limit violations.

    NOTE(review): presumably raised when a file exceeds a configured size
    limit; the raise sites are not visible in this view -- confirm.
    """


class EmptyFileSchemaInferenceError(AirbyteTracedException):
    """Signal that schema inference found nothing to infer a schema from.

    The CSV parser's ``infer_schema`` raises this when no per-field type
    inferrers were collected for a file (i.e. the file has no rows).

    Having a dedicated type lets callers catch this case ahead of a generic
    ``AirbyteTracedException`` handler and treat it differently.
    """
8 changes: 6 additions & 2 deletions airbyte_cdk/sources/file_based/file_types/csv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
InferenceType,
)
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
from airbyte_cdk.sources.file_based.exceptions import (
EmptyFileSchemaInferenceError,
FileBasedSourceError,
RecordParseError,
)
from airbyte_cdk.sources.file_based.file_based_stream_reader import (
AbstractFileBasedStreamReader,
FileReadMode,
Expand Down Expand Up @@ -203,7 +207,7 @@ async def infer_schema(
break

if not type_inferrer_by_field:
raise AirbyteTracedException(
raise EmptyFileSchemaInferenceError(
message=f"Could not infer schema as there are no rows in {file.uri}. If having an empty CSV file is expected, ignore this. "
f"Else, please contact Airbyte.",
failure_type=FailureType.config_error,
Expand Down
44 changes: 32 additions & 12 deletions airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,26 @@
from copy import deepcopy
from functools import cache
from os import path
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Tuple, Union
from typing import (
Any,
Dict,
Iterable,
List,
Mapping,
MutableMapping,
NoReturn,
Optional,
Set,
Tuple,
Union,
)

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStream, FailureType, Level
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.exceptions import (
DuplicatedFilesError,
EmptyFileSchemaInferenceError,
FileBasedSourceError,
InvalidSchemaError,
MissingSchemaError,
Expand Down Expand Up @@ -230,7 +243,7 @@ def cursor_field(self) -> Union[str, List[str]]:
return self.ab_last_mod_col

@cache
def get_json_schema(self) -> JsonSchema:
def get_json_schema(self) -> JsonSchema: # type: ignore
if self.use_file_transfer:
return file_transfer_schema
extra_fields = {
Expand All @@ -246,12 +259,12 @@ def get_json_schema(self) -> JsonSchema:
exception=AirbyteTracedException(exception=config_exception),
failure_type=FailureType.config_error,
)
except EmptyFileSchemaInferenceError as exc:
self._raise_schema_inference_error(exc)
except AirbyteTracedException as ate:
raise ate
except Exception as exc:
raise SchemaInferenceError(
FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name
) from exc
self._raise_schema_inference_error(exc)
else:
return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}

Expand Down Expand Up @@ -380,17 +393,24 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:

return base_schema

async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:  # type: ignore
    """Infer the schema of a single file using the stream's parser.

    Args:
        file: The remote file to run schema inference on.

    Returns:
        The value returned by the parser's ``infer_schema`` for this file.

    Raises:
        SchemaInferenceError: When the parser raises
            ``EmptyFileSchemaInferenceError`` or any exception that is not an
            ``AirbyteTracedException``; the original exception is chained as
            the cause.
        AirbyteTracedException: Re-raised unchanged for every other traced
            exception coming from the parser.
    """
    try:
        return await self.get_parser().infer_schema(
            self.config, file, self.stream_reader, self.logger
        )
    except EmptyFileSchemaInferenceError as exc:
        # Must come before the AirbyteTracedException handler: the empty-file
        # error subclasses it and would otherwise be re-raised unchanged.
        self._raise_schema_inference_error(exc, file)
    except AirbyteTracedException as ate:
        raise ate
    except Exception as exc:
        # Any other failure is wrapped by the shared helper so the error
        # carries the file, format and stream context.
        self._raise_schema_inference_error(exc, file)

def _raise_schema_inference_error(
    self, exc: Exception, file: Optional[RemoteFile] = None
) -> NoReturn:
    """Wrap ``exc`` in a ``SchemaInferenceError`` and raise it.

    Args:
        exc: The exception that caused inference to fail; chained as the
            cause of the raised error.
        file: The file being processed, if any. Its ``uri`` is attached to
            the error when ``file`` is truthy, otherwise ``None`` is passed.

    Raises:
        SchemaInferenceError: Always. This method never returns.
    """
    error_kind = FileBasedSourceError.SCHEMA_INFERENCE_ERROR
    file_uri = file.uri if file else None
    # The format is stringified only when one is configured; otherwise None.
    format_label = str(self.config.format) if self.config.format else None
    stream_name = self.name
    raise SchemaInferenceError(
        error_kind,
        file=file_uri,
        format=format_label,
        stream=stream_name,
    ) from exc
Loading