Skip to content

Commit 8ef8ebc

Browse files
committed
added separate thread for logbook
1 parent 8719af3 commit 8ef8ebc

File tree

2 files changed

+76
-33
lines changed

2 files changed

+76
-33
lines changed

doc/releaseNotes.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,3 +563,6 @@ This file describes the main feature changes for each readout.exe released versi
563563
- o2-readout-test-fmq-memory: more options.
564564
- Added protection against unhandled exceptions (e.g. from ReadoutCard).
565565
- Added protection against unexpected state machine transitions (e.g. ECS sending RESET while STARTING).
566+
567+
## next version
568+
- Added a separate thread to call bookkeeping API functions, so that data flow is not affected in case they are blocking.

src/mainReadout.cxx

Lines changed: 73 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,68 @@ class TheLogRedirection {
193193
}
194194
};
195195

196+
// an object running a thread to publish data to logbook
197+
// this allows isolating blocking calls to the bookkeeping api
198+
#ifdef WITH_LOGBOOK
199+
class LogbookThread {
200+
public:
201+
LogbookThread(std::unique_ptr<o2::bkp::api::BkpClient> h) {
202+
this->logbookHandle = std::move(h);
203+
shutdownRequest = 0;
204+
publishRequest = 0;
205+
std::function<void(void)> f = std::bind(&LogbookThread::run, this);
206+
th = std::make_unique<std::thread>(f);
207+
};
208+
~LogbookThread() {
209+
shutdownRequest = 1;
210+
if (th != nullptr) {
211+
th->join();
212+
th = nullptr;
213+
}
214+
};
215+
int publishStats() {
216+
if (logbookHandle == nullptr) return __LINE__; // fail if no connection
217+
if (publishRequest.load()) return __LINE__; // fail if request already pending
218+
publishRequest = 1;
219+
return 0;
220+
};
221+
private:
222+
std::unique_ptr<o2::bkp::api::BkpClient> logbookHandle; // handle to logbook
223+
std::unique_ptr<std::thread> th; // a thread reading from fd and injecting to theLog
224+
std::atomic<int> shutdownRequest; // flag to terminate thread
225+
std::atomic<int> publishRequest; // flag to ask thread to publish current values
226+
void run() {
227+
setThreadName("logbook");
228+
// thread loop, 10Hz
229+
while (!shutdownRequest && (logbookHandle != nullptr)) {
230+
if (publishRequest.load() == 1) {
231+
bool isOk = false;
232+
try {
233+
// interface: https://github.com/AliceO2Group/Bookkeeping/tree/main/cxx-client/include/BookkeepingApi
234+
logbookHandle->flp()->updateReadoutCountersByFlpNameAndRunNumber(
235+
occRole, occRunNumber,
236+
(int64_t)gReadoutStats.counters.numberOfSubtimeframes, (int64_t)gReadoutStats.counters.bytesReadout, (int64_t)gReadoutStats.counters.bytesRecorded, (int64_t)gReadoutStats.counters.bytesFairMQ
237+
);
238+
isOk = true;
239+
} catch (const std::exception& ex) {
240+
theLog.log(LogErrorDevel_(3210), "Failed to update logbook: %s", ex.what());
241+
} catch (...) {
242+
theLog.log(LogErrorDevel_(3210), "Failed to update logbook: unknown exception");
243+
}
244+
if (!isOk) {
245+
// closing logbook immediately
246+
logbookHandle = nullptr;
247+
theLog.log(LogErrorSupport_(3210), "Logbook now disabled");
248+
break;
249+
}
250+
publishRequest = 0;
251+
}
252+
usleep(100000);
253+
}
254+
}
255+
};
256+
#endif
257+
196258
class Readout
197259
{
198260

@@ -331,8 +393,9 @@ class Readout
331393
bool logFirstError = 0; // flag set to 1 after 1 error reported from iterateCheck/iterateRunning procedures
332394

333395
#ifdef WITH_LOGBOOK
334-
std::unique_ptr<o2::bkp::api::BkpClient> logbookHandle; // handle to logbook
396+
std::unique_ptr<LogbookThread> theLogbookThread; // handle to logbook
335397
#endif
398+
336399
#ifdef WITH_DB
337400
std::unique_ptr<ReadoutDatabase> dbHandle; // handle to readout database
338401
#endif
@@ -351,37 +414,12 @@ bool testLogbook = false; // flag for logbook test mode
351414

352415
void Readout::publishLogbookStats()
353416
{
417+
// gReadoutStats.print();
418+
354419
#ifdef WITH_LOGBOOK
355-
if (logbookHandle != nullptr) {
356-
bool isOk = false;
357-
try {
358-
// interface: https://github.com/AliceO2Group/Bookkeeping/tree/main/cxx-client/include/BookkeepingApi
359-
if (testLogbook) {
360-
// in test mode, create a dummy run entry in logbook
361-
if (occRole.length() == 0) { occRole = "flp-test"; }
362-
if (occRunNumber == 0) { occRunNumber = 999999999; }
363-
theLog.log(LogInfoDevel_(3210), "Logbook in test mode: create run number/flp %d / %s", (int)occRunNumber, occRole.c_str());
364-
/*
365-
std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
366-
logbookHandle->runStart(occRunNumber, now, now, "readout", RunType::TECHNICAL, 0, 0, 0, false, false, false, "normal");
367-
logbookHandle->flpAdd(occRole, "localhost", occRunNumber);
368-
*/
369-
testLogbook=0;
370-
}
371-
logbookHandle->flp()->updateReadoutCountersByFlpNameAndRunNumber(occRole, occRunNumber, (int64_t)gReadoutStats.counters.numberOfSubtimeframes, (int64_t)gReadoutStats.counters.bytesReadout, (int64_t)gReadoutStats.counters.bytesRecorded, (int64_t)gReadoutStats.counters.bytesFairMQ);
372-
isOk = true;
373-
} catch (const std::exception& ex) {
374-
theLog.log(LogErrorDevel_(3210), "Failed to update logbook: %s", ex.what());
375-
} catch (...) {
376-
theLog.log(LogErrorDevel_(3210), "Failed to update logbook: unknown exception");
377-
}
378-
if (!isOk) {
379-
// closing logbook immediately
380-
logbookHandle = nullptr;
381-
theLog.log(LogErrorSupport_(3210), "Logbook now disabled");
382-
}
420+
if (theLogbookThread != nullptr) {
421+
theLogbookThread->publishStats();
383422
}
384-
// gReadoutStats.print();
385423
#endif
386424

387425
#ifdef WITH_DB
@@ -924,9 +962,11 @@ int Readout::_configure(const boost::property_tree::ptree& properties)
924962
cfg.getOptionalValue<std::string>("readout.logbookUrl", cfgLogbookUrl);
925963

926964
theLog.log(LogInfoDevel, "Logbook enabled, %ds update interval, using URL = %s", cfgLogbookUpdateInterval, cfgLogbookUrl.c_str());
927-
logbookHandle = o2::bkp::api::BkpClientFactory::create(cfgLogbookUrl);
965+
auto logbookHandle = o2::bkp::api::BkpClientFactory::create(cfgLogbookUrl);
928966
if (logbookHandle == nullptr) {
929967
theLog.log(LogErrorSupport_(3210), "Failed to create handle to logbook");
968+
} else {
969+
theLogbookThread = std::make_unique<LogbookThread>(std::move(logbookHandle));
930970
}
931971
#endif
932972
}
@@ -1731,7 +1771,7 @@ int Readout::_reset()
17311771

17321772
#ifdef WITH_LOGBOOK
17331773
// closing logbook
1734-
logbookHandle = nullptr;
1774+
theLogbookThread = nullptr;
17351775
#endif
17361776

17371777
#ifdef WITH_ZMQ
@@ -2112,7 +2152,6 @@ int main(int argc, char* argv[])
21122152
} else if (theState == States::Configured) {
21132153
if (theCommand == Commands::Start) {
21142154
occRunNumber++;
2115-
printf("run number = %d\n", occRunNumber);
21162155
err = theReadout->start();
21172156
if (err) {
21182157
newState = States::Error;
@@ -2223,6 +2262,7 @@ int main(int argc, char* argv[])
22232262

22242263
// loop for testing, single iteration in normal conditions
22252264
for (int i = 0; i < nloop; i++) {
2265+
occRunNumber++;
22262266
err = theReadout->start();
22272267
if (err) {
22282268
return err;

0 commit comments

Comments
 (0)