Skip to content

Commit 8396e1d

Browse files
dopieracoryan
authored andcommitted
fix: use next_expected_byte() in retried uploads (#3037)
1 parent 563a3ca commit 8396e1d

File tree

3 files changed

+587
-51
lines changed

3 files changed

+587
-51
lines changed

google/cloud/storage/internal/retry_resumable_upload_session.cc

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -38,38 +38,66 @@ StatusOr<ResumableUploadResponse> ReturnError(Status&& last_status,
3838

3939
StatusOr<ResumableUploadResponse> RetryResumableUploadSession::UploadChunk(
4040
std::string const& buffer) {
41-
Status last_status(StatusCode::kDeadlineExceeded,
42-
"Retry policy exhausted before first attempt was made.");
43-
while (!retry_policy_->IsExhausted()) {
44-
auto result = session_->UploadChunk(buffer);
45-
if (result.ok()) {
46-
return result;
47-
}
48-
last_status = std::move(result).status();
49-
if (!retry_policy_->OnFailure(last_status)) {
50-
return ReturnError(std::move(last_status), *retry_policy_, __func__);
51-
}
52-
auto delay = backoff_policy_->OnCompletion();
53-
std::this_thread::sleep_for(delay);
54-
55-
result = ResetSession();
56-
if (!result.ok()) {
57-
return result;
58-
}
59-
}
60-
std::ostringstream os;
61-
os << "Retry policy exhausted in " << __func__ << ": " << last_status;
62-
return Status(last_status.code(), os.str());
41+
return UploadGenericChunk(buffer, optional<std::uint64_t>());
6342
}
6443

6544
StatusOr<ResumableUploadResponse> RetryResumableUploadSession::UploadFinalChunk(
6645
std::string const& buffer, std::uint64_t upload_size) {
46+
return UploadGenericChunk(buffer, upload_size);
47+
}
48+
49+
StatusOr<ResumableUploadResponse>
50+
RetryResumableUploadSession::UploadGenericChunk(
51+
std::string const& buffer, optional<std::uint64_t> const& upload_size) {
52+
bool const is_final_chunk = upload_size.has_value();
53+
char const* const func = is_final_chunk ? "UploadFinalChunk" : "UploadChunk";
54+
std::uint64_t next_byte = session_->next_expected_byte();
6755
Status last_status(StatusCode::kDeadlineExceeded,
6856
"Retry policy exhausted before first attempt was made.");
57+
// On occasion, we might need to retry uploading only a part of the buffer.
58+
// The current APIs require us to copy the buffer in such a scenario. We can
59+
// and want to avoid the copy in the common case, so we use this variable to
60+
// either reference the copy or the original buffer.
61+
// TODO(#3036): change the APIs to avoid this extra copy.
62+
std::string const* buffer_to_use = &buffer;
63+
std::string truncated_buffer;
6964
while (!retry_policy_->IsExhausted()) {
70-
auto result = session_->UploadFinalChunk(buffer, upload_size);
65+
std::uint64_t new_next_byte = session_->next_expected_byte();
66+
if (new_next_byte < next_byte) {
67+
std::stringstream os;
68+
os << func << ": server has not confirmed bytes which we no longer hold ("
69+
<< new_next_byte << "-" << next_byte
70+
<< "). This is most likely a bug in the GCS client. Please report it "
71+
"at https://github.com/googleapis/google-cloud-cpp/issues/new";
72+
return Status(StatusCode::kInternal, os.str());
73+
}
74+
if (new_next_byte > next_byte) {
75+
truncated_buffer = buffer_to_use->substr(new_next_byte - next_byte);
76+
buffer_to_use = &truncated_buffer;
77+
next_byte = new_next_byte;
78+
}
79+
auto result = is_final_chunk
80+
? session_->UploadFinalChunk(*buffer_to_use, *upload_size)
81+
: session_->UploadChunk(*buffer_to_use);
7182
if (result.ok()) {
72-
return result;
83+
if (is_final_chunk &&
84+
result->upload_state == ResumableUploadResponse::kDone) {
85+
// If it's a final chunk and it succeded, return.
86+
return result;
87+
}
88+
if (next_expected_byte() - next_byte == buffer_to_use->size()) {
89+
// Otherwise, return only if there were no failures and it wasn't a
90+
// short write.
91+
return result;
92+
}
93+
std::stringstream os;
94+
os << "Short write. Previous next_byte=" << next_byte
95+
<< ", current next_byte=" << next_expected_byte()
96+
<< ", inteded to write " << buffer_to_use->size();
97+
last_status = Status(StatusCode::kUnavailable, os.str());
98+
// Don't reset the session on a short write nor wait according to the
99+
// backoff policy - we did get a response from the server after all.
100+
continue;
73101
}
74102
last_status = std::move(result).status();
75103
if (!retry_policy_->OnFailure(last_status)) {
@@ -84,7 +112,7 @@ StatusOr<ResumableUploadResponse> RetryResumableUploadSession::UploadFinalChunk(
84112
}
85113
}
86114
std::ostringstream os;
87-
os << "Retry policy exhausted in " << __func__ << ": " << last_status;
115+
os << "Retry policy exhausted in " << func << ": " << last_status;
88116
return Status(last_status.code(), os.str());
89117
}
90118

google/cloud/storage/internal/retry_resumable_upload_session.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ class RetryResumableUploadSession : public ResumableUploadSession {
5252
StatusOr<ResumableUploadResponse> const& last_response() const override;
5353

5454
private:
55+
// Retry either UploadChunk or either UploadFinalChunk.
56+
StatusOr<ResumableUploadResponse> UploadGenericChunk(
57+
std::string const& buffer, optional<std::uint64_t> const& upload_size);
5558
std::unique_ptr<ResumableUploadSession> session_;
5659
std::unique_ptr<RetryPolicy> retry_policy_;
5760
std::unique_ptr<BackoffPolicy> backoff_policy_;

0 commit comments

Comments
 (0)