Skip to content

Commit 6b5569a

Browse files
committed
fix(storage): gRPC misuse causing crashes due to concurrent writes from Flush() and Write()
1 parent c03c99b commit 6b5569a

File tree

2 files changed

+111
-21
lines changed

2 files changed

+111
-21
lines changed

google/cloud/storage/internal/async/writer_connection_resumed.cc

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ class AsyncWriterConnectionResumedState
184184
}
185185

186186
void WriteLoop(std::unique_lock<std::mutex> lk) {
187+
if (writing_) return;
187188
// Determine if there's data left to write *before* potentially finalizing.
188189
writing_ = write_offset_ < resend_buffer_.size();
189190

@@ -205,7 +206,8 @@ class AsyncWriterConnectionResumedState
205206
}
206207
// If not finalizing, check if an empty flush is needed.
207208
if (flush_) {
208-
// Pass empty payload to FlushStep
209+
writing_ = true;
210+
// Pass empty payload to FlushStep
209211
return FlushStep(std::move(lk), absl::Cord{});
210212
}
211213

@@ -256,8 +258,10 @@ class AsyncWriterConnectionResumedState
256258
auto impl = Impl(lk);
257259
lk.unlock();
258260
impl->Query().then([this, result, w = WeakFromThis()](auto f) {
259-
SetFlushed(std::unique_lock<std::mutex>(mu_), std::move(result));
260-
if (auto self = w.lock()) return self->OnQuery(f.get());
261+
auto self = w.lock();
262+
if (!self) return;
263+
self->OnQuery(f.get());
264+
self->SetFlushed(std::unique_lock<std::mutex>(self->mu_), std::move(result));
261265
});
262266
}
263267

@@ -295,8 +299,9 @@ class AsyncWriterConnectionResumedState
295299
buffer_offset_ = persisted_size;
296300
write_offset_ -= static_cast<std::size_t>(n);
297301
// If the buffer is small enough, collect all the handlers to notify them.
298-
auto const handlers = ClearHandlersIfEmpty(lk);
299-
WriteLoop(std::move(lk));
302+
auto const handlers = ClearHandlersIfEmpty(lk);
303+
writing_ = false;
304+
StartWriting(std::move(lk));
300305
// The notifications are deferred until the lock is released, as they might
301306
// call back and try to acquire the lock.
302307
for (auto const& h : handlers) {
@@ -318,7 +323,8 @@ class AsyncWriterConnectionResumedState
318323
if (!result.ok()) return Resume(std::move(result));
319324
std::unique_lock<std::mutex> lk(mu_);
320325
write_offset_ += write_size;
321-
return WriteLoop(std::move(lk));
326+
writing_ = false;
327+
return StartWriting(std::move(lk));
322328
}
323329

324330
void Resume(Status const& s) {
@@ -347,7 +353,8 @@ class AsyncWriterConnectionResumedState
347353
bool was_finalizing;
348354
{
349355
std::unique_lock<std::mutex> lk(mu_);
350-
was_finalizing = finalizing_;
356+
was_finalizing = finalizing_;
357+
writing_ = false;
351358
if (!s.ok() && cancelled_) {
352359
return SetError(std::move(lk), std::move(s));
353360
}
@@ -461,10 +468,6 @@ class AsyncWriterConnectionResumedState
461468
// lock.
462469
for (auto& h : handlers) h->Execute(Status{});
463470
flushed.set_value(result);
464-
// Restart the write loop ONLY if we are not already finalizing.
465-
// If finalizing_ is true, the completion will be handled by OnFinalize.
466-
std::unique_lock<std::mutex> loop_lk(mu_);
467-
if (!finalizing_) WriteLoop(std::move(loop_lk));
468471
}
469472

470473
void SetError(std::unique_lock<std::mutex> lk, Status const& status) {
@@ -590,7 +593,7 @@ class AsyncWriterConnectionResumedState
590593
// - A Flush() call that returns an unsatisfied future until the buffer is
591594
// small enough.
592595
std::vector<std::unique_ptr<BufferShrinkHandler>> flush_handlers_;
593-
596+
594597
// True if the writing loop is active.
595598
bool writing_ = false;
596599

google/cloud/storage/internal/async/writer_connection_resumed_test.cc

Lines changed: 96 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include "google/cloud/testing_util/status_matchers.h"
2323
#include <google/storage/v2/storage.pb.h>
2424
#include <gmock/gmock.h>
25+
#include <chrono>
26+
#include <thread>
2527

2628
namespace google {
2729
namespace cloud {
@@ -170,7 +172,7 @@ TEST(WriterConnectionResumed, FlushEmpty) {
170172
auto mock = std::make_unique<MockAsyncWriterConnection>();
171173
EXPECT_CALL(*mock, PersistedState)
172174
.WillRepeatedly(Return(MakePersistedState(0)));
173-
EXPECT_CALL(*mock, Flush).WillOnce([&](auto const& p) {
175+
EXPECT_CALL(*mock, Flush).WillRepeatedly([&](auto const& p) {
174176
EXPECT_TRUE(p.payload().empty());
175177
return sequencer.PushBack("Flush").then([](auto f) {
176178
if (!f.get()) return TransientError();
@@ -214,13 +216,21 @@ TEST(WriteConnectionResumed, FlushNonEmpty) {
214216

215217
EXPECT_CALL(*mock, PersistedState)
216218
.WillRepeatedly(Return(MakePersistedState(0)));
217-
EXPECT_CALL(*mock, Flush).WillOnce([&](auto const& p) {
218-
EXPECT_EQ(p.payload(), payload.payload());
219-
return sequencer.PushBack("Flush").then([](auto f) {
220-
if (!f.get()) return TransientError();
221-
return Status{};
222-
});
223-
});
219+
EXPECT_CALL(*mock, Flush)
220+
.WillOnce([&](auto const& p) {
221+
EXPECT_EQ(p.payload(), payload.payload());
222+
return sequencer.PushBack("Flush").then([](auto f) {
223+
if (!f.get()) return TransientError();
224+
return Status{};
225+
});
226+
})
227+
.WillOnce([&](auto const& p) {
228+
EXPECT_TRUE(p.payload().empty());
229+
return sequencer.PushBack("Flush").then([](auto f) {
230+
if (!f.get()) return TransientError();
231+
return Status{};
232+
});
233+
});
224234
EXPECT_CALL(*mock, Query).WillOnce([&]() {
225235
return sequencer.PushBack("Query").then(
226236
[](auto f) -> StatusOr<std::int64_t> {
@@ -392,8 +402,85 @@ TEST(WriteConnectionResumed, ResumeUsesAppendObjectSpecFromInitialRequest) {
392402
"projects/_/buckets/test-bucket");
393403
}
394404

405+
TEST(WriteConnectionResumed, NoConcurrentWritesWhenFlushAndWriteRace) {
406+
AsyncSequencer<bool> sequencer;
407+
auto mock = std::make_unique<MockAsyncWriterConnection>();
408+
auto initial_request = google::storage::v2::BidiWriteObjectRequest{};
409+
auto first_response = google::storage::v2::BidiWriteObjectResponse{};
410+
411+
EXPECT_CALL(*mock, PersistedState)
412+
.WillRepeatedly(Return(MakePersistedState(0)));
413+
EXPECT_CALL(*mock, Flush(_)).WillRepeatedly([&](auto) {
414+
return sequencer.PushBack("Flush").then([](auto f) {
415+
if (!f.get()) return TransientError();
416+
return Status{};
417+
});
418+
});
419+
EXPECT_CALL(*mock, Query).WillOnce([&]() {
420+
return sequencer.PushBack("Query").then([](auto f) -> StatusOr<std::int64_t> {
421+
if (!f.get()) return TransientError();
422+
return 0;
423+
});
424+
});
425+
426+
// Make Write detect concurrent invocations. If two writes run concurrently
427+
// the compare_exchange will fail and the test will fail.
428+
std::atomic<bool> in_write{false};
429+
EXPECT_CALL(*mock, Write(_))
430+
.WillRepeatedly([&](auto) {
431+
bool expected = false;
432+
EXPECT_TRUE(in_write.compare_exchange_strong(expected, true));
433+
// Simulate some work that allows a concurrent write to attempt to run.
434+
std::this_thread::sleep_for(std::chrono::milliseconds(50));
435+
in_write.store(false);
436+
return make_ready_future(Status{});
437+
});
438+
439+
MockFactory mock_factory;
440+
EXPECT_CALL(mock_factory, Call).Times(0);
441+
442+
auto connection = MakeWriterConnectionResumed(
443+
mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr,
444+
first_response, Options{});
445+
446+
// Start a flush which will call impl->Flush() and block.
447+
auto flush_future = connection->Flush({});
448+
// Allow the Flush to complete; this will schedule a Query (but Query will
449+
// remain blocked until we pop it).
450+
auto next = sequencer.PopFrontWithName();
451+
EXPECT_EQ(next.second, "Flush");
452+
next.first.set_value(true);
453+
454+
// Immediately perform a user Write after the flush completed but before
455+
// Query completes. This can race with the OnQuery-driven write.
456+
auto write_future = connection->Write(TestPayload(1024));
457+
458+
// Now allow the Query to complete; OnQuery may schedule a write.
459+
next = sequencer.PopFrontWithName();
460+
EXPECT_EQ(next.second, "Query");
461+
next.first.set_value(true);
462+
463+
// Wait for both futures to complete, with a timeout to avoid hanging forever.
464+
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(2);
465+
while (!write_future.is_ready() && std::chrono::steady_clock::now() < deadline) {
466+
std::this_thread::sleep_for(std::chrono::milliseconds(5));
467+
}
468+
deadline = std::chrono::steady_clock::now() + std::chrono::seconds(2);
469+
while (!flush_future.is_ready() && std::chrono::steady_clock::now() < deadline) {
470+
std::this_thread::sleep_for(std::chrono::milliseconds(5));
471+
}
472+
473+
ASSERT_TRUE(write_future.is_ready());
474+
ASSERT_TRUE(flush_future.is_ready());
475+
476+
// Both futures should complete successfully.
477+
EXPECT_THAT(write_future.get(), StatusIs(StatusCode::kOk));
478+
EXPECT_THAT(flush_future.get(), StatusIs(StatusCode::kOk));
479+
}
480+
481+
395482
} // namespace
396483
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
397484
} // namespace storage_internal
398485
} // namespace cloud
399-
} // namespace google
486+
} // namespace google

0 commit comments

Comments
 (0)