Skip to content

Commit bf8d810

Browse files
committed
resolving comments
1 parent e73e1bc commit bf8d810

File tree

3 files changed

+43
-39
lines changed

3 files changed

+43
-39
lines changed

google/cloud/storage/internal/async/multi_stream_manager.h

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
#include "google/cloud/status.h"
1919
#include "google/cloud/version.h"
20-
#include <cassert>
2120
#include <cstdint>
2221
#include <functional>
2322
#include <list>
@@ -85,18 +84,12 @@ class MultiStreamManager {
8584
}
8685

8786
StreamIterator GetLastStream() {
88-
// SAFETY: The caller must ensure the manager is not empty.
89-
// In ObjectDescriptorImpl, we ensure there is always at least one stream,
90-
// but this assertion protects against future refactoring errors.
91-
assert(!streams_.empty());
87+
if (streams_.empty()) return streams_.end();
9288
return std::prev(streams_.end());
9389
}
9490

9591
StreamIterator GetLeastBusyStream() {
96-
// SAFETY: The caller must ensure the manager is not empty.
97-
// In ObjectDescriptorImpl, we ensure there is always at least one stream,
98-
// but this assertion protects against future refactoring errors.
99-
assert(!streams_.empty());
92+
if (streams_.empty()) return streams_.end();
10093
auto least_busy_it = streams_.begin();
10194
// Track min_ranges to avoid calling .size() repeatedly if possible,
10295
// though for std::unordered_map .size() is O(1).
@@ -165,6 +158,7 @@ class MultiStreamManager {
165158
}
166159

167160
bool Empty() const { return streams_.empty(); }
161+
StreamIterator End() { return streams_.end(); }
168162
std::size_t Size() const { return streams_.size(); }
169163

170164
private:

google/cloud/storage/internal/async/object_descriptor_impl.cc

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ void ObjectDescriptorImpl::Start(
5151
google::storage::v2::BidiReadObjectResponse first_response) {
5252
std::unique_lock<std::mutex> lk(mu_);
5353
auto it = stream_manager_->GetLastStream();
54+
if (it == stream_manager_->End()) return;
5455
lk.unlock();
5556
OnRead(it, std::move(first_response));
5657
// Acquire lock and queue the background stream.
@@ -86,7 +87,7 @@ void ObjectDescriptorImpl::MakeSubsequentStream() {
8687
[](StreamManager::Stream const& s) {
8788
auto const* rs = s.stream.get();
8889
return rs != nullptr && s.active_ranges.empty() &&
89-
!rs->write_pending_;
90+
!rs->write_pending;
9091
})) {
9192
return;
9293
}
@@ -150,9 +151,19 @@ ObjectDescriptorImpl::Read(ReadParams p) {
150151
}
151152

152153
auto it = stream_manager_->GetLeastBusyStream();
154+
if (it == stream_manager_->End()) {
155+
lk.unlock();
156+
range->OnFinish(Status(StatusCode::kFailedPrecondition,
157+
"Cannot read object, all streams failed"));
158+
if (!internal::TracingEnabled(options_)) {
159+
return std::unique_ptr<storage_experimental::AsyncReaderConnection>(
160+
std::make_unique<ObjectDescriptorReader>(std::move(range)));
161+
}
162+
return MakeTracingObjectDescriptorReader(std::move(range));
163+
}
153164
auto const id = ++read_id_generator_;
154165
it->active_ranges.emplace(id, range);
155-
auto& read_range = *it->stream->next_request_.add_read_ranges();
166+
auto& read_range = *it->stream->next_request.add_read_ranges();
156167
read_range.set_read_id(id);
157168
read_range.set_read_offset(p.start);
158169
read_range.set_read_length(p.length);
@@ -168,18 +179,18 @@ ObjectDescriptorImpl::Read(ReadParams p) {
168179

169180
void ObjectDescriptorImpl::Flush(std::unique_lock<std::mutex> lk,
170181
StreamIterator it) {
171-
if (it->stream->write_pending_ ||
172-
it->stream->next_request_.read_ranges().empty()) {
182+
if (it->stream->write_pending ||
183+
it->stream->next_request.read_ranges().empty()) {
173184
return;
174185
}
175-
it->stream->write_pending_ = true;
186+
it->stream->write_pending = true;
176187
google::storage::v2::BidiReadObjectRequest request;
177-
request.Swap(&it->stream->next_request_);
188+
request.Swap(&it->stream->next_request);
178189

179190
// Assign CurrentStream to a temporary variable to prevent
180191
// lifetime extension which can cause the lock to be held until the
181192
// end of the block.
182-
auto current_stream = it->stream->stream_;
193+
auto current_stream = it->stream->stream;
183194
lk.unlock();
184195
current_stream->Write(std::move(request))
185196
.then([w = WeakFromThis(), it](auto f) {
@@ -190,19 +201,19 @@ void ObjectDescriptorImpl::Flush(std::unique_lock<std::mutex> lk,
190201
void ObjectDescriptorImpl::OnWrite(StreamIterator it, bool ok) {
191202
std::unique_lock<std::mutex> lk(mu_);
192203
if (!ok) return DoFinish(std::move(lk), it);
193-
it->stream->write_pending_ = false;
204+
it->stream->write_pending = false;
194205
Flush(std::move(lk), it);
195206
}
196207

197208
void ObjectDescriptorImpl::DoRead(std::unique_lock<std::mutex> lk,
198209
StreamIterator it) {
199-
if (it->stream->read_pending_) return;
200-
it->stream->read_pending_ = true;
210+
if (it->stream->read_pending) return;
211+
it->stream->read_pending = true;
201212

202213
// Assign CurrentStream to a temporary variable to prevent
203214
// lifetime extension which can cause the lock to be held until the
204215
// end of the block.
205-
auto current_stream = it->stream->stream_;
216+
auto current_stream = it->stream->stream;
206217
lk.unlock();
207218
current_stream->Read().then([w = WeakFromThis(), it](auto f) {
208219
if (auto self = w.lock()) self->OnRead(it, f.get());
@@ -213,7 +224,7 @@ void ObjectDescriptorImpl::OnRead(
213224
StreamIterator it,
214225
absl::optional<google::storage::v2::BidiReadObjectResponse> response) {
215226
std::unique_lock<std::mutex> lk(mu_);
216-
it->stream->read_pending_ = false;
227+
it->stream->read_pending = false;
217228

218229
if (!response) return DoFinish(std::move(lk), it);
219230
if (response->has_metadata()) {
@@ -242,11 +253,11 @@ void ObjectDescriptorImpl::OnRead(
242253

243254
void ObjectDescriptorImpl::DoFinish(std::unique_lock<std::mutex> lk,
244255
StreamIterator it) {
245-
it->stream->read_pending_ = false;
256+
it->stream->read_pending = false;
246257
// Assign CurrentStream to a temporary variable to prevent
247258
// lifetime extension which can cause the lock to be held until the
248259
// end of the block.
249-
auto current_stream = it->stream->stream_;
260+
auto current_stream = it->stream->stream;
250261
lk.unlock();
251262
auto pending = current_stream->Finish();
252263
if (!pending.valid()) return;
@@ -292,8 +303,8 @@ void ObjectDescriptorImpl::OnResume(StreamIterator it,
292303

293304
it->stream = std::make_shared<ReadStream>(std::move(result->stream),
294305
resume_policy_prototype_->clone());
295-
it->stream->write_pending_ = false;
296-
it->stream->read_pending_ = false;
306+
it->stream->write_pending = false;
307+
it->stream->read_pending = false;
297308

298309
// TODO(#15105) - this should be done without release the lock.
299310
Flush(std::move(lk), it);
@@ -326,7 +337,7 @@ bool ObjectDescriptorImpl::IsResumable(
326337
stream_manager_->CleanupDoneRanges(it);
327338
return true;
328339
}
329-
return it->stream->resume_policy_->OnFinish(status) ==
340+
return it->stream->resume_policy->OnFinish(status) ==
330341
storage_experimental::ResumePolicy::kContinue;
331342
}
332343

google/cloud/storage/internal/async/object_descriptor_impl.h

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,30 +36,25 @@ namespace cloud {
3636
namespace storage_internal {
3737
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
3838

39-
class ReadStream : public storage_internal::StreamBase {
40-
public:
39+
struct ReadStream : public storage_internal::StreamBase {
4140
ReadStream(std::shared_ptr<OpenStream> stream,
4241
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy)
43-
: stream_(std::move(stream)), resume_policy_(std::move(resume_policy)) {}
42+
: stream(std::move(stream)), resume_policy(std::move(resume_policy)) {}
4443

4544
void Cancel() override {
46-
if (stream_) stream_->Cancel();
45+
if (stream) stream->Cancel();
4746
}
4847

49-
std::shared_ptr<OpenStream> stream_;
50-
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy_;
51-
google::storage::v2::BidiReadObjectRequest next_request_;
52-
bool write_pending_ = false;
53-
bool read_pending_ = false;
48+
std::shared_ptr<OpenStream> stream;
49+
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy;
50+
google::storage::v2::BidiReadObjectRequest next_request;
51+
bool write_pending = false;
52+
bool read_pending = false;
5453
};
5554

5655
class ObjectDescriptorImpl
5756
: public storage_experimental::ObjectDescriptorConnection,
5857
public std::enable_shared_from_this<ObjectDescriptorImpl> {
59-
private:
60-
using StreamManager = MultiStreamManager<ReadStream, ReadRange>;
61-
using StreamIterator = StreamManager::StreamIterator;
62-
6358
public:
6459
ObjectDescriptorImpl(
6560
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy,
@@ -87,6 +82,10 @@ class ObjectDescriptorImpl
8782
void MakeSubsequentStream() override;
8883

8984
private:
85+
using StreamManager = MultiStreamManager<ReadStream, ReadRange>;
86+
using StreamIterator =
87+
MultiStreamManager<ReadStream, ReadRange>::StreamIterator;
88+
9089
std::weak_ptr<ObjectDescriptorImpl> WeakFromThis() {
9190
return shared_from_this();
9291
}

0 commit comments

Comments
 (0)