@@ -1433,6 +1433,10 @@ struct server_queue {
14331433 } else {
14341434 queue_tasks.push_back (std::move (task));
14351435 }
1436+ // if this is cancel task make sure to clean up pending tasks
1437+ if (task.type == SERVER_TASK_TYPE_CANCEL) {
1438+ cleanup_pending_task (task.id_target );
1439+ }
14361440 condition_tasks.notify_one ();
14371441 return task.id ;
14381442 }
@@ -1450,6 +1454,10 @@ struct server_queue {
14501454 } else {
14511455 queue_tasks.push_back (std::move (task));
14521456 }
1457+ // if this is cancel task make sure to clean up pending tasks
1458+ if (task.type == SERVER_TASK_TYPE_CANCEL) {
1459+ cleanup_pending_task (task.id_target );
1460+ }
14531461 }
14541462 condition_tasks.notify_one ();
14551463 return 0 ;
@@ -1544,6 +1552,20 @@ struct server_queue {
15441552 }
15451553 }
15461554 }
1555+
1556+ private:
1557+ void cleanup_pending_task (int id_task) {
1558+ // no need lock because this is called exclusively by post()
1559+ auto rm_func = [id_task](const server_task & task) {
1560+ return task.id_target == id_task;
1561+ };
1562+ queue_tasks.erase (
1563+ std::remove_if (queue_tasks.begin (), queue_tasks.end (), rm_func),
1564+ queue_tasks.end ());
1565+ queue_tasks_deferred.erase (
1566+ std::remove_if (queue_tasks_deferred.begin (), queue_tasks_deferred.end (), rm_func),
1567+ queue_tasks_deferred.end ());
1568+ }
15471569};
15481570
15491571struct server_response {
@@ -1579,6 +1601,12 @@ struct server_response {
15791601
15801602 std::unique_lock<std::mutex> lock (mutex_results);
15811603 waiting_task_ids.erase (id_task);
1604+ // make sure to clean up all pending results
1605+ queue_results.erase (
1606+ std::remove_if (queue_results.begin (), queue_results.end (), [id_task](const server_task_result_ptr & res) {
1607+ return res->id == id_task;
1608+ }),
1609+ queue_results.end ());
15821610 }
15831611
15841612 void remove_waiting_task_ids (const std::unordered_set<int > & id_tasks) {
@@ -1598,7 +1626,7 @@ struct server_response {
15981626 return !queue_results.empty ();
15991627 });
16001628
1601- for (int i = 0 ; i < ( int ) queue_results.size (); i++) {
1629+ for (size_t i = 0 ; i < queue_results.size (); i++) {
16021630 if (id_tasks.find (queue_results[i]->id ) != id_tasks.end ()) {
16031631 server_task_result_ptr res = std::move (queue_results[i]);
16041632 queue_results.erase (queue_results.begin () + i);
@@ -1615,12 +1643,6 @@ struct server_response {
16151643 server_task_result_ptr recv_with_timeout (const std::unordered_set<int > & id_tasks, int timeout) {
16161644 while (true ) {
16171645 std::unique_lock<std::mutex> lock (mutex_results);
1618- bool cr_res = condition_results.wait_for (lock, std::chrono::seconds (timeout), [&]{
1619- return !queue_results.empty ();
1620- });
1621- if (!cr_res) {
1622- return nullptr ;
1623- }
16241646
16251647 for (int i = 0 ; i < (int ) queue_results.size (); i++) {
16261648 if (id_tasks.find (queue_results[i]->id ) != id_tasks.end ()) {
@@ -1629,6 +1651,11 @@ struct server_response {
16291651 return res;
16301652 }
16311653 }
1654+
1655+ std::cv_status cr_res = condition_results.wait_for (lock, std::chrono::seconds (timeout));
1656+ if (cr_res == std::cv_status::timeout) {
1657+ return nullptr ;
1658+ }
16321659 }
16331660
16341661 // should never reach here
@@ -2376,8 +2403,8 @@ struct server_context {
23762403
23772404 server_task task (SERVER_TASK_TYPE_CANCEL);
23782405 task.id_target = id_task;
2379- cancel_tasks.push_back (task);
23802406 queue_results.remove_waiting_task_id (id_task);
2407+ cancel_tasks.push_back (task);
23812408 }
23822409 // push to beginning of the queue, so it has highest priority
23832410 queue_tasks.post (cancel_tasks, true );
0 commit comments