Skip to content

Commit 40ce8b1

Browse files
committed
Fix state emit for ConcurrentPerPartitionCursor
1 parent bcfcf04 commit 40ce8b1

File tree

3 files changed

+277
-4
lines changed

3 files changed

+277
-4
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,10 @@ def _emit_state_message(self, throttle: bool = True) -> None:
242242
if current_time is None:
243243
return
244244
self._last_emission_time = current_time
245+
# Skip emitting state for the global cursor when the parent state is empty
246+
if self._use_global_cursor and not self._parent_state:
247+
return
248+
245249
self._connector_state_manager.update_state_for_stream(
246250
self._stream_name,
247251
self._stream_namespace,

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1483,7 +1483,6 @@ def create_concurrent_cursor_from_perpartition_cursor(
14831483
)
14841484
)
14851485
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
1486-
14871486
# Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
14881487
use_global_cursor = isinstance(
14891488
partition_router, GroupingPartitionRouter

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 273 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -309,12 +309,24 @@
309309
"partition_router"
310310
]["parent_stream_configs"][0]["incremental_dependency"] = False
311311

312+
SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY = deepcopy(SUBSTREAM_MANIFEST)
313+
# Disable incremental_dependency
314+
SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY["definitions"]["post_comments_stream"]["retriever"][
315+
"partition_router"
316+
]["parent_stream_configs"][0]["incremental_dependency"] = False
317+
SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY["definitions"]["post_comment_votes_stream"]["retriever"][
318+
"partition_router"
319+
]["parent_stream_configs"][0]["incremental_dependency"] = False
320+
# Enable global_cursor
321+
SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY["definitions"]["cursor_incremental_sync"]["global_substream_cursor"] = True
322+
323+
312324
import orjson
313325
import requests_mock
314326

315327

316328
def run_mocked_test(
317-
mock_requests, manifest, config, stream_name, initial_state, expected_records, expected_state
329+
mock_requests, manifest, config, stream_name, initial_state, expected_records, expected_state, state_count = None
318330
):
319331
"""
320332
Helper function to mock requests, run the test, and verify the results.
@@ -354,6 +366,8 @@ def run_mocked_test(
354366
expected_records, key=lambda x: x["id"]
355367
)
356368

369+
assert len(output.state_messages) == state_count if state_count else True
370+
357371
# Verify state
358372
final_state = output.state_messages[-1].state.stream.stream_state
359373
assert final_state.__dict__ == expected_state
@@ -431,7 +445,7 @@ def _run_read(
431445

432446

433447
@pytest.mark.parametrize(
434-
"test_name, manifest, mock_requests, expected_records, initial_state, expected_state",
448+
"test_name, manifest, mock_requests, expected_records, initial_state, expected_state, state_count",
435449
[
436450
(
437451
"test_incremental_parent_state",
@@ -739,11 +753,266 @@ def _run_read(
739753
"parent_state": {},
740754
"state": {"created_at": VOTE_100_CREATED_AT},
741755
},
756+
# State count
757+
2,
758+
),
759+
(
760+
"test_incremental_parent_state_with",
761+
SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY,
762+
[
763+
# Fetch the first page of posts
764+
(
765+
f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}",
766+
{
767+
"posts": [
768+
{"id": 1, "updated_at": POST_1_UPDATED_AT},
769+
{"id": 2, "updated_at": POST_2_UPDATED_AT},
770+
],
771+
"next_page": f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}&page=2",
772+
},
773+
),
774+
# Fetch the second page of posts
775+
(
776+
f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}&page=2",
777+
{"posts": [{"id": 3, "updated_at": POST_3_UPDATED_AT}]},
778+
),
779+
# Fetch the first page of comments for post 1
780+
(
781+
"https://api.example.com/community/posts/1/comments?per_page=100",
782+
{
783+
"comments": [
784+
{
785+
"id": 9,
786+
"post_id": 1,
787+
"updated_at": COMMENT_9_OLDEST, # No requests for comment 9, filtered out due to the date
788+
},
789+
{
790+
"id": 10,
791+
"post_id": 1,
792+
"updated_at": COMMENT_10_UPDATED_AT,
793+
},
794+
{
795+
"id": 11,
796+
"post_id": 1,
797+
"updated_at": COMMENT_11_UPDATED_AT,
798+
},
799+
],
800+
"next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2",
801+
},
802+
),
803+
# Fetch the second page of comments for post 1
804+
(
805+
"https://api.example.com/community/posts/1/comments?per_page=100&page=2",
806+
{
807+
"comments": [
808+
{
809+
"id": 12,
810+
"post_id": 1,
811+
"updated_at": COMMENT_12_UPDATED_AT,
812+
}
813+
]
814+
},
815+
),
816+
# Fetch the first page of votes for comment 10 of post 1
817+
(
818+
f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}",
819+
{
820+
"votes": [
821+
{
822+
"id": 100,
823+
"comment_id": 10,
824+
"created_at": VOTE_100_CREATED_AT,
825+
}
826+
],
827+
"next_page": f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}",
828+
},
829+
),
830+
# Fetch the second page of votes for comment 10 of post 1
831+
(
832+
f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}",
833+
{
834+
"votes": [
835+
{
836+
"id": 101,
837+
"comment_id": 10,
838+
"created_at": VOTE_101_CREATED_AT,
839+
}
840+
]
841+
},
842+
),
843+
# Fetch the first page of votes for comment 11 of post 1
844+
(
845+
f"https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}",
846+
{
847+
"votes": [
848+
{
849+
"id": 111,
850+
"comment_id": 11,
851+
"created_at": VOTE_111_CREATED_AT,
852+
}
853+
]
854+
},
855+
),
856+
# Fetch the first page of votes for comment 12 of post 1
857+
(
858+
f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={LOOKBACK_DATE}",
859+
{"votes": []},
860+
),
861+
# Fetch the first page of comments for post 2
862+
(
863+
"https://api.example.com/community/posts/2/comments?per_page=100",
864+
{
865+
"comments": [
866+
{
867+
"id": 20,
868+
"post_id": 2,
869+
"updated_at": COMMENT_20_UPDATED_AT,
870+
}
871+
],
872+
"next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2",
873+
},
874+
),
875+
# Fetch the second page of comments for post 2
876+
(
877+
"https://api.example.com/community/posts/2/comments?per_page=100&page=2",
878+
{
879+
"comments": [
880+
{
881+
"id": 21,
882+
"post_id": 2,
883+
"updated_at": COMMENT_21_UPDATED_AT,
884+
}
885+
]
886+
},
887+
),
888+
# Fetch the first page of votes for comment 20 of post 2
889+
(
890+
f"https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time={LOOKBACK_DATE}",
891+
{
892+
"votes": [
893+
{
894+
"id": 200,
895+
"comment_id": 20,
896+
"created_at": VOTE_200_CREATED_AT,
897+
}
898+
]
899+
},
900+
),
901+
# Fetch the first page of votes for comment 21 of post 2
902+
(
903+
f"https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time={LOOKBACK_DATE}",
904+
{
905+
"votes": [
906+
{
907+
"id": 210,
908+
"comment_id": 21,
909+
"created_at": VOTE_210_CREATED_AT,
910+
}
911+
]
912+
},
913+
),
914+
# Fetch the first page of comments for post 3
915+
(
916+
"https://api.example.com/community/posts/3/comments?per_page=100",
917+
{
918+
"comments": [
919+
{
920+
"id": 30,
921+
"post_id": 3,
922+
"updated_at": COMMENT_30_UPDATED_AT,
923+
}
924+
]
925+
},
926+
),
927+
# Fetch the first page of votes for comment 30 of post 3
928+
(
929+
f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={LOOKBACK_DATE}",
930+
{
931+
"votes": [
932+
{
933+
"id": 300,
934+
"comment_id": 30,
935+
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
936+
}
937+
]
938+
},
939+
),
940+
],
941+
# Expected records
942+
[
943+
{
944+
"comment_id": 10,
945+
"comment_updated_at": COMMENT_10_UPDATED_AT,
946+
"created_at": VOTE_100_CREATED_AT,
947+
"id": 100,
948+
},
949+
{
950+
"comment_id": 10,
951+
"comment_updated_at": COMMENT_10_UPDATED_AT,
952+
"created_at": VOTE_101_CREATED_AT,
953+
"id": 101,
954+
},
955+
{
956+
"comment_id": 11,
957+
"comment_updated_at": COMMENT_11_UPDATED_AT,
958+
"created_at": VOTE_111_CREATED_AT,
959+
"id": 111,
960+
},
961+
{
962+
"comment_id": 20,
963+
"comment_updated_at": COMMENT_20_UPDATED_AT,
964+
"created_at": VOTE_200_CREATED_AT,
965+
"id": 200,
966+
},
967+
{
968+
"comment_id": 21,
969+
"comment_updated_at": COMMENT_21_UPDATED_AT,
970+
"created_at": VOTE_210_CREATED_AT,
971+
"id": 210,
972+
},
973+
{
974+
"comment_id": 30,
975+
"comment_updated_at": COMMENT_30_UPDATED_AT,
976+
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
977+
"id": 300,
978+
},
979+
],
980+
# Initial state
981+
{
982+
"parent_state": {},
983+
"use_global_cursor": True,
984+
"states": [
985+
{
986+
"partition": {
987+
"id": 10,
988+
"parent_slice": {"id": 1, "parent_slice": {}},
989+
},
990+
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP},
991+
},
992+
{
993+
"partition": {
994+
"id": 11,
995+
"parent_slice": {"id": 1, "parent_slice": {}},
996+
},
997+
"cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
998+
},
999+
],
1000+
"state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP},
1001+
"lookback_window": 86400,
1002+
},
1003+
# Expected state
1004+
{
1005+
"use_global_cursor": True,
1006+
"lookback_window": 1,
1007+
"parent_state": {},
1008+
"state": {"created_at": VOTE_100_CREATED_AT},
1009+
},
1010+
1,
7421011
),
7431012
],
7441013
)
7451014
def test_incremental_parent_state_no_incremental_dependency(
746-
test_name, manifest, mock_requests, expected_records, initial_state, expected_state
1015+
test_name, manifest, mock_requests, expected_records, initial_state, expected_state, state_count
7471016
):
7481017
"""
7491018
This is a pretty complicated test that syncs a low-code connector stream with three levels of substreams
@@ -765,6 +1034,7 @@ def test_incremental_parent_state_no_incremental_dependency(
7651034
initial_state,
7661035
expected_records,
7671036
expected_state,
1037+
state_count=state_count,
7681038
)
7691039

7701040

0 commit comments

Comments
 (0)