Skip to content

Commit 8cc4ad5

Browse files
authored
impl(bigtable): improve QueryPlan and add more tests (#15688)
1 parent 281be01 commit 8cc4ad5

File tree

3 files changed

+290
-35
lines changed

3 files changed

+290
-35
lines changed

google/cloud/bigtable/internal/query_plan.cc

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ auto constexpr kRefreshDeadlineOffsetMs = 1000;
2626
} // namespace
2727

2828
std::shared_ptr<QueryPlan> QueryPlan::Create(
29-
CompletionQueue cq, google::bigtable::v2::PrepareQueryResponse response,
30-
RefreshFn fn, std::shared_ptr<Clock> clock) {
29+
CompletionQueue cq,
30+
StatusOr<google::bigtable::v2::PrepareQueryResponse> response, RefreshFn fn,
31+
std::shared_ptr<Clock> clock) {
3132
auto plan = std::shared_ptr<QueryPlan>(new QueryPlan(
3233
std::move(cq), std::move(clock), std::move(fn), std::move(response)));
3334
plan->Initialize();
@@ -36,7 +37,7 @@ std::shared_ptr<QueryPlan> QueryPlan::Create(
3637

3738
void QueryPlan::Initialize() {
3839
std::unique_lock<std::mutex> lock(mu_);
39-
ScheduleRefresh(lock);
40+
if (state_ == RefreshState::kDone) ScheduleRefresh(lock);
4041
}
4142

4243
// ScheduleRefresh should only be called after updating response_.
@@ -71,7 +72,7 @@ void QueryPlan::ExpiredRefresh() {
7172
state_ = RefreshState::kBegin;
7273
}
7374
}
74-
RefreshQueryPlan(RefreshMode::kExpired);
75+
RefreshQueryPlan();
7576
}
7677

7778
void QueryPlan::Invalidate(Status status,
@@ -82,26 +83,17 @@ void QueryPlan::Invalidate(Status status,
8283
// query plan, so we track what the previous plan id was.
8384
if (!IsRefreshing(lock) && old_query_plan_id_ != invalid_query_plan_id) {
8485
old_query_plan_id_ = invalid_query_plan_id;
86+
response_ = std::move(status);
8587
state_ = RefreshState::kBegin;
8688
}
8789
}
88-
RefreshQueryPlan(RefreshMode::kInvalidated, std::move(status));
8990
}
9091

91-
void QueryPlan::RefreshQueryPlan(RefreshMode mode, Status error) {
92+
void QueryPlan::RefreshQueryPlan() {
9293
{
9394
std::unique_lock<std::mutex> lock_1(mu_);
94-
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_QUERY_PLAN_REFRESH_ASSERT
95-
assert(waiting_threads_ >= 0);
96-
#endif
97-
++waiting_threads_;
9895
cond_.wait(lock_1, [this] { return state_ != RefreshState::kPending; });
99-
--waiting_threads_;
100-
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_QUERY_PLAN_REFRESH_ASSERT
101-
assert(waiting_threads_ >= 0);
102-
#endif
10396
if (state_ == RefreshState::kDone) return;
104-
if (mode == RefreshMode::kInvalidated) response_ = std::move(error);
10597
state_ = RefreshState::kPending;
10698
}
10799
auto response = refresh_fn_().get();
@@ -114,7 +106,7 @@ void QueryPlan::RefreshQueryPlan(RefreshMode mode, Status error) {
114106
done = true;
115107
// If we have to refresh an invalidated query plan, cancel any existing
116108
// timer before starting a new one.
117-
refresh_timer_.cancel();
109+
if (refresh_timer_.valid()) refresh_timer_.cancel();
118110
ScheduleRefresh(lock_2);
119111
} else {
120112
// If there are no waiting threads that could call the refresh_fn, then
@@ -124,13 +116,7 @@ void QueryPlan::RefreshQueryPlan(RefreshMode mode, Status error) {
124116
// If there are waiting threads, then we want to try again to get a
125117
// refreshed query plan, but we want to avoid a stampede of refresh RPCs
126118
// so we only notify one of the waiting threads.
127-
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_QUERY_PLAN_REFRESH_ASSERT
128-
assert(waiting_threads_ >= 0);
129-
#endif
130-
if (waiting_threads_ == 0) {
131-
state_ = RefreshState::kDone;
132-
done = true;
133-
}
119+
state_ = RefreshState::kBegin;
134120
}
135121
}
136122
if (done) {
@@ -147,7 +133,7 @@ StatusOr<google::bigtable::v2::PrepareQueryResponse> QueryPlan::response() {
147133
return response_;
148134
}
149135
lock.unlock();
150-
RefreshQueryPlan(RefreshMode::kAlreadyRefreshing);
136+
RefreshQueryPlan();
151137
lock.lock();
152138
}
153139

google/cloud/bigtable/internal/query_plan.h

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ class QueryPlan : public std::enable_shared_from_this<QueryPlan> {
3939

4040
// Calls the constructor and then Initialize.
4141
static std::shared_ptr<QueryPlan> Create(
42-
CompletionQueue cq, google::bigtable::v2::PrepareQueryResponse response,
42+
CompletionQueue cq,
43+
StatusOr<google::bigtable::v2::PrepareQueryResponse> response,
4344
RefreshFn fn, std::shared_ptr<Clock> clock = std::make_shared<Clock>());
4445

4546
// Invalidates the current QueryPlan and triggers a refresh.
@@ -57,8 +58,9 @@ class QueryPlan : public std::enable_shared_from_this<QueryPlan> {
5758

5859
private:
5960
QueryPlan(CompletionQueue cq, std::shared_ptr<Clock> clock, RefreshFn fn,
60-
google::bigtable::v2::PrepareQueryResponse response)
61-
: cq_(std::move(cq)),
61+
StatusOr<google::bigtable::v2::PrepareQueryResponse> response)
62+
: state_(response.ok() ? RefreshState::kDone : RefreshState::kBegin),
63+
cq_(std::move(cq)),
6264
clock_(std::move(clock)),
6365
refresh_fn_(std::move(fn)),
6466
response_(std::move(response)) {}
@@ -73,12 +75,9 @@ class QueryPlan : public std::enable_shared_from_this<QueryPlan> {
7375
// capturing a std::weak_ptr to this that calls RefreshQueryPlan.
7476
void ScheduleRefresh(std::unique_lock<std::mutex> const&);
7577

76-
enum class RefreshMode { kExpired, kInvalidated, kAlreadyRefreshing };
7778
// Performs the synchronization around calling RefreshFn and updating
7879
// response_.
79-
// void RefreshQueryPlan();
80-
81-
void RefreshQueryPlan(RefreshMode mode, Status error = {});
80+
void RefreshQueryPlan();
8281

8382
void ExpiredRefresh();
8483

@@ -93,17 +92,14 @@ class QueryPlan : public std::enable_shared_from_this<QueryPlan> {
9392
kPending, // waiting for an active thread to refresh response_
9493
kDone, // response_ has been refreshed
9594
};
96-
RefreshState state_ = RefreshState::kDone;
95+
RefreshState state_;
9796

9897
CompletionQueue cq_;
9998
std::shared_ptr<Clock> clock_;
10099
RefreshFn refresh_fn_;
101100
future<void> refresh_timer_;
102101
mutable std::mutex mu_;
103102
std::condition_variable cond_;
104-
// waiting_threads_ is only a snapshot, but it helps us reduce the number of
105-
// RPCs in flight to refresh the same query plan.
106-
int waiting_threads_ = 0;
107103
std::string old_query_plan_id_;
108104
StatusOr<google::bigtable::v2::PrepareQueryResponse>
109105
response_; // GUARDED_BY(mu_)

0 commit comments

Comments
 (0)