Skip to content

Commit a5d627d

Browse files
committed
impl(bigtable): add QueryPlan refresh with simple tests
1 parent cfa8052 commit a5d627d

File tree

4 files changed

+360
-44
lines changed

4 files changed

+360
-44
lines changed

google/cloud/bigtable/BUILD.bazel

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,11 @@ cc_library(
114114
name = test.replace("/", "_").replace(".cc", ""),
115115
srcs = [test],
116116
local_defines = select({
117-
":metrics_enabled": ["GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS"],
118-
"//conditions:default": [],
117+
":metrics_enabled": [
118+
"GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS",
119+
"GOOGLE_CLOUD_CPP_BIGTABLE_QUERY_PLAN_REFRESH_ASSERT",
120+
],
121+
"//conditions:default": ["GOOGLE_CLOUD_CPP_BIGTABLE_QUERY_PLAN_REFRESH_ASSERT"],
119122
}),
120123
deps = [
121124
":bigtable_client_testing",

google/cloud/bigtable/internal/query_plan.cc

Lines changed: 138 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,38 +14,162 @@
1414

1515
#include "google/cloud/bigtable/internal/query_plan.h"
1616
#include "google/cloud/completion_queue.h"
17+
#include "google/cloud/internal/time_utils.h"
1718
#include <google/bigtable/v2/data.pb.h>
1819

1920
namespace google {
2021
namespace cloud {
2122
namespace bigtable_internal {
2223
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
24+
namespace {
25+
// Number of seconds subtracted from the query plan's `valid_until` timestamp
// to obtain the deadline at which the background refresh timer fires.
auto constexpr kRefreshDeadlineOffset = 120;
26+
} // namespace
2327

2428
std::shared_ptr<QueryPlan> QueryPlan::Create(
2529
CompletionQueue cq, google::bigtable::v2::PrepareQueryResponse response,
26-
RefreshFn fn) {
27-
auto plan = std::shared_ptr<QueryPlan>(
28-
new QueryPlan(std::move(cq), std::move(response), std::move(fn)));
30+
RefreshFn fn, std::shared_ptr<Clock> clock) {
31+
auto plan = std::shared_ptr<QueryPlan>(new QueryPlan(
32+
std::move(cq), std::move(clock), std::move(fn), std::move(response)));
2933
plan->Initialize();
3034
return plan;
3135
}
3236

33-
bool QueryPlan::IsExpired() { return false; }
37+
void QueryPlan::Initialize() {
38+
std::unique_lock<std::mutex> lock(mu_);
39+
ScheduleRefresh(lock);
40+
}
41+
42+
// ScheduleRefresh should only be called after updating response_.
43+
void QueryPlan::ScheduleRefresh(std::unique_lock<std::mutex> const&) {
44+
if (!response_.ok()) return;
45+
// We want to start the refresh process before the query plan expires.
46+
auto refresh_deadline =
47+
internal::ToChronoTimePoint(response_->valid_until()) -
48+
std::chrono::seconds(kRefreshDeadlineOffset);
49+
std::weak_ptr<QueryPlan> plan = shared_from_this();
50+
refresh_timer_ =
51+
cq_.MakeDeadlineTimer(refresh_deadline)
52+
.then([plan](future<StatusOr<std::chrono::system_clock::time_point>>
53+
result) {
54+
if (result.get().ok()) {
55+
if (auto p = plan.lock()) {
56+
p->ExpiredRefresh();
57+
}
58+
}
59+
});
60+
}
61+
62+
bool QueryPlan::IsRefreshing(std::unique_lock<std::mutex> const&) const {
63+
return state_ == RefreshState::kBegin || state_ == RefreshState::kPending;
64+
}
65+
66+
void QueryPlan::ExpiredRefresh() {
67+
{
68+
std::unique_lock<std::mutex> lock(mu_);
69+
if (!(IsRefreshing(lock))) {
70+
if (response_.ok()) old_query_plan_id_ = response_->prepared_query();
71+
state_ = RefreshState::kBegin;
72+
}
73+
}
74+
RefreshQueryPlan(RefreshMode::kExpired);
75+
}
76+
77+
// Marks the plan identified by `invalid_query_plan_id` as invalid and runs a
// refresh in kInvalidated mode, passing `status` along as the error to store.
void QueryPlan::Invalidate(Status status,
                           std::string const& invalid_query_plan_id) {
  {
    std::unique_lock<std::mutex> lk(mu_);
    // A late invalidation for a plan that has already been replaced must not
    // start another refresh, so the id of the previous plan is remembered and
    // compared against.
    bool const already_handled =
        IsRefreshing(lk) || old_query_plan_id_ == invalid_query_plan_id;
    if (!already_handled) {
      old_query_plan_id_ = invalid_query_plan_id;
      state_ = RefreshState::kBegin;
    }
  }
  // Called without holding mu_; RefreshQueryPlan acquires it itself.
  RefreshQueryPlan(RefreshMode::kInvalidated, std::move(status));
}
3490

35-
StatusOr<std::string> QueryPlan::prepared_query() const {
36-
std::lock_guard<std::mutex> lock(mu_);
37-
if (IsExpired()) {
38-
return Status(StatusCode::kUnavailable, "Query plan has expired");
91+
// Serializes calls to refresh_fn_ and publishes the result in response_.
//
// `mode`  - why the refresh was requested. Only kInvalidated changes behavior
//           here: it causes `error` to be stored in response_ before the RPC.
// `error` - the status to store in response_ when mode is kInvalidated.
//
// The function blocks while another thread has a refresh in flight
// (state_ == kPending). Once woken it returns immediately if the plan has
// already been refreshed (kDone); otherwise it becomes the refreshing thread.
// Must be called WITHOUT holding mu_.
void QueryPlan::RefreshQueryPlan(RefreshMode mode, Status error) {
  {
    std::unique_lock<std::mutex> lock_1(mu_);
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_QUERY_PLAN_REFRESH_ASSERT
    assert(waiting_threads_ >= 0);
#endif
    ++waiting_threads_;
    cond_.wait(lock_1, [this] { return state_ != RefreshState::kPending; });
    --waiting_threads_;
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_QUERY_PLAN_REFRESH_ASSERT
    assert(waiting_threads_ >= 0);
#endif
    // Some other thread finished the refresh while we were waiting.
    if (state_ == RefreshState::kDone) return;
    if (mode == RefreshMode::kInvalidated) response_ = std::move(error);
    state_ = RefreshState::kPending;
  }
  // The RPC is issued, and waited on, without holding the mutex.
  auto response = refresh_fn_().get();
  bool done = false;
  {
    std::unique_lock<std::mutex> lock_2(mu_);
    response_ = std::move(response);
    if (response_.ok()) {
      state_ = RefreshState::kDone;
      done = true;
      // If we have to refresh an invalidated query plan, cancel any existing
      // timer before starting a new one.
      refresh_timer_.cancel();
      ScheduleRefresh(lock_2);
    } else {
      // If there are no waiting threads that could call the refresh_fn, then
      // we need to accept that the refresh is in a failed state and wait for
      // some new event that would start this refresh process anew.
      //
      // If there are waiting threads, then we want to try again to get a
      // refreshed query plan, but we want to avoid a stampede of refresh RPCs
      // so we only notify one of the waiting threads.
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_QUERY_PLAN_REFRESH_ASSERT
      assert(waiting_threads_ >= 0);
#endif
      if (waiting_threads_ == 0) {
        state_ = RefreshState::kDone;
        done = true;
      } else {
        // Waiters block on `state_ != kPending`. Leaving the state at
        // kPending would make the notified thread go straight back to sleep
        // and every waiter would block forever. Move back to kBegin so the
        // single notified thread can pass the predicate and retry.
        state_ = RefreshState::kBegin;
      }
    }
  }
  if (done) {
    cond_.notify_all();
  } else {
    cond_.notify_one();
  }
}
42142

43-
StatusOr<google::bigtable::v2::ResultSetMetadata> QueryPlan::metadata() const {
44-
std::lock_guard<std::mutex> lock(mu_);
45-
if (IsExpired()) {
46-
return Status(StatusCode::kUnavailable, "Query plan has expired");
143+
// Returns a copy of the prepared_query and metadata fields of the current
// query plan, or the error stored in response_.
//
// While a refresh is underway and the current plan is still usable, the
// current plan is returned without blocking. If a refresh is underway and
// response_ holds an error, this call participates in (or waits for) the
// refresh before answering.
StatusOr<QueryPlan::ResponseData> QueryPlan::response_data() {
  std::unique_lock<std::mutex> lock(mu_);
  if (IsRefreshing(lock) && !response_.ok()) {
    // RefreshQueryPlan acquires mu_ itself, so release it around the call.
    lock.unlock();
    RefreshQueryPlan(RefreshMode::kAlreadyRefreshing);
    lock.lock();
  }

  // response_ may hold an error in any state, e.g. when our own refresh
  // attempt failed while other threads were still waiting to retry, or when
  // another thread requested a new refresh before we re-acquired the lock.
  // Never dereference it without checking.
  if (!response_.ok()) return response_.status();

  return QueryPlan::ResponseData{response_->prepared_query(),
                                 response_->metadata()};
}
162+
163+
// Convenience accessor returning only the prepared_query field obtained from
// response_data(), or the error response_data() reported.
StatusOr<std::string> QueryPlan::prepared_query() {
  auto data = response_data();
  // Move the StatusOr itself so the rvalue overload of status() is selected;
  // std::move(data.status()) would act on a const reference and copy.
  if (!data.ok()) return std::move(data).status();
  return std::move(data->prepared_query);
}
168+
169+
// Convenience accessor returning only the metadata field obtained from
// response_data(), or the error response_data() reported.
StatusOr<google::bigtable::v2::ResultSetMetadata> QueryPlan::metadata() {
  auto data = response_data();
  // Move the StatusOr itself so the rvalue overload of status() is selected;
  // std::move(data.status()) would act on a const reference and copy.
  if (!data.ok()) return std::move(data).status();
  return std::move(data->metadata);
}
50174

51175
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/bigtable/internal/query_plan.h

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include "google/cloud/bigtable/version.h"
2020
#include "google/cloud/completion_queue.h"
21+
#include "google/cloud/internal/clock.h"
2122
#include <google/bigtable/v2/bigtable.pb.h>
2223
#include <string>
2324
#include <utility>
@@ -30,46 +31,86 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
3031
class QueryPlan : public std::enable_shared_from_this<QueryPlan> {
3132
public:
3233
// Typically, a lambda capturing the original PrepareQueryRequest and
33-
// DataConnection pointer necessary to call the PrepareQuery RPC.
34-
using RefreshFn = std::function<google::bigtable::v2::PrepareQueryResponse()>;
34+
// DataConnection pointer necessary to call the AsyncPrepareQuery RPC.
35+
using RefreshFn = std::function<
36+
future<StatusOr<google::bigtable::v2::PrepareQueryResponse>>()>;
37+
38+
using Clock = ::google::cloud::internal::SystemClock;
3539

3640
// Calls the constructor and then Initialize.
3741
static std::shared_ptr<QueryPlan> Create(
3842
CompletionQueue cq, google::bigtable::v2::PrepareQueryResponse response,
39-
RefreshFn fn);
43+
RefreshFn fn, std::shared_ptr<Clock> clock = std::make_shared<Clock>());
44+
45+
// Invalidates the current QueryPlan and triggers a refresh.
46+
void Invalidate(Status status, std::string const& invalid_query_plan_id);
47+
48+
struct ResponseData {
49+
std::string prepared_query;
50+
google::bigtable::v2::ResultSetMetadata metadata;
51+
};
52+
53+
// Accessor for the prepared_query and metadata fields in response_.
54+
// Triggers a refresh if needed.
55+
StatusOr<ResponseData> response_data();
4056

41-
// Accessor for the prepared_query field in response_.
42-
StatusOr<std::string> prepared_query() const;
57+
GOOGLE_CLOUD_CPP_DEPRECATED("Use response_data() instead")
58+
StatusOr<std::string> prepared_query();
4359

44-
// Accessor for the metadata field in response_.
45-
StatusOr<google::bigtable::v2::ResultSetMetadata> metadata() const;
60+
GOOGLE_CLOUD_CPP_DEPRECATED("Use response_data() instead")
61+
StatusOr<google::bigtable::v2::ResultSetMetadata> metadata();
4662

4763
private:
48-
QueryPlan(CompletionQueue cq,
49-
google::bigtable::v2::PrepareQueryResponse response, RefreshFn fn)
64+
QueryPlan(CompletionQueue cq, std::shared_ptr<Clock> clock, RefreshFn fn,
65+
google::bigtable::v2::PrepareQueryResponse response)
5066
: cq_(std::move(cq)),
51-
response_(std::move(response)),
52-
fn_(std::move(fn)) {}
53-
static bool IsExpired();
67+
clock_(std::move(clock)),
68+
refresh_fn_(std::move(fn)),
69+
response_(std::move(response)) {}
70+
71+
bool IsRefreshing(std::unique_lock<std::mutex> const&) const;
5472

5573
// Performs the first call to ScheduleRefresh and any other initialization not
5674
// possible in the constructor.
57-
void Initialize() {}
75+
void Initialize();
5876

5977
// Calls MakeDeadlineTimer on the CompletionQueue with a continuation lambda
6078
// capturing a std::weak_ptr to this that calls RefreshQueryPlan.
61-
void ScheduleRefresh() {}
79+
void ScheduleRefresh(std::unique_lock<std::mutex> const&);
6280

81+
enum class RefreshMode { kExpired, kInvalidated, kAlreadyRefreshing };
6382
// Performs the synchronization around calling RefreshFn and updating
6483
// response_.
65-
void RefreshQueryPlan() {}
84+
// void RefreshQueryPlan();
85+
86+
void RefreshQueryPlan(RefreshMode mode, Status error = {});
87+
88+
void ExpiredRefresh();
89+
90+
// State machine where the only valid transitions are:
91+
// kDone -> kBegin
92+
// kBegin -> kPending
93+
// kPending -> kDone
94+
// When refreshing the same previous query plan.
95+
enum class RefreshState {
96+
kBegin, // waiting for a future thread to refresh response_
97+
kPending, // waiting for an active thread to refresh response_
98+
kDone, // response_ has been refreshed
99+
};
100+
RefreshState state_ = RefreshState::kDone;
66101

67102
CompletionQueue cq_;
103+
std::shared_ptr<Clock> clock_;
104+
RefreshFn refresh_fn_;
68105
future<void> refresh_timer_;
69106
mutable std::mutex mu_;
70107
std::condition_variable cond_;
71-
google::bigtable::v2::PrepareQueryResponse response_; // GUARDED_BY(mu_)
72-
RefreshFn fn_;
108+
// waiting_threads_ is only a snapshot, but it helps us reduce the number of
109+
// RPCs in flight to refresh the same query plan.
110+
int waiting_threads_ = 0;
111+
std::string old_query_plan_id_;
112+
StatusOr<google::bigtable::v2::PrepareQueryResponse>
113+
response_; // GUARDED_BY(mu_)
73114
};
74115

75116
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

0 commit comments

Comments
 (0)