Skip to content
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/declarative/async_job/job_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class ConcurrentJobLimitReached(Exception):
class JobTracker:
def __init__(self, limit: int):
self._jobs: Set[str] = set()
self._limit = limit
self._limit = 1 if limit < 1 else limit
self._lock = threading.Lock()

def try_to_get_intent(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ properties:
"$ref": "#/definitions/ConcurrencyLevel"
api_budget:
"$ref": "#/definitions/HTTPAPIBudget"
max_concurrent_job_count:
title: Maximum Concurrent Async Jobs
description: Maximum number of concurrent async jobs to run. This is often set by the API's maximum number of concurrent jobs on the account level.
type: integer
metadata:
type: object
description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ def __init__(
self._constructor = (
component_factory
if component_factory
else ModelToComponentFactory(emit_connector_builder_messages)
else ModelToComponentFactory(
emit_connector_builder_messages, source_config=source_config
)
)
self._message_repository = self._constructor.get_message_repository()
self._slice_logger: SliceLogger = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1871,6 +1871,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
max_concurrent_job_count: Optional[int] = Field(
None,
description="Maximum number of concurrent async jobs to run. This is often set by the API's maximum number of concurrent jobs on the account level.",
title="Maximum Concurrent Async Jobs",
)
metadata: Optional[Dict[str, Any]] = Field(
None,
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
Expand Down Expand Up @@ -1898,6 +1903,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
max_concurrent_job_count: Optional[int] = Field(
None,
description="Maximum number of concurrent async jobs to run. This is often set by the API's maximum number of concurrent jobs on the account level.",
title="Maximum Concurrent Async Jobs",
)
metadata: Optional[Dict[str, Any]] = Field(
None,
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@
IncrementingCountStreamStateConverter,
)
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.types import Config, ConnectionDefinition
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

ComponentDefinition = Mapping[str, Any]
Expand All @@ -527,6 +527,7 @@ def __init__(
disable_resumable_full_refresh: bool = False,
message_repository: Optional[MessageRepository] = None,
connector_state_manager: Optional[ConnectorStateManager] = None,
source_config: Optional[ConnectionDefinition] = None,
):
self._init_mappings()
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
Expand All @@ -540,6 +541,11 @@ def __init__(
)
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
self._job_tracker: Optional[JobTracker] = (
self._create_async_job_tracker(source_config=source_config)
if source_config
else None
)

def _init_mappings(self) -> None:
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
Expand Down Expand Up @@ -2924,12 +2930,13 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
download_target_extractor=download_target_extractor,
)

self._job_tracker = JobTracker(1) if not self._job_tracker else self._job_tracker

async_job_partition_router = AsyncJobPartitionRouter(
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
job_repository,
stream_slices,
JobTracker(1),
# FIXME eventually make the number of concurrent jobs in the API configurable. Until then, we limit to 1
self._job_tracker,
self._message_repository,
has_bulk_parent=False,
# FIXME work would need to be done here in order to detect if a stream as a parent stream that is bulk
Expand Down Expand Up @@ -3218,3 +3225,13 @@ def set_api_budget(self, component_definition: ComponentDefinition, config: Conf
self._api_budget = self.create_component(
model_type=HTTPAPIBudgetModel, component_definition=component_definition, config=config
)

def _create_async_job_tracker(
self, source_config: ConnectionDefinition
) -> Optional[JobTracker]:
"""
Sets up job tracking for async jobs based on limit specified in the source config.
"""
if job_count := source_config.get("max_concurrent_job_count"):
return JobTracker(job_count)
return None
Loading