Skip to content

Commit daf7d48

Browse files
authored
feat(low-code cursors): Support for low-code cursors to define a field to allow for user defined cursor fields in the catalog (#851)
1 parent 80b7668 commit daf7d48

File tree

9 files changed

+335
-30
lines changed

9 files changed

+335
-30
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -850,6 +850,10 @@ definitions:
850850
examples:
851851
- "created_at"
852852
- "{{ config['record_cursor'] }}"
853+
allow_catalog_defined_cursor_field:
854+
title: Allow Catalog Defined Cursor Field
855+
description: Whether the cursor allows users to override the default cursor_field when configuring their connection. The user defined cursor field will be specified from within the configured catalog.
856+
type: boolean
853857
start_value:
854858
title: Start Value
855859
description: The value that determines the earliest record that should be synced.
@@ -913,6 +917,10 @@ definitions:
913917
examples:
914918
- "created_at"
915919
- "{{ config['record_cursor'] }}"
920+
allow_catalog_defined_cursor_field:
921+
title: Allow Catalog Defined Cursor Field
922+
description: Whether the cursor allows users to override the default cursor_field when configuring their connection. The user defined cursor field will be specified from within the configured catalog.
923+
type: boolean
916924
cursor_datetime_formats:
917925
title: Cursor Datetime Formats
918926
type: array

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
13
# generated by datamodel-codegen:
24
# filename: declarative_component_schema.yaml
35

@@ -1584,6 +1586,11 @@ class IncrementingCountCursor(BaseModel):
15841586
examples=["created_at", "{{ config['record_cursor'] }}"],
15851587
title="Cursor Field",
15861588
)
1589+
allow_catalog_defined_cursor_field: Optional[bool] = Field(
1590+
None,
1591+
description="Whether the cursor allows users to override the default cursor_field when configuring their connection. The user defined cursor field will be specified from within the configured catalog.",
1592+
title="Allow Catalog Defined Cursor Field",
1593+
)
15871594
start_value: Optional[Union[str, int]] = Field(
15881595
None,
15891596
description="The value that determines the earliest record that should be synced.",
@@ -1611,6 +1618,11 @@ class DatetimeBasedCursor(BaseModel):
16111618
examples=["created_at", "{{ config['record_cursor'] }}"],
16121619
title="Cursor Field",
16131620
)
1621+
allow_catalog_defined_cursor_field: Optional[bool] = Field(
1622+
None,
1623+
description="Whether the cursor allows users to override the default cursor_field when configuring their connection. The user defined cursor field will be specified from within the configured catalog.",
1624+
title="Allow Catalog Defined Cursor Field",
1625+
)
16141626
cursor_datetime_formats: Optional[List[str]] = Field(
16151627
None,
16161628
description="The possible formats for the cursor field, in order of preference. The first format that matches the cursor field value will be used to parse it. If not provided, the Outgoing Datetime Format will be used.\nUse placeholders starting with \"%\" to describe the format the API is using. The following placeholders are available:\n * **%s**: Epoch unix timestamp - `1686218963`\n * **%s_as_float**: Epoch unix timestamp in seconds as float with microsecond precision - `1686218963.123456`\n * **%ms**: Epoch unix timestamp - `1686218963123`\n * **%a**: Weekday (abbreviated) - `Sun`\n * **%A**: Weekday (full) - `Sunday`\n * **%w**: Weekday (decimal) - `0` (Sunday), `6` (Saturday)\n * **%d**: Day of the month (zero-padded) - `01`, `02`, ..., `31`\n * **%b**: Month (abbreviated) - `Jan`\n * **%B**: Month (full) - `January`\n * **%m**: Month (zero-padded) - `01`, `02`, ..., `12`\n * **%y**: Year (without century, zero-padded) - `00`, `01`, ..., `99`\n * **%Y**: Year (with century) - `0001`, `0002`, ..., `9999`\n * **%H**: Hour (24-hour, zero-padded) - `00`, `01`, ..., `23`\n * **%I**: Hour (12-hour, zero-padded) - `01`, `02`, ..., `12`\n * **%p**: AM/PM indicator\n * **%M**: Minute (zero-padded) - `00`, `01`, ..., `59`\n * **%S**: Second (zero-padded) - `00`, `01`, ..., `59`\n * **%f**: Microsecond (zero-padded to 6 digits) - `000000`, `000001`, ..., `999999`\n * **%_ms**: Millisecond (zero-padded to 3 digits) - `000`, `001`, ..., `999`\n * **%z**: UTC offset - `(empty)`, `+0000`, `-04:00`\n * **%Z**: Time zone name - `(empty)`, `UTC`, `GMT`\n * **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366`\n * **%U**: Week number of the year (Sunday as first day) - `00`, `01`, ..., `53`\n * **%W**: Week number of the year (Monday as first day) - `00`, `01`, ..., `53`\n * **%c**: Date and time representation - `Tue Aug 16 21:30:00 1988`\n * **%x**: Date representation - `08/16/1988`\n * **%X**: Time representation - `21:30:00`\n * **%%**: Literal '%' 
character\n\n Some placeholders depend on the locale of the underlying system - in most cases this locale is configured as en/US. For more information see the [Python documentation](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).\n",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 77 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1326,11 +1326,23 @@ def create_concurrent_cursor_from_datetime_based_cursor(
13261326
)
13271327

13281328
model_parameters = datetime_based_cursor_model.parameters or {}
1329-
interpolated_cursor_field = InterpolatedString.create(
1330-
datetime_based_cursor_model.cursor_field,
1331-
parameters=model_parameters,
1329+
1330+
cursor_field = self._get_catalog_defined_cursor_field(
1331+
stream_name=stream_name,
1332+
allow_catalog_defined_cursor_field=datetime_based_cursor_model.allow_catalog_defined_cursor_field
1333+
or False,
13321334
)
1333-
cursor_field = CursorField(interpolated_cursor_field.eval(config=config))
1335+
1336+
if not cursor_field:
1337+
interpolated_cursor_field = InterpolatedString.create(
1338+
datetime_based_cursor_model.cursor_field,
1339+
parameters=model_parameters,
1340+
)
1341+
cursor_field = CursorField(
1342+
cursor_field_key=interpolated_cursor_field.eval(config=config),
1343+
supports_catalog_defined_cursor_field=datetime_based_cursor_model.allow_catalog_defined_cursor_field
1344+
or False,
1345+
)
13341346

13351347
interpolated_partition_field_start = InterpolatedString.create(
13361348
datetime_based_cursor_model.partition_field_start or "start_time",
@@ -1551,11 +1563,22 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
15511563
else 0
15521564
)
15531565

1554-
interpolated_cursor_field = InterpolatedString.create(
1555-
incrementing_count_cursor_model.cursor_field,
1556-
parameters=incrementing_count_cursor_model.parameters or {},
1566+
cursor_field = self._get_catalog_defined_cursor_field(
1567+
stream_name=stream_name,
1568+
allow_catalog_defined_cursor_field=incrementing_count_cursor_model.allow_catalog_defined_cursor_field
1569+
or False,
15571570
)
1558-
cursor_field = CursorField(interpolated_cursor_field.eval(config=config))
1571+
1572+
if not cursor_field:
1573+
interpolated_cursor_field = InterpolatedString.create(
1574+
incrementing_count_cursor_model.cursor_field,
1575+
parameters=incrementing_count_cursor_model.parameters or {},
1576+
)
1577+
cursor_field = CursorField(
1578+
cursor_field_key=interpolated_cursor_field.eval(config=config),
1579+
supports_catalog_defined_cursor_field=incrementing_count_cursor_model.allow_catalog_defined_cursor_field
1580+
or False,
1581+
)
15591582

15601583
connector_state_converter = IncrementingCountStreamStateConverter(
15611584
is_sequential_state=True, # ConcurrentPerPartitionCursor only works with sequential state
@@ -1625,15 +1648,26 @@ def create_concurrent_cursor_from_perpartition_cursor(
16251648
f"Expected {model_type.__name__} component, but received {datetime_based_cursor_model.__class__.__name__}"
16261649
)
16271650

1628-
interpolated_cursor_field = InterpolatedString.create(
1629-
datetime_based_cursor_model.cursor_field,
1630-
# FIXME the interfaces of the concurrent cursor are kind of annoying as they take a `ComponentDefinition` instead of the actual model. This was done because the ConcurrentDeclarativeSource didn't have access to the models [here for example](https://github.com/airbytehq/airbyte-python-cdk/blob/f525803b3fec9329e4cc8478996a92bf884bfde9/airbyte_cdk/sources/declarative/concurrent_declarative_source.py#L354C54-L354C91). So now we have two cases:
1631-
# * The ComponentDefinition comes from model.__dict__ in which case we have `parameters`
1632-
# * The ComponentDefinition comes from the manifest as a dict in which case we have `$parameters`
1633-
# We should change those interfaces to use the model once we clean up the code in CDS at which point the parameter propagation should happen as part of the ModelToComponentFactory.
1634-
parameters=datetime_based_cursor_model.parameters or {},
1635-
)
1636-
cursor_field = CursorField(interpolated_cursor_field.eval(config=config))
1651+
cursor_field = self._get_catalog_defined_cursor_field(
1652+
stream_name=stream_name,
1653+
allow_catalog_defined_cursor_field=datetime_based_cursor_model.allow_catalog_defined_cursor_field
1654+
or False,
1655+
)
1656+
1657+
if not cursor_field:
1658+
interpolated_cursor_field = InterpolatedString.create(
1659+
datetime_based_cursor_model.cursor_field,
1660+
# FIXME the interfaces of the concurrent cursor are kind of annoying as they take a `ComponentDefinition` instead of the actual model. This was done because the ConcurrentDeclarativeSource didn't have access to the models [here for example](https://github.com/airbytehq/airbyte-python-cdk/blob/f525803b3fec9329e4cc8478996a92bf884bfde9/airbyte_cdk/sources/declarative/concurrent_declarative_source.py#L354C54-L354C91). So now we have two cases:
1661+
# * The ComponentDefinition comes from model.__dict__ in which case we have `parameters`
1662+
# * The ComponentDefinition comes from the manifest as a dict in which case we have `$parameters`
1663+
# We should change those interfaces to use the model once we clean up the code in CDS at which point the parameter propagation should happen as part of the ModelToComponentFactory.
1664+
parameters=datetime_based_cursor_model.parameters or {},
1665+
)
1666+
cursor_field = CursorField(
1667+
cursor_field_key=interpolated_cursor_field.eval(config=config),
1668+
supports_catalog_defined_cursor_field=datetime_based_cursor_model.allow_catalog_defined_cursor_field
1669+
or False,
1670+
)
16371671

16381672
datetime_format = datetime_based_cursor_model.datetime_format
16391673

@@ -2076,9 +2110,11 @@ def create_default_stream(
20762110
name=stream_name,
20772111
json_schema=schema_loader.get_json_schema,
20782112
primary_key=get_primary_key_from_stream(primary_key),
2079-
cursor_field=concurrent_cursor.cursor_field.cursor_field_key
2113+
cursor_field=concurrent_cursor.cursor_field
20802114
if hasattr(concurrent_cursor, "cursor_field")
2081-
else "", # FIXME we should have the cursor field has part of the interface of cursor,
2115+
else CursorField(
2116+
cursor_field_key=""
2117+
), # FIXME we should have the cursor field as part of the interface of cursor,
20822118
logger=logging.getLogger(f"airbyte.{stream_name}"),
20832119
cursor=concurrent_cursor,
20842120
supports_file_transfer=hasattr(model, "file_uploader") and bool(model.file_uploader),
@@ -4293,3 +4329,25 @@ def _ensure_query_properties_to_model(
42934329
request_parameters[request_parameter_key] = QueryPropertiesModel.parse_obj(
42944330
request_parameter
42954331
)
4332+
4333+
def _get_catalog_defined_cursor_field(
4334+
self, stream_name: str, allow_catalog_defined_cursor_field: bool
4335+
) -> Optional[CursorField]:
4336+
if not allow_catalog_defined_cursor_field:
4337+
return None
4338+
4339+
configured_stream = self._stream_name_to_configured_stream.get(stream_name)
4340+
4341+
# Depending on the operation being performed, there may not be a configured stream yet. In this
4342+
# case we return None which will then use the default cursor field defined on the cursor model
4343+
if not configured_stream or not configured_stream.cursor_field:
4344+
return None
4345+
elif len(configured_stream.cursor_field) > 1:
4346+
raise ValueError(
4347+
f"The `{stream_name}` stream does not support nested cursor_field. Please specify only a single cursor_field for the stream in the configured catalog."
4348+
)
4349+
else:
4350+
return CursorField(
4351+
cursor_field_key=configured_stream.cursor_field[0],
4352+
supports_catalog_defined_cursor_field=allow_catalog_defined_cursor_field,
4353+
)

airbyte_cdk/sources/file_based/stream/concurrent/adapters.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from airbyte_cdk.sources.message import MessageRepository
3333
from airbyte_cdk.sources.source import ExperimentalClassWarning
3434
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
35+
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
3536
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
3637
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
3738
from airbyte_cdk.sources.streams.concurrent.helpers import (
@@ -97,7 +98,7 @@ def create_from_stream(
9798
name=stream.name,
9899
json_schema=stream.get_json_schema(),
99100
primary_key=pk,
100-
cursor_field=cursor_field,
101+
cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None,
101102
logger=logger,
102103
namespace=stream.namespace,
103104
cursor=cursor,

airbyte_cdk/sources/streams/concurrent/adapters.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from airbyte_cdk.sources.source import ExperimentalClassWarning
2626
from airbyte_cdk.sources.streams import Stream
2727
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
28-
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor
28+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField, FinalStateCursor
2929
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
3030
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
3131
from airbyte_cdk.sources.streams.concurrent.helpers import (
@@ -97,7 +97,7 @@ def create_from_stream(
9797
namespace=stream.namespace,
9898
json_schema=stream.get_json_schema(),
9999
primary_key=pk,
100-
cursor_field=cursor_field,
100+
cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None,
101101
logger=logger,
102102
cursor=cursor,
103103
),

airbyte_cdk/sources/streams/concurrent/cursor.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,11 @@ def _extract_value(mapping: Mapping[str, Any], path: List[str]) -> Any:
3838

3939

4040
class CursorField:
41-
def __init__(self, cursor_field_key: str) -> None:
41+
def __init__(
42+
self, cursor_field_key: str, supports_catalog_defined_cursor_field: bool = False
43+
) -> None:
4244
self.cursor_field_key = cursor_field_key
45+
self.supports_catalog_defined_cursor_field = supports_catalog_defined_cursor_field
4346

4447
def extract_value(self, record: Record) -> Any:
4548
cursor_value = record.data.get(self.cursor_field_key)

airbyte_cdk/sources/streams/concurrent/default_stream.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from airbyte_cdk.models import AirbyteStream, SyncMode
99
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
1010
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
11-
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
11+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField
1212
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
1313
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
1414
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
@@ -21,7 +21,7 @@ def __init__(
2121
name: str,
2222
json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
2323
primary_key: List[str],
24-
cursor_field: Optional[str],
24+
cursor_field: Optional[CursorField],
2525
logger: Logger,
2626
cursor: Cursor,
2727
namespace: Optional[str] = None,
@@ -50,7 +50,7 @@ def namespace(self) -> Optional[str]:
5050

5151
@property
5252
def cursor_field(self) -> Optional[str]:
53-
return self._cursor_field
53+
return self._cursor_field.cursor_field_key if self._cursor_field else None
5454

5555
def get_json_schema(self) -> Mapping[str, Any]:
5656
return self._json_schema() if callable(self._json_schema) else self._json_schema
@@ -68,10 +68,12 @@ def as_airbyte_stream(self) -> AirbyteStream:
6868
stream.namespace = self._namespace
6969

7070
if self._cursor_field:
71-
stream.source_defined_cursor = True
71+
stream.source_defined_cursor = (
72+
not self._cursor_field.supports_catalog_defined_cursor_field
73+
)
7274
stream.is_resumable = True
7375
stream.supported_sync_modes.append(SyncMode.incremental)
74-
stream.default_cursor_field = [self._cursor_field]
76+
stream.default_cursor_field = [self._cursor_field.cursor_field_key]
7577

7678
keys = self._primary_key
7779
if keys and len(keys) > 0:

0 commit comments

Comments
 (0)