@@ -267,34 +267,40 @@ TEST(QueryPlanMultithreadedTest, RefreshInvalidatedPlan) {
267267
268268 constexpr int kNumThreads = LimitNumThreadsOn32Bit (1000 );
269269 std::vector<std::thread> threads (kNumThreads );
270- std::array<StatusOr<PrepareQueryResponse>, kNumThreads > data_responses;
270+ std::array<promise<StatusOr<PrepareQueryResponse>>, kNumThreads >
271+ data_responses;
272+ std::array<future<StatusOr<PrepareQueryResponse>>, kNumThreads >
273+ data_response_futures;
274+ for (auto i = 0 ; i < kNumThreads ; ++i) {
275+ data_responses[i] = promise<StatusOr<PrepareQueryResponse>>();
276+ data_response_futures[i] = data_responses[i].get_future ();
277+ }
271278
272279 auto barrier = std::make_shared<absl::Barrier>(kNumThreads + 1 );
273- auto thread_fn = [barrier,
274- query_plan]( StatusOr<PrepareQueryResponse>* thread_data) {
280+ auto thread_fn = [barrier, query_plan](
281+ promise< StatusOr<PrepareQueryResponse> >* thread_data) {
275282 barrier->Block ();
276- * thread_data = query_plan->response ();
283+ thread_data-> set_value ( query_plan->response () );
277284 };
278285
279286 for (int i = 0 ; i < kNumThreads ; ++i) {
280- data_responses[i] = StatusOr<PrepareQueryResponse>(
281- Status (StatusCode::kNotFound , " not found" ));
282287 threads.emplace_back (thread_fn, &(data_responses[i]));
283288 }
284289
285290 auto invalid_status = internal::InternalError (" oops!" );
286291 query_plan->Invalidate (invalid_status, data->prepared_query ());
287292 barrier->Block ();
288293
294+ for (auto & f : data_response_futures) {
295+ auto r = f.get ();
296+ ASSERT_STATUS_OK (r);
297+ EXPECT_EQ (r->prepared_query (), " refreshed-query-plan" );
298+ }
299+
289300 for (auto & t : threads) {
290301 if (t.joinable ()) t.join ();
291302 }
292-
293303 EXPECT_EQ (calls_to_refresh_fn, 1 );
294- for (auto const & r : data_responses) {
295- ASSERT_STATUS_OK (r);
296- EXPECT_EQ (r->prepared_query (), " refreshed-query-plan" );
297- }
298304
299305 // Cancel all pending operations, satisfying any remaining futures.
300306 fake_cq_impl->SimulateCompletion (false );
@@ -333,38 +339,46 @@ TEST(QueryPlanMultithreadedTest, RefreshInvalidatedPlanTransientFailures) {
333339
334340 constexpr int kNumThreads = LimitNumThreadsOn32Bit (1000 );
335341 std::vector<std::thread> threads (kNumThreads );
336- std::array<StatusOr<PrepareQueryResponse>, kNumThreads > data_responses;
342+ std::array<promise<StatusOr<PrepareQueryResponse>>, kNumThreads >
343+ data_responses;
344+ std::array<future<StatusOr<PrepareQueryResponse>>, kNumThreads >
345+ data_response_futures;
346+ for (auto i = 0 ; i < kNumThreads ; ++i) {
347+ data_responses[i] = promise<StatusOr<PrepareQueryResponse>>();
348+ data_response_futures[i] = data_responses[i].get_future ();
349+ }
337350
338351 auto barrier = std::make_shared<absl::Barrier>(kNumThreads + 1 );
339- auto thread_fn = [barrier,
340- query_plan]( StatusOr<PrepareQueryResponse>* thread_data) {
352+ auto thread_fn = [barrier, query_plan](
353+ promise< StatusOr<PrepareQueryResponse> >* thread_data) {
341354 barrier->Block ();
342- *thread_data = query_plan->response ();
343- while (!thread_data-> ok ()) {
355+ auto response = query_plan->response ();
356+ while (!response. ok ()) {
344357 std::this_thread::yield ();
345- *thread_data = query_plan->response ();
358+ response = query_plan->response ();
346359 }
360+ thread_data->set_value (response);
347361 };
348362
349363 for (int i = 0 ; i < kNumThreads ; ++i) {
350- data_responses[i] = StatusOr<PrepareQueryResponse>(
351- Status (StatusCode::kNotFound , " not found" ));
352364 threads.emplace_back (thread_fn, &(data_responses[i]));
353365 }
354366
355367 auto invalid_status = internal::InternalError (" oops!" );
356368 query_plan->Invalidate (invalid_status, data->prepared_query ());
357369 barrier->Block ();
358370
371+ for (auto & f : data_response_futures) {
372+ auto r = f.get ();
373+ ASSERT_STATUS_OK (r);
374+ EXPECT_EQ (r->prepared_query (), " refreshed-query-plan" );
375+ }
376+
359377 for (auto & t : threads) {
360378 if (t.joinable ()) t.join ();
361379 }
362380
363381 EXPECT_EQ (calls_to_refresh_fn, 4 );
364- for (auto const & r : data_responses) {
365- ASSERT_STATUS_OK (r);
366- EXPECT_EQ (r->prepared_query (), " refreshed-query-plan" );
367- }
368382
369383 // Cancel all pending operations, satisfying any remaining futures.
370384 fake_cq_impl->SimulateCompletion (false );
@@ -439,32 +453,38 @@ TEST(QueryPlanMultithreadedTest, RefreshInvalidatedPlanAfterFailedRefresh) {
439453 // it to complete.
440454 constexpr int kNumThreads = LimitNumThreadsOn32Bit (1000 );
441455 std::vector<std::thread> threads (kNumThreads );
442- std::array<StatusOr<PrepareQueryResponse>, kNumThreads > data_responses;
456+ std::array<promise<StatusOr<PrepareQueryResponse>>, kNumThreads >
457+ data_responses;
458+ std::array<future<StatusOr<PrepareQueryResponse>>, kNumThreads >
459+ data_response_futures;
460+ for (auto i = 0 ; i < kNumThreads ; ++i) {
461+ data_responses[i] = promise<StatusOr<PrepareQueryResponse>>();
462+ data_response_futures[i] = data_responses[i].get_future ();
463+ }
443464
444465 auto barrier = std::make_shared<absl::Barrier>(kNumThreads + 1 );
445- auto thread_fn = [barrier,
446- query_plan]( StatusOr<PrepareQueryResponse>* thread_data) {
466+ auto thread_fn = [barrier, query_plan](
467+ promise< StatusOr<PrepareQueryResponse> >* thread_data) {
447468 barrier->Block ();
448- * thread_data = query_plan->response ();
469+ thread_data-> set_value ( query_plan->response () );
449470 };
450471
451472 for (int i = 0 ; i < kNumThreads ; ++i) {
452- data_responses[i] = StatusOr<PrepareQueryResponse>(
453- Status (StatusCode::kNotFound , " not found" ));
454473 threads.emplace_back (thread_fn, &(data_responses[i]));
455474 }
456475
457476 barrier->Block ();
458477
478+ for (auto & f : data_response_futures) {
479+ auto r = f.get ();
480+ ASSERT_STATUS_OK (r);
481+ EXPECT_EQ (r->prepared_query (), " refreshed-query-plan" );
482+ }
483+
459484 for (auto & t : threads) {
460485 if (t.joinable ()) t.join ();
461486 }
462-
463487 EXPECT_EQ (calls_to_refresh_fn, kNumFailingThreads + 1 );
464- for (auto const & r : data_responses) {
465- ASSERT_STATUS_OK (r);
466- EXPECT_EQ (r->prepared_query (), " refreshed-query-plan" );
467- }
468488
469489 // Cancel all pending operations, satisfying any remaining futures.
470490 fake_cq_impl->SimulateCompletion (false );
0 commit comments