Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3333,7 +3333,7 @@ definitions:
items:
type: string
AsyncRetriever:
description: "[Experimental - We expect the interface to change shortly and we reserve the right to not consider this a breaking change] Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router."
description: "Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router."
type: object
required:
- type
Expand Down Expand Up @@ -3381,6 +3381,13 @@ definitions:
anyOf:
- "$ref": "#/definitions/CustomRequester"
- "$ref": "#/definitions/HttpRequester"
polling_job_timeout:
description: The time in minutes after which the single Async Job should be considered as Timed Out.
anyOf:
- type: integer
- type: string
interpolation_context:
- config
download_target_requester:
description: Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.
anyOf:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1467,7 +1467,7 @@ class AddFields(BaseModel):
)
condition: Optional[str] = Field(
"",
description="Fields will be added if expression is evaluated to True.,",
description="Fields will be added if expression is evaluated to True.",
examples=[
"{{ property|string == '' }}",
"{{ property is integer }}",
Expand Down Expand Up @@ -2354,6 +2354,10 @@ class AsyncRetriever(BaseModel):
...,
description="Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job.",
)
polling_job_timeout: Optional[Union[int, str]] = Field(
None,
description="The time in minutes after which the single Async Job should be considered as Timed Out.",
)
download_target_requester: Optional[Union[CustomRequester, HttpRequester]] = Field(
None,
description="Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@
IncrementingCountStreamStateConverter,
)
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from airbyte_cdk.sources.types import Config, ConnectionDefinition
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

ComponentDefinition = Mapping[str, Any]
Expand Down Expand Up @@ -2939,6 +2939,27 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
parameters={},
)

def _get_job_timeout() -> datetime.timedelta:
user_defined_timeout: Optional[int] = (
int(
InterpolatedString.create(
str(model.polling_job_timeout),
parameters={},
).eval(config)
)
if model.polling_job_timeout
else None
)

# check for user defined timeout during the test read or 15 minutes
test_read_timeout = datetime.timedelta(minutes=user_defined_timeout or 15)
# default value for non-connector builder is 60 minutes.
default_sync_timeout = datetime.timedelta(minutes=user_defined_timeout or 60)

return (
test_read_timeout if self._emit_connector_builder_messages else default_sync_timeout
)

decoder = (
self._create_component_from_model(model=model.decoder, config=config)
if model.decoder
Expand Down Expand Up @@ -3032,6 +3053,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
config=config,
name=name,
)

job_repository: AsyncJobRepository = AsyncHttpJobRepository(
creation_requester=creation_requester,
polling_requester=polling_requester,
Expand All @@ -3042,6 +3064,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
status_extractor=status_extractor,
status_mapping=self._create_async_job_status_mapping(model.status_mapping, config),
download_target_extractor=download_target_extractor,
job_timeout=_get_job_timeout(),
)

async_job_partition_router = AsyncJobPartitionRouter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ class AsyncHttpJobRepository(AsyncJobRepository):
status_mapping: Mapping[str, AsyncJobStatus]
download_target_extractor: DpathExtractor

# timeout for the job to be completed, passed from `polling_job_timeout`
job_timeout: Optional[timedelta] = None

record_extractor: RecordExtractor = field(
init=False, repr=False, default_factory=lambda: ResponseToFileExtractor({})
)
Expand Down Expand Up @@ -131,7 +133,7 @@ def _start_job_and_validate_response(self, stream_slice: StreamSlice) -> request
log_formatter=lambda response: format_http_message(
response=response,
title="Async Job -- Create",
description="Create the server-side async job.",
description=f"Create the server-side async job. Timeout after: {self.job_timeout}",
stream_name=None,
is_auxiliary=True,
type="ASYNC_CREATE",
Expand Down
7 changes: 0 additions & 7 deletions airbyte_cdk/sources/declarative/retrievers/async_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,17 @@
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, Mapping, Optional

from typing_extensions import deprecated

from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import (
AsyncJobPartitionRouter,
)
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
from airbyte_cdk.sources.utils.slice_logger import AlwaysLogSliceLogger


@deprecated(
"This class is experimental. Use at your own risk.",
category=ExperimentalClassWarning,
)
@dataclass
class AsyncRetriever(Retriever):
config: Config
Expand Down
Loading