Skip to content

Commit c2a1609

Browse files
authored
Merge pull request #183 from c-jimenez/fix/multithreading_issues
[tools] Fix some multi threading issues
2 parents 2124bf8 + 2a66753 commit c2a1609

File tree

4 files changed

+19
-7
lines changed

4 files changed

+19
-7
lines changed

src/rpc/RpcBase.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,15 @@ void RpcBase::processDisconnected()
278278
// Disable queues
279279
m_requests_queue.setEnable(false);
280280
m_results_queue.setEnable(false);
281+
282+
// Check if a pool has been configured
283+
if (m_pool)
284+
{
285+
// Disable owner
286+
m_rpc_owner->lock.lock();
287+
m_rpc_owner->is_operational = false;
288+
m_rpc_owner->lock.unlock();
289+
}
281290
}
282291

283292
/** @brief Process received data */

src/tools/helpers/Queue.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ along with OpenOCPP. If not, see <http://www.gnu.org/licenses/>.
1919
#ifndef OPENOCPP_QUEUE_H
2020
#define OPENOCPP_QUEUE_H
2121

22+
#include <atomic>
2223
#include <chrono>
2324
#include <condition_variable>
2425
#include <cstddef>
@@ -208,7 +209,7 @@ class Queue
208209
/** @brief Queue to store data */
209210
std::queue<ItemType> m_queue;
210211
/** @brief Indicate that the queue is enabled */
211-
bool m_enabled;
212+
std::atomic<bool> m_enabled;
212213
};
213214

214215
} // namespace helpers

src/tools/helpers/TimerPool.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ along with OpenOCPP. If not, see <http://www.gnu.org/licenses/>.
2121

2222
#include "ITimerPool.h"
2323

24+
#include <atomic>
2425
#include <chrono>
2526
#include <condition_variable>
2627
#include <functional>
@@ -54,9 +55,9 @@ class TimerPool : public ITimerPool
5455

5556
private:
5657
/** @brief Indicate that the timers must stop */
57-
bool m_stop;
58+
std::atomic<bool> m_stop;
5859
/** @brief Indicate that the next wakeup time has changed */
59-
bool m_update_wakeup_time;
60+
std::atomic<bool> m_update_wakeup_time;
6061
/** @brief Mutex for wakeup condition */
6162
std::mutex m_wakeup_mutex;
6263
/** @brief Wakeup condition */

src/tools/helpers/WorkerThreadPool.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ along with OpenOCPP. If not, see <http://www.gnu.org/licenses/>.
2121

2222
#include "Queue.h"
2323

24+
#include <atomic>
2425
#include <chrono>
2526
#include <condition_variable>
2627
#include <functional>
@@ -64,7 +65,7 @@ class JobBase : public IJob
6465
/** @brief Condition variable for end of job synchronization */
6566
std::condition_variable end_of_job_var;
6667
/** @brief Indicate end of job */
67-
bool end;
68+
std::atomic<bool> end;
6869
/** @brief Function to execute */
6970
std::function<ReturnType()> function;
7071
};
@@ -159,7 +160,7 @@ class Waiter
159160
{
160161
Job<ReturnType>* job = dynamic_cast<Job<ReturnType>*>(m_job.get());
161162
std::unique_lock<std::mutex> lock(job->end_of_job_mutex);
162-
return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end; });
163+
return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end.operator bool(); });
163164
}
164165

165166
private:
@@ -186,7 +187,7 @@ class Waiter<void>
186187
{
187188
Job<void>* job = dynamic_cast<Job<void>*>(m_job.get());
188189
std::unique_lock<std::mutex> lock(job->end_of_job_mutex);
189-
return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end; });
190+
return job->end_of_job_var.wait_for(lock, timeout, [job] { return job->end.operator bool(); });
190191
}
191192

192193
private:
@@ -229,7 +230,7 @@ class WorkerThreadPool
229230

230231
private:
231232
/** @brief Indicate that the threads must stop */
232-
bool m_stop;
233+
std::atomic<bool> m_stop;
233234
/** @brief Worker threads */
234235
std::vector<std::thread*> m_threads;
235236
/** @brief Job queue */

0 commit comments

Comments
 (0)