Skip to content

Commit 07546aa

Browse files
committed
RDH equipments defaults, FMQ stats hidden when not used, tfRateLimit handling, cruemu tf id clock
1 parent 09c5b08 commit 07546aa

10 files changed

+105
-68
lines changed

doc/releaseNotes.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,3 +348,7 @@ This file describes the main feature changes for each readout.exe released versi
348348

349349
## next version
350350
- equipment-player: fixed bug in "autochunk" replay mode. There was a "last packet invalid" error wrongly reported (and aborting replay) in the rare case when a link change would occur exactly at the beginning of a page.
351+
- Default value of equipment-rorc.rdhUseFirstInPageEnabled is now 1 for all RDH equipments (RORC, emu, player).
352+
- FMQ stats not printed when consoleUpdate=1 unless there is a running consumerFMQchannel with disableSending=0.
353+
- tfRateLimit is handled in the equipment directly and avoid potential issues with timeframe slicing at very slow rates.
354+
- equipment-cruemulator: TF id extracted from trigger counters (single timer source for improved coherency).

src/ConsumerFMQchannel.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ class ConsumerFMQchannel : public Consumer
118118
if (cfgDisableSending) {
119119
theLog.log(LogInfoDevel_(3002), "FMQ message sending disabled");
120120
disableSending = true;
121+
} else {
122+
gReadoutStats.isFairMQ = 1; // enable FMQ stats
121123
}
122124

123125
// configuration parameter: | consumer-FairMQChannel-* | enableRawFormat | int | 0 | If 0, data is pushed 1 STF header + 1 part per HBF. If 1, data is pushed in raw format without STF headers, 1 FMQ message per data page. If 2, format is 1 STF header + 1 part per data page.|

src/ConsumerStats.cxx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ class ConsumerStats : public Consumer
171171
if (consoleUpdate) {
172172
if (deltaT > 0) {
173173
theLog.log(LogInfoOps_(3003), "Last interval (%.2fs): blocksRx=%llu, block rate=%.2lf, bytesRx=%llu, rate=%s", deltaT, (unsigned long long)counterBlocksDiff, counterBlocksDiff / deltaT, (unsigned long long)counterBytesDiff, NumberOfBytesToString(counterBytesDiff * 8 / deltaT, "b/s", 1000).c_str());
174-
theLog.log(LogInfoOps_(3003), "STFB locked pages: current=%llu, release rate=%.2lf Hz, latency=%.3lf s, current TF = %d", nRfmq, rRfmq, avgTfmq, tfidfmq );
174+
if (gReadoutStats.isFairMQ) {
175+
theLog.log(LogInfoOps_(3003), "STFB locked pages: current=%llu, release rate=%.2lf Hz, latency=%.3lf s, current TF = %d", nRfmq, rRfmq, avgTfmq, tfidfmq );
176+
}
175177
}
176178
}
177179

src/ReadoutEquipment.cxx

Lines changed: 79 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
extern tRunNumber occRunNumber;
1717

18-
ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint)
18+
ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint, bool setRdhEquipment)
1919
{
2020

2121
// example: browse config keys
@@ -28,6 +28,13 @@ ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint)
2828
// configuration parameter: | equipment-* | name | string| | Name used to identify this equipment (in logs). By default, it takes the name of the configuration section, equipment-xxx |
2929
cfg.getOptionalValue<std::string>(cfgEntryPoint + ".name", name, cfgEntryPoint);
3030

31+
// change defaults for equipments generating data with RDH
32+
if (setRdhEquipment) {
33+
theLog.log(LogInfoDevel_(3002), "Equipment %s: generates data with RDH, using specific defaults", name.c_str());
34+
isRdhEquipment = true;
35+
cfgRdhUseFirstInPageEnabled = 1; // by default, use first RDH in page
36+
}
37+
3138
// configuration parameter: | equipment-* | id | int| | Optional. Number used to identify equipment (used e.g. in file recording). Range 1-65535.|
3239
int cfgEquipmentId = undefinedEquipmentId;
3340
cfg.getOptionalValue<int>(cfgEntryPoint + ".id", cfgEquipmentId);
@@ -89,6 +96,10 @@ ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint)
8996
this->debugFirstPages = cfgDebugFirstPages;
9097
}
9198

99+
// get TF rate from toplevel config
100+
cfgTfRateLimit = 0;
101+
cfg.getOptionalValue<double>("readout.tfRateLimit", cfgTfRateLimit);
102+
92103
// log config summary
93104
theLog.log(LogInfoDevel_(3002), "Equipment %s: from config [%s], max rate=%lf Hz, idleSleepTime=%d us, outputFifoSize=%d", name.c_str(), cfgEntryPoint.c_str(), readoutRate, cfgIdleSleepTime, cfgOutputFifoSize);
94105
theLog.log(LogInfoDevel_(3008), "Equipment %s: requesting memory pool %d pages x %d bytes from bank '%s', block aligned @ 0x%X, 1st page offset @ 0x%X", name.c_str(), (int)memoryPoolNumberOfPages, (int)memoryPoolPageSize, memoryBankName.c_str(), (int)cfgBlockAlign, (int)cfgFirstPageOffset);
@@ -105,7 +116,7 @@ ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint)
105116
cfg.getOptionalValue<int>(cfgEntryPoint + ".rdhDumpErrorEnabled", cfgRdhDumpErrorEnabled);
106117
// configuration parameter: | equipment-* | rdhDumpWarningEnabled | int | 0 | If set, a log message is printed for each RDH header warning found.|
107118
cfg.getOptionalValue<int>(cfgEntryPoint + ".rdhDumpWarningEnabled", cfgRdhDumpWarningEnabled);
108-
// configuration parameter: | equipment-* | rdhUseFirstInPageEnabled | int | 0 | If set, the first RDH in each data page is used to populate readout headers (e.g. linkId).|
119+
// configuration parameter: | equipment-* | rdhUseFirstInPageEnabled | int | 0 or 1 | If set, the first RDH in each data page is used to populate readout headers (e.g. linkId). Default is 1 for equipments generating data with RDH, 0 otherwsise. |
109120
cfg.getOptionalValue<int>(cfgEntryPoint + ".rdhUseFirstInPageEnabled", cfgRdhUseFirstInPageEnabled);
110121
theLog.log(LogInfoDevel_(3002), "RDH settings: rdhCheckEnabled=%d rdhDumpEnabled=%d rdhDumpErrorEnabled=%d rdhDumpWarningEnabled=%d rdhUseFirstInPageEnabled=%d", cfgRdhCheckEnabled, cfgRdhDumpEnabled, cfgRdhDumpErrorEnabled, cfgRdhDumpWarningEnabled, cfgRdhUseFirstInPageEnabled);
111122

@@ -194,6 +205,10 @@ void ReadoutEquipment::start()
194205
}
195206
clk0.reset();
196207

208+
// reset TF rate clock
209+
TFregulator.init(cfgTfRateLimit);
210+
throttlePendingBlock = nullptr;
211+
197212
// reset stats timer
198213
consoleStatsTimer.reset(cfgConsoleStatsUpdateTime * 1000000);
199214

@@ -214,6 +229,9 @@ void ReadoutEquipment::stop()
214229
this->finalCounters();
215230
ReadoutEquipment::finalCounters();
216231

232+
// cleanup
233+
throttlePendingBlock = nullptr;
234+
217235
for (int i = 0; i < (int)EquipmentStatsIndexes::maxIndex; i++) {
218236
if (equipmentStats[i].getCount()) {
219237
theLog.log(LogInfoDevel_(3003), "%s.%s = %llu (avg=%.2lf min=%llu max=%llu count=%llu)", name.c_str(), EquipmentStatsNames[i], (unsigned long long)equipmentStats[i].get(), equipmentStats[i].getAverage(), (unsigned long long)equipmentStats[i].getMinimum(), (unsigned long long)equipmentStats[i].getMaximum(), (unsigned long long)equipmentStats[i].getCount());
@@ -253,7 +271,6 @@ Thread::CallbackResult ReadoutEquipment::threadCallback(void* arg)
253271
if (ptr->usingSoftwareClock) {
254272
if (ptr->timeframeClock.isTimeout()) {
255273
ptr->currentTimeframe++;
256-
ptr->statsNumberOfTimeframes++;
257274
ptr->timeframeClock.increment();
258275
}
259276
}
@@ -301,38 +318,63 @@ Thread::CallbackResult ReadoutEquipment::threadCallback(void* arg)
301318

302319
// get next block
303320
DataBlockContainerReference nextBlock = nullptr;
304-
try {
305-
nextBlock = ptr->getNextBlock();
306-
} catch (...) {
307-
theLog.log(LogWarningDevel_(3230), "getNextBlock() exception");
308-
break;
309-
}
310-
// printf("getNextBlock=%p\n",nextBlock);
311-
if (nextBlock == nullptr) {
312-
break;
313-
}
314-
315-
// handle RDH-formatted data
316-
if (ptr->isRdhEquipment) {
317-
ptr->processRdh(nextBlock);
318-
}
321+
if (ptr->throttlePendingBlock != nullptr) {
322+
nextBlock = std::move(ptr->throttlePendingBlock);
323+
} else {
324+
try {
325+
nextBlock = ptr->getNextBlock();
326+
} catch (...) {
327+
theLog.log(LogWarningDevel_(3230), "getNextBlock() exception");
328+
break;
329+
}
319330

320-
// tag data with equipment Id, if set (will overwrite field if was already set by equipment)
321-
if (ptr->id != undefinedEquipmentId) {
322-
nextBlock->getData()->header.equipmentId = ptr->id;
331+
if (nextBlock == nullptr) {
332+
break;
333+
}
334+
335+
// handle RDH-formatted data
336+
if (ptr->isRdhEquipment) {
337+
ptr->processRdh(nextBlock);
338+
}
339+
340+
// tag data with equipment Id, if set (will overwrite field if was already set by equipment)
341+
if (ptr->id != undefinedEquipmentId) {
342+
nextBlock->getData()->header.equipmentId = ptr->id;
343+
}
344+
345+
// tag data with block id
346+
ptr->currentBlockId++; // don't start from 0
347+
nextBlock->getData()->header.blockId = ptr->currentBlockId;
348+
349+
// tag data with (dummy) timeframeid, if none set
350+
if (nextBlock->getData()->header.timeframeId == undefinedTimeframeId) {
351+
nextBlock->getData()->header.timeframeId = ptr->getCurrentTimeframe();
352+
}
353+
354+
// tag data with run number
355+
nextBlock->getData()->header.runNumber = occRunNumber;
323356
}
324-
325-
// tag data with block id
326-
ptr->currentBlockId++; // don't start from 0
327-
nextBlock->getData()->header.blockId = ptr->currentBlockId;
328-
329-
// tag data with (dummy) timeframeid, if none set
330-
if (nextBlock->getData()->header.timeframeId == undefinedTimeframeId) {
331-
nextBlock->getData()->header.timeframeId = ptr->getCurrentTimeframe();
357+
358+
// check TF id of new block
359+
uint64_t tfId = nextBlock->getData()->header.timeframeId;
360+
if (tfId != ptr->lastTimeframe) {
361+
// regulate TF rate if needed
362+
if (!ptr->TFregulator.next()) {
363+
ptr->throttlePendingBlock = std::move(nextBlock); // keep block with new TF for later
364+
isActive = false; // ask for delay before retry
365+
break;
366+
}
367+
368+
ptr->statsNumberOfTimeframes++;
369+
370+
// detect gaps in TF id continuity
371+
if (tfId != ptr->lastTimeframe + 1) {
372+
if (ptr->cfgRdhDumpWarningEnabled) {
373+
theLog.log(LogWarningSupport_(3004), "Non-contiguous timeframe IDs %llu ... %llu", (unsigned long long)ptr->lastTimeframe, (unsigned long long)tfId);
374+
}
375+
}
332376
}
333-
334-
// tag data with run number
335-
nextBlock->getData()->header.runNumber = occRunNumber;
377+
ptr->lastTimeframe = tfId;
336378

337379
// update rate-limit clock
338380
if (ptr->readoutRate > 0) {
@@ -433,13 +475,15 @@ void ReadoutEquipment::initCounters()
433475
statsNumberOfTimeframes = 0;
434476

435477
// reset timeframe clock
478+
currentTimeframe = undefinedTimeframeId;
479+
lastTimeframe = undefinedTimeframeId;
480+
firstTimeframeHbOrbitBegin = undefinedOrbit;
481+
isDefinedFirstTimeframeHbOrbitBegin = 0;
436482
if (usingSoftwareClock) {
437483
timeframeClock.reset(1000000 / timeframeRate);
484+
currentTimeframe = 1;
438485
}
439486

440-
currentTimeframe = undefinedTimeframeId;
441-
firstTimeframeHbOrbitBegin = undefinedOrbit;
442-
isDefinedFirstTimeframeHbOrbitBegin = 0;
443487
};
444488

445489
void ReadoutEquipment::finalCounters()
@@ -449,27 +493,13 @@ void ReadoutEquipment::finalCounters()
449493
}
450494
};
451495

452-
void ReadoutEquipment::initRdhEquipment() { isRdhEquipment = true; }
453-
454496
uint64_t ReadoutEquipment::getTimeframeFromOrbit(uint32_t hbOrbit)
455497
{
456498
if (!isDefinedFirstTimeframeHbOrbitBegin) {
457499
firstTimeframeHbOrbitBegin = hbOrbit;
458500
isDefinedFirstTimeframeHbOrbitBegin = 1;
459501
}
460502
uint64_t tfId = 1 + (hbOrbit - firstTimeframeHbOrbitBegin) / getTimeframePeriodOrbits();
461-
if (tfId != currentTimeframe) {
462-
// theLog.log(LogDebugTrace, "TF %lu", tfId);
463-
statsNumberOfTimeframes++;
464-
465-
// detect gaps in TF id continuity
466-
if (tfId != currentTimeframe + 1) {
467-
if (cfgRdhDumpWarningEnabled) {
468-
theLog.log(LogWarningSupport_(3004), "Non-contiguous timeframe IDs %llu ... %llu", (unsigned long long)currentTimeframe, (unsigned long long)tfId);
469-
}
470-
}
471-
}
472-
currentTimeframe = tfId;
473503
return tfId;
474504
}
475505

src/ReadoutEquipment.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121
#include "MemoryBankManager.h"
2222
#include "MemoryHandler.h"
2323
#include "RdhUtils.h"
24+
#include "RateRegulator.h"
2425

2526
using namespace AliceO2::Common;
2627

2728
class ReadoutEquipment
2829
{
2930
public:
30-
ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint);
31+
// constructor parameters:
32+
// - isRdhEquipment: to be set by equipments producing RDH-formatted data. Done here so that appropriate defaults can be used in constructor.
33+
ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint, bool isRdhEquipment = 0);
3134
virtual ~ReadoutEquipment();
3235

3336
DataBlockContainerReference getBlock();
@@ -136,7 +139,8 @@ class ReadoutEquipment
136139
AliceO2::Common::Timer timeframeClock; // timeframe id should be increased at each clock cycle
137140
uint64_t currentTimeframe = 0; // id of current timeframe
138141
bool usingSoftwareClock = false; // if set, using internal software clock to generate timeframe id
139-
142+
uint64_t lastTimeframe = 0; // id of last timeframe (in data)
143+
140144
//const unsigned int LHCBunches = 3564; // number of bunches in LHC
141145
const unsigned int LHCOrbitRate = 11246; // LHC orbit rate, in Hz. 299792458 / 26659
142146
uint32_t timeframePeriodOrbits = 256; // timeframe interval duration in number of LHC orbits
@@ -150,6 +154,9 @@ class ReadoutEquipment
150154
int cfgRdhDumpWarningEnabled = 0; // flag to enable RDH warning log at runtime
151155
int cfgRdhUseFirstInPageEnabled = 0; // flag to enable reading of first RDH in page to populate readout headers
152156
//int cfgRdhCheckPacketCounterContiguous = 1; // flag to enable checking if RDH packetCounter value contiguous (done link-by-link)
157+
double cfgTfRateLimit = 0; // TF rate limit, to throttle data readout
158+
RateRegulator TFregulator; // clock counter for TF rate checks
159+
DataBlockContainerReference throttlePendingBlock; // in case TF rate limit was reached, a block may be set aside for later (when it belongs to next TF)
153160

154161
bool isRdhEquipment = false; // to be set true for RDH equipments
155162

@@ -166,8 +173,6 @@ class ReadoutEquipment
166173
// compute range of orbits for given timeframe
167174
void getTimeframeOrbitRange(uint64_t tfId, uint32_t& hbOrbitMin, uint32_t& hbOrbitMax);
168175

169-
void initRdhEquipment(); // to be called by equipments producing RDH-formatted data
170-
171176
unsigned long long statsRdhCheckOk = 0; // number of RDH structs which have passed check ok
172177
unsigned long long statsRdhCheckErr = 0; // number of RDH structs which have not passed check
173178
unsigned long long statsRdhCheckStreamErr = 0; // number of inconsistencies in RDH stream (e.g. ids/timing compared to previous RDH)

src/ReadoutEquipmentCruEmulator.cxx

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,9 @@ class ReadoutEquipmentCruEmulator : public ReadoutEquipment
7373
std::vector<DataBlockContainerReference> pendingBlocks; // pages being filled (1 per link)
7474
};
7575

76-
ReadoutEquipmentCruEmulator::ReadoutEquipmentCruEmulator(ConfigFile& cfg, std::string cfgEntryPoint) : ReadoutEquipment(cfg, cfgEntryPoint)
76+
ReadoutEquipmentCruEmulator::ReadoutEquipmentCruEmulator(ConfigFile& cfg, std::string cfgEntryPoint) : ReadoutEquipment(cfg, cfgEntryPoint, 1) // this is RDH-data equipment
7777
{
7878

79-
// declare RDH equipment
80-
initRdhEquipment();
81-
8279
// get configuration values
8380
// configuration parameter: | equipment-cruemulator-* | maxBlocksPerPage | int | 0 | [obsolete- not used]. Maximum number of blocks per page. |
8481
// configuration parameter: | equipment-cruemulator-* | cruBlockSize | int | 8192 | Size of a RDH block. |
@@ -182,7 +179,7 @@ Thread::CallbackResult ReadoutEquipmentCruEmulator::prepareBlocks()
182179

183180
nowOrbit = LHCorbit;
184181
nowBc = LHCbc;
185-
uint64_t nowId = getCurrentTimeframe();
182+
uint64_t nowId = getTimeframeFromOrbit(nowOrbit);
186183

187184
int linkId = cfgLinkId + currentLink;
188185
int bytesAvailableInPage = b->header.dataSize; // a bit less than memoryPoolPageSize;
@@ -287,7 +284,6 @@ Thread::CallbackResult ReadoutEquipmentCruEmulator::prepareBlocks()
287284
// no need to fill header defaults, this is done by getNewDataBlockContainer()
288285
// only adjust payload size
289286
b->header.dataSize = dSize;
290-
b->header.timeframeId = nowId;
291287
b->header.linkId = linkId;
292288

293289
readyBlocks->push(pendingBlocks[currentLink]);

src/ReadoutEquipmentPlayer.cxx

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,9 @@ void ReadoutEquipmentPlayer::copyFileDataToPage(void* page)
7575
}
7676
}
7777

78-
ReadoutEquipmentPlayer::ReadoutEquipmentPlayer(ConfigFile& cfg, std::string cfgEntryPoint) : ReadoutEquipment(cfg, cfgEntryPoint)
78+
ReadoutEquipmentPlayer::ReadoutEquipmentPlayer(ConfigFile& cfg, std::string cfgEntryPoint) : ReadoutEquipment(cfg, cfgEntryPoint, 1) // this is RDH-data equipment
7979
{
8080

81-
// declare RDH equipment
82-
initRdhEquipment();
83-
8481
auto errorHandler = [&](const std::string& err) {
8582
if (fp != nullptr) {
8683
fclose(fp);

src/ReadoutEquipmentRORC.cxx

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,9 @@ class ReadoutEquipmentRORC : public ReadoutEquipment
7070
struct ReadoutEquipmentRORCException : virtual Exception {
7171
};
7272

73-
ReadoutEquipmentRORC::ReadoutEquipmentRORC(ConfigFile& cfg, std::string name) : ReadoutEquipment(cfg, name)
73+
ReadoutEquipmentRORC::ReadoutEquipmentRORC(ConfigFile& cfg, std::string name) : ReadoutEquipment(cfg, name, 1) // this is RDH-data equipment
7474
{
7575

76-
// declare RDH equipment
77-
initRdhEquipment();
78-
7976
try {
8077

8178
// get parameters from configuration

src/ReadoutStats.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class ReadoutStats
4646
void print();
4747

4848
ReadoutStatsCounters counters;
49+
bool isFairMQ; // flag to report when FairMQ used
4950
};
5051

5152
extern ReadoutStats gReadoutStats;

src/mainReadout.cxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,9 @@ int Readout::configure(const boost::property_tree::ptree& properties)
295295
{
296296
theLog.log(LogInfoSupport_(3005), "Readout executing CONFIGURE");
297297

298+
// reset some flags
299+
gReadoutStats.isFairMQ = 0; // disable FMQ stats
300+
298301
// load configuration file
299302
theLog.log(LogInfoSupport, "Reading configuration from %s %s", cfgFileURI, cfgFileEntryPoint);
300303
try {

0 commit comments

Comments
 (0)