Skip to content

Commit 1492ba6

Browse files
committed
added sync server
1 parent f30450b commit 1492ba6

File tree

8 files changed

+264
-0
lines changed

8 files changed

+264
-0
lines changed

CMakeLists.txt

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,23 @@ add_executable(
461461
target_include_directories(o2-readout-monitor PRIVATE ${READOUT_INCLUDE_DIRS})
462462
target_link_libraries(o2-readout-monitor PRIVATE AliceO2::InfoLogger AliceO2::Common ${ZMQ_LIBRARIES})
463463

464+
# a process to send synchronization signals for replay mode
465+
add_executable(
466+
o2-readout-sync-server
467+
${SOURCE_DIR}/syncServer.cxx
468+
)
469+
target_include_directories(o2-readout-sync-server PRIVATE ${READOUT_INCLUDE_DIRS})
470+
target_link_libraries(o2-readout-sync-server PRIVATE AliceO2::InfoLogger AliceO2::Common ${ZMQ_LIBRARIES})
471+
472+
add_executable(
473+
o2-readout-test-sync-client
474+
${SOURCE_DIR}/testSyncClient.cxx
475+
)
476+
target_include_directories(o2-readout-test-sync-client PRIVATE ${READOUT_INCLUDE_DIRS})
477+
target_link_libraries(o2-readout-test-sync-client PRIVATE AliceO2::InfoLogger AliceO2::Common ${ZMQ_LIBRARIES})
478+
479+
480+
464481
if(SDL_FOUND AND ZMQ_FOUND)
465482
# a process to monitor memory of remote readout processes
466483
add_executable(

doc/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ The following executables, presented with the _nicknames_ used below, are part o
1616
- [_o2-readout-receiver_](#receiver) or _Receiver_ : a process to receive data from _Readout_ by FMQ, e.g. for local communication tests when STFB is not available.
1717
- [_o2-readout-status_] or _ReadoutStatus_: an interactive interface to displaying runtime statistics connected by _Monitor_.
1818
- [_o2-readout-rawmerger_](#rawmerger) or _RawMerger_: a tool to concatenate multiple raw data files in a single file, e.g. for replay by _Readout_.
19+
- [_o2-readout-sync-server_](#syncserver) or _SyncServer_: a tool to generate synchronization signals, e.g. for replay by multiple _Readout_ instances.
1920

2021
There are also some readout internal test components, not used in normal runtime conditions, for development and debugging purpose (_o2-readout-test-*_)
2122
The source code repository is [https://github.com/AliceO2Group/Readout].
@@ -271,6 +272,11 @@ o2-readout-rawmerger outputFile=/local/replay/2024-02-07-LHC23zzk_544490_50khz_T
271272
It assumes that all input files have the same trigger orbit sequence in order to create an output file with data grouped by timeframes.
272273

273274

275+
## SyncServer
276+
277+
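This is a console utility to generate synchronization signals for multiple running readout instances at start of run. It listens on ZeroMQ port 50003. At START, each _Readout_ instance configured with `readout.externalSyncServer` sends its run number to the server and waits for the reply. The server replies to all the instances which sent the same number once no new request with this number has been received for 5 seconds. The parameter `readout.externalSyncTimeout` must therefore be set to more than 5000 ms (plus the expected spread between the START of the instances): with the default value of 3000 ms, _Readout_ stops waiting before the signal is sent.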
278+
279+
274280
## EventDump
275281

276282
This is an interactive program to check/display content of online data taken with Readout. It needs a special consumer defined in Readout configuration, to publish data pages over ZeroMQ:

doc/configurationParameters.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ The parameters related to 3rd-party libraries are described here for convenience
206206
| readout | disableAggregatorSlicing | int | 0 | When set, the aggregator slicing is disabled, data pages are passed through without grouping/slicing. |
207207
| readout | disableTimeframes | int | 0 | When set, all timeframe related features are disabled (this may supersede other config parameters). |
208208
| readout | exitTimeout | double | -1 | Time in seconds after which the program exits automatically. -1 for unlimited. |
209+
| readout | externalSyncServer | string | | If set, ZMQ address to request SYNC signal at SOR. |
210+
| readout | externalSyncTimeout | int | 3000 | Timeout (in milliseconds) to wait for the SYNC signal at SOR (when externalSyncServer is defined). |
209211
| readout | fairmqConsoleSeverity | int | -1 | Select amount of FMQ messages with fair::Logger::SetConsoleSeverity(). Value as defined in Severity enum defined from FairLogger/Logger.h. Use -1 to leave current setting. |
210212
| readout | flushConsumerTimeout | double | 1 | Time in seconds to wait before stopping the consumers (ie wait allocated pages released). 0 means stop immediately. |
211213
| readout | flushEquipmentTimeout | double | 1 | Time in seconds to wait for data once the equipments are stopped. 0 means stop immediately. |

doc/releaseNotes.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -661,3 +661,8 @@ This file describes the main feature changes for each readout.exe released versi
661661
- equipment.TFperiod is now set to 32 by default, instead of 128 previously. This is the duration of a timeframe, in number of LHC orbits. The new value corresponds to what is used in production.
662662
- equipment-rorc-*: added parameter monitorFirstOrbitEnabled, to enable reporting to monitoring system the metric "readout.RORCfirstOrbit" on startup, as retrieved from CRU firmware. This is to be used on CTP FLP to detect possibly wrong orbit. A return value of 0xFFFFFFFF indicates the value could not be retrieved.
663663
- ConsumerStats: added an internal queue to allow pushing spontaneous monitoring measurements (compared to periodic ones) from any readout module.
664+
665+
## next
666+
- Added configuration parameters:
667+
- readout.externalSyncServer and readout.externalSyncTimeout, to connect and wait at START for a sync signal sent by o2-readout-sync-server.
668+
- Added o2-readout-sync-server utility, to generate a sync signal in order to start multiple readout replay instances synchronously (implemented with ZMQ REQ/REP).

src/mainReadout.cxx

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,8 @@ class Readout
398398
int cfgLogbookUpdateInterval;
399399
std::string cfgDatabaseCxParams;
400400
std::string cfgTimeframeServerUrl;
401+
std::string cfgExternalSyncServer;
402+
int cfgExternalSyncTimeout;
401403
int cfgVerbose = 0;
402404
int cfgMaxMsgError; // maximum number of error messages before stopping run
403405
int cfgMaxMsgWarning; // maximum number of warning messages before stopping run
@@ -1209,6 +1211,18 @@ int Readout::_configure(const boost::property_tree::ptree& properties)
12091211
#endif
12101212
}
12111213

1214+
// configuration parameter: | readout | externalSyncServer | string | | If set, ZMQ address to request SYNC signal at SOR. |
1215+
cfg.getOptionalValue<std::string>("readout.externalSyncServer", cfgExternalSyncServer);
1216+
// configuration parameter: | readout | externalSyncTimeout | int | 3000 | Timeout (in milliseconds) to wait for the SYNC signal at SOR (when externalSyncServer is defined). |
1217+
cfg.getOptionalValue<int>("readout.externalSyncTimeout", cfgExternalSyncTimeout, 3000);
1218+
if (cfgExternalSyncServer != "") {
1219+
#ifdef WITH_ZMQ
1220+
theLog.log(LogInfoDevel_(3002), "External SOR synchronization enabled: server = %s timeout = %dms", cfgExternalSyncServer.c_str(), cfgExternalSyncTimeout);
1221+
#else
1222+
theLog.log(LogWarningSupport_(3101), "Skipping SOR synchronization - not supported by this build");
1223+
#endif
1224+
}
1225+
12121226
#ifdef WITH_FAIRMQ
12131227
// configuration parameter: | readout | fairmqConsoleSeverity | int | -1 | Select amount of FMQ messages with fair::Logger::SetConsoleSeverity(). Value as defined in Severity enum defined from FairLogger/Logger.h. Use -1 to leave current setting. |
12141228
int cfgFairmqConsoleSeverity = -1;
@@ -1593,6 +1607,32 @@ int Readout::_start()
15931607
theLog.log(LogInfoDevel, "Run number not defined");
15941608
}
15951609

1610+
if (cfgExternalSyncServer != "") {
1611+
#ifdef WITH_ZMQ
1612+
// sync barrier here
1613+
// send a request to remote server and wait for SYNC reply (or timeout) before proceeding
1614+
theLog.log(LogInfoDevel, "Readout sync starting");
1615+
std::string msg = std::to_string((int)occRunNumber); // send run number to sync server
1616+
void* context = zmq_ctx_new();
1617+
void* socket = zmq_socket(context, ZMQ_REQ);
1618+
zmq_connect(socket, cfgExternalSyncServer.c_str());
1619+
int timeout = cfgExternalSyncTimeout; // ms
1620+
zmq_setsockopt(socket, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)); // for zmq_send
1621+
zmq_setsockopt(socket, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)); // for zmq_recv
1622+
zmq_send(socket, msg.c_str(), msg.size(), 0);
1623+
char buffer[256];
1624+
int size = zmq_recv(socket, buffer, sizeof(buffer) - 1, 0);
1625+
if (size == -1) {
1626+
theLog.log(LogInfoDevel, "Readout sync timeout");
1627+
} else {
1628+
buffer[size] = '\0'; // for later use. no use of the reply so far.
1629+
theLog.log(LogInfoDevel, "Readout sync done");
1630+
}
1631+
zmq_close(socket);
1632+
zmq_ctx_term(context);
1633+
#endif
1634+
}
1635+
15961636
theLog.resetMessageCount();
15971637
theLog.log(LogInfoSupport_(3005), "Readout executing START");
15981638
gReadoutStats.reset(1);

src/readoutConfigEditor.tcl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ set configurationParametersDescriptor {
152152
| readout | disableAggregatorSlicing | int | 0 | When set, the aggregator slicing is disabled, data pages are passed through without grouping/slicing. |
153153
| readout | disableTimeframes | int | 0 | When set, all timeframe related features are disabled (this may supersede other config parameters). |
154154
| readout | exitTimeout | double | -1 | Time in seconds after which the program exits automatically. -1 for unlimited. |
155+
| readout | externalSyncServer | string | | If set, ZMQ address to request SYNC signal at SOR. |
156+
| readout | externalSyncTimeout | int | 3000 | Timeout (in milliseconds) to wait for the SYNC signal at SOR (when externalSyncServer is defined). |
155157
| readout | fairmqConsoleSeverity | int | -1 | Select amount of FMQ messages with fair::Logger::SetConsoleSeverity(). Value as defined in Severity enum defined from FairLogger/Logger.h. Use -1 to leave current setting. |
156158
| readout | flushConsumerTimeout | double | 1 | Time in seconds to wait before stopping the consumers (ie wait allocated pages released). 0 means stop immediately. |
157159
| readout | flushEquipmentTimeout | double | 1 | Time in seconds to wait for data once the equipments are stopped. 0 means stop immediately. |

src/syncServer.cxx

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
// server to implement simple ZMQ REQ/REPLY synchronization barrier
2+
// all clients with the same syncId get an (almost) synchronous reply after a timeout, which triggers once no new client with this id has connected for some time
3+
4+
#include <zmq.h>
5+
#include <iostream>
6+
#include <string>
7+
#include <unordered_map>
8+
#include <vector>
9+
#include <thread>
10+
#include <mutex>
11+
#include <chrono>
12+
#include <cstring>
13+
#include <algorithm>
14+
15+
#include <InfoLogger/InfoLogger.hxx>
16+
#include <InfoLogger/InfoLoggerMacros.hxx>
17+
18+
// definition of a global for logging
19+
using namespace AliceO2::InfoLogger;
20+
InfoLogger theLog;
21+
22+
23+
using namespace std;
24+
using namespace std::chrono;
25+
26+
struct Client {
27+
zmq_msg_t identity;
28+
};
29+
30+
struct Group {
31+
vector<Client> clients;
32+
time_point<steady_clock> lastActivity;
33+
bool waiting = false;
34+
};
35+
36+
unordered_map<int, Group> groups;
37+
mutex groups_mutex;
38+
void* router_socket = nullptr;
39+
40+
void send_reply_to_group(int syncId) {
41+
vector<Client> clientsToReply;
42+
43+
{
44+
lock_guard lock(groups_mutex);
45+
auto& group = groups[syncId];
46+
clientsToReply = move(group.clients);
47+
group.clients.clear();
48+
group.waiting = false;
49+
}
50+
51+
for (auto& client : clientsToReply) {
52+
zmq_msg_send(&client.identity, router_socket, ZMQ_SNDMORE);
53+
zmq_msg_close(&client.identity);
54+
55+
zmq_msg_t empty;
56+
zmq_msg_init_size(&empty, 0);
57+
zmq_msg_send(&empty, router_socket, ZMQ_SNDMORE);
58+
zmq_msg_close(&empty);
59+
60+
std::string reply = "SYNC for id " + std::to_string(syncId);
61+
zmq_msg_t msg;
62+
zmq_msg_init_size(&msg, reply.length());
63+
memcpy(zmq_msg_data(&msg), reply.c_str(), reply.length());
64+
zmq_msg_send(&msg, router_socket, 0);
65+
zmq_msg_close(&msg);
66+
}
67+
68+
theLog.log(LogInfoDevel, "SYNC for id %d sent to %d clients", syncId, (int) clientsToReply.size());
69+
}
70+
71+
72+
void start_group_timer(int syncId) {
73+
std::thread([syncId]() {
74+
using namespace std::chrono;
75+
constexpr auto timeout = seconds(5);
76+
77+
while (true) {
78+
time_point<steady_clock> last;
79+
80+
{
81+
std::lock_guard lock(groups_mutex);
82+
last = groups[syncId].lastActivity;
83+
}
84+
85+
auto now = steady_clock::now();
86+
auto elapsed = now - last;
87+
88+
if (elapsed >= timeout) {
89+
send_reply_to_group(syncId);
90+
break;
91+
}
92+
93+
auto remaining = timeout - elapsed;
94+
auto sleep_duration = std::min(duration_cast<milliseconds>(remaining), milliseconds(500));
95+
std::this_thread::sleep_for(sleep_duration);
96+
}
97+
}).detach();
98+
}
99+
100+
101+
102+
int main() {
103+
void* context = zmq_ctx_new();
104+
router_socket = zmq_socket(context, ZMQ_ROUTER);
105+
const char *address = "tcp://*:50003";
106+
zmq_bind(router_socket, address);
107+
108+
theLog.setContext(InfoLoggerContext({ { InfoLoggerContext::FieldName::Facility, (std::string) "readout/sync" } }));
109+
110+
theLog.log(LogInfoDevel, "readout SYNC server started on %s", address);
111+
112+
while (true) {
113+
zmq_msg_t identity;
114+
zmq_msg_init(&identity);
115+
zmq_msg_recv(&identity, router_socket, 0);
116+
117+
zmq_msg_t empty;
118+
zmq_msg_init(&empty);
119+
zmq_msg_recv(&empty, router_socket, 0);
120+
zmq_msg_close(&empty);
121+
122+
zmq_msg_t message;
123+
zmq_msg_init(&message);
124+
zmq_msg_recv(&message, router_socket, 0);
125+
126+
string msg_str((char*)zmq_msg_data(&message), zmq_msg_size(&message));
127+
int syncId = stoi(msg_str);
128+
zmq_msg_close(&message);
129+
130+
lock_guard lock(groups_mutex);
131+
auto& group = groups[syncId];
132+
133+
Client c;
134+
zmq_msg_init(&c.identity);
135+
zmq_msg_copy(&c.identity, &identity);
136+
group.clients.push_back(move(c));
137+
group.lastActivity = steady_clock::now();
138+
139+
theLog.log(LogInfoDevel, "New client waiting for sync id %d", syncId);
140+
141+
if (!group.waiting) {
142+
group.waiting = true;
143+
start_group_timer(syncId);
144+
}
145+
146+
zmq_msg_close(&identity);
147+
}
148+
149+
zmq_close(router_socket);
150+
zmq_ctx_term(context);
151+
return 0;
152+
}

src/testSyncClient.cxx

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// client to test a simple ZMQ REQ/REPLY synchronization barrier
2+
3+
#include <zmq.h>
4+
#include <iostream>
5+
#include <string>
6+
#include <cstring>
7+
8+
int main(int argc, char* argv[]) {
9+
if (argc != 2) {
10+
std::cerr << "Usage: o2-readout-test-sync-client <syncId>\n";
11+
return 1;
12+
}
13+
14+
int syncId = std::stoi(argv[1]);
15+
std::string msg = std::to_string(syncId);
16+
17+
void* context = zmq_ctx_new();
18+
void* socket = zmq_socket(context, ZMQ_REQ);
19+
zmq_connect(socket, "tcp://localhost:50003");
20+
21+
int timeout = 7000; // ms
22+
zmq_setsockopt(socket, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)); // for zmq_send
23+
zmq_setsockopt(socket, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)); // for zmq_recv
24+
25+
zmq_send(socket, msg.c_str(), msg.size(), 0);
26+
27+
char buffer[256];
28+
int size = zmq_recv(socket, buffer, sizeof(buffer) - 1, 0);
29+
30+
if (size == -1) {
31+
std::cout << "Timeout: No reply within " << timeout << " ms\n";
32+
} else {
33+
buffer[size] = '\0';
34+
std::cout << "Reply: " << buffer << "\n";
35+
}
36+
37+
zmq_close(socket);
38+
zmq_ctx_term(context);
39+
return 0;
40+
}

0 commit comments

Comments
 (0)