Skip to content

Commit a125707

Browse files
authored
Merge branch 'main' into execute-query
2 parents 7c36a04 + 18073cc commit a125707

13 files changed

+614
-37
lines changed

google/cloud/bigtable/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,8 @@ add_library(
201201
internal/legacy_row_reader.h
202202
internal/logging_data_client.cc
203203
internal/logging_data_client.h
204+
internal/logging_result_set_reader.cc
205+
internal/logging_result_set_reader.h
204206
internal/metrics.cc
205207
internal/metrics.h
206208
internal/mutate_rows_limiter.cc
@@ -484,6 +486,7 @@ if (BUILD_TESTING)
484486
internal/legacy_bulk_mutator_test.cc
485487
internal/legacy_row_reader_test.cc
486488
internal/logging_data_client_test.cc
489+
internal/logging_result_set_reader_test.cc
487490
internal/metrics_test.cc
488491
internal/mutate_rows_limiter_test.cc
489492
internal/operation_context_factory_test.cc

google/cloud/bigtable/bigtable_client_unit_tests.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ bigtable_client_unit_tests = [
6262
"internal/legacy_bulk_mutator_test.cc",
6363
"internal/legacy_row_reader_test.cc",
6464
"internal/logging_data_client_test.cc",
65+
"internal/logging_result_set_reader_test.cc",
6566
"internal/metrics_test.cc",
6667
"internal/mutate_rows_limiter_test.cc",
6768
"internal/operation_context_factory_test.cc",

google/cloud/bigtable/google_cloud_cpp_bigtable.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ google_cloud_cpp_bigtable_hdrs = [
9999
"internal/legacy_async_row_sampler.h",
100100
"internal/legacy_row_reader.h",
101101
"internal/logging_data_client.h",
102+
"internal/logging_result_set_reader.h",
102103
"internal/metrics.h",
103104
"internal/mutate_rows_limiter.h",
104105
"internal/operation_context.h",
@@ -217,6 +218,7 @@ google_cloud_cpp_bigtable_srcs = [
217218
"internal/legacy_async_row_sampler.cc",
218219
"internal/legacy_row_reader.cc",
219220
"internal/logging_data_client.cc",
221+
"internal/logging_result_set_reader.cc",
220222
"internal/metrics.cc",
221223
"internal/mutate_rows_limiter.cc",
222224
"internal/operation_context.cc",
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/bigtable/internal/logging_result_set_reader.h"
16+
#include "google/cloud/internal/debug_string.h"
17+
#include "google/cloud/log.h"
18+
19+
namespace google {
20+
namespace cloud {
21+
namespace bigtable_internal {
22+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
23+
24+
using ::google::cloud::internal::DebugString;
25+
26+
void LoggingResultSetReader::TryCancel() { impl_->TryCancel(); }
27+
28+
bool LoggingResultSetReader::Read(
29+
absl::optional<std::string> const& resume_token,
30+
UnownedPartialResultSet& result) {
31+
if (resume_token) {
32+
GCP_LOG(DEBUG) << __func__ << "() << resume_token=\""
33+
<< DebugString(*resume_token, tracing_options_) << "\"";
34+
} else {
35+
GCP_LOG(DEBUG) << __func__ << "() << (unresumable)";
36+
}
37+
bool success = impl_->Read(resume_token, result);
38+
if (!success) {
39+
GCP_LOG(DEBUG) << __func__ << "() >> (failed)";
40+
} else {
41+
GCP_LOG(DEBUG) << __func__ << "() >> resumption="
42+
<< (result.resumption ? "true" : "false");
43+
}
44+
return success;
45+
}
46+
47+
Status LoggingResultSetReader::Finish() { return impl_->Finish(); }
48+
49+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
50+
} // namespace bigtable_internal
51+
} // namespace cloud
52+
} // namespace google
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_LOGGING_RESULT_SET_READER_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_LOGGING_RESULT_SET_READER_H
17+
18+
#include "google/cloud/bigtable/internal/partial_result_set_reader.h"
19+
#include "google/cloud/bigtable/version.h"
20+
#include "google/cloud/tracing_options.h"
21+
#include "absl/types/optional.h"
22+
#include <memory>
23+
24+
namespace google {
25+
namespace cloud {
26+
namespace bigtable_internal {
27+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
28+
29+
/**
30+
* A `PartialResultSetReader` decorator that logs the `Read()` `resume_token`
31+
* parameter and the `PartialResultSet::resumption` return value. This is an
32+
* extension to the standard `BigtableLogging` request/response logging.
33+
*/
34+
class LoggingResultSetReader : public PartialResultSetReader {
35+
public:
36+
LoggingResultSetReader(std::unique_ptr<PartialResultSetReader> impl,
37+
TracingOptions tracing_options)
38+
: impl_(std::move(impl)), tracing_options_(std::move(tracing_options)) {}
39+
~LoggingResultSetReader() override = default;
40+
41+
void TryCancel() override;
42+
bool Read(absl::optional<std::string> const& resume_token,
43+
UnownedPartialResultSet& result) override;
44+
Status Finish() override;
45+
46+
private:
47+
std::unique_ptr<PartialResultSetReader> impl_;
48+
TracingOptions tracing_options_;
49+
};
50+
51+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
52+
} // namespace bigtable_internal
53+
} // namespace cloud
54+
} // namespace google
55+
56+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_LOGGING_RESULT_SET_READER_H
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/bigtable/internal/logging_result_set_reader.h"
16+
#include "google/cloud/bigtable/testing/mock_partial_result_set_reader.h"
17+
#include "google/cloud/log.h"
18+
#include "google/cloud/testing_util/scoped_log.h"
19+
#include "google/cloud/testing_util/status_matchers.h"
20+
#include "google/cloud/tracing_options.h"
21+
#include <gmock/gmock.h>
22+
23+
namespace google {
24+
namespace cloud {
25+
namespace bigtable_internal {
26+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
27+
namespace {
28+
29+
using ::testing::_;
30+
using ::testing::AllOf;
31+
using ::testing::Contains;
32+
using ::testing::HasSubstr;
33+
using ::testing::IsEmpty;
34+
using ::testing::StartsWith;
35+
36+
class LoggingResultSetReaderTest : public ::testing::Test {
37+
protected:
38+
testing_util::ScopedLog log_;
39+
};
40+
41+
TEST_F(LoggingResultSetReaderTest, TryCancel) {
42+
auto mock = std::make_unique<bigtable_testing::MockPartialResultSetReader>();
43+
EXPECT_CALL(*mock, TryCancel()).Times(1);
44+
LoggingResultSetReader reader(std::move(mock), TracingOptions{});
45+
reader.TryCancel();
46+
47+
EXPECT_THAT(log_.ExtractLines(), IsEmpty());
48+
}
49+
50+
TEST_F(LoggingResultSetReaderTest, Read) {
51+
auto mock = std::make_unique<bigtable_testing::MockPartialResultSetReader>();
52+
EXPECT_CALL(*mock, Read(_, _))
53+
.WillOnce([](absl::optional<std::string> const&,
54+
UnownedPartialResultSet& result) {
55+
result.resumption = false;
56+
result.result.set_resume_token("test-token");
57+
return true;
58+
})
59+
.WillOnce([] { return false; });
60+
LoggingResultSetReader reader(std::move(mock), TracingOptions{});
61+
google::bigtable::v2::PartialResultSet partial_result_set;
62+
auto result =
63+
UnownedPartialResultSet::FromPartialResultSet(partial_result_set);
64+
ASSERT_TRUE(reader.Read("", result));
65+
EXPECT_EQ("test-token", result.result.resume_token());
66+
67+
auto log_lines = log_.ExtractLines();
68+
EXPECT_THAT(log_lines, AllOf(Contains(StartsWith("Read()"))));
69+
EXPECT_THAT(log_lines, Contains(HasSubstr("resume_token=\"\"")));
70+
EXPECT_THAT(log_lines, Contains(HasSubstr("resumption=false")));
71+
72+
ASSERT_FALSE(reader.Read("test-token", result));
73+
74+
log_lines = log_.ExtractLines();
75+
EXPECT_THAT(log_lines, AllOf(Contains(StartsWith("Read()"))));
76+
EXPECT_THAT(log_lines, Contains(HasSubstr("resume_token=\"test-token\"")));
77+
EXPECT_THAT(log_lines, Contains(HasSubstr("(failed)")));
78+
}
79+
80+
TEST_F(LoggingResultSetReaderTest, Finish) {
81+
Status const expected_status = Status(StatusCode::kOutOfRange, "weird");
82+
auto mock = std::make_unique<bigtable_testing::MockPartialResultSetReader>();
83+
EXPECT_CALL(*mock, Finish()).WillOnce([expected_status] {
84+
return expected_status; // NOLINT(performance-no-automatic-move)
85+
});
86+
LoggingResultSetReader reader(std::move(mock), TracingOptions{});
87+
auto status = reader.Finish();
88+
EXPECT_EQ(expected_status, status);
89+
90+
auto log_lines = log_.ExtractLines();
91+
EXPECT_THAT(log_lines, IsEmpty());
92+
}
93+
94+
} // namespace
95+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
96+
} // namespace bigtable_internal
97+
} // namespace cloud
98+
} // namespace google

google/cloud/bigtable/internal/query_plan.cc

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ auto constexpr kRefreshDeadlineOffsetMs = 1000;
2626
} // namespace
2727

2828
std::shared_ptr<QueryPlan> QueryPlan::Create(
29-
CompletionQueue cq, google::bigtable::v2::PrepareQueryResponse response,
30-
RefreshFn fn, std::shared_ptr<Clock> clock) {
29+
CompletionQueue cq,
30+
StatusOr<google::bigtable::v2::PrepareQueryResponse> response, RefreshFn fn,
31+
std::shared_ptr<Clock> clock) {
3132
auto plan = std::shared_ptr<QueryPlan>(new QueryPlan(
3233
std::move(cq), std::move(clock), std::move(fn), std::move(response)));
3334
plan->Initialize();
@@ -36,7 +37,7 @@ std::shared_ptr<QueryPlan> QueryPlan::Create(
3637

3738
void QueryPlan::Initialize() {
3839
std::unique_lock<std::mutex> lock(mu_);
39-
ScheduleRefresh(lock);
40+
if (state_ == RefreshState::kDone) ScheduleRefresh(lock);
4041
}
4142

4243
// ScheduleRefresh should only be called after updating response_.
@@ -71,7 +72,7 @@ void QueryPlan::ExpiredRefresh() {
7172
state_ = RefreshState::kBegin;
7273
}
7374
}
74-
RefreshQueryPlan(RefreshMode::kExpired);
75+
RefreshQueryPlan();
7576
}
7677

7778
void QueryPlan::Invalidate(Status status,
@@ -82,26 +83,17 @@ void QueryPlan::Invalidate(Status status,
8283
// query plan, so we track what the previous plan id was.
8384
if (!IsRefreshing(lock) && old_query_plan_id_ != invalid_query_plan_id) {
8485
old_query_plan_id_ = invalid_query_plan_id;
86+
response_ = std::move(status);
8587
state_ = RefreshState::kBegin;
8688
}
8789
}
88-
RefreshQueryPlan(RefreshMode::kInvalidated, std::move(status));
8990
}
9091

91-
void QueryPlan::RefreshQueryPlan(RefreshMode mode, Status error) {
92+
void QueryPlan::RefreshQueryPlan() {
9293
{
9394
std::unique_lock<std::mutex> lock_1(mu_);
94-
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_QUERY_PLAN_REFRESH_ASSERT
95-
assert(waiting_threads_ >= 0);
96-
#endif
97-
++waiting_threads_;
9895
cond_.wait(lock_1, [this] { return state_ != RefreshState::kPending; });
99-
--waiting_threads_;
100-
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_QUERY_PLAN_REFRESH_ASSERT
101-
assert(waiting_threads_ >= 0);
102-
#endif
10396
if (state_ == RefreshState::kDone) return;
104-
if (mode == RefreshMode::kInvalidated) response_ = std::move(error);
10597
state_ = RefreshState::kPending;
10698
}
10799
auto response = refresh_fn_().get();
@@ -114,7 +106,7 @@ void QueryPlan::RefreshQueryPlan(RefreshMode mode, Status error) {
114106
done = true;
115107
// If we have to refresh an invalidated query plan, cancel any existing
116108
// timer before starting a new one.
117-
refresh_timer_.cancel();
109+
if (refresh_timer_.valid()) refresh_timer_.cancel();
118110
ScheduleRefresh(lock_2);
119111
} else {
120112
// If there are no waiting threads that could call the refresh_fn, then
@@ -124,13 +116,7 @@ void QueryPlan::RefreshQueryPlan(RefreshMode mode, Status error) {
124116
// If there are waiting threads, then we want to try again to get a
125117
// refreshed query plan, but we want to avoid a stampede of refresh RPCs
126118
// so we only notify one of the waiting threads.
127-
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_QUERY_PLAN_REFRESH_ASSERT
128-
assert(waiting_threads_ >= 0);
129-
#endif
130-
if (waiting_threads_ == 0) {
131-
state_ = RefreshState::kDone;
132-
done = true;
133-
}
119+
state_ = RefreshState::kBegin;
134120
}
135121
}
136122
if (done) {
@@ -147,7 +133,7 @@ StatusOr<google::bigtable::v2::PrepareQueryResponse> QueryPlan::response() {
147133
return response_;
148134
}
149135
lock.unlock();
150-
RefreshQueryPlan(RefreshMode::kAlreadyRefreshing);
136+
RefreshQueryPlan();
151137
lock.lock();
152138
}
153139

google/cloud/bigtable/internal/query_plan.h

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ class QueryPlan : public std::enable_shared_from_this<QueryPlan> {
3939

4040
// Calls the constructor and then Initialize.
4141
static std::shared_ptr<QueryPlan> Create(
42-
CompletionQueue cq, google::bigtable::v2::PrepareQueryResponse response,
42+
CompletionQueue cq,
43+
StatusOr<google::bigtable::v2::PrepareQueryResponse> response,
4344
RefreshFn fn, std::shared_ptr<Clock> clock = std::make_shared<Clock>());
4445

4546
// Invalidates the current QueryPlan and triggers a refresh.
@@ -57,8 +58,9 @@ class QueryPlan : public std::enable_shared_from_this<QueryPlan> {
5758

5859
private:
5960
QueryPlan(CompletionQueue cq, std::shared_ptr<Clock> clock, RefreshFn fn,
60-
google::bigtable::v2::PrepareQueryResponse response)
61-
: cq_(std::move(cq)),
61+
StatusOr<google::bigtable::v2::PrepareQueryResponse> response)
62+
: state_(response.ok() ? RefreshState::kDone : RefreshState::kBegin),
63+
cq_(std::move(cq)),
6264
clock_(std::move(clock)),
6365
refresh_fn_(std::move(fn)),
6466
response_(std::move(response)) {}
@@ -73,12 +75,9 @@ class QueryPlan : public std::enable_shared_from_this<QueryPlan> {
7375
// capturing a std::weak_ptr to this that calls RefreshQueryPlan.
7476
void ScheduleRefresh(std::unique_lock<std::mutex> const&);
7577

76-
enum class RefreshMode { kExpired, kInvalidated, kAlreadyRefreshing };
7778
// Performs the synchronization around calling RefreshFn and updating
7879
// response_.
79-
// void RefreshQueryPlan();
80-
81-
void RefreshQueryPlan(RefreshMode mode, Status error = {});
80+
void RefreshQueryPlan();
8281

8382
void ExpiredRefresh();
8483

@@ -93,17 +92,14 @@ class QueryPlan : public std::enable_shared_from_this<QueryPlan> {
9392
kPending, // waiting for an active thread to refresh response_
9493
kDone, // response_ has been refreshed
9594
};
96-
RefreshState state_ = RefreshState::kDone;
95+
RefreshState state_;
9796

9897
CompletionQueue cq_;
9998
std::shared_ptr<Clock> clock_;
10099
RefreshFn refresh_fn_;
101100
future<void> refresh_timer_;
102101
mutable std::mutex mu_;
103102
std::condition_variable cond_;
104-
// waiting_threads_ is only a snapshot, but it helps us reduce the number of
105-
// RPCs in flight to refresh the same query plan.
106-
int waiting_threads_ = 0;
107103
std::string old_query_plan_id_;
108104
StatusOr<google::bigtable::v2::PrepareQueryResponse>
109105
response_; // GUARDED_BY(mu_)

0 commit comments

Comments
 (0)