11#
22# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33#
4-
4+ from copy import deepcopy
55from datetime import datetime , timedelta , timezone
66from functools import partial
77from typing import Any , Mapping , Optional
@@ -84,7 +84,7 @@ def _cursor_with_slice_boundary_fields(
8484 return ConcurrentCursor (
8585 _A_STREAM_NAME ,
8686 _A_STREAM_NAMESPACE ,
87- {} ,
87+ deepcopy ( _NO_STATE ) ,
8888 self ._message_repository ,
8989 self ._state_manager ,
9090 EpochValueConcurrentStreamStateConverter (is_sequential_state ),
@@ -99,7 +99,7 @@ def _cursor_without_slice_boundary_fields(self) -> ConcurrentCursor:
9999 return ConcurrentCursor (
100100 _A_STREAM_NAME ,
101101 _A_STREAM_NAMESPACE ,
102- {} ,
102+ deepcopy ( _NO_STATE ) ,
103103 self ._message_repository ,
104104 self ._state_manager ,
105105 EpochValueConcurrentStreamStateConverter (is_sequential_state = True ),
@@ -265,7 +265,7 @@ def test_given_no_state_when_generate_slices_then_create_slice_from_start_to_end
265265 cursor = ConcurrentCursor (
266266 _A_STREAM_NAME ,
267267 _A_STREAM_NAMESPACE ,
268- _NO_STATE ,
268+ deepcopy ( _NO_STATE ) ,
269269 self ._message_repository ,
270270 self ._state_manager ,
271271 EpochValueConcurrentStreamStateConverter (is_sequential_state = False ),
@@ -950,6 +950,57 @@ def test_given_initial_state_is_sequential_and_start_provided_when_generate_slic
950950 }, # State message is updated to the legacy format before being emitted
951951 )
952952
953+ @freezegun .freeze_time (time_to_freeze = datetime .fromtimestamp (50 , timezone .utc ))
954+ def test_given_most_recent_cursor_value_in_input_state_when_emit_state_then_serialize_state_properly (
955+ self ,
956+ ) -> None :
957+ cursor = ConcurrentCursor (
958+ _A_STREAM_NAME ,
959+ _A_STREAM_NAMESPACE ,
960+ {
961+ "state_type" : ConcurrencyCompatibleStateType .date_range .value ,
962+ "slices" : [
963+ {
964+ EpochValueConcurrentStreamStateConverter .START_KEY : 0 ,
965+ EpochValueConcurrentStreamStateConverter .END_KEY : 20 ,
966+ EpochValueConcurrentStreamStateConverter .MOST_RECENT_RECORD_KEY : 15 ,
967+ },
968+ ],
969+ },
970+ self ._message_repository ,
971+ self ._state_manager ,
972+ EpochValueConcurrentStreamStateConverter (is_sequential_state = False ),
973+ CursorField (_A_CURSOR_FIELD_KEY ),
974+ _SLICE_BOUNDARY_FIELDS ,
975+ datetime .fromtimestamp (0 , timezone .utc ),
976+ EpochValueConcurrentStreamStateConverter .get_end_provider (),
977+ _NO_LOOKBACK_WINDOW ,
978+ )
979+
980+ cursor .close_partition (_partition (
981+ StreamSlice (
982+ partition = {},
983+ cursor_slice = {
984+ _LOWER_SLICE_BOUNDARY_FIELD : 20 ,
985+ _UPPER_SLICE_BOUNDARY_FIELD : 50 ,
986+ },
987+ ),
988+ _stream_name = _A_STREAM_NAME ,
989+ ))
990+
991+ expected_state = {
992+ "state_type" : ConcurrencyCompatibleStateType .date_range .value ,
993+ "slices" : [
994+ {
995+ EpochValueConcurrentStreamStateConverter .START_KEY : 0 ,
996+ EpochValueConcurrentStreamStateConverter .END_KEY : 50 ,
997+ EpochValueConcurrentStreamStateConverter .MOST_RECENT_RECORD_KEY : 15 ,
998+ },
999+ ],
1000+ }
1001+ self ._state_manager .update_state_for_stream .assert_called_once_with (_A_STREAM_NAME , _A_STREAM_NAMESPACE , expected_state )
1002+
1003+
9531004
9541005class ClampingIntegrationTest (TestCase ):
9551006 def setUp (self ) -> None :
0 commit comments