Skip to content

Commit 438a11d

Browse files
author
maxime.c
committed
migrate client side filtering to concurrent cursor
1 parent e4cbaaf commit 438a11d

File tree

10 files changed

+258
-65
lines changed

10 files changed

+258
-65
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -481,16 +481,6 @@ def _get_retriever(
481481
if retriever.cursor:
482482
retriever.cursor.set_initial_state(stream_state=stream_state)
483483

484-
# Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
485-
# from the one initialized on the SimpleRetriever, so it also must also have state initialized
486-
# for semi-incremental streams using is_client_side_incremental to filter properly
487-
if isinstance(retriever.record_selector, RecordSelector) and isinstance(
488-
retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
489-
):
490-
retriever.record_selector.record_filter._cursor.set_initial_state(
491-
stream_state=stream_state
492-
) # type: ignore # After non-concurrent cursors are deprecated we can remove these cursor workarounds
493-
494484
# We zero it out here, but since this is a cursor reference, the state is still properly
495485
# instantiated for the other components that reference it
496486
retriever.cursor = None

airbyte_cdk/sources/declarative/extractors/record_filter.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,7 @@
44
from dataclasses import InitVar, dataclass
55
from typing import Any, Iterable, Mapping, Optional, Union
66

7-
from airbyte_cdk.sources.declarative.incremental import (
8-
DatetimeBasedCursor,
9-
GlobalSubstreamCursor,
10-
PerPartitionWithGlobalCursor,
11-
)
7+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
128
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
139
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
1410

@@ -53,13 +49,13 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter):
5349
"""
5450
Applies a filter to a list of records to exclude those that are older than the stream_state/start_date.
5551
56-
:param DatetimeBasedCursor date_time_based_cursor: Cursor used to extract datetime values
52+
:param Cursor cursor: Cursor used to filter out values
5753
:param PerPartitionCursor per_partition_cursor: Optional Cursor used for mapping cursor value in nested stream_state
5854
"""
5955

6056
def __init__(
6157
self,
62-
cursor: Union[DatetimeBasedCursor, PerPartitionWithGlobalCursor, GlobalSubstreamCursor],
58+
cursor: Union[Cursor],
6359
**kwargs: Any,
6460
):
6561
super().__init__(**kwargs)
@@ -77,7 +73,7 @@ def filter_records(
7773
for record in records
7874
if self._cursor.should_be_synced(
7975
# Record is created on the fly to align with cursors interface; stream name is ignored as we don't need it here
80-
# Record stream name is empty cause it is not used durig the filtering
76+
# Record stream name is empty because it is not used during the filtering
8177
Record(data=record, associated_slice=stream_slice, stream_name="")
8278
)
8379
)

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ def __init__(
8181
connector_state_converter: AbstractStreamStateConverter,
8282
cursor_field: CursorField,
8383
use_global_cursor: bool = False,
84+
attempt_to_create_cursor_if_not_provided: bool = False
8485
) -> None:
8586
self._global_cursor: Optional[StreamState] = {}
8687
self._stream_name = stream_name
@@ -125,6 +126,9 @@ def __init__(
125126

126127
self._set_initial_state(stream_state)
127128

129+
# FIXME this is a temporary field for the duration of the migration from declarative cursors to concurrent ones
130+
self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
131+
128132
@property
129133
def cursor_field(self) -> CursorField:
130134
return self._cursor_field
@@ -513,12 +517,17 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
513517
"Invalid state as stream slices that are emitted should refer to an existing cursor"
514518
)
515519
partition_key = self._to_partition_key(record.associated_slice.partition)
516-
if partition_key not in self._cursor_per_partition:
520+
if partition_key not in self._cursor_per_partition and not self._attempt_to_create_cursor_if_not_provided:
517521
raise ValueError(
518522
"Invalid state as stream slices that are emitted should refer to an existing cursor"
519523
)
520-
cursor = self._cursor_per_partition[partition_key]
521-
return cursor
524+
elif partition_key not in self._cursor_per_partition:
525+
return self._create_cursor(
526+
self._global_cursor,
527+
self._lookback_window if self._global_cursor else 0,
528+
)
529+
else:
530+
return self._cursor_per_partition[partition_key]
522531

523532
def limit_reached(self) -> bool:
524533
return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
)
3535
from airbyte_cdk.models import FailureType, Level
3636
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
37-
from airbyte_cdk.sources.declarative import transformations
3837
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
3938
from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
4039
from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
@@ -1561,6 +1560,8 @@ def create_concurrent_cursor_from_perpartition_cursor(
15611560
stream_state: MutableMapping[str, Any],
15621561
partition_router: PartitionRouter,
15631562
stream_state_migrations: Optional[List[Any]] = None,
1563+
attempt_to_create_cursor_if_not_provided: bool = False,
1564+
15641565
**kwargs: Any,
15651566
) -> ConcurrentPerPartitionCursor:
15661567
component_type = component_definition.get("type")
@@ -1631,6 +1632,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
16311632
connector_state_converter=connector_state_converter,
16321633
cursor_field=cursor_field,
16331634
use_global_cursor=use_global_cursor,
1635+
attempt_to_create_cursor_if_not_provided=attempt_to_create_cursor_if_not_provided,
16341636
)
16351637

16361638
@staticmethod
@@ -1937,23 +1939,10 @@ def create_declarative_stream(
19371939
and hasattr(model.incremental_sync, "is_client_side_incremental")
19381940
and model.incremental_sync.is_client_side_incremental
19391941
):
1940-
supported_slicers = (
1941-
DatetimeBasedCursor,
1942-
GlobalSubstreamCursor,
1943-
PerPartitionWithGlobalCursor,
1944-
)
1945-
if combined_slicers and not isinstance(combined_slicers, supported_slicers):
1946-
raise ValueError(
1947-
"Unsupported Slicer is used. PerPartitionWithGlobalCursor should be used here instead"
1948-
)
1949-
cursor = (
1950-
combined_slicers
1951-
if isinstance(
1952-
combined_slicers, (PerPartitionWithGlobalCursor, GlobalSubstreamCursor)
1953-
)
1954-
else self._create_component_from_model(model=model.incremental_sync, config=config)
1942+
stream_slicer = self._build_stream_slicer_from_partition_router(
1943+
model.retriever, config, stream_name=model.name
19551944
)
1956-
1945+
cursor = self._build_concurrent_cursor(model, stream_slicer, config)
19571946
client_side_incremental_sync = {"cursor": cursor}
19581947

19591948
if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel):
@@ -2185,6 +2174,63 @@ def _build_incremental_cursor(
21852174
return self._create_component_from_model(model=model.incremental_sync, config=config) # type: ignore[no-any-return] # Will be created Cursor as stream_slicer_model is model.incremental_sync
21862175
return None
21872176

2177+
def _build_concurrent_cursor(
2178+
self,
2179+
model: DeclarativeStreamModel,
2180+
stream_slicer: Optional[PartitionRouter],
2181+
config: Config,
2182+
) -> Optional[StreamSlicer]:
2183+
stream_state = self._connector_state_manager.get_stream_state(
2184+
stream_name=model.name, namespace=None
2185+
)
2186+
2187+
if model.incremental_sync and stream_slicer:
2188+
# FIXME should this be in create_concurrent_cursor_from_perpartition_cursor?
2189+
if model.state_migrations:
2190+
state_transformations = [
2191+
self._create_component_from_model(state_migration, config, declarative_stream=model)
2192+
for state_migration in model.state_migrations
2193+
]
2194+
else:
2195+
state_transformations = []
2196+
2197+
return self.create_concurrent_cursor_from_perpartition_cursor(
2198+
# type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
2199+
state_manager=self._connector_state_manager,
2200+
model_type=DatetimeBasedCursorModel,
2201+
component_definition=model.incremental_sync.__dict__,
2202+
stream_name=model.name,
2203+
stream_namespace=None,
2204+
config=config or {},
2205+
stream_state=stream_state,
2206+
stream_state_migrations=state_transformations,
2207+
partition_router=stream_slicer,
2208+
attempt_to_create_cursor_if_not_provided=True,
2209+
)
2210+
elif model.incremental_sync:
2211+
if type(model.incremental_sync) == IncrementingCountCursorModel:
2212+
return self.create_concurrent_cursor_from_incrementing_count_cursor(
2213+
model_type=IncrementingCountCursorModel,
2214+
component_definition=model.incremental_sync.__dict__,
2215+
stream_name=model.name or "",
2216+
stream_namespace=None,
2217+
config=config or {},
2218+
stream_state_migrations=model.state_migrations,
2219+
)
2220+
elif type(model.incremental_sync) == DatetimeBasedCursorModel:
2221+
return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
2222+
model_type=type(model.incremental_sync),
2223+
component_definition=model.incremental_sync.__dict__,
2224+
stream_name=model.name or "",
2225+
stream_namespace=None,
2226+
config=config or {},
2227+
stream_state_migrations=model.state_migrations,
2228+
attempt_to_create_cursor_if_not_provided=True,
2229+
)
2230+
else:
2231+
raise ValueError(f"Incremental sync of type {type(model.incremental_sync)} is not supported")
2232+
return None
2233+
21882234
def _build_resumable_cursor(
21892235
self,
21902236
model: Union[

airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929

3030
class FileBasedConcurrentCursor(AbstractConcurrentFileBasedCursor):
31+
3132
CURSOR_FIELD = "_ab_source_file_last_modified"
3233
DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = (
3334
DefaultFileBasedCursor.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
@@ -311,3 +312,6 @@ def set_initial_state(self, value: StreamState) -> None:
311312

312313
def ensure_at_least_one_state_emitted(self) -> None:
313314
self.emit_state_message()
315+
316+
def should_be_synced(self, record: Record) -> bool:
317+
return True

airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,6 @@ def ensure_at_least_one_state_emitted(self) -> None:
8181
self._stream_name, self._stream_namespace
8282
)
8383
self._message_repository.emit_message(state_message)
84+
85+
def should_be_synced(self, record: Record) -> bool:
86+
return True

airbyte_cdk/sources/streams/concurrent/cursor.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ def ensure_at_least_one_state_emitted(self) -> None:
7474
"""
7575
raise NotImplementedError()
7676

77+
@abstractmethod
78+
def should_be_synced(self, record: Record) -> bool:
79+
pass
80+
7781
def stream_slices(self) -> Iterable[StreamSlice]:
7882
"""
7983
Default placeholder implementation of generate_slices.
@@ -123,6 +127,9 @@ def ensure_at_least_one_state_emitted(self) -> None:
123127
)
124128
self._message_repository.emit_message(state_message)
125129

130+
def should_be_synced(self, record: Record) -> bool:
131+
return True
132+
126133

127134
class ConcurrentCursor(Cursor):
128135
_START_BOUNDARY = 0
@@ -192,9 +199,17 @@ def _get_concurrent_state(
192199
self, state: MutableMapping[str, Any]
193200
) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
194201
if self._connector_state_converter.is_state_message_compatible(state):
202+
partitioned_state = self._connector_state_converter.deserialize(state)
203+
slices_from_partitioned_state = partitioned_state.get("slices", [])
204+
205+
value_from_partitioned_state = None
206+
if slices_from_partitioned_state:
207+
# We assume here that the slices have been already merged
208+
first_slice = slices_from_partitioned_state[0]
209+
value_from_partitioned_state = first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY] if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice else first_slice[self._connector_state_converter.END_KEY]
195210
return (
196-
self._start or self._connector_state_converter.zero_value,
197-
self._connector_state_converter.deserialize(state),
211+
value_from_partitioned_state or self._start or self._connector_state_converter.zero_value,
212+
partitioned_state,
198213
)
199214
return self._connector_state_converter.convert_from_sequential_state(
200215
self._cursor_field, state, self._start
@@ -471,7 +486,7 @@ def should_be_synced(self, record: Record) -> bool:
471486
except ValueError:
472487
self._log_for_record_without_cursor_value()
473488
return True
474-
return self.start <= record_cursor_value <= self._end_provider()
489+
return self.start <= record_cursor_value
475490

476491
def _log_for_record_without_cursor_value(self) -> None:
477492
if not self._should_be_synced_logger_triggered:

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1424,7 +1424,7 @@ def test_client_side_incremental_with_partition_router():
14241424
assert stream.retriever.record_selector.transform_before_filtering == True
14251425
assert isinstance(
14261426
stream.retriever.record_selector.record_filter._cursor,
1427-
PerPartitionWithGlobalCursor,
1427+
ConcurrentPerPartitionCursor,
14281428
)
14291429

14301430

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1891,9 +1891,7 @@ def test_stream_using_is_client_side_incremental_has_cursor_state():
18911891
simple_retriever = locations_stream._stream_partition_generator._partition_factory._retriever
18921892
record_filter = simple_retriever.record_selector.record_filter
18931893
assert isinstance(record_filter, ClientSideIncrementalRecordFilterDecorator)
1894-
client_side_incremental_cursor_state = record_filter._cursor._cursor
1895-
1896-
assert client_side_incremental_cursor_state == expected_cursor_value
1894+
assert list(record_filter._cursor.state.values()) == [expected_cursor_value]
18971895

18981896

18991897
@pytest.mark.parametrize(

0 commit comments

Comments
 (0)