Skip to content

Commit 085914c

Browse files
committed
Merge branch 'main' into dbgold17/update-manifest-server-to-python-3.13
2 parents 37352a5 + 20ae208 commit 085914c

File tree

10 files changed, with 450 additions and 100 deletions.

airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,11 +7,11 @@
77
import zlib
88
from contextlib import closing
99
from dataclasses import InitVar, dataclass
10+
from math import nan
1011
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple
1112

1213
import pandas as pd
1314
import requests
14-
from numpy import nan
1515

1616
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
1717

airbyte_cdk/sources/file_based/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -157,3 +157,7 @@ class CustomFileBasedException(AirbyteTracedException):
157157

158158
class FileSizeLimitError(CustomFileBasedException):
159159
pass
160+
161+
162+
class EmptyFileSchemaInferenceError(AirbyteTracedException):
163+
pass

airbyte_cdk/sources/file_based/file_types/csv_parser.py

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,11 @@
2222
InferenceType,
2323
)
2424
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
25-
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
25+
from airbyte_cdk.sources.file_based.exceptions import (
26+
EmptyFileSchemaInferenceError,
27+
FileBasedSourceError,
28+
RecordParseError,
29+
)
2630
from airbyte_cdk.sources.file_based.file_based_stream_reader import (
2731
AbstractFileBasedStreamReader,
2832
FileReadMode,
@@ -203,7 +207,7 @@ async def infer_schema(
203207
break
204208

205209
if not type_inferrer_by_field:
206-
raise AirbyteTracedException(
210+
raise EmptyFileSchemaInferenceError(
207211
message=f"Could not infer schema as there are no rows in {file.uri}. If having an empty CSV file is expected, ignore this. "
208212
f"Else, please contact Airbyte.",
209213
failure_type=FailureType.config_error,

airbyte_cdk/sources/file_based/file_types/excel_parser.py

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -9,8 +9,6 @@
99

1010
import orjson
1111
import pandas as pd
12-
from numpy import datetime64, issubdtype
13-
from numpy import dtype as dtype_
1412
from pydantic.v1 import BaseModel
1513

1614
from airbyte_cdk.sources.file_based.config.file_based_stream_config import (
@@ -141,7 +139,7 @@ def file_read_mode(self) -> FileReadMode:
141139
@staticmethod
142140
def dtype_to_json_type(
143141
current_type: Optional[str],
144-
dtype: dtype_, # type: ignore [type-arg]
142+
dtype: Any, # Type object from pandas DataFrame
145143
) -> str:
146144
"""
147145
Convert Pandas DataFrame types to Airbyte Types.
@@ -163,7 +161,7 @@ def dtype_to_json_type(
163161
return "number"
164162
if dtype == "bool" and (not current_type or current_type == "boolean"):
165163
return "boolean"
166-
if issubdtype(dtype, datetime64):
164+
if pd.api.types.is_datetime64_any_dtype(dtype):
167165
return "date-time"
168166
return "string"
169167

airbyte_cdk/sources/file_based/stream/default_file_based_stream.py

Lines changed: 32 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -9,13 +9,26 @@
99
from copy import deepcopy
1010
from functools import cache
1111
from os import path
12-
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Tuple, Union
12+
from typing import (
13+
Any,
14+
Dict,
15+
Iterable,
16+
List,
17+
Mapping,
18+
MutableMapping,
19+
NoReturn,
20+
Optional,
21+
Set,
22+
Tuple,
23+
Union,
24+
)
1325

1426
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStream, FailureType, Level
1527
from airbyte_cdk.models import Type as MessageType
1628
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
1729
from airbyte_cdk.sources.file_based.exceptions import (
1830
DuplicatedFilesError,
31+
EmptyFileSchemaInferenceError,
1932
FileBasedSourceError,
2033
InvalidSchemaError,
2134
MissingSchemaError,
@@ -230,7 +243,7 @@ def cursor_field(self) -> Union[str, List[str]]:
230243
return self.ab_last_mod_col
231244

232245
@cache
233-
def get_json_schema(self) -> JsonSchema:
246+
def get_json_schema(self) -> JsonSchema: # type: ignore
234247
if self.use_file_transfer:
235248
return file_transfer_schema
236249
extra_fields = {
@@ -246,12 +259,12 @@ def get_json_schema(self) -> JsonSchema:
246259
exception=AirbyteTracedException(exception=config_exception),
247260
failure_type=FailureType.config_error,
248261
)
262+
except EmptyFileSchemaInferenceError as exc:
263+
self._raise_schema_inference_error(exc)
249264
except AirbyteTracedException as ate:
250265
raise ate
251266
except Exception as exc:
252-
raise SchemaInferenceError(
253-
FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name
254-
) from exc
267+
self._raise_schema_inference_error(exc)
255268
else:
256269
return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}
257270

@@ -380,17 +393,24 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
380393

381394
return base_schema
382395

383-
async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:
396+
async def _infer_file_schema(self, file: RemoteFile) -> SchemaType: # type: ignore
384397
try:
385398
return await self.get_parser().infer_schema(
386399
self.config, file, self.stream_reader, self.logger
387400
)
401+
except EmptyFileSchemaInferenceError as exc:
402+
self._raise_schema_inference_error(exc, file)
388403
except AirbyteTracedException as ate:
389404
raise ate
390405
except Exception as exc:
391-
raise SchemaInferenceError(
392-
FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
393-
file=file.uri,
394-
format=str(self.config.format),
395-
stream=self.name,
396-
) from exc
406+
self._raise_schema_inference_error(exc, file)
407+
408+
def _raise_schema_inference_error(
409+
self, exc: Exception, file: Optional[RemoteFile] = None
410+
) -> NoReturn:
411+
raise SchemaInferenceError(
412+
FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
413+
file=file.uri if file else None,
414+
format=str(self.config.format) if self.config.format else None,
415+
stream=self.name,
416+
) from exc

docs/generate.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -65,7 +65,7 @@ def run() -> None:
6565
shutil.rmtree("docs/generated")
6666

6767
pdoc.render.configure(
68-
template_directory="docs",
68+
template_directory=pathlib.Path("docs/templates"),
6969
show_source=True,
7070
search=True,
7171
logo="https://docs.airbyte.com/img/logo-dark.png",

0 commit comments

Comments
 (0)