-
Notifications
You must be signed in to change notification settings - Fork 40
Expand file tree
/
Copy pathtest_interpolated_request_input_provider.py
More file actions
51 lines (46 loc) · 1.83 KB
/
test_interpolated_request_input_provider.py
File metadata and controls
51 lines (46 loc) · 1.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import pytest as pytest
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_input_provider import (
InterpolatedRequestInputProvider,
)
@pytest.mark.parametrize(
"test_name, input_request_data, expected_request_data",
[
(
"test_static_map_data",
{"a_static_request_param": "a_static_value"},
{"a_static_request_param": "a_static_value"},
),
(
"test_map_depends_on_stream_slice",
{"read_from_slice": "{{ stream_slice['slice_key'] }}"},
{"read_from_slice": "slice_value"},
),
(
"test_map_depends_on_config",
{"read_from_config": "{{ config['config_key'] }}"},
{"read_from_config": "value_of_config"},
),
(
"test_map_depends_on_parameters",
{"read_from_parameters": "{{ parameters['read_from_parameters'] }}"},
{"read_from_parameters": "value_of_parameters"},
),
("test_defaults_to_empty_dictionary", None, {}),
],
)
def test_initialize_interpolated_mapping_request_input_provider(
test_name, input_request_data, expected_request_data
):
config = {"config_key": "value_of_config"}
stream_slice = {"slice_key": "slice_value"}
parameters = {"read_from_parameters": "value_of_parameters"}
provider = InterpolatedRequestInputProvider(
request_inputs=input_request_data, config=config, parameters=parameters
)
actual_request_data = provider.eval_request_inputs(stream_slice=stream_slice)
assert isinstance(provider._interpolator, InterpolatedMapping)
assert actual_request_data == expected_request_data