Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions airbyte_cdk/sources/declarative/async_job/job_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import logging
import threading
import uuid
from typing import Set
from dataclasses import dataclass, field
from typing import Any, Mapping, Set, Union

from airbyte_cdk.logger import lazy_log
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString

LOGGER = logging.getLogger("airbyte")

Expand All @@ -14,15 +16,29 @@ class ConcurrentJobLimitReached(Exception):
pass


@dataclass
class JobTracker:
def __init__(self, limit: int):
limit: Union[int, str]
config: Mapping[str, Any] = field(default_factory=dict)

def __post_init__(self) -> None:
    """Resolve the configured job limit and initialise the tracker state.

    ``limit`` may be an integer or a string. A string is evaluated through
    ``InterpolatedString`` against ``config`` and converted with ``int``;
    if that raises, a warning is logged and ``limit`` falls back to 1.
    A resolved limit below 1 is logged as well, and the effective limit
    stored in ``_limit`` is never lower than 1.
    """
    self._jobs: Set[str] = set()
    self._lock = threading.Lock()

    if isinstance(self.limit, str):
        try:
            self.limit = int(
                InterpolatedString(self.limit, parameters={}).eval(config=self.config)
            )
        except Exception as e:
            # Best-effort on purpose: an unresolvable or non-numeric limit
            # degrades to a single concurrent job instead of failing init.
            # `self.limit` still holds the raw string here, so the message
            # shows the value that could not be interpolated.
            LOGGER.warning(
                f"Error interpolating max job count: {self.limit}. Setting to 1. {e}"
            )
            self.limit = 1

    if self.limit < 1:
        LOGGER.warning(
            f"The `max_concurrent_async_job_count` property is less than 1: {self.limit}. Setting to 1. Please update the source manifest to set a valid value."
        )
    # `limit` keeps the resolved value; `_limit` is the clamped one.
    self._limit = max(self.limit, 1)

def try_to_get_intent(self) -> str:
lazy_log(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ properties:
max_concurrent_async_job_count:
title: Maximum Concurrent Asynchronous Jobs
description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.
type: integer
type:
- integer
- string
examples:
- 3
- "{{ config['max_concurrent_async_job_count'] }}"
metadata:
type: object
description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.
Expand Down Expand Up @@ -2894,7 +2899,7 @@ definitions:
title: Lazy Read Pointer
description: If set, this will enable lazy reading, using the initial read of parent records to extract child records.
type: array
default: [ ]
default: []
items:
- type: string
interpolation_context:
Expand Down Expand Up @@ -3199,7 +3204,7 @@ definitions:
properties:
type:
type: string
enum: [ StateDelegatingStream ]
enum: [StateDelegatingStream]
name:
title: Name
description: The stream name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1890,9 +1890,10 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
max_concurrent_async_job_count: Optional[int] = Field(
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
None,
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
examples=[3, "{{ config['max_concurrent_async_job_count'] }}"],
title="Maximum Concurrent Asynchronous Jobs",
)
metadata: Optional[Dict[str, Any]] = Field(
Expand Down Expand Up @@ -1922,9 +1923,10 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
max_concurrent_async_job_count: Optional[int] = Field(
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
None,
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
examples=[3, "{{ config['max_concurrent_async_job_count'] }}"],
title="Maximum Concurrent Asynchronous Jobs",
)
metadata: Optional[Dict[str, Any]] = Field(
Expand Down
24 changes: 24 additions & 0 deletions unit_tests/sources/declarative/async_job/test_job_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,27 @@ def _reach_limit(self) -> List[str]:
def test_given_limit_is_less_than_1_when_init_then_set_to_1(limit: int):
tracker = JobTracker(limit)
assert tracker._limit == 1


@pytest.mark.parametrize(
    ("limit", "config", "expected_limit"),
    [
        ("2", {}, 2),
        (
            "{{ config['max_concurrent_async_job_count'] }}",
            {"max_concurrent_async_job_count": 2},
            2,
        ),
    ],
)
def test_given_limit_as_string_when_init_then_interpolate_correctly(limit, config, expected_limit):
    """A string limit (plain digits or a config template) resolves to the expected integer."""
    job_tracker = JobTracker(limit, config)
    resolved_limit = job_tracker._limit
    assert resolved_limit == expected_limit


def test_given_interpolated_limit_and_empty_config_when_init_then_set_to_1():
    """An interpolated limit that does not resolve to an integer falls back to 1.

    NOTE(review): despite the test name, the config used here is not empty;
    it maps the key to the non-numeric string "hello".
    """
    limit_template = "{{ config['max_concurrent_async_job_count'] }}"
    non_numeric_config = {"max_concurrent_async_job_count": "hello"}
    job_tracker = JobTracker(limit_template, non_numeric_config)
    assert job_tracker._limit == 1
Loading