Skip to content

Commit c83dce6

Browse files
committed
Fix mypy
2 parents 69bc211 + 02030b5 commit c83dce6

File tree

4 files changed

+54
-23
lines changed

4 files changed

+54
-23
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,11 @@ def _group_streams(
211211
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
212212
)
213213

214-
name_to_stream_mapping[declarative_stream.name] = name_to_stream_mapping[declarative_stream.name]["incremental_stream"] \
215-
if stream_state else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
214+
name_to_stream_mapping[declarative_stream.name] = (
215+
name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
216+
if stream_state
217+
else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
218+
)
216219

217220
if isinstance(declarative_stream, DeclarativeStream) and (
218221
name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3030
DeclarativeStream as DeclarativeStreamModel,
3131
)
32+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
3233
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3334
StateDelegatingStream as StateDelegatingStreamModel,
3435
)
35-
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
3636
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
3737
get_registered_components_module,
3838
)
@@ -146,7 +146,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
146146

147147
source_streams = [
148148
self._constructor.create_component(
149-
StateDelegatingStreamModel if stream_config.get("type") == StateDelegatingStreamModel.__name__ else DeclarativeStreamModel,
149+
StateDelegatingStreamModel
150+
if stream_config.get("type") == StateDelegatingStreamModel.__name__
151+
else DeclarativeStreamModel,
150152
stream_config,
151153
config,
152154
emit_connector_builder_messages=self._emit_connector_builder_messages,
@@ -197,12 +199,12 @@ def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> No
197199
for stream_config in stream_configs:
198200
if stream_config["name"] in parent_streams:
199201
if stream_config["type"] == "StateDelegatingStream":
200-
stream_config["full_refresh_stream"]["retriever"]["requester"][
201-
"use_cache"
202-
] = True
203-
stream_config["incremental_stream"]["retriever"]["requester"][
204-
"use_cache"
205-
] = True
202+
stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
203+
True
204+
)
205+
stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
206+
True
207+
)
206208
else:
207209
stream_config["retriever"]["requester"]["use_cache"] = True
208210

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,6 @@
204204
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
205205
DeclarativeStream as DeclarativeStreamModel,
206206
)
207-
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
208-
StateDelegatingStream as StateDelegatingStreamModel,
209-
)
210207
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
211208
DefaultErrorHandler as DefaultErrorHandlerModel,
212209
)
@@ -355,6 +352,9 @@
355352
SimpleRetriever as SimpleRetrieverModel,
356353
)
357354
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
355+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
356+
StateDelegatingStream as StateDelegatingStreamModel,
357+
)
358358
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
359359
StreamConfig as StreamConfigModel,
360360
)
@@ -1793,13 +1793,17 @@ def _build_stream_slicer_from_partition_router(
17931793
if isinstance(stream_slicer_model, list):
17941794
return CartesianProductStreamSlicer(
17951795
[
1796-
self._create_component_from_model(model=slicer, config=config, stream_name=stream_name or "")
1796+
self._create_component_from_model(
1797+
model=slicer, config=config, stream_name=stream_name or ""
1798+
)
17971799
for slicer in stream_slicer_model
17981800
],
17991801
parameters={},
18001802
)
18011803
else:
1802-
return self._create_component_from_model(model=stream_slicer_model, config=config, stream_name=stream_name or "") # type: ignore[no-any-return] # Will be created PartitionRouter as stream_slicer_model is model.partition_router
1804+
return self._create_component_from_model(
1805+
model=stream_slicer_model, config=config, stream_name=stream_name or ""
1806+
) # type: ignore[no-any-return] # Will be created PartitionRouter as stream_slicer_model is model.partition_router
18031807
return None
18041808

18051809
def _build_incremental_cursor(
@@ -2478,7 +2482,9 @@ def create_page_increment(
24782482
def create_parent_stream_config(
24792483
self, model: ParentStreamConfigModel, config: Config, **kwargs: Any
24802484
) -> ParentStreamConfig:
2481-
declarative_stream = self._create_component_from_model(model.stream, config=config, **kwargs)
2485+
declarative_stream = self._create_component_from_model(
2486+
model.stream, config=config, **kwargs
2487+
)
24822488
request_option = (
24832489
self._create_component_from_model(model.request_option, config=config)
24842490
if model.request_option
@@ -2719,12 +2725,26 @@ def create_simple_retriever(
27192725
parameters=model.parameters or {},
27202726
)
27212727

2722-
def create_state_delegating_stream(self, model: StateDelegatingStreamModel, config: Config, child_state: Optional[MutableMapping[str, Any]] = None, **kwargs: Any
2728+
def create_state_delegating_stream(
2729+
self,
2730+
model: StateDelegatingStreamModel,
2731+
config: Config,
2732+
child_state: Optional[MutableMapping[str, Any]] = None,
2733+
**kwargs: Any,
27232734
) -> DeclarativeStream:
2724-
if model.full_refresh_stream.name != model.name or model.name != model.incremental_stream.name:
2725-
raise ValueError(f"state_delegating_stream, full_refresh_stream name and incremental_stream must have equal names. Instead has {model.name}, {model.full_refresh_stream.name} and {model.incremental_stream.name}.")
2735+
if (
2736+
model.full_refresh_stream.name != model.name
2737+
or model.name != model.incremental_stream.name
2738+
):
2739+
raise ValueError(
2740+
f"state_delegating_stream, full_refresh_stream name and incremental_stream must have equal names. Instead has {model.name}, {model.full_refresh_stream.name} and {model.incremental_stream.name}."
2741+
)
27262742

2727-
stream_model = model.incremental_stream if self._connector_state_manager.get_stream_state(model.name, None) or child_state else model.full_refresh_stream
2743+
stream_model = (
2744+
model.incremental_stream
2745+
if self._connector_state_manager.get_stream_state(model.name, None) or child_state
2746+
else model.full_refresh_stream
2747+
)
27282748

27292749
return self._create_component_from_model(stream_model, config=config, **kwargs) # type: ignore[no-any-return] # Will be created DeclarativeStream as stream_model is stream description
27302750

@@ -2981,8 +3001,14 @@ def _create_message_repository_substream_wrapper(
29813001
self._evaluate_log_level(self._emit_connector_builder_messages),
29823002
),
29833003
)
2984-
child_state = self._connector_state_manager.get_stream_state(kwargs.get("stream_name", ""), None) if model.incremental_dependency or False else None
2985-
return substream_factory._create_component_from_model(model=model, config=config, child_state=child_state, **kwargs)
3004+
child_state = (
3005+
self._connector_state_manager.get_stream_state(kwargs.get("stream_name", ""), None)
3006+
if model.incremental_dependency or False
3007+
else None
3008+
)
3009+
return substream_factory._create_component_from_model(
3010+
model=model, config=config, child_state=child_state, **kwargs
3011+
)
29863012

29873013
@staticmethod
29883014
def create_wait_time_from_header(

unit_tests/sources/declarative/test_state_delegating_stream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@
115115
"inject_into": "request_parameter",
116116
},
117117
},
118-
}
118+
},
119119
},
120120
},
121121
"streams": [{"$ref": "#/definitions/TestStream"}],

0 commit comments

Comments
 (0)