Skip to content

Commit 341df75

Browse files
authored
Merge pull request #261 from sy-c/master
v2.21.1
2 parents c2ab71d + 53dab18 commit 341df75

File tree

9 files changed

+97
-23
lines changed

9 files changed

+97
-23
lines changed

doc/releaseNotes.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,3 +577,8 @@ This file describes the main feature changes for each readout.exe released versi
577577
- added readout.numberOfRuns: in standalone mode, number of START/STOP cycles to execute (used for testing).
578578
- added readout.tfRateLimitMode: can be set to 1 to use number of TF instead of computed TF id for rate throttling. Useful when replaying files with jumps in TF ids. (not needed with autoTimeframeId)
579579
- added equipment.autoTimeframeId: to force incremental timeframe IDs. Useful when replaying files with jumps in TF ids. BC still used to detect boundaries between TFs.
580+
581+
## v2.21.1 - 29/08/2023
582+
- o2-readout-rawreader: added options logOff and dumpOrbitStats.
583+
- Bookkeeping: final stats publish at end of run is synchronous.
584+
- Monitoring: added buffers statistics in bytes: readout.bufferUsage.bytes (in addition to existing readout.bufferUsage.value, as a percentage)

src/ConsumerFMQchannel.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
#ifdef WITH_FAIRMQ
2727

28-
#include <fairmq/FairMQDevice.h>
28+
#include <fairmq/Device.h>
2929
#include <fairmq/FairMQMessage.h>
3030
#include <fairmq/FairMQTransportFactory.h>
3131
#include <fairmq/tools/Unique.h>
@@ -415,6 +415,7 @@ class ConsumerFMQchannel : public Consumer
415415
mp -> setWarningCallback(std::bind(&ConsumerFMQchannel::mplog, this, std::placeholders::_1));
416416
if ((mp->getId() >= 0) && (mp->getId() < ReadoutStatsMaxItems)) {
417417
mp -> setBufferStateVariable(&gReadoutStats.counters.bufferUsage[mp->getId()]);
418+
gReadoutStats.counters.bufferSize[mp->getId()] = memoryPoolPageSize * memoryPoolNumberOfPages;
418419
}
419420
}
420421
theLog.log(LogInfoDevel_(3008), "Using memory pool [%d]: %d pages x %d bytes", mp->getId(), memoryPoolNumberOfPages, memoryPoolPageSize);

src/ConsumerStats.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,9 @@ class ConsumerStats : public Consumer
176176
// buffer stats
177177
for (int i = 0; i < ReadoutStatsMaxItems; i++) {
178178
double r = snapshot.bufferUsage[i].load();
179+
uint64_t b = (uint64_t)(r * snapshot.bufferSize[i].load());
179180
if (r >= 0) {
180-
sendMetricNoException(Metric{(int)(r*100), "readout.bufferUsage"}.addTag(tags::Key::ID, i));
181+
sendMetricNoException(Metric{"readout.bufferUsage"}.addValue((int)(r*100), "value").addValue(b, "bytes").addTag(tags::Key::ID, i));
181182
}
182183
}
183184
}

src/ReadoutEquipment.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint, b
227227
mp -> setWarningCallback(std::bind(&ReadoutEquipment::mplog, this, std::placeholders::_1));
228228
if ((mp->getId() >= 0) && (mp->getId() < ReadoutStatsMaxItems)) {
229229
mp -> setBufferStateVariable(&gReadoutStats.counters.bufferUsage[mp->getId()]);
230+
gReadoutStats.counters.bufferSize[mp->getId()] = memoryPoolPageSize * memoryPoolNumberOfPages;
230231
}
231232
theLog.log(LogInfoDevel_(3008), "Using memory pool [%d]: %d pages x %d bytes", mp->getId(), memoryPoolNumberOfPages, memoryPoolPageSize);
232233

src/ReadoutStats.cxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,11 @@ void ReadoutStats::reset(bool lightReset)
7373
if (!lightReset) {
7474
for (unsigned int i = 0; i < ReadoutStatsMaxItems; i++) {
7575
counters.bufferUsage[i] = -1.0;
76+
counters.bufferSize[i] = 0;
7677
}
7778
}
79+
80+
counters.runNumber = undefinedRunNumber;
7881
}
7982

8083
void ReadoutStats::print()

src/ReadoutStats.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ struct ReadoutStatsCounters {
3535
std::atomic<uint64_t> bytesReadout;
3636
std::atomic<uint64_t> bytesRecorded;
3737
std::atomic<uint64_t> bytesFairMQ;
38-
std::atomic<double> timestamp;
38+
std::atomic<double> timestamp;
3939
std::atomic<double> bytesReadoutRate;
4040
std::atomic<uint64_t> state;
4141
std::atomic<uint64_t> pagesPendingFairMQ; // number of pages pending in ConsumerFMQ
@@ -48,14 +48,16 @@ struct ReadoutStatsCounters {
4848
std::atomic<uint32_t> logMessagesError; // number of log messages (severity: error)
4949
std::atomic<uint32_t> currentOrbit; // 1st orbit of current timeframe (last out of aggregator)
5050
std::atomic<double> bufferUsage[ReadoutStatsMaxItems]; // buffer usage. -1 means not used.
51+
std::atomic<uint64_t> bufferSize[ReadoutStatsMaxItems]; // buffer size in bytes. 0 means not used.
5152
std::atomic<uint64_t> ddHBFRepacked; // Data Distribution: number of HBF re-packed (HBF overlapping superpages)
5253
std::atomic<uint64_t> ddBytesCopied; // Data Distribution: number of bytes copied (HBF overlapping superpages)
5354
std::atomic<uint64_t> ddMemoryPendingBytes; // Data Distribution: number of bytes pending release in ConsumerFMQ (real memory)
5455
std::atomic<uint64_t> ddPayloadPendingBytes; // Data Distribution: number of bytes pending release in ConsumerFMQ (payload only, not accounting for memory fragmentation overhead)
56+
std::atomic<uint64_t> runNumber; // current run number (valid only in running state)
5557
};
5658

5759
// version number of this struct
58-
const uint32_t ReadoutStatsCountersVersion = 0xA0000003;
60+
const uint32_t ReadoutStatsCountersVersion = 0xA0000004;
5961

6062
// need to be able to easily transmit this struct as a whole
6163
static_assert(std::is_pod<ReadoutStatsCounters>::value);

src/ReadoutVersion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
1111

12-
#define READOUT_VERSION "2.21.0"
12+
#define READOUT_VERSION "2.21.1"
1313

src/mainReadout.cxx

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -212,38 +212,71 @@ class LogbookThread {
212212
th = nullptr;
213213
}
214214
};
215-
int publishStats() {
215+
// when timeout is set, ensures the function returns only when bookkeeping operation completed
216+
// otherwise, it is done asynchronously later
217+
// returns 0 on success, or an error code
218+
int publishStats(int timeoutMilliseconds = 0) {
216219
if (logbookHandle == nullptr) return __LINE__; // fail if no connection
220+
AliceO2::Common::Timer timer;
221+
if (timeoutMilliseconds > 0) {
222+
// wait pending request completed, if any (so that we are sure to push the latest counters)
223+
timer.reset(timeoutMilliseconds * 1000);
224+
while (publishRequest.load()) {
225+
if (timer.isTimeout()) return __LINE__;
226+
usleep(1000);
227+
}
228+
}
217229
if (publishRequest.load()) return __LINE__; // fail if request already pending
218230
publishRequest = 1;
231+
if (this->verbose) {
232+
theLog.log(LogInfoDevel_(3210), "Requested to publish logbook stats");
233+
}
234+
if (timeoutMilliseconds > 0) {
235+
// wait request completed and check status
236+
while (publishRequest.load()) {
237+
if (timer.isTimeout()) return __LINE__;
238+
usleep(1000);
239+
}
240+
if (!publishSuccess.load()) {
241+
return __LINE__;
242+
}
243+
}
219244
return 0;
220245
};
246+
bool verbose = 0; // flag for extra logs on request/publish
221247
private:
222248
std::unique_ptr<o2::bkp::api::BkpClient> logbookHandle; // handle to logbook
223249
std::unique_ptr<std::thread> th; // a thread reading from fd and injecting to theLog
224250
std::atomic<int> shutdownRequest; // flag to terminate thread
225251
std::atomic<int> publishRequest; // flag to ask thread to publish current values
252+
std::atomic<int> publishSuccess; // flag to report status of latest publish operation
226253
void run() {
227254
setThreadName("logbook");
228255
// thread loop, 10Hz
229256
while (!shutdownRequest && (logbookHandle != nullptr)) {
230257
if (publishRequest.load() == 1) {
258+
publishSuccess = 0;
259+
// copy current counters
260+
ReadoutStatsCounters snapshot;
261+
memcpy((void *)&snapshot, (void *)&gReadoutStats.counters, sizeof(snapshot));
231262
// publishing to logbook makes sense only if a run number defined
232-
if (occRunNumber != undefinedRunNumber) {
233-
bool isOk = false;
263+
if (snapshot.runNumber.load() != undefinedRunNumber) {
234264
try {
235265
// interface: https://github.com/AliceO2Group/Bookkeeping/tree/main/cxx-client/include/BookkeepingApi
236266
logbookHandle->flp()->updateReadoutCountersByFlpNameAndRunNumber(
237-
occRole, occRunNumber,
238-
(int64_t)gReadoutStats.counters.numberOfSubtimeframes, (int64_t)gReadoutStats.counters.bytesReadout, (int64_t)gReadoutStats.counters.bytesRecorded, (int64_t)gReadoutStats.counters.bytesFairMQ
267+
snapshot.source, snapshot.runNumber.load(),
268+
(int64_t)snapshot.numberOfSubtimeframes.load(), (int64_t)snapshot.bytesReadout.load(), (int64_t)snapshot.bytesRecorded.load(), (int64_t)snapshot.bytesFairMQ.load()
239269
);
240-
isOk = true;
270+
if (this->verbose) {
271+
theLog.log(LogInfoDevel_(3210), "Publishing logbook stats: tf = %llu, bytesReadout = %llu", (unsigned long long)snapshot.numberOfSubtimeframes.load(), (unsigned long long)snapshot.bytesReadout.load());
272+
}
273+
publishSuccess = 1;
241274
} catch (const std::exception& ex) {
242275
theLog.log(LogErrorDevel_(3210), "Failed to update logbook: %s", ex.what());
243276
} catch (...) {
244277
theLog.log(LogErrorDevel_(3210), "Failed to update logbook: unknown exception");
245278
}
246-
if (!isOk) {
279+
if (!publishSuccess.load()) {
247280
// closing logbook immediately
248281
logbookHandle = nullptr;
249282
theLog.log(LogErrorSupport_(3210), "Logbook now disabled");
@@ -405,7 +438,7 @@ class Readout
405438
std::unique_ptr<ReadoutDatabase> dbHandle; // handle to readout database
406439
#endif
407440

408-
void publishLogbookStats(); // publish current readout counters to logbook
441+
void publishLogbookStats(int timeout = 0); // publish current readout counters to logbook. Optional timeout in milliseconds, async op if not set.
409442
AliceO2::Common::Timer logbookTimer; // timer to handle readout logbook publish interval
410443

411444
uint64_t currentTimeframeId = undefinedTimeframeId;
@@ -418,16 +451,10 @@ class Readout
418451

419452
bool testLogbook = false; // flag for logbook test mode
420453

421-
void Readout::publishLogbookStats()
454+
void Readout::publishLogbookStats(int timeout)
422455
{
423456
// gReadoutStats.print();
424457

425-
#ifdef WITH_LOGBOOK
426-
if (theLogbookThread != nullptr) {
427-
theLogbookThread->publishStats();
428-
}
429-
#endif
430-
431458
#ifdef WITH_DB
432459
if (dbHandle != nullptr) {
433460
dbHandle->updateRunCounters(
@@ -438,6 +465,15 @@ void Readout::publishLogbookStats()
438465
);
439466
}
440467
#endif
468+
469+
#ifdef WITH_LOGBOOK
470+
if (theLogbookThread != nullptr) {
471+
int err = theLogbookThread->publishStats(timeout);
472+
if ((timeout > 0) && (err)) {
473+
theLog.log(LogErrorDevel_(3210), "Logbook publish failed within given time (%d ms)", timeout);
474+
}
475+
}
476+
#endif
441477
}
442478

443479
int Readout::_init(int argc, char* argv[])
@@ -1384,6 +1420,7 @@ int Readout::_start()
13841420
theLog.log(LogInfoSupport_(3005), "Readout executing START");
13851421
gReadoutStats.reset(1);
13861422
gReadoutStats.counters.state = stringToUint64("> start");
1423+
gReadoutStats.counters.runNumber = occRunNumber;
13871424
gReadoutStats.counters.notify++;
13881425
gReadoutStats.publishNow();
13891426

@@ -1713,8 +1750,8 @@ int Readout::_stop()
17131750
// report log statistics
17141751
theLog.log("Errors: %lu Warnings: %lu", theLog.getMessageCount(InfoLogger::Severity::Error), theLog.getMessageCount(InfoLogger::Severity::Warning));
17151752

1716-
// publish final logbook statistics
1717-
publishLogbookStats();
1753+
// publish final logbook statistics (synchronously with timeout)
1754+
publishLogbookStats(3000);
17181755

17191756
// publish some final counters
17201757
theLog.log(LogInfoDevel_(3003), "Final counters: timeframes = %" PRIu64 " readout = %s recorded = %s",

src/readRaw.cxx

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
#include "CounterStats.h"
2222

2323
//#define ERRLOG(args...) fprintf(stderr,args)
24-
#define ERRLOG(args...) fprintf(stdout, args)
24+
#define ERRLOG(args...) if (!logOff) {fprintf(stdout, args);}
2525

2626
int main(int argc, const char* argv[])
2727
{
@@ -52,6 +52,11 @@ int main(int argc, const char* argv[])
5252
uint32_t firstTimeframeHbOrbitBegin = 0;
5353
bool isDefinedFirstTimeframeHbOrbitBegin = 0;
5454
uint32_t maxOrbit = 0;
55+
uint32_t minOrbit = 0;
56+
uint32_t firstOrbit = 0;
57+
58+
bool logOff = 0;
59+
bool dumpOrbitStats =0;
5560

5661
// parse input arguments
5762
// format is a list of key=value pairs
@@ -72,6 +77,8 @@ int main(int argc, const char* argv[])
7277
" dumpStats=(int) : if set, some statistics are printed on HBF/TF size.\n"
7378
" fileReadVerbose=(int) : if set, more information is printed when reading/decoding file.\n"
7479
" timeframePeriodOrbits=(int) : if set, TF id computed (and printed, when dump enabled) for each RDH. Typically, 128 or 256.\n"
80+
" logOff=(int) : if set, logs disabled.\n"
81+
" dumpOrbitStats=(int) : if set, first / min / max orbits are printed after file read.\n"
7582
" \n",
7683
argv[0]);
7784
return -1;
@@ -124,6 +131,10 @@ int main(int argc, const char* argv[])
124131
timeframePeriodOrbits = (uint32_t) std::stoi(value);
125132
} else if (key == "dumpStats") {
126133
dumpStats = std::stoi(value);
134+
} else if (key == "logOff") {
135+
logOff = std::stoi(value);
136+
} else if (key == "dumpOrbitStats") {
137+
dumpOrbitStats = std::stoi(value);
127138
} else {
128139
ERRLOG("unknown option %s\n", key.c_str());
129140
}
@@ -368,6 +379,12 @@ int main(int argc, const char* argv[])
368379
h.computeTimeframeId(firstTimeframeHbOrbitBegin, timeframePeriodOrbits);
369380
}
370381

382+
if (RDHBlockCount == 1) {
383+
firstOrbit = h.getTriggerOrbit();
384+
minOrbit = firstOrbit;
385+
maxOrbit = firstOrbit;
386+
}
387+
371388
if (dumpRDH) {
372389
h.dumpRdh(pageOffset + blockOffset, 1);
373390
}
@@ -423,6 +440,9 @@ int main(int argc, const char* argv[])
423440
if (h.getTriggerOrbit() > maxOrbit) {
424441
maxOrbit = h.getTriggerOrbit();
425442
}
443+
if ((h.getTriggerOrbit() < minOrbit) || (minOrbit == 0)) {
444+
minOrbit = h.getTriggerOrbit();
445+
}
426446

427447
unsigned int linkId = h.getLinkId();
428448
if (linkId <= RdhMaxLinkId) {
@@ -496,6 +516,10 @@ int main(int argc, const char* argv[])
496516
ERRLOG("max orbit 0x%X\n", maxOrbit);
497517
}
498518

519+
if (dumpOrbitStats) {
520+
printf("Orbits:\t first = 0x%X\tmin = 0x%X\tmax = 0x%X\n", firstOrbit, minOrbit, maxOrbit);
521+
}
522+
499523
if (dumpStats) {
500524
// register final HBF size
501525
int nLinksActive = 0;

0 commit comments

Comments
 (0)