Skip to content

Commit ba87ec2

Browse files
test: update test to verify last slice is used for client-side incremental filtering
Updated test_given_partitioned_state_with_multiple_slices_when_should_be_synced to verify that the LAST slice's end value is used for filtering, not the first. This reflects the correct behavior for client-side incremental filtering. Co-Authored-By: unknown <>
1 parent 41f10f1 commit ba87ec2

File tree

1 file changed

+20
-7
lines changed

1 file changed

+20
-7
lines changed

unit_tests/sources/streams/concurrent/test_cursor.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,16 +1344,17 @@ def test_given_partitioned_state_with_one_slice_without_most_recent_cursor_value
13441344
)
13451345

13461346

1347-
def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then_use_upper_boundary_of_first_slice_to_filter():
1347+
def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then_use_upper_boundary_of_last_slice_to_filter():
13481348
first_slice_end = 5
13491349
second_slice_start = first_slice_end + 10
1350+
second_slice_end = first_slice_end + 100
13501351
cursor = ConcurrentCursor(
13511352
_A_STREAM_NAME,
13521353
_A_STREAM_NAMESPACE,
13531354
{
13541355
"slices": [
13551356
{"end": first_slice_end, "start": 0},
1356-
{"end": first_slice_end + 100, "start": second_slice_start},
1357+
{"end": second_slice_end, "start": second_slice_start},
13571358
],
13581359
"state_type": "date-range",
13591360
},
@@ -1367,23 +1368,35 @@ def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then
13671368
_NO_LOOKBACK_WINDOW,
13681369
)
13691370

1371+
# Records before the last slice's end should NOT be synced (already processed)
13701372
assert (
13711373
cursor.should_be_synced(
1372-
Record(data={_A_CURSOR_FIELD_KEY: first_slice_end - 1}, stream_name="test_stream")
1374+
Record(data={_A_CURSOR_FIELD_KEY: first_slice_end}, stream_name="test_stream")
13731375
)
13741376
== False
13751377
)
13761378
assert (
13771379
cursor.should_be_synced(
1378-
Record(data={_A_CURSOR_FIELD_KEY: first_slice_end}, stream_name="test_stream")
1380+
Record(data={_A_CURSOR_FIELD_KEY: second_slice_start}, stream_name="test_stream")
1381+
)
1382+
== False
1383+
)
1384+
assert (
1385+
cursor.should_be_synced(
1386+
Record(data={_A_CURSOR_FIELD_KEY: second_slice_end - 1}, stream_name="test_stream")
1387+
)
1388+
== False
1389+
)
1390+
# Records at or after the last slice's end should be synced
1391+
assert (
1392+
cursor.should_be_synced(
1393+
Record(data={_A_CURSOR_FIELD_KEY: second_slice_end}, stream_name="test_stream")
13791394
)
13801395
== True
13811396
)
1382-
# even if this is within a boundary that has been synced, we don't take any chance and we sync it
1383-
# anyway in most cases, it shouldn't be pulled because we query for specific slice boundaries to the API
13841397
assert (
13851398
cursor.should_be_synced(
1386-
Record(data={_A_CURSOR_FIELD_KEY: second_slice_start}, stream_name="test_stream")
1399+
Record(data={_A_CURSOR_FIELD_KEY: second_slice_end + 1}, stream_name="test_stream")
13871400
)
13881401
== True
13891402
)

0 commit comments

Comments
 (0)