Skip to content

Commit 7002549

Browse files
authored
Merge pull request #244 from sy-c/master
v2.17.1
2 parents de4389f + e184b48 commit 7002549

13 files changed

+181
-9
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ find_package(Configuration)
6969
find_package(Numa)
7070
find_package(FairMQ)
7171
find_package(ZLIB)
72-
find_package(RDMA)
72+
#find_package(RDMA)
7373
find_package(Occ)
7474
find_package(BookkeepingApi)
7575
find_package(ZMQ)

doc/configurationParameters.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ The parameters related to 3rd-party libraries are described here for convenience
103103
| equipment-* | dataPagesLogPath | string | | Path where to save a summary of each data page generated by the equipment. |
104104
| equipment-* | debugFirstPages | int | 0 | If set, print debug information for first (given number of) data pages readout. |
105105
| equipment-* | disableOutput | int | 0 | If non-zero, data generated by this equipment is discarded immediately and is not pushed to output fifo of readout thread. Used for testing. |
106+
| equipment-* | dropPagesWithError | int | 0 | If set, the pages with RDH errors are discarded (requires rdhCheckEnabled or rdhUseFirstInPage). |
106107
| equipment-* | enabled | int | 1 | Enable (value=1) or disable (value=0) the equipment. |
107108
| equipment-* | equipmentType | string | | The type of equipment to be instantiated. One of: dummy, rorc, cruEmulator |
108109
| equipment-* | firstPageOffset | bytes | | Offset of the first page, in bytes from the beginning of the memory pool. If not set (recommended), will start at memoryPoolPageSize (one free page is kept before the first usable page for readout internal use). |

doc/releaseNotes.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,3 +524,9 @@ This file describes the main feature changes for each readout.exe released versi
524524
## v2.17.0 - 01/03/2022
525525
- Updated configuration syntax: section names ending with `-*` can be used to define default parameters. They are applied to all sections with similar names. Existing key-value pairs are not overwritten, but are defined according to defaults if they don't exist. For example, it is possible to define the TFperiod for all equipments by adding a section named `[equipment-*]` with `TFperiod=32`.
526526
- Updated readout to new bookkeeping API.
527+
528+
## v2.17.1 - 08/03/2022
529+
- Updated configuration parameters:
530+
- added equipment-*.dropPagesWithError: if set, the pages with RDH errors are discarded (requires rdhCheckEnabled or rdhUseFirstInPage). This may be used if downstream software is not robust to RDH errors.
531+
- Disabled unused RDMA features (still available by switch in CMake)
532+
- Added feature for real-time monitoring of memory banks. Enabled by setting membanksMonitorRate in /etc/o2.d/readout-defaults.cfg. Output (high-rate text) can be seen locally in real time with `tail -f /tmp/readout-monitor-mempool-(id)`. The status of each page in the bank is displayed.

src/MemoryBankManager.cxx

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,19 @@
1414
#include <unistd.h>
1515
#include <sys/mman.h>
1616
#include <thread>
17+
#include <Common/Timer.h>
18+
#include <sys/types.h>
19+
#include <sys/stat.h>
20+
1721

1822
#include "readoutInfoLogger.h"
1923

20-
MemoryBankManager::MemoryBankManager() {}
24+
MemoryBankManager::MemoryBankManager() {
25+
}
2126

22-
MemoryBankManager::~MemoryBankManager() {}
27+
MemoryBankManager::~MemoryBankManager() {
28+
stopMonitoring();
29+
}
2330

2431
int MemoryBankManager::addBank(std::shared_ptr<MemoryBank> bankPtr, std::string name)
2532
{
@@ -39,6 +46,15 @@ int MemoryBankManager::addBank(std::shared_ptr<MemoryBank> bankPtr, std::string
3946
return 0;
4047
}
4148

49+
// Return the path of the monitoring pipe for the memory pool with the given id.
// - id < 0 : the bare prefix (monitorPath) is returned.
// - id >= 0: "-<id>" is appended to the prefix.
std::string MemoryBankManager::getMonitorFifoPath(int id) {
  if (id < 0) {
    return monitorPath;
  }
  // Build the name with std::string rather than a fixed-size char buffer,
  // so that a long monitorPath prefix is never silently truncated.
  return monitorPath + "-" + std::to_string(id);
}
57+
4258
std::shared_ptr<MemoryPagesPool> MemoryBankManager::getPagedPool(size_t pageSize, size_t pageNumber, std::string bankName, size_t firstPageOffset, size_t blockAlign, int numaNode)
4359
{
4460

@@ -191,6 +207,13 @@ std::shared_ptr<MemoryPagesPool> MemoryBankManager::getPagedPool(size_t pageSize
191207
std::shared_ptr<MemoryPagesPool> mpp;
192208
try {
193209
mpp = std::make_shared<MemoryPagesPool>(pageSize, pageNumber, &(((char*)baseAddress)[offset]), blockSize, nullptr, firstPageOffset, newId);
210+
if (mpp != nullptr) {
211+
// create FIFO for monitoring
212+
mkfifo(getMonitorFifoPath(newId).c_str(), S_IRUSR | S_IWUSR | S_IRGRP| S_IROTH);
213+
// keep reference to created pool for monitoring purpose
214+
std::unique_lock<std::mutex> lock(bankMutex);
215+
pools.push_back(mpp);
216+
}
194217
}
195218
catch (int err) {
196219
theLog.log(LogErrorSupport_(3230), "Can not create memory pool from bank: error %d", err);
@@ -220,11 +243,64 @@ int MemoryBankManager::getMemoryRegions(std::vector<memoryRange>& ranges)
220243
void MemoryBankManager::reset()
221244
{
222245
std::unique_lock<std::mutex> lock(bankMutex);
246+
// release references to page pools
247+
pools.clear();
248+
// release banks
223249
for (auto& it : banks) {
224250
int useCount = it.bank.use_count();
225251
theLog.log(LogInfoDevel_(3008), "Releasing bank %s%s", it.name.c_str(), (useCount == 1) ? "" : "warning - still in use elsewhere !");
226252
}
227253
banks.clear();
228254
poolIndex = -1;
255+
stopMonitoring();
229256
}
230257

258+
void MemoryBankManager::monitorThLoop() {
259+
AliceO2::Common::Timer t;
260+
t.reset(1000000.0 / monitorUpdateRate);
261+
for(;!monitorThShutdown.load();) {
262+
if (t.isTimeout()) {
263+
std::unique_lock<std::mutex> lock(bankMutex);
264+
for (auto& it : pools) {
265+
// it->getId()
266+
//printf("%s\n", it->getDetailedStats().c_str());
267+
FILE *fp=fopen(getMonitorFifoPath(it->getId()).c_str(),"w+");
268+
if (fp!=NULL) {
269+
//\e[3J
270+
fprintf(fp,"\ec%s\n\n", it->getDetailedStats().c_str());
271+
fclose(fp);
272+
}
273+
}
274+
t.increment();
275+
} else {
276+
std::this_thread::sleep_for(std::chrono::microseconds(10000));
277+
}
278+
}
279+
}
280+
281+
282+
void MemoryBankManager::startMonitoring(double v_updateRate, const char* v_monitorPath) {
283+
if (monitorTh != nullptr) {
284+
stopMonitoring();
285+
}
286+
if (v_updateRate <= 0) {
287+
return;
288+
}
289+
monitorUpdateRate = v_updateRate;
290+
if ((v_monitorPath != nullptr) && (strlen(v_monitorPath) != 0)) {
291+
monitorPath = v_monitorPath;
292+
} else {
293+
monitorPath = monitorPathDefault;
294+
}
295+
std::function<void(void)> f = std::bind(&MemoryBankManager::monitorThLoop, this);
296+
monitorThShutdown = 0;
297+
monitorTh=std::make_unique<std::thread>(f);
298+
}
299+
300+
void MemoryBankManager::stopMonitoring() {
301+
if (monitorTh != nullptr) {
302+
monitorThShutdown = 1;
303+
monitorTh->join();
304+
monitorTh = nullptr;
305+
}
306+
}

src/MemoryBankManager.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <map>
2323
#include <memory>
2424
#include <mutex>
25+
#include <thread>
2526

2627
#include "MemoryBank.h"
2728
#include "MemoryPagesPool.h"
@@ -66,10 +67,22 @@ class MemoryBankManager
6667
// reset bank manager in fresh state, in particular: clear all banks
6768
void reset();
6869

70+
void startMonitoring(double updateRate, const char* monitorPath = ""); // starts monitoring at given updateRate and output path
71+
void stopMonitoring(); // stop monitoring
72+
std::string getMonitorFifoPath(int id); // return full path to monitor pipe for given pool index
73+
6974
private:
7075
std::vector<bankDescriptor> banks; // list of registered memory banks
7176
std::mutex bankMutex; // instance mutex to handle concurrent access to public methods
7277
int poolIndex = -1; // an increasing index used to assign a unique id to memory pools
78+
79+
std::unique_ptr<std::thread> monitorTh; // monitor thread
80+
std::atomic<int> monitorThShutdown; // flag to terminate monitor thread
81+
void monitorThLoop(); // function launched in monitor thread
82+
std::vector<std::shared_ptr<MemoryPagesPool>> pools; // reference to existing page pools, for monitoring purpose
83+
double monitorUpdateRate = 0; // monitoring period, in Hz
84+
const char *monitorPathDefault = "/tmp/readout-monitor-mempool";
85+
std::string monitorPath = monitorPathDefault; // prefix path to output monitoring pipes. "-%d" (bank number) added to it and fifo created.
7386
};
7487

7588
// a global MemoryBankManager instance

src/MemoryPagesPool.cxx

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,3 +545,51 @@ void MemoryPage::reportPageStates() {
545545
}
546546
printf("\n");
547547
}
548+
549+
std::string MemoryPagesPool::getDetailedStats() {
550+
std::string report;
551+
int count[MemoryPage::PageState::Undefined] = {};
552+
report += "#";
553+
report += std::to_string(getId());
554+
report += " ";
555+
556+
for(const auto &p : pages) {
557+
count[p.currentPageState]++;
558+
}
559+
for (int i=0; i<MemoryPage::PageState::Undefined; i++) {
560+
char frac[12];
561+
snprintf(frac, sizeof(frac), "%.1f", count[i] * 100.0 / pages.size());
562+
report += MemoryPage::getPageStateString((MemoryPage::PageState)i);
563+
//report += std::to_string(i);
564+
report += ": ";
565+
report += frac;
566+
report += " % ";
567+
}
568+
report+='\n';
569+
for(const auto &p : pages) {
570+
char s='o';
571+
switch(p.currentPageState) {
572+
case MemoryPage::Idle:
573+
s='.'; break;
574+
case MemoryPage::Allocated:
575+
s='a'; break;
576+
case MemoryPage::InROC:
577+
s='C'; break;
578+
case MemoryPage::InAggregator:
579+
s='A'; break;
580+
case MemoryPage::InFMQ:
581+
s='F'; break;
582+
default:
583+
break;
584+
}
585+
report += s;
586+
}
587+
report+='\n';
588+
589+
return report;
590+
}
591+
592+
// todo:
593+
// add FMQ release rate
594+
// start/stop/start -> reorder pages in pool FIFO
595+
// or even on-the-fly, if there i ?

src/MemoryPagesPool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ class MemoryPagesPool
129129
bool isPageValid(void* page); // check to see if a page address is valid
130130

131131
std::string getStats(); // return a string summarizing memory pool usage statistics
132+
std::string getDetailedStats(); // return detailed stats
132133

133134
// an optional user-provided logging function for all memory pool related ops (including warnings on low)
134135
typedef std::function<void(const std::string &)> LogCallback;

src/ReadoutEquipment.cxx

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,9 @@ ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint, b
145145
cfg.getOptionalValue<int>(cfgEntryPoint + ".rdhCheckDetectorField", cfgRdhCheckDetectorField);
146146
// configuration parameter: | equipment-* | rdhCheckTrigger | int | 0 | If set, the RDH trigger counters are checked for consistency. |
147147
cfg.getOptionalValue<int>(cfgEntryPoint + ".rdhCheckTrigger", cfgRdhCheckTrigger);
148-
theLog.log(LogInfoDevel_(3002), "RDH settings: rdhCheckEnabled=%d rdhDumpEnabled=%d rdhDumpErrorEnabled=%d rdhDumpWarningEnabled=%d rdhUseFirstInPageEnabled=%d rdhCheckFirstOrbit=%d rdhCheckDetectorField=%d", cfgRdhCheckEnabled, cfgRdhDumpEnabled, cfgRdhDumpErrorEnabled, cfgRdhDumpWarningEnabled, cfgRdhUseFirstInPageEnabled, cfgRdhCheckFirstOrbit, cfgRdhCheckDetectorField);
148+
// configuration parameter: | equipment-* | dropPagesWithError | int | 0 | If set, the pages with RDH errors are discarded (requires rdhCheckEnabled or rdhUseFirstInPage). |
149+
cfg.getOptionalValue<int>(cfgEntryPoint + ".dropPagesWithError", cfgDropPagesWithError);
150+
theLog.log(LogInfoDevel_(3002), "RDH settings: rdhCheckEnabled=%d rdhDumpEnabled=%d rdhDumpErrorEnabled=%d rdhDumpWarningEnabled=%d rdhUseFirstInPageEnabled=%d rdhCheckFirstOrbit=%d rdhCheckDetectorField=%d dropPagesWithError=%d", cfgRdhCheckEnabled, cfgRdhDumpEnabled, cfgRdhDumpErrorEnabled, cfgRdhDumpWarningEnabled, cfgRdhUseFirstInPageEnabled, cfgRdhCheckFirstOrbit, cfgRdhCheckDetectorField, cfgDropPagesWithError);
149151

150152
// configuration parameter: | equipment-* | ctpMode | int | 0 | If set, the detector field (CTP run mask) is checked. Incoming data is discarded until a new bit is set, and discarded again after this bit is unset. Automatically implies rdhCheckDetectorField=1 and rdhCheckDetectorField=1. |
151153
cfg.getOptionalValue<int>(cfgEntryPoint + ".ctpMode", cfgCtpMode);
@@ -336,7 +338,7 @@ ReadoutEquipment::~ReadoutEquipment()
336338

337339
// check if mempool still referenced
338340
if (!mp.unique()) {
339-
theLog.log(LogInfoDevel_(3008), "Equipment %s : mempool still has %d references\n", name.c_str(), (int)mp.use_count());
341+
theLog.log(LogInfoDevel_(3008), "Equipment %s : mempool still has %d references", name.c_str(), (int)mp.use_count());
340342
}
341343

342344
if (fpDataPagesLog != nullptr) {
@@ -430,7 +432,14 @@ Thread::CallbackResult ReadoutEquipment::threadCallback(void* arg)
430432

431433
// handle RDH-formatted data
432434
if (ptr->cfgRdhUseFirstInPageEnabled) {
433-
ptr->processRdh(nextBlock);
435+
if ((ptr->processRdh(nextBlock)) && ptr->cfgDropPagesWithError) {
436+
// drop pages with error when configured to do so
437+
ptr->statsRdhCheckPagesDropped++;
438+
static InfoLogger::AutoMuteToken logPageErrorDrop(LogWarningSupport_(3235), 10, 60);
439+
theLog.log(logPageErrorDrop, "Equipment %s : page with RDH error has been discarded (total: %llu)", ptr->name.c_str(), ptr->statsRdhCheckPagesDropped);
440+
nextBlock = nullptr;
441+
continue;
442+
}
434443
}
435444

436445
// discard data immediately if configured to do so
@@ -593,6 +602,7 @@ void ReadoutEquipment::initCounters()
593602
statsRdhCheckOk = 0;
594603
statsRdhCheckErr = 0;
595604
statsRdhCheckStreamErr = 0;
605+
statsRdhCheckPagesDropped = 0;
596606

597607
statsNumberOfTimeframes = 0;
598608

@@ -621,7 +631,8 @@ void ReadoutEquipment::initCounters()
621631
void ReadoutEquipment::finalCounters()
622632
{
623633
if (cfgRdhCheckEnabled) {
624-
theLog.log(LogInfoDevel_(3003), "Equipment %s : %llu timeframes, RDH checks %llu ok, %llu errors, %llu stream inconsistencies", name.c_str(), statsNumberOfTimeframes, statsRdhCheckOk, statsRdhCheckErr, statsRdhCheckStreamErr);
634+
theLog.log(LogInfoDevel_(3003), "Equipment %s : %llu timeframes, RDH checks %llu ok, %llu errors, %llu stream inconsistencies, %llu pages with error dropped",
635+
name.c_str(), statsNumberOfTimeframes, statsRdhCheckOk, statsRdhCheckErr, statsRdhCheckStreamErr, statsRdhCheckPagesDropped);
625636
}
626637
};
627638

@@ -1037,7 +1048,7 @@ int ReadoutEquipment::processRdh(DataBlockContainerReference& block)
10371048
}
10381049
}
10391050

1040-
return 0;
1051+
return isPageError;
10411052
}
10421053

10431054
void ReadoutEquipment::abortThread() {

src/ReadoutEquipment.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ class ReadoutEquipment
203203
int saveErrorPagesCount; // counter for number of pages dumped so far
204204
std::string cfgDataPagesLogPath; // path to write to disk a summary for each data page received
205205
FILE *fpDataPagesLog = nullptr; // handle to data page log file
206+
int cfgDropPagesWithError = 0; // if set, pages with RDH errors are dropped by readout
206207

207208
protected:
208209
// get timeframe from orbit
@@ -218,6 +219,7 @@ class ReadoutEquipment
218219
unsigned long long statsRdhCheckOk = 0; // number of RDH structs which have passed check ok
219220
unsigned long long statsRdhCheckErr = 0; // number of RDH structs which have not passed check
220221
unsigned long long statsRdhCheckStreamErr = 0; // number of inconsistencies in RDH stream (e.g. ids/timing compared to previous RDH)
222+
unsigned long long statsRdhCheckPagesDropped = 0; // number of pages discarded because of RDH errors (when configured to do so)
221223
};
222224

223225
std::unique_ptr<ReadoutEquipment> getReadoutEquipmentDummy(ConfigFile& cfg, std::string cfgEntryPoint);

src/ReadoutVersion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
1111

12-
#define READOUT_VERSION "2.17.0"
12+
#define READOUT_VERSION "2.17.1"
1313

0 commit comments

Comments
 (0)