Skip to content

Commit 6582726

Browse files
authored
Merge pull request #193 from sy-c/master
features
2 parents 2df7f91 + e1a2dd3 commit 6582726

File tree

7 files changed

+139
-63
lines changed

7 files changed

+139
-63
lines changed

doc/releaseNotes.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,3 +354,8 @@ This file describes the main feature changes for each readout.exe released versi
354354
- equipment-cruemulator: TF id extracted from trigger counters (single timer source for improved coherency).
355355
- Memory allocation policy updated: all readout memory is locked (RAM only, can not be swapped). A warning is reported if not.
356356
- consumer-FMQchannel: checks are done before FMQ shared memory region is created, to avoid going in a state with over-committed memory (no checks done in FMQ library about the validity of the region created, which can cause severe crash when trying to access it). Both /proc/meminfo (MemFree) and /dev/shm (if using shmem transport type) should report enough available memory before proceeding. Memory is also immediately locked and zeroed to avoid later crashes.
357+
358+
## Next version
359+
- Updated configuration parameters:
360+
- added readout.disableTimeframes: when set, all timeframe-related features are disabled (STF slicing, TF rate limits, etc). All data are tagged with TF id = 0. To be used for some calibration runs not using a central trigger clock.
361+
- added consumer-FMQchannel.checkResources: controls which resources are checked for fitting unmanaged region. This is a comma-separated list of items to be checked. By default, no checks are done. Recommended value: /dev/shm, MemAvailable.

src/ConsumerFMQchannel.cxx

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -188,34 +188,44 @@ class ConsumerFMQchannel : public Consumer
188188

189189
// check system resources first, as region creation does not check available memory, so bad crash could occur later
190190
theLog.log(LogInfoDevel_(3002), "Configuring memory buffer %lld MB", (long long)(mMemorySize/1048576LL));
191-
192-
// free system memory
193-
unsigned long long freeBytes;
194-
if (getStatsFreeMemory(freeBytes)) {
195-
theLog.log(LogWarningSupport_(3230), "Can not get stats about system free memory available");
196-
} else {
197-
theLog.log(LogInfoSupport_(3230), "Stats free memory available: %lld MB", (long long)(freeBytes/1048576LL));
198-
if ((long long)freeBytes < mMemorySize) {
199-
theLog.log(LogErrorSupport_(3230), "Not enough system memory available - check /proc/meminfo");
200-
throw "ConsumerFMQ: can not allocate shared memory region";
201-
} else {
202191

203-
}
192+
// configuration parameter: | consumer-FairMQChannel-* | checkResources | string | | Check beforehand if unmanaged region would fit in given list of resources. Comma-separated list of items to be checked: eg /dev/shm, MemFree, MemAvailable. (any filesystem path, and any /proc/meminfo entry).|
193+
std::string cfgCheckResources;
194+
cfg.getOptionalValue<std::string>(cfgEntryPoint + ".checkResources", cfgCheckResources);
195+
bool isResourceError = 0;
196+
std::vector<std::string> resources;
197+
198+
if (getListFromString(cfgCheckResources, resources)) {
199+
throw("Can not parse configuration item checkResources");
204200
}
205-
206-
// free SHM memory
207-
// check only if transport is of type shmem
208-
if (cfgTransportType == "shmem") {
209-
if (getStatsFreeSHM(freeBytes)) {
210-
theLog.log(LogWarningSupport_(3230), "Can not get stats about shared memory available");
201+
202+
for(auto r : resources) {
203+
unsigned long long freeBytes;
204+
205+
int getStatsErr = 0;
206+
if (r.find_first_of("/")!=std::string::npos) {
207+
// this is a path
208+
getStatsErr = getStatsFilesystem(freeBytes, r);
211209
} else {
212-
theLog.log(LogInfoSupport_(3230), "Stats shared memory available: %lld MB", (long long)(freeBytes/1048576LL));
210+
// look in /proc/meminfo
211+
getStatsErr = getStatsMemory(freeBytes, r);
212+
r = "/proc/meminfo " + r;
213+
}
214+
215+
if (getStatsErr) {
216+
theLog.log(LogWarningSupport_(3230), "Can not get stats for %s", r.c_str());
217+
} else {
218+
theLog.log(LogInfoSupport_(3230), "Stats for %s : %lld MB available", r.c_str(), (long long)(freeBytes/1048576LL));
213219
if ((long long)freeBytes < mMemorySize) {
214-
theLog.log(LogErrorSupport_(3230), "Not enough shared memory available - check /dev/shm");
215-
throw "ConsumerFMQ: can not allocate shared memory region";
220+
theLog.log(LogErrorSupport_(3230), "Not enough space on %s", r.c_str());
221+
isResourceError = 1;
216222
}
217223
}
218224
}
225+
226+
if (isResourceError) {
227+
throw "ConsumerFMQ: can not allocate shared memory region, system resources check failed";
228+
}
219229

220230
theLog.log(LogInfoDevel_(3008), "Creating FMQ unmanaged memory region");
221231
memoryBuffer = sendingChannel->Transport()->CreateUnmanagedRegion(mMemorySize, [](void* /*data*/, size_t /*size*/, void* hint) { // cleanup callback

src/ReadoutEquipment.cxx

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint, b
100100
cfgTfRateLimit = 0;
101101
cfg.getOptionalValue<double>("readout.tfRateLimit", cfgTfRateLimit);
102102

103+
// get TF disable flag from toplevel config
104+
cfgDisableTimeframes = 0;
105+
cfg.getOptionalValue<int>("readout.disableTimeframes", cfgDisableTimeframes);
106+
103107
// log config summary
104108
theLog.log(LogInfoDevel_(3002), "Equipment %s: from config [%s], max rate=%lf Hz, idleSleepTime=%d us, outputFifoSize=%d", name.c_str(), cfgEntryPoint.c_str(), readoutRate, cfgIdleSleepTime, cfgOutputFifoSize);
105109
theLog.log(LogInfoDevel_(3008), "Equipment %s: requesting memory pool %d pages x %d bytes from bank '%s', block aligned @ 0x%X, 1st page offset @ 0x%X", name.c_str(), (int)memoryPoolNumberOfPages, (int)memoryPoolPageSize, memoryBankName.c_str(), (int)cfgBlockAlign, (int)cfgFirstPageOffset);
@@ -120,22 +124,24 @@ ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint, b
120124
cfg.getOptionalValue<int>(cfgEntryPoint + ".rdhUseFirstInPageEnabled", cfgRdhUseFirstInPageEnabled);
121125
theLog.log(LogInfoDevel_(3002), "RDH settings: rdhCheckEnabled=%d rdhDumpEnabled=%d rdhDumpErrorEnabled=%d rdhDumpWarningEnabled=%d rdhUseFirstInPageEnabled=%d", cfgRdhCheckEnabled, cfgRdhDumpEnabled, cfgRdhDumpErrorEnabled, cfgRdhDumpWarningEnabled, cfgRdhUseFirstInPageEnabled);
122126

123-
// configuration parameter: | equipment-* | TFperiod | int | 256 | Duration of a timeframe, in number of LHC orbits. |
124-
int cfgTFperiod = 256;
125-
cfg.getOptionalValue<int>(cfgEntryPoint + ".TFperiod", cfgTFperiod);
126-
timeframePeriodOrbits = cfgTFperiod;
127+
if (!cfgDisableTimeframes) {
128+
// configuration parameter: | equipment-* | TFperiod | int | 256 | Duration of a timeframe, in number of LHC orbits. |
129+
int cfgTFperiod = 256;
130+
cfg.getOptionalValue<int>(cfgEntryPoint + ".TFperiod", cfgTFperiod);
131+
timeframePeriodOrbits = cfgTFperiod;
127132

128-
if (!cfgRdhUseFirstInPageEnabled) {
129-
usingSoftwareClock = true; // if RDH disabled, use internal clock for TF id
130-
}
131-
theLog.log(LogInfoDevel_(3002), "Timeframe length = %d orbits", (int)timeframePeriodOrbits);
132-
if (usingSoftwareClock) {
133-
timeframeRate = LHCOrbitRate * 1.0 / timeframePeriodOrbits; // timeframe rate, in Hz
134-
theLog.log(LogInfoDevel_(3002), "Timeframe IDs generated by software, %.2lf Hz", timeframeRate);
135-
} else {
136-
theLog.log(LogInfoDevel_(3002), "Timeframe IDs generated from RDH trigger counters");
133+
if (!cfgRdhUseFirstInPageEnabled) {
134+
usingSoftwareClock = true; // if RDH disabled, use internal clock for TF id
135+
}
136+
theLog.log(LogInfoDevel_(3002), "Timeframe length = %d orbits", (int)timeframePeriodOrbits);
137+
if (usingSoftwareClock) {
138+
timeframeRate = LHCOrbitRate * 1.0 / timeframePeriodOrbits; // timeframe rate, in Hz
139+
theLog.log(LogInfoDevel_(3002), "Timeframe IDs generated by software, %.2lf Hz", timeframeRate);
140+
} else {
141+
theLog.log(LogInfoDevel_(3002), "Timeframe IDs generated from RDH trigger counters");
142+
}
137143
}
138-
144+
139145
// init stats
140146
equipmentStats.resize(EquipmentStatsIndexes::maxIndex);
141147
equipmentStatsLast.resize(EquipmentStatsIndexes::maxIndex);
@@ -346,9 +352,14 @@ Thread::CallbackResult ReadoutEquipment::threadCallback(void* arg)
346352
ptr->currentBlockId++; // don't start from 0
347353
nextBlock->getData()->header.blockId = ptr->currentBlockId;
348354

349-
// tag data with (dummy) timeframeid, if none set
350-
if (nextBlock->getData()->header.timeframeId == undefinedTimeframeId) {
351-
nextBlock->getData()->header.timeframeId = ptr->getCurrentTimeframe();
355+
if (ptr->cfgDisableTimeframes) {
356+
// disable TF id
357+
nextBlock->getData()->header.timeframeId = undefinedTimeframeId;
358+
} else {
359+
// tag data with (dummy) timeframeid, if none set
360+
if (nextBlock->getData()->header.timeframeId == undefinedTimeframeId) {
361+
nextBlock->getData()->header.timeframeId = ptr->getCurrentTimeframe();
362+
}
352363
}
353364

354365
// tag data with run number
@@ -647,14 +658,16 @@ int ReadoutEquipment::processRdh(DataBlockContainerReference& block)
647658
}
648659

649660
// check no timeframe overlap in page
650-
if (((blockHeader.timeframeOrbitFirst < blockHeader.timeframeOrbitLast) && ((h.getTriggerOrbit() < blockHeader.timeframeOrbitFirst) || (h.getTriggerOrbit() > blockHeader.timeframeOrbitLast))) || ((blockHeader.timeframeOrbitFirst > blockHeader.timeframeOrbitLast) && ((h.getTriggerOrbit() < blockHeader.timeframeOrbitFirst) && (h.getTriggerOrbit() > blockHeader.timeframeOrbitLast)))) {
651-
if (cfgRdhDumpErrorEnabled) {
652-
theLog.log(logRdhErrorsToken, "Equipment %d RDH #%d @ 0x%X : TimeFrame ID change in page not allowed : orbit 0x%08X not in range [0x%08X,0x%08X]", id, rdhIndexInPage, (unsigned int)pageOffset, (int)h.getTriggerOrbit(), (int)blockHeader.timeframeOrbitFirst, (int)blockHeader.timeframeOrbitLast);
653-
}
654-
statsRdhCheckStreamErr++;
655-
break; // stop checking this page
661+
if (!cfgDisableTimeframes) {
662+
if (((blockHeader.timeframeOrbitFirst < blockHeader.timeframeOrbitLast) && ((h.getTriggerOrbit() < blockHeader.timeframeOrbitFirst) || (h.getTriggerOrbit() > blockHeader.timeframeOrbitLast))) || ((blockHeader.timeframeOrbitFirst > blockHeader.timeframeOrbitLast) && ((h.getTriggerOrbit() < blockHeader.timeframeOrbitFirst) && (h.getTriggerOrbit() > blockHeader.timeframeOrbitLast)))) {
663+
if (cfgRdhDumpErrorEnabled) {
664+
theLog.log(logRdhErrorsToken, "Equipment %d RDH #%d @ 0x%X : TimeFrame ID change in page not allowed : orbit 0x%08X not in range [0x%08X,0x%08X]", id, rdhIndexInPage, (unsigned int)pageOffset, (int)h.getTriggerOrbit(), (int)blockHeader.timeframeOrbitFirst, (int)blockHeader.timeframeOrbitLast);
665+
}
666+
statsRdhCheckStreamErr++;
667+
break; // stop checking this page
668+
}
656669
}
657-
670+
658671
/*
659672
// check packetCounter is contiguous
660673
if (cfgRdhCheckPacketCounterContiguous) {

src/ReadoutEquipment.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ class ReadoutEquipment
155155
int cfgRdhUseFirstInPageEnabled = 0; // flag to enable reading of first RDH in page to populate readout headers
156156
//int cfgRdhCheckPacketCounterContiguous = 1; // flag to enable checking if RDH packetCounter value contiguous (done link-by-link)
157157
double cfgTfRateLimit = 0; // TF rate limit, to throttle data readout
158+
int cfgDisableTimeframes = 0; // When set, all TF features disabled
158159
RateRegulator TFregulator; // clock counter for TF rate checks
159160
DataBlockContainerReference throttlePendingBlock; // in case TF rate limit was reached, a block may be set aside for later (when it belongs to next TF)
160161

src/ReadoutUtils.cxx

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,18 +181,48 @@ int getIntegerListFromString(const std::string& input, std::vector<int>& output)
181181
return 0;
182182
}
183183

184-
int getStatsFreeMemory(unsigned long long &freeBytes) {
184+
185+
// Check whether a string is made only of "simple" characters.
// Accepted set (arbitrary selection): letters and digits (as classified by isalnum), '(', ')' and '_'.
// An empty string is reported as simple.
// Returns true when every character belongs to the accepted set, false otherwise.
bool isSimpleString(const std::string &str) {
  return std::all_of(str.begin(), str.end(), [](char c) {
    // isalnum() has undefined behavior when given a negative value other than EOF,
    // which happens with plain char for bytes >= 0x80: convert to unsigned char first
    const unsigned char uc = static_cast<unsigned char>(c);
    return (isalnum(uc) != 0) || (c == '(') || (c == ')') || (c == '_');
  });
}
191+
192+
193+
// Parse a comma-separated list of strings into a vector.
// Each item is trimmed of leading and trailing blanks (space, tab, newline, etc).
// Items which are empty after trimming are discarded, wherever they appear in the list.
// Parsed items are appended to output (previous content of the vector is kept).
// Returns 0 (no failure condition is currently detected).
int getListFromString(const std::string& input, std::vector<std::string>& output)
{
  const std::string blanks = "\t\n\v\f\r ";
  std::istringstream f(input);
  std::string s;
  while (std::getline(f, s, ',')) {
    const std::string::size_type first = s.find_first_not_of(blanks);
    if (first == std::string::npos) {
      // nothing but blanks between two separators: not a valid item
      continue;
    }
    // a non-blank character exists, so last is valid and >= first
    const std::string::size_type last = s.find_last_not_of(blanks);
    output.push_back(s.substr(first, last - first + 1));
  }
  return 0;
}
207+
208+
int getStatsMemory(unsigned long long &freeBytes, const std::string &keyword) {
185209
FILE *fp;
186210
const int lineBufSz = 128;
187211
char lineBuf[lineBufSz];
188212
long long value;
189213
int success = 0;
190214
freeBytes = 0;
191215

216+
// check if keyword looks suspicious
217+
if (!isSimpleString(keyword)) {
218+
return -1;
219+
}
220+
192221
if ((fp = fopen("/proc/meminfo", "r")) != NULL) {
193222

223+
std::string entryLine = keyword + ": %lld kB";
194224
while (fgets(lineBuf, lineBufSz, fp) != NULL) {
195-
if ( sscanf(lineBuf, "MemFree: %lld kB", &value) == 1) {
225+
if ( sscanf(lineBuf, entryLine.c_str(), &value) == 1) {
196226
freeBytes = ((unsigned long long)value) * 1024;
197227
success = 1;
198228
break;
@@ -208,12 +238,12 @@ int getStatsFreeMemory(unsigned long long &freeBytes) {
208238
return 0;
209239
}
210240

211-
int getStatsFreeSHM(unsigned long long &freeBytes) {
241+
int getStatsFilesystem(unsigned long long &freeBytes, const std::string &path) {
212242
int success = 0;
213243
freeBytes = 0;
214244

215245
try {
216-
freeBytes = (unsigned long long) (std::filesystem::space("/dev/shm")).free;
246+
freeBytes = (unsigned long long) (std::filesystem::space(path)).free;
217247
success = 1;
218248
}
219249
catch (...) {

src/ReadoutUtils.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ int getKeyValuePairsFromString(const std::string& input, std::map<std::string, s
4545
// returns 0 on success, -1 on error
4646
int getIntegerListFromString(const std::string& input, std::vector<int>& output);
4747

48+
// parse a string of comma-separated strings into a vector
49+
// returns 0 on success, -1 on error
50+
int getListFromString(const std::string& input, std::vector<std::string>& output);
51+
4852
// function to convert a value in bytes to a prefixed number 3+3 digits
4953
// suffix is the "base unit" to add after calculated prefix, e.g. Byte-> kBytes
5054
std::string NumberOfBytesToString(double value, const char* suffix, int base = 1024);
@@ -56,15 +60,15 @@ int getProcessStats(double& uTime, double& sTime);
5660
typedef uint32_t tRunNumber;
5761
typedef uint32_t tTimeframeId;
5862

59-
// function to retrieve amount of free memory on the system
63+
// function to retrieve some memory statistics on the system
6064
// Works only when /proc/meminfo available
65+
// Look for entry corresponding to provided keyword (eg: MemFree, MemAvailable)
6166
// returns 0 on success, -1 on error
62-
int getStatsFreeMemory(unsigned long long &freeBytes);
67+
int getStatsMemory(unsigned long long &freeBytes, const std::string& keyword);
6368

64-
// function to retrieve amount of free memory on the system
65-
// Works only when /dev/shm available
69+
// function to retrieve amount of free area for given path on the filesystem
6670
// returns 0 on success, -1 on error
67-
int getStatsFreeSHM(unsigned long long &freeBytes);
71+
int getStatsFilesystem(unsigned long long &freeBytes, const std::string& path);
6872

6973
// end of _READOUTUTILS_H
7074
#endif

src/mainReadout.cxx

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ class Readout
143143
// configuration parameters
144144
double cfgExitTimeout;
145145
double cfgFlushEquipmentTimeout;
146+
int cfgDisableTimeframes;
146147
int cfgDisableAggregatorSlicing;
147148
double cfgAggregatorSliceTimeout;
148149
double cfgAggregatorStfTimeout;
@@ -468,10 +469,21 @@ int Readout::configure(const boost::property_tree::ptree& properties)
468469
// configuration parameter: | readout | tfRateLimit | double | 0 | When set, the output is limited to a given timeframe rate. |
469470
cfgTfRateLimit = 0;
470471
cfg.getOptionalValue<double>("readout.tfRateLimit", cfgTfRateLimit);
472+
473+
// configuration parameter: | readout | disableTimeframes | int | 0 | When set, all timeframe related features are disabled (this may supersede other config parameters). |
474+
cfgDisableTimeframes = 0;
475+
cfg.getOptionalValue<int>("readout.disableTimeframes", cfgDisableTimeframes);
476+
if (cfgDisableTimeframes) {
477+
cfgDisableAggregatorSlicing = 1;
478+
cfgTfRateLimit = 0;
479+
theLog.log(LogInfoDevel, "Timeframes disabled");
480+
}
481+
471482
if (cfgTfRateLimit > 0) {
472483
theLog.log(LogInfoDevel, "Timeframe rate limit = % .2lf Hz", cfgTfRateLimit);
473484
}
474485

486+
475487
// configuration parameter: | readout | logbookEnabled | int | 0 | When set, the logbook is enabled and populated with readout stats at runtime. |
476488
cfgLogbookEnabled = 0;
477489
cfg.getOptionalValue<int>("readout.logbookEnabled", cfgLogbookEnabled);
@@ -828,15 +840,16 @@ int Readout::start()
828840
if (cfgDisableAggregatorSlicing) {
829841
theLog.log(LogInfoDevel, "Aggregator slicing disabled");
830842
agg->disableSlicing = 1;
831-
}
832-
if (cfgAggregatorSliceTimeout > 0) {
833-
theLog.log(LogInfoDevel, "Aggregator slice timeout = %.2lf seconds", cfgAggregatorSliceTimeout);
834-
agg->cfgSliceTimeout = cfgAggregatorSliceTimeout;
835-
}
836-
if (cfgAggregatorStfTimeout > 0) {
837-
theLog.log(LogInfoDevel, "Aggregator subtimeframe timeout = %.2lf seconds", cfgAggregatorStfTimeout);
838-
agg->cfgStfTimeout = cfgAggregatorStfTimeout;
839-
agg->enableStfBuilding = 1;
843+
} else {
844+
if (cfgAggregatorSliceTimeout > 0) {
845+
theLog.log(LogInfoDevel, "Aggregator slice timeout = %.2lf seconds", cfgAggregatorSliceTimeout);
846+
agg->cfgSliceTimeout = cfgAggregatorSliceTimeout;
847+
}
848+
if (cfgAggregatorStfTimeout > 0) {
849+
theLog.log(LogInfoDevel, "Aggregator subtimeframe timeout = %.2lf seconds", cfgAggregatorStfTimeout);
850+
agg->cfgStfTimeout = cfgAggregatorStfTimeout;
851+
agg->enableStfBuilding = 1;
852+
}
840853
}
841854

842855
agg->start();

0 commit comments

Comments
 (0)