Skip to content

Commit 0fa90b9

Browse files
simplified event-queue, and fixed mutex-access-before-constructor-finished bug
1 parent 1ede88e commit 0fa90b9

File tree

4 files changed

+45
-84
lines changed

4 files changed

+45
-84
lines changed

VCellMessaging/include/VCELL/MessageEventManager.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,18 @@ class MessageEventManager {
4747
void processEvent(WorkerEvent* event);
4848
void enqueue(WorkerEvent*);
4949

50-
std::mutex timeClockMutex;
50+
std::thread eventQueueProcessingWorkerThread; //TODO: make a `std::jthread` once compilers catch up with standard
51+
std::function<void(WorkerEvent*)> sendUpdateFunction;
5152

53+
// Critical Resources / Regions
54+
// -> CR #1 - Whether stop has been requested or not
5255
bool stopRequested;
5356
std::condition_variable requestedStopForeman;
5457
std::mutex stopRequestedMutex;
58+
// -> CR #2 - The Event Queue
5559
std::queue<WorkerEvent*> eventQueue;
60+
std::condition_variable eventQueueForeman;
5661
std::mutex queuetex;
57-
58-
std::thread eventQueueProcessingWorkerThread; //TODO: make a `std::jthread` once compilers catch up with standard
59-
std::condition_variable needMessageProcessingForeman;
60-
std::function<void(WorkerEvent*)> sendUpdateFunction;
6162
};
6263

6364
#endif //VCELL_ODE_NUMERICS_MESSAGEEVENTQUEUE_H

VCellMessaging/include/VCELL/SimulationMessaging.h

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,8 @@ static constexpr int Message_DEFAULT_PRIORITY = 0;
3333
//TODO: Re-write so that there is a messaging handler abstract class with two children: stdOut and CUrl
3434
class SimulationMessaging {
3535
public:
36-
virtual ~SimulationMessaging() noexcept;
37-
static void initialize();
38-
static SimulationMessaging* create();
3936
static SimulationMessaging* getInstVar();
37+
static void cleanupInstanceVar();
4038
void setWorkerEvent(JobEvent::Status status, const char *eventMessage);
4139
void setWorkerEvent(JobEvent::Status status, double progress, double timepoint);
4240
void setWorkerEvent(JobEvent::Status status, double progress, double timepoint, const char *eventMessage);
@@ -54,11 +52,12 @@ class SimulationMessaging {
5452
}
5553
void waitUntilFinished();
5654
#ifdef USE_MESSAGING
57-
static SimulationMessaging* create(const char* broker, const char* smqusername, const char* passwd, const char* qname, const char* tname,
58-
const char* vcusername, int simKey, int jobIndex, int taskID, int ttl_low=DEFAULT_TTL_LOW, int ttl_high=DEFAULT_TTL_HIGH);
59-
friend void* startMessagingThread(void* param);
55+
void initialize_curl_messaging(bool alsoPrintToStdOut, const char* broker, const char* vcusername, int simKey, int jobIndex, int taskID, int ttl_low=DEFAULT_TTL_LOW, int ttl_high=DEFAULT_TTL_HIGH);
6056
#endif
6157

58+
protected:
59+
virtual ~SimulationMessaging() noexcept;
60+
6261
private:
6362
// Statics
6463
static void sendStdOutStatus(WorkerEvent*);
@@ -78,9 +77,6 @@ class SimulationMessaging {
7877
static bool isInitialized;
7978

8079
SimulationMessaging();
81-
#ifdef USE_MESSAGING
82-
SimulationMessaging(const char* broker, const char* vcusername, int simKey, int jobIndex, int taskID, int ttl_low=DEFAULT_TTL_LOW, int ttl_high=DEFAULT_TTL_HIGH);
83-
#endif
8480

8581
void sendStatus(WorkerEvent*);
8682
void keepAlive();

VCellMessaging/src/MessageEventManager.cpp

Lines changed: 17 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,9 @@
1010

1111
MessageEventManager::MessageEventManager(std::function<void(WorkerEvent*)> sendUpdateFunction):
1212
stopRequested{false},
13-
sendUpdateFunction{std::move(sendUpdateFunction)},
14-
eventQueueProcessingWorkerThread([this]() {this->performQueueProcessing();}) // Should auto-start thread
13+
sendUpdateFunction{std::move(sendUpdateFunction)}
1514
{
16-
15+
this->eventQueueProcessingWorkerThread = std::thread([this]() {this->performQueueProcessing();}); // Should auto-start thread
1716
}
1817

1918
MessageEventManager::~MessageEventManager() {
@@ -37,14 +36,16 @@ void MessageEventManager::requestStopAndWaitForIt() {
3736
std::unique_lock stopRequestedLock{this->stopRequestedMutex};
3837
if (this->stopRequested) return;
3938
this->stopRequested = true;
40-
this->needMessageProcessingForeman.notify_all();
41-
this->requestedStopForeman.wait(stopRequestedLock, [this]()->bool{return this->eventQueue.empty();});
39+
this->eventQueueForeman.notify_all(); // We want any sleeping workers to wake up, so they check if a stop was requested.
40+
this->requestedStopForeman.wait(stopRequestedLock, [this]()->bool {
41+
std::unique_lock queueLock{this->queuetex};
42+
return this->eventQueue.empty();
43+
});
4244
}
4345

4446
bool MessageEventManager::stopWasCalled() {
4547
std::unique_lock stopRequestedLock{this->stopRequestedMutex};
4648
return this->stopRequested;
47-
4849
}
4950

5051
///////////////////////////////////////////////////
@@ -66,35 +67,27 @@ void MessageEventManager::performQueueProcessing() {
6667
void MessageEventManager::processQueue() {
6768
while (true) {
6869
WorkerEvent* event;
69-
{// START Clock-out Scope //
70-
// The order of the locks is important! See Header File
71-
std::unique_lock checkIfTheresWorkLock{this->timeClockMutex}; // This is to prevent loop processing while `enqueue` is being called
70+
{// START Queuetex Scope //
7271
std::unique_lock shouldBeActiveLock(this->queuetex);
7372
if (this->eventQueue.empty()) {
7473
{ // START stop-requested Scope
7574
std::unique_lock stopRequestedLock{this->stopRequestedMutex};
7675
if (this->stopRequested) {
77-
this->requestedStopForeman.notify_all();
78-
return; // We're all done; since this should be done on a jthread, this should also trigger a join.
76+
this->requestedStopForeman.notify_all(); // Tell all stop-requesters that we've registered a stop.
77+
return; // We're all done
7978
}
8079
} // END stop-requested Scope
81-
checkIfTheresWorkLock.unlock(); // Need to allow enqueuing, can't do that with this locked, and we can't rescope but unique_lock order matters above!
82-
// The worker can stop working, and "get some sleep"
83-
// Wait for the worker to be "prodded" (via `needMessagingForeman.notify_one()`), AND the event queue is not empty
84-
// Note that this `wait()` call, by design, unlocks the queue mutex, until it is "prodded".
85-
this->needMessageProcessingForeman.wait(shouldBeActiveLock);
86-
std::unique_lock stopRequestedLock{this->stopRequestedMutex};
80+
// If the code gets to here, the worker can stop working, and "get some sleep"
81+
this->eventQueueForeman.wait(shouldBeActiveLock); // Note that this `wait()` call, by design, unlocks the queue mutex, until it is "prodded".
8782
if (this->eventQueue.empty()) continue; // Probably means we need to check if stop was requested again
8883
}
89-
std::unique_lock stopRequestedLock{this->stopRequestedMutex};
9084
event = this->eventQueue.front();
9185
this->eventQueue.pop();
92-
}// END Clock-out Scope //
86+
}// END Queuetex Scope //
9387

9488
// Process Event
9589
this->processEvent(event);
96-
// Remember to delete the event! We need the memory back!
97-
delete event;
90+
delete event; // Remember to delete the event! We need the memory back!
9891
}
9992
}
10093

@@ -103,19 +96,13 @@ void MessageEventManager::processEvent(WorkerEvent* event) {
10396
}
10497

10598
void MessageEventManager::enqueue(WorkerEvent* event) {
106-
// The order of the locks is important! See Header File
107-
std::lock_guard workerIsActiveLock{this->timeClockMutex}; // Need to lock to prevent worker from "clocking out" while we set this up
99+
std::lock_guard workerIsActiveLock{this->queuetex}; // Need to lock to prevent worker from checking if it has more work while we make the request.
108100
std::lock_guard stopRequestedLock{this->stopRequestedMutex}; // We scope this lock to the whole function; "last in the door" policy
109101
if (this->stopRequested) {
110102
std::cerr << "A new event was added to the messaging queue, but this queue has had `stop` requested!" << std::endl;
111103
delete event;
112104
return; // note: on this return function scope ends, and thus so too does out function-scoped locks
113105
}
114-
115-
{// START Emplace Scope //
116-
std::lock_guard queueLock{this->queuetex};
117-
this->eventQueue.emplace(event);
118-
}// END Emplace Scope //
119-
120-
this->needMessageProcessingForeman.notify_one(); // Nudges one sleeping worker awake. If the worker is awake...this does nothing; as it should.
106+
this->eventQueue.emplace(event);
107+
this->eventQueueForeman.notify_one(); // Nudges one sleeping worker awake. If the worker is awake...this does nothing; as it should.
121108
}

VCellMessaging/src/SimulationMessaging.cpp

Lines changed: 17 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,50 +4,37 @@
44
#include <string>
55
#include <format>
66

7-
8-
9-
static const double WORKEREVENT_MESSAGE_MIN_TIME_SECONDS = 15.0;
7+
static constexpr double WORKEREVENT_MESSAGE_MIN_TIME_SECONDS = 15.0;
108
bool SimulationMessaging::isInitialized = false;
119

1210

13-
SimulationMessaging *SimulationMessaging::m_inst = NULL;
11+
SimulationMessaging *SimulationMessaging::m_inst = nullptr;
1412

1513
SimulationMessaging::SimulationMessaging():
1614
eventHandler(std::bind(&SimulationMessaging::sendStatus, this, std::placeholders::_1)),
15+
workerEventOutputMode{WORKEREVENT_OUTPUT_MODE_STDOUT},
1716
taskID{-1},
18-
m_jobIndex{-1}
19-
{
20-
this->taskID = -1;
21-
this->workerEventOutputMode = WORKEREVENT_OUTPUT_MODE_STDOUT;
22-
this->curlHandler = new NullCurlProxy();
23-
24-
}
25-
26-
#ifdef USE_MESSAGING
27-
SimulationMessaging::SimulationMessaging(const char* broker, const char* vcusername, int simKey, int jobIndex, int taskID, int ttl_low, int ttl_high):
28-
eventHandler(std::bind(&SimulationMessaging::sendStatus, this, std::placeholders::_1)),
29-
workerEventOutputMode{WORKEREVENT_OUTPUT_MODE_MESSAGING},
30-
taskID{taskID},
31-
m_jobIndex{jobIndex},
17+
m_jobIndex{-1},
3218
bNewWorkerEvent{false}
3319
{
34-
this->curlHandler = new CurlProxy(simKey, taskID, jobIndex, vcusername, broker, ttl_low, ttl_high);
20+
this->curlHandler = new NullCurlProxy();
3521
time(&this->lastSentEventTime);
3622
}
37-
#endif
3823

3924
SimulationMessaging::~SimulationMessaging() noexcept{
4025
delete this->curlHandler;
4126
}
4227

4328
SimulationMessaging* SimulationMessaging::getInstVar() {
29+
if (nullptr == SimulationMessaging::m_inst) SimulationMessaging::m_inst = new SimulationMessaging();
4430
return SimulationMessaging::m_inst;
4531
}
4632

47-
SimulationMessaging* SimulationMessaging::create(){
48-
if (SimulationMessaging::m_inst == NULL) SimulationMessaging::m_inst = new SimulationMessaging();
49-
50-
return SimulationMessaging::m_inst;
33+
void SimulationMessaging::cleanupInstanceVar() {
34+
if (nullptr == SimulationMessaging::m_inst) return;
35+
SimulationMessaging::m_inst->waitUntilFinished();
36+
delete SimulationMessaging::m_inst;
37+
SimulationMessaging::m_inst = nullptr;
5138
}
5239

5340
void SimulationMessaging::sendStatus(WorkerEvent* event) {
@@ -132,26 +119,16 @@ void SimulationMessaging::setWorkerEvent(JobEvent::Status status, const double p
132119

133120
void SimulationMessaging::waitUntilFinished() {
134121
this->eventHandler.requestStopAndWaitForIt();
135-
// if (workerEventOutputMode == WORKEREVENT_OUTPUT_MODE_STDOUT) return;
136-
// #ifdef USE_MESSAGING
137-
// std::cout << "!!!waiting for thread to exit" << std::endl;
138-
// pthread_join(newWorkerEventThread, NULL);
139-
// std::cout << "!!Threads joined successfully" << std::endl;
140-
// #endif
141122
}
142123

143124
#ifdef USE_MESSAGING
144125

145-
SimulationMessaging* SimulationMessaging::create(const char* broker, const char* smqusername, const char* passwd, const char* qname, const char* tname, const char* vcusername, int simKey, int jobIndex, int taskID, int ttl_low, int ttl_high){
146-
if (m_inst != NULL && m_inst->workerEventOutputMode == WORKEREVENT_OUTPUT_MODE_STDOUT) {
147-
delete m_inst;
148-
m_inst = NULL;
149-
}
150-
if (m_inst == NULL){
151-
m_inst = new SimulationMessaging(broker, vcusername, simKey, jobIndex, taskID, ttl_low, ttl_high);
152-
}
153-
154-
return(m_inst);
126+
void SimulationMessaging::initialize_curl_messaging(bool alsoPrintToStdOut, const char* broker, const char* vcusername, int simKey, int jobIndex, int givenTaskID, int ttl_low, int ttl_high){
127+
this->workerEventOutputMode = alsoPrintToStdOut ? WORKEREVENT_OUTPUT_MODE_ALL : WORKEREVENT_OUTPUT_MODE_MESSAGING;
128+
this->taskID = givenTaskID;
129+
this->m_jobIndex = jobIndex;
130+
delete this->curlHandler; // get rid of the null one we make by default
131+
this->curlHandler = new CurlProxy(simKey, taskID, jobIndex, vcusername, broker, ttl_low, ttl_high);
155132
}
156133

157134
// void SimulationMessaging::start() {

0 commit comments

Comments
 (0)