diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index c92ffb150..357bc95e1 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -135,6 +135,8 @@ def read( catalog: ConfiguredAirbyteCatalog, state: Optional[List[AirbyteStateMessage]] = None, ) -> Iterator[AirbyteMessage]: + if state: + self._connector_state_manager = ConnectorStateManager(state=state) concurrent_streams, _ = self._group_streams(config=config) # ConcurrentReadProcessor pops streams that are finished being read so before syncing, the names of @@ -267,6 +269,7 @@ def _group_streams( component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above stream_name=declarative_stream.name, stream_namespace=declarative_stream.namespace, + stream_state=stream_state, config=config or {}, ) else: @@ -275,6 +278,7 @@ def _group_streams( component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above stream_name=declarative_stream.name, stream_namespace=declarative_stream.namespace, + stream_state=stream_state, config=config or {}, ) partition_generator = StreamSlicerPartitionGenerator( diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 39058f834..322fbbf28 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -983,7 +983,8 @@ def create_concurrent_cursor_from_datetime_based_cursor( ) -> ConcurrentCursor: # Per-partition incremental streams can dynamically create child cursors which will pass their current # state via the stream_state keyword argument. 
Incremental syncs without parent streams use the - # incoming state and connector_state_manager that is initialized when the component factory is created + # incoming state and connector_state_manager that is initialized when the component factory is created. + # stream_state is also used in low-code connectors where the state is passed via the read() method. stream_state = ( self._connector_state_manager.get_stream_state(stream_name, stream_namespace) if "stream_state" not in kwargs @@ -1209,6 +1210,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor( # Per-partition incremental streams can dynamically create child cursors which will pass their current # state via the stream_state keyword argument. Incremental syncs without parent streams use the # incoming state and connector_state_manager that is initialized when the component factory is created + # stream_state is also used in low-code connectors where the state is passed via the read() method. stream_state = ( self._connector_state_manager.get_stream_state(stream_name, stream_namespace) if "stream_state" not in kwargs diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 4a043ac82..3a0ae682a 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -825,6 +825,80 @@ def test_create_concurrent_cursor(): assert incremental_counting_cursor._end_provider() == math.inf +@freezegun.freeze_time(_NOW) +def test_concurrent_cursor_with_state_in_read_method(): + """ + This test mimics the behavior of a source in a real-life low-code connector + where the source is instantiated without a state and the state is provided + via the read() method. 
+ + Note: this test specifically checks this behavior for DatetimeBasedCursor + """ + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="party_members", namespace=None), + stream_state=AirbyteStateBlob(updated_at="2024-09-04"), + ), + ), + ] + + catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="party_members", + json_schema={}, + supported_sync_modes=[SyncMode.incremental], + ), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append, + ), + ] + ) + + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[] + ) + + with HttpMocker() as http_mocker: + _mock_party_members_requests(http_mocker, _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES) + messages = list( + source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=state) + ) + + concurrent_streams, _ = source._group_streams(config=_CONFIG) + party_members_stream = [s for s in concurrent_streams if s.name == "party_members"][0] + + assert party_members_stream is not None, "Could not find party_members stream" + party_members_cursor = party_members_stream.cursor + + assert isinstance(party_members_cursor, ConcurrentCursor) + assert party_members_cursor._stream_name == "party_members" + assert party_members_cursor._cursor_field.cursor_field_key == "updated_at" + + cursor_value = AirbyteDateTime.strptime("2024-09-04", "%Y-%m-%d") + + assert len(party_members_cursor._concurrent_state["slices"]) == 1 + assert ( + party_members_cursor._concurrent_state["slices"][0]["most_recent_cursor_value"] + == cursor_value + ) + + # There is only one record after 2024-09-05 + party_members_records = get_records_for_stream("party_members", messages) + assert len(party_members_records) == 1 + assert party_members_records[0].data["id"] == "yoshizawa" + + # Emitted state should have the updated_at of the one record 
read + states = get_states_for_stream("party_members", messages) + assert len(states) == 2 + assert states[1].stream.stream_state == AirbyteStateBlob( + updated_at=party_members_records[0].data["updated_at"] + ) + + def test_check(): """ Verifies that the ConcurrentDeclarativeSource check command is run against synchronous streams