@@ -653,6 +653,9 @@ Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request
653653 std::exception_ptr last_exception = nullptr ;
654654 for (Int64 attempt_no = 0 ; attempt_no < max_attempts; ++attempt_no)
655655 {
656+ /// Sometimes we need to slow down because other requests failed with network errors, to give the S3 server some relief.
657+ slowDownAfterNetworkError ();
658+
656659 try
657660 {
658661 // / S3 does retries network errors actually.
@@ -695,10 +698,7 @@ Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request
695698 if (!client_configuration.retryStrategy ->ShouldRetry (error, attempt_no))
696699 break ;
697700
698- auto sleep_ms = client_configuration.retryStrategy ->CalculateDelayBeforeNextRetry (error, attempt_no);
699- LOG_WARNING (log, " Request failed, now waiting {} ms before attempting again" , sleep_ms);
700- sleepForMilliseconds (sleep_ms);
701-
701+ sleepAfterNetworkError (error, attempt_no);
702702 continue ;
703703 }
704704 }
@@ -730,6 +730,44 @@ RequestResult Client::processRequestResult(RequestResult && outcome) const
730730 return RequestResult (error);
731731}
732732
733+ void Client::sleepAfterNetworkError (Aws::Client::AWSError<Aws::Client::CoreErrors> error, Int64 attempt_no) const
734+ {
735+ auto sleep_ms = client_configuration.retryStrategy ->CalculateDelayBeforeNextRetry (error, attempt_no);
736+ if (!client_configuration.s3_slow_all_threads_after_network_error )
737+ {
738+ LOG_WARNING (log, " Request failed, now waiting {} ms before attempting again" , sleep_ms);
739+ sleepForMilliseconds (sleep_ms);
740+ return ;
741+ }
742+
743+ // / Set the time other s3 requests must wait until.
744+ UInt64 current_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now ().time_since_epoch ()).count ();
745+ UInt64 next_time_ms = current_time_ms + sleep_ms;
746+ // / next_time_to_retry_after_network_error = std::max(next_time_to_retry_after_network_error, next_time_ms)
747+ for (UInt64 stored_next_time = next_time_to_retry_after_network_error;
748+ (stored_next_time < next_time_ms) && !next_time_to_retry_after_network_error.compare_exchange_weak (stored_next_time, next_time_ms);)
749+ {
750+ }
751+ }
752+
753+ void Client::slowDownAfterNetworkError () const
754+ {
755+ if (!client_configuration.s3_slow_all_threads_after_network_error )
756+ return ;
757+
758+ // / Wait until `next_time_to_retry_after_network_error`.
759+ for (;;)
760+ {
761+ UInt64 current_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now ().time_since_epoch ()).count ();
762+ UInt64 next_time_ms = next_time_to_retry_after_network_error.load ();
763+ if (current_time_ms >= next_time_ms)
764+ break ;
765+ UInt64 sleep_ms = next_time_ms - current_time_ms;
766+ LOG_WARNING (log, " Some request failed, now waiting {} ms before executing a request" , sleep_ms);
767+ sleepForMilliseconds (sleep_ms);
768+ }
769+ }
770+
733771bool Client::supportsMultiPartCopy () const
734772{
735773 return provider_type != ProviderType::GCS;
@@ -990,6 +1028,7 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
9901028 const RemoteHostFilter & remote_host_filter,
9911029 unsigned int s3_max_redirects,
9921030 unsigned int s3_retry_attempts,
1031+ bool s3_slow_all_threads_after_network_error,
9931032 bool enable_s3_requests_logging,
9941033 bool for_disk_s3,
9951034 const ThrottlerPtr & get_request_throttler,
@@ -1009,6 +1048,7 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
10091048 remote_host_filter,
10101049 s3_max_redirects,
10111050 s3_retry_attempts,
1051+ s3_slow_all_threads_after_network_error,
10121052 enable_s3_requests_logging,
10131053 for_disk_s3,
10141054 context->getGlobalContext ()->getSettingsRef ()[Setting::s3_use_adaptive_timeouts],
0 commit comments