Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
from airbyte_cdk.sources.declarative.extractors.record_filter import (
ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor
from airbyte_cdk.sources.declarative.incremental import (
ConcurrentPerPartitionCursor,
GlobalSubstreamCursor,
)
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
PerPartitionWithGlobalCursor,
Expand Down Expand Up @@ -361,7 +364,8 @@ def _group_streams(
== DatetimeBasedCursorModel.__name__
and hasattr(declarative_stream.retriever, "stream_slicer")
and isinstance(
declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
declarative_stream.retriever.stream_slicer,
(GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
)
):
stream_state = self._connector_state_manager.get_stream_state(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1439,7 +1439,9 @@ def create_concurrent_cursor_from_perpartition_cursor(
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)

# Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
use_global_cursor = isinstance(partition_router, GroupingPartitionRouter)
use_global_cursor = isinstance(
partition_router, GroupingPartitionRouter
) or component_definition.get("global_substream_cursor", False)

# Return the concurrent cursor and state converter
return ConcurrentPerPartitionCursor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3449,3 +3449,48 @@ def test_semaphore_cleanup():
assert '{"id":"2"}' not in cursor._semaphore_per_partition
assert len(cursor._partition_parent_state_map) == 0 # All parent states should be popped
assert cursor._parent_state == {"parent": {"state": "state2"}} # Last parent state


def test_given_global_state_when_read_then_state_is_not_per_partition() -> None:
manifest = deepcopy(SUBSTREAM_MANIFEST)
manifest["definitions"]["post_comments_stream"]["incremental_sync"][
"global_substream_cursor"
] = True
manifest["streams"].remove({"$ref": "#/definitions/post_comment_votes_stream"})
record = {
"id": 9,
"post_id": 1,
"updated_at": COMMENT_10_UPDATED_AT,
}
mock_requests = [
(
f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}",
{
"posts": [
{"id": 1, "updated_at": POST_1_UPDATED_AT},
],
},
),
# Fetch the first page of comments for post 1
(
"https://api.example.com/community/posts/1/comments?per_page=100",
{
"comments": [record],
},
),
]

run_mocked_test(
mock_requests,
manifest,
CONFIG,
"post_comments",
{},
[record],
{
"lookback_window": 1,
"parent_state": {"posts": {"updated_at": "2024-01-30T00:00:00Z"}},
"state": {"updated_at": "2024-01-25T00:00:00Z"},
"use_global_cursor": True, # ensures that it is running the Concurrent CDK version as this is not populated in the declarative implementation
}, # this state does have per partition which would be under `states`
)
Loading