Skip to content

Commit 7dc2164

Browse files
author
maxime.c
committed
fix test, format, lint and a bit of mypy
1 parent dff2559 commit 7dc2164

File tree

9 files changed

+79
-43
lines changed

9 files changed

+79
-43
lines changed

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ def connection_checker(self) -> ConnectionChecker:
298298
f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
299299
)
300300

301-
def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]:
301+
def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]: # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
302302
"""
303303
As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream).
304304
Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -545,8 +545,10 @@
545545
StreamSlicer,
546546
StreamSlicerTestReadDecorator,
547547
)
548-
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import \
549-
StreamSlicerPartitionGenerator, DeclarativePartitionFactory
548+
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
549+
DeclarativePartitionFactory,
550+
StreamSlicerPartitionGenerator,
551+
)
550552
from airbyte_cdk.sources.declarative.transformations import (
551553
AddFields,
552554
RecordTransformation,
@@ -608,7 +610,12 @@
608610
WeekClampingStrategy,
609611
Weekday,
610612
)
611-
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField, Cursor, FinalStateCursor
613+
from airbyte_cdk.sources.streams.concurrent.cursor import (
614+
ConcurrentCursor,
615+
Cursor,
616+
CursorField,
617+
FinalStateCursor,
618+
)
612619
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
613620
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
614621
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
@@ -2066,7 +2073,11 @@ def create_declarative_stream(
20662073
options["name"] = model.name
20672074
schema_loader = DefaultSchemaLoader(config=config, parameters=options)
20682075

2069-
if isinstance(combined_slicers, PartitionRouter) and not is_parent and not self._emit_connector_builder_messages:
2076+
if (
2077+
isinstance(combined_slicers, PartitionRouter)
2078+
and not is_parent
2079+
and not self._emit_connector_builder_messages
2080+
):
20702081
# We are starting to migrate streams to instantiate directly the DefaultStream instead of instantiating the
20712082
# DeclarativeStream and assembling the DefaultStream from that. The plan is the following:
20722083
# * Streams without partition router nor cursors and streams with only partition router. This is the `isinstance(combined_slicers, PartitionRouter)` condition as the first kind with have a SinglePartitionRouter
@@ -2154,7 +2165,9 @@ def _build_incremental_cursor(
21542165
stream_slicer: Optional[PartitionRouter],
21552166
config: Config,
21562167
) -> Optional[StreamSlicer]:
2157-
if model.incremental_sync and (stream_slicer and not isinstance(stream_slicer, SinglePartitionRouter)):
2168+
if model.incremental_sync and (
2169+
stream_slicer and not isinstance(stream_slicer, SinglePartitionRouter)
2170+
):
21582171
if model.retriever.type == "AsyncRetriever":
21592172
stream_name = model.name or ""
21602173
stream_namespace = None
@@ -2902,7 +2915,10 @@ def create_parent_stream_config(
29022915
self, model: ParentStreamConfigModel, config: Config, **kwargs: Any
29032916
) -> ParentStreamConfig:
29042917
declarative_stream = self._create_component_from_model(
2905-
model.stream, config=config, is_parent=True, **kwargs,
2918+
model.stream,
2919+
config=config,
2920+
is_parent=True,
2921+
**kwargs,
29062922
)
29072923
request_option = (
29082924
self._create_component_from_model(model.request_option, config=config)

airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,16 @@
1313

1414

1515
class SchemaLoaderCachingDecorator(SchemaLoader):
16-
1716
def __init__(self, schema_loader: SchemaLoader):
1817
self._decorated = schema_loader
1918
self._loaded_schema = None
2019

2120
def get_json_schema(self) -> Mapping[str, Any]:
2221
if self._loaded_schema is None:
2322
self._loaded_schema = self._decorated.get_json_schema()
23+
24+
if self._loaded_schema is None:
25+
raise ValueError("Could not load schema")
2426
return self._loaded_schema
2527

2628

@@ -69,7 +71,9 @@ def __init__(
6971
self._hash = SliceHasher.hash(self._stream_name, self._stream_slice)
7072

7173
def read(self) -> Iterable[Record]:
72-
for stream_data in self._retriever.read_records(self._schema_loader.get_json_schema(), self._stream_slice):
74+
for stream_data in self._retriever.read_records(
75+
self._schema_loader.get_json_schema(), self._stream_slice
76+
):
7377
if isinstance(stream_data, Mapping):
7478
record = (
7579
stream_data

airbyte_cdk/sources/streams/concurrent/adapters.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import copy
66
import json
77
import logging
8+
from functools import lru_cache
89
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
910

1011
from typing_extensions import deprecated
@@ -195,6 +196,8 @@ def cursor_field(self) -> Union[str, List[str]]:
195196
def cursor(self) -> Optional[Cursor]: # type: ignore[override] # StreamFaced expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
196197
return self._cursor
197198

199+
# FIXME the lru_cache seems to be mostly there because of typing issue
200+
@lru_cache(maxsize=None)
198201
def get_json_schema(self) -> Mapping[str, Any]:
199202
return self._abstract_stream.get_json_schema()
200203

airbyte_cdk/sources/streams/concurrent/default_stream.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#
44

55
from logging import Logger
6-
from typing import Any, Iterable, List, Mapping, Optional, Union, Callable
6+
from typing import Any, Callable, Iterable, List, Mapping, Optional, Union
77

88
from airbyte_cdk.models import AirbyteStream, SyncMode
99
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
@@ -109,7 +109,9 @@ def check_availability(self) -> StreamAvailability:
109109
f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
110110
)
111111
except AirbyteTracedException as error:
112-
return StreamAvailability.unavailable(error.message or error.internal_message or "<no error message>")
112+
return StreamAvailability.unavailable(
113+
error.message or error.internal_message or "<no error message>"
114+
)
113115

114116
try:
115117
next(iter(partition.read()))
@@ -118,4 +120,6 @@ def check_availability(self) -> StreamAvailability:
118120
self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
119121
return StreamAvailability.available()
120122
except AirbyteTracedException as error:
121-
return StreamAvailability.unavailable(error.message or error.internal_message or "<no error message>")
123+
return StreamAvailability.unavailable(
124+
error.message or error.internal_message or "<no error message>"
125+
)

unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,8 @@ def get_body():
9393
requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users3", body=get_body())
9494
requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users4", body=get_body())
9595

96-
stream_slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh))
97-
for stream_slice in stream_slices:
98-
for _ in stream.retriever.read_records(records_schema={}, stream_slice=stream_slice):
96+
for partition in stream.generate_partitions():
97+
for _ in partition.read():
9998
counter += 1
10099

101-
assert counter == lines_in_response * len(stream_slices)
100+
assert counter == lines_in_response * 4 # 4 partitions

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,9 @@
157157
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
158158
from airbyte_cdk.sources.declarative.spec import Spec
159159
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator
160-
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import SchemaLoaderCachingDecorator
160+
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
161+
SchemaLoaderCachingDecorator,
162+
)
161163
from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields
162164
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
163165
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
@@ -1768,14 +1770,8 @@ def test_config_with_defaults():
17681770

17691771
schema_loader = get_schema_loader(stream)
17701772
assert isinstance(schema_loader, JsonFileSchemaLoader)
1771-
assert (
1772-
schema_loader.file_path.string
1773-
== "./source_sendgrid/schemas/{{ parameters.name }}.yaml"
1774-
)
1775-
assert (
1776-
schema_loader.file_path.default
1777-
== "./source_sendgrid/schemas/{{ parameters.name }}.yaml"
1778-
)
1773+
assert schema_loader.file_path.string == "./source_sendgrid/schemas/{{ parameters.name }}.yaml"
1774+
assert schema_loader.file_path.default == "./source_sendgrid/schemas/{{ parameters.name }}.yaml"
17791775

17801776
assert isinstance(retriever.requester, HttpRequester)
17811777
assert retriever.requester.http_method == HttpMethod.GET
@@ -1785,9 +1781,9 @@ def test_config_with_defaults():
17851781

17861782
assert isinstance(retriever.record_selector, RecordSelector)
17871783
assert isinstance(retriever.record_selector.extractor, DpathExtractor)
1788-
assert [
1789-
fp.eval(input_config) for fp in retriever.record_selector.extractor._field_path
1790-
] == ["result"]
1784+
assert [fp.eval(input_config) for fp in retriever.record_selector.extractor._field_path] == [
1785+
"result"
1786+
]
17911787

17921788
assert isinstance(retriever.paginator, DefaultPaginator)
17931789
assert retriever.paginator.url_base.string == "https://api.sendgrid.com"
@@ -2508,7 +2504,9 @@ def test_default_schema_loader(self):
25082504
),
25092505
],
25102506
)
2511-
def test_merge_incremental_and_partition_router(incremental, partition_router, expected_router_type, expected_stream_type):
2507+
def test_merge_incremental_and_partition_router(
2508+
incremental, partition_router, expected_router_type, expected_stream_type
2509+
):
25122510
stream_model = {
25132511
"type": "DeclarativeStream",
25142512
"retriever": {
@@ -2542,7 +2540,11 @@ def test_merge_incremental_and_partition_router(incremental, partition_router, e
25422540
assert isinstance(stream, expected_stream_type)
25432541
retriever = get_retriever(stream)
25442542
assert isinstance(retriever, SimpleRetriever)
2545-
stream_slicer = retriever.stream_slicer if expected_stream_type == DeclarativeStream else stream._stream_partition_generator._stream_slicer
2543+
stream_slicer = (
2544+
retriever.stream_slicer
2545+
if expected_stream_type == DeclarativeStream
2546+
else stream._stream_partition_generator._stream_slicer
2547+
)
25462548
assert isinstance(stream_slicer, expected_router_type)
25472549

25482550
if incremental and partition_router:
@@ -2722,7 +2724,9 @@ def test_create_custom_retriever():
27222724
)
27232725

27242726
assert isinstance(stream, DefaultStream)
2725-
assert isinstance(stream._stream_partition_generator._partition_factory._retriever, MyCustomRetriever)
2727+
assert isinstance(
2728+
stream._stream_partition_generator._partition_factory._retriever, MyCustomRetriever
2729+
)
27262730

27272731

27282732
@freezegun.freeze_time("2021-01-01 00:00:00")
@@ -4667,9 +4671,16 @@ def test_create_stream_with_multiple_schema_loaders():
46674671

46684672

46694673
def get_schema_loader(stream: DefaultStream):
4670-
assert isinstance(stream._stream_partition_generator._partition_factory._schema_loader, SchemaLoaderCachingDecorator)
4674+
assert isinstance(
4675+
stream._stream_partition_generator._partition_factory._schema_loader,
4676+
SchemaLoaderCachingDecorator,
4677+
)
46714678
return stream._stream_partition_generator._partition_factory._schema_loader._decorated
46724679

46734680

46744681
def get_retriever(stream: Union[DeclarativeStream, DefaultStream]):
4675-
return stream.retriever if isinstance(stream, DeclarativeStream) else stream._stream_partition_generator._partition_factory._retriever
4682+
return (
4683+
stream.retriever
4684+
if isinstance(stream, DeclarativeStream)
4685+
else stream._stream_partition_generator._partition_factory._retriever
4686+
)

unit_tests/sources/declarative/resolvers/test_config_components_resolver.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,5 +383,6 @@ def test_component_mapping_conditions(manifest, config, expected_conditional_par
383383
for stream in source.streams(config):
384384
if stream.name in expected_conditional_params:
385385
assert (
386-
stream._stream_partition_generator._partition_factory._retriever.requester._parameters == expected_conditional_params[stream.name]
386+
stream._stream_partition_generator._partition_factory._retriever.requester._parameters
387+
== expected_conditional_params[stream.name]
387388
)

unit_tests/sources/declarative/test_manifest_declarative_source.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
SyncMode,
2929
Type,
3030
)
31-
from airbyte_cdk.sources.declarative.concurrent_declarative_source import ConcurrentDeclarativeSource
31+
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
32+
ConcurrentDeclarativeSource,
33+
)
3234
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
3335
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
3436
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
@@ -2191,14 +2193,10 @@ def test_only_parent_streams_use_cache():
21912193
assert not stream_1_retriever.requester.use_cache
21922194

21932195
# Parent stream created for substream
2194-
assert (
2195-
stream_1_retriever.stream_slicer.parent_stream_configs[0].stream.name
2196-
== "applications"
2197-
)
2198-
assert (
2199-
stream_1_retriever.stream_slicer.parent_stream_configs[0]
2200-
.stream.retriever.requester.use_cache
2201-
)
2196+
assert stream_1_retriever.stream_slicer.parent_stream_configs[0].stream.name == "applications"
2197+
assert stream_1_retriever.stream_slicer.parent_stream_configs[
2198+
0
2199+
].stream.retriever.requester.use_cache
22022200

22032201
# Main stream without caching
22042202
assert streams[2].name == "jobs"

0 commit comments

Comments
 (0)