@@ -71,6 +71,10 @@ def deserialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
for stream_slice in state.get("slices", []):
stream_slice[self.START_KEY] = self._from_state_message(stream_slice[self.START_KEY])
stream_slice[self.END_KEY] = self._from_state_message(stream_slice[self.END_KEY])
if self.MOST_RECENT_RECORD_KEY in stream_slice:
stream_slice[self.MOST_RECENT_RECORD_KEY] = self._from_state_message(
stream_slice[self.MOST_RECENT_RECORD_KEY]
)
return state

def serialize(
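For context, here is a minimal sketch (not the CDK implementation) of what the added branch does: besides converting each slice's start and end values out of the state-message representation, the converter now also converts the per-slice most-recent-record value when it is present. The class name, key strings, and epoch-seconds conversion below are illustrative assumptions modeled on the epoch-based converter used in the tests.

```python
# Illustrative sketch only -- class and key names are assumptions, not the real converter.
# It mirrors the deserialize() change above: start/end and, when present, the
# most-recent-record value are converted out of the state-message format
# (epoch seconds here, matching the epoch-based converter used in the tests).
from datetime import datetime, timezone
from typing import Any, MutableMapping


class EpochConverterSketch:
    START_KEY = "start"  # assumed key names, for illustration
    END_KEY = "end"
    MOST_RECENT_RECORD_KEY = "most_recent_cursor_value"

    def _from_state_message(self, value: Any) -> datetime:
        return datetime.fromtimestamp(value, timezone.utc)

    def deserialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        for stream_slice in state.get("slices", []):
            stream_slice[self.START_KEY] = self._from_state_message(stream_slice[self.START_KEY])
            stream_slice[self.END_KEY] = self._from_state_message(stream_slice[self.END_KEY])
            # New behavior under test: the optional most-recent-record value is
            # converted too, instead of being left as a raw state-message value.
            if self.MOST_RECENT_RECORD_KEY in stream_slice:
                stream_slice[self.MOST_RECENT_RECORD_KEY] = self._from_state_message(
                    stream_slice[self.MOST_RECENT_RECORD_KEY]
                )
        return state


print(EpochConverterSketch().deserialize(
    {"slices": [{"start": 0, "end": 20, "most_recent_cursor_value": 15}]}
))
# All three values come back as timezone-aware datetimes.
```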
62 changes: 58 additions & 4 deletions unit_tests/sources/streams/concurrent/test_cursor.py
@@ -1,7 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from copy import deepcopy
from datetime import datetime, timedelta, timezone
from functools import partial
from typing import Any, Mapping, Optional
@@ -84,7 +84,7 @@ def _cursor_with_slice_boundary_fields(
return ConcurrentCursor(
_A_STREAM_NAME,
_A_STREAM_NAMESPACE,
{},
deepcopy(_NO_STATE),
self._message_repository,
self._state_manager,
EpochValueConcurrentStreamStateConverter(is_sequential_state),
@@ -99,7 +99,7 @@ def _cursor_without_slice_boundary_fields(self) -> ConcurrentCursor:
return ConcurrentCursor(
_A_STREAM_NAME,
_A_STREAM_NAMESPACE,
{},
deepcopy(_NO_STATE),
self._message_repository,
self._state_manager,
EpochValueConcurrentStreamStateConverter(is_sequential_state=True),
@@ -265,7 +265,7 @@ def test_given_no_state_when_generate_slices_then_create_slice_from_start_to_end
cursor = ConcurrentCursor(
_A_STREAM_NAME,
_A_STREAM_NAMESPACE,
_NO_STATE,
deepcopy(_NO_STATE),
self._message_repository,
self._state_manager,
EpochValueConcurrentStreamStateConverter(is_sequential_state=False),
@@ -950,6 +950,60 @@ def test_given_initial_state_is_sequential_and_start_provided_when_generate_slic
}, # State message is updated to the legacy format before being emitted
)

@freezegun.freeze_time(time_to_freeze=datetime.fromtimestamp(50, timezone.utc))
def test_given_most_recent_cursor_value_in_input_state_when_emit_state_then_serialize_state_properly(
self,
) -> None:
cursor = ConcurrentCursor(
_A_STREAM_NAME,
_A_STREAM_NAMESPACE,
{
"state_type": ConcurrencyCompatibleStateType.date_range.value,
"slices": [
{
EpochValueConcurrentStreamStateConverter.START_KEY: 0,
EpochValueConcurrentStreamStateConverter.END_KEY: 20,
EpochValueConcurrentStreamStateConverter.MOST_RECENT_RECORD_KEY: 15,
},
],
},
self._message_repository,
self._state_manager,
EpochValueConcurrentStreamStateConverter(is_sequential_state=False),
CursorField(_A_CURSOR_FIELD_KEY),
_SLICE_BOUNDARY_FIELDS,
datetime.fromtimestamp(0, timezone.utc),
EpochValueConcurrentStreamStateConverter.get_end_provider(),
_NO_LOOKBACK_WINDOW,
)

cursor.close_partition(
_partition(
StreamSlice(
partition={},
cursor_slice={
_LOWER_SLICE_BOUNDARY_FIELD: 20,
_UPPER_SLICE_BOUNDARY_FIELD: 50,
},
),
_stream_name=_A_STREAM_NAME,
)
)

expected_state = {
"state_type": ConcurrencyCompatibleStateType.date_range.value,
"slices": [
{
EpochValueConcurrentStreamStateConverter.START_KEY: 0,
EpochValueConcurrentStreamStateConverter.END_KEY: 50,
EpochValueConcurrentStreamStateConverter.MOST_RECENT_RECORD_KEY: 15,
},
],
}
self._state_manager.update_state_for_stream.assert_called_once_with(
_A_STREAM_NAME, _A_STREAM_NAMESPACE, expected_state
)


class ClampingIntegrationTest(TestCase):
def setUp(self) -> None:
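The other recurring change in the test file, passing deepcopy(_NO_STATE) instead of {} or the shared _NO_STATE constant, guards against cross-test leakage: the cursor and state-converter code mutates the incoming state mapping in place (as the deserialize hunk above shows), so reusing one module-level dict would let one test's mutations bleed into the next. A self-contained illustration of the hazard, using stand-in names rather than CDK code:

```python
# Stand-in illustration (not CDK code) of why the tests pass deepcopy(_NO_STATE).
from copy import deepcopy

_NO_STATE = {}  # shared module-level default, mirroring the test module's constant


def init_cursor(state):
    # Stand-in for cursor/converter behaviour: the incoming mapping is mutated in place.
    state["state_type"] = "date-range"
    return state


init_cursor(deepcopy(_NO_STATE))  # isolated copy: the shared constant stays untouched
print(_NO_STATE)                  # {}

init_cursor(_NO_STATE)            # shared dict: the mutation would leak into later tests
print(_NO_STATE)                  # {'state_type': 'date-range'}
```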