Skip to content

Commit 1a4aebe

Browse files
committed
implement multi stream manager for async downloads
1 parent d6aff23 commit 1a4aebe

11 files changed

+1068
-350
lines changed

google/cloud/storage/async/object_descriptor.cc

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,11 @@ absl::optional<google::storage::v2::Object> ObjectDescriptor::metadata() const {
2727

2828
std::pair<AsyncReader, AsyncToken> ObjectDescriptor::Read(std::int64_t offset,
2929
std::int64_t limit) {
30-
// TODO(15340): This change is causing performance regression. We need to
31-
// revisit it after benchmarking our code.
32-
33-
// std::int64_t max_range =
34-
// impl_->options().get<storage_experimental::MaximumRangeSizeOption>();
35-
// if (limit > max_range) {
36-
// impl_->MakeSubsequentStream();
37-
// }
30+
std::int64_t max_range =
31+
impl_->options().get<storage_experimental::MaximumRangeSizeOption>();
32+
if (limit > max_range) {
33+
impl_->MakeSubsequentStream();
34+
}
3835
auto reader = impl_->Read({offset, limit});
3936
auto token = storage_internal::MakeAsyncToken(reader.get());
4037
return {AsyncReader(std::move(reader)), std::move(token)};

google/cloud/storage/async/object_descriptor_test.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ TEST(ObjectDescriptor, ReadLast) {
149149
}
150150

151151
TEST(ObjectDescriptor, ReadExceedsMaxRange) {
152-
GTEST_SKIP();
153152
auto mock = std::make_shared<MockAsyncObjectDescriptorConnection>();
154153
auto constexpr kMaxRange = 1024;
155154
EXPECT_CALL(*mock, options)

google/cloud/storage/google_cloud_cpp_storage_grpc.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ google_cloud_cpp_storage_grpc_hdrs = [
4343
"internal/async/default_options.h",
4444
"internal/async/handle_redirect_error.h",
4545
"internal/async/insert_object.h",
46+
"internal/async/multi_stream_manager.h",
4647
"internal/async/object_descriptor_connection_tracing.h",
4748
"internal/async/object_descriptor_impl.h",
4849
"internal/async/object_descriptor_reader.h",
@@ -120,6 +121,7 @@ google_cloud_cpp_storage_grpc_srcs = [
120121
"internal/async/default_options.cc",
121122
"internal/async/handle_redirect_error.cc",
122123
"internal/async/insert_object.cc",
124+
"internal/async/multi_stream_manager.cc",
123125
"internal/async/object_descriptor_connection_tracing.cc",
124126
"internal/async/object_descriptor_impl.cc",
125127
"internal/async/object_descriptor_reader.cc",

google/cloud/storage/google_cloud_cpp_storage_grpc.cmake

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ add_library(
110110
internal/async/handle_redirect_error.h
111111
internal/async/insert_object.cc
112112
internal/async/insert_object.h
113+
internal/async/multi_stream_manager.cc
114+
internal/async/multi_stream_manager.h
113115
internal/async/object_descriptor_connection_tracing.cc
114116
internal/async/object_descriptor_connection_tracing.h
115117
internal/async/object_descriptor_impl.cc
@@ -442,6 +444,7 @@ set(storage_client_grpc_unit_tests
442444
internal/async/default_options_test.cc
443445
internal/async/handle_redirect_error_test.cc
444446
internal/async/insert_object_test.cc
447+
internal/async/multi_stream_manager_test.cc
445448
internal/async/object_descriptor_connection_tracing_test.cc
446449
internal/async/object_descriptor_impl_test.cc
447450
internal/async/object_descriptor_reader_test.cc
google/cloud/storage/internal/async/multi_stream_manager.cc

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/storage/internal/async/multi_stream_manager.h"
16+
#include "google/cloud/storage/internal/async/object_descriptor_impl.h"
17+
18+
namespace google {
19+
namespace cloud {
20+
namespace storage_internal {
21+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
22+
23+
// Explicit instantiation definition for the specialization used by
// ObjectDescriptorImpl. It instantiates every non-template member of
// MultiStreamManager<ReadStream, ReadRange> in this translation unit, so a
// member that does not compile for these two types is diagnosed here. The
// member template ReuseIdleStreamToBack() is not covered by this statement;
// it is instantiated wherever it is called.
// NOTE(review): multi_stream_manager.h declares no matching
// `extern template`, so other translation units still instantiate the class
// implicitly - confirm whether that is intended.
// NOTE(review): ReadStream and ReadRange are presumably made visible by
// object_descriptor_impl.h; verify.
24+
template class MultiStreamManager<ReadStream, ReadRange>;
25+
26+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
27+
} // namespace storage_internal
28+
} // namespace cloud
29+
} // namespace google
google/cloud/storage/internal/async/multi_stream_manager.h

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_MULTI_STREAM_MANAGER_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_MULTI_STREAM_MANAGER_H
17+
18+
#include "google/cloud/status.h"
#include "google/cloud/version.h"
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iterator>
#include <list>
#include <memory>
#include <unordered_map>
#include <utility>
26+
27+
namespace google {
28+
namespace cloud {
29+
namespace storage_internal {
30+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
31+
32+
// Abstract interface for a cancellable stream: it declares only a virtual
// destructor and the pure virtual `Cancel()`.
// NOTE(review): MultiStreamManager in this header does not require its
// `StreamT` to derive from this class; it only calls `Cancel()` on it.
// Presumably the concrete stream types implement this interface - confirm.
class StreamBase {
33+
public:
34+
// Virtual so that destroying a derived object through a `StreamBase` pointer
// runs the derived destructor.
virtual ~StreamBase() = default;
35+
// Pure virtual; every concrete stream must provide it. The exact
// cancellation semantics are defined by the implementer and are not visible
// here.
virtual void Cancel() = 0;
36+
};
37+
38+
// Manages a collection of streams.
39+
//
40+
// This class implements the "Subsequent Stream" logic where idle streams
41+
// are moved to the back of the queue for reuse.
42+
//
43+
// THREAD SAFETY:
44+
// This class is NOT thread-safe. The owner (ObjectDescriptorImpl) must
45+
// serialize access, typically by holding `mu_` while calling these methods.
46+
template <typename StreamT, typename RangeT>
47+
class MultiStreamManager {
48+
public:
49+
struct Stream {
50+
std::shared_ptr<StreamT> stream;
51+
std::unordered_map<std::int64_t, std::shared_ptr<RangeT>> active_ranges;
52+
};
53+
54+
using StreamIterator = typename std::list<Stream>::iterator;
55+
using StreamFactory = std::function<std::shared_ptr<StreamT>()>;
56+
57+
// Constructor creates the first stream using the factory immediately.
58+
explicit MultiStreamManager(StreamFactory stream_factory)
59+
: stream_factory_(std::move(stream_factory)) {
60+
streams_.push_back(Stream{stream_factory_(), {}});
61+
}
62+
63+
// Constructor accepts an already-created initial stream.
64+
// This is required by ObjectDescriptorImpl which receives an OpenStream.
65+
MultiStreamManager(StreamFactory stream_factory, std::shared_ptr<StreamT> initial_stream)
66+
: stream_factory_(std::move(stream_factory)) {
67+
streams_.push_back(Stream{std::move(initial_stream), {}});
68+
}
69+
70+
StreamIterator GetLastStream() {
71+
// SAFETY: The caller must ensure the manager is not empty.
72+
// In ObjectDescriptorImpl, we ensure there is always at least one stream,
73+
// but this assertion protects against future refactoring errors.
74+
assert(!streams_.empty());
75+
return std::prev(streams_.end());
76+
}
77+
78+
StreamIterator GetLeastBusyStream() {
79+
// SAFETY: The caller must ensure the manager is not empty.
80+
// In ObjectDescriptorImpl, we ensure there is always at least one stream,
81+
// but this assertion protects against future refactoring errors.
82+
assert(!streams_.empty());
83+
auto best_it = streams_.begin();
84+
// Track min_ranges to avoid calling .size() repeatedly if possible,
85+
// though for std::unordered_map .size() is O(1).
86+
std::size_t min_ranges = best_it->active_ranges.size();
87+
88+
// Start checking from the second element
89+
for (auto it = std::next(streams_.begin()); it != streams_.end(); ++it) {
90+
// Strict less-than ensures stability (preferring older streams if tied)
91+
if (it->active_ranges.size() < min_ranges) {
92+
best_it = it;
93+
min_ranges = it->active_ranges.size();
94+
}
95+
}
96+
return best_it;
97+
}
98+
99+
StreamIterator AddStream(std::shared_ptr<StreamT> stream) {
100+
streams_.push_back(Stream{std::move(stream), {}});
101+
return std::prev(streams_.end());
102+
}
103+
104+
void CancelAll() {
105+
for (auto& s : streams_) {
106+
if (s.stream) s.stream->Cancel();
107+
}
108+
}
109+
110+
void RemoveStreamAndNotifyRanges(StreamIterator it, Status const& status) {
111+
auto ranges = std::move(it->active_ranges);
112+
streams_.erase(it);
113+
for (auto const& kv : ranges) {
114+
kv.second->OnFinish(status);
115+
}
116+
}
117+
118+
void MoveActiveRanges(StreamIterator from, StreamIterator to) {
119+
to->active_ranges = std::move(from->active_ranges);
120+
}
121+
122+
void CleanupDoneRanges(StreamIterator it) {
123+
auto& active_ranges = it->active_ranges;
124+
for (auto i = active_ranges.begin(); i != active_ranges.end();) {
125+
if (i->second->IsDone()) {
126+
i = active_ranges.erase(i);
127+
} else {
128+
++i;
129+
}
130+
}
131+
}
132+
133+
template <typename Pred>
134+
bool ReuseIdleStreamToBack(Pred pred) {
135+
for (auto it = streams_.begin(); it != streams_.end(); ++it) {
136+
if (!pred(*it)) continue;
137+
138+
// If the idle stream is already at the back, we don't
139+
// need to move it. If it's elsewhere, use splice() to move the node.
140+
// splice() is O(1) and, crucially, does not invalidate iterators
141+
// or copy the Stream object.
142+
if (std::next(it) != streams_.end()) {
143+
streams_.splice(streams_.end(), streams_, it);
144+
}
145+
return true;
146+
}
147+
return false;
148+
}
149+
150+
bool Empty() const { return streams_.empty(); }
151+
std::size_t Size() const { return streams_.size(); }
152+
153+
private:
154+
std::list<Stream> streams_;
155+
StreamFactory stream_factory_;
156+
};
157+
158+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
159+
} // namespace storage_internal
160+
} // namespace cloud
161+
} // namespace google
162+
163+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_MULTI_STREAM_MANAGER_H

0 commit comments

Comments
 (0)