Skip to content

Commit ea3e1b9

Browse files
committed
add state variable to prevent race condition
1 parent 69ff5b6 commit ea3e1b9

File tree

1 file changed

+33
-26
lines changed

1 file changed

+33
-26
lines changed

google/cloud/storage/internal/async/writer_connection_resumed.cc

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -186,25 +186,27 @@ class AsyncWriterConnectionResumedState
186186
}
187187

188188
void StartWriting(std::unique_lock<std::mutex> lk) {
189-
if (writing_) return;
189+
if (state_ != State::kIdle) return;
190190
WriteLoop(std::move(lk));
191191
}
192192

193193
void WriteLoop(std::unique_lock<std::mutex> lk) {
194-
if (writing_) return;
194+
if (state_ != State::kIdle) return;
195+
195196
// Determine if there's data left to write *before* potentially finalizing.
196-
writing_ = write_offset_ < resend_buffer_.size();
197+
auto const has_data = write_offset_ < resend_buffer_.size();
197198

198199
// If we are writing data, continue doing so.
199-
if (writing_) {
200+
if (has_data) {
201+
state_ = State::kWriting;
200202
// Still data to write, determine the next chunk.
201203
auto const n = resend_buffer_.size() - write_offset_;
202204
auto payload = resend_buffer_.Subcord(write_offset_, n);
203205
if (flush_) return FlushStep(std::move(lk), std::move(payload));
204206
return WriteStep(std::move(lk), std::move(payload));
205207
}
206208

207-
// No data left to write (writing_ is false).
209+
// No data left to write.
208210
// Check if we need to finalize (only if not already writing data AND not
209211
// already finalizing).
210212
if (finalize_ && !finalizing_) {
@@ -213,23 +215,24 @@ class AsyncWriterConnectionResumedState
213215
}
214216
// If not finalizing, check if an empty flush is needed.
215217
if (flush_) {
216-
writing_ = true;
217-
// Pass empty payload to FlushStep
218+
state_ = State::kWriting;
219+
// Pass empty payload to FlushStep
218220
return FlushStep(std::move(lk), absl::Cord{});
219221
}
220222

221223
// No data to write, not finalizing, not flushing. The loop can stop.
222-
// writing_ is already false.
224+
state_ = State::kIdle;
223225
}
224226

225227
// FinalizeStep is now called only when all data in resend_buffer_ is written.
226228
void FinalizeStep(std::unique_lock<std::mutex> lk) {
227229
// Check *under lock* if we are already finalizing.
228-
if (finalizing_) {
230+
if (finalizing_ || state_ != State::kIdle) {
229231
// If another thread initiated FinalizeStep concurrently, just return.
230232
return;
231233
}
232234
// Mark that we are starting the finalization process.
235+
state_ = State::kWriting;
233236
finalizing_ = true;
234237
auto impl = Impl(lk);
235238
lk.unlock();
@@ -306,8 +309,8 @@ class AsyncWriterConnectionResumedState
306309
buffer_offset_ = persisted_size;
307310
write_offset_ -= static_cast<std::size_t>(n);
308311
// If the buffer is small enough, collect all the handlers to notify them.
309-
auto const handlers = ClearHandlersIfEmpty(lk);
310-
writing_ = false;
312+
auto const handlers = ClearHandlersIfEmpty(lk);
313+
state_ = State::kIdle;
311314
StartWriting(std::move(lk));
312315
// The notifications are deferred until the lock is released, as they might
313316
// call back and try to acquire the lock.
@@ -330,7 +333,7 @@ class AsyncWriterConnectionResumedState
330333
if (!result.ok()) return Resume(std::move(result));
331334
std::unique_lock<std::mutex> lk(mu_);
332335
write_offset_ += write_size;
333-
writing_ = false;
336+
state_ = State::kIdle;
334337
return StartWriting(std::move(lk));
335338
}
336339

@@ -349,8 +352,9 @@ class AsyncWriterConnectionResumedState
349352
}
350353
// Include write_handle to enable fast resume instead of slow
351354
// takeover. Without handle, server performs full state validation.
352-
if (latest_write_handle_) {
353-
*append_object_spec.mutable_write_handle() = *latest_write_handle_;
355+
if (first_response_.has_write_handle()) {
356+
*append_object_spec.mutable_write_handle() =
357+
first_response_.write_handle();
354358
}
355359
append_object_spec.set_generation(first_response_.resource().generation());
356360
ApplyWriteRedirectErrors(append_object_spec, std::move(proto_status));
@@ -359,9 +363,11 @@ class AsyncWriterConnectionResumedState
359363
bool was_finalizing;
360364
{
361365
std::unique_lock<std::mutex> lk(mu_);
362-
was_finalizing = finalizing_;
363-
writing_ = false;
366+
if (state_ == State::kResuming) return;
367+
was_finalizing = finalizing_;
368+
state_ = State::kResuming;
364369
if (!s.ok() && cancelled_) {
370+
state_ = State::kIdle;
365371
return SetError(std::move(lk), std::move(s));
366372
}
367373
}
@@ -376,16 +382,13 @@ class AsyncWriterConnectionResumedState
376382
void OnResume(Status const& original_status, bool was_finalizing,
377383
StatusOr<WriteObject::WriteResult> res) {
378384
std::unique_lock<std::mutex> lk(mu_);
379-
// Update write_handle from any resume response that contains it.
380-
if (res && res->first_response.has_write_handle()) {
381-
latest_write_handle_ = res->first_response.write_handle();
382-
}
383385

384386
if (was_finalizing) {
385387
// If resuming due to a finalization error, we *must* complete the
386388
// finalized_ promise now, based on the resume attempt's outcome.
387389
if (!res) {
388390
// The resume attempt itself failed. Use that error.
391+
state_ = State::kIdle;
389392
return SetError(std::move(lk), std::move(res).status());
390393
}
391394
// Resume attempt succeeded, check the persisted state.
@@ -401,12 +404,14 @@ class AsyncWriterConnectionResumedState
401404
// Use the original status that triggered the resume. Reset finalizing_
402405
// before setting the error, as the attempt is now over.
403406
finalizing_ = false;
407+
state_ = State::kIdle;
404408
return SetError(std::move(lk), std::move(original_status));
405409
}
406410

407411
// Resume was *not* triggered by finalization failure.
408412
if (!res) {
409413
// Regular resume attempt failed.
414+
state_ = State::kIdle;
410415
return SetError(std::move(lk), std::move(res).status());
411416
}
412417
// Regular resume attempt succeeded. Check state.
@@ -434,7 +439,7 @@ class AsyncWriterConnectionResumedState
434439
void SetFinalized(std::unique_lock<std::mutex> lk,
435440
google::storage::v2::Object object) {
436441
resend_buffer_.Clear();
437-
writing_ = false;
442+
state_ = State::kIdle;
438443
finalize_ = false;
439444
finalizing_ = false; // Reset finalizing flag
440445
flush_ = false;
@@ -482,7 +487,7 @@ class AsyncWriterConnectionResumedState
482487

483488
void SetError(std::unique_lock<std::mutex> lk, Status const& status) {
484489
resume_status_ = status;
485-
writing_ = false;
490+
state_ = State::kIdle;
486491
finalize_ = false;
487492
finalizing_ = false; // Reset finalizing flag
488493
flush_ = false;
@@ -605,7 +610,12 @@ class AsyncWriterConnectionResumedState
605610
std::vector<std::unique_ptr<BufferShrinkHandler>> flush_handlers_;
606611

607612
// True if the writing loop is activate.
608-
bool writing_ = false;
613+
enum class State {
614+
kIdle,
615+
kWriting,
616+
kResuming,
617+
};
618+
State state_ = State::kIdle;
609619

610620
// True if cancelled, in which case any RPC failures are final.
611621
bool cancelled_ = false;
@@ -615,9 +625,6 @@ class AsyncWriterConnectionResumedState
615625

616626
// Tracks if the final promise (`finalized_`) has been completed.
617627
bool finalized_promise_completed_ = false;
618-
619-
// Track the latest write handle seen in responses.
620-
absl::optional<google::storage::v2::BidiWriteHandle> latest_write_handle_;
621628
};
622629

623630
/**

0 commit comments

Comments
 (0)