Skip to content

Commit e73e1bc

Browse files
committed
resolving comments
1 parent ee681cd commit e73e1bc

File tree

3 files changed

+38
-23
lines changed

3 files changed

+38
-23
lines changed

google/cloud/storage/internal/async/multi_stream_manager.h

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ namespace cloud {
2929
namespace storage_internal {
3030
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
3131

32+
// Defines the interface contract that any stream type (e.g., ReadStream,
33+
// WriteStream) managed by MultiStreamManager must implement. This explicit base
34+
// class ensures we have a clear, enforceable interface for operations like
35+
// CancelAll().
3236
class StreamBase {
3337
public:
3438
virtual ~StreamBase() = default;
@@ -41,8 +45,20 @@ class StreamBase {
4145
// are moved to the back of the queue for reuse.
4246
//
4347
// THREAD SAFETY:
44-
// This class is NOT thread-safe. The owner (ObjectDescriptorImpl) must
45-
// serialize access, typically by holding `mu_` while calling these methods.
48+
// This class is NOT thread-safe. The owner (e.g. ObjectDescriptorImpl
49+
// or AsyncWriterImpl etc) must serialize access, typically by holding
50+
// an external mutex while calling these methods.
51+
//
52+
// EXAMPLE USAGE:
53+
// class MyOwner {
54+
// std::mutex mu_;
55+
// MultiStreamManager<MyStream, MyRange> manager_;
56+
//
57+
// void StartRead() {
58+
// std::unique_lock<std::mutex> lk(mu_);
59+
// auto it = manager_.GetLeastBusyStream();
60+
// }
61+
// };
4662
template <typename StreamT, typename RangeT>
4763
class MultiStreamManager {
4864
public:
@@ -81,20 +97,20 @@ class MultiStreamManager {
8197
// In ObjectDescriptorImpl, we ensure there is always at least one stream,
8298
// but this assertion protects against future refactoring errors.
8399
assert(!streams_.empty());
84-
auto best_it = streams_.begin();
100+
auto least_busy_it = streams_.begin();
85101
// Track min_ranges to avoid calling .size() repeatedly if possible,
86102
// though for std::unordered_map .size() is O(1).
87-
std::size_t min_ranges = best_it->active_ranges.size();
103+
std::size_t min_ranges = least_busy_it->active_ranges.size();
88104

89105
// Start checking from the second element
90106
for (auto it = std::next(streams_.begin()); it != streams_.end(); ++it) {
91107
// Strict less-than ensures stability (preferring older streams if tied)
92108
if (it->active_ranges.size() < min_ranges) {
93-
best_it = it;
109+
least_busy_it = it;
94110
min_ranges = it->active_ranges.size();
95111
}
96112
}
97-
return best_it;
113+
return least_busy_it;
98114
}
99115

100116
StreamIterator AddStream(std::shared_ptr<StreamT> stream) {

google/cloud/storage/internal/async/object_descriptor_impl.cc

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ void ObjectDescriptorImpl::Start(
5151
google::storage::v2::BidiReadObjectResponse first_response) {
5252
std::unique_lock<std::mutex> lk(mu_);
5353
auto it = stream_manager_->GetLastStream();
54-
// Unlock and start the Read loop first.
5554
lk.unlock();
5655
OnRead(it, std::move(first_response));
5756
// Acquire lock and queue the background stream.
@@ -87,7 +86,7 @@ void ObjectDescriptorImpl::MakeSubsequentStream() {
8786
[](StreamManager::Stream const& s) {
8887
auto const* rs = s.stream.get();
8988
return rs != nullptr && s.active_ranges.empty() &&
90-
!rs->write_pending;
89+
!rs->write_pending_;
9190
})) {
9291
return;
9392
}
@@ -153,7 +152,7 @@ ObjectDescriptorImpl::Read(ReadParams p) {
153152
auto it = stream_manager_->GetLeastBusyStream();
154153
auto const id = ++read_id_generator_;
155154
it->active_ranges.emplace(id, range);
156-
auto& read_range = *it->stream->next_request.add_read_ranges();
155+
auto& read_range = *it->stream->next_request_.add_read_ranges();
157156
read_range.set_read_id(id);
158157
read_range.set_read_offset(p.start);
159158
read_range.set_read_length(p.length);
@@ -169,13 +168,13 @@ ObjectDescriptorImpl::Read(ReadParams p) {
169168

170169
void ObjectDescriptorImpl::Flush(std::unique_lock<std::mutex> lk,
171170
StreamIterator it) {
172-
if (it->stream->write_pending ||
173-
it->stream->next_request.read_ranges().empty()) {
171+
if (it->stream->write_pending_ ||
172+
it->stream->next_request_.read_ranges().empty()) {
174173
return;
175174
}
176-
it->stream->write_pending = true;
175+
it->stream->write_pending_ = true;
177176
google::storage::v2::BidiReadObjectRequest request;
178-
request.Swap(&it->stream->next_request);
177+
request.Swap(&it->stream->next_request_);
179178

180179
// Assign CurrentStream to a temporary variable to prevent
181180
// lifetime extension which can cause the lock to be held until the
@@ -191,14 +190,14 @@ void ObjectDescriptorImpl::Flush(std::unique_lock<std::mutex> lk,
191190
void ObjectDescriptorImpl::OnWrite(StreamIterator it, bool ok) {
192191
std::unique_lock<std::mutex> lk(mu_);
193192
if (!ok) return DoFinish(std::move(lk), it);
194-
it->stream->write_pending = false;
193+
it->stream->write_pending_ = false;
195194
Flush(std::move(lk), it);
196195
}
197196

198197
void ObjectDescriptorImpl::DoRead(std::unique_lock<std::mutex> lk,
199198
StreamIterator it) {
200-
if (it->stream->read_pending) return;
201-
it->stream->read_pending = true;
199+
if (it->stream->read_pending_) return;
200+
it->stream->read_pending_ = true;
202201

203202
// Assign CurrentStream to a temporary variable to prevent
204203
// lifetime extension which can cause the lock to be held until the
@@ -214,7 +213,7 @@ void ObjectDescriptorImpl::OnRead(
214213
StreamIterator it,
215214
absl::optional<google::storage::v2::BidiReadObjectResponse> response) {
216215
std::unique_lock<std::mutex> lk(mu_);
217-
it->stream->read_pending = false;
216+
it->stream->read_pending_ = false;
218217

219218
if (!response) return DoFinish(std::move(lk), it);
220219
if (response->has_metadata()) {
@@ -243,7 +242,7 @@ void ObjectDescriptorImpl::OnRead(
243242

244243
void ObjectDescriptorImpl::DoFinish(std::unique_lock<std::mutex> lk,
245244
StreamIterator it) {
246-
it->stream->read_pending = false;
245+
it->stream->read_pending_ = false;
247246
// Assign CurrentStream to a temporary variable to prevent
248247
// lifetime extension which can cause the lock to be held until the
249248
// end of the block.
@@ -293,8 +292,8 @@ void ObjectDescriptorImpl::OnResume(StreamIterator it,
293292

294293
it->stream = std::make_shared<ReadStream>(std::move(result->stream),
295294
resume_policy_prototype_->clone());
296-
it->stream->write_pending = false;
297-
it->stream->read_pending = false;
295+
it->stream->write_pending_ = false;
296+
it->stream->read_pending_ = false;
298297

299298
// TODO(#15105) - this should be done without release the lock.
300299
Flush(std::move(lk), it);

google/cloud/storage/internal/async/object_descriptor_impl.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ class ReadStream : public storage_internal::StreamBase {
4848

4949
std::shared_ptr<OpenStream> stream_;
5050
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy_;
51-
google::storage::v2::BidiReadObjectRequest next_request;
52-
bool write_pending = false;
53-
bool read_pending = false;
51+
google::storage::v2::BidiReadObjectRequest next_request_;
52+
bool write_pending_ = false;
53+
bool read_pending_ = false;
5454
};
5555

5656
class ObjectDescriptorImpl

0 commit comments

Comments
 (0)