Skip to content

Commit ed6bac5

Browse files
committed
add max_concurrent_job to async retriever
1 parent 463ad81 commit ed6bac5

File tree

4 files changed

+23
-10
lines changed

4 files changed

+23
-10
lines changed

airbyte_cdk/sources/declarative/async_job/job_tracker.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
import logging
44
import threading
55
import uuid
6-
from typing import Set
6+
from typing import Any, Mapping, Set, Union
77

8+
import json
89
from airbyte_cdk.logger import lazy_log
10+
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
911

1012
LOGGER = logging.getLogger("airbyte")
1113

@@ -15,7 +17,13 @@ class ConcurrentJobLimitReached(Exception):
1517

1618

1719
class JobTracker:
18-
def __init__(self, limit: int):
20+
def __init__(self, limit: Union[int, InterpolatedString], config: Mapping[str, Any]):
21+
if isinstance(limit, InterpolatedString):
22+
limit = int(limit.eval(config=config, json_loads=json.loads))
23+
24+
if limit < 1:
25+
raise ValueError(f"Invalid max concurrent jobs limit: {limit}. Minimum value is 1.")
26+
1927
self._jobs: Set[str] = set()
2028
self._limit = limit
2129
self._lock = threading.Lock()

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3376,12 +3376,16 @@ definitions:
33763376
- "$ref": "#/definitions/IterableDecoder"
33773377
- "$ref": "#/definitions/XmlDecoder"
33783378
- "$ref": "#/definitions/ZipfileDecoder"
3379-
maximum_job_count:
3380-
title: Maximum Job Count
3381-
description: Maximum number of asynchronous jobs to run concurrently.
3379+
max_concurrent_jobs:
3380+
title: Maximum Conccurent Job Count
3381+
description: Maximum number of concurrent jobs to run.
33823382
anyOf:
3383-
- type: number
3383+
- type: integer
3384+
- type: string
33843385
default: 1
3386+
examples:
3387+
- 2
3388+
- "{{ config['max_concurrent_jobs'] }}"
33853389
$parameters:
33863390
type: object
33873391
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2368,10 +2368,11 @@ class AsyncRetriever(BaseModel):
23682368
description="Component decoding the download response so records can be extracted.",
23692369
title="Download Decoder",
23702370
)
2371-
maximum_job_count: Optional[float] = Field(
2371+
max_concurrent_jobs: Optional[Union[int, str]] = Field(
23722372
1,
2373-
description="Maximum number of asynchronous jobs to run concurrently.",
2374-
title="Maximum Job Count",
2373+
description="Maximum number of concurrent jobs to run.",
2374+
examples=[2, "{{ config['max_concurrent_jobs'] }}"],
2375+
title="Maximum Conccurent Job Count",
23752376
)
23762377
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
23772378

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2888,7 +2888,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
28882888
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
28892889
job_repository,
28902890
stream_slices,
2891-
JobTracker(model.maximum_job_count),
2891+
JobTracker(model.max_concurrent_jobs, config),
28922892
self._message_repository,
28932893
has_bulk_parent=False,
28942894
# FIXME work would need to be done here in order to detect if a stream as a parent stream that is bulk

0 commit comments

Comments
 (0)