Skip to content
6 changes: 5 additions & 1 deletion airbyte_cdk/sources/declarative/async_job/job_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ class ConcurrentJobLimitReached(Exception):
class JobTracker:
def __init__(self, limit: int):
self._jobs: Set[str] = set()
self._limit = limit
if limit < 1:
LOGGER.warning(
f"The `max_concurrent_async_job_count` property is less than 1: {limit}. Setting to 1. Please update the source manifest to set a valid value."
)
self._limit = 1 if limit < 1 else limit
self._lock = threading.Lock()

def try_to_get_intent(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ properties:
"$ref": "#/definitions/ConcurrencyLevel"
api_budget:
"$ref": "#/definitions/HTTPAPIBudget"
max_concurrent_async_job_count:
title: Maximum Concurrent Asynchronous Jobs
description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.
type: integer
metadata:
type: object
description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ def __init__(
self._constructor = (
component_factory
if component_factory
else ModelToComponentFactory(emit_connector_builder_messages)
else ModelToComponentFactory(
emit_connector_builder_messages,
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
)
)
self._message_repository = self._constructor.get_message_repository()
self._slice_logger: SliceLogger = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1871,6 +1871,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
max_concurrent_async_job_count: Optional[int] = Field(
None,
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
title="Maximum Concurrent Asynchronous Jobs",
)
metadata: Optional[Dict[str, Any]] = Field(
None,
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
Expand Down Expand Up @@ -1898,6 +1903,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
max_concurrent_async_job_count: Optional[int] = Field(
None,
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
title="Maximum Concurrent Asynchronous Jobs",
)
metadata: Optional[Dict[str, Any]] = Field(
None,
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@
IncrementingCountStreamStateConverter,
)
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.types import Config, ConnectionDefinition
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

ComponentDefinition = Mapping[str, Any]
Expand All @@ -527,6 +527,7 @@ def __init__(
disable_resumable_full_refresh: bool = False,
message_repository: Optional[MessageRepository] = None,
connector_state_manager: Optional[ConnectorStateManager] = None,
max_concurrent_async_job_count: Optional[int] = None,
):
self._init_mappings()
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
Expand All @@ -540,6 +541,7 @@ def __init__(
)
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)

def _init_mappings(self) -> None:
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
Expand Down Expand Up @@ -2928,8 +2930,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
job_repository,
stream_slices,
JobTracker(1),
# FIXME eventually make the number of concurrent jobs in the API configurable. Until then, we limit to 1
self._job_tracker,
self._message_repository,
has_bulk_parent=False,
# FIXME work would need to be done here in order to detect if a stream has a parent stream that is bulk
Expand Down
6 changes: 6 additions & 0 deletions unit_tests/sources/declarative/async_job/test_job_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,9 @@ def test_given_limit_reached_when_add_job_then_limit_is_still_reached(self) -> N

def _reach_limit(self) -> List[str]:
    """Call ``try_to_get_intent`` on the tracker ``_LIMIT`` times and return the results in order."""
    intents: List[str] = []
    for _ in range(_LIMIT):
        intents.append(self._tracker.try_to_get_intent())
    return intents


@pytest.mark.parametrize("limit", [-1, 0])
def test_given_limit_is_less_than_1_when_init_then_set_to_1(limit: int):
    """A limit below 1 passed to ``JobTracker`` must be stored as 1."""
    assert JobTracker(limit)._limit == 1
Loading