Skip to content

Commit 361f9b8

Browse files
authored
impl(bigtable): use promises to enforce happens before in query plan tests (#15741)
1 parent 465b16b commit 361f9b8

File tree

1 file changed

+54
-34
lines changed

1 file changed

+54
-34
lines changed

google/cloud/bigtable/internal/query_plan_test.cc

Lines changed: 54 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -267,34 +267,40 @@ TEST(QueryPlanMultithreadedTest, RefreshInvalidatedPlan) {
267267

268268
constexpr int kNumThreads = LimitNumThreadsOn32Bit(1000);
269269
std::vector<std::thread> threads(kNumThreads);
270-
std::array<StatusOr<PrepareQueryResponse>, kNumThreads> data_responses;
270+
std::array<promise<StatusOr<PrepareQueryResponse>>, kNumThreads>
271+
data_responses;
272+
std::array<future<StatusOr<PrepareQueryResponse>>, kNumThreads>
273+
data_response_futures;
274+
for (auto i = 0; i < kNumThreads; ++i) {
275+
data_responses[i] = promise<StatusOr<PrepareQueryResponse>>();
276+
data_response_futures[i] = data_responses[i].get_future();
277+
}
271278

272279
auto barrier = std::make_shared<absl::Barrier>(kNumThreads + 1);
273-
auto thread_fn = [barrier,
274-
query_plan](StatusOr<PrepareQueryResponse>* thread_data) {
280+
auto thread_fn = [barrier, query_plan](
281+
promise<StatusOr<PrepareQueryResponse>>* thread_data) {
275282
barrier->Block();
276-
*thread_data = query_plan->response();
283+
thread_data->set_value(query_plan->response());
277284
};
278285

279286
for (int i = 0; i < kNumThreads; ++i) {
280-
data_responses[i] = StatusOr<PrepareQueryResponse>(
281-
Status(StatusCode::kNotFound, "not found"));
282287
threads.emplace_back(thread_fn, &(data_responses[i]));
283288
}
284289

285290
auto invalid_status = internal::InternalError("oops!");
286291
query_plan->Invalidate(invalid_status, data->prepared_query());
287292
barrier->Block();
288293

294+
for (auto& f : data_response_futures) {
295+
auto r = f.get();
296+
ASSERT_STATUS_OK(r);
297+
EXPECT_EQ(r->prepared_query(), "refreshed-query-plan");
298+
}
299+
289300
for (auto& t : threads) {
290301
if (t.joinable()) t.join();
291302
}
292-
293303
EXPECT_EQ(calls_to_refresh_fn, 1);
294-
for (auto const& r : data_responses) {
295-
ASSERT_STATUS_OK(r);
296-
EXPECT_EQ(r->prepared_query(), "refreshed-query-plan");
297-
}
298304

299305
// Cancel all pending operations, satisfying any remaining futures.
300306
fake_cq_impl->SimulateCompletion(false);
@@ -333,38 +339,46 @@ TEST(QueryPlanMultithreadedTest, RefreshInvalidatedPlanTransientFailures) {
333339

334340
constexpr int kNumThreads = LimitNumThreadsOn32Bit(1000);
335341
std::vector<std::thread> threads(kNumThreads);
336-
std::array<StatusOr<PrepareQueryResponse>, kNumThreads> data_responses;
342+
std::array<promise<StatusOr<PrepareQueryResponse>>, kNumThreads>
343+
data_responses;
344+
std::array<future<StatusOr<PrepareQueryResponse>>, kNumThreads>
345+
data_response_futures;
346+
for (auto i = 0; i < kNumThreads; ++i) {
347+
data_responses[i] = promise<StatusOr<PrepareQueryResponse>>();
348+
data_response_futures[i] = data_responses[i].get_future();
349+
}
337350

338351
auto barrier = std::make_shared<absl::Barrier>(kNumThreads + 1);
339-
auto thread_fn = [barrier,
340-
query_plan](StatusOr<PrepareQueryResponse>* thread_data) {
352+
auto thread_fn = [barrier, query_plan](
353+
promise<StatusOr<PrepareQueryResponse>>* thread_data) {
341354
barrier->Block();
342-
*thread_data = query_plan->response();
343-
while (!thread_data->ok()) {
355+
auto response = query_plan->response();
356+
while (!response.ok()) {
344357
std::this_thread::yield();
345-
*thread_data = query_plan->response();
358+
response = query_plan->response();
346359
}
360+
thread_data->set_value(response);
347361
};
348362

349363
for (int i = 0; i < kNumThreads; ++i) {
350-
data_responses[i] = StatusOr<PrepareQueryResponse>(
351-
Status(StatusCode::kNotFound, "not found"));
352364
threads.emplace_back(thread_fn, &(data_responses[i]));
353365
}
354366

355367
auto invalid_status = internal::InternalError("oops!");
356368
query_plan->Invalidate(invalid_status, data->prepared_query());
357369
barrier->Block();
358370

371+
for (auto& f : data_response_futures) {
372+
auto r = f.get();
373+
ASSERT_STATUS_OK(r);
374+
EXPECT_EQ(r->prepared_query(), "refreshed-query-plan");
375+
}
376+
359377
for (auto& t : threads) {
360378
if (t.joinable()) t.join();
361379
}
362380

363381
EXPECT_EQ(calls_to_refresh_fn, 4);
364-
for (auto const& r : data_responses) {
365-
ASSERT_STATUS_OK(r);
366-
EXPECT_EQ(r->prepared_query(), "refreshed-query-plan");
367-
}
368382

369383
// Cancel all pending operations, satisfying any remaining futures.
370384
fake_cq_impl->SimulateCompletion(false);
@@ -439,32 +453,38 @@ TEST(QueryPlanMultithreadedTest, RefreshInvalidatedPlanAfterFailedRefresh) {
439453
// it to complete.
440454
constexpr int kNumThreads = LimitNumThreadsOn32Bit(1000);
441455
std::vector<std::thread> threads(kNumThreads);
442-
std::array<StatusOr<PrepareQueryResponse>, kNumThreads> data_responses;
456+
std::array<promise<StatusOr<PrepareQueryResponse>>, kNumThreads>
457+
data_responses;
458+
std::array<future<StatusOr<PrepareQueryResponse>>, kNumThreads>
459+
data_response_futures;
460+
for (auto i = 0; i < kNumThreads; ++i) {
461+
data_responses[i] = promise<StatusOr<PrepareQueryResponse>>();
462+
data_response_futures[i] = data_responses[i].get_future();
463+
}
443464

444465
auto barrier = std::make_shared<absl::Barrier>(kNumThreads + 1);
445-
auto thread_fn = [barrier,
446-
query_plan](StatusOr<PrepareQueryResponse>* thread_data) {
466+
auto thread_fn = [barrier, query_plan](
467+
promise<StatusOr<PrepareQueryResponse>>* thread_data) {
447468
barrier->Block();
448-
*thread_data = query_plan->response();
469+
thread_data->set_value(query_plan->response());
449470
};
450471

451472
for (int i = 0; i < kNumThreads; ++i) {
452-
data_responses[i] = StatusOr<PrepareQueryResponse>(
453-
Status(StatusCode::kNotFound, "not found"));
454473
threads.emplace_back(thread_fn, &(data_responses[i]));
455474
}
456475

457476
barrier->Block();
458477

478+
for (auto& f : data_response_futures) {
479+
auto r = f.get();
480+
ASSERT_STATUS_OK(r);
481+
EXPECT_EQ(r->prepared_query(), "refreshed-query-plan");
482+
}
483+
459484
for (auto& t : threads) {
460485
if (t.joinable()) t.join();
461486
}
462-
463487
EXPECT_EQ(calls_to_refresh_fn, kNumFailingThreads + 1);
464-
for (auto const& r : data_responses) {
465-
ASSERT_STATUS_OK(r);
466-
EXPECT_EQ(r->prepared_query(), "refreshed-query-plan");
467-
}
468488

469489
// Cancel all pending operations, satisfying any remaining futures.
470490
fake_cq_impl->SimulateCompletion(false);

0 commit comments

Comments
 (0)