Skip to content

Commit 639809b

Browse files
authored
Merge pull request #217 from sy-c/master
v2.9.10
2 parents 9fc25f3 + 28b198e commit 639809b

18 files changed

+400
-17
lines changed

CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,3 +534,9 @@ install(
534534
DESTINATION ${CMAKE_INSTALL_PREFIX}/bin
535535
RENAME o2-readout-status
536536
)
537+
538+
# Install the script src/readoutCommandLauncher.sh as a program in the bin directory of the install prefix, renamed to o2-readout-command-launcher.
install(
539+
PROGRAMS src/readoutCommandLauncher.sh
540+
DESTINATION ${CMAKE_INSTALL_PREFIX}/bin
541+
RENAME o2-readout-command-launcher
542+
)

doc/configurationParameters.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,9 @@ The parameters related to 3rd-party libraries are described here for convenience
166166
| readout | timeStop | string | | In standalone mode, time at which to execute stop. If not set, on int/term/quit signal. |
167167
| readout-monitor | broadcastHost | string | | used by readout-status to connect to readout-monitor broadcast channel. |
168168
| readout-monitor | broadcastPort | int | 0 | when set, the process will create a listening TCP port and broadcast statistics to connected clients. |
169+
| readout-monitor | logFile | string | | when set, the process will log received metrics to a file. |
170+
| readout-monitor | logFileHistory | int | 1 | defines the maximum number of previous log files to keep, when a maximum size is set. |
171+
| readout-monitor | logFileMaxSize | int | 128 | defines the maximum size of log file (in MB). When reaching this threshold, the log file is rotated. |
169172
| readout-monitor | monitorAddress | string | tcp://127.0.0.1:6008 | Address of the receiving ZeroMQ channel to receive readout statistics. |
170173
| readout-monitor | outputFormat | int | 0 | 0: default, human readable. 1: raw bytes. |
171174
| receiverFMQ | channelAddress | string | ipc:///tmp/pipe-readout | c.f. parameter with same name in consumer-FairMQchannel-* |

doc/howto.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,22 @@ bytesMax=1G
1616
filesMax=0
1717
fileName=/tmp/data_%f.raw
1818
```
19+
20+
21+
## How to launch custom commands on start/stop
22+
23+
First, custom command shells must be enabled in the default readout settings `/etc/o2.d/readout-defaults.cfg`:
24+
```
25+
[readout]
26+
...
27+
customCommandsEnabled=1
28+
```
29+
This launches a sub-process shell on readout startup to execute commands later.
30+
31+
Second, custom commands should be defined in the readout configuration file for associated state transitions: pre|post + START|STOP.
32+
For example, to start/stop the internal ROC CTP emulator automatically with readout:
33+
```
34+
[readout]
35+
...
36+
customCommands=postSTART=o2-roc-ctp-emulator --id=#1 --trigger-mode periodic --trigger-freq 40000 --hbmax 255 --bcmax 3563,preSTOP=o2-roc-ctp-emulator --id=#1 --eox
37+
```

doc/releaseNotes.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,3 +439,11 @@ This file describes the main feature changes for each readout.exe released versi
439439

440440
## v2.9.2 - 31/03/2022
441441
- Cosmetics improvements related to threads cleanup order on exit.
442+
443+
## v2.10.0 - 13/04/2022
444+
- readout-monitor: added log file for received measurements.
445+
- added support for custom commands to be executed on state transitions.
446+
- Updated configuration parameters:
447+
- readout.flushConsumerTimeout: when set, readout waits up to this amount of time for all data pages locked by consumers to be released before stopping.
448+
- Added warning message when memory buffers are running low.
449+
- Added message at end of run showing the links which have provided data for each equipment and how much per link.

src/ConsumerFMQchannel.cxx

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ void incDataBlockStats(DataBlock* b)
7171
s->t0 = timeNowMicrosec();
7272
gReadoutStats.counters.pagesPendingFairMQ++;
7373
gReadoutStats.counters.notify++;
74-
// printf("init %p\n",b);
74+
// printf("init %p -> pages locked = %lu\n",b,(unsigned long)gReadoutStats.counters.pagesPendingFairMQ);
7575
}
7676
}
7777

@@ -112,6 +112,12 @@ class ConsumerFMQchannel : public Consumer
112112

113113
CounterStats repackSizeStats; // keep track of page size used when repacking
114114

115+
// custom log function for memory pool
116+
void mplog(const std::string &msg) {
117+
static InfoLogger::AutoMuteToken logMPToken(LogWarningSupport_(3230), 10, 60);
118+
theLog.log(logMPToken, "Consumer %s : %s", name.c_str(), msg.c_str());
119+
}
120+
115121
public:
116122
std::vector<FairMQMessagePtr> messagesToSend; // collect HBF messages of each update
117123
uint64_t messagesToSendSize; // size (bytes) of messagesToSend payload
@@ -278,6 +284,8 @@ class ConsumerFMQchannel : public Consumer
278284
mp = theMemoryBankManager.getPagedPool(memoryPoolPageSize, memoryPoolNumberOfPages, memoryBankName);
279285
if (mp == nullptr) {
280286
throw "ConsumerFMQ: failed to get memory pool from " + memoryBankName + " for " + std::to_string(memoryPoolNumberOfPages) + " pages x " + std::to_string(memoryPoolPageSize) + " bytes";
287+
} else {
288+
mp -> setWarningCallback(std::bind(&ConsumerFMQchannel::mplog, this, std::placeholders::_1));
281289
}
282290
theLog.log(LogInfoDevel_(3008), "Using memory pool %d pages x %d bytes", memoryPoolNumberOfPages, memoryPoolPageSize);
283291
}

src/ConsumerStats.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ class ConsumerStats : public Consumer
169169
if (deltaT > 0) {
170170
theLog.log(LogInfoOps_(3003), "Last interval (%.2fs): blocksRx=%llu, block rate=%.2lf, bytesRx=%llu, rate=%s", deltaT, (unsigned long long)counterBlocksDiff, counterBlocksDiff / deltaT, (unsigned long long)counterBytesDiff, NumberOfBytesToString(counterBytesDiff * 8 / deltaT, "b/s", 1000).c_str());
171171
if (gReadoutStats.isFairMQ) {
172-
theLog.log(LogInfoOps_(3003), "STFB locked pages: current=%llu, release rate=%.2lf Hz, latency=%.3lf s, current TF = %d", nRfmq, rRfmq, avgTfmq, tfidfmq );
172+
theLog.log(LogInfoOps_(3003), "STFB locked pages: current=%llu, released = %llu, release rate=%.2lf Hz, latency=%.3lf s, current TF = %d", (unsigned long long) snapshot.pagesPendingFairMQ.load(), nRfmq, rRfmq, avgTfmq, tfidfmq );
173173
}
174174
}
175175
}

src/MemoryPagesPool.cxx

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ MemoryPagesPool::MemoryPagesPool(size_t vPageSize, size_t vNumberOfPages, void*
2424
baseBlockAddress = vBaseAddress;
2525
baseBlockSize = vBaseSize;
2626
releaseBaseBlockCallback = vCallback;
27+
state = BufferState::empty;
2728

2829
// check page / header sizes
2930
assert(headerReservedSpace >= sizeof(DataBlock));
@@ -164,6 +165,9 @@ void* MemoryPagesPool::getPage()
164165
}
165166
}
166167

168+
// update buffer state
169+
updateBufferState();
170+
167171
return ptr;
168172
}
169173

@@ -190,6 +194,9 @@ void MemoryPagesPool::releasePage(void* address)
190194

191195
// put back page in list of available pages
192196
pagesAvailable->push(address);
197+
198+
// update buffer state
199+
updateBufferState();
193200
}
194201

195202
size_t MemoryPagesPool::getPageSize() { return pageSize; }
@@ -269,3 +276,30 @@ std::string MemoryPagesPool::getStats() {
269276
return "number of pages used: " + std::to_string(poolStats.getTotal()) + " average free pages: " + std::to_string((uint64_t)poolStats.getAverage()) + " minimum free pages: " + std::to_string(poolStats.getMinimum());
270277
}
271278

279+
280+
// Register the function used to report pool usage messages, together with the usage thresholds.
// - cb:      callback receiving the message text.
// - vthHigh: usage ratio (fraction of pages in use) above which high usage is reported.
// - vthOk:   usage ratio below which usage is reported as back to a reasonable level.
void MemoryPagesPool::setWarningCallback(const LogCallback& cb, double vthHigh, double vthOk) {
  theLogCallback = cb;
  thOk = vthOk;
  thHigh = vthHigh;
}
285+
286+
void MemoryPagesPool::log(const std::string &msg) {
287+
if (theLogCallback!=nullptr) {
288+
// the log callback will format it with appropriate text headers (eg who owns the pool)
289+
theLogCallback(msg);
290+
}
291+
}
292+
293+
void MemoryPagesPool::updateBufferState() {
294+
double r = 1.0 - (getNumberOfPagesAvailable() * 1.0 / getTotalNumberOfPages());
295+
if ((r == 1.0) && (state != BufferState::full)) {
296+
state = BufferState::full;
297+
log("buffer full");
298+
} else if ((r > thHigh) && (state == BufferState::empty)) {
299+
state = BufferState::high;
300+
log("buffer usage is high");
301+
} else if ((r < thOk) && ((state == BufferState::full) || (state == BufferState::high))) {
302+
state = BufferState::empty;
303+
log("buffer usage back to reasonable level");
304+
}
305+
}

src/MemoryPagesPool.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,21 @@ class MemoryPagesPool
7373

7474
std::string getStats(); // return a string summarizing memory pool usage statistics
7575

76+
// an optional user-provided logging function for all memory pool related ops (including warnings on low)
77+
typedef std::function<void(const std::string &)> LogCallback;
78+
79+
void setWarningCallback(const LogCallback& cb, double thHigh = 0.9, double thOk = 0.8);
80+
7681
private:
82+
83+
LogCallback theLogCallback;
84+
void log(const std::string &log);
85+
double thHigh;
86+
double thOk;
87+
enum BufferState {empty, high, full};
88+
BufferState state = BufferState::empty;
89+
void updateBufferState();
90+
7791
std::unique_ptr<AliceO2::Common::Fifo<void*>> pagesAvailable; // a buffer to keep track of individual pages
7892

7993
size_t numberOfPages; // number of pages

src/ReadoutDatabase.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,10 @@ ReadoutDatabase::ReadoutDatabase(const char* cx, int v, const LogCallback& cb) {
106106
}
107107

108108
// Close the database handle, if any, and report it through the log function.
ReadoutDatabase::~ReadoutDatabase() {
  if (db == nullptr) {
    // no open handle: nothing to release
    return;
  }
  mysql_close(db);
  db = nullptr;
  log("DB closed");
}
114115

src/ReadoutEquipment.cxx

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint, b
4848
cfg.getOptionalValue<double>("readout.rate", readoutRate, -1.0);
4949

5050
// configuration parameter: | equipment-* | idleSleepTime | int | 200 | Thread idle sleep time, in microseconds. |
51-
int cfgIdleSleepTime = 200;
51+
cfgIdleSleepTime = 200;
5252
cfg.getOptionalValue<int>(cfgEntryPoint + ".idleSleepTime", cfgIdleSleepTime);
5353

5454
// size of equipment output FIFO
@@ -201,6 +201,8 @@ ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint, b
201201
if (mp == nullptr) {
202202
theLog.log(LogErrorSupport_(3230), "Failed to create pool of memory pages");
203203
throw __LINE__;
204+
} else {
205+
mp -> setWarningCallback(std::bind(&ReadoutEquipment::mplog, this, std::placeholders::_1));
204206
}
205207
// todo: move page align to MemoryPool class
206208
assert(pageSpaceReserved == mp->getPageSize() - mp->getDataBlockMaxSize());
@@ -227,6 +229,9 @@ void ReadoutEquipment::start()
227229
equipmentStats[i].reset();
228230
equipmentStatsLast[i] = 0;
229231
}
232+
equipmentLinksUsed.reset();
233+
equipmentLinksData.resize(RdhMaxLinkId + 1);
234+
equipmentLinksData.clear();
230235
isError = 0;
231236
currentBlockId = 0;
232237
isDataOn = false;
@@ -280,6 +285,15 @@ void ReadoutEquipment::stop()
280285
theLog.log(LogInfoDevel_(3003), "Average pages pushed per iteration: %.1f", equipmentStats[EquipmentStatsIndexes::nBlocksOut].get() * 1.0 / (equipmentStats[EquipmentStatsIndexes::nLoop].get() - equipmentStats[EquipmentStatsIndexes::nIdle].get()));
281286
theLog.log(LogInfoDevel_(3003), "Average fifoready occupancy: %.1f", equipmentStats[EquipmentStatsIndexes::fifoOccupancyFreeBlocks].get() * 1.0 / (equipmentStats[EquipmentStatsIndexes::nLoop].get() - equipmentStats[EquipmentStatsIndexes::nIdle].get()));
282287
theLog.log(LogInfoDevel_(3003), "Average data throughput: %s", ReadoutUtils::NumberOfBytesToString(equipmentStats[EquipmentStatsIndexes::nBytesOut].get() / runningTime, "B/s").c_str());
288+
theLog.log(LogInfoDevel_(3003), "Links used: %s", equipmentLinksUsed.to_string().c_str());
289+
290+
std::string perLinkStats;
291+
for (unsigned int i = 0; i<= RdhMaxLinkId; i++) {
292+
if (equipmentLinksUsed[i]) {
293+
perLinkStats += "[" + std::to_string(i) + "]=" + NumberOfBytesToString(equipmentLinksData[i], "B", 1024) + " ";
294+
}
295+
}
296+
theLog.log(LogInfoDevel_(3003), "Links data received: %s", perLinkStats.c_str());
283297
}
284298

285299
ReadoutEquipment::~ReadoutEquipment()
@@ -678,6 +692,13 @@ int ReadoutEquipment::processRdh(DataBlockContainerReference& block)
678692
cfgRdhDumpFirstInPageEnabled++;
679693
}
680694

695+
// update links statistics
696+
if (h.getLinkId() <= RdhMaxLinkId) {
697+
equipmentLinksUsed[h.getLinkId()] = 1;
698+
equipmentLinksData[h.getLinkId()] += blockHeader.dataSize;
699+
}
700+
701+
681702
// detect changes in detector bits field
682703
if (cfgRdhCheckDetectorField) {
683704
if (isDefinedLastDetectorField) {
@@ -830,3 +851,8 @@ void ReadoutEquipment::abortThread() {
830851
// ensure thread is stopped
831852
readoutThread = nullptr;
832853
}
854+
855+
void ReadoutEquipment::mplog(const std::string &msg) {
856+
static InfoLogger::AutoMuteToken logMPToken(LogWarningSupport_(3230), 10, 60);
857+
theLog.log(logMPToken, "Equipment %s : %s", name.c_str(), msg.c_str());
858+
}

0 commit comments

Comments
 (0)