Skip to content

Commit 9b9f7bd

Browse files
authored
fix(IncrementingCountCursor): Raise a better error when users attempt to use IncrementingCountCursor with partition router (#784)
1 parent c67570b commit 9b9f7bd

File tree

2 files changed

+60
-0
lines changed

2 files changed

+60
-0
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2208,6 +2208,14 @@ def _build_concurrent_cursor(
22082208
and stream_slicer
22092209
and not isinstance(stream_slicer, SinglePartitionRouter)
22102210
):
2211+
if isinstance(model.incremental_sync, IncrementingCountCursorModel):
2212+
# We don't currently support usage of partition routing and IncrementingCountCursor at the
2213+
# same time because we didn't solve for design questions like what the lookback window would
2214+
# be as well as global cursor fall backs. We have not seen customers that have needed both
2215+
# at the same time yet and are currently punting on this until we need to solve it.
2216+
raise ValueError(
2217+
f"The low-code framework does not currently support usage of a PartitionRouter and an IncrementingCountCursor at the same time. Please specify only one of these options for stream {stream_name}."
2218+
)
22112219
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
22122220
state_manager=self._connector_state_manager,
22132221
model_type=DatetimeBasedCursorModel,

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3669,6 +3669,58 @@ def test_create_concurrent_cursor_from_perpartition_cursor_runs_state_migrations
36693669
)
36703670

36713671

3672+
def test_incrementing_count_cursor_with_partition_router_raises_error():
3673+
content = """
3674+
type: DeclarativeStream
3675+
primary_key: "id"
3676+
name: test
3677+
schema_loader:
3678+
type: InlineSchemaLoader
3679+
schema:
3680+
$schema: "http://json-schema.org/draft-07/schema"
3681+
type: object
3682+
properties:
3683+
id:
3684+
type: string
3685+
incremental_sync:
3686+
type: "IncrementingCountCursor"
3687+
cursor_field: "mid"
3688+
start_value: "0"
3689+
retriever:
3690+
type: SimpleRetriever
3691+
name: test
3692+
requester:
3693+
type: HttpRequester
3694+
name: "test"
3695+
url_base: "https://api.test.com/v3/"
3696+
http_method: "GET"
3697+
authenticator:
3698+
type: NoAuth
3699+
record_selector:
3700+
type: RecordSelector
3701+
extractor:
3702+
type: DpathExtractor
3703+
field_path: []
3704+
partition_router:
3705+
type: ListPartitionRouter
3706+
cursor_field: arbitrary
3707+
values:
3708+
- item_1
3709+
- item_2
3710+
"""
3711+
3712+
factory = ModelToComponentFactory(
3713+
emit_connector_builder_messages=True, connector_state_manager=ConnectorStateManager()
3714+
)
3715+
3716+
with pytest.raises(ValueError):
3717+
factory.create_component(
3718+
model_type=DeclarativeStreamModel,
3719+
component_definition=YamlDeclarativeSource._parse(content),
3720+
config=input_config,
3721+
)
3722+
3723+
36723724
def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined():
36733725
"""
36743726
Validates a special case for when the start_time.datetime_format and end_time.datetime_format are defined, the date to

0 commit comments

Comments
 (0)