@@ -135,6 +135,8 @@ def read(
catalog: ConfiguredAirbyteCatalog,
state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
if state:
self._connector_state_manager = ConnectorStateManager(state=state)
concurrent_streams, _ = self._group_streams(config=config)

# ConcurrentReadProcessor pops streams that are finished being read so before syncing, the names of
@@ -267,6 +269,7 @@ def _group_streams(
component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above
stream_name=declarative_stream.name,
stream_namespace=declarative_stream.namespace,
stream_state=stream_state,
config=config or {},
)
else:
Expand All @@ -275,6 +278,7 @@ def _group_streams(
component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above
stream_name=declarative_stream.name,
stream_namespace=declarative_stream.namespace,
stream_state=stream_state,
config=config or {},
)
partition_generator = StreamSlicerPartitionGenerator(
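The hunk above is the core of the change: when read() receives a non-empty state, the source rebuilds its ConnectorStateManager from it before grouping streams, so the concurrent cursors created later see that state. A minimal usage sketch follows; it is not part of the diff, the import paths are assumptions based on the airbyte_cdk package layout, and manifest/config/catalog are placeholders mirroring the test added further down.

from airbyte_cdk.models import (
    AirbyteStateBlob,
    AirbyteStateMessage,
    AirbyteStateType,
    AirbyteStreamState,
    StreamDescriptor,
)
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)

# Stream state handed to read() instead of the constructor.
state = [
    AirbyteStateMessage(
        type=AirbyteStateType.STREAM,
        stream=AirbyteStreamState(
            stream_descriptor=StreamDescriptor(name="party_members", namespace=None),
            stream_state=AirbyteStateBlob(updated_at="2024-09-04"),
        ),
    )
]

# `manifest`, `config`, and `catalog` are placeholders for the connector's
# declarative manifest, user config, and configured catalog.
source = ConcurrentDeclarativeSource(
    source_config=manifest, config=config, catalog=catalog, state=[]
)

# With the `if state:` branch above, read() rebuilds the ConnectorStateManager
# from `state`, so cursors resume from "2024-09-04" instead of starting fresh.
for message in source.read(logger=source.logger, config=config, catalog=catalog, state=state):
    print(message.type)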
@@ -983,7 +983,8 @@ def create_concurrent_cursor_from_datetime_based_cursor(
) -> ConcurrentCursor:
# Per-partition incremental streams can dynamically create child cursors which will pass their current
# state via the stream_state keyword argument. Incremental syncs without parent streams use the
# incoming state and connector_state_manager that is initialized when the component factory is created
# incoming state and connector_state_manager that is initialized when the component factory is created.
# stream_state is also used in low-code connectors where the state is passed via the read() method.
stream_state = (
self._connector_state_manager.get_stream_state(stream_name, stream_namespace)
if "stream_state" not in kwargs
@@ -1209,6 +1210,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
# Per-partition incremental streams can dynamically create child cursors which will pass their current
# state via the stream_state keyword argument. Incremental syncs without parent streams use the
# incoming state and connector_state_manager that is initialized when the component factory is created
# stream_state is also used in low-code connectors where the state is passed via the read() method.
stream_state = (
self._connector_state_manager.get_stream_state(stream_name, stream_namespace)
if "stream_state" not in kwargs
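For reference, a hedged reconstruction of the selection pattern the two comments above describe. The diff truncates the expression, so the else branch below is an assumption rather than a quote of the file.

stream_state = (
    # No stream_state kwarg: fall back to the connector_state_manager, which
    # read() now rebuilds from the state passed to it (first file in this diff).
    self._connector_state_manager.get_stream_state(stream_name, stream_namespace)
    if "stream_state" not in kwargs
    # Per-partition parent streams pass their child cursors' state explicitly.
    else kwargs["stream_state"]
)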
@@ -825,6 +825,80 @@ def test_create_concurrent_cursor():
assert incremental_counting_cursor._end_provider() == math.inf


@freezegun.freeze_time(_NOW)
def test_concurrent_cursor_with_state_in_read_method():
"""
This test mimics the behavior of a source in a real-life low-code connector
where the source is instantiated without a state and the state is provided
via the read() method.

Note: this test specifically covers the DatetimeBasedCursor.
"""
state = [
AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name="party_members", namespace=None),
stream_state=AirbyteStateBlob(updated_at="2024-09-04"),
),
),
]

catalog = ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
stream=AirbyteStream(
name="party_members",
json_schema={},
supported_sync_modes=[SyncMode.incremental],
),
sync_mode=SyncMode.incremental,
destination_sync_mode=DestinationSyncMode.append,
),
]
)

source = ConcurrentDeclarativeSource(
source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[]
)

with HttpMocker() as http_mocker:
_mock_party_members_requests(http_mocker, _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES)
messages = list(
source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=state)
)

concurrent_streams, _ = source._group_streams(config=_CONFIG)
Review comment (Contributor):

This method is private and only called once per source, so this does not reflect the normal usage of a source. Could you please comment on what this is trying to achieve?

The consequences of using this private method in the test seem important: the read performed above is applied to another instance of party_members_stream, and that read should not influence the state that is tested below. The reason the test passes right now is that read() swaps the instance of _connector_state_manager, and the next time _group_streams is called, it uses the new state.

In order to properly validate that the input state is taken into account, we would have to call http_mocker._validate_all_matchers_called() to ensure that the HTTP requests that were performed took the state into account.

Reply (Author):
It's just a way to obtain the streams that I saw in other tests in this same file. I'm happy to find another way to get the streams in order to make the assertions I was intending to do if the general approach of the fix included in this PR is okay.
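
A rough sketch of the validation the reviewer suggests, assuming a request fixture for a stateful sync exists (the _STATEFUL_PARTY_MEMBERS_SLICES_AND_RESPONSES name below is hypothetical) and using the _validate_all_matchers_called() call named above, so the HTTP traffic itself, rather than the private _group_streams(), proves the input state was honored.

with HttpMocker() as http_mocker:
    # Hypothetical fixture: only the requests expected when state = 2024-09-04.
    _mock_party_members_requests(http_mocker, _STATEFUL_PARTY_MEMBERS_SLICES_AND_RESPONSES)
    messages = list(
        source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=state)
    )
    # Named by the reviewer: fails if any registered (state-aware) matcher was not hit.
    http_mocker._validate_all_matchers_called()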

party_members_stream = [s for s in concurrent_streams if s.name == "party_members"][0]

assert party_members_stream is not None, "Could not find party_members stream"
party_members_cursor = party_members_stream.cursor

assert isinstance(party_members_cursor, ConcurrentCursor)
assert party_members_cursor._stream_name == "party_members"
assert party_members_cursor._cursor_field.cursor_field_key == "updated_at"

cursor_value = AirbyteDateTime.strptime("2024-09-04", "%Y-%m-%d")

assert len(party_members_cursor._concurrent_state["slices"]) == 1
assert (
party_members_cursor._concurrent_state["slices"][0]["most_recent_cursor_value"]
== cursor_value
)

# There is only one record after 2024-09-05
party_members_records = get_records_for_stream("party_members", messages)
assert len(party_members_records) == 1
assert party_members_records[0].data["id"] == "yoshizawa"

# Emitted state should have the updated_at of the one record read
states = get_states_for_stream("party_members", messages)
assert len(states) == 2
assert states[1].stream.stream_state == AirbyteStateBlob(
updated_at=party_members_records[0].data["updated_at"]
)


def test_check():
"""
Verifies that the ConcurrentDeclarativeSource check command is run against synchronous streams