Skip to content

Commit 74dbe15

Browse files
committed
Fix mypy
1 parent 65b0546 commit 74dbe15

File tree

3 files changed

+58
-34
lines changed

3 files changed

+58
-34
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2711,15 +2711,27 @@ def create_simple_retriever(
27112711
model.ignore_stream_slicer_parameters_on_paginated_requests or False
27122712
)
27132713

2714+
if model.lazy_read_pointer and not hasattr(model, "partition_router"):
2715+
raise ValueError(
2716+
"LazySimpleRetriever requires a 'partition_router' when 'lazy_read_pointer' is set. "
2717+
"Please either define 'partition_router' or remove 'lazy_read_pointer' from the model."
2718+
)
2719+
27142720
if model.lazy_read_pointer and not bool(
2715-
self._connector_state_manager.get_stream_state(name, None)
2721+
self._connector_state_manager.get_stream_state(name, None)
27162722
):
2723+
if model.partition_router.type != "SubstreamPartitionRouterModel": # type: ignore[union-attr] # model.partition_router has BaseModel type
2724+
raise ValueError(
2725+
"LazySimpleRetriever only supports 'SubstreamPartitionRouterModel' as the 'partition_router' type. " # type: ignore[union-attr] # model.partition_router has BaseModel type
2726+
f"Found: '{model.partition_router.type}'."
2727+
)
2728+
27172729
lazy_read_pointer = [
27182730
InterpolatedString.create(path, parameters=model.parameters or {})
27192731
for path in model.lazy_read_pointer
27202732
]
27212733
partition_router = self._create_component_from_model(
2722-
model=model.partition_router, config=config
2734+
model=model.partition_router, config=config # type: ignore[arg-type]
27232735
)
27242736
stream_slicer = (
27252737
self._create_component_from_model(model=incremental_sync, config=config)

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

Lines changed: 11 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -144,8 +144,8 @@ def _get_request_option(
144144
return params
145145

146146
def process_parent_record(
147-
self, parent_record: Union[AirbyteMessage, Record, Mapping], parent_stream_name: str
148-
) -> Tuple[Optional[Mapping], Optional[Mapping]]:
147+
self, parent_record: Union[AirbyteMessage, Record, Mapping[str, Any]], parent_stream_name: str
148+
) -> Tuple[Optional[MutableMapping[str, Any]], Optional[MutableMapping[str, Any]]]:
149149
"""
150150
Processes and extracts data from a parent record, handling different record types
151151
and ensuring only valid types proceed.
@@ -161,23 +161,21 @@ def process_parent_record(
161161
f"This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
162162
)
163163
if parent_record.type == MessageType.RECORD:
164-
return parent_record.record.data, {}
164+
return parent_record.record.data, {} # type: ignore[union-attr] # parent_record.record is always AirbyteRecordMessage
165165
return None, None # Skip invalid or non-record data
166166

167-
# Handle Record type
168167
if isinstance(parent_record, Record):
169168
parent_partition = (
170169
parent_record.associated_slice.partition if parent_record.associated_slice else {}
171170
)
172-
return parent_record.data, parent_partition
171+
return {**parent_record.data}, {**parent_partition}
173172

174-
# Validate the record type
175-
if not isinstance(parent_record, Mapping):
176-
raise AirbyteTracedException(
177-
message=f"Parent stream returned records as invalid type {type(parent_record)}"
178-
)
173+
if isinstance(parent_record, Mapping):
174+
return {**parent_record}, {}
179175

180-
return parent_record, {}
176+
raise AirbyteTracedException(
177+
message=f"Parent stream returned records as invalid type {type(parent_record)}"
178+
)
181179

182180
def stream_slices(self) -> Iterable[StreamSlice]:
183181
"""
@@ -210,10 +208,10 @@ def stream_slices(self) -> Iterable[StreamSlice]:
210208

211209
# read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
212210
# not support either substreams or RFR, but something that needs to be considered once we do
213-
for parent_record in parent_stream.read_only_records():
211+
for raw_parent_record in parent_stream.read_only_records():
214212
# Process the parent record
215213
parent_record, parent_partition = self.process_parent_record(
216-
parent_record, parent_stream.name
214+
raw_parent_record, parent_stream.name
217215
)
218216

219217
# Skip invalid or non-record data

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 33 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -6,10 +6,11 @@
66
from dataclasses import InitVar, dataclass, field
77
from functools import partial
88
from itertools import islice
9-
from typing import Any, Callable, Iterable, List, Mapping, Optional, Set, Tuple, Union
9+
from typing import Any, Callable, Iterable, List, Mapping, Optional, Set, Tuple, Union, MutableMapping
1010

1111
import dpath
1212
import requests
13+
from typing_extensions import deprecated
1314

1415
from airbyte_cdk.models import AirbyteMessage
1516
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
@@ -35,6 +36,7 @@
3536
from airbyte_cdk.sources.streams.core import StreamData
3637
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
3738
from airbyte_cdk.utils.mapping_helpers import combine_mappings
39+
from airbyte_cdk.sources.source import ExperimentalClassWarning
3840

3941
FULL_REFRESH_SYNC_COMPLETE_KEY = "__ab_full_refresh_sync_complete"
4042

@@ -625,25 +627,29 @@ def _fetch_next_page(
625627

626628

627629
class SafeResponse(requests.Response):
628-
def __getattr__(self, name):
630+
def __getattr__(self, name: str) -> Any:
629631
return getattr(requests.Response, name, None)
630632

631633
@property
632-
def content(self):
634+
def content(self) -> Optional[bytes]:
633635
return super().content
634636

635637
@content.setter
636-
def content(self, value):
638+
def content(self, value: Union[str, bytes]) -> None:
637639
self._content = value.encode() if isinstance(value, str) else value
638640

639641

642+
@deprecated(
643+
"This class is experimental. Use at your own risk.",
644+
category=ExperimentalClassWarning,
645+
)
640646
@dataclass
641647
class LazySimpleRetriever(SimpleRetriever):
642648
"""
643649
A retriever that supports lazy loading from parent streams.
644650
"""
645651

646-
partition_router: SubstreamPartitionRouter = field(init=True, repr=False, default=None)
652+
partition_router: SubstreamPartitionRouter = field(init=True, repr=False, default=None) # type: ignore[assignment] # 'partition_router' is required for LazySimpleRetriever and is validated in the constructor
647653
lazy_read_pointer: Optional[List[InterpolatedString]] = None
648654

649655
def _read_pages(
@@ -655,9 +661,9 @@ def _read_pages(
655661
parent_stream_config = self.partition_router.parent_stream_configs[-1]
656662
parent_stream = parent_stream_config.stream
657663

658-
for parent_record in parent_stream.read_only_records():
664+
for raw_parent_record in parent_stream.read_only_records():
659665
parent_record, parent_partition = self.partition_router.process_parent_record(
660-
parent_record, parent_stream.name
666+
raw_parent_record, parent_stream.name
661667
)
662668
if parent_record is None:
663669
continue
@@ -676,19 +682,19 @@ def _read_pages(
676682

677683
yield from []
678684

679-
def _extract_child_records(self, parent_record: Mapping) -> Mapping:
685+
def _extract_child_records(self, parent_record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
680686
"""Extract child records from a parent record based on lazy pointers."""
681687
if not self.lazy_read_pointer:
682688
return parent_record
683689

684690
path = [path.eval(self.config) for path in self.lazy_read_pointer]
685691
return (
686-
dpath.values(parent_record, path)
692+
dpath.values(parent_record, path) # type: ignore # return value will be a MutableMapping, given input data structure
687693
if "*" in path
688694
else dpath.get(parent_record, path, default=[])
689695
)
690696

691-
def _create_response(self, data: Mapping) -> SafeResponse:
697+
def _create_response(self, data: Mapping[str, Any]) -> SafeResponse:
692698
"""Create a SafeResponse with the given data."""
693699
response = SafeResponse()
694700
response.content = json.dumps(data).encode("utf-8")
@@ -701,7 +707,7 @@ def _yield_records_with_pagination(
701707
records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
702708
stream_state: Mapping[str, Any],
703709
stream_slice: StreamSlice,
704-
parent_record: Record,
710+
parent_record: MutableMapping[str, Any],
705711
parent_stream_config: Any,
706712
) -> Iterable[Record]:
707713
"""Yield records, handling pagination if needed."""
@@ -729,7 +735,7 @@ def _paginate(
729735
records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
730736
stream_state: Mapping[str, Any],
731737
stream_slice: StreamSlice,
732-
parent_record: Record,
738+
parent_record: MutableMapping[str, Any],
733739
parent_stream_config: Any,
734740
) -> Iterable[Record]:
735741
"""Handle pagination by fetching subsequent pages."""
@@ -742,7 +748,9 @@ def _paginate(
742748
cursor_slice=stream_slice.cursor_slice,
743749
)
744750

745-
while next_page_token:
751+
pagination_complete = False
752+
753+
while not pagination_complete:
746754
response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
747755
last_page_size, last_record = 0, None
748756

@@ -751,9 +759,15 @@ def _paginate(
751759
last_record = record
752760
yield record
753761

754-
last_page_token_value = (
755-
next_page_token.get("next_page_token") if next_page_token else None
756-
)
757-
next_page_token = self._next_page_token(
758-
response, last_page_size, last_record, last_page_token_value
759-
)
762+
if not response:
763+
pagination_complete = True
764+
else:
765+
last_page_token_value = (
766+
next_page_token.get("next_page_token") if next_page_token else None
767+
)
768+
next_page_token = self._next_page_token(
769+
response, last_page_size, last_record, last_page_token_value
770+
)
771+
772+
if not next_page_token:
773+
pagination_complete = True

0 commit comments

Comments
 (0)