Skip to content

Commit 274d1f2

Browse files
feat: enable async job configurability via config (#435)
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent 2b1f325 commit 274d1f2

File tree

4 files changed

+58
-11
lines changed

4 files changed

+58
-11
lines changed

airbyte_cdk/sources/declarative/async_job/job_tracker.py

Lines changed: 22 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -3,9 +3,11 @@
33
import logging
44
import threading
55
import uuid
6-
from typing import Set
6+
from dataclasses import dataclass, field
7+
from typing import Any, Mapping, Set, Union
78

89
from airbyte_cdk.logger import lazy_log
10+
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
911

1012
LOGGER = logging.getLogger("airbyte")
1113

@@ -14,15 +16,29 @@ class ConcurrentJobLimitReached(Exception):
1416
pass
1517

1618

19+
@dataclass
1720
class JobTracker:
18-
def __init__(self, limit: int):
21+
limit: Union[int, str]
22+
config: Mapping[str, Any] = field(default_factory=dict)
23+
24+
def __post_init__(self) -> None:
1925
self._jobs: Set[str] = set()
20-
if limit < 1:
26+
self._lock = threading.Lock()
27+
if isinstance(self.limit, str):
28+
try:
29+
self.limit = int(
30+
InterpolatedString(self.limit, parameters={}).eval(config=self.config)
31+
)
32+
except Exception as e:
33+
LOGGER.warning(
34+
f"Error interpolating max job count: {self.limit}. Setting to 1. {e}"
35+
)
36+
self.limit = 1
37+
if self.limit < 1:
2138
LOGGER.warning(
22-
f"The `max_concurrent_async_job_count` property is less than 1: {limit}. Setting to 1. Please update the source manifest to set a valid value."
39+
f"The `max_concurrent_async_job_count` property is less than 1: {self.limit}. Setting to 1. Please update the source manifest to set a valid value."
2340
)
24-
self._limit = 1 if limit < 1 else limit
25-
self._lock = threading.Lock()
41+
self._limit = self.limit if self.limit >= 1 else 1
2642

2743
def try_to_get_intent(self) -> str:
2844
lazy_log(

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,12 @@ properties:
4747
max_concurrent_async_job_count:
4848
title: Maximum Concurrent Asynchronous Jobs
4949
description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.
50-
type: integer
50+
type:
51+
- integer
52+
- string
53+
examples:
54+
- 3
55+
- "{{ config['max_concurrent_async_job_count'] }}"
5156
metadata:
5257
type: object
5358
description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.
@@ -2894,7 +2899,7 @@ definitions:
28942899
title: Lazy Read Pointer
28952900
description: If set, this will enable lazy reading, using the initial read of parent records to extract child records.
28962901
type: array
2897-
default: [ ]
2902+
default: []
28982903
items:
28992904
- type: string
29002905
interpolation_context:
@@ -3199,7 +3204,7 @@ definitions:
31993204
properties:
32003205
type:
32013206
type: string
3202-
enum: [ StateDelegatingStream ]
3207+
enum: [StateDelegatingStream]
32033208
name:
32043209
title: Name
32053210
description: The stream name.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1890,9 +1890,10 @@ class Config:
18901890
spec: Optional[Spec] = None
18911891
concurrency_level: Optional[ConcurrencyLevel] = None
18921892
api_budget: Optional[HTTPAPIBudget] = None
1893-
max_concurrent_async_job_count: Optional[int] = Field(
1893+
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
18941894
None,
18951895
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
1896+
examples=[3, "{{ config['max_concurrent_async_job_count'] }}"],
18961897
title="Maximum Concurrent Asynchronous Jobs",
18971898
)
18981899
metadata: Optional[Dict[str, Any]] = Field(
@@ -1922,9 +1923,10 @@ class Config:
19221923
spec: Optional[Spec] = None
19231924
concurrency_level: Optional[ConcurrencyLevel] = None
19241925
api_budget: Optional[HTTPAPIBudget] = None
1925-
max_concurrent_async_job_count: Optional[int] = Field(
1926+
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
19261927
None,
19271928
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
1929+
examples=[3, "{{ config['max_concurrent_async_job_count'] }}"],
19281930
title="Maximum Concurrent Asynchronous Jobs",
19291931
)
19301932
metadata: Optional[Dict[str, Any]] = Field(

unit_tests/sources/declarative/async_job/test_job_tracker.py

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -45,3 +45,27 @@ def _reach_limit(self) -> List[str]:
4545
def test_given_limit_is_less_than_1_when_init_then_set_to_1(limit: int):
4646
tracker = JobTracker(limit)
4747
assert tracker._limit == 1
48+
49+
50+
@pytest.mark.parametrize(
51+
("limit", "config", "expected_limit"),
52+
[
53+
("2", {}, 2),
54+
(
55+
"{{ config['max_concurrent_async_job_count'] }}",
56+
{"max_concurrent_async_job_count": 2},
57+
2,
58+
),
59+
],
60+
)
61+
def test_given_limit_as_string_when_init_then_interpolate_correctly(limit, config, expected_limit):
62+
tracker = JobTracker(limit, config)
63+
assert tracker._limit == expected_limit
64+
65+
66+
def test_given_interpolated_limit_and_empty_config_when_init_then_set_to_1():
67+
tracker = JobTracker(
68+
"{{ config['max_concurrent_async_job_count'] }}",
69+
{"max_concurrent_async_job_count": "hello"},
70+
)
71+
assert tracker._limit == 1

0 commit comments

Comments
 (0)