-
Notifications
You must be signed in to change notification settings - Fork 40
Expand file tree
/
Copy pathtest_job_tracker.py
More file actions
47 lines (33 loc) · 1.52 KB
/
test_job_tracker.py
File metadata and controls
47 lines (33 loc) · 1.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from typing import List
from unittest import TestCase
import pytest
from airbyte_cdk.sources.declarative.async_job.job_tracker import (
ConcurrentJobLimitReached,
JobTracker,
)
_LIMIT = 3
class JobTrackerTest(TestCase):
def setUp(self) -> None:
self._tracker = JobTracker(_LIMIT)
def test_given_limit_reached_when_remove_job_then_can_get_intent_again(self) -> None:
intents = self._reach_limit()
with pytest.raises(ConcurrentJobLimitReached):
self._tracker.try_to_get_intent()
self._tracker.remove_job(intents[0])
assert self._tracker.try_to_get_intent()
def test_given_job_does_not_exist_when_remove_job_then_do_not_raise(self) -> None:
self._tracker.remove_job("non existing job id")
def test_given_limit_reached_when_add_job_then_limit_is_still_reached(self) -> None:
intents = [self._tracker.try_to_get_intent() for i in range(_LIMIT)]
with pytest.raises(ConcurrentJobLimitReached):
self._tracker.try_to_get_intent()
self._tracker.add_job(intents[0], "a created job")
with pytest.raises(ConcurrentJobLimitReached):
self._tracker.try_to_get_intent()
def _reach_limit(self) -> List[str]:
return [self._tracker.try_to_get_intent() for i in range(_LIMIT)]
@pytest.mark.parametrize("limit", [-1, 0])
def test_given_limit_is_less_than_1_when_init_then_set_to_1(limit: int):
tracker = JobTracker(limit)
assert tracker._limit == 1