Skip to content

Commit 1796476

Browse files
committed
default set to use multithread mode (1) + FMQ send timeout to avoid locks on EOR
1 parent 8dde7e7 commit 1796476

File tree

1 file changed

+10
-3
lines changed

1 file changed

+10
-3
lines changed

src/ConsumerFMQchannel.cxx

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ class ConsumerFMQchannel : public Consumer
178178
}
179179

180180
// pool of threads for the processing
181-
int nwThreads = 0;
181+
int nwThreads = 1;
182182
int wThreadFifoSize = 0;
183183

184184
struct DDMessage {
@@ -412,6 +412,9 @@ class ConsumerFMQchannel : public Consumer
412412

413413
// configuration parameter: | consumer-FairMQChannel-* | threads | int | 0 | If set, a pool of thread is created for the data processing. |
414414
cfg.getOptionalValue<int>(cfgEntryPoint + ".threads", nwThreads);
415+
if (nwThreads) {
416+
theLog.log(LogInfoDevel_(3008), "Using %d threads for DD formatting", nwThreads);
417+
}
415418
if (nwThreads) {
416419
wThreadFifoSize = 88 / nwThreads; // 1s of buffer
417420
wThreads.resize(nwThreads);
@@ -1194,8 +1197,12 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
11941197

11951198
int ConsumerFMQchannel::DDsendMessage(DDMessage &ddm) {
11961199
// send the messages
1197-
// TODO: use a timeout to avoid non-blocking ???
1198-
if (sendingChannel->Send(ddm.messagesToSend) >= 0) {
1200+
int err;
1201+
while (!wThreadShutdown) {
1202+
err = sendingChannel->Send(ddm.messagesToSend, 500);
1203+
if (err>=0) break;
1204+
}
1205+
if ( err >= 0) {
11991206
gReadoutStats.counters.bytesFairMQ += ddm.subTimeframeTotalSize;
12001207
gReadoutStats.counters.timeframeIdFairMQ = ddm.stfHeader->timeframeId;
12011208
gReadoutStats.counters.notify++;

0 commit comments

Comments
 (0)