diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 1cb53104b..967a71ccd 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3333,7 +3333,7 @@ definitions: items: type: string AsyncRetriever: - description: "[Experimental - We expect the interface to change shortly and we reserve the right to not consider this a breaking change] Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router." + description: "Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router." type: object required: - type @@ -3381,6 +3381,13 @@ definitions: anyOf: - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" + polling_job_timeout: + description: The time in minutes after which the single Async Job should be considered as Timed Out. + anyOf: + - type: integer + - type: string + interpolation_context: + - config download_target_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job. anyOf: diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index e4fb459ff..480fa51c7 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1467,7 +1467,7 @@ class AddFields(BaseModel): ) condition: Optional[str] = Field( "", - description="Fields will be added if expression is evaluated to True.,", + description="Fields will be added if expression is evaluated to True.", examples=[ "{{ property|string == '' }}", "{{ property is integer }}", @@ -2354,6 +2354,10 @@ class AsyncRetriever(BaseModel): ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job.", ) + polling_job_timeout: Optional[Union[int, str]] = Field( + None, + description="The time in minutes after which the single Async Job should be considered as Timed Out.", + ) download_target_requester: Optional[Union[CustomRequester, HttpRequester]] = Field( None, description="Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 86e880b20..1501aa676 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -507,7 +507,7 @@ IncrementingCountStreamStateConverter, ) from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction -from airbyte_cdk.sources.types import Config, ConnectionDefinition +from airbyte_cdk.sources.types import Config from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer ComponentDefinition = Mapping[str, Any] @@ -2939,6 +2939,27 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie parameters={}, ) + def _get_job_timeout() -> datetime.timedelta: + user_defined_timeout: Optional[int] = ( + int( + InterpolatedString.create( + str(model.polling_job_timeout), + parameters={}, + ).eval(config) + ) + if model.polling_job_timeout + else None + ) + + # check for user defined timeout during the test read or 15 minutes + test_read_timeout = datetime.timedelta(minutes=user_defined_timeout or 15) + # default value for non-connector builder is 60 minutes. + default_sync_timeout = datetime.timedelta(minutes=user_defined_timeout or 60) + + return ( + test_read_timeout if self._emit_connector_builder_messages else default_sync_timeout + ) + decoder = ( self._create_component_from_model(model=model.decoder, config=config) if model.decoder @@ -3032,6 +3053,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie config=config, name=name, ) + job_repository: AsyncJobRepository = AsyncHttpJobRepository( creation_requester=creation_requester, polling_requester=polling_requester, @@ -3042,6 +3064,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie status_extractor=status_extractor, status_mapping=self._create_async_job_status_mapping(model.status_mapping, config), download_target_extractor=download_target_extractor, + job_timeout=_get_job_timeout(), ) async_job_partition_router = AsyncJobPartitionRouter( diff --git a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py index 28e9528ea..b06d82f5f 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py +++ b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py @@ -45,7 +45,9 @@ class AsyncHttpJobRepository(AsyncJobRepository): status_mapping: Mapping[str, AsyncJobStatus] download_target_extractor: DpathExtractor + # timeout for the job to be completed, passed from `polling_job_timeout` job_timeout: Optional[timedelta] = None + record_extractor: RecordExtractor = field( init=False, repr=False, default_factory=lambda: ResponseToFileExtractor({}) ) @@ -131,7 +133,7 @@ def _start_job_and_validate_response(self, stream_slice: StreamSlice) -> request log_formatter=lambda response: format_http_message( response=response, title="Async Job -- Create", - description="Create the server-side async job.", + description=f"Create the server-side async job. Timeout after: {self.job_timeout}", stream_name=None, is_auxiliary=True, type="ASYNC_CREATE", diff --git a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py index c0728d438..33a288c43 100644 --- a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py @@ -4,24 +4,17 @@ from dataclasses import InitVar, dataclass, field from typing import Any, Iterable, Mapping, Optional -from typing_extensions import deprecated - from airbyte_cdk.sources.declarative.async_job.job import AsyncJob from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import ( AsyncJobPartitionRouter, ) from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever -from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.types import Config, StreamSlice, StreamState from airbyte_cdk.sources.utils.slice_logger import AlwaysLogSliceLogger -@deprecated( - "This class is experimental. Use at your own risk.", - category=ExperimentalClassWarning, -) @dataclass class AsyncRetriever(Retriever): config: Config