diff --git a/airbyte_cdk/sources/declarative/async_job/job_tracker.py b/airbyte_cdk/sources/declarative/async_job/job_tracker.py index b47fc4cad..62d4feab9 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_tracker.py +++ b/airbyte_cdk/sources/declarative/async_job/job_tracker.py @@ -17,7 +17,11 @@ class ConcurrentJobLimitReached(Exception): class JobTracker: def __init__(self, limit: int): self._jobs: Set[str] = set() - self._limit = limit + if limit < 1: + LOGGER.warning( + f"The `max_concurrent_async_job_count` property is less than 1: {limit}. Setting to 1. Please update the source manifest to set a valid value." + ) + self._limit = 1 if limit < 1 else limit self._lock = threading.Lock() def try_to_get_intent(self) -> str: diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 6cd9998c7..de34b53ee 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -42,6 +42,10 @@ properties: "$ref": "#/definitions/ConcurrencyLevel" api_budget: "$ref": "#/definitions/HTTPAPIBudget" + max_concurrent_async_job_count: + title: Maximum Concurrent Asynchronous Jobs + description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information. + type: integer metadata: type: object description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata. diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index d3afb1396..e38ac336c 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -93,7 +93,10 @@ def __init__( self._constructor = ( component_factory if component_factory - else ModelToComponentFactory(emit_connector_builder_messages) + else ModelToComponentFactory( + emit_connector_builder_messages, + max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), + ) ) self._message_repository = self._constructor.get_message_repository() self._slice_logger: SliceLogger = ( diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index a49b66c03..f85ae7993 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1871,6 +1871,11 @@ class Config: spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None api_budget: Optional[HTTPAPIBudget] = None + max_concurrent_async_job_count: Optional[int] = Field( + None, + description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.", + title="Maximum Concurrent Asynchronous Jobs", + ) metadata: Optional[Dict[str, Any]] = Field( None, description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.", @@ -1898,6 +1903,11 @@ class Config: spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None api_budget: Optional[HTTPAPIBudget] = None + max_concurrent_async_job_count: Optional[int] = Field( + None, + description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.", + title="Maximum Concurrent Asynchronous Jobs", + ) metadata: Optional[Dict[str, Any]] = Field( None, description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c46c76cef..3eec82a45 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -503,7 +503,7 @@ IncrementingCountStreamStateConverter, ) from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction -from airbyte_cdk.sources.types import Config +from airbyte_cdk.sources.types import Config, ConnectionDefinition from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer ComponentDefinition = Mapping[str, Any] @@ -527,6 +527,7 @@ def __init__( disable_resumable_full_refresh: bool = False, message_repository: Optional[MessageRepository] = None, connector_state_manager: Optional[ConnectorStateManager] = None, + max_concurrent_async_job_count: Optional[int] = None, ): self._init_mappings() self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice @@ -540,6 +541,7 @@ def __init__( ) self._connector_state_manager = connector_state_manager or ConnectorStateManager() self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None + self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1) def _init_mappings(self) -> None: self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = { @@ -2928,8 +2930,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator( job_repository, stream_slices, - JobTracker(1), - # FIXME eventually make the number of concurrent jobs in the API configurable. Until then, we limit to 1 + self._job_tracker, self._message_repository, has_bulk_parent=False, # FIXME work would need to be done here in order to detect if a stream as a parent stream that is bulk diff --git a/unit_tests/sources/declarative/async_job/test_job_tracker.py b/unit_tests/sources/declarative/async_job/test_job_tracker.py index 1202c663d..e20304621 100644 --- a/unit_tests/sources/declarative/async_job/test_job_tracker.py +++ b/unit_tests/sources/declarative/async_job/test_job_tracker.py @@ -39,3 +39,9 @@ def test_given_limit_reached_when_add_job_then_limit_is_still_reached(self) -> N def _reach_limit(self) -> List[str]: return [self._tracker.try_to_get_intent() for i in range(_LIMIT)] + + +@pytest.mark.parametrize("limit", [-1, 0]) +def test_given_limit_is_less_than_1_when_init_then_set_to_1(limit: int): + tracker = JobTracker(limit) + assert tracker._limit == 1