Skip to content

Commit 7b81935

Browse files
authored
Merge pull request #195 from sy-c/master
v2.4.2
2 parents b9fd8f1 + 3324b82 commit 7b81935

File tree

10 files changed

+259
-86
lines changed

10 files changed

+259
-86
lines changed

doc/releaseNotes.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,3 +359,9 @@ This file describes the main feature changes for each readout.exe released versi
359359
- Updated configuration parameters:
360360
- added readout.disableTimeframes: when set, all timeframe-related features are disabled (STF slicing, TF rate limits, etc). All data are tagged with TF id = 0. To be used for some calibration runs not using a central trigger clock.
361361
- added consumer-FMQchannel.checkResources: controls which resources are checked for fitting unmanaged region. This is a comma-separated list of items to be checked. By default, no checks are done. Recommended value: /dev/shm, MemAvailable.
362+
363+
## v2.4.2 - 28/06/2021
364+
- verbose logs auto-mute (aggregator, consumer-FMQchannel).
365+
- consumer-FMQchannel: drop TF on error (to avoid unhappy STFB when sending incomplete data, e.g. on "data page too small" or "no page left" conditions).
366+
- added memory pool usage statistics (to help tuning buffer pages count and size).
367+
- added some ZeroMQ options for consumerZMQ and equipmentZMQ.

src/ConsumerFMQchannel.cxx

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
#include "MemoryPagesPool.h"
1515
#include "ReadoutStats.h"
1616
#include "ReadoutUtils.h"
17+
#include "CounterStats.h"
1718
#include <atomic>
1819
#include <chrono>
20+
#include <inttypes.h>
1921

2022
#ifdef WITH_FAIRMQ
2123

@@ -105,6 +107,8 @@ class ConsumerFMQchannel : public Consumer
105107
int memoryPoolPageSize;
106108
int memoryPoolNumberOfPages;
107109

110+
CounterStats repackSizeStats; // keep track of page size used when repacking
111+
108112
public:
109113
std::vector<FairMQMessagePtr> messagesToSend; // collect HBF messages of each update
110114
uint64_t messagesToSendSize; // size (bytes) of messagesToSend payload
@@ -277,6 +281,12 @@ class ConsumerFMQchannel : public Consumer
277281

278282
~ConsumerFMQchannel()
279283
{
284+
// log memory pool statistics
285+
if (mp!=nullptr) {
286+
theLog.log(LogInfoDevel_(3003), "Consumer %s - memory pool statistics ... %s", name.c_str(), mp->getStats().c_str());
287+
theLog.log(LogInfoDevel_(3003), "Consumer %s - STFB repacking statistics ... number: %" PRIu64 " average page size: %" PRIu64 " max page size: %" PRIu64, name.c_str(), repackSizeStats.getCount(), (uint64_t)repackSizeStats.getAverage(), repackSizeStats.getMaximum());
288+
}
289+
280290
// release in reverse order
281291
mp = nullptr;
282292
memoryBuffer = nullptr; // warning: data range may still be referenced in memory bank manager
@@ -596,11 +606,16 @@ class ConsumerFMQchannel : public Consumer
596606
for (auto& f : pendingFrames) {
597607
totalSize += f.HBlength;
598608
}
609+
610+
// keep stats on repack page size
611+
repackSizeStats.set(totalSize);
612+
599613
// allocate
600614
// todo: same code as for header -> create func/lambda
601615
// todo: send empty message if no page left in buffer
602616
if (memoryPoolPageSize < totalSize) {
603-
theLog.log(LogWarningSupport_(3230), "page size too small %d < %d", memoryPoolPageSize, totalSize);
617+
static InfoLogger::AutoMuteToken token(LogWarningSupport_(3230));
618+
theLog.log(token, "page size too small %d < %d", memoryPoolPageSize, totalSize);
604619
throw __LINE__;
605620
}
606621
DataBlockContainerReference copyBlock = nullptr;
@@ -609,7 +624,8 @@ class ConsumerFMQchannel : public Consumer
609624
} catch (...) {
610625
}
611626
if (copyBlock == nullptr) {
612-
theLog.log(LogWarningSupport_(3230), "no page left");
627+
static InfoLogger::AutoMuteToken token(LogWarningSupport_(3230));
628+
theLog.log(token, "no page left");
613629
throw __LINE__;
614630
}
615631
auto blockRef = new DataBlockContainerReference(copyBlock);
@@ -705,8 +721,10 @@ class ConsumerFMQchannel : public Consumer
705721
}
706722
}
707723
pendingFrames.clear();
708-
709-
theLog.log(LogErrorSupport_(3233), "ConsumerFMQ : error %d", err);
724+
static InfoLogger::AutoMuteToken token(LogErrorSupport_(3233));
725+
theLog.log(token, "ConsumerFMQ : error %d", err);
726+
// cleanup buffer, and start fresh (in particular: avoid sending empty message parts)
727+
messagesToSend.clear();
710728
totalPushError++;
711729
return -1;
712730
}

src/ConsumerZmq.cxx

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,16 @@ class ConsumerZMQ : public Consumer
2626
uint64_t nBytesSent = 0;
2727
uint64_t nBlocksSent = 0;
2828
std::string cfgAddress = "tcp://127.0.0.1:50001";
29-
int cfgZmqMaxQueue = 10;
30-
int cfgZmqConflate = 1;
31-
int cfgZmqSendTimeout = 1000;
32-
int cfgZmqLinger = 1000;
29+
30+
// default ZMQ settings for data monitoring
31+
// settings for CTP readout: ZMQ_CONFLATE=0,ZMQ_IO_THREADS=4,ZMQ_SNDHWM=1000
32+
int cfg_ZMQ_CONFLATE = 1; // buffer last message only
33+
int cfg_ZMQ_IO_THREADS = 1; // number of IO threads
34+
int cfg_ZMQ_LINGER = 1000; // close timeout
35+
int cfg_ZMQ_SNDBUF = 16*1024*1024; // kernel transmit buffer size
36+
int cfg_ZMQ_SNDHWM = 10; // max send queue size
37+
int cfg_ZMQ_SNDTIMEO = 1000; // send timeout
38+
3339
double cfgMaxRate = 0; // max number of pages per second (average)
3440
int cfgPagesPerBurst = 1; // number of pages per burst (peak successive pages accepted without avg rate check)
3541
int pagesInBurst = 0; // current number of pages in burst
@@ -50,8 +56,37 @@ class ConsumerZMQ : public Consumer
5056
if (cfgPagesPerBurst < 1) {
5157
cfgPagesPerBurst = 1;
5258
}
53-
theLog.log(LogInfoDevel_(3002), "ZeroMQ server @ %s, rate limit = %.4f pages/s, in burst of %d pages", cfgAddress.c_str(), cfgMaxRate, cfgPagesPerBurst);
5459

60+
// configuration parameter: | consumer-zmq-* | zmqOptions | string | | Additional ZMQ options, as a comma-separated list of key=value pairs. Possible keys: ZMQ_CONFLATE, ZMQ_IO_THREADS, ZMQ_LINGER, ZMQ_SNDBUF, ZMQ_SNDHWM, ZMQ_SNDTIMEO. |
61+
std::string cfg_ZMQOptions = "";
62+
cfg.getOptionalValue<std::string>(cfgEntryPoint + ".zmqOptions", cfg_ZMQOptions);
63+
std::map<std::string, std::string> mapOptions;
64+
if (getKeyValuePairsFromString(cfg_ZMQOptions, mapOptions)) {
65+
throw("Can not parse configuration item zmqOptions");
66+
}
67+
bool isOk=1;
68+
for (auto& it : mapOptions) {
69+
int value = atoi(it.second.c_str());
70+
if (it.first=="ZMQ_CONFLATE") { cfg_ZMQ_CONFLATE = value; }
71+
else if (it.first=="ZMQ_IO_THREADS") { cfg_ZMQ_IO_THREADS = value; }
72+
else if (it.first=="ZMQ_LINGER") { cfg_ZMQ_LINGER = value; }
73+
else if (it.first=="ZMQ_SNDBUF") { cfg_ZMQ_SNDBUF = value; }
74+
else if (it.first=="ZMQ_SNDHWM") { cfg_ZMQ_SNDHWM = value; }
75+
else if (it.first=="ZMQ_SNDTIMEO") { cfg_ZMQ_SNDTIMEO = value; }
76+
else {
77+
theLog.log(LogErrorSupport_(3102), "Wrong ZMQ option %s", it.first.c_str());
78+
isOk = 0;
79+
continue;
80+
}
81+
}
82+
if (!isOk) {
83+
throw __LINE__;
84+
}
85+
86+
// log config summary
87+
theLog.log(LogInfoDevel_(3002), "ZeroMQ PUB server @ %s, rate limit = %.4f pages/s, in burst of %d pages", cfgAddress.c_str(), cfgMaxRate, cfgPagesPerBurst);
88+
theLog.log(LogInfoDevel_(3002), "ZMQ options: ZMQ_SNDHWM=%d ZMQ_CONFLATE=%d ZMQ_SNDTIMEO=%d ZMQ_LINGER=%d ZMQ_SNDBUF=%d ZMQ_IO_THREADS=%d", cfg_ZMQ_SNDHWM, cfg_ZMQ_CONFLATE, cfg_ZMQ_SNDTIMEO, cfg_ZMQ_LINGER, cfg_ZMQ_SNDBUF, cfg_ZMQ_IO_THREADS);
89+
5590
int linerr = 0;
5691
int zmqerr = 0;
5792
for (;;) {
@@ -61,25 +96,26 @@ class ConsumerZMQ : public Consumer
6196
zmqerr = zmq_errno();
6297
break;
6398
}
99+
100+
zmq_ctx_set(context, ZMQ_IO_THREADS, cfg_ZMQ_IO_THREADS);
101+
if (zmq_ctx_get(context, ZMQ_IO_THREADS) != cfg_ZMQ_IO_THREADS) {
102+
linerr = __LINE__;
103+
break;
104+
}
64105
zh = zmq_socket(context, ZMQ_PUB);
65-
/*
66106
if (zh==nullptr) { linerr=__LINE__; zmqerr=zmq_errno(); break; }
67-
int timeout = 1000;
68-
zmqerr=zmq_setsockopt(zh, ZMQ_RCVTIMEO, (void*) &timeout, sizeof(int));
107+
zmqerr = zmq_bind(zh, cfgAddress.c_str());
69108
if (zmqerr) { linerr=__LINE__; break; }
70-
int linger = 0;
71-
zmqerr=zmq_setsockopt(zh, ZMQ_LINGER, (void*) &linger, sizeof(int));
109+
zmqerr = zmq_setsockopt(zh, ZMQ_CONFLATE, &cfg_ZMQ_CONFLATE, sizeof(cfg_ZMQ_CONFLATE));
110+
if (zmqerr) { linerr=__LINE__; break; }
111+
zmqerr = zmq_setsockopt(zh, ZMQ_LINGER, (void*)&cfg_ZMQ_LINGER, sizeof(cfg_ZMQ_LINGER));
112+
if (zmqerr) { linerr=__LINE__; break; }
113+
zmqerr = zmq_setsockopt(zh, ZMQ_SNDBUF, (void*)&cfg_ZMQ_SNDBUF, sizeof(cfg_ZMQ_SNDBUF));
114+
if (zmqerr) { linerr=__LINE__; break; }
115+
zmqerr=zmq_setsockopt(zh, ZMQ_SNDHWM, &cfg_ZMQ_SNDHWM, sizeof(cfg_ZMQ_SNDHWM));
116+
if (zmqerr) { linerr=__LINE__; break; }
117+
zmqerr = zmq_setsockopt(zh, ZMQ_SNDTIMEO, (void*)&cfg_ZMQ_SNDTIMEO, sizeof(cfg_ZMQ_SNDTIMEO));
72118
if (zmqerr) { linerr=__LINE__; break; }
73-
*/
74-
zmqerr = zmq_bind(zh, cfgAddress.c_str());
75-
if (zmqerr) {
76-
linerr = __LINE__;
77-
break;
78-
}
79-
zmq_setsockopt(zh, ZMQ_SNDHWM, &cfgZmqMaxQueue, sizeof(cfgZmqMaxQueue)); // max queue size
80-
zmq_setsockopt(zh, ZMQ_CONFLATE, &cfgZmqConflate, sizeof(cfgZmqConflate)); // buffer last message only
81-
zmq_setsockopt(zh, ZMQ_SNDTIMEO, (void*)&cfgZmqSendTimeout, sizeof(cfgZmqSendTimeout)); // send timeout
82-
zmq_setsockopt(zh, ZMQ_LINGER, (void*)&cfgZmqLinger, sizeof(cfgZmqLinger)); // close timeout
83119
break;
84120
}
85121

@@ -103,6 +139,7 @@ class ConsumerZMQ : public Consumer
103139
}
104140
// the stats are not meaningful for a ZMQ PUB: send always works...
105141
// theLog.log(LogInfoDevel_(3003), "ZeroMQ stats: %" PRIu64 "/%" PRIu64 " blocks sent/discarded", nBlocksSent, nBlocksDropped);
142+
theLog.log(LogInfoDevel_(3003), "ZeroMQ publish stats: %" PRIu64 " blocks %" PRIu64 " bytes", nBlocksSent, nBytesSent);
106143
}
107144

108145
int pushData(DataBlockContainerReference& b)

src/DataBlockAggregator.cxx

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ Thread::CallbackResult DataBlockAggregator::executeCallback()
170170
uint64_t tfId = db->header.timeframeId;
171171
uint64_t sourceId = (((uint64_t)db->header.equipmentId) << 32) | ((uint64_t)db->header.linkId);
172172
if (tfId <= lastTimeframeId) {
173-
theLog.log(LogWarningSupport_(3235), "Discarding late data for TF %" PRIu64 " (source = 0x%" PRIx64 ")", tfId, sourceId);
173+
static InfoLogger::AutoMuteToken token(LogWarningSupport_(3004));
174+
theLog.log(token, "Discarding late data for TF %" PRIu64 " (source = 0x%" PRIx64 ")", tfId, sourceId);
174175
} else {
175176
tStf& stf = stfBuffer[tfId];
176177
stf.tfId = tfId;
@@ -221,7 +222,12 @@ Thread::CallbackResult DataBlockAggregator::executeCallback()
221222
nSources = it->second.sstf.size(); // keep track of number of sources in first TF
222223
}
223224
nStfPushed++;
224-
lastTimeframeId = it->second.tfId;
225+
uint64_t newTimeframeId = it->second.tfId;
226+
if (newTimeframeId > lastTimeframeId + 1) {
227+
static InfoLogger::AutoMuteToken token(LogWarningSupport_(3004));
228+
theLog.log(token, "Gap in timeframe ids detected: previous = %" PRIu64 " new = %" PRIu64, lastTimeframeId, newTimeframeId);
229+
}
230+
lastTimeframeId = newTimeframeId;
225231
/*
226232
if (lastTimeframeId % 10 == 1) {
227233
theLog.log(LogDebugTrace, "LastTimeframeId=%lu deltaT=%f",lastTimeframeId,tmax-tmin);
@@ -259,7 +265,8 @@ int DataBlockSlicer::appendBlock(DataBlockContainerReference const& block, doubl
259265

260266
if (sourceId.linkId != undefinedLinkId) {
261267
if (sourceId.linkId >= maxLinks) {
262-
theLog.log(LogWarningSupport_(3004), "wrong link id %d > %d", sourceId.linkId, maxLinks - 1);
268+
static InfoLogger::AutoMuteToken token(LogWarningSupport_(3004));
269+
theLog.log(token, "wrong link id %d > %d", sourceId.linkId, maxLinks - 1);
263270
return -1;
264271
}
265272
}

src/MemoryPagesPool.cxx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ MemoryPagesPool::~MemoryPagesPool()
140140

141141
void* MemoryPagesPool::getPage()
142142
{
143+
// update statistics
144+
poolStats.set((CounterValue)getNumberOfPagesAvailable());
145+
143146
// get a page from fifo, if available
144147
void* ptr = nullptr;
145148
pagesAvailable->pop(ptr);
@@ -260,3 +263,7 @@ bool MemoryPagesPool::isPageValid(void* pagePtr)
260263
}
261264

262265
size_t MemoryPagesPool::getDataBlockMaxSize() { return pageSize - headerReservedSpace; }
266+
267+
std::string MemoryPagesPool::getStats() {
268+
return "number of pages used: " + std::to_string(poolStats.getTotal()) + " average free pages: " + std::to_string((uint64_t)poolStats.getAverage()) + " minimum free pages: " + std::to_string(poolStats.getMinimum());
269+
}

src/MemoryPagesPool.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <functional>
1717
#include <map>
1818
#include <memory>
19+
#include <string>
1920

2021
#include "CounterStats.h"
2122
#include "DataBlockContainer.h"
@@ -69,6 +70,8 @@ class MemoryPagesPool
6970

7071
bool isPageValid(void* page); // check to see if a page address is valid
7172

73+
std::string getStats(); // return a string summarizing memory pool usage statistics
74+
7275
private:
7376
std::unique_ptr<AliceO2::Common::Fifo<void*>> pagesAvailable; // a buffer to keep track of individual pages
7477

@@ -104,6 +107,8 @@ class MemoryPagesPool
104107
// t3: releasepage->getpage
105108
// t4: getpage->releasepage
106109
CounterStats t1, t2, t3, t4;
110+
111+
CounterStats poolStats; // keep track of number of free pages in the pool
107112
};
108113

109114
#endif // #ifndef _MEMORYPAGESPOOL_H

src/ReadoutEquipment.cxx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,10 @@ void ReadoutEquipment::stop()
253253

254254
ReadoutEquipment::~ReadoutEquipment()
255255
{
256+
if (mp != nullptr) {
257+
theLog.log(LogInfoDevel_(3003), "Equipment %s - memory pool statistics ... %s", name.c_str(), mp->getStats().c_str());
258+
}
259+
256260
// check if mempool still referenced
257261
if (!mp.unique()) {
258262
theLog.log(LogInfoDevel_(3008), "Equipment %s : mempool still has %d references\n", name.c_str(), (int)mp.use_count());
@@ -339,7 +343,7 @@ Thread::CallbackResult ReadoutEquipment::threadCallback(void* arg)
339343
}
340344

341345
// handle RDH-formatted data
342-
if (ptr->isRdhEquipment) {
346+
if (ptr->cfgRdhUseFirstInPageEnabled) {
343347
ptr->processRdh(nextBlock);
344348
}
345349

0 commit comments

Comments
 (0)