Skip to content

Commit 47af0fa

Browse files
author
maxi297
committed
Running global cursors in concurrent CDK
1 parent 271a66d commit 47af0fa

File tree

2 files changed

+8
-2
lines changed

2 files changed

+8
-2
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
from airbyte_cdk.sources.declarative.extractors.record_filter import (
2020
ClientSideIncrementalRecordFilterDecorator,
2121
)
22-
from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor
22+
from airbyte_cdk.sources.declarative.incremental import (
23+
ConcurrentPerPartitionCursor,
24+
GlobalSubstreamCursor,
25+
)
2326
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
2427
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
2528
PerPartitionWithGlobalCursor,
@@ -361,7 +364,7 @@ def _group_streams(
361364
== DatetimeBasedCursorModel.__name__
362365
and hasattr(declarative_stream.retriever, "stream_slicer")
363366
and isinstance(
364-
declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
367+
declarative_stream.retriever.stream_slicer, (GlobalSubstreamCursor, PerPartitionWithGlobalCursor)
365368
)
366369
):
367370
stream_state = self._connector_state_manager.get_stream_state(

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3456,6 +3456,7 @@ def test_given_global_state_when_read_then_state_is_not_per_partition() -> None:
34563456
manifest["definitions"]["post_comments_stream"]["incremental_sync"][
34573457
"global_substream_cursor"
34583458
] = True
3459+
manifest["streams"].remove({"$ref": "#/definitions/post_comment_votes_stream"})
34593460
record = {
34603461
"id": 9,
34613462
"post_id": 1,
@@ -3478,6 +3479,7 @@ def test_given_global_state_when_read_then_state_is_not_per_partition() -> None:
34783479
},
34793480
),
34803481
]
3482+
34813483
run_mocked_test(
34823484
mock_requests,
34833485
manifest,
@@ -3489,5 +3491,6 @@ def test_given_global_state_when_read_then_state_is_not_per_partition() -> None:
34893491
"lookback_window": 1,
34903492
"parent_state": {"posts": {"updated_at": "2024-01-30T00:00:00Z"}},
34913493
"state": {"updated_at": "2024-01-25T00:00:00Z"},
3494+
"use_global_cursor": True # ensures that it is running the Concurrent CDK version as this is not populated in the declarative implementation
34923495
}, # this state does have per partition which would be under `states`
34933496
)

0 commit comments

Comments
 (0)