Skip to content
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/declarative/async_job/job_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class ConcurrentJobLimitReached(Exception):
class JobTracker:
def __init__(self, limit: int):
self._jobs: Set[str] = set()
self._limit = limit
self._limit = 1 if limit < 1 else limit
self._lock = threading.Lock()

def try_to_get_intent(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ properties:
"$ref": "#/definitions/ConcurrencyLevel"
api_budget:
"$ref": "#/definitions/HTTPAPIBudget"
max_concurrent_async_job_count:
title: Maximum Concurrent Async Jobs
description: Maximum number of concurrent async jobs to run. This property is only relevant for sources/streams that support asynchronous job execution (e.g. a Report stream that initiates a job, polls the job status, and then fetches the job results). This is often determined by the API's limit on the number of concurrent jobs at the account level.
type: integer
metadata:
type: object
description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ def __init__(
self._constructor = (
component_factory
if component_factory
else ModelToComponentFactory(emit_connector_builder_messages)
else ModelToComponentFactory(
emit_connector_builder_messages,
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
)
)
self._message_repository = self._constructor.get_message_repository()
self._slice_logger: SliceLogger = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1871,6 +1871,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
max_concurrent_async_job_count: Optional[int] = Field(
None,
description="Maximum number of concurrent async jobs to run. This property is only relevant for sources/streams that support asynchronous job execution (e.g. a Report stream that requires that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level.",
title="Maximum Concurrent Async Jobs",
)
metadata: Optional[Dict[str, Any]] = Field(
None,
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
Expand Down Expand Up @@ -1898,6 +1903,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
max_concurrent_async_job_count: Optional[int] = Field(
None,
description="Maximum number of concurrent async jobs to run. This property is only relevant for sources/streams that support asynchronous job execution (e.g. a Report stream that requires that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level.",
title="Maximum Concurrent Async Jobs",
)
metadata: Optional[Dict[str, Any]] = Field(
None,
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@
IncrementingCountStreamStateConverter,
)
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.types import Config, ConnectionDefinition
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

ComponentDefinition = Mapping[str, Any]
Expand All @@ -527,6 +527,7 @@ def __init__(
disable_resumable_full_refresh: bool = False,
message_repository: Optional[MessageRepository] = None,
connector_state_manager: Optional[ConnectorStateManager] = None,
max_concurrent_async_job_count: Optional[int] = None,
):
self._init_mappings()
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
Expand All @@ -540,6 +541,7 @@ def __init__(
)
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)

def _init_mappings(self) -> None:
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
Expand Down Expand Up @@ -2928,8 +2930,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
job_repository,
stream_slices,
JobTracker(1),
# FIXME eventually make the number of concurrent jobs in the API configurable. Until then, we limit to 1
self._job_tracker,
self._message_repository,
has_bulk_parent=False,
# FIXME work would need to be done here in order to detect if a stream as a parent stream that is bulk
Expand Down
6 changes: 6 additions & 0 deletions unit_tests/sources/declarative/async_job/test_job_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,9 @@ def test_given_limit_reached_when_add_job_then_limit_is_still_reached(self) -> N

def _reach_limit(self) -> List[str]:
    """Acquire intents until the tracker's limit is exhausted; return their ids."""
    intents: List[str] = []
    for _ in range(_LIMIT):
        intents.append(self._tracker.try_to_get_intent())
    return intents


@pytest.mark.parametrize("limit", [-1, 0])
def test_given_limit_is_less_than_1_when_init_then_set_to_1(limit: int) -> None:
    """A non-positive limit is clamped to 1 so the tracker can always admit one job."""
    tracker = JobTracker(limit)
    assert tracker._limit == 1
Loading