Skip to content

Commit 8dde7e7

Browse files
committed
added dropIncomplete
1 parent 9caa6cf commit 8dde7e7

File tree

1 file changed

+99
-2
lines changed

1 file changed

+99
-2
lines changed

src/ConsumerFMQchannel.cxx

Lines changed: 99 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ class ConsumerFMQchannel : public Consumer
157157
bool enableStfSuperpage = false; // optimized stf transport: minimize STF packets
158158
bool enableRawFormatDatablock = false;
159159
int enablePackedCopy = 1; // default mode for repacking of page overlapping HBF. 0 = one page per copy, 1 = change page on TF only
160+
int dropIncomplete = 0; // TF with missing packets are discarded
160161

161162
std::shared_ptr<MemoryBank> memBank; // a dedicated memory bank allocated by FMQ mechanism
162163
std::shared_ptr<MemoryPagesPool> mp; // a memory pool from which to allocate data pages
@@ -167,6 +168,8 @@ class ConsumerFMQchannel : public Consumer
167168
CounterStats repackSizeStats; // keep track of page size used when repacking
168169
uint64_t nPagesUsedForRepack = 0; // count pages used for repack
169170
uint64_t nPagesUsedInput = 0; // count pages received
171+
uint64_t nIncompleteHBF = 0; // count incomplete HBF
172+
uint64_t TFdropped = 0; // number of TF dropped
170173

171174
// custom log function for memory pool
172175
void mplog(const std::string &msg) {
@@ -239,6 +242,12 @@ class ConsumerFMQchannel : public Consumer
239242
gReadoutStats.isFairMQ = 1; // enable FMQ stats
240243
}
241244

245+
// configuration parameter: | consumer-FairMQChannel-* | dropIncomplete | int | 0 | If set, TF with incomplete HBF (i.e. HBF having missing packets) are discarded. |
246+
cfg.getOptionalValue<int>(cfgEntryPoint + ".dropIncomplete", dropIncomplete, dropIncomplete);
247+
if (dropIncomplete) {
248+
theLog.log(LogInfoDevel_(3002), "TF with incomplete HBF will be discarded");
249+
}
250+
242251
// configuration parameter: | consumer-FairMQChannel-* | enableRawFormat | int | 0 | If 0, data is pushed 1 STF header + 1 part per HBF. If 1, data is pushed in raw format without STF headers, 1 FMQ message per data page. If 2, format is 1 STF header + 1 part per data page.|
243252
int cfgEnableRawFormat = 0;
244253
cfg.getOptionalValue<int>(cfgEntryPoint + ".enableRawFormat", cfgEnableRawFormat);
@@ -439,6 +448,10 @@ class ConsumerFMQchannel : public Consumer
439448
theLog.log(LogInfoDevel_(3003), "Consumer %s - STFB repacking statistics ... number: %" PRIu64 " average page size: %" PRIu64 " max page size: %" PRIu64 " repacked/received = %" PRIu64 "/%" PRIu64 " = %.1f%%", name.c_str(), repackSizeStats.getCount(), (uint64_t)repackSizeStats.getAverage(), repackSizeStats.getMaximum(), nPagesUsedForRepack, nPagesUsedInput, nPagesUsedForRepack * 100.0 / nPagesUsedInput);
440449
}
441450

451+
if (TFdropped) {
452+
theLog.log(LogInfoSupport_(3300), "Consumer %s - %llu incomplete TF dropped", name.c_str(), (unsigned long long)TFdropped);
453+
}
454+
442455
// release in reverse order
443456
mp = nullptr;
444457
memoryBuffer = nullptr; // warning: data range may still be referenced in memory bank manager
@@ -657,6 +670,7 @@ class ConsumerFMQchannel : public Consumer
657670

658671
wThreadOutput msglist;
659672
msglist = std::make_shared<std::vector<DDMessage>>();
673+
bool dropEntireTFonError = 0; // when set, the whole TF is dropped in case of issue on one link
660674
if (msglist == nullptr) {
661675
isError = 1;
662676
} else {
@@ -666,10 +680,14 @@ class ConsumerFMQchannel : public Consumer
666680
msglist->emplace_back();
667681
if (DDformatMessage(bc, msglist->back())!=0) {
668682
isError = 1;
669-
break;
683+
msglist->pop_back();
684+
if (dropEntireTFonError) break;
670685
}
671686
}
672-
if (!isError) {
687+
// ensure end-of-timeframe flag is set for last message
688+
msglist->back().stfHeader->lastTFMessage = 1;
689+
// send msg
690+
if ((!isError)||(!dropEntireTFonError)) {
673691
if (wThreads[thIx].output->push(std::move(msglist))) {
674692
isError = 1;
675693
} else {
@@ -783,6 +801,50 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
783801
unsigned int lastHBid = -1;
784802
int isFirst = true;
785803
int ix = 0;
804+
static InfoLogger::AutoMuteToken tokenHBF(LogWarningSupport_(3004));
805+
uint16_t HBFpagescounterFirst = 0; // pages counter for first RDH in HBF
806+
uint16_t HBFpagescounterLast = 0; // pages counter for last RDH in HBF
807+
int HBFpagescounter = 0; // number of pages in current HBF
808+
int HBFstop = 0; // number of stop bits for current HBF
809+
int HBFstopLast = 0; // stop bit value for last RDH in HBF
810+
int HBFisOk = 1;
811+
int HBFisFirst = 1;
812+
int HBFincomplete = 0;
813+
std::string HBFerr;
814+
int HBFerrid = 0;
815+
auto HBFincrerr = [&] () {
816+
HBFerr += " (" + std::to_string(++HBFerrid) + ") ";
817+
};
818+
auto checkLastHB = [&] () {
819+
if (HBFisFirst) {
820+
return; // no HBF seen so far
821+
}
822+
if (HBFstop != 1) {
823+
HBFincrerr();
824+
HBFerr += "wrong number of stop bits: " + std::to_string((int)HBFstop);
825+
HBFisOk = 0;
826+
}
827+
if (HBFstopLast != 1) {
828+
HBFincrerr();
829+
HBFerr += "no stop bit on last RDH";
830+
HBFisOk = 0;
831+
}
832+
//printf("HB 0x%X = %d pages\n",(int)lastHBid, (int)HBFpagescounter);
833+
834+
if (!HBFisOk) {
835+
HBFincomplete++;
836+
theLog.log(tokenHBF, "TF%d equipment %d link %d HBF 0x%X is incomplete: %s", (int)stfHeader->timeframeId, (int)stfHeader->equipmentId, (int)stfHeader->linkId, (int)lastHBid, HBFerr.c_str());
837+
}
838+
839+
// reset counters
840+
HBFpagescounter = 0;
841+
HBFstop = 0;
842+
HBFisOk = 1;
843+
HBFisFirst = 1;
844+
HBFerrid = 0;
845+
HBFerr = "";
846+
};
847+
786848
for (auto& br : *bc) {
787849
ix++;
788850
DataBlock* b = br->getData();
@@ -830,6 +892,8 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
830892
// printf("checking %p : %d\n",b,offset);
831893
o2::Header::RAWDataHeader* rdh = (o2::Header::RAWDataHeader*)&b->data[offset];
832894
if (rdh->heartbeatOrbit != lastHBid) {
895+
// this is a new HBF, finalize checks of previous one and reset
896+
checkLastHB();
833897
lastHBid = rdh->heartbeatOrbit;
834898
// printf("offset %d - HBid=%d\n",offset,lastHBid);
835899
}
@@ -839,6 +903,27 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
839903
// dumpRDH(rdh);
840904
// printf("block %p : offset %d = %p\n",b,offset,rdh);
841905
}
906+
907+
uint16_t HBFpagescounterNew = (uint16_t)rdh->pagesCounter;
908+
if (HBFisFirst) {
909+
HBFpagescounterFirst = HBFpagescounterNew;
910+
HBFisFirst = 0;
911+
if (HBFpagescounterFirst != 0) {
912+
HBFincrerr();
913+
HBFerr += "first pagesCounter not zero: " + std::to_string((int)HBFpagescounterFirst);
914+
}
915+
} else {
916+
if (HBFpagescounterNew != HBFpagescounterLast + 1) {
917+
HBFincrerr();
918+
HBFerr += "pagesCounter jump from " + std::to_string((int)HBFpagescounterLast)+ " to " + std::to_string( (int)HBFpagescounterNew);
919+
HBFisOk = 0;
920+
}
921+
}
922+
HBFpagescounter++;
923+
HBFpagescounterLast = HBFpagescounterNew;
924+
HBFstop += rdh->stopBit;
925+
HBFstopLast = rdh->stopBit;
926+
842927
uint16_t offsetNextPacket = rdh->offsetNextPacket;
843928
if (offsetNextPacket == 0) {
844929
break;
@@ -850,6 +935,17 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
850935
headerBlock->getData()->header.dataSize=sizeof(SubTimeframe);
851936
ddm.subTimeframeTotalSize += ddm.subTimeframeDataSize;
852937
ddm.subTimeframeFMQSize = 0;
938+
939+
// this is a new HBF, finalize checks of previous one
940+
checkLastHB();
941+
942+
nIncompleteHBF += HBFincomplete;
943+
if ((HBFincomplete) && (dropIncomplete)) {
944+
static InfoLogger::AutoMuteToken tokenTFdropped(LogWarningSupport_(3300));
945+
TFdropped++;
946+
theLog.log(tokenTFdropped, "%s eq %d link %d : TF %d dropped (total: %llu)", this->name.c_str(), (int)stfHeader->equipmentId, (int)stfHeader->linkId, (int)stfHeader->timeframeId, (unsigned long long)TFdropped);
947+
return -1;
948+
}
853949

854950
// printf("TF %d link %d = %d blocks \n",(int)stfHeader->timeframeId,(int)stfHeader->linkId,(int)bc->size());
855951

@@ -1129,6 +1225,7 @@ int ConsumerFMQchannel::processForDataDistribution(DataSetReference& bc) {
11291225
if (DDformatMessage(bc, msg)) {
11301226
isError = 1;
11311227
} else {
1228+
// sending now means flag for end-of-timeframe might be missing if something happens with next message
11321229
if (DDsendMessage(msg)) {
11331230
isError = 1;
11341231
}

0 commit comments

Comments
 (0)