Skip to content
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/declarative/async_job/job_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class ConcurrentJobLimitReached(Exception):
class JobTracker:
def __init__(self, limit: int):
self._jobs: Set[str] = set()
self._limit = limit
self._limit = 1 if limit < 1 else limit
self._lock = threading.Lock()

def try_to_get_intent(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ properties:
"$ref": "#/definitions/ConcurrencyLevel"
api_budget:
"$ref": "#/definitions/HTTPAPIBudget"
max_concurrent_job_count:
title: Maximum Concurrent Async Jobs
description: Maximum number of concurrent async jobs to run. This is often set by the API's maximum number of concurrent jobs on the account level.
type: integer
metadata:
type: object
description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ def __init__(
self._constructor = (
component_factory
if component_factory
else ModelToComponentFactory(emit_connector_builder_messages)
else ModelToComponentFactory(
emit_connector_builder_messages,
max_concurrent_async_job_count=source_config.get("max_concurrent_job_count"),
)
)
self._message_repository = self._constructor.get_message_repository()
self._slice_logger: SliceLogger = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1871,6 +1871,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
max_concurrent_job_count: Optional[int] = Field(
None,
description="Maximum number of concurrent async jobs to run. This is often set by the API's maximum number of concurrent jobs on the account level.",
title="Maximum Concurrent Async Jobs",
)
metadata: Optional[Dict[str, Any]] = Field(
None,
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
Expand Down Expand Up @@ -1898,6 +1903,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
max_concurrent_job_count: Optional[int] = Field(
None,
description="Maximum number of concurrent async jobs to run. This is often set by the API's maximum number of concurrent jobs on the account level.",
title="Maximum Concurrent Async Jobs",
)
metadata: Optional[Dict[str, Any]] = Field(
None,
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@
IncrementingCountStreamStateConverter,
)
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.types import Config, ConnectionDefinition
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

ComponentDefinition = Mapping[str, Any]
Expand All @@ -527,6 +527,7 @@ def __init__(
disable_resumable_full_refresh: bool = False,
message_repository: Optional[MessageRepository] = None,
connector_state_manager: Optional[ConnectorStateManager] = None,
max_concurrent_async_job_count: Optional[int] = None,
):
self._init_mappings()
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
Expand All @@ -540,6 +541,7 @@ def __init__(
)
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
self._job_tracker = JobTracker(max_concurrent_async_job_count or 1)

def _init_mappings(self) -> None:
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
Expand Down Expand Up @@ -2928,8 +2930,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
job_repository,
stream_slices,
JobTracker(1),
# FIXME eventually make the number of concurrent jobs in the API configurable. Until then, we limit to 1
self._job_tracker,
self._message_repository,
has_bulk_parent=False,
# FIXME work would need to be done here in order to detect if a stream as a parent stream that is bulk
Expand Down
Loading