Skip to content

Commit b6cd208

Browse files
author
maxime.c
committed
share API budget with parent streams
1 parent 6504148 commit b6cd208

File tree

2 files changed

+33
-4
lines changed

2 files changed

+33
-4
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -681,6 +681,7 @@ def __init__(
681681
connector_state_manager: Optional[ConnectorStateManager] = None,
682682
max_concurrent_async_job_count: Optional[int] = None,
683683
configured_catalog: Optional[ConfiguredAirbyteCatalog] = None,
684+
api_budget: Optional[APIBudget] = None,
684685
):
685686
self._init_mappings()
686687
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
@@ -695,7 +696,7 @@ def __init__(
695696
configured_catalog
696697
)
697698
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
698-
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
699+
self._api_budget: Optional[Union[APIBudget]] = api_budget
699700
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)
700701
# placeholder for deprecation warnings
701702
self._collected_deprecation_logs: List[ConnectorBuilderLogMessage] = []
@@ -3887,6 +3888,7 @@ def create_parent_stream_config_with_substream_wrapper(
38873888
self._evaluate_log_level(self._emit_connector_builder_messages),
38883889
),
38893890
),
3891+
api_budget=self._api_budget,
38903892
)
38913893

38923894
return substream_factory.create_parent_stream_config(

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from datetime import datetime, timedelta, timezone
99
from pathlib import Path
1010
from typing import Any, Iterable, Mapping, Optional, Union
11+
from unittest.mock import Mock
1112

1213
import freezegun
1314
import pytest
@@ -170,7 +171,7 @@
170171
)
171172
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
172173
from airbyte_cdk.sources.message.repository import StateFilteringMessageRepository
173-
from airbyte_cdk.sources.streams.call_rate import MovingWindowCallRatePolicy
174+
from airbyte_cdk.sources.streams.call_rate import APIBudget, MovingWindowCallRatePolicy
174175
from airbyte_cdk.sources.streams.concurrent.clamping import (
175176
ClampingEndProvider,
176177
DayClampingStrategy,
@@ -744,7 +745,26 @@ def test_create_substream_partition_router():
744745
"", resolved_manifest["partition_router"], {}
745746
)
746747

747-
partition_router = factory.create_component(
748+
model_to_component_factory = ModelToComponentFactory()
749+
model_to_component_factory.set_api_budget(
750+
{
751+
"type": "HTTPAPIBudget",
752+
"policies": [
753+
{
754+
"type": "MovingWindowCallRatePolicy",
755+
"rates": [
756+
{
757+
"limit": 1,
758+
"interval": "PT60S",
759+
}
760+
],
761+
"matchers": [],
762+
}
763+
],
764+
},
765+
input_config,
766+
)
767+
partition_router = model_to_component_factory.create_component(
748768
model_type=SubstreamPartitionRouterModel,
749769
component_definition=partition_router_manifest,
750770
config=input_config,
@@ -757,7 +777,14 @@ def test_create_substream_partition_router():
757777
assert isinstance(parent_stream_configs[0].stream, DefaultStream)
758778
assert isinstance(parent_stream_configs[1].stream, DefaultStream)
759779

760-
assert partition_router.parent_stream_configs[0].parent_key.eval({}) == "id"
780+
# ensure api budget
781+
assert get_retriever(
782+
parent_stream_configs[0].stream
783+
).requester._http_client._api_budget._policies
784+
assert get_retriever(
785+
parent_stream_configs[1].stream
786+
).requester._http_client._api_budget._policies
787+
761788
assert partition_router.parent_stream_configs[0].partition_field.eval({}) == "repository_id"
762789
assert (
763790
partition_router.parent_stream_configs[0].request_option.inject_into

0 commit comments

Comments
 (0)