Skip to content

Commit 4802856

Browse files
Fix state handling when passed in read() method
Low-code connectors instantiate the source without a state and pass the state via the read() method. This commit updates the ConcurrentDeclarativeSource to support this behavior. Changes are: - Instantiate the ConnectorStateManager with the state passed in read() method. - Update the ModelToComponentFactory to use the stream_state parameter when creating a cursor. - Add a test to verify the behavior: instantiate the source without a state and pass the state via the read() method.
1 parent e8ec233 commit 4802856

File tree

3 files changed

+73
-1
lines changed

3 files changed

+73
-1
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ def read(
135135
catalog: ConfiguredAirbyteCatalog,
136136
state: Optional[List[AirbyteStateMessage]] = None,
137137
) -> Iterator[AirbyteMessage]:
138+
self._connector_state_manager = ConnectorStateManager(state=state)
138139
concurrent_streams, _ = self._group_streams(config=config)
139140

140141
# ConcurrentReadProcessor pops streams that are finished being read so before syncing, the names of
@@ -267,6 +268,7 @@ def _group_streams(
267268
component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above
268269
stream_name=declarative_stream.name,
269270
stream_namespace=declarative_stream.namespace,
271+
stream_state=stream_state,
270272
config=config or {},
271273
)
272274
else:
@@ -275,6 +277,7 @@ def _group_streams(
275277
component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above
276278
stream_name=declarative_stream.name,
277279
stream_namespace=declarative_stream.namespace,
280+
stream_state=stream_state,
278281
config=config or {},
279282
)
280283
partition_generator = StreamSlicerPartitionGenerator(

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -983,7 +983,8 @@ def create_concurrent_cursor_from_datetime_based_cursor(
983983
) -> ConcurrentCursor:
984984
# Per-partition incremental streams can dynamically create child cursors which will pass their current
985985
# state via the stream_state keyword argument. Incremental syncs without parent streams use the
986-
# incoming state and connector_state_manager that is initialized when the component factory is created
986+
# incoming state and connector_state_manager that is initialized when the component factory is created.
987+
# stream_state is also used in low code connector where the state is passed via the read() method.
987988
stream_state = (
988989
self._connector_state_manager.get_stream_state(stream_name, stream_namespace)
989990
if "stream_state" not in kwargs
@@ -1209,6 +1210,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
12091210
# Per-partition incremental streams can dynamically create child cursors which will pass their current
12101211
# state via the stream_state keyword argument. Incremental syncs without parent streams use the
12111212
# incoming state and connector_state_manager that is initialized when the component factory is created
1213+
# stream_state is also used in low code connector where the state is passed via the read() method.
12121214
stream_state = (
12131215
self._connector_state_manager.get_stream_state(stream_name, stream_namespace)
12141216
if "stream_state" not in kwargs

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -825,6 +825,73 @@ def test_create_concurrent_cursor():
825825
assert incremental_counting_cursor._end_provider() == math.inf
826826

827827

828+
@freezegun.freeze_time(_NOW)
829+
def test_concurrent_cursor_with_state_in_read_method():
830+
"""
831+
This test mimicks the behavior of a source in a real life low-code connector
832+
where the source is instantiated without a state and the state is provided
833+
via the read() method.
834+
835+
Note: this test specifically checks that for DatetimeBasedCursor
836+
"""
837+
state = [
838+
AirbyteStateMessage(
839+
type=AirbyteStateType.STREAM,
840+
stream=AirbyteStreamState(
841+
stream_descriptor=StreamDescriptor(name="party_members", namespace=None),
842+
stream_state=AirbyteStateBlob(updated_at="2024-09-05"),
843+
),
844+
),
845+
]
846+
847+
catalog = ConfiguredAirbyteCatalog(
848+
streams=[
849+
ConfiguredAirbyteStream(
850+
stream=AirbyteStream(
851+
name="party_members", json_schema={}, supported_sync_modes=[SyncMode.incremental]
852+
),
853+
sync_mode=SyncMode.incremental,
854+
destination_sync_mode=DestinationSyncMode.append,
855+
),
856+
]
857+
)
858+
859+
860+
def get_source():
861+
return ConcurrentDeclarativeSource(
862+
source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[]
863+
)
864+
865+
866+
source = get_source()
867+
with HttpMocker() as http_mocker:
868+
_mock_party_members_requests(http_mocker, _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES)
869+
messages_iterator = source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=state)
870+
871+
# Get the first message only to get the stream running so that we can check the cursor
872+
first_message = next(messages_iterator)
873+
874+
concurrent_streams, _ = source._group_streams(config=_CONFIG)
875+
party_members_stream = [s for s in concurrent_streams if s.name == "party_members"][0]
876+
877+
assert party_members_stream is not None, "Could not find party_members stream"
878+
party_members_cursor = party_members_stream.cursor
879+
880+
assert isinstance(party_members_cursor, ConcurrentCursor)
881+
assert party_members_cursor._stream_name == "party_members"
882+
assert party_members_cursor._cursor_field.cursor_field_key == "updated_at"
883+
884+
cursor_value = AirbyteDateTime.strptime("2024-09-05", "%Y-%m-%d")
885+
886+
assert len(party_members_cursor._concurrent_state["slices"]) == 1
887+
assert party_members_cursor._concurrent_state["slices"][0]["most_recent_cursor_value"] == cursor_value
888+
889+
messages = list(messages_iterator)
890+
party_members_records = get_records_for_stream("party_members", messages)
891+
# There is only one record after 2024-09-05
892+
assert len(party_members_records) == 1
893+
assert party_members_records[0].data["id"] == "yoshizawa"
894+
828895
def test_check():
829896
"""
830897
Verifies that the ConcurrentDeclarativeSource check command is run against synchronous streams

0 commit comments

Comments
 (0)