Skip to content

Commit 1203542

Browse files
authored
Merge pull request #190 from sy-c/master
v2.4.0
2 parents c0f84b6 + ef3b836 commit 1203542

15 files changed

+327
-76
lines changed

CMakeLists.txt

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,7 @@ find_package(FairMQ)
6767
find_package(ZLIB)
6868
find_package(RDMA)
6969
find_package(Occ)
70-
#find_package(BookkeepingApiCpp)
71-
#find_package(cpprestsdk)
70+
find_package(BookkeepingApiCpp)
7271
find_package(ZMQ)
7372

7473
# extract include directories from targets
@@ -339,6 +338,12 @@ add_executable(
339338
$<TARGET_OBJECTS:objReadoutUtils>
340339
)
341340

341+
# a test for FMQ memory
342+
add_executable(
343+
o2-readout-test-fmq-memory
344+
${SOURCE_DIR}/testFmqMemory.cxx
345+
)
346+
342347

343348
# a test to check memory banks
344349
add_executable(
@@ -401,7 +406,7 @@ add_executable(
401406

402407
# disable some executables when corresponding dependencies not found
403408
if (NOT FairMQ_FOUND)
404-
set_target_properties(o2-readout-test-fmq-tx o2-readout-test-fmq-rx o2-readout-test-fmq-perf-tx o2-readout-test-fmq-perf-rx PROPERTIES EXCLUDE_FROM_ALL 1)
409+
set_target_properties(o2-readout-test-fmq-tx o2-readout-test-fmq-rx o2-readout-test-fmq-perf-tx o2-readout-test-fmq-perf-rx o2-readout-test-fmq-memory PROPERTIES EXCLUDE_FROM_ALL 1)
405410
endif ()
406411

407412
# set include and libraries for all
@@ -420,6 +425,12 @@ if (${ZMQ_FOUND})
420425
list (APPEND executables o2-readout-monitor)
421426
endif()
422427

428+
# special minimal build for test-fmq-memory (to avoid grpc garbage from occ when using valgrind)
429+
foreach (exe o2-readout-test-fmq-memory)
430+
list (APPEND executables ${exe})
431+
target_include_directories(${exe} PRIVATE ${READOUT_INCLUDE_DIRS})
432+
target_link_libraries(${exe} PRIVATE $<$<BOOL:${FairMQ_FOUND}>:FairMQ::FairMQ>)
433+
endforeach()
423434

424435
###################################################
425436
# files installation

doc/releaseNotes.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,3 +345,12 @@ This file describes the main feature changes for each readout.exe released versi
345345
- consumer-stats: publish/print the current timeframe Id sent to STFB.
346346
- consumer-stats: ZMQ stats client cleanup timeout, to avoid blocking on exit.
347347
- auto-mute RDH warnings: verbosity reduced if many successive logs done in a short time.
348+
349+
## v2.4.0 - 03/06/2021
350+
- equipment-player: fixed bug in "autochunk" replay mode. There was a "last packet invalid" error wrongly reported (and aborting replay) in the rare case when a link change would occur exactly at the beginning of a page.
351+
- Default value of equipment-rorc.rdhUseFirstInPageEnabled is now 1 for all RDH equipments (RORC, emu, player).
352+
- FMQ stats are not printed when consoleUpdate=1 unless there is a running consumerFMQchannel with disableSending=0.
353+
- tfRateLimit is now handled directly in the equipment, which avoids potential issues with timeframe slicing at very slow rates.
354+
- equipment-cruemulator: TF id extracted from trigger counters (single timer source for improved coherency).
355+
- Memory allocation policy updated: all readout memory is locked (RAM only, can not be swapped). A warning is reported if not.
356+
- consumer-FMQchannel: checks are done before the FMQ shared memory region is created, to avoid getting into a state with over-committed memory (the FMQ library does not check the validity of the created region, which can cause a severe crash when trying to access it). Both /proc/meminfo (MemFree) and /dev/shm (if using the shmem transport type) should report enough available memory before proceeding. Memory is also immediately locked and zeroed to avoid later crashes.

src/ConsumerFMQchannel.cxx

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ class ConsumerFMQchannel : public Consumer
118118
if (cfgDisableSending) {
119119
theLog.log(LogInfoDevel_(3002), "FMQ message sending disabled");
120120
disableSending = true;
121+
} else {
122+
gReadoutStats.isFairMQ = 1; // enable FMQ stats
121123
}
122124

123125
// configuration parameter: | consumer-FairMQChannel-* | enableRawFormat | int | 0 | If 0, data is pushed 1 STF header + 1 part per HBF. If 1, data is pushed in raw format without STF headers, 1 FMQ message per data page. If 2, format is 1 STF header + 1 part per data page.|
@@ -183,14 +185,47 @@ class ConsumerFMQchannel : public Consumer
183185
cfg.getOptionalValue<std::string>(cfgEntryPoint + ".unmanagedMemorySize", cfgUnmanagedMemorySize);
184186
long long mMemorySize = ReadoutUtils::getNumberOfBytesFromString(cfgUnmanagedMemorySize.c_str());
185187
if (mMemorySize > 0) {
188+
189+
// check system resources first: region creation does not check available memory, so a bad crash could occur later
190+
theLog.log(LogInfoDevel_(3002), "Configuring memory buffer %lld MB", (long long)(mMemorySize/1048576LL));
191+
192+
// free system memory
193+
unsigned long long freeBytes;
194+
if (getStatsFreeMemory(freeBytes)) {
195+
theLog.log(LogWarningSupport_(3230), "Can not get stats about system free memory available");
196+
} else {
197+
theLog.log(LogInfoSupport_(3230), "Stats free memory available: %lld MB", (long long)(freeBytes/1048576LL));
198+
if ((long long)freeBytes < mMemorySize) {
199+
theLog.log(LogErrorSupport_(3230), "Not enough system memory available - check /proc/meminfo");
200+
throw "ConsumerFMQ: can not allocate shared memory region";
201+
} else {
202+
203+
}
204+
}
205+
206+
// free SHM memory
207+
// check only if transport is of type shmem
208+
if (cfgTransportType == "shmem") {
209+
if (getStatsFreeSHM(freeBytes)) {
210+
theLog.log(LogWarningSupport_(3230), "Can not get stats about shared memory available");
211+
} else {
212+
theLog.log(LogInfoSupport_(3230), "Stats shared memory available: %lld MB", (long long)(freeBytes/1048576LL));
213+
if ((long long)freeBytes < mMemorySize) {
214+
theLog.log(LogErrorSupport_(3230), "Not enough shared memory available - check /dev/shm");
215+
throw "ConsumerFMQ: can not allocate shared memory region";
216+
}
217+
}
218+
}
219+
220+
theLog.log(LogInfoDevel_(3008), "Creating FMQ unmanaged memory region");
186221
memoryBuffer = sendingChannel->Transport()->CreateUnmanagedRegion(mMemorySize, [](void* /*data*/, size_t /*size*/, void* hint) { // cleanup callback
187222
if (hint != nullptr) {
188223
DataBlockContainerReference* blockRef = (DataBlockContainerReference*)hint;
189224
//printf("ack hint=%p page %p\n",hint,(*blockRef)->getData());
190225
decDataBlockStats((*blockRef)->getData());
191226
delete blockRef;
192227
}
193-
});
228+
},"",0,fair::mq::RegionConfig{true, true}); // lock / zero
194229

195230
theLog.log(LogInfoDevel_(3008), "Got FMQ unmanaged memory buffer size %lu @ %p", memoryBuffer->GetSize(), memoryBuffer->GetData());
196231
}

src/ConsumerStats.cxx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ class ConsumerStats : public Consumer
171171
if (consoleUpdate) {
172172
if (deltaT > 0) {
173173
theLog.log(LogInfoOps_(3003), "Last interval (%.2fs): blocksRx=%llu, block rate=%.2lf, bytesRx=%llu, rate=%s", deltaT, (unsigned long long)counterBlocksDiff, counterBlocksDiff / deltaT, (unsigned long long)counterBytesDiff, NumberOfBytesToString(counterBytesDiff * 8 / deltaT, "b/s", 1000).c_str());
174-
theLog.log(LogInfoOps_(3003), "STFB locked pages: current=%llu, release rate=%.2lf Hz, latency=%.3lf s, current TF = %d", nRfmq, rRfmq, avgTfmq, tfidfmq );
174+
if (gReadoutStats.isFairMQ) {
175+
theLog.log(LogInfoOps_(3003), "STFB locked pages: current=%llu, release rate=%.2lf Hz, latency=%.3lf s, current TF = %d", nRfmq, rRfmq, avgTfmq, tfidfmq );
176+
}
175177
}
176178
}
177179

src/ReadoutEquipment.cxx

Lines changed: 79 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
extern tRunNumber occRunNumber;
1717

18-
ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint)
18+
ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint, bool setRdhEquipment)
1919
{
2020

2121
// example: browse config keys
@@ -28,6 +28,13 @@ ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint)
2828
// configuration parameter: | equipment-* | name | string| | Name used to identify this equipment (in logs). By default, it takes the name of the configuration section, equipment-xxx |
2929
cfg.getOptionalValue<std::string>(cfgEntryPoint + ".name", name, cfgEntryPoint);
3030

31+
// change defaults for equipments generating data with RDH
32+
if (setRdhEquipment) {
33+
theLog.log(LogInfoDevel_(3002), "Equipment %s: generates data with RDH, using specific defaults", name.c_str());
34+
isRdhEquipment = true;
35+
cfgRdhUseFirstInPageEnabled = 1; // by default, use first RDH in page
36+
}
37+
3138
// configuration parameter: | equipment-* | id | int| | Optional. Number used to identify equipment (used e.g. in file recording). Range 1-65535.|
3239
int cfgEquipmentId = undefinedEquipmentId;
3340
cfg.getOptionalValue<int>(cfgEntryPoint + ".id", cfgEquipmentId);
@@ -89,6 +96,10 @@ ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint)
8996
this->debugFirstPages = cfgDebugFirstPages;
9097
}
9198

99+
// get TF rate from toplevel config
100+
cfgTfRateLimit = 0;
101+
cfg.getOptionalValue<double>("readout.tfRateLimit", cfgTfRateLimit);
102+
92103
// log config summary
93104
theLog.log(LogInfoDevel_(3002), "Equipment %s: from config [%s], max rate=%lf Hz, idleSleepTime=%d us, outputFifoSize=%d", name.c_str(), cfgEntryPoint.c_str(), readoutRate, cfgIdleSleepTime, cfgOutputFifoSize);
94105
theLog.log(LogInfoDevel_(3008), "Equipment %s: requesting memory pool %d pages x %d bytes from bank '%s', block aligned @ 0x%X, 1st page offset @ 0x%X", name.c_str(), (int)memoryPoolNumberOfPages, (int)memoryPoolPageSize, memoryBankName.c_str(), (int)cfgBlockAlign, (int)cfgFirstPageOffset);
@@ -105,7 +116,7 @@ ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint)
105116
cfg.getOptionalValue<int>(cfgEntryPoint + ".rdhDumpErrorEnabled", cfgRdhDumpErrorEnabled);
106117
// configuration parameter: | equipment-* | rdhDumpWarningEnabled | int | 0 | If set, a log message is printed for each RDH header warning found.|
107118
cfg.getOptionalValue<int>(cfgEntryPoint + ".rdhDumpWarningEnabled", cfgRdhDumpWarningEnabled);
108-
// configuration parameter: | equipment-* | rdhUseFirstInPageEnabled | int | 0 | If set, the first RDH in each data page is used to populate readout headers (e.g. linkId).|
119+
// configuration parameter: | equipment-* | rdhUseFirstInPageEnabled | int | 0 or 1 | If set, the first RDH in each data page is used to populate readout headers (e.g. linkId). Default is 1 for equipments generating data with RDH, 0 otherwise. |
109120
cfg.getOptionalValue<int>(cfgEntryPoint + ".rdhUseFirstInPageEnabled", cfgRdhUseFirstInPageEnabled);
110121
theLog.log(LogInfoDevel_(3002), "RDH settings: rdhCheckEnabled=%d rdhDumpEnabled=%d rdhDumpErrorEnabled=%d rdhDumpWarningEnabled=%d rdhUseFirstInPageEnabled=%d", cfgRdhCheckEnabled, cfgRdhDumpEnabled, cfgRdhDumpErrorEnabled, cfgRdhDumpWarningEnabled, cfgRdhUseFirstInPageEnabled);
111122

@@ -194,6 +205,10 @@ void ReadoutEquipment::start()
194205
}
195206
clk0.reset();
196207

208+
// reset TF rate clock
209+
TFregulator.init(cfgTfRateLimit);
210+
throttlePendingBlock = nullptr;
211+
197212
// reset stats timer
198213
consoleStatsTimer.reset(cfgConsoleStatsUpdateTime * 1000000);
199214

@@ -214,6 +229,9 @@ void ReadoutEquipment::stop()
214229
this->finalCounters();
215230
ReadoutEquipment::finalCounters();
216231

232+
// cleanup
233+
throttlePendingBlock = nullptr;
234+
217235
for (int i = 0; i < (int)EquipmentStatsIndexes::maxIndex; i++) {
218236
if (equipmentStats[i].getCount()) {
219237
theLog.log(LogInfoDevel_(3003), "%s.%s = %llu (avg=%.2lf min=%llu max=%llu count=%llu)", name.c_str(), EquipmentStatsNames[i], (unsigned long long)equipmentStats[i].get(), equipmentStats[i].getAverage(), (unsigned long long)equipmentStats[i].getMinimum(), (unsigned long long)equipmentStats[i].getMaximum(), (unsigned long long)equipmentStats[i].getCount());
@@ -253,7 +271,6 @@ Thread::CallbackResult ReadoutEquipment::threadCallback(void* arg)
253271
if (ptr->usingSoftwareClock) {
254272
if (ptr->timeframeClock.isTimeout()) {
255273
ptr->currentTimeframe++;
256-
ptr->statsNumberOfTimeframes++;
257274
ptr->timeframeClock.increment();
258275
}
259276
}
@@ -301,38 +318,63 @@ Thread::CallbackResult ReadoutEquipment::threadCallback(void* arg)
301318

302319
// get next block
303320
DataBlockContainerReference nextBlock = nullptr;
304-
try {
305-
nextBlock = ptr->getNextBlock();
306-
} catch (...) {
307-
theLog.log(LogWarningDevel_(3230), "getNextBlock() exception");
308-
break;
309-
}
310-
// printf("getNextBlock=%p\n",nextBlock);
311-
if (nextBlock == nullptr) {
312-
break;
313-
}
314-
315-
// handle RDH-formatted data
316-
if (ptr->isRdhEquipment) {
317-
ptr->processRdh(nextBlock);
318-
}
321+
if (ptr->throttlePendingBlock != nullptr) {
322+
nextBlock = std::move(ptr->throttlePendingBlock);
323+
} else {
324+
try {
325+
nextBlock = ptr->getNextBlock();
326+
} catch (...) {
327+
theLog.log(LogWarningDevel_(3230), "getNextBlock() exception");
328+
break;
329+
}
319330

320-
// tag data with equipment Id, if set (will overwrite field if was already set by equipment)
321-
if (ptr->id != undefinedEquipmentId) {
322-
nextBlock->getData()->header.equipmentId = ptr->id;
331+
if (nextBlock == nullptr) {
332+
break;
333+
}
334+
335+
// handle RDH-formatted data
336+
if (ptr->isRdhEquipment) {
337+
ptr->processRdh(nextBlock);
338+
}
339+
340+
// tag data with equipment Id, if set (will overwrite field if was already set by equipment)
341+
if (ptr->id != undefinedEquipmentId) {
342+
nextBlock->getData()->header.equipmentId = ptr->id;
343+
}
344+
345+
// tag data with block id
346+
ptr->currentBlockId++; // don't start from 0
347+
nextBlock->getData()->header.blockId = ptr->currentBlockId;
348+
349+
// tag data with (dummy) timeframeid, if none set
350+
if (nextBlock->getData()->header.timeframeId == undefinedTimeframeId) {
351+
nextBlock->getData()->header.timeframeId = ptr->getCurrentTimeframe();
352+
}
353+
354+
// tag data with run number
355+
nextBlock->getData()->header.runNumber = occRunNumber;
323356
}
324-
325-
// tag data with block id
326-
ptr->currentBlockId++; // don't start from 0
327-
nextBlock->getData()->header.blockId = ptr->currentBlockId;
328-
329-
// tag data with (dummy) timeframeid, if none set
330-
if (nextBlock->getData()->header.timeframeId == undefinedTimeframeId) {
331-
nextBlock->getData()->header.timeframeId = ptr->getCurrentTimeframe();
357+
358+
// check TF id of new block
359+
uint64_t tfId = nextBlock->getData()->header.timeframeId;
360+
if (tfId != ptr->lastTimeframe) {
361+
// regulate TF rate if needed
362+
if (!ptr->TFregulator.next()) {
363+
ptr->throttlePendingBlock = std::move(nextBlock); // keep block with new TF for later
364+
isActive = false; // ask for delay before retry
365+
break;
366+
}
367+
368+
ptr->statsNumberOfTimeframes++;
369+
370+
// detect gaps in TF id continuity
371+
if (tfId != ptr->lastTimeframe + 1) {
372+
if (ptr->cfgRdhDumpWarningEnabled) {
373+
theLog.log(LogWarningSupport_(3004), "Non-contiguous timeframe IDs %llu ... %llu", (unsigned long long)ptr->lastTimeframe, (unsigned long long)tfId);
374+
}
375+
}
332376
}
333-
334-
// tag data with run number
335-
nextBlock->getData()->header.runNumber = occRunNumber;
377+
ptr->lastTimeframe = tfId;
336378

337379
// update rate-limit clock
338380
if (ptr->readoutRate > 0) {
@@ -433,13 +475,15 @@ void ReadoutEquipment::initCounters()
433475
statsNumberOfTimeframes = 0;
434476

435477
// reset timeframe clock
478+
currentTimeframe = undefinedTimeframeId;
479+
lastTimeframe = undefinedTimeframeId;
480+
firstTimeframeHbOrbitBegin = undefinedOrbit;
481+
isDefinedFirstTimeframeHbOrbitBegin = 0;
436482
if (usingSoftwareClock) {
437483
timeframeClock.reset(1000000 / timeframeRate);
484+
currentTimeframe = 1;
438485
}
439486

440-
currentTimeframe = undefinedTimeframeId;
441-
firstTimeframeHbOrbitBegin = undefinedOrbit;
442-
isDefinedFirstTimeframeHbOrbitBegin = 0;
443487
};
444488

445489
void ReadoutEquipment::finalCounters()
@@ -449,27 +493,13 @@ void ReadoutEquipment::finalCounters()
449493
}
450494
};
451495

452-
void ReadoutEquipment::initRdhEquipment() { isRdhEquipment = true; }
453-
454496
uint64_t ReadoutEquipment::getTimeframeFromOrbit(uint32_t hbOrbit)
455497
{
456498
if (!isDefinedFirstTimeframeHbOrbitBegin) {
457499
firstTimeframeHbOrbitBegin = hbOrbit;
458500
isDefinedFirstTimeframeHbOrbitBegin = 1;
459501
}
460502
uint64_t tfId = 1 + (hbOrbit - firstTimeframeHbOrbitBegin) / getTimeframePeriodOrbits();
461-
if (tfId != currentTimeframe) {
462-
// theLog.log(LogDebugTrace, "TF %lu", tfId);
463-
statsNumberOfTimeframes++;
464-
465-
// detect gaps in TF id continuity
466-
if (tfId != currentTimeframe + 1) {
467-
if (cfgRdhDumpWarningEnabled) {
468-
theLog.log(LogWarningSupport_(3004), "Non-contiguous timeframe IDs %llu ... %llu", (unsigned long long)currentTimeframe, (unsigned long long)tfId);
469-
}
470-
}
471-
}
472-
currentTimeframe = tfId;
473503
return tfId;
474504
}
475505

src/ReadoutEquipment.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121
#include "MemoryBankManager.h"
2222
#include "MemoryHandler.h"
2323
#include "RdhUtils.h"
24+
#include "RateRegulator.h"
2425

2526
using namespace AliceO2::Common;
2627

2728
class ReadoutEquipment
2829
{
2930
public:
30-
ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint);
31+
// constructor parameters:
32+
// - isRdhEquipment: to be set by equipments producing RDH-formatted data. Done here so that appropriate defaults can be used in constructor.
33+
ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint, bool isRdhEquipment = 0);
3134
virtual ~ReadoutEquipment();
3235

3336
DataBlockContainerReference getBlock();
@@ -136,7 +139,8 @@ class ReadoutEquipment
136139
AliceO2::Common::Timer timeframeClock; // timeframe id should be increased at each clock cycle
137140
uint64_t currentTimeframe = 0; // id of current timeframe
138141
bool usingSoftwareClock = false; // if set, using internal software clock to generate timeframe id
139-
142+
uint64_t lastTimeframe = 0; // id of last timeframe (in data)
143+
140144
//const unsigned int LHCBunches = 3564; // number of bunches in LHC
141145
const unsigned int LHCOrbitRate = 11246; // LHC orbit rate, in Hz. 299792458 / 26659
142146
uint32_t timeframePeriodOrbits = 256; // timeframe interval duration in number of LHC orbits
@@ -150,6 +154,9 @@ class ReadoutEquipment
150154
int cfgRdhDumpWarningEnabled = 0; // flag to enable RDH warning log at runtime
151155
int cfgRdhUseFirstInPageEnabled = 0; // flag to enable reading of first RDH in page to populate readout headers
152156
//int cfgRdhCheckPacketCounterContiguous = 1; // flag to enable checking if RDH packetCounter value contiguous (done link-by-link)
157+
double cfgTfRateLimit = 0; // TF rate limit, to throttle data readout
158+
RateRegulator TFregulator; // clock counter for TF rate checks
159+
DataBlockContainerReference throttlePendingBlock; // in case TF rate limit was reached, a block may be set aside for later (when it belongs to next TF)
153160

154161
bool isRdhEquipment = false; // to be set true for RDH equipments
155162

@@ -166,8 +173,6 @@ class ReadoutEquipment
166173
// compute range of orbits for given timeframe
167174
void getTimeframeOrbitRange(uint64_t tfId, uint32_t& hbOrbitMin, uint32_t& hbOrbitMax);
168175

169-
void initRdhEquipment(); // to be called by equipments producing RDH-formatted data
170-
171176
unsigned long long statsRdhCheckOk = 0; // number of RDH structs which have passed check ok
172177
unsigned long long statsRdhCheckErr = 0; // number of RDH structs which have not passed check
173178
unsigned long long statsRdhCheckStreamErr = 0; // number of inconsistencies in RDH stream (e.g. ids/timing compared to previous RDH)

0 commit comments

Comments
 (0)