Skip to content

Commit eb635b1

Browse files
author
maxime.c
committed
Fix linting; this temporarily breaks test_per_partition_cursor.py
1 parent 4f46c3e commit eb635b1

File tree

11 files changed

+52
-342
lines changed

11 files changed

+52
-342
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -432,9 +432,6 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
432432
if stream_state.get("parent_state"):
433433
self._parent_state = stream_state["parent_state"]
434434

435-
# Set parent state for partition routers based on parent streams
436-
self._partition_router.set_initial_state(stream_state)
437-
438435
def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:
439436
"""
440437
Initializes the global cursor state from the provided stream state.
@@ -557,6 +554,9 @@ def limit_reached(self) -> bool:
557554
def get_parent_state(
558555
stream_state: Optional[StreamState], parent_stream_name: str
559556
) -> Optional[AirbyteStateMessage]:
557+
if not stream_state:
558+
return stream_state
559+
560560
if "parent_state" not in stream_state:
561561
logger.warning(
562562
f"Trying to get_parent_state for stream `{parent_stream_name}` when there are not parent state in the state"

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
RecordSelector,
9999
ResponseToFileExtractor,
100100
)
101+
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
101102
from airbyte_cdk.sources.declarative.extractors.record_filter import (
102103
ClientSideIncrementalRecordFilterDecorator,
103104
)
@@ -106,7 +107,6 @@
106107
ConcurrentPerPartitionCursor,
107108
CursorFactory,
108109
DatetimeBasedCursor,
109-
DeclarativeCursor,
110110
GlobalSubstreamCursor,
111111
PerPartitionWithGlobalCursor,
112112
)
@@ -512,7 +512,7 @@
512512
PerPartitionRequestOptionsProvider,
513513
)
514514
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
515-
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
515+
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester
516516
from airbyte_cdk.sources.declarative.resolvers import (
517517
ComponentMappingDefinition,
518518
ConfigComponentsResolver,
@@ -529,6 +529,7 @@
529529
from airbyte_cdk.sources.declarative.retrievers.file_uploader import (
530530
ConnectorBuilderFileUploader,
531531
DefaultFileUploader,
532+
FileUploader,
532533
LocalFileSystemFileWriter,
533534
NoopFileWriter,
534535
)
@@ -553,6 +554,7 @@
553554
)
554555
from airbyte_cdk.sources.declarative.transformations import (
555556
AddFields,
557+
RecordTransformation,
556558
RemoveFields,
557559
)
558560
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
@@ -3255,7 +3257,7 @@ def create_simple_retriever(
32553257
log_formatter: Optional[Callable[[Response], Any]] = None,
32563258
**kwargs: Any,
32573259
) -> SimpleRetriever:
3258-
def _get_url() -> str:
3260+
def _get_url(req: Requester) -> str:
32593261
"""
32603262
Closure to get the URL from the requester. This is used to get the URL in the case of a lazy retriever.
32613263
This is needed because the URL is not set until the requester is created.
@@ -3264,12 +3266,12 @@ def _get_url() -> str:
32643266
_url: str = (
32653267
model.requester.url
32663268
if hasattr(model.requester, "url") and model.requester.url is not None
3267-
else requester.get_url()
3269+
else req.get_url(stream_state=None, stream_slice=None, next_page_token=None)
32683270
)
32693271
_url_base: str = (
32703272
model.requester.url_base
32713273
if hasattr(model.requester, "url_base") and model.requester.url_base is not None
3272-
else requester.get_url_base()
3274+
else req.get_url(stream_state=None, stream_slice=None, next_page_token=None)
32733275
)
32743276

32753277
return _url or _url_base
@@ -3355,7 +3357,7 @@ def _get_url() -> str:
33553357
self._create_component_from_model(
33563358
model=model.paginator,
33573359
config=config,
3358-
url_base=_get_url(),
3360+
url_base=_get_url(requester),
33593361
extractor_model=model.record_selector.extractor,
33603362
decoder=decoder,
33613363
cursor_used_for_stop_condition=stop_condition_cursor or None,
@@ -3538,12 +3540,14 @@ def create_async_retriever(
35383540
transformations: List[RecordTransformation],
35393541
**kwargs: Any,
35403542
) -> AsyncRetriever:
3541-
def _get_download_retriever() -> SimpleRetriever:
3543+
def _get_download_retriever(
3544+
requester: Requester, extractor: RecordExtractor, _decoder: Decoder
3545+
) -> SimpleRetriever:
35423546
# We create a record selector for the download retriever
35433547
# with no schema normalization, no transformations, and no record filter,
35443548
# as all this occurs in the record_selector of the AsyncRetriever
35453549
record_selector = RecordSelector(
3546-
extractor=download_extractor,
3550+
extractor=extractor,
35473551
name=name,
35483552
record_filter=None,
35493553
transformations=[],
@@ -3554,7 +3558,7 @@ def _get_download_retriever() -> SimpleRetriever:
35543558
paginator = (
35553559
self._create_component_from_model(
35563560
model=model.download_paginator,
3557-
decoder=decoder,
3561+
decoder=_decoder,
35583562
config=config,
35593563
url_base="",
35603564
)
@@ -3563,7 +3567,7 @@ def _get_download_retriever() -> SimpleRetriever:
35633567
)
35643568

35653569
return SimpleRetriever(
3566-
requester=download_requester,
3570+
requester=requester,
35673571
record_selector=record_selector,
35683572
primary_key=None,
35693573
name=name,
@@ -3657,7 +3661,9 @@ def _get_job_timeout() -> datetime.timedelta:
36573661
config=config,
36583662
name=job_download_components_name,
36593663
)
3660-
download_retriever = _get_download_retriever()
3664+
download_retriever = _get_download_retriever(
3665+
download_requester, download_extractor, download_decoder
3666+
)
36613667
abort_requester = (
36623668
self._create_component_from_model(
36633669
model=model.abort_requester,
@@ -3840,7 +3846,9 @@ def _create_message_repository_substream_wrapper(
38403846
model=model, config=config, has_parent_state=has_parent_state, **kwargs
38413847
)
38423848

3843-
def _instantiate_parent_stream_state_manager(self, child_state, config, model):
3849+
def _instantiate_parent_stream_state_manager(
3850+
self, child_state: MutableMapping[str, Any], config: Config, model: ParentStreamConfigModel
3851+
):
38443852
"""
38453853
With DefaultStream, the state needs to be provided during __init__ of the cursor as opposed to the
38463854
`set_initial_state` flow that existed for the declarative cursors. This state is taken from

airbyte_cdk/sources/declarative/partition_routers/cartesian_product_stream_slicer.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,6 @@ def stream_slices(self) -> Iterable[StreamSlice]:
159159
cursor_slice = {}
160160
yield StreamSlice(partition=partition, cursor_slice=cursor_slice)
161161

162-
def set_initial_state(self, stream_state: StreamState) -> None:
163-
"""
164-
Parent stream states are not supported for cartesian product stream slicer
165-
"""
166-
pass
167-
168162
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
169163
"""
170164
Parent stream states are not supported for cartesian product stream slicer

airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,6 @@ def get_request_body_json(
140140
) -> Mapping[str, Any]:
141141
return {}
142142

143-
def set_initial_state(self, stream_state: StreamState) -> None:
144-
"""Delegate state initialization to the underlying partition router."""
145-
self.underlying_partition_router.set_initial_state(stream_state)
146-
self._state = self.underlying_partition_router.get_stream_state()
147-
148143
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
149144
"""Delegate state retrieval to the underlying partition router."""
150145
return self._state

airbyte_cdk/sources/declarative/partition_routers/list_partition_router.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,6 @@ def _get_request_option(
108108
else:
109109
return {}
110110

111-
def set_initial_state(self, stream_state: StreamState) -> None:
112-
"""
113-
ListPartitionRouter doesn't have parent streams
114-
"""
115-
pass
116-
117111
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
118112
"""
119113
ListPartitionRouter doesn't have parent streams

airbyte_cdk/sources/declarative/partition_routers/partition_router.py

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,31 +15,9 @@ class PartitionRouter(StreamSlicer):
1515
"""
1616
Base class for partition routers.
1717
Methods:
18-
set_parent_state(stream_state): Set the state of the parent streams.
19-
get_parent_state(): Get the state of the parent streams.
18+
get_stream_state(): Get the state of the parent streams.
2019
"""
2120

22-
@abstractmethod
23-
def set_initial_state(self, stream_state: StreamState) -> None:
24-
"""
25-
Set the state of the parent streams.
26-
27-
This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
28-
incrementally using the state.
29-
30-
Args:
31-
stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes
32-
'parent_state' which is a dictionary of parent state names to their corresponding state.
33-
Example:
34-
{
35-
"parent_state": {
36-
"parent_stream_name_1": { ... },
37-
"parent_stream_name_2": { ... },
38-
...
39-
}
40-
}
41-
"""
42-
4321
@abstractmethod
4422
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
4523
"""

airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,6 @@ def get_request_body_json(
5050
def stream_slices(self) -> Iterable[StreamSlice]:
5151
yield StreamSlice(partition={}, cursor_slice={})
5252

53-
def set_initial_state(self, stream_state: StreamState) -> None:
54-
"""
55-
SinglePartitionRouter doesn't have parent streams
56-
"""
57-
pass
58-
5953
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
6054
"""
6155
SinglePartitionRouter doesn't have parent streams

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

Lines changed: 4 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@
3636
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
3737

3838

39-
def iterate_with_last_flag(generator: Iterable[Partition]) -> Iterable[tuple[Partition, bool]]:
39+
T = TypeVar("T")
40+
41+
42+
def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]:
4043
iterator = iter(generator)
4144

4245
try:
@@ -307,60 +310,6 @@ def _extract_extra_fields(
307310
extracted_extra_fields[".".join(extra_field_path)] = extra_field_value
308311
return extracted_extra_fields
309312

310-
def set_initial_state(self, stream_state: StreamState) -> None:
311-
"""
312-
Set the state of the parent streams.
313-
314-
If the `parent_state` key is missing from `stream_state`, migrate the child stream state to the parent stream's state format.
315-
This migration applies only to parent streams with incremental dependencies.
316-
317-
Args:
318-
stream_state (StreamState): The state of the streams to be set.
319-
320-
Example of state format:
321-
{
322-
"parent_state": {
323-
"parent_stream_name1": {
324-
"last_updated": "2023-05-27T00:00:00Z"
325-
},
326-
"parent_stream_name2": {
327-
"last_updated": "2023-05-27T00:00:00Z"
328-
}
329-
}
330-
}
331-
332-
Example of migrating to parent state format:
333-
- Initial state:
334-
{
335-
"updated_at": "2023-05-27T00:00:00Z"
336-
}
337-
- After migration:
338-
{
339-
"updated_at": "2023-05-27T00:00:00Z",
340-
"parent_state": {
341-
"parent_stream_name": {
342-
"parent_stream_cursor": "2023-05-27T00:00:00Z"
343-
}
344-
}
345-
}
346-
"""
347-
if not stream_state:
348-
return
349-
350-
parent_state = stream_state.get("parent_state", {})
351-
352-
# Set state for each parent stream with an incremental dependency
353-
for parent_config in self.parent_stream_configs:
354-
if (
355-
not parent_state.get(parent_config.stream.name, {})
356-
and parent_config.incremental_dependency
357-
):
358-
# Migrate child state to parent state format
359-
parent_state = self._migrate_child_state_to_parent_state(stream_state)
360-
361-
if parent_config.incremental_dependency:
362-
parent_config.stream.state = parent_state.get(parent_config.stream.name, {})
363-
364313
def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> StreamState:
365314
"""
366315
Migrate the child or global stream state into the parent stream's state format.

airbyte_cdk/sources/declarative/requesters/request_options/per_partition_request_option_provider.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@ def get_request_params(
1919
) -> Mapping[str, Any]:
2020
return self._partition_router.get_request_params( # type: ignore # this always returns a mapping
2121
stream_state=stream_state,
22-
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
22+
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={})
23+
if stream_slice
24+
else StreamSlice(partition={}, cursor_slice={}),
2325
next_page_token=next_page_token,
2426
) | self._cursor_provider.get_request_params(
2527
stream_state=stream_state,
26-
stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
28+
stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice)
29+
if stream_slice
30+
else StreamSlice(partition={}, cursor_slice={}),
2731
next_page_token=next_page_token,
2832
)
2933

@@ -36,11 +40,15 @@ def get_request_headers(
3640
) -> Mapping[str, Any]:
3741
return self._partition_router.get_request_headers( # type: ignore # this always returns a mapping
3842
stream_state=stream_state,
39-
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
43+
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={})
44+
if stream_slice
45+
else stream_slice,
4046
next_page_token=next_page_token,
4147
) | self._cursor_provider.get_request_headers(
4248
stream_state=stream_state,
43-
stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
49+
stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice)
50+
if stream_slice
51+
else stream_slice,
4452
next_page_token=next_page_token,
4553
)
4654

@@ -53,11 +61,15 @@ def get_request_body_data(
5361
) -> Union[Mapping[str, Any], str]:
5462
return self._partition_router.get_request_body_data( # type: ignore # this always returns a mapping
5563
stream_state=stream_state,
56-
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
64+
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={})
65+
if stream_slice
66+
else stream_slice,
5767
next_page_token=next_page_token,
5868
) | self._cursor_provider.get_request_body_data(
5969
stream_state=stream_state,
60-
stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
70+
stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice)
71+
if stream_slice
72+
else stream_slice,
6173
next_page_token=next_page_token,
6274
)
6375

@@ -70,10 +82,14 @@ def get_request_body_json(
7082
) -> Mapping[str, Any]:
7183
return self._partition_router.get_request_body_json( # type: ignore # this always returns a mapping
7284
stream_state=stream_state,
73-
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
85+
stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={})
86+
if stream_slice
87+
else stream_slice,
7488
next_page_token=next_page_token,
7589
) | self._cursor_provider.get_request_body_json(
7690
stream_state=stream_state,
77-
stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
91+
stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice)
92+
if stream_slice
93+
else stream_slice,
7894
next_page_token=next_page_token,
7995
)

0 commit comments

Comments
 (0)