diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 75669d5e3..30e2c7eac 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -189,6 +189,7 @@ def __init__( # FIXME this is a temporary field the time of the migration from declarative cursors to concurrent ones self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided self._synced_some_data = False + self._logged_regarding_datetime_format_error = False @property def cursor_field(self) -> CursorField: @@ -518,10 +519,23 @@ def observe(self, record: Record) -> None: except ValueError: return + try: + record_cursor = self._connector_state_converter.output_format( + self._connector_state_converter.parse_value(record_cursor_value) + ) + except ValueError as exception: + if not self._logged_regarding_datetime_format_error: + logger.warning( + "Skipping cursor update for stream '%s': failed to parse cursor field '%s' value %r: %s", + self._stream_name, + self._cursor_field.cursor_field_key, + record_cursor_value, + exception, + ) + self._logged_regarding_datetime_format_error = True + return + self._synced_some_data = True - record_cursor = self._connector_state_converter.output_format( - self._connector_state_converter.parse_value(record_cursor_value) - ) self._update_global_cursor(record_cursor) if not self._use_global_cursor: self._cursor_per_partition[ diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 2602c52fc..28b9b8460 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -20,6 +20,7 @@ 
ConcurrentDeclarativeSource, ) from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor +from airbyte_cdk.sources.declarative.partition_routers import ListPartitionRouter from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( DeclarativePartition, @@ -29,7 +30,7 @@ from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( CustomFormatConcurrentStreamStateConverter, ) -from airbyte_cdk.sources.types import StreamSlice +from airbyte_cdk.sources.types import Record, StreamSlice from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read @@ -4400,3 +4401,34 @@ def test_duplicate_partition_while_processing(): assert len(cursor._processing_partitions_indexes) == 0 assert len(cursor._partition_key_to_index) == 0 assert len(cursor._partitions_done_generating_stream_slices) == 0 + + +def test_given_record_with_bad_cursor_value_the_global_state_parsing_does_not_break_sync(): + cursor_factory_mock = MagicMock() + cursor_factory_mock.create.side_effect = [_make_inner_cursor("2024-01-01T00:00:00Z")] + cursor = ConcurrentPerPartitionCursor( + cursor_factory=cursor_factory_mock, + partition_router=ListPartitionRouter( + values=["1"], cursor_field="partition_id", config={}, parameters={} + ), + stream_name="test_stream", + stream_namespace=None, + stream_state={}, + message_repository=MagicMock(), + connector_state_manager=MagicMock(), + connector_state_converter=CustomFormatConcurrentStreamStateConverter( + datetime_format="%Y-%m-%dT%H:%M:%SZ", + input_datetime_formats=["%Y-%m-%dT%H:%M:%SZ"], + is_sequential_state=True, + cursor_granularity=timedelta(0), + ), + cursor_field=CursorField(cursor_field_key="updated_at"), + ) + + cursor.observe( + Record( + data={"updated_at": ""}, + stream_name="test_stream", 
associated_slice=StreamSlice(partition={"partition_id": "1"}, cursor_slice={}), + ) + )