Skip to content

Commit 897b1bb

Browse files
committed
bookkeeping final stats publish synchronous
1 parent 1a1bfbe commit 897b1bb

File tree

1 file changed

+54
-17
lines changed

1 file changed

+54
-17
lines changed

src/mainReadout.cxx

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -212,38 +212,71 @@ class LogbookThread {
212212
th = nullptr;
213213
}
214214
};
215-
int publishStats() {
215+
// Request publication of the current stats to the logbook. When timeoutMilliseconds > 0, the call blocks until the request flag has been cleared again (presumably by the logbook thread once the bookkeeping operation is done) or the timeout expires.
216+
// Otherwise (default, timeoutMilliseconds = 0), the request is only flagged here and the function returns immediately; publishing is done asynchronously later.
217+
// Returns 0 on success, or a non-zero error code (the source line number, __LINE__, of the check that failed).
218+
int publishStats(int timeoutMilliseconds = 0) {
216219
if (logbookHandle == nullptr) return __LINE__; // fail if there is no logbook connection
220+
AliceO2::Common::Timer timer;
221+
if (timeoutMilliseconds > 0) {
222+
// wait for any pending request to complete first, so that we are sure to push the latest counters
223+
timer.reset(timeoutMilliseconds * 1000); // started once: the same timer bounds both wait loops below. Argument presumably in microseconds - TODO confirm
224+
while (publishRequest.load()) {
225+
if (timer.isTimeout()) return __LINE__; // previous request still pending when the timeout expired
226+
usleep(1000); // poll the flag again after 1000 microseconds
227+
}
228+
}
217229
if (publishRequest.load()) return __LINE__; // fail if a request is already pending
218230
publishRequest = 1; // raise the request flag checked by run()
231+
if (this->verbose) {
232+
theLog.log(LogInfoDevel_(3210), "Requested to publish logbook stats");
233+
}
234+
if (timeoutMilliseconds > 0) {
235+
// synchronous mode: wait for the request flag to be cleared, then check the reported status
236+
while (publishRequest.load()) {
237+
if (timer.isTimeout()) return __LINE__; // request not completed within the remaining time
238+
usleep(1000);
239+
}
240+
if (!publishSuccess.load()) { // status flag of the latest publish operation
241+
return __LINE__;
242+
}
243+
}
219244
return 0;
220245
};
246+
bool verbose = 0; // when set, extra log messages are emitted on publish request (publishStats) and on actual publish (run)
221247
private:
222248
std::unique_ptr<o2::bkp::api::BkpClient> logbookHandle; // handle to logbook client; nullptr means no connection (publishStats fails, run loop exits)
223249
std::unique_ptr<std::thread> th; // thread object; original note: "a thread reading from fd and injecting to theLog" - NOTE(review): presumably the thread executing run(), confirm
224250
std::atomic<int> shutdownRequest; // flag to terminate thread: the run() loop exits once it is non-zero
225251
std::atomic<int> publishRequest; // flag to ask thread to publish current values: set to 1 by publishStats(), tested in run()
252+
std::atomic<int> publishSuccess; // status of latest publish operation: reset to 0 when run() picks up a request, set to 1 after a successful update
226253
void run() {
227254
setThreadName("logbook");
228255
// thread loop, 10Hz
229256
while (!shutdownRequest && (logbookHandle != nullptr)) {
230257
if (publishRequest.load() == 1) {
258+
publishSuccess = 0;
259+
// copy current counters
260+
ReadoutStatsCounters snapshot;
261+
memcpy((void *)&snapshot, (void *)&gReadoutStats.counters, sizeof(snapshot));
231262
// publishing to logbook makes sense only if a run number defined
232-
if (occRunNumber != undefinedRunNumber) {
233-
bool isOk = false;
263+
if (snapshot.runNumber.load() != undefinedRunNumber) {
234264
try {
235265
// interface: https://github.com/AliceO2Group/Bookkeeping/tree/main/cxx-client/include/BookkeepingApi
236266
logbookHandle->flp()->updateReadoutCountersByFlpNameAndRunNumber(
237-
occRole, occRunNumber,
238-
(int64_t)gReadoutStats.counters.numberOfSubtimeframes, (int64_t)gReadoutStats.counters.bytesReadout, (int64_t)gReadoutStats.counters.bytesRecorded, (int64_t)gReadoutStats.counters.bytesFairMQ
267+
snapshot.source, snapshot.runNumber.load(),
268+
(int64_t)snapshot.numberOfSubtimeframes.load(), (int64_t)snapshot.bytesReadout.load(), (int64_t)snapshot.bytesRecorded.load(), (int64_t)snapshot.bytesFairMQ.load()
239269
);
240-
isOk = true;
270+
if (this->verbose) {
271+
theLog.log(LogInfoDevel_(3210), "Publishing logbook stats: tf = %llu, bytesReadout = %llu", (unsigned long long)snapshot.numberOfSubtimeframes.load(), (unsigned long long)snapshot.bytesReadout.load());
272+
}
273+
publishSuccess = 1;
241274
} catch (const std::exception& ex) {
242275
theLog.log(LogErrorDevel_(3210), "Failed to update logbook: %s", ex.what());
243276
} catch (...) {
244277
theLog.log(LogErrorDevel_(3210), "Failed to update logbook: unknown exception");
245278
}
246-
if (!isOk) {
279+
if (!publishSuccess.load()) {
247280
// closing logbook immediately
248281
logbookHandle = nullptr;
249282
theLog.log(LogErrorSupport_(3210), "Logbook now disabled");
@@ -405,7 +438,7 @@ class Readout
405438
std::unique_ptr<ReadoutDatabase> dbHandle; // handle to readout database
406439
#endif
407440

408-
void publishLogbookStats(); // publish current readout counters to logbook
441+
void publishLogbookStats(int timeout = 0); // publish current readout counters to logbook. Optional timeout in milliseconds, async op if not set.
409442
AliceO2::Common::Timer logbookTimer; // timer to handle readout logbook publish interval
410443

411444
uint64_t currentTimeframeId = undefinedTimeframeId;
@@ -418,16 +451,10 @@ class Readout
418451

419452
bool testLogbook = false; // flag for logbook test mode
420453

421-
void Readout::publishLogbookStats()
454+
void Readout::publishLogbookStats(int timeout)
422455
{
423456
// gReadoutStats.print();
424457

425-
#ifdef WITH_LOGBOOK
426-
if (theLogbookThread != nullptr) {
427-
theLogbookThread->publishStats();
428-
}
429-
#endif
430-
431458
#ifdef WITH_DB
432459
if (dbHandle != nullptr) {
433460
dbHandle->updateRunCounters(
@@ -438,6 +465,15 @@ void Readout::publishLogbookStats()
438465
);
439466
}
440467
#endif
468+
469+
#ifdef WITH_LOGBOOK
470+
if (theLogbookThread != nullptr) {
471+
int err = theLogbookThread->publishStats(timeout);
472+
if ((timeout > 0) && (err)) {
473+
theLog.log(LogErrorDevel_(3210), "Logbook publish failed within given time (%d ms)", timeout);
474+
}
475+
}
476+
#endif
441477
}
442478

443479
int Readout::_init(int argc, char* argv[])
@@ -1384,6 +1420,7 @@ int Readout::_start()
13841420
theLog.log(LogInfoSupport_(3005), "Readout executing START");
13851421
gReadoutStats.reset(1);
13861422
gReadoutStats.counters.state = stringToUint64("> start");
1423+
gReadoutStats.counters.runNumber = occRunNumber;
13871424
gReadoutStats.counters.notify++;
13881425
gReadoutStats.publishNow();
13891426

@@ -1713,8 +1750,8 @@ int Readout::_stop()
17131750
// report log statistics
17141751
theLog.log("Errors: %lu Warnings: %lu", theLog.getMessageCount(InfoLogger::Severity::Error), theLog.getMessageCount(InfoLogger::Severity::Warning));
17151752

1716-
// publish final logbook statistics
1717-
publishLogbookStats();
1753+
// publish final logbook statistics (synchronously with timeout)
1754+
publishLogbookStats(3000);
17181755

17191756
// publish some final counters
17201757
theLog.log(LogInfoDevel_(3003), "Final counters: timeframes = %" PRIu64 " readout = %s recorded = %s",

0 commit comments

Comments
 (0)