Skip to content

Commit a73ea65

Browse files
committed
add additional tests for interpolated string and positive int validation
1 parent 0a53391 commit a73ea65

File tree

3 files changed

+19
-5
lines changed

3 files changed

+19
-5
lines changed

airbyte_cdk/sources/declarative/async_job/job_tracker.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ class ConcurrentJobLimitReached(Exception):
1717

1818

1919
class JobTracker:
20-
def __init__(self, limit: Union[int, InterpolatedString], config: Mapping[str, Any]):
21-
if isinstance(limit, InterpolatedString):
22-
limit = int(limit.eval(config=config, json_loads=json.loads))
20+
def __init__(self, limit: Union[int, str], config: Mapping[str, Any] = {}):
21+
if isinstance(limit, str):
22+
limit = int(InterpolatedString(limit, parameters={}).eval(config=config))
2323

2424
if limit < 1:
2525
raise ValueError(f"Invalid max concurrent jobs limit: {limit}. Minimum value is 1.")

unit_tests/sources/declarative/async_job/test_job_orchestrator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ def test_given_exception_when_start_job_and_skip_this_exception(
301301
def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_partitions_then_free_job_budget(
302302
self, mock_sleep: MagicMock
303303
) -> None:
304-
job_tracker = JobTracker(1)
304+
job_tracker = JobTracker(1, config={})
305305
jobs = [self._an_async_job(str(i), _A_STREAM_SLICE) for i in range(_MAX_NUMBER_OF_ATTEMPTS)]
306306
self._job_repository.start.side_effect = jobs
307307
self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs(

unit_tests/sources/declarative/async_job/test_job_tracker.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515

1616
class JobTrackerTest(TestCase):
1717
def setUp(self) -> None:
18-
self._tracker = JobTracker(_LIMIT, config={})
18+
self._tracker = JobTracker(
19+
limit="{{config['max_concurrent_jobs']}}", config={"max_concurrent_jobs": _LIMIT}
20+
)
1921

2022
def test_given_limit_reached_when_remove_job_then_can_get_intent_again(self) -> None:
2123
intents = self._reach_limit()
@@ -39,3 +41,15 @@ def test_given_limit_reached_when_add_job_then_limit_is_still_reached(self) -> N
3941

4042
def _reach_limit(self) -> List[str]:
4143
return [self._tracker.try_to_get_intent() for i in range(_LIMIT)]
44+
45+
46+
def test_given_limit_is_interpolated_string_when_init_then_limit_is_int():
47+
tracker = JobTracker(
48+
limit="{{config['max_concurrent_jobs']}}", config={"max_concurrent_jobs": _LIMIT}
49+
)
50+
assert tracker._limit == _LIMIT
51+
52+
53+
def test_given_limit_is_less_than_1_when_init_then_raise_value_error():
54+
with pytest.raises(ValueError):
55+
JobTracker(limit="-1", config={})

0 commit comments

Comments
 (0)