Skip to content

Commit e5a139a

Browse files
committed
add tests
1 parent 4fdf57e commit e5a139a

File tree

2 files changed

+198
-17
lines changed

2 files changed

+198
-17
lines changed

google/cloud/storage/internal/async/object_descriptor_impl.cc

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ ObjectDescriptorImpl::ObjectDescriptorImpl(
4040
read_object_spec_(std::move(read_object_spec)),
4141
options_(std::move(options)) {
4242
stream_manager_ = std::make_unique<StreamManager>(
43-
[]() -> std::shared_ptr<ReadStream> { return nullptr; },
43+
[]() -> std::shared_ptr<ReadStream> { return nullptr; }, // NOLINT
4444
std::make_shared<ReadStream>(std::move(stream),
4545
resume_policy_prototype_->clone()));
4646
}
@@ -96,24 +96,33 @@ void ObjectDescriptorImpl::MakeSubsequentStream() {
9696
auto stream_future = std::move(pending_stream_);
9797
lk.unlock();
9898

99-
// Wait for the stream to be created.
100-
auto stream_result = stream_future.get();
101-
if (!stream_result) {
102-
// Stream creation failed.
103-
// The next call to AssurePendingStreamQueued will retry creation.
104-
return;
105-
}
99+
// Use .then() to retrieves the result without blocking.
100+
stream_future.then([w = WeakFromThis()](auto f) {
101+
auto self = w.lock();
102+
if (!self) return;
106103

107-
lk.lock();
108-
if (cancelled_) return;
109-
auto read_stream = std::make_shared<ReadStream>(
110-
std::move(stream_result->stream), resume_policy_prototype_->clone());
111-
auto new_it = stream_manager_->AddStream(std::move(read_stream));
112-
// Now that we consumed pending_stream_, queue the next one immediately.
113-
AssurePendingStreamQueued();
104+
auto stream_result = f.get();
105+
if (!stream_result) {
106+
// Stream creation failed.
107+
// The next call to AssurePendingStreamQueued will retry creation.
108+
return;
109+
}
114110

115-
lk.unlock();
116-
OnRead(new_it, std::move(stream_result->first_response));
111+
std::unique_lock<std::mutex> lk(self->mu_);
112+
if (self->cancelled_) return;
113+
114+
auto read_stream =
115+
std::make_shared<ReadStream>(std::move(stream_result->stream),
116+
self->resume_policy_prototype_->clone());
117+
118+
auto new_it = self->stream_manager_->AddStream(std::move(read_stream));
119+
120+
// Now that we consumed pending_stream_, queue the next one immediately.
121+
self->AssurePendingStreamQueued();
122+
123+
lk.unlock();
124+
self->OnRead(new_it, std::move(stream_result->first_response));
125+
});
117126
}
118127

119128
std::unique_ptr<storage_experimental::AsyncReaderConnection>

google/cloud/storage/internal/async/object_descriptor_impl_test.cc

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1572,6 +1572,178 @@ TEST(ObjectDescriptorImpl, MakeSubsequentStreamReusesAndMovesIdleStream) {
15721572
tested.reset();
15731573
}
15741574

1575+
/// @test Verify that a successful resume executes the OnResume logic correctly.
1576+
TEST(ObjectDescriptorImpl, OnResumeSuccessful) {
1577+
AsyncSequencer<bool> sequencer;
1578+
1579+
auto expect_startup_events = [&](AsyncSequencer<bool>& seq) {
1580+
auto e1 = seq.PopFrontWithName();
1581+
auto e2 = seq.PopFrontWithName();
1582+
std::set<std::string> names = {e1.second, e2.second};
1583+
if (names.count("Read[1]") && names.count("ProactiveFactory")) {
1584+
e1.first.set_value(true); // Allow read to proceed
1585+
e2.first.set_value(true); // Allow factory to proceed
1586+
} else {
1587+
ADD_FAILURE() << "Got unexpected events: " << e1.second << ", "
1588+
<< e2.second;
1589+
}
1590+
};
1591+
1592+
auto stream1 = std::make_unique<MockStream>();
1593+
EXPECT_CALL(*stream1, Write).WillOnce([&](auto, auto) {
1594+
return sequencer.PushBack("Write[1]").then([](auto f) { return f.get(); });
1595+
});
1596+
1597+
// To keep Stream 1 alive during startup, the first Read returns a valid
1598+
// (empty) response. Subsequent reads return nullopt to trigger the
1599+
// Finish/Resume logic.
1600+
EXPECT_CALL(*stream1, Read)
1601+
.WillOnce([&] {
1602+
return sequencer.PushBack("Read[1]").then(
1603+
[](auto) { return absl::make_optional(Response{}); });
1604+
})
1605+
.WillRepeatedly([&] {
1606+
return sequencer.PushBack("Read[Loop]").then([](auto) {
1607+
return absl::optional<Response>{};
1608+
});
1609+
});
1610+
1611+
EXPECT_CALL(*stream1, Finish).WillOnce([&] {
1612+
return sequencer.PushBack("Finish[1]").then([](auto) {
1613+
return TransientError();
1614+
});
1615+
});
1616+
EXPECT_CALL(*stream1, Cancel).Times(AtMost(1));
1617+
1618+
auto stream2 = std::make_unique<MockStream>();
1619+
// The resumed stream to starts reading immediately.
1620+
EXPECT_CALL(*stream2, Read).WillRepeatedly([&] {
1621+
return sequencer.PushBack("Read[2]").then(
1622+
[](auto) { return absl::make_optional(Response{}); });
1623+
});
1624+
EXPECT_CALL(*stream2, Finish).WillOnce(Return(make_ready_future(Status{})));
1625+
EXPECT_CALL(*stream2, Cancel).Times(AtMost(1));
1626+
1627+
MockFactory factory;
1628+
EXPECT_CALL(factory, Call)
1629+
.WillOnce([&](auto) {
1630+
return sequencer.PushBack("ProactiveFactory").then([](auto) {
1631+
return StatusOr<OpenStreamResult>(TransientError());
1632+
});
1633+
})
1634+
.WillOnce([&](auto) {
1635+
return sequencer.PushBack("ResumeFactory").then([&](auto) {
1636+
return StatusOr<OpenStreamResult>(OpenStreamResult{
1637+
std::make_shared<OpenStream>(std::move(stream2)), Response{}});
1638+
});
1639+
});
1640+
1641+
auto tested = std::make_shared<ObjectDescriptorImpl>(
1642+
storage_experimental::LimitedErrorCountResumePolicy(1)(),
1643+
factory.AsStdFunction(), google::storage::v2::BidiReadObjectSpec{},
1644+
std::make_shared<OpenStream>(std::move(stream1)));
1645+
1646+
tested->Start(Response{});
1647+
expect_startup_events(sequencer);
1648+
1649+
// Register the read range.
1650+
auto reader = tested->Read({0, 100});
1651+
1652+
auto next_event = sequencer.PopFrontWithName();
1653+
promise<bool> fail_stream_promise;
1654+
1655+
if (next_event.second == "Read[Loop]") {
1656+
fail_stream_promise = std::move(next_event.first);
1657+
// Now expect Write[1]
1658+
auto w1 = sequencer.PopFrontWithName();
1659+
EXPECT_EQ(w1.second, "Write[1]");
1660+
w1.first.set_value(true);
1661+
} else {
1662+
// It was Write[1] immediately
1663+
EXPECT_EQ(next_event.second, "Write[1]");
1664+
next_event.first.set_value(true);
1665+
1666+
// Now wait for Read[Loop]
1667+
auto read_loop = sequencer.PopFrontWithName();
1668+
EXPECT_EQ(read_loop.second, "Read[Loop]");
1669+
fail_stream_promise = std::move(read_loop.first);
1670+
}
1671+
1672+
// Trigger Failure on Stream 1.
1673+
fail_stream_promise.set_value(true);
1674+
1675+
auto f1 = sequencer.PopFrontWithName();
1676+
EXPECT_EQ(f1.second, "Finish[1]");
1677+
f1.first.set_value(true);
1678+
1679+
auto resume = sequencer.PopFrontWithName();
1680+
EXPECT_EQ(resume.second, "ResumeFactory");
1681+
resume.first.set_value(true);
1682+
1683+
// The OnResume block calls OnRead, which triggers Read() on Stream 2.
1684+
auto r2 = sequencer.PopFrontWithName();
1685+
EXPECT_EQ(r2.second, "Read[2]");
1686+
r2.first.set_value(true);
1687+
1688+
tested.reset();
1689+
}
1690+
1691+
/// @test Verify Read() behavior when all streams have failed permanently.
1692+
TEST(ObjectDescriptorImpl, ReadFailsWhenAllStreamsAreDead) {
1693+
AsyncSequencer<bool> sequencer;
1694+
auto stream = std::make_unique<MockStream>();
1695+
1696+
// Initial Read returns empty (EOF) to trigger Finish
1697+
EXPECT_CALL(*stream, Read).WillOnce([&] {
1698+
return sequencer.PushBack("Read[1]").then(
1699+
[](auto) { return absl::optional<Response>{}; });
1700+
});
1701+
1702+
EXPECT_CALL(*stream, Finish).WillOnce([&] {
1703+
return sequencer.PushBack("Finish").then(
1704+
[](auto) { return PermanentError(); });
1705+
});
1706+
EXPECT_CALL(*stream, Cancel).Times(AtMost(1));
1707+
1708+
MockFactory factory;
1709+
EXPECT_CALL(factory, Call).WillOnce([&](auto) {
1710+
return sequencer.PushBack("ProactiveFactory").then([](auto) {
1711+
return StatusOr<OpenStreamResult>(PermanentError());
1712+
});
1713+
});
1714+
1715+
auto tested = std::make_shared<ObjectDescriptorImpl>(
1716+
NoResume(), factory.AsStdFunction(),
1717+
google::storage::v2::BidiReadObjectSpec{},
1718+
std::make_shared<OpenStream>(std::move(stream)));
1719+
1720+
tested->Start(Response{});
1721+
1722+
auto e1 = sequencer.PopFrontWithName();
1723+
auto e2 = sequencer.PopFrontWithName();
1724+
1725+
std::set<std::string> names = {e1.second, e2.second};
1726+
ASSERT_EQ(names.count("Read[1]"), 1);
1727+
ASSERT_EQ(names.count("ProactiveFactory"), 1);
1728+
1729+
e1.first.set_value(true);
1730+
e2.first.set_value(true);
1731+
1732+
auto finish = sequencer.PopFrontWithName();
1733+
EXPECT_EQ(finish.second, "Finish");
1734+
finish.first.set_value(true);
1735+
1736+
// At this point, the only active stream failed permanently and was removed.
1737+
// The proactive stream creation also failed. The manager is now EMPTY.
1738+
1739+
auto reader = tested->Read({0, 100});
1740+
1741+
// The Read() should immediately fail with FailedPrecondition.
1742+
auto result = reader->Read().get();
1743+
EXPECT_THAT(result,
1744+
VariantWith<Status>(StatusIs(StatusCode::kFailedPrecondition)));
1745+
}
1746+
15751747
} // namespace
15761748
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
15771749
} // namespace storage_internal

0 commit comments

Comments
 (0)