@@ -579,7 +579,13 @@ class ConsumerFMQchannel : public Consumer
579579 // arg thIx is the thread index
580580 std::string thname = name + " -w-" + std::to_string (thIx);
581581 setThreadName (thname.c_str ());
582+ int pushCount = 0 ; // keep track of incoming FIFO items, and ensure same number pushed out (if necessary filling with empty items)
582583 for (;;) {
584+ if (pushCount) {
585+ wThreads[thIx].output ->push (nullptr );
586+ pushCount--; // one is out
587+ }
588+
583589 if (wThreadShutdown) {
584590 break ;
585591 }
@@ -597,6 +603,7 @@ class ConsumerFMQchannel : public Consumer
597603 usleep (wThreadSleepTime);
598604 continue ;
599605 }
606+ pushCount++; // one is in
600607
601608 if (tf == nullptr ) {
602609 continue ;
@@ -628,7 +635,9 @@ class ConsumerFMQchannel : public Consumer
628635 if (!isError) {
629636 if (wThreads[thIx].output ->push (std::move (msglist))) {
630637 isError = 1 ;
631- }
638+ } else {
639+ pushCount--; // one is out
640+ }
632641 }
633642 }
634643 if (isError) {
@@ -643,6 +652,7 @@ class ConsumerFMQchannel : public Consumer
643652 setThreadName (thname.c_str ());
644653
645654 int thIx = 0 ; // index of next thread to read from
655+ uint64_t lastTimeframeId = undefinedTimeframeId; // latest TF id received
646656 for (;;) {
647657 if (wThreadShutdown) {
648658 break ;
@@ -660,11 +670,23 @@ class ConsumerFMQchannel : public Consumer
660670 thIx = 0 ;
661671 }
662672
673+ if (msglist == nullptr ) {
674+ // this can happen when an empty item is pushed (in case there was an error processing it)
675+ // in order to keep all FIFOs in sync round robin
676+ continue ;
677+ }
678+ uint64_t nextTimeframeId = msglist->at (0 ).stfHeader ->timeframeId ;
679+ if ((lastTimeframeId != undefinedTimeframeId) && (nextTimeframeId != lastTimeframeId + 1 )) {
680+ static InfoLogger::AutoMuteToken token (LogWarningSupport_ (3004 ));
681+ theLog.log (token, " %s - DD send - TF %d following TF %d: non-continuous ordering" , name.c_str (), (int )nextTimeframeId, (int )lastTimeframeId);
682+ }
683+ lastTimeframeId = nextTimeframeId;
684+
663685 // printf("sender: got TF\n");
664686
665687 bool isError = 0 ;
666688 for (auto &msg: *msglist) {
667- // printf ("sending thread TF %d\n", (int) msg.stfHeader->timeframeId);
689+ // printf ("sending thread TF %d (from fifo %d) \n", (int) msg.stfHeader->timeframeId, thIx );
668690 if (DDsendMessage (msg)) {
669691 // sending failed
670692 isError = 1 ;
@@ -1078,7 +1100,7 @@ int ConsumerFMQchannel::processForDataDistribution(DataSetReference& bc) {
10781100 if (currentTimeframeBuffer == nullptr ) {
10791101 return 0 ;
10801102 }
1081- // printf( "push %d - %d datasets\n", (int)currentTimeframeId, (int) currentTimeframeBuffer->size());
1103+ // printf( "push %d @ %d - %d datasets\n", (int)currentTimeframeId, (int) wThreadIxWrite , (int) currentTimeframeBuffer->size());
10821104
10831105 if (wThreads[wThreadIxWrite].input ->push (currentTimeframeBuffer)) {
10841106 static InfoLogger::AutoMuteToken token (LogWarningSupport_ (3004 ));
0 commit comments