Skip to content

Commit 11533d8

Browse files
committed
Fix mypy
1 parent b06800f commit 11533d8

File tree

2 files changed

+25
-11
lines changed

2 files changed

+25
-11
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2534,6 +2534,10 @@ def create_parent_stream_config(
25342534
"The '*' wildcard in 'lazy_read_pointer' is not supported — only direct paths are allowed."
25352535
)
25362536

2537+
model_lazy_read_pointer: List[Union[InterpolatedString, str]] = (
2538+
[x for x in model.lazy_read_pointer] if model.lazy_read_pointer else []
2539+
)
2540+
25372541
return ParentStreamConfig(
25382542
parent_key=model.parent_key,
25392543
request_option=request_option,
@@ -2543,7 +2547,7 @@ def create_parent_stream_config(
25432547
incremental_dependency=model.incremental_dependency or False,
25442548
parameters=model.parameters or {},
25452549
extra_fields=model.extra_fields,
2546-
lazy_read_pointer=model.lazy_read_pointer,
2550+
lazy_read_pointer=model_lazy_read_pointer,
25472551
)
25482552

25492553
@staticmethod
@@ -2748,18 +2752,28 @@ def create_simple_retriever(
27482752
)
27492753

27502754
if (
2751-
hasattr(model, "partition_router")
2752-
and model.partition_router
2753-
and model.partition_router.type == "SubstreamPartitionRouter"
2755+
model.partition_router
2756+
and model.partition_router.type == "SubstreamPartitionRouter" # type: ignore[union-attr] # 'model' is SimpleRetrieverModel
27542757
and not bool(self._connector_state_manager.get_stream_state(name, None))
27552758
and any(
27562759
parent_stream_config.lazy_read_pointer
2757-
for parent_stream_config in model.partition_router.parent_stream_configs
2760+
for parent_stream_config in model.partition_router.parent_stream_configs # type: ignore[union-attr] # partition_router type guaranteed by a condition earlier
27582761
)
27592762
):
2760-
if incremental_sync and (incremental_sync.step or incremental_sync.cursor_granularity):
2763+
if incremental_sync:
2764+
if incremental_sync.type != "DatetimeBasedCursor":
2765+
raise ValueError(
2766+
f"LazySimpleRetriever only supports DatetimeBasedCursor. Found: {incremental_sync.type}."
2767+
)
2768+
2769+
elif incremental_sync.step or incremental_sync.cursor_granularity:
2770+
raise ValueError(
2771+
f"Found more that one slice per parent. LazySimpleRetriever only supports single slice read for stream - {name}."
2772+
)
2773+
2774+
if model.decoder and model.decoder.type != "JsonDecoder":
27612775
raise ValueError(
2762-
f"Found more that one slice per parent. LazySimpleRetriever only supports single slice read for stream - {name}."
2776+
f"LazySimpleRetriever only supports JsonDecoder. Found: {model.decoder.type}."
27632777
)
27642778

27652779
return LazySimpleRetriever(

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ def stream_slices(self) -> Iterable[StreamSlice]:
215215
if parent_stream_config.lazy_read_pointer:
216216
extracted_extra_fields = {
217217
"child_response": self._extract_child_response(
218-
parent_record, parent_stream_config.lazy_read_pointer
218+
parent_record, parent_stream_config.lazy_read_pointer # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
219219
),
220220
**extracted_extra_fields,
221221
}
@@ -230,19 +230,19 @@ def stream_slices(self) -> Iterable[StreamSlice]:
230230
)
231231

232232
def _extract_child_response(
233-
self, parent_record: MutableMapping[str, Any], pointer
233+
self, parent_record: Mapping[str, Any] | AirbyteMessage, pointer: List[InterpolatedString]
234234
) -> requests.Response:
235235
"""Extract child records from a parent record based on lazy pointers."""
236236

237-
def _create_response(data: Mapping[str, Any]) -> SafeResponse:
237+
def _create_response(data: MutableMapping[str, Any]) -> SafeResponse:
238238
"""Create a SafeResponse with the given data."""
239239
response = SafeResponse()
240240
response.content = json.dumps(data).encode("utf-8")
241241
response.status_code = 200
242242
return response
243243

244244
path = [path.eval(self.config) for path in pointer]
245-
return _create_response(dpath.get(parent_record, path, default=[]))
245+
return _create_response(dpath.get(parent_record, path, default=[])) # type: ignore # argunet will be a MutableMapping, given input data structure
246246

247247
def _extract_extra_fields(
248248
self,

0 commit comments

Comments
 (0)