diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 777ee1ccf..649e66b44 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1278,6 +1278,12 @@ def create_concurrent_cursor_from_datetime_based_cursor( f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead" ) + # FIXME the interfaces of the concurrent cursor are kind of annoying as they take a `ComponentDefinition` instead of the actual model. This was done because the ConcurrentDeclarativeSource didn't have access to the models [here for example](https://github.com/airbytehq/airbyte-python-cdk/blob/f525803b3fec9329e4cc8478996a92bf884bfde9/airbyte_cdk/sources/declarative/concurrent_declarative_source.py#L354C54-L354C91). So now we have two cases: + # * The ComponentDefinition comes from model.__dict__ in which case we have `parameters` + # * The ComponentDefinition comes from the manifest as a dict in which case we have `$parameters` + # We should change those interfaces to use the model once we clean up the code in CDS at which point the parameter propagation should happen as part of the ModelToComponentFactory. + if "$parameters" not in component_definition and "parameters" in component_definition: + component_definition["$parameters"] = component_definition.get("parameters") # type: ignore # This is a dict datetime_based_cursor_model = model_type.parse_obj(component_definition) if not isinstance(datetime_based_cursor_model, DatetimeBasedCursorModel): @@ -1582,6 +1588,12 @@ def create_concurrent_cursor_from_perpartition_cursor( f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead" ) + # FIXME the interfaces of the concurrent cursor are kind of annoying as they take a `ComponentDefinition` instead of the actual model. This was done because the ConcurrentDeclarativeSource didn't have access to the models [here for example](https://github.com/airbytehq/airbyte-python-cdk/blob/f525803b3fec9329e4cc8478996a92bf884bfde9/airbyte_cdk/sources/declarative/concurrent_declarative_source.py#L354C54-L354C91). So now we have two cases: + # * The ComponentDefinition comes from model.__dict__ in which case we have `parameters` + # * The ComponentDefinition comes from the manifest as a dict in which case we have `$parameters` + # We should change those interfaces to use the model once we clean up the code in CDS at which point the parameter propagation should happen as part of the ModelToComponentFactory. + if "$parameters" not in component_definition and "parameters" in component_definition: + component_definition["$parameters"] = component_definition.get("parameters") # type: ignore # This is a dict datetime_based_cursor_model = model_type.parse_obj(component_definition) if not isinstance(datetime_based_cursor_model, DatetimeBasedCursorModel): diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 9f96ee50f..14d52f832 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -4649,3 +4649,93 @@ def test_given_invalid_config_streams_validates_config_and_raises(): with pytest.raises(ValueError): source.streams(input_config) + + +def test_parameter_propagation_for_concurrent_cursor(): + cursor_field_parameter_override = "created_at" + manifest = { + "version": "5.0.0", + "definitions": { + "selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "requester": { + "type": "HttpRequester", + "url_base": "https://persona.metaverse.com", + "http_method": "GET", + }, + "retriever": { + "type": "SimpleRetriever", + "record_selector": {"$ref": "#/definitions/selector"}, + "paginator": {"type": "NoPagination"}, + "requester": {"$ref": "#/definitions/requester"}, + }, + "incremental_cursor": { + "type": "DatetimeBasedCursor", + "start_datetime": {"datetime": "2024-01-01"}, + "end_datetime": "2024-12-31", + "datetime_format": "%Y-%m-%d", + "cursor_datetime_formats": ["%Y-%m-%d"], + "cursor_granularity": "P1D", + "step": "P400D", + "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", + "start_time_option": { + "type": "RequestOption", + "field_name": "start", + "inject_into": "request_parameter", + }, + "end_time_option": { + "type": "RequestOption", + "field_name": "end", + "inject_into": "request_parameter", + }, + }, + "base_stream": {"retriever": {"$ref": "#/definitions/retriever"}}, + "incremental_stream": { + "retriever": { + "$ref": "#/definitions/retriever", + "requester": {"$ref": "#/definitions/requester"}, + }, + "incremental_sync": {"$ref": "#/definitions/incremental_cursor"}, + "$parameters": { + "name": "stream_name", + "primary_key": "id", + "path": "/path", + "cursor_field": cursor_field_parameter_override, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "description": "The identifier", + "type": ["null", "string"], + }, + }, + }, + }, + }, + }, + "streams": [ + "#/definitions/incremental_stream", + ], + "check": {"stream_names": ["stream_name"]}, + "concurrency_level": { + "type": "ConcurrencyLevel", + "default_concurrency": "{{ config['num_workers'] or 10 }}", + "max_concurrency": 25, + }, + } + + source = ConcurrentDeclarativeSource( + source_config=manifest, + config={}, + catalog=create_catalog("stream_name"), + state=None, + ) + streams = source.streams({}) + + assert streams[0].cursor.cursor_field.cursor_field_key == cursor_field_parameter_override