Skip to content

Commit 1cd879f

Browse files
author
maxime.c
committed
add unit test
1 parent 6d6bbed commit 1cd879f

File tree

1 file changed

+87
-0
lines changed

1 file changed

+87
-0
lines changed

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4649,3 +4649,90 @@ def test_given_invalid_config_streams_validates_config_and_raises():
46494649

46504650
with pytest.raises(ValueError):
46514651
source.streams(input_config)
4652+
4653+
4654+
def test_parameter_propagation_for_concurrent_cursor():
4655+
cursor_field_parameter_override = "created_at"
4656+
manifest = {
4657+
"version": "5.0.0",
4658+
"definitions": {
4659+
"selector": {
4660+
"type": "RecordSelector",
4661+
"extractor": {"type": "DpathExtractor", "field_path": []},
4662+
},
4663+
"requester": {
4664+
"type": "HttpRequester",
4665+
"url_base": "https://persona.metaverse.com",
4666+
"http_method": "GET",
4667+
},
4668+
"retriever": {
4669+
"type": "SimpleRetriever",
4670+
"record_selector": {"$ref": "#/definitions/selector"},
4671+
"paginator": {"type": "NoPagination"},
4672+
"requester": {"$ref": "#/definitions/requester"},
4673+
},
4674+
"incremental_cursor": {
4675+
"type": "DatetimeBasedCursor",
4676+
"start_datetime": {
4677+
"datetime": "2024-01-01"
4678+
},
4679+
"end_datetime": "2024-12-31",
4680+
"datetime_format": "%Y-%m-%d",
4681+
"cursor_datetime_formats": ["%Y-%m-%d"],
4682+
"cursor_granularity": "P1D",
4683+
"step": "P400D",
4684+
"cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}",
4685+
"start_time_option": {
4686+
"type": "RequestOption",
4687+
"field_name": "start",
4688+
"inject_into": "request_parameter",
4689+
},
4690+
"end_time_option": {
4691+
"type": "RequestOption",
4692+
"field_name": "end",
4693+
"inject_into": "request_parameter",
4694+
},
4695+
},
4696+
"base_stream": {"retriever": {"$ref": "#/definitions/retriever"}},
4697+
"incremental_stream": {
4698+
"retriever": {
4699+
"$ref": "#/definitions/retriever",
4700+
"requester": {"$ref": "#/definitions/requester"},
4701+
},
4702+
"incremental_sync": {"$ref": "#/definitions/incremental_cursor"},
4703+
"$parameters": {"name": "stream_name", "primary_key": "id", "path": "/path", "cursor_field": cursor_field_parameter_override},
4704+
"schema_loader": {
4705+
"type": "InlineSchemaLoader",
4706+
"schema": {
4707+
"$schema": "https://json-schema.org/draft-07/schema#",
4708+
"type": "object",
4709+
"properties": {
4710+
"id": {
4711+
"description": "The identifier",
4712+
"type": ["null", "string"],
4713+
},
4714+
},
4715+
},
4716+
},
4717+
},
4718+
},
4719+
"streams": [
4720+
"#/definitions/incremental_stream",
4721+
],
4722+
"check": {"stream_names": ["stream_name"]},
4723+
"concurrency_level": {
4724+
"type": "ConcurrencyLevel",
4725+
"default_concurrency": "{{ config['num_workers'] or 10 }}",
4726+
"max_concurrency": 25,
4727+
},
4728+
}
4729+
4730+
source = ConcurrentDeclarativeSource(
4731+
source_config=manifest,
4732+
config={},
4733+
catalog=create_catalog("stream_name"),
4734+
state=None,
4735+
)
4736+
streams = source.streams({})
4737+
4738+
assert streams[0].cursor.cursor_field.cursor_field_key == cursor_field_parameter_override

0 commit comments

Comments
 (0)