Skip to content

Commit 0a6b0f2

Browse files
author
Oleksandr Bazarnov
committed
fix
1 parent c4d0f91 commit 0a6b0f2

File tree

4 files changed

+38
-10
lines changed

4 files changed

+38
-10
lines changed

airbyte_cdk/sources/declarative/async_job/job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def api_job_id(self) -> str:
3434

3535
def status(self) -> AsyncJobStatus:
3636
if self._timer.has_timed_out():
37-
return AsyncJobStatus.TIMED_OUT
37+
return AsyncJobStatus.FORCED_TIME_OUT
3838
return self._status
3939

4040
def job_parameters(self) -> StreamSlice:

airbyte_cdk/sources/declarative/async_job/job_orchestrator.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ def status(self) -> AsyncJobStatus:
9595
return AsyncJobStatus.FAILED
9696
elif AsyncJobStatus.TIMED_OUT in statuses:
9797
return AsyncJobStatus.TIMED_OUT
98+
# specific case when the job is forced to be stopped by the system
99+
elif AsyncJobStatus.FORCED_TIME_OUT in statuses:
100+
return AsyncJobStatus.FORCED_TIME_OUT
98101
else:
99102
return AsyncJobStatus.RUNNING
100103

@@ -144,6 +147,8 @@ class AsyncJobOrchestrator:
144147
AsyncJobStatus.FAILED,
145148
AsyncJobStatus.RUNNING,
146149
AsyncJobStatus.TIMED_OUT,
150+
# specific case when the job is forced to be stopped by the system
151+
AsyncJobStatus.FORCED_TIME_OUT,
147152
}
148153
_RUNNING_ON_API_SIDE_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}
149154

@@ -179,7 +184,7 @@ def __init__(
179184
self._non_breaking_exceptions: List[Exception] = []
180185

181186
def _replace_failed_jobs(self, partition: AsyncPartition) -> None:
182-
failed_status_jobs = (AsyncJobStatus.FAILED,)
187+
failed_status_jobs = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT)
183188
jobs_to_replace = [job for job in partition.jobs if job.status() in failed_status_jobs]
184189
for job in jobs_to_replace:
185190
new_job = self._start_job(job.job_parameters(), job.api_job_id())
@@ -363,7 +368,7 @@ def _process_running_partitions_and_yield_completed_ones(
363368
self._reallocate_partition(current_running_partitions, partition)
364369

365370
# We only remove completed / timeout jobs jobs as we want failed jobs to be re-allocated in priority
366-
self._remove_completed_or_timed_out_jobs(partition)
371+
self._remove_completed_or_forced_time_out_jobs(partition)
367372

368373
# update the referenced list with running partitions
369374
self._running_partitions = current_running_partitions
@@ -378,6 +383,9 @@ def _stop_partition(self, partition: AsyncPartition) -> None:
378383
def _stop_timed_out_jobs(self, partition: AsyncPartition) -> None:
379384
for job in partition.jobs:
380385
if job.status() == AsyncJobStatus.TIMED_OUT:
386+
# we don't free allocation here because it is expected to retry the job
387+
self._abort_job(job, free_job_allocation=False)
388+
elif job.status() == AsyncJobStatus.FORCED_TIME_OUT:
381389
self._abort_job(job, free_job_allocation=True)
382390
raise AirbyteTracedException(
383391
internal_message=f"Job {job.api_job_id()} has timed out. Try increasing the `polling job timeout`.",
@@ -392,15 +400,15 @@ def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None:
392400
except Exception as exception:
393401
LOGGER.warning(f"Could not free budget for job {job.api_job_id()}: {exception}")
394402

395-
def _remove_completed_or_timed_out_jobs(self, partition: AsyncPartition) -> None:
403+
def _remove_completed_or_forced_time_out_jobs(self, partition: AsyncPartition) -> None:
396404
"""
397405
Remove completed or timed out jobs from the partition.
398406
399407
Args:
400408
partition (AsyncPartition): The partition to process.
401409
"""
402410
for job in partition.jobs:
403-
if job.status() in [AsyncJobStatus.COMPLETED, AsyncJobStatus.TIMED_OUT]:
411+
if job.status() in [AsyncJobStatus.COMPLETED, AsyncJobStatus.FORCED_TIME_OUT]:
404412
self._job_tracker.remove_job(job.api_job_id())
405413

406414
def _reallocate_partition(
@@ -415,10 +423,8 @@ def _reallocate_partition(
415423
current_running_partitions (list): The list of currently running partitions.
416424
partition (AsyncPartition): The partition to reallocate.
417425
"""
418-
for job in partition.jobs:
419-
if job.status() != AsyncJobStatus.TIMED_OUT:
420-
# allow the FAILED jobs to be re-allocated for partition
421-
current_running_partitions.insert(0, partition)
426+
# allow the FAILED / TIMED_OUT jobs to be re-allocated for partition
427+
current_running_partitions.insert(0, partition)
422428

423429
def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
424430
"""

airbyte_cdk/sources/declarative/async_job/status.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ class AsyncJobStatus(Enum):
1111
COMPLETED = ("COMPLETED", _TERMINAL)
1212
FAILED = ("FAILED", _TERMINAL)
1313
TIMED_OUT = ("TIMED_OUT", _TERMINAL)
14+
# service status to force the job to be stopped by the system
15+
FORCED_TIME_OUT = ("FORCED_TIME_OUT", _TERMINAL)
1416

1517
def __init__(self, value: str, is_terminal: bool) -> None:
1618
self._value = value
@@ -19,6 +21,6 @@ def __init__(self, value: str, is_terminal: bool) -> None:
1921
def is_terminal(self) -> bool:
2022
"""
2123
A status is terminal when a job status can't be updated anymore. For example if a job is completed, it will stay completed but a
22-
running job might because completed, failed or timed out.
24+
running job might become completed, failed or timed out.
2325
"""
2426
return self._is_terminal

unit_tests/sources/declarative/async_job/test_job_orchestrator.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,26 @@ def test_given_timeout_when_create_and_get_completed_partitions_then_free_budget
144144
)
145145
orchestrator = self._orchestrator([_A_STREAM_SLICE], job_tracker)
146146

147+
with pytest.raises(AirbyteTracedException):
148+
list(orchestrator.create_and_get_completed_partitions())
149+
150+
assert job_tracker.try_to_get_intent()
151+
assert (
152+
self._job_repository.start.call_args_list
153+
== [call(_A_STREAM_SLICE)] * _MAX_NUMBER_OF_ATTEMPTS
154+
)
155+
156+
@mock.patch(sleep_mock_target)
157+
def test_given_forced_timeout_when_create_and_get_completed_partitions_then_free_budget_and_raise_exception(
158+
self, mock_sleep: MagicMock
159+
) -> None:
160+
job_tracker = JobTracker(1)
161+
self._job_repository.start.return_value = self._job_for_a_slice
162+
self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs(
163+
{self._job_for_a_slice: [AsyncJobStatus.FORCED_TIME_OUT]}
164+
)
165+
orchestrator = self._orchestrator([_A_STREAM_SLICE], job_tracker)
166+
147167
with pytest.raises(AirbyteTracedException) as error:
148168
list(orchestrator.create_and_get_completed_partitions())
149169

0 commit comments

Comments
 (0)