Skip to content

Commit e808271

Browse files
maxi297 and octavia-squidington-iii authored
fix(concurrent cursor): Ensure that when start and state are provided, sequential state value… (#52)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent ab7ab68 commit e808271

File tree

3 files changed

+48
-3
lines changed

3 files changed

+48
-3
lines changed

airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,11 @@ def convert_from_sequential_state(
8282
# The start and end are the same to avoid confusion as to whether the records for this slice
8383
# were actually synced
8484
slices = [
85-
{self.START_KEY: start if start is not None else sync_start, self.END_KEY: sync_start}
85+
{
86+
self.START_KEY: start if start is not None else sync_start,
87+
self.END_KEY: sync_start,
88+
self.MOST_RECENT_RECORD_KEY: sync_start,
89+
}
8690
]
8791

8892
return sync_start, {

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3017,6 +3017,7 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(
30173017
{
30183018
"start": pendulum.parse(config["start_time"]),
30193019
"end": pendulum.parse(stream_state["updated_at"]),
3020+
"most_recent_cursor_value": pendulum.parse(stream_state["updated_at"]),
30203021
},
30213022
],
30223023
"state_type": "date-range",
@@ -3028,6 +3029,7 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(
30283029
{
30293030
"start": pendulum.parse(config["start_time"]),
30303031
"end": pendulum.parse(config["start_time"]),
3032+
"most_recent_cursor_value": pendulum.parse(config["start_time"]),
30313033
},
30323034
],
30333035
"state_type": "date-range",
@@ -3261,6 +3263,7 @@ def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined():
32613263
{
32623264
"start": expected_start,
32633265
"end": expected_start,
3266+
"most_recent_cursor_value": expected_start,
32643267
},
32653268
],
32663269
"state_type": "date-range",

unit_tests/sources/streams/concurrent/test_cursor.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ def setUp(self) -> None:
6464
self._message_repository = Mock(spec=MessageRepository)
6565
self._state_manager = Mock(spec=ConnectorStateManager)
6666

67-
def _cursor_with_slice_boundary_fields(self, is_sequential_state=True) -> ConcurrentCursor:
67+
def _cursor_with_slice_boundary_fields(
68+
self, is_sequential_state: bool = True
69+
) -> ConcurrentCursor:
6870
return ConcurrentCursor(
6971
_A_STREAM_NAME,
7072
_A_STREAM_NAMESPACE,
@@ -128,7 +130,10 @@ def test_given_state_not_sequential_when_close_partition_then_emit_state(self) -
128130
_A_STREAM_NAME,
129131
_A_STREAM_NAMESPACE,
130132
{
131-
"slices": [{"end": 0, "start": 0}, {"end": 30, "start": 12}],
133+
"slices": [
134+
{"end": 0, "most_recent_cursor_value": 0, "start": 0},
135+
{"end": 30, "start": 12},
136+
],
132137
"state_type": "date-range",
133138
},
134139
)
@@ -720,6 +725,39 @@ def test_given_overflowing_slice_gap_when_generate_slices_then_cap_upper_bound_t
720725
(datetime.fromtimestamp(0, timezone.utc), datetime.fromtimestamp(10, timezone.utc))
721726
]
722727

728+
@freezegun.freeze_time(time_to_freeze=datetime.fromtimestamp(50, timezone.utc))
729+
def test_given_initial_state_is_sequential_and_start_provided_when_generate_slices_then_state_emitted_is_initial_state(
730+
self,
731+
) -> None:
732+
cursor = ConcurrentCursor(
733+
_A_STREAM_NAME,
734+
_A_STREAM_NAMESPACE,
735+
{_A_CURSOR_FIELD_KEY: 10},
736+
self._message_repository,
737+
self._state_manager,
738+
EpochValueConcurrentStreamStateConverter(is_sequential_state=True),
739+
CursorField(_A_CURSOR_FIELD_KEY),
740+
_SLICE_BOUNDARY_FIELDS,
741+
datetime.fromtimestamp(0, timezone.utc),
742+
EpochValueConcurrentStreamStateConverter.get_end_provider(),
743+
_NO_LOOKBACK_WINDOW,
744+
)
745+
746+
# simulate the case where at least the first slice fails but others succeed
747+
cursor.close_partition(
748+
_partition(
749+
{_LOWER_SLICE_BOUNDARY_FIELD: 40, _UPPER_SLICE_BOUNDARY_FIELD: 50},
750+
)
751+
)
752+
753+
self._state_manager.update_state_for_stream.assert_called_once_with(
754+
_A_STREAM_NAME,
755+
_A_STREAM_NAMESPACE,
756+
{
757+
_A_CURSOR_FIELD_KEY: 10
758+
}, # State message is updated to the legacy format before being emitted
759+
)
760+
723761

724762
@freezegun.freeze_time(time_to_freeze=datetime(2024, 4, 1, 0, 0, 0, 0, tzinfo=timezone.utc))
725763
@pytest.mark.parametrize(

0 commit comments

Comments
 (0)