Commit fab8033

make download_target_extractor optional

1 parent e33fd59 commit fab8033
File tree

2 files changed: +28 -12 lines changed

2 files changed

+28
-12
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 9 additions & 5 deletions
@@ -3603,11 +3603,15 @@ def _get_job_timeout() -> datetime.timedelta:
         status_extractor = self._create_component_from_model(
             model=model.status_extractor, decoder=decoder, config=config, name=name
         )
-        download_target_extractor = self._create_component_from_model(
-            model=model.download_target_extractor,
-            decoder=decoder,
-            config=config,
-            name=name,
+        download_target_extractor = (
+            self._create_component_from_model(
+                model=model.download_target_extractor,
+                decoder=decoder,
+                config=config,
+                name=name,
+            )
+            if model.download_target_extractor
+            else None
         )
 
         job_repository: AsyncJobRepository = AsyncHttpJobRepository(
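The factory hunk reduces to a guard-or-None pattern: build the component only when the model declares it. A minimal runnable sketch of that pattern, with hypothetical stand-ins (`Model`, `build_component`) in place of the real factory types:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Model:
    # Hypothetical stand-in for the declarative component model; in the real
    # factory this field holds a component model or None.
    download_target_extractor: Optional[str] = None


def build_component(field: str) -> str:
    # Stand-in for self._create_component_from_model(...).
    return f"extractor({field})"


def make_download_target_extractor(model: Model) -> Optional[str]:
    # Build the component only when the model declares one; otherwise carry
    # None through, which is what makes the field optional downstream.
    return (
        build_component(model.download_target_extractor)
        if model.download_target_extractor
        else None
    )


assert make_download_target_extractor(Model()) is None
assert make_download_target_extractor(Model("targets")) == "extractor(targets)"
```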

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

Lines changed: 19 additions & 7 deletions
@@ -43,7 +43,7 @@ class AsyncHttpJobRepository(AsyncJobRepository):
     delete_requester: Optional[Requester]
     status_extractor: DpathExtractor
     status_mapping: Mapping[str, AsyncJobStatus]
-    download_target_extractor: DpathExtractor
+    download_target_extractor: Optional[DpathExtractor]
 
     # timeout for the job to be completed, passed from `polling_job_timeout`
     job_timeout: Optional[timedelta] = None
@@ -213,14 +213,14 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
 
         """
 
-        for target_url in self._get_download_targets(job):
+        for download_target in self._get_download_targets(job):
             job_slice = job.job_parameters()
             stream_slice = StreamSlice(
                 partition=job_slice.partition,
                 cursor_slice=job_slice.cursor_slice,
                 extra_fields={
                     **job_slice.extra_fields,
-                    "download_target": target_url,
+                    "download_target": download_target,
                     "creation_response": self._get_creation_response_interpolation_context(job),
                     "polling_response": self._get_polling_response_interpolation_context(job),
                 },
@@ -332,9 +332,18 @@ def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
         )
 
     def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
-        if not self.download_target_requester:
-            url_response = self._polling_job_response_by_id[job.api_job_id()]
-        else:
+        """Returns an iterable of strings to help target requests for downloading async jobs."""
+        # If neither download_target_extractor nor download_target_requester is provided, return a single
+        # empty string to express the need to make a single download request without any download_target value.
+        if not self.download_target_extractor and not self.download_target_requester:
+            lazy_log(
+                LOGGER,
+                logging.DEBUG,
+                lambda: "No download_target_extractor or download_target_requester provided. Using fallback behavior for single download request without download_target.",
+            )
+            return [""]
+
+        if self.download_target_requester:
             stream_slice: StreamSlice = StreamSlice(
                 partition={},
                 cursor_slice={},
@@ -348,5 +357,8 @@ def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
                 internal_message="Always expect a response or an exception from download_target_requester",
                 failure_type=FailureType.system_error,
             )
-
+        else:
+            # if no download_target_requester is provided, we extract directly from the polling response
+            url_response = self._polling_job_response_by_id[job.api_job_id()]
+
         yield from self.download_target_extractor.extract_records(url_response)  # type: ignore # we expect download_target_extractor to always return list of strings
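After this commit the repository resolves download targets along three paths: via a dedicated download_target_requester, via the extractor over the stored polling response, or (new here) a single empty target when neither extractor nor requester is configured. A simplified, self-contained sketch of that dispatch, where `Extractor` and `Requester` are hypothetical callables rather than the real CDK types:

```python
import logging
from typing import Callable, Iterable, List, Mapping, Optional

LOGGER = logging.getLogger(__name__)

# Hypothetical stand-ins: an "extractor" pulls target strings out of a response
# mapping; a "requester" fetches the response that the extractor then reads.
Extractor = Callable[[Mapping[str, object]], List[str]]
Requester = Callable[[], Mapping[str, object]]


def get_download_targets(
    polling_response: Mapping[str, object],
    download_target_extractor: Optional[Extractor] = None,
    download_target_requester: Optional[Requester] = None,
) -> Iterable[str]:
    # Fallback added by this commit: with neither component configured, yield a
    # single empty string so exactly one download request is made with no
    # download_target value interpolated into it.
    if not download_target_extractor and not download_target_requester:
        LOGGER.debug("No download_target_extractor or download_target_requester provided.")
        return [""]
    # Otherwise pick the response to extract from: a dedicated request if a
    # requester exists, the stored polling response if not.
    url_response = download_target_requester() if download_target_requester else polling_response
    assert download_target_extractor is not None  # mirrors the diff's "type: ignore" expectation
    return download_target_extractor(url_response)


# One empty target -> one plain download request.
assert list(get_download_targets({"urls": ["u1", "u2"]})) == [""]
# With an extractor, targets come from the polling response.
assert list(
    get_download_targets({"urls": ["u1", "u2"]}, lambda r: list(r["urls"]))  # type: ignore[arg-type]
) == ["u1", "u2"]
```

The empty-string fallback keeps the downstream fetch loop unchanged: fetch_records still iterates once and builds one stream slice, just with an empty download_target.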
