
Commit c2f0761

Author: Oleksandr Bazarnov (committed)
Commit message: fix
1 parent c4d0f91, commit c2f0761

File tree

3 files changed: +26 -17 lines changed


airbyte_cdk/sources/declarative/async_job/job.py

Lines changed: 6 additions & 0 deletions
@@ -34,6 +34,12 @@ def api_job_id(self) -> str:
 
     def status(self) -> AsyncJobStatus:
         if self._timer.has_timed_out():
+            # TODO: we should account the fact that,
+            # certain APIs could send the `Timeout` status,
+            # thus we should not return `Timeout` in that case,
+            # but act based on the scenario.
+
+            # the default behavior is to return `Timeout` status and retry.
             return AsyncJobStatus.TIMED_OUT
         return self._status
 
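For context, the behavior this comment documents: once the job's timer expires, status() reports TIMED_OUT regardless of what the API last returned, and the orchestrator decides what to do with it. A minimal standalone sketch of that pattern (the timer and job classes below are simplified stand-ins, not the CDK implementations):

    import time
    from enum import Enum


    class AsyncJobStatus(Enum):
        RUNNING = "running"
        COMPLETED = "completed"
        TIMED_OUT = "timed_out"


    class _Timer:
        """Simplified stand-in for the timer tracked by the job."""

        def __init__(self, timeout_seconds: float) -> None:
            self._deadline = time.monotonic() + timeout_seconds

        def has_timed_out(self) -> bool:
            return time.monotonic() > self._deadline


    class _Job:
        """Simplified stand-in: reports TIMED_OUT once the timer expires."""

        def __init__(self, timeout_seconds: float) -> None:
            self._timer = _Timer(timeout_seconds)
            self._status = AsyncJobStatus.RUNNING

        def status(self) -> AsyncJobStatus:
            if self._timer.has_timed_out():
                # default behavior: surface TIMED_OUT and let the orchestrator retry
                return AsyncJobStatus.TIMED_OUT
            return self._status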

airbyte_cdk/sources/declarative/async_job/job_orchestrator.py

Lines changed: 16 additions & 16 deletions
@@ -44,16 +44,21 @@ class AsyncPartition:
     This bucket of api_jobs is a bit useless for this iteration but should become interesting when we will be able to split jobs
     """
 
-    _MAX_NUMBER_OF_ATTEMPTS = 3
+    _DEFAULT_MAX_JOB_RETRY = 3
 
-    def __init__(self, jobs: List[AsyncJob], stream_slice: StreamSlice) -> None:
+    def __init__(
+        self, jobs: List[AsyncJob], stream_slice: StreamSlice, job_max_retry: Optional[int] = None
+    ) -> None:
         self._attempts_per_job = {job: 1 for job in jobs}
         self._stream_slice = stream_slice
+        self._job_max_retry = (
+            job_max_retry if job_max_retry is not None else self._DEFAULT_MAX_JOB_RETRY
+        )
 
     def has_reached_max_attempt(self) -> bool:
         return any(
             map(
-                lambda attempt_count: attempt_count >= self._MAX_NUMBER_OF_ATTEMPTS,
+                lambda attempt_count: attempt_count >= self._job_max_retry,
                 self._attempts_per_job.values(),
             )
         )

@@ -62,7 +67,7 @@ def replace_job(self, job_to_replace: AsyncJob, new_jobs: List[AsyncJob]) -> None:
         current_attempt_count = self._attempts_per_job.pop(job_to_replace, None)
         if current_attempt_count is None:
             raise ValueError("Could not find job to replace")
-        elif current_attempt_count >= self._MAX_NUMBER_OF_ATTEMPTS:
+        elif current_attempt_count >= self._job_max_retry:
             raise ValueError(f"Max attempt reached for job in partition {self._stream_slice}")
 
         new_attempt_count = current_attempt_count + 1

@@ -155,6 +160,7 @@ def __init__(
         message_repository: MessageRepository,
         exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
         has_bulk_parent: bool = False,
+        job_max_retry: Optional[int] = None,
     ) -> None:
         """
         If the stream slices provided as a parameters relies on a async job streams that relies on the same JobTracker, `has_bulk_parent`

@@ -175,11 +181,12 @@ def __init__(
         self._message_repository = message_repository
         self._exceptions_to_break_on: Tuple[Type[Exception], ...] = tuple(exceptions_to_break_on)
         self._has_bulk_parent = has_bulk_parent
+        self._job_max_retry = job_max_retry
 
         self._non_breaking_exceptions: List[Exception] = []
 
     def _replace_failed_jobs(self, partition: AsyncPartition) -> None:
-        failed_status_jobs = (AsyncJobStatus.FAILED,)
+        failed_status_jobs = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT)
         jobs_to_replace = [job for job in partition.jobs if job.status() in failed_status_jobs]
         for job in jobs_to_replace:
             new_job = self._start_job(job.job_parameters(), job.api_job_id())

@@ -214,7 +221,7 @@ def _start_jobs(self) -> None:
             for _slice in self._slice_iterator:
                 at_least_one_slice_consumed_from_slice_iterator_during_current_iteration = True
                 job = self._start_job(_slice)
-                self._running_partitions.append(AsyncPartition([job], _slice))
+                self._running_partitions.append(AsyncPartition([job], _slice, self._job_max_retry))
                 if self._has_bulk_parent and self._slice_iterator.has_next():
                     break
         except ConcurrentJobLimitReached:

@@ -378,11 +385,7 @@ def _stop_partition(self, partition: AsyncPartition) -> None:
     def _stop_timed_out_jobs(self, partition: AsyncPartition) -> None:
         for job in partition.jobs:
             if job.status() == AsyncJobStatus.TIMED_OUT:
-                self._abort_job(job, free_job_allocation=True)
-                raise AirbyteTracedException(
-                    internal_message=f"Job {job.api_job_id()} has timed out. Try increasing the `polling job timeout`.",
-                    failure_type=FailureType.config_error,
-                )
+                self._abort_job(job, free_job_allocation=False)
 
     def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None:
         try:

@@ -400,7 +403,7 @@ def _remove_completed_or_timed_out_jobs(self, partition: AsyncPartition) -> None:
             partition (AsyncPartition): The partition to process.
         """
         for job in partition.jobs:
-            if job.status() in [AsyncJobStatus.COMPLETED, AsyncJobStatus.TIMED_OUT]:
+            if job.status() == AsyncJobStatus.COMPLETED:
                 self._job_tracker.remove_job(job.api_job_id())
 
     def _reallocate_partition(

@@ -415,10 +418,7 @@ def _reallocate_partition(
            current_running_partitions (list): The list of currently running partitions.
            partition (AsyncPartition): The partition to reallocate.
        """
-        for job in partition.jobs:
-            if job.status() != AsyncJobStatus.TIMED_OUT:
-                # allow the FAILED jobs to be re-allocated for partition
-                current_running_partitions.insert(0, partition)
+        current_running_partitions.insert(0, partition)
 
     def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
        """

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 4 additions & 1 deletion
@@ -3073,8 +3073,11 @@ def _get_job_timeout() -> datetime.timedelta:
                stream_slices,
                self._job_tracker,
                self._message_repository,
-                has_bulk_parent=False,
                # FIXME work would need to be done here in order to detect if a stream as a parent stream that is bulk
+                has_bulk_parent=False,
+                # set the `job_max_retry` to 1 for the `Connector Builder` use-case.
+                # `None` == default retry is set to 3 attempts, under the hood.
+                job_max_retry=1 if self._emit_connector_builder_messages else None,
            ),
            stream_slicer=stream_slicer,
            config=config,
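The factory change wires the new parameter through: when the source runs in Connector Builder mode (self._emit_connector_builder_messages), a failed or timed-out async job is not retried at all, while regular syncs keep the default of three attempts. A hedged summary of the resolution, assuming the defaults shown in the diffs above:

    # job_max_retry passed to the orchestrator -> effective attempts per job
    # 1    (Connector Builder test reads)      -> 1 attempt, no retry
    # None (regular syncs)                     -> partition falls back to
    #                                             _DEFAULT_MAX_JOB_RETRY == 3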
