@@ -193,6 +193,68 @@ class TheLogRedirection {
193193 }
194194};
195195
196+ // an object running a thread to publish data to logbook
197+ // this allows isolating blocking calls to the bookkeeping api
198+ #ifdef WITH_LOGBOOK
199+ class LogbookThread {
200+ public:
201+ LogbookThread (std::unique_ptr<o2::bkp::api::BkpClient> h) {
202+ this ->logbookHandle = std::move (h);
203+ shutdownRequest = 0 ;
204+ publishRequest = 0 ;
205+ std::function<void (void )> f = std::bind (&LogbookThread::run, this );
206+ th = std::make_unique<std::thread>(f);
207+ };
208+ ~LogbookThread () {
209+ shutdownRequest = 1 ;
210+ if (th != nullptr ) {
211+ th->join ();
212+ th = nullptr ;
213+ }
214+ };
215+ int publishStats () {
216+ if (logbookHandle == nullptr ) return __LINE__; // fail if no connection
217+ if (publishRequest.load ()) return __LINE__; // fail if request already pending
218+ publishRequest = 1 ;
219+ return 0 ;
220+ };
221+ private:
222+ std::unique_ptr<o2::bkp::api::BkpClient> logbookHandle; // handle to logbook
223+ std::unique_ptr<std::thread> th; // a thread reading from fd and injecting to theLog
224+ std::atomic<int > shutdownRequest; // flag to terminate thread
225+ std::atomic<int > publishRequest; // flag to ask thread to publish current values
226+ void run () {
227+ setThreadName (" logbook" );
228+ // thread loop, 10Hz
229+ while (!shutdownRequest && (logbookHandle != nullptr )) {
230+ if (publishRequest.load () == 1 ) {
231+ bool isOk = false ;
232+ try {
233+ // interface: https://github.com/AliceO2Group/Bookkeeping/tree/main/cxx-client/include/BookkeepingApi
234+ logbookHandle->flp ()->updateReadoutCountersByFlpNameAndRunNumber (
235+ occRole, occRunNumber,
236+ (int64_t )gReadoutStats .counters .numberOfSubtimeframes , (int64_t )gReadoutStats .counters .bytesReadout , (int64_t )gReadoutStats .counters .bytesRecorded , (int64_t )gReadoutStats .counters .bytesFairMQ
237+ );
238+ isOk = true ;
239+ } catch (const std::exception& ex) {
240+ theLog.log (LogErrorDevel_ (3210 ), " Failed to update logbook: %s" , ex.what ());
241+ } catch (...) {
242+ theLog.log (LogErrorDevel_ (3210 ), " Failed to update logbook: unknown exception" );
243+ }
244+ if (!isOk) {
245+ // closing logbook immediately
246+ logbookHandle = nullptr ;
247+ theLog.log (LogErrorSupport_ (3210 ), " Logbook now disabled" );
248+ break ;
249+ }
250+ publishRequest = 0 ;
251+ }
252+ usleep (100000 );
253+ }
254+ }
255+ };
256+ #endif
257+
196258class Readout
197259{
198260
@@ -331,8 +393,9 @@ class Readout
331393 bool logFirstError = 0 ; // flag set to 1 after 1 error reported from iterateCheck/iterateRunning procedures
332394
333395#ifdef WITH_LOGBOOK
334- std::unique_ptr<o2::bkp::api::BkpClient> logbookHandle ; // handle to logbook
396+ std::unique_ptr<LogbookThread> theLogbookThread ; // handle to logbook
335397#endif
398+
336399#ifdef WITH_DB
337400 std::unique_ptr<ReadoutDatabase> dbHandle; // handle to readout database
338401#endif
@@ -351,37 +414,12 @@ bool testLogbook = false; // flag for logbook test mode
351414
352415void Readout::publishLogbookStats ()
353416{
417+ // gReadoutStats.print();
418+
354419#ifdef WITH_LOGBOOK
355- if (logbookHandle != nullptr ) {
356- bool isOk = false ;
357- try {
358- // interface: https://github.com/AliceO2Group/Bookkeeping/tree/main/cxx-client/include/BookkeepingApi
359- if (testLogbook) {
360- // in test mode, create a dummy run entry in logbook
361- if (occRole.length () == 0 ) { occRole = " flp-test" ; }
362- if (occRunNumber == 0 ) { occRunNumber = 999999999 ; }
363- theLog.log (LogInfoDevel_ (3210 ), " Logbook in test mode: create run number/flp %d / %s" , (int )occRunNumber, occRole.c_str ());
364- /*
365- std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
366- logbookHandle->runStart(occRunNumber, now, now, "readout", RunType::TECHNICAL, 0, 0, 0, false, false, false, "normal");
367- logbookHandle->flpAdd(occRole, "localhost", occRunNumber);
368- */
369- testLogbook=0 ;
370- }
371- logbookHandle->flp ()->updateReadoutCountersByFlpNameAndRunNumber (occRole, occRunNumber, (int64_t )gReadoutStats .counters .numberOfSubtimeframes , (int64_t )gReadoutStats .counters .bytesReadout , (int64_t )gReadoutStats .counters .bytesRecorded , (int64_t )gReadoutStats .counters .bytesFairMQ );
372- isOk = true ;
373- } catch (const std::exception& ex) {
374- theLog.log (LogErrorDevel_ (3210 ), " Failed to update logbook: %s" , ex.what ());
375- } catch (...) {
376- theLog.log (LogErrorDevel_ (3210 ), " Failed to update logbook: unknown exception" );
377- }
378- if (!isOk) {
379- // closing logbook immediately
380- logbookHandle = nullptr ;
381- theLog.log (LogErrorSupport_ (3210 ), " Logbook now disabled" );
382- }
420+ if (theLogbookThread != nullptr ) {
421+ theLogbookThread->publishStats ();
383422 }
384- // gReadoutStats.print();
385423#endif
386424
387425#ifdef WITH_DB
@@ -924,9 +962,11 @@ int Readout::_configure(const boost::property_tree::ptree& properties)
924962 cfg.getOptionalValue <std::string>(" readout.logbookUrl" , cfgLogbookUrl);
925963
926964 theLog.log (LogInfoDevel, " Logbook enabled, %ds update interval, using URL = %s" , cfgLogbookUpdateInterval, cfgLogbookUrl.c_str ());
927- logbookHandle = o2::bkp::api::BkpClientFactory::create (cfgLogbookUrl);
965+ auto logbookHandle = o2::bkp::api::BkpClientFactory::create (cfgLogbookUrl);
928966 if (logbookHandle == nullptr ) {
929967 theLog.log (LogErrorSupport_ (3210 ), " Failed to create handle to logbook" );
968+ } else {
969+ theLogbookThread = std::make_unique<LogbookThread>(std::move (logbookHandle));
930970 }
931971#endif
932972 }
@@ -1731,7 +1771,7 @@ int Readout::_reset()
17311771
17321772#ifdef WITH_LOGBOOK
17331773 // closing logbook
1734- logbookHandle = nullptr ;
1774+ theLogbookThread = nullptr ;
17351775#endif
17361776
17371777#ifdef WITH_ZMQ
@@ -2112,7 +2152,6 @@ int main(int argc, char* argv[])
21122152 } else if (theState == States::Configured) {
21132153 if (theCommand == Commands::Start) {
21142154 occRunNumber++;
2115- printf (" run number = %d\n " , occRunNumber);
21162155 err = theReadout->start ();
21172156 if (err) {
21182157 newState = States::Error;
@@ -2223,6 +2262,7 @@ int main(int argc, char* argv[])
22232262
22242263 // loop for testing, single iteration in normal conditions
22252264 for (int i = 0 ; i < nloop; i++) {
2265+ occRunNumber++;
22262266 err = theReadout->start ();
22272267 if (err) {
22282268 return err;
0 commit comments