Skip to content

Commit 77d2b59

Browse files
committed
fix tests
1 parent 046c9dd commit 77d2b59

File tree

1 file changed

+118
-14
lines changed

1 file changed

+118
-14
lines changed

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 118 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3614,7 +3614,14 @@ def test_given_no_partitions_processed_when_close_partition_then_no_state_update
36143614
slices = list(cursor.stream_slices()) # Call once
36153615
for slice in slices:
36163616
cursor.close_partition(
3617-
DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slice)
3617+
DeclarativePartition(
3618+
stream_name="test_stream",
3619+
json_schema={},
3620+
retriever=MagicMock(),
3621+
message_repository=MagicMock(),
3622+
max_records_limit=None,
3623+
stream_slice=slice,
3624+
)
36183625
)
36193626

36203627
assert cursor.state == {
@@ -3692,7 +3699,14 @@ def test_given_unfinished_first_parent_partition_no_parent_state_update():
36923699
# Close all partitions except from the first one
36933700
for slice in slices[1:]:
36943701
cursor.close_partition(
3695-
DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slice)
3702+
DeclarativePartition(
3703+
stream_name="test_stream",
3704+
json_schema={},
3705+
retriever=MagicMock(),
3706+
message_repository=MagicMock(),
3707+
max_records_limit=None,
3708+
stream_slice=slice,
3709+
)
36963710
)
36973711
cursor.ensure_at_least_one_state_emitted()
36983712

@@ -3780,7 +3794,14 @@ def test_given_unfinished_last_parent_partition_with_partial_parent_state_update
37803794
# Close all partitions except from the first one
37813795
for slice in slices[:-1]:
37823796
cursor.close_partition(
3783-
DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slice)
3797+
DeclarativePartition(
3798+
stream_name="test_stream",
3799+
json_schema={},
3800+
retriever=MagicMock(),
3801+
message_repository=MagicMock(),
3802+
max_records_limit=None,
3803+
stream_slice=slice,
3804+
)
37843805
)
37853806
cursor.ensure_at_least_one_state_emitted()
37863807

@@ -3863,7 +3884,14 @@ def test_given_all_partitions_finished_when_close_partition_then_final_state_emi
38633884
slices = list(cursor.stream_slices())
38643885
for slice in slices:
38653886
cursor.close_partition(
3866-
DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slice)
3887+
DeclarativePartition(
3888+
stream_name="test_stream",
3889+
json_schema={},
3890+
retriever=MagicMock(),
3891+
message_repository=MagicMock(),
3892+
max_records_limit=None,
3893+
stream_slice=slice,
3894+
)
38673895
)
38683896

38693897
cursor.ensure_at_least_one_state_emitted()
@@ -3930,7 +3958,14 @@ def test_given_partition_limit_exceeded_when_close_partition_then_switch_to_glob
39303958
slices = list(cursor.stream_slices())
39313959
for slice in slices:
39323960
cursor.close_partition(
3933-
DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slice)
3961+
DeclarativePartition(
3962+
stream_name="test_stream",
3963+
json_schema={},
3964+
retriever=MagicMock(),
3965+
message_repository=MagicMock(),
3966+
max_records_limit=None,
3967+
stream_slice=slice,
3968+
)
39343969
)
39353970
cursor.ensure_at_least_one_state_emitted()
39363971

@@ -4007,7 +4042,16 @@ def test_semaphore_cleanup():
40074042

40084043
# Close partitions to acquire semaphores (value back to 0)
40094044
for s in generated_slices:
4010-
cursor.close_partition(DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), s))
4045+
cursor.close_partition(
4046+
DeclarativePartition(
4047+
stream_name="test_stream",
4048+
json_schema={},
4049+
retriever=MagicMock(),
4050+
message_repository=MagicMock(),
4051+
max_records_limit=None,
4052+
stream_slice=s,
4053+
)
4054+
)
40114055

40124056
# Check state after closing partitions
40134057
assert len(cursor._partitions_done_generating_stream_slices) == 0
@@ -4119,15 +4163,38 @@ def test_duplicate_partition_after_closing_partition_cursor_deleted():
41194163

41204164
first_1 = next(slice_gen)
41214165
cursor.close_partition(
4122-
DeclarativePartition("dup_stream", {}, MagicMock(), MagicMock(), first_1)
4166+
DeclarativePartition(
4167+
stream_name="dup_stream",
4168+
json_schema={},
4169+
retriever=MagicMock(),
4170+
message_repository=MagicMock(),
4171+
max_records_limit=None,
4172+
stream_slice=first_1,
4173+
)
41234174
)
41244175

41254176
two = next(slice_gen)
4126-
cursor.close_partition(DeclarativePartition("dup_stream", {}, MagicMock(), MagicMock(), two))
4177+
cursor.close_partition(
4178+
DeclarativePartition(
4179+
stream_name="dup_stream",
4180+
json_schema={},
4181+
retriever=MagicMock(),
4182+
message_repository=MagicMock(),
4183+
max_records_limit=None,
4184+
stream_slice=two,
4185+
)
4186+
)
41274187

41284188
second_1 = next(slice_gen)
41294189
cursor.close_partition(
4130-
DeclarativePartition("dup_stream", {}, MagicMock(), MagicMock(), second_1)
4190+
DeclarativePartition(
4191+
stream_name="dup_stream",
4192+
json_schema={},
4193+
retriever=MagicMock(),
4194+
message_repository=MagicMock(),
4195+
max_records_limit=None,
4196+
stream_slice=second_1,
4197+
)
41314198
)
41324199

41334200
assert cursor._IS_PARTITION_DUPLICATION_LOGGED is False # No duplicate detected
@@ -4181,16 +4248,39 @@ def test_duplicate_partition_after_closing_partition_cursor_exists():
41814248

41824249
first_1 = next(slice_gen)
41834250
cursor.close_partition(
4184-
DeclarativePartition("dup_stream", {}, MagicMock(), MagicMock(), first_1)
4251+
DeclarativePartition(
4252+
stream_name="dup_stream",
4253+
json_schema={},
4254+
retriever=MagicMock(),
4255+
message_repository=MagicMock(),
4256+
max_records_limit=None,
4257+
stream_slice=first_1,
4258+
)
41854259
)
41864260

41874261
two = next(slice_gen)
4188-
cursor.close_partition(DeclarativePartition("dup_stream", {}, MagicMock(), MagicMock(), two))
4262+
cursor.close_partition(
4263+
DeclarativePartition(
4264+
stream_name="dup_stream",
4265+
json_schema={},
4266+
retriever=MagicMock(),
4267+
message_repository=MagicMock(),
4268+
max_records_limit=None,
4269+
stream_slice=two,
4270+
)
4271+
)
41894272

41904273
# Second “1” should appear because the semaphore was cleaned up
41914274
second_1 = next(slice_gen)
41924275
cursor.close_partition(
4193-
DeclarativePartition("dup_stream", {}, MagicMock(), MagicMock(), second_1)
4276+
DeclarativePartition(
4277+
stream_name="dup_stream",
4278+
json_schema={},
4279+
retriever=MagicMock(),
4280+
message_repository=MagicMock(),
4281+
max_records_limit=None,
4282+
stream_slice=second_1,
4283+
)
41944284
)
41954285

41964286
with pytest.raises(StopIteration):
@@ -4241,11 +4331,25 @@ def test_duplicate_partition_while_processing():
42414331

42424332
# Close “2” first
42434333
cursor.close_partition(
4244-
DeclarativePartition("dup_stream", {}, MagicMock(), MagicMock(), generated[1])
4334+
DeclarativePartition(
4335+
stream_name="dup_stream",
4336+
json_schema={},
4337+
retriever=MagicMock(),
4338+
message_repository=MagicMock(),
4339+
max_records_limit=None,
4340+
stream_slice=generated[1],
4341+
)
42454342
)
42464343
# Now close the initial “1”
42474344
cursor.close_partition(
4248-
DeclarativePartition("dup_stream", {}, MagicMock(), MagicMock(), generated[0])
4345+
DeclarativePartition(
4346+
stream_name="dup_stream",
4347+
json_schema={},
4348+
retriever=MagicMock(),
4349+
message_repository=MagicMock(),
4350+
max_records_limit=None,
4351+
stream_slice=generated[0],
4352+
)
42494353
)
42504354

42514355
assert cursor._IS_PARTITION_DUPLICATION_LOGGED is True # warning emitted

0 commit comments

Comments
 (0)