Skip to content

Commit ad606dc

Browse files
committed
add slice to logging
1 parent 820be29 commit ad606dc

File tree

4 files changed

+13
-4
lines changed

4 files changed

+13
-4
lines changed

airbyte_cdk/sources/declarative/requesters/http_requester.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ def send_request(
473473
dedupe_query_params=True,
474474
log_formatter=log_formatter,
475475
exit_on_rate_limit=self._exit_on_rate_limit,
476+
stream_slice=stream_slice,
476477
)
477478

478479
return response

airbyte_cdk/sources/streams/http/http.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,7 @@ def _fetch_next_page(
546546
dedupe_query_params=True,
547547
log_formatter=self.get_log_formatter(),
548548
exit_on_rate_limit=self.exit_on_rate_limit,
549+
stream_slice=stream_slice
549550
)
550551

551552
return request, response

airbyte_cdk/sources/streams/http/http_client.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ def _send_with_retry(
243243
request_kwargs: Mapping[str, Any],
244244
log_formatter: Optional[Callable[[requests.Response], Any]] = None,
245245
exit_on_rate_limit: Optional[bool] = False,
246+
stream_slice: Optional[Mapping[str, Any]] = None,
246247
) -> requests.Response:
247248
"""
248249
Sends a request with retry logic.
@@ -259,9 +260,9 @@ def _send_with_retry(
259260
max_tries = max(0, max_retries) + 1
260261
max_time = self._max_time
261262

262-
user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)(
263-
self._send
264-
)
263+
user_backoff_handler = user_defined_backoff_handler(
264+
max_tries=max_tries, max_time=max_time, stream_slice=stream_slice
265+
)(self._send)
265266
rate_limit_backoff_handler = rate_limit_default_backoff_handler(max_tries=max_tries)
266267
backoff_handler = http_client_default_backoff_handler(
267268
max_tries=max_tries, max_time=max_time
@@ -506,6 +507,7 @@ def send_request(
506507
dedupe_query_params: bool = False,
507508
log_formatter: Optional[Callable[[requests.Response], Any]] = None,
508509
exit_on_rate_limit: Optional[bool] = False,
510+
stream_slice: Optional[Mapping[str, Any]] = None,
509511
) -> Tuple[requests.PreparedRequest, requests.Response]:
510512
"""
511513
Prepares and sends request and return request and response objects.
@@ -526,6 +528,7 @@ def send_request(
526528
request_kwargs=request_kwargs,
527529
log_formatter=log_formatter,
528530
exit_on_rate_limit=exit_on_rate_limit,
531+
stream_slice=stream_slice
529532
)
530533

531534
return request, response

airbyte_cdk/sources/streams/http/rate_limiting.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ def should_give_up(exc: Exception) -> bool:
103103
def user_defined_backoff_handler(
104104
max_tries: Optional[int],
105105
max_time: Optional[int] = None,
106+
stream_slice: Optional[Mapping[str, Any]] = None,
106107
**kwargs: Any,
107108
) -> Callable[[SendRequestCallableType], SendRequestCallableType]:
108109
def sleep_on_ratelimit(details: Mapping[str, Any]) -> None:
@@ -114,8 +115,11 @@ def sleep_on_ratelimit(details: Mapping[str, Any]) -> None:
114115
)
115116
retry_after = exc.backoff
116117
# include logging og the current time to help with debugging
118+
logging_message = f"Retrying. Sleeping for {retry_after} seconds at {time.strftime('%Y-%m-%d %H:%M:%S')}"
119+
if stream_slice:
120+
logging_message += f" for slice: {stream_slice}"
117121
logger.info(
118-
f"Retrying. Sleeping for {retry_after} seconds at {time.strftime('%Y-%m-%d %H:%M:%S')}"
122+
logging_message
119123
)
120124
time.sleep(retry_after + 1) # extra second to cover any fractions of second
121125

0 commit comments

Comments
 (0)