Skip to content

Commit 000aac1

Browse files
add ignore status to AsyncJobStatus
1 parent 6504148 commit 000aac1

File tree

5 files changed

+20
-0
lines changed

5 files changed

+20
-0
lines changed

airbyte_cdk/sources/declarative/async_job/job_orchestrator.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ def status(self) -> AsyncJobStatus:
100100
return AsyncJobStatus.FAILED
101101
elif AsyncJobStatus.TIMED_OUT in statuses:
102102
return AsyncJobStatus.TIMED_OUT
103+
elif AsyncJobStatus.IGNORE in statuses:
104+
return AsyncJobStatus.IGNORE
103105
else:
104106
return AsyncJobStatus.RUNNING
105107

@@ -149,6 +151,7 @@ class AsyncJobOrchestrator:
149151
AsyncJobStatus.FAILED,
150152
AsyncJobStatus.RUNNING,
151153
AsyncJobStatus.TIMED_OUT,
154+
AsyncJobStatus.IGNORE,
152155
}
153156
_RUNNING_ON_API_SIDE_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}
154157

@@ -364,6 +367,11 @@ def _process_running_partitions_and_yield_completed_ones(
364367
case _ if partition.has_reached_max_attempt():
365368
self._stop_partition(partition)
366369
self._process_partitions_with_errors(partition)
370+
case AsyncJobStatus.IGNORE:
371+
self._stop_partition(partition)
372+
LOGGER.warning(
373+
f"Stopping processing partition: {partition.stream_slice} due to received {AsyncJobStatus.IGNORE} status."
374+
)
367375
case _:
368376
self._stop_timed_out_jobs(partition)
369377
# re-allocate FAILED jobs, but TIMEOUT jobs are not re-allocated

airbyte_cdk/sources/declarative/async_job/status.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ class AsyncJobStatus(Enum):
1111
COMPLETED = ("COMPLETED", _TERMINAL)
1212
FAILED = ("FAILED", _TERMINAL)
1313
TIMED_OUT = ("TIMED_OUT", _TERMINAL)
14+
IGNORE = ("IGNORE", _TERMINAL)
1415

1516
def __init__(self, value: str, is_terminal: bool) -> None:
1617
self._value = value

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3863,6 +3863,10 @@ definitions:
38633863
type: array
38643864
items:
38653865
type: string
3866+
ignore:
3867+
type: array
3868+
items:
3869+
type: string
38663870
AsyncRetriever:
38673871
title: Asynchronous Retriever
38683872
description: "Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router."

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1206,6 +1206,7 @@ class AsyncJobStatusMap(BaseModel):
12061206
completed: List[str]
12071207
failed: List[str]
12081208
timeout: List[str]
1209+
ignore: Optional[List[str]] = None
12091210

12101211

12111212
class ValueType(Enum):

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3539,6 +3539,10 @@ def _create_async_job_status_mapping(
35393539
# This is an element of the dict because of the typing of the CDK but it is not a CDK status
35403540
continue
35413541

3542+
if api_statuses is None and cdk_status is "ignore":
3543+
# ignore status is not required
3544+
continue
3545+
35423546
for status in api_statuses:
35433547
if status in api_status_to_cdk_status:
35443548
raise ValueError(
@@ -3557,6 +3561,8 @@ def _get_async_job_status(self, status: str) -> AsyncJobStatus:
35573561
return AsyncJobStatus.FAILED
35583562
case "timeout":
35593563
return AsyncJobStatus.TIMED_OUT
3564+
case "ignore":
3565+
return AsyncJobStatus.IGNORE
35603566
case _:
35613567
raise ValueError(f"Unsupported CDK status {status}")
35623568

0 commit comments

Comments
 (0)