Skip to content

Commit 4401ca7

Browse files
Create separate functionality classes
1 parent dc54fe9 commit 4401ca7

File tree

8 files changed

+577
-4
lines changed

8 files changed

+577
-4
lines changed

VCellMessaging/CMakeLists.txt

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,18 @@ list(APPEND SOURCES GitDescribe.h)
66
set(HEADER_FILES
77
include/VCELL/SimulationMessaging.h
88
include/VCELL/GitDescribe.h
9+
include/VCELL/MessageEventManager.h
10+
include/VCELL/CurlProxyClasses.h
11+
include/VCELL/JobEventStatus.h
12+
include/VCELL/WorkerEvent.h
913
)
1014

1115
set(SRC_FILES
1216
src/SimulationMessaging.cpp
13-
"${CMAKE_CURRENT_BINARY_DIR}/GitDescribe.cpp")
17+
"${CMAKE_CURRENT_BINARY_DIR}/GitDescribe.cpp"
18+
src/MessageEventManager.cpp
19+
src/CurlProxyClasses.cpp
20+
src/JobEventStatus.cpp)
1421

1522
include_directories(include)
1623

@@ -20,9 +27,11 @@ target_include_directories(vcellmessaging INTERFACE
2027
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
2128
$<INSTALL_INTERFACE:include> # <prefix>/include
2229
)
23-
if (${OPTION_TARGET_MESSAGING})
24-
message(STATUS "CURL_LIBRARIES = '${CURL_LIBRARIES}'")
25-
message(STATUS "CURL_INCLUDE_DIR = '${CURL_INCLUDE_DIR}'")
30+
if (OPTION_TARGET_MESSAGING)
31+
if (OPTION_EXTRA_CONFIG_INFO)
32+
message(STATUS "CURL_LIBRARIES = '${CURL_LIBRARIES}'")
33+
message(STATUS "CURL_INCLUDE_DIR = '${CURL_INCLUDE_DIR}'")
34+
endif ()
2635

2736
target_link_libraries(vcellmessaging ${CURL_LIBRARIES} Threads::Threads)
2837
target_compile_definitions(vcellmessaging PUBLIC USE_MESSAGING=1)
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
//
2+
// Created by Logan Drescher on 11/25/25.
3+
//
4+
#ifndef VCELL_ODE_NUMERICS_CURLPROXY_H
5+
#define VCELL_ODE_NUMERICS_CURLPROXY_H
6+
#include <string>
7+
#include <curl/curl.h>
8+
9+
#include "VCELL/WorkerEvent.h"
10+
11+
12+
class AbstractCurlProxy {
13+
public:
14+
virtual ~AbstractCurlProxy() =default;
15+
16+
virtual void sendStatus(WorkerEvent* event) = 0;
17+
virtual void keepAlive() = 0;
18+
};
19+
20+
class NullCurlProxy final : public AbstractCurlProxy {
21+
public:
22+
NullCurlProxy() =default;
23+
~NullCurlProxy()override =default;
24+
void sendStatus(WorkerEvent* event) override {}
25+
void keepAlive() override {}
26+
};
27+
28+
#ifdef USE_MESSAGING
29+
class CurlProxy final : public AbstractCurlProxy {
30+
public:
31+
CurlProxy(long simKey, int taskID, int jobIndex, const std::string& vcusername, const std::string& broker, int ttlLow, int ttlHigh);
32+
~CurlProxy() override;
33+
34+
void sendStatus(WorkerEvent* event) override;
35+
void keepAlive() override;
36+
private:
37+
static const char* TIMETOLIVE_PROPERTY;
38+
static const char* DELIVERYMODE_PROPERTY;
39+
static const char* DELIVERYMODE_PERSISTENT_VALUE;
40+
static const char* DELIVERYMODE_NONPERSISTENT_VALUE;
41+
static const char* PRIORITY_PROPERTY;
42+
static const char* PRIORITY_DEFAULT_VALUE;
43+
44+
static const char* MESSAGE_TYPE_PROPERTY;
45+
static const char* MESSAGE_TYPE_WORKEREVENT_VALUE;
46+
47+
static const char* USERNAME_PROPERTY;
48+
static const char* HOSTNAME_PROPERTY;
49+
static const char* SIMKEY_PROPERTY;
50+
static const char* TASKID_PROPERTY;
51+
static const char* JOBINDEX_PROPERTY;
52+
static const char* WORKEREVENT_STATUS;
53+
static const char* WORKEREVENT_PROGRESS;
54+
static const char* WORKEREVENT_TIMEPOINT;
55+
static const char* WORKEREVENT_STATUSMSG;
56+
57+
int TTL_LOW_PRIORITY;
58+
int TTL_HIGH_PRIORITY;
59+
60+
61+
long simKey;
62+
int taskID;
63+
int jobIndex;
64+
std::string hostname;
65+
std::string broker;
66+
std::string vcusername;
67+
time_t lastTimeEventWasSent;
68+
};
69+
70+
71+
72+
// This comment is transplanted from when this logic was a part of Simulation Messaging; originally written by Jim Schaff
73+
// Documentation for the ActiveMQ restful API is missing, must see source code
74+
//
75+
// https://github.com/apache/activemq/blob/master/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
76+
// https://github.com/apache/activemq/blob/master/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
77+
//
78+
// currently, the "web" api seems to use the same credentials as the "web console" ... defaults to admin:admin.
79+
// TODO: pass in credentials, and protect them better (consider HTTPS).
80+
//
81+
/*
82+
PROPERTIES="JMSDeliveryMode=persistent&JMSTimeToLive=3000"
83+
PROPERTIES="${PROPERTIES}&SimKey=12446271133&JobIndex=0&TaskID=0&UserName=schaff"
84+
PROPERTIES="${PROPERTIES}&MessageType=WorkerEvent&WorkerEvent_Status=1001&WorkerEvent_StatusMsg=Running"
85+
PROPERTIES="${PROPERTIES}&WorkerEvent_TimePoint=2.0&WorkerEvent_Progress=0.4&HostName=localhost"
86+
curl -XPOST "http://admin:admin@`hostname`:8165/api/message/workerEvent?type=queue&${PROPERTIES}"
87+
*/
88+
#endif //USE_MESSAGING
89+
90+
#endif //VCELL_ODE_NUMERICS_CURLPROXY_H
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
//
2+
// Created by Logan Drescher on 11/25/25.
3+
//
4+
5+
#ifndef VCELL_ODE_NUMERICS_JOBEVENTSTATUS_H
6+
#define VCELL_ODE_NUMERICS_JOBEVENTSTATUS_H
7+
#include <string>
8+
// enum JobEventStatus {
9+
// JOB_STARTING = 999,
10+
// JOB_DATA = 1000,
11+
// JOB_PROGRESS = 1001,
12+
// JOB_COMPLETED = 1003,
13+
// JOB_FAILURE = 1002,
14+
// JOB_ALIVE = 1004,
15+
// };
16+
17+
namespace JobEvent {
18+
enum Status {
19+
JOB_STARTING = 999,
20+
JOB_DATA = 1000,
21+
JOB_PROGRESS = 1001,
22+
JOB_COMPLETED = 1003,
23+
JOB_FAILURE = 1002,
24+
JOB_ALIVE = 1004,
25+
};
26+
std::string toString(Status status);
27+
}
28+
29+
#endif //VCELL_ODE_NUMERICS_JOBEVENTSTATUS_H
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
//
2+
// Created by Logan Drescher on 11/24/25.
3+
//
4+
#ifndef VCELL_ODE_NUMERICS_MESSAGEEVENTQUEUE_H
5+
#define VCELL_ODE_NUMERICS_MESSAGEEVENTQUEUE_H
6+
7+
#include <queue>
8+
#include <thread>
9+
#include <mutex>
10+
#include <functional>
11+
#include <condition_variable>
12+
#include "WorkerEvent.h"
13+
/*
14+
* We want to avoid threads being idle, while processing updates
15+
* Components:
16+
* 1) An "active" boolean (locked by a mutex) that indicates whether the queue is being processed.
17+
* 1a) We must ensure that all relevant information a thread would use to ensure it's done, is always locked by the mutex.
18+
* 2) A jthread dedicated to processing the queue when it has items, and sleeping when it does not
19+
* 3) A condition variable used to ensure the jthread only runs when it needs to.
20+
*
21+
* LOCK ORDERING
22+
* We are using two mutexes that could conflict with each other
23+
* 1) Mutex for whether the worker is active
24+
* 2) Mutex for access to the event queue
25+
*
26+
* There is a potential deadlock if the worker gets ownership of his isActive mutex,
27+
* while another thread owns the event queue mutex. This is because before the worker decides to "clock out",
28+
* they would check if they have work in the queue. Since the queue is owned, deadlock.
29+
* *****ALWAYS LOCK THE IS_WORKER_ACTIVE MUTEX BEFORE THE QUEUE MUTEX*******
30+
*
31+
* The inverse is possible if the code is changed so that: the worker owns the queue the entire time its doing work,
32+
* until it's empty. At the time of this warning, this is not the case.
33+
*/
34+
class MessageEventManager {
35+
public:
36+
explicit MessageEventManager(std::function<void(WorkerEvent*)> sendUpdateFunction);
37+
virtual ~MessageEventManager();
38+
void enqueue(JobEvent::Status status, double progress, double timepoint, const char *eventMessage);
39+
void enqueue(JobEvent::Status status, double progress, double timepoint);
40+
void enqueue(JobEvent::Status status, const char *eventMessage);
41+
void requestStopAndWaitForIt();
42+
bool stopWasCalled();
43+
44+
private:
45+
void processQueue();
46+
void processEvent(WorkerEvent* event);
47+
void enqueue(WorkerEvent*);
48+
49+
std::mutex timeClockMutex;
50+
51+
bool stopRequested;
52+
std::condition_variable requestedStopForeman;
53+
std::mutex stopRequestedMutex;
54+
std::queue<WorkerEvent*> eventQueue;
55+
std::mutex queuetex;
56+
57+
std::jthread eventQueueProcessingWorkerThread;
58+
std::condition_variable needMessageProcessingForeman;
59+
std::function<void(WorkerEvent*)> sendUpdateFunction;
60+
};
61+
62+
#endif //VCELL_ODE_NUMERICS_MESSAGEEVENTQUEUE_H
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
//
2+
// Created by Logan Drescher on 11/25/25.
3+
//
4+
5+
#ifndef VCELL_ODE_NUMERICS_WORKEREVENT_H
6+
#define VCELL_ODE_NUMERICS_WORKEREVENT_H
7+
#include <string>
8+
9+
#include "JobEventStatus.h"
10+
11+
struct WorkerEvent {
12+
JobEvent::Status status;
13+
double progress;
14+
double timepoint;
15+
std::string eventMessage;
16+
17+
WorkerEvent(const WorkerEvent* aWorkerEvent) {
18+
status = aWorkerEvent->status;
19+
progress = aWorkerEvent->progress;
20+
timepoint = aWorkerEvent->timepoint;
21+
eventMessage = aWorkerEvent->eventMessage;
22+
}
23+
24+
WorkerEvent(JobEvent::Status status, double progress, double timepoint, const char *eventMessage)
25+
:status(status),
26+
progress(progress),
27+
timepoint(timepoint),
28+
eventMessage(eventMessage) {}
29+
30+
WorkerEvent(JobEvent::Status status, double progress, double timepoint)
31+
:status(status),
32+
progress(progress),
33+
timepoint(timepoint){}
34+
35+
36+
WorkerEvent(JobEvent::Status arg_status, const char* eventMessage)
37+
:status(arg_status),
38+
progress(0),
39+
timepoint(0),
40+
eventMessage(eventMessage) {}
41+
42+
bool equals(const WorkerEvent* aWorkerEvent) const {
43+
return nullptr != aWorkerEvent
44+
&& this->status == aWorkerEvent->status
45+
&& this->progress == aWorkerEvent->progress
46+
&& this->timepoint == aWorkerEvent->timepoint
47+
&& this->eventMessage == aWorkerEvent->eventMessage;
48+
}
49+
};
50+
51+
#endif //VCELL_ODE_NUMERICS_WORKEREVENT_H

0 commit comments

Comments
 (0)