diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index fdaf26bba..51c38d5c4 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -681,6 +681,7 @@ def __init__( connector_state_manager: Optional[ConnectorStateManager] = None, max_concurrent_async_job_count: Optional[int] = None, configured_catalog: Optional[ConfiguredAirbyteCatalog] = None, + api_budget: Optional[APIBudget] = None, ): self._init_mappings() self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice @@ -695,7 +696,7 @@ def __init__( configured_catalog ) self._connector_state_manager = connector_state_manager or ConnectorStateManager() - self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None + self._api_budget: Optional[Union[APIBudget]] = api_budget self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1) # placeholder for deprecation warnings self._collected_deprecation_logs: List[ConnectorBuilderLogMessage] = [] @@ -3887,6 +3888,7 @@ def create_parent_stream_config_with_substream_wrapper( self._evaluate_log_level(self._emit_connector_builder_messages), ), ), + api_budget=self._api_budget, ) return substream_factory.create_parent_stream_config( diff --git a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py index 87e8574bb..ab4335fe6 100644 --- a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py @@ -1491,559 +1491,6 @@ def _create_page(response_body): return response -@pytest.mark.parametrize( - "test_name, manifest, pages, expected_records, expected_calls", - [ - ( - "test_read_manifest_no_pagination_no_partitions", - { - "version": "0.34.2", - "type": "DeclarativeSource", - "check": {"type": "CheckStream", "stream_names": ["Rates"]}, - "streams": [ - { - "type": "DeclarativeStream", - "name": "Rates", - "primary_key": [], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "ABC": {"type": "number"}, - "AED": {"type": "number"}, - }, - "type": "object", - }, - }, - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.apilayer.com", - "path": "/exchangerates_data/latest", - "http_method": "GET", - "request_parameters": {}, - "request_headers": {}, - "request_body_json": {}, - "authenticator": { - "type": "ApiKeyAuthenticator", - "header": "apikey", - "api_token": "{{ config['api_key'] }}", - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": ["rates"]}, - }, - "paginator": {"type": "NoPagination"}, - }, - } - ], - "spec": { - "connection_specification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "required": ["api_key"], - "properties": { - "api_key": { - "type": "string", - "title": "API Key", - "airbyte_secret": True, - } - }, - "additionalProperties": True, - }, - "documentation_url": "https://example.org", - "type": "Spec", - }, - }, - ( - _create_page({"rates": [{"ABC": 0}, {"AED": 1}], "_metadata": {"next": "next"}}), - _create_page({"rates": [{"USD": 2}], "_metadata": {"next": "next"}}), - ) - * 10, - [{"ABC": 0}, {"AED": 1}], - [call({}, {}, None)], - ), - ( - "test_read_manifest_with_added_fields", - { - "version": "0.34.2", - "type": "DeclarativeSource", - "check": {"type": "CheckStream", "stream_names": ["Rates"]}, - "streams": [ - { - "type": "DeclarativeStream", - "name": "Rates", - "primary_key": [], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "ABC": {"type": "number"}, - "AED": {"type": "number"}, - }, - "type": "object", - }, - }, - "transformations": [ - { - "type": "AddFields", - "fields": [ - { - "type": "AddedFieldDefinition", - "path": ["added_field_key"], - "value": "added_field_value", - } - ], - } - ], - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.apilayer.com", - "path": "/exchangerates_data/latest", - "http_method": "GET", - "request_parameters": {}, - "request_headers": {}, - "request_body_json": {}, - "authenticator": { - "type": "ApiKeyAuthenticator", - "header": "apikey", - "api_token": "{{ config['api_key'] }}", - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": ["rates"]}, - }, - "paginator": {"type": "NoPagination"}, - }, - } - ], - "spec": { - "connection_specification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "required": ["api_key"], - "properties": { - "api_key": { - "type": "string", - "title": "API Key", - "airbyte_secret": True, - } - }, - "additionalProperties": True, - }, - "documentation_url": "https://example.org", - "type": "Spec", - }, - }, - ( - _create_page({"rates": [{"ABC": 0}, {"AED": 1}], "_metadata": {"next": "next"}}), - _create_page({"rates": [{"USD": 2}], "_metadata": {"next": "next"}}), - ) - * 10, - [ - {"ABC": 0, "added_field_key": "added_field_value"}, - {"AED": 1, "added_field_key": "added_field_value"}, - ], - [call({}, {}, None)], - ), - ( - "test_read_manifest_with_flatten_fields", - { - "version": "0.34.2", - "type": "DeclarativeSource", - "check": {"type": "CheckStream", "stream_names": ["Rates"]}, - "streams": [ - { - "type": "DeclarativeStream", - "name": "Rates", - "primary_key": [], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "ABC": {"type": "number"}, - "AED": {"type": "number"}, - }, - "type": "object", - }, - }, - "transformations": [{"type": "FlattenFields"}], - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.apilayer.com", - "path": "/exchangerates_data/latest", - "http_method": "GET", - "request_parameters": {}, - "request_headers": {}, - "request_body_json": {}, - "authenticator": { - "type": "ApiKeyAuthenticator", - "header": "apikey", - "api_token": "{{ config['api_key'] }}", - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": ["rates"]}, - }, - "paginator": {"type": "NoPagination"}, - }, - } - ], - "spec": { - "connection_specification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "required": ["api_key"], - "properties": { - "api_key": { - "type": "string", - "title": "API Key", - "airbyte_secret": True, - } - }, - "additionalProperties": True, - }, - "documentation_url": "https://example.org", - "type": "Spec", - }, - }, - ( - _create_page( - { - "rates": [ - {"nested_fields": {"ABC": 0}, "id": 1}, - {"nested_fields": {"AED": 1}, "id": 2}, - ], - "_metadata": {"next": "next"}, - } - ), - _create_page({"rates": [{"USD": 2}], "_metadata": {"next": "next"}}), - ) - * 10, - [ - {"ABC": 0, "id": 1}, - {"AED": 1, "id": 2}, - ], - [call({}, {}, None)], - ), - ( - "test_read_with_pagination_no_partitions", - { - "version": "0.34.2", - "type": "DeclarativeSource", - "check": {"type": "CheckStream", "stream_names": ["Rates"]}, - "streams": [ - { - "type": "DeclarativeStream", - "name": "Rates", - "primary_key": [], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "ABC": {"type": "number"}, - "AED": {"type": "number"}, - "USD": {"type": "number"}, - }, - "type": "object", - }, - }, - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.apilayer.com", - "path": "/exchangerates_data/latest", - "http_method": "GET", - "request_parameters": {}, - "request_headers": {}, - "request_body_json": {}, - "authenticator": { - "type": "ApiKeyAuthenticator", - "header": "apikey", - "api_token": "{{ config['api_key'] }}", - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": ["rates"]}, - }, - "paginator": { - "type": "DefaultPaginator", - "page_size": 2, - "page_size_option": { - "inject_into": "request_parameter", - "field_name": "page_size", - }, - "page_token_option": {"inject_into": "path", "type": "RequestPath"}, - "pagination_strategy": { - "type": "CursorPagination", - "cursor_value": "{{ response._metadata.next }}", - "page_size": 2, - }, - }, - }, - } - ], - "spec": { - "connection_specification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "required": ["api_key"], - "properties": { - "api_key": { - "type": "string", - "title": "API Key", - "airbyte_secret": True, - } - }, - "additionalProperties": True, - }, - "documentation_url": "https://example.org", - "type": "Spec", - }, - }, - ( - _create_page({"rates": [{"ABC": 0}, {"AED": 1}], "_metadata": {"next": "next"}}), - _create_page({"rates": [{"USD": 2}], "_metadata": {}}), - ) - * 10, - [{"ABC": 0}, {"AED": 1}, {"USD": 2}], - [ - call({}, {}, None), - call( - {}, - {}, - {"next_page_token": "next"}, - ), - ], - ), - ( - "test_no_pagination_with_partition_router", - { - "version": "0.34.2", - "type": "DeclarativeSource", - "check": {"type": "CheckStream", "stream_names": ["Rates"]}, - "streams": [ - { - "type": "DeclarativeStream", - "name": "Rates", - "primary_key": [], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "ABC": {"type": "number"}, - "AED": {"type": "number"}, - "partition": {"type": "number"}, - }, - "type": "object", - }, - }, - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.apilayer.com", - "path": "/exchangerates_data/latest", - "http_method": "GET", - "request_parameters": {}, - "request_headers": {}, - "request_body_json": {}, - "authenticator": { - "type": "ApiKeyAuthenticator", - "header": "apikey", - "api_token": "{{ config['api_key'] }}", - }, - }, - "partition_router": { - "type": "ListPartitionRouter", - "values": ["0", "1"], - "cursor_field": "partition", - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": ["rates"]}, - }, - "paginator": {"type": "NoPagination"}, - }, - } - ], - "spec": { - "connection_specification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "required": ["api_key"], - "properties": { - "api_key": { - "type": "string", - "title": "API Key", - "airbyte_secret": True, - } - }, - "additionalProperties": True, - }, - "documentation_url": "https://example.org", - "type": "Spec", - }, - }, - ( - _create_page( - { - "rates": [{"ABC": 0, "partition": 0}, {"AED": 1, "partition": 0}], - "_metadata": {"next": "next"}, - } - ), - _create_page( - {"rates": [{"ABC": 2, "partition": 1}], "_metadata": {"next": "next"}} - ), - ), - [{"ABC": 0, "partition": 0}, {"AED": 1, "partition": 0}, {"ABC": 2, "partition": 1}], - [ - call({}, {"partition": "0"}, None), - call( - {}, - {"partition": "1"}, - None, - ), - ], - ), - ( - "test_with_pagination_and_partition_router", - { - "version": "0.34.2", - "type": "DeclarativeSource", - "check": {"type": "CheckStream", "stream_names": ["Rates"]}, - "streams": [ - { - "type": "DeclarativeStream", - "name": "Rates", - "primary_key": [], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "ABC": {"type": "number"}, - "AED": {"type": "number"}, - "partition": {"type": "number"}, - }, - "type": "object", - }, - }, - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.apilayer.com", - "path": "/exchangerates_data/latest", - "http_method": "GET", - "request_parameters": {}, - "request_headers": {}, - "request_body_json": {}, - "authenticator": { - "type": "ApiKeyAuthenticator", - "header": "apikey", - "api_token": "{{ config['api_key'] }}", - }, - }, - "partition_router": { - "type": "ListPartitionRouter", - "values": ["0", "1"], - "cursor_field": "partition", - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": ["rates"]}, - }, - "paginator": { - "type": "DefaultPaginator", - "page_size": 2, - "page_size_option": { - "inject_into": "request_parameter", - "field_name": "page_size", - }, - "page_token_option": {"inject_into": "path", "type": "RequestPath"}, - "pagination_strategy": { - "type": "CursorPagination", - "cursor_value": "{{ response._metadata.next }}", - "page_size": 2, - }, - }, - }, - } - ], - "spec": { - "connection_specification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "required": ["api_key"], - "properties": { - "api_key": { - "type": "string", - "title": "API Key", - "airbyte_secret": True, - } - }, - "additionalProperties": True, - }, - "documentation_url": "https://example.org", - "type": "Spec", - }, - }, - ( - _create_page( - { - "rates": [{"ABC": 0, "partition": 0}, {"AED": 1, "partition": 0}], - "_metadata": {"next": "next"}, - } - ), - _create_page({"rates": [{"USD": 3, "partition": 0}], "_metadata": {}}), - _create_page({"rates": [{"ABC": 2, "partition": 1}], "_metadata": {}}), - ), - [ - {"ABC": 0, "partition": 0}, - {"AED": 1, "partition": 0}, - {"USD": 3, "partition": 0}, - {"ABC": 2, "partition": 1}, - ], - [ - call({}, {"partition": "0"}, None), - call({}, {"partition": "0"}, {"next_page_token": "next"}), - call( - {}, - {"partition": "1"}, - None, - ), - ], - ), - ], -) -def test_read_manifest_declarative_source( - test_name, manifest, pages, expected_records, expected_calls -): - _stream_name = "Rates" - with patch.object(SimpleRetriever, "_fetch_next_page", side_effect=pages) as mock_retriever: - output_data = [ - message.record.data for message in _run_read(manifest, _stream_name) if message.record - ] - assert output_data == expected_records - mock_retriever.assert_has_calls(expected_calls) - - def test_only_parent_streams_use_cache(): applications_stream = { "type": "DeclarativeStream", diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 591d47ae6..4c2e935dd 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -744,7 +744,26 @@ def test_create_substream_partition_router(): "", resolved_manifest["partition_router"], {} ) - partition_router = factory.create_component( + model_to_component_factory = ModelToComponentFactory() + model_to_component_factory.set_api_budget( + { + "type": "HTTPAPIBudget", + "policies": [ + { + "type": "MovingWindowCallRatePolicy", + "rates": [ + { + "limit": 1, + "interval": "PT60S", + } + ], + "matchers": [], + } + ], + }, + input_config, + ) + partition_router = model_to_component_factory.create_component( model_type=SubstreamPartitionRouterModel, component_definition=partition_router_manifest, config=input_config, @@ -757,6 +776,14 @@ def test_create_substream_partition_router(): assert isinstance(parent_stream_configs[0].stream, DefaultStream) assert isinstance(parent_stream_configs[1].stream, DefaultStream) + # ensure api budget + assert get_retriever( + parent_stream_configs[0].stream + ).requester._http_client._api_budget._policies + assert get_retriever( + parent_stream_configs[1].stream + ).requester._http_client._api_budget._policies + assert partition_router.parent_stream_configs[0].parent_key.eval({}) == "id" assert partition_router.parent_stream_configs[0].partition_field.eval({}) == "repository_id" assert (