Skip to content

Commit d7ebfd9

Browse files
lazebnyioctavia-squidington-iii
andauthored
fix(cdk): fix state emit for ConcurrentPerPartitionCursor (#539)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 5d9cfff commit d7ebfd9

File tree

3 files changed

+286
-4
lines changed

3 files changed

+286
-4
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,10 @@ def _emit_state_message(self, throttle: bool = True) -> None:
242242
if current_time is None:
243243
return
244244
self._last_emission_time = current_time
245+
# Skip state emit for global cursor if parent state is empty
246+
if self._use_global_cursor and not self._parent_state:
247+
return
248+
245249
self._connector_state_manager.update_state_for_stream(
246250
self._stream_name,
247251
self._stream_namespace,

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1483,7 +1483,6 @@ def create_concurrent_cursor_from_perpartition_cursor(
14831483
)
14841484
)
14851485
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
1486-
14871486
# Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
14881487
use_global_cursor = isinstance(
14891488
partition_router, GroupingPartitionRouter

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 282 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -309,12 +309,33 @@
309309
"partition_router"
310310
]["parent_stream_configs"][0]["incremental_dependency"] = False
311311

312+
SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY = deepcopy(SUBSTREAM_MANIFEST)
313+
# Disable incremental_dependency
314+
SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY["definitions"]["post_comments_stream"][
315+
"retriever"
316+
]["partition_router"]["parent_stream_configs"][0]["incremental_dependency"] = False
317+
SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY["definitions"]["post_comment_votes_stream"][
318+
"retriever"
319+
]["partition_router"]["parent_stream_configs"][0]["incremental_dependency"] = False
320+
# Enable global_cursor
321+
SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY["definitions"]["cursor_incremental_sync"][
322+
"global_substream_cursor"
323+
] = True
324+
325+
312326
import orjson
313327
import requests_mock
314328

315329

316330
def run_mocked_test(
317-
mock_requests, manifest, config, stream_name, initial_state, expected_records, expected_state
331+
mock_requests,
332+
manifest,
333+
config,
334+
stream_name,
335+
initial_state,
336+
expected_records,
337+
expected_state,
338+
state_count=None,
318339
):
319340
"""
320341
Helper function to mock requests, run the test, and verify the results.
@@ -354,6 +375,8 @@ def run_mocked_test(
354375
expected_records, key=lambda x: x["id"]
355376
)
356377

378+
assert len(output.state_messages) == state_count if state_count else True
379+
357380
# Verify state
358381
final_state = output.state_messages[-1].state.stream.stream_state
359382
assert final_state.__dict__ == expected_state
@@ -431,7 +454,7 @@ def _run_read(
431454

432455

433456
@pytest.mark.parametrize(
434-
"test_name, manifest, mock_requests, expected_records, initial_state, expected_state",
457+
"test_name, manifest, mock_requests, expected_records, initial_state, expected_state, state_count",
435458
[
436459
(
437460
"test_incremental_parent_state",
@@ -739,11 +762,266 @@ def _run_read(
739762
"parent_state": {},
740763
"state": {"created_at": VOTE_100_CREATED_AT},
741764
},
765+
# State count
766+
2,
767+
),
768+
(
769+
"test_incremental_parent_state_with",
770+
SUBSTREAM_MANIFEST_WITH_GLOBAL_CURSOR_AND_NO_DEPENDENCY,
771+
[
772+
# Fetch the first page of posts
773+
(
774+
f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}",
775+
{
776+
"posts": [
777+
{"id": 1, "updated_at": POST_1_UPDATED_AT},
778+
{"id": 2, "updated_at": POST_2_UPDATED_AT},
779+
],
780+
"next_page": f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}&page=2",
781+
},
782+
),
783+
# Fetch the second page of posts
784+
(
785+
f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}&page=2",
786+
{"posts": [{"id": 3, "updated_at": POST_3_UPDATED_AT}]},
787+
),
788+
# Fetch the first page of comments for post 1
789+
(
790+
"https://api.example.com/community/posts/1/comments?per_page=100",
791+
{
792+
"comments": [
793+
{
794+
"id": 9,
795+
"post_id": 1,
796+
"updated_at": COMMENT_9_OLDEST, # No requests for comment 9, filtered out due to the date
797+
},
798+
{
799+
"id": 10,
800+
"post_id": 1,
801+
"updated_at": COMMENT_10_UPDATED_AT,
802+
},
803+
{
804+
"id": 11,
805+
"post_id": 1,
806+
"updated_at": COMMENT_11_UPDATED_AT,
807+
},
808+
],
809+
"next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2",
810+
},
811+
),
812+
# Fetch the second page of comments for post 1
813+
(
814+
"https://api.example.com/community/posts/1/comments?per_page=100&page=2",
815+
{
816+
"comments": [
817+
{
818+
"id": 12,
819+
"post_id": 1,
820+
"updated_at": COMMENT_12_UPDATED_AT,
821+
}
822+
]
823+
},
824+
),
825+
# Fetch the first page of votes for comment 10 of post 1
826+
(
827+
f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}",
828+
{
829+
"votes": [
830+
{
831+
"id": 100,
832+
"comment_id": 10,
833+
"created_at": VOTE_100_CREATED_AT,
834+
}
835+
],
836+
"next_page": f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}",
837+
},
838+
),
839+
# Fetch the second page of votes for comment 10 of post 1
840+
(
841+
f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}",
842+
{
843+
"votes": [
844+
{
845+
"id": 101,
846+
"comment_id": 10,
847+
"created_at": VOTE_101_CREATED_AT,
848+
}
849+
]
850+
},
851+
),
852+
# Fetch the first page of votes for comment 11 of post 1
853+
(
854+
f"https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}",
855+
{
856+
"votes": [
857+
{
858+
"id": 111,
859+
"comment_id": 11,
860+
"created_at": VOTE_111_CREATED_AT,
861+
}
862+
]
863+
},
864+
),
865+
# Fetch the first page of votes for comment 12 of post 1
866+
(
867+
f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={LOOKBACK_DATE}",
868+
{"votes": []},
869+
),
870+
# Fetch the first page of comments for post 2
871+
(
872+
"https://api.example.com/community/posts/2/comments?per_page=100",
873+
{
874+
"comments": [
875+
{
876+
"id": 20,
877+
"post_id": 2,
878+
"updated_at": COMMENT_20_UPDATED_AT,
879+
}
880+
],
881+
"next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2",
882+
},
883+
),
884+
# Fetch the second page of comments for post 2
885+
(
886+
"https://api.example.com/community/posts/2/comments?per_page=100&page=2",
887+
{
888+
"comments": [
889+
{
890+
"id": 21,
891+
"post_id": 2,
892+
"updated_at": COMMENT_21_UPDATED_AT,
893+
}
894+
]
895+
},
896+
),
897+
# Fetch the first page of votes for comment 20 of post 2
898+
(
899+
f"https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time={LOOKBACK_DATE}",
900+
{
901+
"votes": [
902+
{
903+
"id": 200,
904+
"comment_id": 20,
905+
"created_at": VOTE_200_CREATED_AT,
906+
}
907+
]
908+
},
909+
),
910+
# Fetch the first page of votes for comment 21 of post 2
911+
(
912+
f"https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time={LOOKBACK_DATE}",
913+
{
914+
"votes": [
915+
{
916+
"id": 210,
917+
"comment_id": 21,
918+
"created_at": VOTE_210_CREATED_AT,
919+
}
920+
]
921+
},
922+
),
923+
# Fetch the first page of comments for post 3
924+
(
925+
"https://api.example.com/community/posts/3/comments?per_page=100",
926+
{
927+
"comments": [
928+
{
929+
"id": 30,
930+
"post_id": 3,
931+
"updated_at": COMMENT_30_UPDATED_AT,
932+
}
933+
]
934+
},
935+
),
936+
# Fetch the first page of votes for comment 30 of post 3
937+
(
938+
f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={LOOKBACK_DATE}",
939+
{
940+
"votes": [
941+
{
942+
"id": 300,
943+
"comment_id": 30,
944+
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
945+
}
946+
]
947+
},
948+
),
949+
],
950+
# Expected records
951+
[
952+
{
953+
"comment_id": 10,
954+
"comment_updated_at": COMMENT_10_UPDATED_AT,
955+
"created_at": VOTE_100_CREATED_AT,
956+
"id": 100,
957+
},
958+
{
959+
"comment_id": 10,
960+
"comment_updated_at": COMMENT_10_UPDATED_AT,
961+
"created_at": VOTE_101_CREATED_AT,
962+
"id": 101,
963+
},
964+
{
965+
"comment_id": 11,
966+
"comment_updated_at": COMMENT_11_UPDATED_AT,
967+
"created_at": VOTE_111_CREATED_AT,
968+
"id": 111,
969+
},
970+
{
971+
"comment_id": 20,
972+
"comment_updated_at": COMMENT_20_UPDATED_AT,
973+
"created_at": VOTE_200_CREATED_AT,
974+
"id": 200,
975+
},
976+
{
977+
"comment_id": 21,
978+
"comment_updated_at": COMMENT_21_UPDATED_AT,
979+
"created_at": VOTE_210_CREATED_AT,
980+
"id": 210,
981+
},
982+
{
983+
"comment_id": 30,
984+
"comment_updated_at": COMMENT_30_UPDATED_AT,
985+
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
986+
"id": 300,
987+
},
988+
],
989+
# Initial state
990+
{
991+
"parent_state": {},
992+
"use_global_cursor": True,
993+
"states": [
994+
{
995+
"partition": {
996+
"id": 10,
997+
"parent_slice": {"id": 1, "parent_slice": {}},
998+
},
999+
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP},
1000+
},
1001+
{
1002+
"partition": {
1003+
"id": 11,
1004+
"parent_slice": {"id": 1, "parent_slice": {}},
1005+
},
1006+
"cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
1007+
},
1008+
],
1009+
"state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP},
1010+
"lookback_window": 86400,
1011+
},
1012+
# Expected state
1013+
{
1014+
"use_global_cursor": True,
1015+
"lookback_window": 1,
1016+
"parent_state": {},
1017+
"state": {"created_at": VOTE_100_CREATED_AT},
1018+
},
1019+
1,
7421020
),
7431021
],
7441022
)
7451023
def test_incremental_parent_state_no_incremental_dependency(
746-
test_name, manifest, mock_requests, expected_records, initial_state, expected_state
1024+
test_name, manifest, mock_requests, expected_records, initial_state, expected_state, state_count
7471025
):
7481026
"""
7491027
This is a pretty complicated test that syncs a low-code connector stream with three levels of substreams
@@ -765,6 +1043,7 @@ def test_incremental_parent_state_no_incremental_dependency(
7651043
initial_state,
7661044
expected_records,
7671045
expected_state,
1046+
state_count=state_count,
7681047
)
7691048

7701049

0 commit comments

Comments
 (0)