Skip to content

Commit e7a2773

Browse files
committed
initial implementation
1 parent 826dd38 commit e7a2773

10 files changed

+803
-0
lines changed

google/cloud/bigtable/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ add_library(
166166
internal/bigtable_logging_decorator.h
167167
internal/bigtable_metadata_decorator.cc
168168
internal/bigtable_metadata_decorator.h
169+
internal/bigtable_random_two_least_used_decorator.cc
170+
internal/bigtable_random_two_least_used_decorator.h
169171
internal/bigtable_round_robin_decorator.cc
170172
internal/bigtable_round_robin_decorator.h
171173
internal/bigtable_stub.cc

google/cloud/bigtable/google_cloud_cpp_bigtable.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ google_cloud_cpp_bigtable_hdrs = [
8080
"internal/bigtable_channel_refresh.h",
8181
"internal/bigtable_logging_decorator.h",
8282
"internal/bigtable_metadata_decorator.h",
83+
"internal/bigtable_random_two_least_used_decorator.h",
8384
"internal/bigtable_round_robin_decorator.h",
8485
"internal/bigtable_stub.h",
8586
"internal/bigtable_stub_factory.h",
@@ -204,6 +205,7 @@ google_cloud_cpp_bigtable_srcs = [
204205
"internal/bigtable_channel_refresh.cc",
205206
"internal/bigtable_logging_decorator.cc",
206207
"internal/bigtable_metadata_decorator.cc",
208+
"internal/bigtable_random_two_least_used_decorator.cc",
207209
"internal/bigtable_round_robin_decorator.cc",
208210
"internal/bigtable_stub.cc",
209211
"internal/bigtable_stub_factory.cc",
Lines changed: 354 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,354 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Generated by the Codegen C++ plugin.
16+
// If you make any local changes, they will be lost.
17+
// source: google/bigtable/v2/bigtable.proto
18+
19+
#include "google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h"
20+
#include <memory>
21+
#include <mutex>
22+
#include <vector>
23+
24+
namespace google {
25+
namespace cloud {
26+
namespace bigtable_internal {
27+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
28+
namespace {
29+
30+
template <typename T>
31+
class StreamingReadRpcTracking
32+
: public google::cloud::internal::StreamingReadRpc<T> {
33+
public:
34+
StreamingReadRpcTracking(
35+
std::unique_ptr<google::cloud::internal::StreamingReadRpc<T>> child,
36+
std::function<void(void)> on_destruction)
37+
: child_(std::move(child)), on_destruction_(std::move(on_destruction)) {}
38+
39+
~StreamingReadRpcTracking() override { on_destruction_(); }
40+
41+
void Cancel() override { child_->Cancel(); }
42+
absl::optional<Status> Read(T* response) override {
43+
return child_->Read(response);
44+
}
45+
RpcMetadata GetRequestMetadata() const override {
46+
return child_->GetRequestMetadata();
47+
}
48+
49+
private:
50+
std::unique_ptr<google::cloud::internal::StreamingReadRpc<T>> child_;
51+
std::function<void(void)> on_destruction_;
52+
};
53+
54+
template <typename T>
55+
class AsyncStreamingReadRpcTracking
56+
: public google::cloud::internal::AsyncStreamingReadRpc<T> {
57+
public:
58+
AsyncStreamingReadRpcTracking(
59+
std::unique_ptr<google::cloud::internal::AsyncStreamingReadRpc<T>> child,
60+
std::function<void(void)> on_destruction)
61+
: child_(std::move(child)), on_destruction_(std::move(on_destruction)) {}
62+
63+
~AsyncStreamingReadRpcTracking() override { on_destruction_(); }
64+
65+
void Cancel() override { child_->Cancel(); }
66+
future<bool> Start() override { return child_->Start(); }
67+
future<absl::optional<T>> Read() override { return child_->Read(); }
68+
future<Status> Finish() override { return child_->Finish(); }
69+
RpcMetadata GetRequestMetadata() const override {
70+
return child_->GetRequestMetadata();
71+
}
72+
73+
private:
74+
std::unique_ptr<google::cloud::internal::AsyncStreamingReadRpc<T>> child_;
75+
std::function<void(void)> on_destruction_;
76+
};
77+
78+
} // namespace
79+
80+
BigtableRandomTwoLeastUsed::BigtableRandomTwoLeastUsed(
81+
CompletionQueue cq,
82+
internal::DynamicChannelPool<BigtableStub>::StubFactoryFn factory_fn,
83+
std::vector<std::shared_ptr<BigtableStub>> children)
84+
: pool_(internal::DynamicChannelPool<BigtableStub>::Create(
85+
std::move(cq), std::move(children), std::move(factory_fn))) {}
86+
87+
std::unique_ptr<google::cloud::internal::StreamingReadRpc<
88+
google::bigtable::v2::ReadRowsResponse>>
89+
BigtableRandomTwoLeastUsed::ReadRows(
90+
std::shared_ptr<grpc::ClientContext> context, Options const& options,
91+
google::bigtable::v2::ReadRowsRequest const& request) {
92+
auto child = Child();
93+
auto stub = child->AcquireStub();
94+
auto result = stub->ReadRows(std::move(context), options, request);
95+
std::weak_ptr<internal::StubWrapper<BigtableStub>> weak = child;
96+
auto release_fn = [weak = std::move(weak)]() {
97+
auto child = weak.lock();
98+
if (child) child->ReleaseStub();
99+
};
100+
return std::make_unique<
101+
StreamingReadRpcTracking<google::bigtable::v2::ReadRowsResponse>>(
102+
std::move(result), std::move(release_fn));
103+
}
104+
105+
std::unique_ptr<google::cloud::internal::StreamingReadRpc<
106+
google::bigtable::v2::SampleRowKeysResponse>>
107+
BigtableRandomTwoLeastUsed::SampleRowKeys(
108+
std::shared_ptr<grpc::ClientContext> context, Options const& options,
109+
google::bigtable::v2::SampleRowKeysRequest const& request) {
110+
auto child = Child();
111+
auto stub = child->AcquireStub();
112+
auto result = stub->SampleRowKeys(std::move(context), options, request);
113+
std::weak_ptr<internal::StubWrapper<BigtableStub>> weak = child;
114+
auto release_fn = [weak = std::move(weak)]() {
115+
auto child = weak.lock();
116+
if (child) child->ReleaseStub();
117+
};
118+
return std::make_unique<
119+
StreamingReadRpcTracking<google::bigtable::v2::SampleRowKeysResponse>>(
120+
std::move(result), std::move(release_fn));
121+
}
122+
123+
StatusOr<google::bigtable::v2::MutateRowResponse>
124+
BigtableRandomTwoLeastUsed::MutateRow(
125+
grpc::ClientContext& context, Options const& options,
126+
google::bigtable::v2::MutateRowRequest const& request) {
127+
auto child = Child();
128+
auto stub = child->AcquireStub();
129+
auto result = stub->MutateRow(context, options, request);
130+
child->ReleaseStub();
131+
return result;
132+
}
133+
134+
std::unique_ptr<google::cloud::internal::StreamingReadRpc<
135+
google::bigtable::v2::MutateRowsResponse>>
136+
BigtableRandomTwoLeastUsed::MutateRows(
137+
std::shared_ptr<grpc::ClientContext> context, Options const& options,
138+
google::bigtable::v2::MutateRowsRequest const& request) {
139+
auto child = Child();
140+
auto stub = child->AcquireStub();
141+
auto result = stub->MutateRows(std::move(context), options, request);
142+
std::weak_ptr<internal::StubWrapper<BigtableStub>> weak = child;
143+
auto release_fn = [weak = std::move(weak)]() {
144+
auto child = weak.lock();
145+
if (child) child->ReleaseStub();
146+
};
147+
return std::make_unique<
148+
StreamingReadRpcTracking<google::bigtable::v2::MutateRowsResponse>>(
149+
std::move(result), std::move(release_fn));
150+
}
151+
152+
StatusOr<google::bigtable::v2::CheckAndMutateRowResponse>
153+
BigtableRandomTwoLeastUsed::CheckAndMutateRow(
154+
grpc::ClientContext& context, Options const& options,
155+
google::bigtable::v2::CheckAndMutateRowRequest const& request) {
156+
auto child = Child();
157+
auto stub = child->AcquireStub();
158+
auto result = stub->CheckAndMutateRow(context, options, request);
159+
child->ReleaseStub();
160+
return result;
161+
}
162+
163+
StatusOr<google::bigtable::v2::PingAndWarmResponse>
164+
BigtableRandomTwoLeastUsed::PingAndWarm(
165+
grpc::ClientContext& context, Options const& options,
166+
google::bigtable::v2::PingAndWarmRequest const& request) {
167+
auto child = Child();
168+
auto stub = child->AcquireStub();
169+
auto result = stub->PingAndWarm(context, options, request);
170+
child->ReleaseStub();
171+
return result;
172+
}
173+
174+
StatusOr<google::bigtable::v2::ReadModifyWriteRowResponse>
175+
BigtableRandomTwoLeastUsed::ReadModifyWriteRow(
176+
grpc::ClientContext& context, Options const& options,
177+
google::bigtable::v2::ReadModifyWriteRowRequest const& request) {
178+
auto child = Child();
179+
auto stub = child->AcquireStub();
180+
auto result = stub->ReadModifyWriteRow(context, options, request);
181+
child->ReleaseStub();
182+
return result;
183+
}
184+
185+
StatusOr<google::bigtable::v2::PrepareQueryResponse>
186+
BigtableRandomTwoLeastUsed::PrepareQuery(
187+
grpc::ClientContext& context, Options const& options,
188+
google::bigtable::v2::PrepareQueryRequest const& request) {
189+
auto child = Child();
190+
auto stub = child->AcquireStub();
191+
auto result = stub->PrepareQuery(context, options, request);
192+
child->ReleaseStub();
193+
return result;
194+
}
195+
196+
std::unique_ptr<google::cloud::internal::StreamingReadRpc<
197+
google::bigtable::v2::ExecuteQueryResponse>>
198+
BigtableRandomTwoLeastUsed::ExecuteQuery(
199+
std::shared_ptr<grpc::ClientContext> context, Options const& options,
200+
google::bigtable::v2::ExecuteQueryRequest const& request) {
201+
auto child = Child();
202+
auto stub = child->AcquireStub();
203+
auto result = stub->ExecuteQuery(std::move(context), options, request);
204+
std::weak_ptr<internal::StubWrapper<BigtableStub>> weak = child;
205+
auto release_fn = [weak = std::move(weak)]() {
206+
auto child = weak.lock();
207+
if (child) child->ReleaseStub();
208+
};
209+
return std::make_unique<
210+
StreamingReadRpcTracking<google::bigtable::v2::ExecuteQueryResponse>>(
211+
std::move(result), std::move(release_fn));
212+
}
213+
214+
std::unique_ptr<google::cloud::internal::AsyncStreamingReadRpc<
215+
google::bigtable::v2::ReadRowsResponse>>
216+
BigtableRandomTwoLeastUsed::AsyncReadRows(
217+
google::cloud::CompletionQueue const& cq,
218+
std::shared_ptr<grpc::ClientContext> context,
219+
google::cloud::internal::ImmutableOptions options,
220+
google::bigtable::v2::ReadRowsRequest const& request) {
221+
auto child = Child();
222+
auto stub = child->AcquireStub();
223+
auto result =
224+
stub->AsyncReadRows(cq, std::move(context), std::move(options), request);
225+
std::weak_ptr<internal::StubWrapper<BigtableStub>> weak = child;
226+
auto release_fn = [weak = std::move(weak)]() {
227+
auto child = weak.lock();
228+
if (child) child->ReleaseStub();
229+
};
230+
return std::make_unique<
231+
AsyncStreamingReadRpcTracking<google::bigtable::v2::ReadRowsResponse>>(
232+
std::move(result), std::move(release_fn));
233+
}
234+
235+
std::unique_ptr<google::cloud::internal::AsyncStreamingReadRpc<
236+
google::bigtable::v2::SampleRowKeysResponse>>
237+
BigtableRandomTwoLeastUsed::AsyncSampleRowKeys(
238+
google::cloud::CompletionQueue const& cq,
239+
std::shared_ptr<grpc::ClientContext> context,
240+
google::cloud::internal::ImmutableOptions options,
241+
google::bigtable::v2::SampleRowKeysRequest const& request) {
242+
auto child = Child();
243+
auto stub = child->AcquireStub();
244+
auto result = stub->AsyncSampleRowKeys(cq, std::move(context),
245+
std::move(options), request);
246+
std::weak_ptr<internal::StubWrapper<BigtableStub>> weak = child;
247+
auto release_fn = [weak = std::move(weak)]() {
248+
auto child = weak.lock();
249+
if (child) child->ReleaseStub();
250+
};
251+
return std::make_unique<AsyncStreamingReadRpcTracking<
252+
google::bigtable::v2::SampleRowKeysResponse>>(std::move(result),
253+
std::move(release_fn));
254+
}
255+
256+
future<StatusOr<google::bigtable::v2::MutateRowResponse>>
257+
BigtableRandomTwoLeastUsed::AsyncMutateRow(
258+
google::cloud::CompletionQueue& cq,
259+
std::shared_ptr<grpc::ClientContext> context,
260+
google::cloud::internal::ImmutableOptions options,
261+
google::bigtable::v2::MutateRowRequest const& request) {
262+
auto child = Child();
263+
auto stub = child->AcquireStub();
264+
auto result =
265+
stub->AsyncMutateRow(cq, std::move(context), std::move(options), request);
266+
child->ReleaseStub();
267+
return result;
268+
}
269+
270+
std::unique_ptr<google::cloud::internal::AsyncStreamingReadRpc<
271+
google::bigtable::v2::MutateRowsResponse>>
272+
BigtableRandomTwoLeastUsed::AsyncMutateRows(
273+
google::cloud::CompletionQueue const& cq,
274+
std::shared_ptr<grpc::ClientContext> context,
275+
google::cloud::internal::ImmutableOptions options,
276+
google::bigtable::v2::MutateRowsRequest const& request) {
277+
auto child = Child();
278+
auto stub = child->AcquireStub();
279+
auto result = stub->AsyncMutateRows(cq, std::move(context),
280+
std::move(options), request);
281+
282+
std::weak_ptr<internal::StubWrapper<BigtableStub>> weak = child;
283+
auto release_fn = [weak = std::move(weak)]() {
284+
auto child = weak.lock();
285+
if (child) child->ReleaseStub();
286+
};
287+
288+
return std::make_unique<
289+
AsyncStreamingReadRpcTracking<google::bigtable::v2::MutateRowsResponse>>(
290+
std::move(result), std::move(release_fn));
291+
}
292+
293+
future<StatusOr<google::bigtable::v2::CheckAndMutateRowResponse>>
294+
BigtableRandomTwoLeastUsed::AsyncCheckAndMutateRow(
295+
google::cloud::CompletionQueue& cq,
296+
std::shared_ptr<grpc::ClientContext> context,
297+
google::cloud::internal::ImmutableOptions options,
298+
google::bigtable::v2::CheckAndMutateRowRequest const& request) {
299+
auto child = Child();
300+
auto stub = child->AcquireStub();
301+
auto result = stub->AsyncCheckAndMutateRow(cq, std::move(context),
302+
std::move(options), request);
303+
child->ReleaseStub();
304+
return result;
305+
}
306+
307+
future<StatusOr<google::bigtable::v2::ReadModifyWriteRowResponse>>
308+
BigtableRandomTwoLeastUsed::AsyncReadModifyWriteRow(
309+
google::cloud::CompletionQueue& cq,
310+
std::shared_ptr<grpc::ClientContext> context,
311+
google::cloud::internal::ImmutableOptions options,
312+
google::bigtable::v2::ReadModifyWriteRowRequest const& request) {
313+
auto child = Child();
314+
auto stub = child->AcquireStub();
315+
auto result = stub->AsyncReadModifyWriteRow(cq, std::move(context),
316+
std::move(options), request);
317+
child->ReleaseStub();
318+
return result;
319+
}
320+
321+
future<StatusOr<google::bigtable::v2::PrepareQueryResponse>>
322+
BigtableRandomTwoLeastUsed::AsyncPrepareQuery(
323+
google::cloud::CompletionQueue& cq,
324+
std::shared_ptr<grpc::ClientContext> context,
325+
google::cloud::internal::ImmutableOptions options,
326+
google::bigtable::v2::PrepareQueryRequest const& request) {
327+
auto child = Child();
328+
auto stub = child->AcquireStub();
329+
auto result = stub->AsyncPrepareQuery(cq, std::move(context),
330+
std::move(options), request);
331+
child->ReleaseStub();
332+
return result;
333+
}
334+
335+
std::shared_ptr<internal::StubWrapper<BigtableStub>>
336+
BigtableRandomTwoLeastUsed::Child() {
337+
return pool_->GetChannelRandomTwoLeastUsed();
338+
// std::unique_lock<std::mutex> lk(mu_);
339+
// std::vector<std::size_t> indices(pool_->size(lk) - 1);
340+
// // TODO(sdhart): Maybe use iota on iterators instead of indices
341+
// std::iota(indices.begin(), indices.end(), 0);
342+
// std::shuffle(indices.begin(), indices.end(), rng_);
343+
// auto channel_1 = pool_->GetChannel(lk, indices[0]);
344+
// auto channel_2 = pool_->GetChannel(lk, indices[1]);
345+
//
346+
// return channel_1->outstanding_rpcs(lk) < channel_2->outstanding_rpcs(lk)
347+
// ? channel_1
348+
// : channel_2;
349+
}
350+
351+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
352+
} // namespace bigtable_internal
353+
} // namespace cloud
354+
} // namespace google

0 commit comments

Comments
 (0)