Skip to content

Commit 25db09b

Browse files
authored
Merge pull request ClickHouse#90710 from ClickHouse/backport/25.8/90216
Backport ClickHouse#90216 to 25.8: retry network errors when s3 library parse xml response
2 parents 97c39ff + b07d66d commit 25db09b

File tree

9 files changed

+294
-114
lines changed

9 files changed

+294
-114
lines changed

src/IO/ReadBufferFromS3.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
429429
if (!version_id.empty())
430430
req.SetVersionId(version_id);
431431

432-
req.SetAdditionalCustomHeaderValue("clickhouse-request", fmt::format("attempt={}", attempt));
432+
S3::setClickhouseAttemptNumber(req, attempt);
433433

434434
if (range_end_incl)
435435
{

src/IO/S3/Client.cpp

Lines changed: 86 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <aws/core/utils/logging/ErrorMacros.h>
2020

2121
#include <Poco/Net/NetException.h>
22+
#include <Poco/Exception.h>
2223

2324
#include <IO/Expect404ResponseScope.h>
2425
#include <IO/S3/Requests.h>
@@ -493,37 +494,37 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
493494
Model::ListObjectsV2Outcome Client::ListObjectsV2(ListObjectsV2Request & request) const
494495
{
495496
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ true>(
496-
request, [this](const Model::ListObjectsV2Request & req) { return ListObjectsV2(req); });
497+
request, [this](Model::ListObjectsV2Request & req) { return ListObjectsV2(req); });
497498
}
498499

499500
Model::ListObjectsOutcome Client::ListObjects(ListObjectsRequest & request) const
500501
{
501502
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ true>(
502-
request, [this](const Model::ListObjectsRequest & req) { return ListObjects(req); });
503+
request, [this](Model::ListObjectsRequest & req) { return ListObjects(req); });
503504
}
504505

505506
Model::GetObjectOutcome Client::GetObject(GetObjectRequest & request) const
506507
{
507508
return processRequestResult(
508-
doRequest(request, [this](const Model::GetObjectRequest & req) { return GetObject(req); }));
509+
doRequest(request, [this](Model::GetObjectRequest & req) { return GetObject(req); }));
509510
}
510511

511512
Model::AbortMultipartUploadOutcome Client::AbortMultipartUpload(AbortMultipartUploadRequest & request) const
512513
{
513514
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
514-
request, [this](const Model::AbortMultipartUploadRequest & req) { return AbortMultipartUpload(req); });
515+
request, [this](Model::AbortMultipartUploadRequest & req) { return AbortMultipartUpload(req); });
515516
}
516517

517518
Model::CreateMultipartUploadOutcome Client::CreateMultipartUpload(CreateMultipartUploadRequest & request) const
518519
{
519520
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
520-
request, [this](const Model::CreateMultipartUploadRequest & req) { return CreateMultipartUpload(req); });
521+
request, [this](Model::CreateMultipartUploadRequest & req) { return CreateMultipartUpload(req); });
521522
}
522523

523524
Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(CompleteMultipartUploadRequest & request) const
524525
{
525526
auto outcome = doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
526-
request, [this](const Model::CompleteMultipartUploadRequest & req) { return CompleteMultipartUpload(req); });
527+
request, [this](Model::CompleteMultipartUploadRequest & req) { return CompleteMultipartUpload(req); });
527528

528529
const auto & key = request.GetKey();
529530
const auto & bucket = request.GetBucket();
@@ -570,42 +571,42 @@ Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(CompleteMu
570571
Model::CopyObjectOutcome Client::CopyObject(CopyObjectRequest & request) const
571572
{
572573
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
573-
request, [this](const Model::CopyObjectRequest & req) { return CopyObject(req); });
574+
request, [this](Model::CopyObjectRequest & req) { return CopyObject(req); });
574575
}
575576

576577
Model::PutObjectOutcome Client::PutObject(PutObjectRequest & request) const
577578
{
578579
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
579-
request, [this](const Model::PutObjectRequest & req) { return PutObject(req); });
580+
request, [this](Model::PutObjectRequest & req) { return PutObject(req); });
580581
}
581582

582583
Model::UploadPartOutcome Client::UploadPart(UploadPartRequest & request) const
583584
{
584585
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
585-
request, [this](const Model::UploadPartRequest & req) { return UploadPart(req); });
586+
request, [this](Model::UploadPartRequest & req) { return UploadPart(req); });
586587
}
587588

588589
Model::UploadPartCopyOutcome Client::UploadPartCopy(UploadPartCopyRequest & request) const
589590
{
590591
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
591-
request, [this](const Model::UploadPartCopyRequest & req) { return UploadPartCopy(req); });
592+
request, [this](Model::UploadPartCopyRequest & req) { return UploadPartCopy(req); });
592593
}
593594

594595
Model::DeleteObjectOutcome Client::DeleteObject(DeleteObjectRequest & request) const
595596
{
596597
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
597-
request, [this](const Model::DeleteObjectRequest & req) { Expect404ResponseScope scope; return DeleteObject(req); });
598+
request, [this](Model::DeleteObjectRequest & req) { Expect404ResponseScope scope; return DeleteObject(req); });
598599
}
599600

600601
Model::DeleteObjectsOutcome Client::DeleteObjects(DeleteObjectsRequest & request) const
601602
{
602603
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
603-
request, [this](const Model::DeleteObjectsRequest & req) { Expect404ResponseScope scope; return DeleteObjects(req); });
604+
request, [this](Model::DeleteObjectsRequest & req) { Expect404ResponseScope scope; return DeleteObjects(req); });
604605
}
605606

606607
Client::ComposeObjectOutcome Client::ComposeObject(ComposeObjectRequest & request) const
607608
{
608-
auto request_fn = [this](const ComposeObjectRequest & req)
609+
auto request_fn = [this](ComposeObjectRequest & req)
609610
{
610611
auto & endpoint_provider = const_cast<Client &>(*this).accessEndpointProvider();
611612
AWS_OPERATION_CHECK_PTR(endpoint_provider, ComposeObject, Aws::Client::CoreErrors, Aws::Client::CoreErrors::ENDPOINT_RESOLUTION_FAILURE);
@@ -634,7 +635,7 @@ Client::ComposeObjectOutcome Client::ComposeObject(ComposeObjectRequest & reques
634635
}
635636

636637
template <typename RequestType, typename RequestFn>
637-
std::invoke_result_t<RequestFn, RequestType>
638+
std::invoke_result_t<RequestFn, RequestType &>
638639
Client::doRequest(RequestType & request, RequestFn request_fn) const
639640
{
640641
addAdditionalAMZHeadersToCanonicalHeadersList(request, client_configuration.extra_headers);
@@ -729,81 +730,100 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
729730
}
730731

731732
template <bool IsReadMethod, typename RequestType, typename RequestFn>
732-
std::invoke_result_t<RequestFn, RequestType>
733+
std::invoke_result_t<RequestFn, RequestType &>
733734
Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request_fn) const
734735
{
736+
/// S3 does retries network errors actually.
737+
/// But it does matter when errors occur.
738+
/// This code retries a specific case when
739+
/// network error happens when XML document is being read from the response body.
740+
/// Hence, the response body is a stream, network errors are possible at reading.
741+
/// S3 doesn't retry them.
742+
743+
/// Not all requests can be retried in that way.
744+
/// Requests that read out response body to build the result are possible to retry.
745+
/// Requests that expose the response stream as an answer are not retried with that code. E.g. GetObject.
746+
735747
addAdditionalAMZHeadersToCanonicalHeadersList(request, client_configuration.extra_headers);
736-
auto with_retries = [this, request_fn_ = std::move(request_fn)] (const RequestType & request_)
748+
auto with_retries = [this, request_fn_ = std::move(request_fn)] (RequestType & request_)
737749
{
738750
chassert(client_configuration.retryStrategy);
739751
const Int64 max_attempts = client_configuration.retry_strategy.max_retries + 1;
740-
chassert(max_attempts > 0);
741-
std::exception_ptr last_exception = nullptr;
742-
for (Int64 attempt_no = 0; attempt_no < max_attempts; ++attempt_no)
752+
753+
Int64 attempt_no = 1;
754+
std::invoke_result_t<RequestFn, RequestType &> outcome;
755+
756+
auto net_exception_handler = [&]() -> bool /// return true if we should retry
757+
{
758+
incrementProfileEvents<IsReadMethod>(ProfileEvents::S3ReadRequestsErrors, ProfileEvents::S3WriteRequestsErrors);
759+
if (isClientForDisk())
760+
incrementProfileEvents<IsReadMethod>(ProfileEvents::DiskS3ReadRequestsErrors, ProfileEvents::DiskS3WriteRequestsErrors);
761+
762+
tryLogCurrentException(log, fmt::format("Network error on S3 request, attempt {} of {}", attempt_no, max_attempts));
763+
764+
outcome = Aws::Client::AWSError<Aws::Client::CoreErrors>(
765+
Aws::Client::CoreErrors::NETWORK_CONNECTION,
766+
/*name*/ "",
767+
/*message*/ fmt::format("All {} retry attempts failed. Last exception: {}", max_attempts, getCurrentExceptionMessage(false)),
768+
/*retryable*/ true);
769+
770+
// network exceptions are always retryable, we could just return true here
771+
// but we have to check cancellation points for query, ShouldRetry method does it already
772+
return client_configuration.retryStrategy->ShouldRetry(outcome.GetError(), /*attemptedRetries*/ -1);
773+
};
774+
775+
for (attempt_no = 1; attempt_no <= max_attempts; ++attempt_no)
743776
{
744777
incrementProfileEvents<IsReadMethod>(ProfileEvents::S3ReadRequestAttempts, ProfileEvents::S3WriteRequestAttempts);
745778
if (isClientForDisk())
746779
incrementProfileEvents<IsReadMethod>(ProfileEvents::DiskS3ReadRequestAttempts, ProfileEvents::DiskS3WriteRequestAttempts);
747780

781+
if (attempt_no > 1)
782+
{
783+
incrementProfileEvents<IsReadMethod>(ProfileEvents::S3ReadRequestRetryableErrors, ProfileEvents::S3WriteRequestRetryableErrors);
784+
if (isClientForDisk())
785+
incrementProfileEvents<IsReadMethod>(ProfileEvents::DiskS3ReadRequestRetryableErrors, ProfileEvents::DiskS3WriteRequestRetryableErrors);
786+
787+
// use previously attempt number to calculate delay
788+
updateNextTimeToRetryAfterRetryableError(outcome.GetError(), attempt_no - 1);
789+
790+
// update ClickHouse-specific attempt number in the request
791+
// to help choose the right timeouts on the HTTP client which depends on retry attempt number
792+
auto clickhouse_request_attempt = getClickhouseAttemptNumber(request_);
793+
setClickhouseAttemptNumber(request_, clickhouse_request_attempt + attempt_no);
794+
}
795+
748796
/// Slowing down due to a previously encountered retryable error, possibly from another thread.
749797
slowDownAfterRetryableError();
750798

751799
try
752800
{
753-
/// S3 does retries network errors actually.
754-
/// But it does matter when errors occur.
755-
/// This code retries a specific case when
756-
/// network error happens when XML document is being read from the response body.
757-
/// Hence, the response body is a stream, network errors are possible at reading.
758-
/// S3 doesn't retry them.
759-
760-
/// Not all requests can be retried in that way.
761-
/// Requests that read out response body to build the result are possible to retry.
762-
/// Requests that expose the response stream as an answer are not retried with that code. E.g. GetObject.
763-
auto outcome = request_fn_(request_);
764-
765-
if (!outcome.IsSuccess()
766-
/// AWS SDK's built-in per-thread retry logic is disabled.
767-
&& client_configuration.s3_slow_all_threads_after_retryable_error
768-
&& attempt_no + 1 < max_attempts
769-
/// Retry attempts are managed by the outer loop, so the attemptedRetries argument can be ignored.
770-
&& client_configuration.retryStrategy->ShouldRetry(outcome.GetError(), /*attemptedRetries*/ -1))
771-
{
772-
incrementProfileEvents<IsReadMethod>(
773-
ProfileEvents::S3ReadRequestRetryableErrors, ProfileEvents::S3WriteRequestRetryableErrors);
774-
if (isClientForDisk())
775-
incrementProfileEvents<IsReadMethod>(
776-
ProfileEvents::DiskS3ReadRequestRetryableErrors, ProfileEvents::DiskS3WriteRequestRetryableErrors);
777-
778-
updateNextTimeToRetryAfterRetryableError(outcome.GetError(), attempt_no);
779-
continue;
780-
}
781-
return outcome;
782-
}
783-
catch (Poco::Net::NetException &)
784-
{
785-
/// This includes "connection reset", "malformed message", and possibly other exceptions.
801+
outcome = request_fn_(request_);
786802

787-
incrementProfileEvents<IsReadMethod>(ProfileEvents::S3ReadRequestsErrors, ProfileEvents::S3WriteRequestsErrors);
788-
if (isClientForDisk())
789-
incrementProfileEvents<IsReadMethod>(ProfileEvents::DiskS3ReadRequestsErrors, ProfileEvents::DiskS3WriteRequestsErrors);
790-
791-
tryLogCurrentException(log, "Will retry");
792-
last_exception = std::current_exception();
803+
if (outcome.IsSuccess())
804+
break;
793805

794-
auto error = Aws::Client::AWSError<Aws::Client::CoreErrors>(Aws::Client::CoreErrors::NETWORK_CONNECTION, /*retry*/ true);
806+
// do not increment S3ReadRequestsErrors/S3WriteRequestsErrors here, it has been accounted in IO/S3/PocoHTTPClient.cpp
795807

796-
/// Check if query is canceled.
797808
/// Retry attempts are managed by the outer loop, so the attemptedRetries argument can be ignored.
798-
if (!client_configuration.retryStrategy->ShouldRetry(error, /*attemptedRetries*/ -1))
809+
if (!client_configuration.retryStrategy->ShouldRetry(outcome.GetError(), /*attemptedRetries*/ -1))
810+
break;
811+
}
812+
catch (Poco::Net::NetException &)
813+
{
814+
/// This includes "connection reset", "malformed message", and possibly other exceptions.
815+
if (!net_exception_handler())
816+
break;
817+
}
818+
catch (Poco::TimeoutException &)
819+
{
820+
/// This includes "Timeout"
821+
if (!net_exception_handler())
799822
break;
800-
801-
updateNextTimeToRetryAfterRetryableError(error, attempt_no);
802823
}
803824
}
804825

805-
chassert(last_exception);
806-
std::rethrow_exception(last_exception);
826+
return outcome;
807827
};
808828

809829
return doRequest(request, with_retries);
@@ -844,7 +864,7 @@ void Client::updateNextTimeToRetryAfterRetryableError(Aws::Client::AWSError<Aws:
844864
{
845865
if (next_time_to_retry_after_retryable_error.compare_exchange_weak(stored_next_time, next_time_ms))
846866
{
847-
LOG_TRACE(log, "Updated next retry time to {} ms forward after retryable error with code {} ('{}')", sleep_ms, error.GetResponseCode(), error.GetMessage());
867+
LOG_TRACE(log, "Updated next retry time to {} ms forward after retryable error with code {}", sleep_ms, error.GetResponseCode());
848868
break;
849869
}
850870
}

src/IO/S3/Client.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,11 +278,11 @@ class Client : private Aws::S3::S3Client
278278
ComposeObjectOutcome ComposeObject(ComposeObjectRequest & request) const;
279279

280280
template <typename RequestType, typename RequestFn>
281-
std::invoke_result_t<RequestFn, RequestType>
281+
std::invoke_result_t<RequestFn, RequestType &>
282282
doRequest(RequestType & request, RequestFn request_fn) const;
283283

284284
template <bool IsReadMethod, typename RequestType, typename RequestFn>
285-
std::invoke_result_t<RequestFn, RequestType>
285+
std::invoke_result_t<RequestFn, RequestType &>
286286
doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request_fn) const;
287287

288288
void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;

src/IO/S3/PocoHTTPClient.cpp

Lines changed: 18 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
#if USE_AWS_S3
77

88
#include <IO/S3/PocoHTTPClient.h>
9+
#include <IO/S3/Requests.h>
910

10-
#include <utility>
1111
#include <algorithm>
1212
#include <functional>
1313

@@ -384,30 +384,6 @@ void PocoHTTPClient::observeLatency(const Aws::Http::HttpRequest & request, S3La
384384
}
385385
}
386386

387-
String extractAttemptFromInfo(const Aws::String & request_info)
388-
{
389-
static auto key = Aws::String("attempt=");
390-
391-
auto key_begin = request_info.find(key, 0);
392-
if (key_begin == Aws::String::npos)
393-
return "1";
394-
395-
auto val_begin = key_begin + key.size();
396-
auto val_end = request_info.find(';', val_begin);
397-
if (val_end == Aws::String::npos)
398-
val_end = request_info.size();
399-
400-
return request_info.substr(val_begin, val_end-val_begin);
401-
}
402-
403-
String getOrEmpty(const Aws::Http::HeaderValueCollection & map, const String & key)
404-
{
405-
auto it = map.find(key);
406-
if (it == map.end())
407-
return {};
408-
return it->second;
409-
}
410-
411387
ConnectionTimeouts PocoHTTPClient::getTimeouts(const String & method, bool first_attempt, bool first_byte) const
412388
{
413389
if (!s3_use_adaptive_timeouts)
@@ -444,12 +420,12 @@ String getMethod(const Aws::Http::HttpRequest & request)
444420
}
445421
}
446422

447-
PocoHTTPClient::S3LatencyType PocoHTTPClient::getFirstByteLatencyType(const String & sdk_attempt, const String & ch_attempt)
423+
PocoHTTPClient::S3LatencyType PocoHTTPClient::getFirstByteLatencyType(size_t sdk_attempt, size_t ch_attempt)
448424
{
449425
S3LatencyType result = S3LatencyType::FirstByteAttempt1;
450-
if (sdk_attempt != "1" || ch_attempt != "1")
426+
if (sdk_attempt != 1 || ch_attempt != 1)
451427
{
452-
if ((sdk_attempt == "1" && ch_attempt == "2") || (sdk_attempt == "2" && ch_attempt == "1"))
428+
if ((sdk_attempt == 1 && ch_attempt == 2) || (sdk_attempt == 2 && ch_attempt == 1))
453429
result = S3LatencyType::FirstByteAttempt2;
454430
else
455431
result = S3LatencyType::FirstByteAttemptN;
@@ -468,12 +444,20 @@ void PocoHTTPClient::makeRequestInternalImpl(
468444
auto uri = request.GetUri().GetURIString();
469445
auto method = getMethod(request);
470446

471-
auto sdk_attempt = extractAttemptFromInfo(getOrEmpty(request.GetHeaders(), Aws::Http::SDK_REQUEST_HEADER));
472-
auto ch_attempt = extractAttemptFromInfo(getOrEmpty(request.GetHeaders(), "clickhouse-request"));
473-
bool first_attempt = ch_attempt == "1" && sdk_attempt == "1";
474-
475-
if (enable_s3_requests_logging)
476-
LOG_TEST(log, "Make request to: {}, aws sdk attempt: {}, clickhouse attempt: {}", uri, sdk_attempt, ch_attempt);
447+
auto sdk_attempt = getSDKAttemptNumber(request);
448+
auto ch_attempt = getClickhouseAttemptNumber(request);
449+
bool first_attempt = ch_attempt == 1 && sdk_attempt == 1;
450+
451+
if (!first_attempt)
452+
LOG_DEBUG(
453+
log,
454+
"Retrying S3 request to: {}, aws sdk attempt: {}, clickhouse attempt: {}, kind: {}",
455+
uri, sdk_attempt, ch_attempt, getMetricKind(request) == S3MetricKind::Read ? "Read" : "Write");
456+
else // if (enable_s3_requests_logging)
457+
LOG_TEST(
458+
log,
459+
"Make S3 request to: {}, aws sdk attempt: {}, clickhouse attempt: {}, kind: {}",
460+
uri, sdk_attempt, ch_attempt, getMetricKind(request) == S3MetricKind::Read ? "Read" : "Write");
477461

478462
switch (request.GetMethod())
479463
{

src/IO/S3/PocoHTTPClient.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ class PocoHTTPClient : public Aws::Http::HttpClient
196196
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
197197
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const;
198198

199-
static S3LatencyType getFirstByteLatencyType(const String & sdk_attempt, const String & ch_attempt);
199+
static S3LatencyType getFirstByteLatencyType(size_t sdk_attempt, size_t ch_attempt);
200200

201201
protected:
202202
virtual void makeRequestInternal(

0 commit comments

Comments
 (0)