Skip to content

Commit 141060a

Browse files
committed
resolving comments
1 parent bf8d810 commit 141060a

File tree

3 files changed

+49
-57
lines changed

3 files changed

+49
-57
lines changed

google/cloud/storage/internal/async/multi_stream_manager.h

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class StreamBase {
4141
// Manages a collection of streams.
4242
//
4343
// This class implements the "Subsequent Stream" logic where idle streams
44-
// are moved to the back of the queue for reuse.
44+
// are moved to the front of the queue for reuse.
4545
//
4646
// THREAD SAFETY:
4747
// This class is NOT thread-safe. The owner (e.g. ObjectDescriptorImpl
@@ -83,9 +83,9 @@ class MultiStreamManager {
8383
streams_.push_back(Stream{std::move(initial_stream), {}});
8484
}
8585

86-
StreamIterator GetLastStream() {
86+
StreamIterator GetFirstStream() {
8787
if (streams_.empty()) return streams_.end();
88-
return std::prev(streams_.end());
88+
return streams_.begin();
8989
}
9090

9191
StreamIterator GetLeastBusyStream() {
@@ -94,21 +94,24 @@ class MultiStreamManager {
9494
// Track min_ranges to avoid calling .size() repeatedly if possible,
9595
// though for std::unordered_map .size() is O(1).
9696
std::size_t min_ranges = least_busy_it->active_ranges.size();
97+
if (min_ranges == 0) return least_busy_it;
9798

9899
// Start checking from the second element
99100
for (auto it = std::next(streams_.begin()); it != streams_.end(); ++it) {
100101
// Strict less-than ensures stability (preferring older streams if tied)
101-
if (it->active_ranges.size() < min_ranges) {
102+
auto size = it->active_ranges.size();
103+
if (size < min_ranges) {
102104
least_busy_it = it;
103-
min_ranges = it->active_ranges.size();
105+
min_ranges = size;
106+
if (min_ranges == 0) return least_busy_it;
104107
}
105108
}
106109
return least_busy_it;
107110
}
108111

109112
StreamIterator AddStream(std::shared_ptr<StreamT> stream) {
110-
streams_.push_back(Stream{std::move(stream), {}});
111-
return std::prev(streams_.end());
113+
streams_.push_front(Stream{std::move(stream), {}});
114+
return streams_.begin();
112115
}
113116

114117
void CancelAll() {
@@ -141,16 +144,14 @@ class MultiStreamManager {
141144
}
142145

143146
template <typename Pred>
144-
bool ReuseIdleStreamToBack(Pred pred) {
147+
bool ReuseIdleStreamToFront(Pred pred) {
145148
for (auto it = streams_.begin(); it != streams_.end(); ++it) {
146149
if (!pred(*it)) continue;
147150

148-
// If the idle stream is already at the back, we don't
149-
// need to move it. If it's elsewhere, use splice() to move the node.
150-
// splice() is O(1) and, crucially, does not invalidate iterators
151-
// or copy the Stream object.
152-
if (std::next(it) != streams_.end()) {
153-
streams_.splice(streams_.end(), streams_, it);
151+
// If the idle stream is already at the front, we don't
152+
// need to move it. Otherwise splice to the front in O(1).
153+
if (it != streams_.begin()) {
154+
streams_.splice(streams_.begin(), streams_, it);
154155
}
155156
return true;
156157
}

google/cloud/storage/internal/async/multi_stream_manager_test.cc

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -50,26 +50,26 @@ TEST(MultiStreamManagerTest, ConstructsWithFactoryAndHasOneStream) {
5050
auto mgr = MultiStreamManagerTest::MakeManager();
5151
EXPECT_FALSE(mgr.Empty());
5252
EXPECT_EQ(mgr.Size(), 1U);
53-
auto it = mgr.GetLastStream();
53+
auto it = mgr.GetFirstStream();
5454
ASSERT_TRUE(it->stream);
5555
}
5656

5757
TEST(MultiStreamManagerTest, ConstructsWithInitialStream) {
5858
auto initial = std::make_shared<FakeStream>();
5959
Manager mgr([] { return nullptr; }, initial);
6060
EXPECT_EQ(mgr.Size(), 1U);
61-
auto it = mgr.GetLastStream();
61+
auto it = mgr.GetFirstStream();
6262
EXPECT_EQ(it->stream, initial);
6363
}
6464

65-
TEST(MultiStreamManagerTest, AddStreamAppendsAndGetLastReturnsNew) {
65+
TEST(MultiStreamManagerTest, AddStreamAppendsAndGetFirstReturnsNew) {
6666
auto mgr = MultiStreamManagerTest::MakeManager();
6767
auto s1 = std::make_shared<FakeStream>();
6868
auto it1 = mgr.AddStream(s1);
6969
EXPECT_EQ(mgr.Size(), 2U);
7070
EXPECT_EQ(it1->stream.get(), s1.get());
71-
auto it_last = mgr.GetLastStream();
72-
EXPECT_EQ(it_last->stream.get(), s1.get());
71+
auto it_first = mgr.GetFirstStream();
72+
EXPECT_EQ(it_first->stream.get(), s1.get());
7373
}
7474

7575
TEST(MultiStreamManagerTest, GetLeastBusyPrefersFewestActiveRanges) {
@@ -78,7 +78,7 @@ TEST(MultiStreamManagerTest, GetLeastBusyPrefersFewestActiveRanges) {
7878
// The manager starts with an initial stream (size 0).
7979
// We must make it "busy" so it doesn't win the comparison against our test
8080
// streams.
81-
auto it_init = mgr.GetLastStream();
81+
auto it_init = mgr.GetFirstStream();
8282
it_init->active_ranges.emplace(999, std::make_shared<FakeRange>());
8383
it_init->active_ranges.emplace(998, std::make_shared<FakeRange>());
8484

@@ -103,7 +103,7 @@ TEST(MultiStreamManagerTest, GetLeastBusyPrefersFewestActiveRanges) {
103103

104104
TEST(MultiStreamManagerTest, CleanupDoneRangesRemovesFinished) {
105105
auto mgr = MultiStreamManagerTest::MakeManager();
106-
auto it = mgr.GetLastStream();
106+
auto it = mgr.GetFirstStream();
107107
auto r1 = std::make_shared<FakeRange>();
108108
r1->done = false;
109109
auto r2 = std::make_shared<FakeRange>();
@@ -120,7 +120,7 @@ TEST(MultiStreamManagerTest, CleanupDoneRangesRemovesFinished) {
120120

121121
TEST(MultiStreamManagerTest, RemoveStreamAndNotifyRangesCallsOnFinish) {
122122
auto mgr = MultiStreamManagerTest::MakeManager();
123-
auto it = mgr.GetLastStream();
123+
auto it = mgr.GetFirstStream();
124124
auto r1 = std::make_shared<FakeRange>();
125125
auto r2 = std::make_shared<FakeRange>();
126126
it->active_ranges.emplace(11, r1);
@@ -142,49 +142,50 @@ TEST(MultiStreamManagerTest, CancelAllInvokesCancel) {
142142
EXPECT_EQ(s2->cancelled, 1);
143143
}
144144

145-
TEST(MultiStreamManagerTest, ReuseIdleStreamToBackMovesElement) {
145+
TEST(MultiStreamManagerTest, ReuseIdleStreamToFrontMovesElement) {
146146
auto mgr = MultiStreamManagerTest::MakeManager();
147147
// Capture the factory-created stream pointer (initial element)
148-
auto* factory_ptr = mgr.GetLastStream()->stream.get();
148+
auto* factory_ptr = mgr.GetFirstStream()->stream.get();
149149
auto s1 = std::make_shared<FakeStream>();
150150
mgr.AddStream(s1);
151-
bool moved = mgr.ReuseIdleStreamToBack([](Manager::Stream const& s) {
151+
bool moved = mgr.ReuseIdleStreamToFront([](Manager::Stream const& s) {
152152
auto* fs = s.stream.get();
153153
return fs != nullptr && s.active_ranges.empty() && !fs->write_pending;
154154
});
155155
EXPECT_TRUE(moved);
156-
auto it_last = mgr.GetLastStream();
157-
// After move, the factory stream should be last
158-
EXPECT_EQ(it_last->stream.get(), factory_ptr);
159-
EXPECT_NE(it_last->stream.get(), s1.get());
156+
auto it_first = mgr.GetFirstStream();
157+
// After move, the s1 stream should be first
158+
EXPECT_EQ(it_first->stream.get(), s1.get());
159+
EXPECT_NE(it_first->stream.get(), factory_ptr);
160160
}
161161

162162
TEST(MultiStreamManagerTest,
163-
ReuseIdleStreamAlreadyAtBackReturnsTrueWithoutMove) {
163+
ReuseIdleStreamAlreadyAtFrontReturnsTrueWithoutMove) {
164164
auto mgr = MultiStreamManagerTest::MakeManager();
165-
// The manager starts with one stream. It is the last stream, and it is idle.
166-
auto initial_last = mgr.GetLastStream();
167-
bool reused = mgr.ReuseIdleStreamToBack(
165+
// The manager starts with one stream. It is the first stream, and it is idle.
166+
auto initial_first = mgr.GetFirstStream();
167+
bool reused = mgr.ReuseIdleStreamToFront(
168168
[](Manager::Stream const& s) { return s.active_ranges.empty(); });
169169
EXPECT_TRUE(reused);
170-
// Pointer should remain the same (it was already at the back)
171-
EXPECT_EQ(mgr.GetLastStream(), initial_last);
170+
// Pointer should remain the same (it was already at the front)
171+
EXPECT_EQ(mgr.GetFirstStream(), initial_first);
172172
}
173173

174174
TEST(MultiStreamManagerTest, ReuseIdleStreamDoesNotMoveWhenWritePending) {
175175
auto mgr = MultiStreamManagerTest::MakeManager();
176+
auto factory_ptr = mgr.GetFirstStream()->stream.get();
176177
// Mark factory stream as not reusable
177-
mgr.GetLastStream()->stream->write_pending = true;
178+
mgr.GetFirstStream()->stream->write_pending = true;
178179
auto s1 = std::make_shared<FakeStream>();
179180
s1->write_pending = true; // also mark appended stream as not reusable
180181
mgr.AddStream(s1);
181-
bool moved = mgr.ReuseIdleStreamToBack([](Manager::Stream const& s) {
182+
bool moved = mgr.ReuseIdleStreamToFront([](Manager::Stream const& s) {
182183
auto* fs = s.stream.get();
183184
return fs != nullptr && s.active_ranges.empty() && !fs->write_pending;
184185
});
185186
EXPECT_FALSE(moved);
186-
auto it_last = mgr.GetLastStream();
187-
EXPECT_EQ(it_last->stream.get(), s1.get());
187+
auto it_first = mgr.GetFirstStream();
188+
EXPECT_EQ(it_first->stream.get(), factory_ptr);
188189
}
189190

190191
TEST(MultiStreamManagerTest, MoveActiveRangesTransfersAllEntries) {
@@ -204,24 +205,24 @@ TEST(MultiStreamManagerTest, MoveActiveRangesTransfersAllEntries) {
204205
EXPECT_TRUE(it2->active_ranges.count(202));
205206
}
206207

207-
TEST(MultiStreamManagerTest, GetLastStreamReflectsRecentAppendAndReuse) {
208+
TEST(MultiStreamManagerTest, GetFirstStreamReflectsFrontReuse) {
208209
auto mgr = MultiStreamManagerTest::MakeManager();
209210
auto s1 = std::make_shared<FakeStream>();
210211
mgr.AddStream(s1);
211-
EXPECT_EQ(mgr.GetLastStream()->stream.get(), s1.get());
212-
bool moved = mgr.ReuseIdleStreamToBack([](Manager::Stream const& s) {
212+
EXPECT_EQ(mgr.GetFirstStream()->stream.get(), s1.get());
213+
bool moved = mgr.ReuseIdleStreamToFront([](Manager::Stream const& s) {
213214
return s.stream != nullptr && s.active_ranges.empty();
214215
});
215216
EXPECT_TRUE(moved);
216-
auto it_last = mgr.GetLastStream();
217-
EXPECT_NE(it_last->stream.get(), s1.get());
217+
auto it_first = mgr.GetFirstStream();
218+
EXPECT_EQ(it_first->stream.get(), s1.get());
218219
}
219220

220221
TEST(MultiStreamManagerTest, EmptyAndSizeTransitions) {
221222
auto mgr = MultiStreamManagerTest::MakeManager();
222223
EXPECT_FALSE(mgr.Empty());
223224
EXPECT_EQ(mgr.Size(), 1U);
224-
auto it = mgr.GetLastStream();
225+
auto it = mgr.GetFirstStream();
225226
mgr.RemoveStreamAndNotifyRanges(it, Status());
226227
EXPECT_TRUE(mgr.Empty());
227228
EXPECT_EQ(mgr.Size(), 0U);

google/cloud/storage/internal/async/object_descriptor_impl.cc

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ ObjectDescriptorImpl::~ObjectDescriptorImpl() { Cancel(); }
5050
void ObjectDescriptorImpl::Start(
5151
google::storage::v2::BidiReadObjectResponse first_response) {
5252
std::unique_lock<std::mutex> lk(mu_);
53-
auto it = stream_manager_->GetLastStream();
53+
auto it = stream_manager_->GetFirstStream();
5454
if (it == stream_manager_->End()) return;
5555
lk.unlock();
5656
OnRead(it, std::move(first_response));
@@ -83,7 +83,7 @@ void ObjectDescriptorImpl::AssurePendingStreamQueued() {
8383
void ObjectDescriptorImpl::MakeSubsequentStream() {
8484
std::unique_lock<std::mutex> lk(mu_);
8585
// Reuse an idle stream if possible.
86-
if (stream_manager_->ReuseIdleStreamToBack(
86+
if (stream_manager_->ReuseIdleStreamToFront(
8787
[](StreamManager::Stream const& s) {
8888
auto const* rs = s.stream.get();
8989
return rs != nullptr && s.active_ranges.empty() &&
@@ -151,16 +151,6 @@ ObjectDescriptorImpl::Read(ReadParams p) {
151151
}
152152

153153
auto it = stream_manager_->GetLeastBusyStream();
154-
if (it == stream_manager_->End()) {
155-
lk.unlock();
156-
range->OnFinish(Status(StatusCode::kFailedPrecondition,
157-
"Cannot read object, all streams failed"));
158-
if (!internal::TracingEnabled(options_)) {
159-
return std::unique_ptr<storage_experimental::AsyncReaderConnection>(
160-
std::make_unique<ObjectDescriptorReader>(std::move(range)));
161-
}
162-
return MakeTracingObjectDescriptorReader(std::move(range));
163-
}
164154
auto const id = ++read_id_generator_;
165155
it->active_ranges.emplace(id, range);
166156
auto& read_range = *it->stream->next_request.add_read_ranges();

0 commit comments

Comments
 (0)