@@ -964,34 +964,54 @@ TEST_F(TestCls2PCQueue, MultiProducerConsumer)
964964 }
965965
966966 const auto max_elements = 128 ;
967- std::vector<std::thread> consumers (max_workers/2 );
968- for (auto & c : consumers ) {
969- c = std::thread ([this , &queue_name, &producer_count] {
967+ std::vector<std::thread> readers (max_workers/2 );
968+ for (auto & c : readers ) {
969+ c = std::thread ([this , &queue_name, &producer_count, &retry_happened ] {
970970 librados::ObjectWriteOperation op;
971971 const std::string marker;
972972 bool truncated = true ;
973973 std::string end_marker;
974974 std::vector<cls_queue_entry> entries;
975975 while (producer_count > 0 || truncated) {
976+ if (!retry_happened) {
977+ // queue was never full, let it fill
978+ std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
979+ continue ;
980+ }
976981 const auto ret = cls_2pc_queue_list_entries (ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
977982 ASSERT_EQ (0 , ret);
978983 if (entries.empty ()) {
979- // queue is empty, let it fill
980- std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
981- } else {
982- cls_2pc_queue_remove_entries (op, end_marker, max_elements);
983- ASSERT_EQ (0 , ioctx.operate (queue_name, &op));
984+ // another consumer has emptied the queue
985+ return ;
984986 }
985987 }
986988 });
987989 }
990+
991+ auto deleter = std::thread ([this , &queue_name, &producer_count, &retry_happened] {
992+ librados::ObjectWriteOperation op;
993+ const std::string marker;
994+ bool truncated = true ;
995+ std::string end_marker;
996+ std::vector<cls_queue_entry> entries;
997+ while (producer_count > 0 || truncated) {
998+ if (!retry_happened) {
999+ // queue was never full, let it fill
1000+ std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
1001+ continue ;
1002+ }
1003+ const auto ret = cls_2pc_queue_list_entries (ioctx, queue_name, marker, max_elements, entries, &truncated, end_marker);
1004+ ASSERT_EQ (0 , ret);
1005+ ASSERT_FALSE (entries.empty ());
1006+ cls_2pc_queue_remove_entries (op, end_marker, max_elements);
1007+ ASSERT_EQ (0 , ioctx.operate (queue_name, &op));
1008+ }
1009+ });
9881010
9891011 std::for_each (producers.begin (), producers.end (), [](auto & p) { p.join (); });
990- std::for_each (consumers.begin (), consumers.end (), [](auto & c) { c.join (); });
991- if (!retry_happened) {
992- std::cerr << " Queue was never full - all reservations were successful." <<
993- " Please decrease the amount of consumer threads" << std::endl;
994- }
1012+ std::for_each (readers.begin (), readers.end (), [](auto & c) { c.join (); });
1013+ deleter.join ();
1014+ ASSERT_TRUE (retry_happened);
9951015 // make sure that queue is empty and no reservations remain
9961016 cls_2pc_reservations reservations;
9971017 ASSERT_EQ (0 , cls_2pc_queue_list_reservations (ioctx, queue_name, reservations));
0 commit comments