Skip to content

Commit 883b896

Browse files
sync records without cursor field in ConcurrentPerPartitionCursor
1 parent 37cf511 commit 883b896

File tree

1 file changed

+7
-1
lines changed

1 file changed

+7
-1
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,8 +448,14 @@ def observe(self, record: Record) -> None:
448448
"Invalid state as stream slices that are emitted should refer to an existing cursor"
449449
)
450450

451+
# if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
452+
try:
453+
record_cursor_value = self._cursor_field.extract_value(record)
454+
except ValueError:
455+
return
456+
451457
record_cursor = self._connector_state_converter.output_format(
452-
self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
458+
self._connector_state_converter.parse_value(record_cursor_value)
453459
)
454460
self._update_global_cursor(record_cursor)
455461
if not self._use_global_cursor:

0 commit comments

Comments
 (0)