Skip to content

Commit 18e636c

Browse files
author
Oleksandr Bazarnov
committed
fix
1 parent f2c429c commit 18e636c

File tree

1 file changed

+44
-4
lines changed

1 file changed

+44
-4
lines changed

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -273,24 +273,64 @@ def _clean_up_job(self, job_id: str) -> None:
273273
del self._create_job_response_by_id[job_id]
274274
del self._polling_job_response_by_id[job_id]
275275

276+
def _get_creation_response_interpolation_context(self, job: AsyncJob) -> Dict[str, Any]:
277+
"""
278+
Returns the interpolation context for the creation response.
279+
280+
Args:
281+
job (AsyncJob): The job for which to get the creation response interpolation context.
282+
283+
Returns:
284+
Dict[str, Any]: The interpolation context as a dictionary.
285+
"""
286+
creation_response_context = self._create_job_response_by_id[job.api_job_id()].json()
287+
creation_response_context["headers"] = self._create_job_response_by_id[
288+
job.api_job_id()
289+
].headers
290+
creation_response_context["request"] = self._create_job_response_by_id[
291+
job.api_job_id()
292+
].request
293+
return dict(creation_response_context)
294+
295+
def _get_polling_response_interpolation_context(self, job: AsyncJob) -> Dict[str, Any]:
296+
"""
297+
Returns the interpolation context for the polling response.
298+
299+
Args:
300+
job (AsyncJob): The job for which to get the polling response interpolation context.
301+
302+
Returns:
303+
Dict[str, Any]: The interpolation context as a dictionary.
304+
"""
305+
polling_response_context = self._polling_job_response_by_id[job.api_job_id()].json()
306+
polling_response_context["headers"] = self._polling_job_response_by_id[
307+
job.api_job_id()
308+
].headers
309+
polling_response_context["request"] = self._polling_job_response_by_id[
310+
job.api_job_id()
311+
].request
312+
return dict(polling_response_context)
313+
276314
def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
277-
creation_response = self._create_job_response_by_id[job.api_job_id()].json()
278315
stream_slice = StreamSlice(
279316
partition={},
280317
cursor_slice={},
281-
extra_fields={"creation_response": creation_response},
318+
extra_fields={
319+
"creation_response": self._get_creation_response_interpolation_context(job),
320+
},
282321
)
283322
return stream_slice
284323

285324
def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
286325
if not self.download_target_requester:
287326
url_response = self._polling_job_response_by_id[job.api_job_id()]
288327
else:
289-
polling_response = self._polling_job_response_by_id[job.api_job_id()].json()
290328
stream_slice: StreamSlice = StreamSlice(
291329
partition={},
292330
cursor_slice={},
293-
extra_fields={"polling_response": polling_response},
331+
extra_fields={
332+
"polling_response": self._get_polling_response_interpolation_context(job),
333+
},
294334
)
295335
url_response = self.download_target_requester.send_request(stream_slice=stream_slice) # type: ignore # we expect download_target_requester to always be presented, otherwise raise an exception as we cannot proceed with the report
296336
if not url_response:

0 commit comments

Comments
 (0)