@@ -200,11 +200,13 @@ class ConsumerFMQchannel : public Consumer
200200 std::unique_ptr<AliceO2::Common::Fifo<wThreadInput>> input;
201201 std::unique_ptr<AliceO2::Common::Fifo<wThreadOutput>> output;
202202 std::unique_ptr<std::thread> thread;
203+ bool isRunning;
203204 };
204205 std::vector<wThread> wThreads;
205206 int wThreadShutdown = 0 ;
206207 const int wThreadSleepTime = 1000 ; // sleep time in microseconds.
207208 std::unique_ptr<std::thread> senderThread; // this one empties the output FIFOs of the wThreads
209+ bool senderThreadIsRunning;
208210 int wThreadIxWrite = 0 ; // push data round-robin in wThreads
209211 int wThreadIxRead = 0 ; // read data round-robin in wThreads
210212 void cleanupThreads () {
@@ -224,11 +226,15 @@ class ConsumerFMQchannel : public Consumer
224226 nwThreads = 0 ;
225227 }
226228 }
229+ std::atomic<uint64_t > nTFdiscardedEOR = 0 ;
227230
228231 int processForDataDistribution (DataSetReference& bc);
229232
230233 public:
231234
235+ int start ();
236+ int stop ();
237+
232238 ConsumerFMQchannel (ConfigFile& cfg, std::string cfgEntryPoint) : Consumer(cfg, cfgEntryPoint)
233239 {
234240
@@ -445,16 +451,6 @@ class ConsumerFMQchannel : public Consumer
445451 // stop threads
446452 cleanupThreads ();
447453
448- // log memory pool statistics
449- if (mp!=nullptr ) {
450- theLog.log (LogInfoDevel_ (3003 ), " Consumer %s - memory pool statistics ... %s" , name.c_str (), mp->getStats ().c_str ());
451- theLog.log (LogInfoDevel_ (3003 ), " Consumer %s - STFB repacking statistics ... number: %" PRIu64 " average page size: %" PRIu64 " max page size: %" PRIu64 " repacked/received = %" PRIu64 " /%" PRIu64 " = %.1f%%" , name.c_str (), repackSizeStats.getCount (), (uint64_t )repackSizeStats.getAverage (), repackSizeStats.getMaximum (), nPagesUsedForRepack, nPagesUsedInput, nPagesUsedForRepack * 100.0 / nPagesUsedInput);
452- }
453-
454- if (TFdropped) {
455- theLog.log (LogInfoSupport_ (3235 ), " Consumer %s - %llu incomplete TF dropped" , name.c_str (), (unsigned long long )TFdropped);
456- }
457-
458454 // release in reverse order
459455 mp = nullptr ;
460456 memoryBuffer = nullptr ; // warning: data range may still be referenced in memory bank manager
@@ -643,6 +639,19 @@ class ConsumerFMQchannel : public Consumer
643639 break ;
644640 }
645641
642+ if (!isRunning) {
643+ // when not running, empty incoming buffer and get ready to start
644+ wThreadInput tf;
645+ while ( wThreads[thIx].input ->pop (tf) == 0 ) {
646+ nTFdiscardedEOR++;
647+ }
648+ pushCount = 0 ;
649+ wThreads[thIx].isRunning = 0 ;
650+ usleep (wThreadSleepTime);
651+ continue ;
652+ }
653+ wThreads[thIx].isRunning = 1 ;
654+
646655 // wait that there is a slot in outgoing FIFO
647656 if (wThreads[thIx].output ->isFull ()) {
648657 usleep (wThreadSleepTime);
@@ -716,6 +725,22 @@ class ConsumerFMQchannel : public Consumer
716725 break ;
717726 }
718727
728+ if (!isRunning) {
729+ // when not running, empty output buffer and get ready to start
730+ for (thIx = 0 ; thIx < nwThreads; thIx++) {
731+ wThreadOutput msglist;
732+ while ( wThreads[thIx].output ->pop (msglist) == 0 ) {
733+ nTFdiscardedEOR++;
734+ }
735+ }
736+ thIx = 0 ;
737+ lastTimeframeId = undefinedTimeframeId;
738+ senderThreadIsRunning = 0 ;
739+ usleep (wThreadSleepTime);
740+ continue ;
741+ }
742+ senderThreadIsRunning = 1 ;
743+
719744 // get a TF from FIFO
720745 wThreadOutput msglist;
721746 if (wThreads[thIx].output ->pop (msglist) != 0 ) {
@@ -748,9 +773,12 @@ class ConsumerFMQchannel : public Consumer
748773 if (DDsendMessage (msg)) {
749774 // sending failed
750775 isError = 1 ;
751- }
776+ }
752777 }
753778 if (isError) {
779+ if (!isRunning) {
780+ nTFdiscardedEOR++; // account for this one at EOR flush
781+ }
754782 totalPushError++;
755783 }
756784
@@ -1198,16 +1226,19 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
11981226int ConsumerFMQchannel::DDsendMessage (DDMessage &ddm) {
11991227 // send the messages
12001228 int err;
1201- while (!wThreadShutdown) {
1202- err = sendingChannel->Send (ddm.messagesToSend , 500 );
1229+ while (( !wThreadShutdown) && (isRunning) ) {
1230+ err = sendingChannel->Send (ddm.messagesToSend , 1 + wThreadSleepTime / 100 );
12031231 if (err>=0 ) break ;
12041232 }
12051233 if ( err >= 0 ) {
12061234 gReadoutStats .counters .bytesFairMQ += ddm.subTimeframeTotalSize ;
12071235 gReadoutStats .counters .timeframeIdFairMQ = ddm.stfHeader ->timeframeId ;
12081236 gReadoutStats .counters .notify ++;
12091237 } else {
1210- theLog.log (LogErrorSupport_ (3233 ), " Sending failed" );
1238+ if ((!wThreadShutdown) && (isRunning)) {
1239+ static InfoLogger::AutoMuteToken logtokenfmq (LogWarningSupport_ (3233 ), 1 , 60 );
1240+ theLog.log (logtokenfmq, " FMQ sending failed" );
1241+ }
12111242 return -1 ;
12121243 }
12131244
@@ -1316,6 +1347,74 @@ int ConsumerFMQchannel::processForDataDistribution(DataSetReference& bc) {
13161347 return 0 ;
13171348}
13181349
1350+ int ConsumerFMQchannel::start () {
1351+ nTFdiscardedEOR = 0 ;
1352+
1353+ repackSizeStats.reset ();
1354+ nPagesUsedForRepack = 0 ;
1355+ nPagesUsedInput = 0 ;
1356+ nIncompleteHBF = 0 ;
1357+ TFdropped = 0 ;
1358+
1359+ return Consumer::start ();
1360+ }
1361+ int ConsumerFMQchannel::stop () {
1362+ nTFdiscardedEOR = 0 ;
1363+ isRunning = 0 ;
1364+ double timeout = 1.0 ; // 1s should be enough, it was tested that FMQ usually release pages every 0.5s
1365+
1366+ theLog.log (LogInfoDevel_ (3003 ), " Consumer %s - cleaning up pending data, timeout = %.2fs" , name.c_str (), timeout);
1367+
1368+ AliceO2::Common::Timer stopTimer;
1369+ stopTimer.reset (timeout * 1000000 ); // in microseconds
1370+
1371+ // wait for threads for a minimum time
1372+ while (!stopTimer.isTimeout ()) {
1373+ usleep (wThreadSleepTime); // first leave a chance to update isRunning flag
1374+ int nRunning = 0 ;
1375+ for (auto & w : wThreads) {
1376+ if (w.thread != nullptr ) {
1377+ nRunning += w.isRunning ;
1378+ }
1379+ }
1380+ if (!nRunning) {
1381+ break ;
1382+ }
1383+ }
1384+ if (senderThread) {
1385+ senderThreadIsRunning=1 ; // ensure we do another iteration now that working threads cleaned
1386+ while (!stopTimer.isTimeout ()) {
1387+ usleep (wThreadSleepTime); // first leave a chance to update isRunning flag
1388+ if (!senderThreadIsRunning) {
1389+ break ;
1390+ }
1391+ }
1392+ }
1393+ // FMQ release is asynchronous... wait until all pages released
1394+ if (gReadoutStats .counters .pagesPendingFairMQ .load () != 0 ) {
1395+ theLog.log (LogInfoDevel_ (3003 ), " Consumer %s - waiting FMQ to release %" PRIu64 " pages" , name.c_str (), gReadoutStats .counters .pagesPendingFairMQ .load ());
1396+ }
1397+ while (!stopTimer.isTimeout ()) {
1398+ if (gReadoutStats .counters .pagesPendingFairMQ .load () == 0 ) break ;
1399+ usleep (wThreadSleepTime);
1400+ }
1401+
1402+ // report
1403+ theLog.log (LogInfoDevel_ (3003 ), " Consumer %s - discarded %" PRIu64 " TFs from buffer at End Of Run" , name.c_str (), nTFdiscardedEOR.load ());
1404+
1405+ // log memory pool statistics
1406+ if (mp!=nullptr ) {
1407+ theLog.log (LogInfoDevel_ (3003 ), " Consumer %s - memory pool statistics ... %s" , name.c_str (), mp->getStats ().c_str ());
1408+ theLog.log (LogInfoDevel_ (3003 ), " Consumer %s - STFB repacking statistics ... number: %" PRIu64 " average page size: %" PRIu64 " max page size: %" PRIu64 " repacked/received = %" PRIu64 " /%" PRIu64 " = %.1f%%" , name.c_str (), repackSizeStats.getCount (), (uint64_t )repackSizeStats.getAverage (), repackSizeStats.getMaximum (), nPagesUsedForRepack, nPagesUsedInput, nPagesUsedForRepack * 100.0 / nPagesUsedInput);
1409+ }
1410+
1411+ if (TFdropped) {
1412+ theLog.log (LogInfoSupport_ (3235 ), " Consumer %s - %llu incomplete TF dropped" , name.c_str (), (unsigned long long )TFdropped);
1413+ }
1414+
1415+ // wait threads completed
1416+ return Consumer::stop ();
1417+ }
13191418
13201419std::unique_ptr<Consumer> getUniqueConsumerFMQchannel (ConfigFile& cfg, std::string cfgEntryPoint) { return std::make_unique<ConsumerFMQchannel>(cfg, cfgEntryPoint); }
13211420
0 commit comments