@@ -189,6 +189,7 @@ def __init__(
         # FIXME this is a temporary field the time of the migration from declarative cursors to concurrent ones
         self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
         self._synced_some_data = False
+        self._logged_regarding_datetime_format_error = False
 
     @property
     def cursor_field(self) -> CursorField:
@@ -518,10 +519,23 @@ def observe(self, record: Record) -> None:
         except ValueError:
             return
 
+        try:
+            record_cursor = self._connector_state_converter.output_format(
+                self._connector_state_converter.parse_value(record_cursor_value)
+            )
+        except ValueError as exception:
+            if not self._logged_regarding_datetime_format_error:
+                logger.warning(
+                    "Skipping cursor update for stream '%s': failed to parse cursor field '%s' value %r: %s",
+                    self._stream_name,
+                    self._cursor_field.cursor_field_key,
+                    record_cursor_value,
+                    exception,
+                )
+                self._logged_regarding_datetime_format_error = True
+            return
+
         self._synced_some_data = True
-        record_cursor = self._connector_state_converter.output_format(
-            self._connector_state_converter.parse_value(record_cursor_value)
-        )
         self._update_global_cursor(record_cursor)
         if not self._use_global_cursor:
             self._cursor_per_partition[
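A minimal, self-contained sketch of the behavior the hunk above introduces. This is illustrative only, not the CDK implementation: _CursorSketch is an invented name, and datetime.fromisoformat stands in for connector_state_converter.parse_value, which in the real code parses the connector's configured datetime formats. The point is the latch: an unparsable cursor value skips the state update and logs a single warning per stream instead of raising and failing the sync.

import logging
from datetime import datetime
from typing import Optional

logger = logging.getLogger(__name__)


class _CursorSketch:
    """Illustrative stand-in for the cursor above; not the CDK class."""

    def __init__(self) -> None:
        self._logged_regarding_datetime_format_error = False
        self.state: Optional[datetime] = None

    def observe(self, cursor_value: str) -> None:
        try:
            # Stand-in for connector_state_converter.parse_value(); the real converter
            # parses the connector's configured datetime formats instead.
            parsed = datetime.fromisoformat(cursor_value)
        except ValueError as exception:
            if not self._logged_regarding_datetime_format_error:
                logger.warning(
                    "Skipping cursor update: %r is not parsable (%s)", cursor_value, exception
                )
                self._logged_regarding_datetime_format_error = True
            return  # only the cursor update is skipped; the record itself is still processed
        self.state = parsed


sketch = _CursorSketch()
sketch.observe("")                      # warns once
sketch.observe("still-not-a-datetime")  # skipped silently, no second warning
sketch.observe("2024-01-01T00:00:00")   # valid values still advance the state
assert sketch.state == datetime(2024, 1, 1)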
@@ -20,6 +20,7 @@
     ConcurrentDeclarativeSource,
 )
 from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor
+from airbyte_cdk.sources.declarative.partition_routers import ListPartitionRouter
 from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader
 from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
     DeclarativePartition,
@@ -29,7 +30,7 @@
 from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
     CustomFormatConcurrentStreamStateConverter,
 )
-from airbyte_cdk.sources.types import StreamSlice
+from airbyte_cdk.sources.types import Record, StreamSlice
 from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder
 from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
 
@@ -4400,3 +4401,34 @@ def test_duplicate_partition_while_processing():
     assert len(cursor._processing_partitions_indexes) == 0
     assert len(cursor._partition_key_to_index) == 0
     assert len(cursor._partitions_done_generating_stream_slices) == 0
+
+
+def test_given_record_with_bad_cursor_value_the_global_state_parsing_does_not_break_sync():
+    cursor_factory_mock = MagicMock()
+    cursor_factory_mock.create.side_effect = [_make_inner_cursor("2024-01-01T00:00:00Z")]
+    cursor = ConcurrentPerPartitionCursor(
+        cursor_factory=cursor_factory_mock,
+        partition_router=ListPartitionRouter(
+            values=["1"], cursor_field="partition_id", config={}, parameters={}
+        ),
+        stream_name="test_stream",
+        stream_namespace=None,
+        stream_state={},
+        message_repository=MagicMock(),
+        connector_state_manager=MagicMock(),
+        connector_state_converter=CustomFormatConcurrentStreamStateConverter(
+            datetime_format="%Y-%m-%dT%H:%M:%SZ",
+            input_datetime_formats=["%Y-%m-%dT%H:%M:%SZ"],
+            is_sequential_state=True,
+            cursor_granularity=timedelta(0),
+        ),
+        cursor_field=CursorField(cursor_field_key="updated_at"),
+    )
+
+    cursor.observe(
+        Record(
+            data={"updated_at": ""},
+            stream_name="test_stream",
+            associated_slice=StreamSlice(partition={"partition_id": "1"}, cursor_slice={}),
+        )
+    )
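The test exercises the new guard through observe(): an empty updated_at value cannot be parsed with %Y-%m-%dT%H:%M:%SZ, so the call is expected to log a warning and return without raising, which is all the test checks. A standalone illustration of why the empty string hits that path, assuming the converter's parse_value ultimately fails the same way strptime does:

from datetime import datetime

try:
    datetime.strptime("", "%Y-%m-%dT%H:%M:%SZ")
except ValueError as exc:
    # e.g. "time data '' does not match format '%Y-%m-%dT%H:%M:%SZ'"
    print(f"unparsable cursor value: {exc}")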