Skip to content

Commit 33176dd

Browse files
authored
Merge pull request #247 from sy-c/master
v2.18.0
2 parents d01e89a + 08a5d13 commit 33176dd

File tree

7 files changed

+135
-10
lines changed

7 files changed

+135
-10
lines changed

doc/configurationParameters.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ The parameters related to 3rd-party libraries are described here for convenience
5656
| consumer-data-sampling-* | address | string | ipc:///tmp/readout-pipe-1 | Address of the data sampling. |
5757
| consumer-FairMQChannel-* | checkResources | string | | Check beforehand if unmanaged region would fit in given list of resources. Comma-separated list of items to be checked: eg /dev/shm, MemFree, MemAvailable. (any filesystem path, and any /proc/meminfo entry).|
5858
| consumer-FairMQChannel-* | disableSending | int | 0 | If set, no data is output to FMQ channel. Used for performance test to create FMQ shared memory segment without pushing the data. |
59+
| consumer-FairMQChannel-* | dropIncomplete | int | 0 | If set, TF with incomplete HBF (i.e. HBF having missing packets) are discarded. |
5960
| consumer-FairMQChannel-* | enablePackedCopy | int | 1 | If set, the same superpage may be reused (space allowing) for the copy of multiple HBF (instead of a separate one for each copy). This allows a reduced memoryPoolNumberOfPages. |
6061
| consumer-FairMQChannel-* | enableRawFormat | int | 0 | If 0, data is pushed 1 STF header + 1 part per HBF. If 1, data is pushed in raw format without STF headers, 1 FMQ message per data page. If 2, format is 1 STF header + 1 part per data page.|
6162
| consumer-FairMQChannel-* | fmq-address | string | ipc:///tmp/pipe-readout | Address of the FMQ channel. Depends on transportType. c.f. FairMQ::FairMQChannel.h |

doc/releaseNotes.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,3 +533,10 @@ This file describes the main feature changes for each readout.exe released versi
533533

534534
## v2.17.2 - 08/03/2023
535535
- Updated naming of files saved to disk when equipment-*.saveErrorPagesMax is set. Now includes timestamp, equipment id, and file counter: e.g. /tmp/readout-t1678288819-eq1-superpage.1.raw
536+
537+
## v2.18.0 - 21/03/2023
538+
- Updated configuration parameters:
539+
- added consumer-FairMQchannel.dropIncomplete: when set, TF with incomplete HBF are discarded.
540+
- Minor internal changes (not for users):
541+
- equipment-player: stop generating data at beginning of EOR, for faster flushing.
542+
- consumer.FairMQchannel: using thread mode by default for DD formatting (still 1 thread by default, but better data checks than without threading mode). Using non-blocking FMQ send for faster handling of EOR if no process on the other side.

src/ConsumerFMQchannel.cxx

Lines changed: 109 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ class ConsumerFMQchannel : public Consumer
157157
bool enableStfSuperpage = false; // optimized stf transport: minimize STF packets
158158
bool enableRawFormatDatablock = false;
159159
int enablePackedCopy = 1; // default mode for repacking of page overlapping HBF. 0 = one page per copy, 1 = change page on TF only
160+
int dropIncomplete = 0; // TF with missing packets are discarded
160161

161162
std::shared_ptr<MemoryBank> memBank; // a dedicated memory bank allocated by FMQ mechanism
162163
std::shared_ptr<MemoryPagesPool> mp; // a memory pool from which to allocate data pages
@@ -167,6 +168,8 @@ class ConsumerFMQchannel : public Consumer
167168
CounterStats repackSizeStats; // keep track of page size used when repacking
168169
uint64_t nPagesUsedForRepack = 0; // count pages used for repack
169170
uint64_t nPagesUsedInput = 0; // count pages received
171+
uint64_t nIncompleteHBF = 0; // count incomplete HBF
172+
uint64_t TFdropped = 0; // number of TF dropped
170173

171174
// custom log function for memory pool
172175
void mplog(const std::string &msg) {
@@ -175,7 +178,7 @@ class ConsumerFMQchannel : public Consumer
175178
}
176179

177180
// pool of threads for the processing
178-
int nwThreads = 0;
181+
int nwThreads = 1;
179182
int wThreadFifoSize = 0;
180183

181184
struct DDMessage {
@@ -239,6 +242,12 @@ class ConsumerFMQchannel : public Consumer
239242
gReadoutStats.isFairMQ = 1; // enable FMQ stats
240243
}
241244

245+
// configuration parameter: | consumer-FairMQChannel-* | dropIncomplete | int | 0 | If set, TF with incomplete HBF (i.e. HBF having missing packets) are discarded. |
246+
cfg.getOptionalValue<int>(cfgEntryPoint + ".dropIncomplete", dropIncomplete, dropIncomplete);
247+
if (dropIncomplete) {
248+
theLog.log(LogInfoDevel_(3002), "TF with incomplete HBF will be discarded");
249+
}
250+
242251
// configuration parameter: | consumer-FairMQChannel-* | enableRawFormat | int | 0 | If 0, data is pushed 1 STF header + 1 part per HBF. If 1, data is pushed in raw format without STF headers, 1 FMQ message per data page. If 2, format is 1 STF header + 1 part per data page.|
243252
int cfgEnableRawFormat = 0;
244253
cfg.getOptionalValue<int>(cfgEntryPoint + ".enableRawFormat", cfgEnableRawFormat);
@@ -403,6 +412,9 @@ class ConsumerFMQchannel : public Consumer
403412

404413
// configuration parameter: | consumer-FairMQChannel-* | threads | int | 0 | If set, a pool of thread is created for the data processing. |
405414
cfg.getOptionalValue<int>(cfgEntryPoint + ".threads", nwThreads);
415+
if (nwThreads) {
416+
theLog.log(LogInfoDevel_(3008), "Using %d threads for DD formatting", nwThreads);
417+
}
406418
if (nwThreads) {
407419
wThreadFifoSize = 88 / nwThreads; // 1s of buffer
408420
wThreads.resize(nwThreads);
@@ -439,6 +451,10 @@ class ConsumerFMQchannel : public Consumer
439451
theLog.log(LogInfoDevel_(3003), "Consumer %s - STFB repacking statistics ... number: %" PRIu64 " average page size: %" PRIu64 " max page size: %" PRIu64 " repacked/received = %" PRIu64 "/%" PRIu64 " = %.1f%%", name.c_str(), repackSizeStats.getCount(), (uint64_t)repackSizeStats.getAverage(), repackSizeStats.getMaximum(), nPagesUsedForRepack, nPagesUsedInput, nPagesUsedForRepack * 100.0 / nPagesUsedInput);
440452
}
441453

454+
if (TFdropped) {
455+
theLog.log(LogInfoSupport_(3235), "Consumer %s - %llu incomplete TF dropped", name.c_str(), (unsigned long long)TFdropped);
456+
}
457+
442458
// release in reverse order
443459
mp = nullptr;
444460
memoryBuffer = nullptr; // warning: data range may still be referenced in memory bank manager
@@ -657,6 +673,7 @@ class ConsumerFMQchannel : public Consumer
657673

658674
wThreadOutput msglist;
659675
msglist = std::make_shared<std::vector<DDMessage>>();
676+
bool dropEntireTFonError = 0; // when set, the whole TF is dropped in case of issue on one link
660677
if (msglist == nullptr) {
661678
isError = 1;
662679
} else {
@@ -666,10 +683,14 @@ class ConsumerFMQchannel : public Consumer
666683
msglist->emplace_back();
667684
if (DDformatMessage(bc, msglist->back())!=0) {
668685
isError = 1;
669-
break;
686+
msglist->pop_back();
687+
if (dropEntireTFonError) break;
670688
}
671689
}
672-
if (!isError) {
690+
// ensure end-of-timeframe flag is set for last message
691+
msglist->back().stfHeader->lastTFMessage = 1;
692+
// send msg
693+
if ((!isError)||(!dropEntireTFonError)) {
673694
if (wThreads[thIx].output->push(std::move(msglist))) {
674695
isError = 1;
675696
} else {
@@ -783,6 +804,50 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
783804
unsigned int lastHBid = -1;
784805
int isFirst = true;
785806
int ix = 0;
807+
static InfoLogger::AutoMuteToken tokenHBF(LogWarningSupport_(3004));
808+
uint16_t HBFpagescounterFirst = 0; // pages counter for first RDH in HBF
809+
uint16_t HBFpagescounterLast = 0; // pages counter for last RDH in HBF
810+
int HBFpagescounter = 0; // number of pages in current HBF
811+
int HBFstop = 0; // number of stop bits for current HBF
812+
int HBFstopLast = 0; // stop bit value for last RDH in HBF
813+
int HBFisOk = 1;
814+
int HBFisFirst = 1;
815+
int HBFincomplete = 0;
816+
std::string HBFerr;
817+
int HBFerrid = 0;
818+
auto HBFincrerr = [&] () {
819+
HBFerr += " (" + std::to_string(++HBFerrid) + ") ";
820+
};
821+
auto checkLastHB = [&] () {
822+
if (HBFisFirst) {
823+
return; // no HBF seen so far
824+
}
825+
if (HBFstop != 1) {
826+
HBFincrerr();
827+
HBFerr += "wrong number of stop bits: " + std::to_string((int)HBFstop);
828+
HBFisOk = 0;
829+
}
830+
if (HBFstopLast != 1) {
831+
HBFincrerr();
832+
HBFerr += "no stop bit on last RDH";
833+
HBFisOk = 0;
834+
}
835+
//printf("HB 0x%X = %d pages\n",(int)lastHBid, (int)HBFpagescounter);
836+
837+
if (!HBFisOk) {
838+
HBFincomplete++;
839+
theLog.log(tokenHBF, "TF%d equipment %d link %d HBF 0x%X is incomplete: %s", (int)stfHeader->timeframeId, (int)stfHeader->equipmentId, (int)stfHeader->linkId, (int)lastHBid, HBFerr.c_str());
840+
}
841+
842+
// reset counters
843+
HBFpagescounter = 0;
844+
HBFstop = 0;
845+
HBFisOk = 1;
846+
HBFisFirst = 1;
847+
HBFerrid = 0;
848+
HBFerr = "";
849+
};
850+
786851
for (auto& br : *bc) {
787852
ix++;
788853
DataBlock* b = br->getData();
@@ -830,6 +895,8 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
830895
// printf("checking %p : %d\n",b,offset);
831896
o2::Header::RAWDataHeader* rdh = (o2::Header::RAWDataHeader*)&b->data[offset];
832897
if (rdh->heartbeatOrbit != lastHBid) {
898+
// this is a new HBF, finalize checks of previous one and reset
899+
checkLastHB();
833900
lastHBid = rdh->heartbeatOrbit;
834901
// printf("offset %d - HBid=%d\n",offset,lastHBid);
835902
}
@@ -839,6 +906,27 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
839906
// dumpRDH(rdh);
840907
// printf("block %p : offset %d = %p\n",b,offset,rdh);
841908
}
909+
910+
uint16_t HBFpagescounterNew = (uint16_t)rdh->pagesCounter;
911+
if (HBFisFirst) {
912+
HBFpagescounterFirst = HBFpagescounterNew;
913+
HBFisFirst = 0;
914+
if (HBFpagescounterFirst != 0) {
915+
HBFincrerr();
916+
HBFerr += "first pagesCounter not zero: " + std::to_string((int)HBFpagescounterFirst);
917+
}
918+
} else {
919+
if (HBFpagescounterNew != HBFpagescounterLast + 1) {
920+
HBFincrerr();
921+
HBFerr += "pagesCounter jump from " + std::to_string((int)HBFpagescounterLast)+ " to " + std::to_string( (int)HBFpagescounterNew);
922+
HBFisOk = 0;
923+
}
924+
}
925+
HBFpagescounter++;
926+
HBFpagescounterLast = HBFpagescounterNew;
927+
HBFstop += rdh->stopBit;
928+
HBFstopLast = rdh->stopBit;
929+
842930
uint16_t offsetNextPacket = rdh->offsetNextPacket;
843931
if (offsetNextPacket == 0) {
844932
break;
@@ -850,6 +938,17 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
850938
headerBlock->getData()->header.dataSize=sizeof(SubTimeframe);
851939
ddm.subTimeframeTotalSize += ddm.subTimeframeDataSize;
852940
ddm.subTimeframeFMQSize = 0;
941+
942+
// this is a new HBF, finalize checks of previous one
943+
checkLastHB();
944+
945+
nIncompleteHBF += HBFincomplete;
946+
if ((HBFincomplete) && (dropIncomplete)) {
947+
static InfoLogger::AutoMuteToken tokenTFdropped(LogWarningSupport_(3235));
948+
TFdropped++;
949+
theLog.log(tokenTFdropped, "%s eq %d link %d : TF %d dropped (total: %llu)", this->name.c_str(), (int)stfHeader->equipmentId, (int)stfHeader->linkId, (int)stfHeader->timeframeId, (unsigned long long)TFdropped);
950+
return -1;
951+
}
853952

854953
// printf("TF %d link %d = %d blocks \n",(int)stfHeader->timeframeId,(int)stfHeader->linkId,(int)bc->size());
855954

@@ -1098,7 +1197,12 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
10981197

10991198
int ConsumerFMQchannel::DDsendMessage(DDMessage &ddm) {
11001199
// send the messages
1101-
if (sendingChannel->Send(ddm.messagesToSend) >= 0) {
1200+
int err;
1201+
while (!wThreadShutdown) {
1202+
err = sendingChannel->Send(ddm.messagesToSend, 500);
1203+
if (err>=0) break;
1204+
}
1205+
if ( err >= 0) {
11021206
gReadoutStats.counters.bytesFairMQ += ddm.subTimeframeTotalSize;
11031207
gReadoutStats.counters.timeframeIdFairMQ = ddm.stfHeader->timeframeId;
11041208
gReadoutStats.counters.notify++;
@@ -1128,6 +1232,7 @@ int ConsumerFMQchannel::processForDataDistribution(DataSetReference& bc) {
11281232
if (DDformatMessage(bc, msg)) {
11291233
isError = 1;
11301234
} else {
1235+
// sending now means flag for end-of-timeframe might be missing if something happens with next message
11311236
if (DDsendMessage(msg)) {
11321237
isError = 1;
11331238
}

src/ReadoutEquipmentPlayer.cxx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,10 @@ ReadoutEquipmentPlayer::~ReadoutEquipmentPlayer()
197197

198198
DataBlockContainerReference ReadoutEquipmentPlayer::getNextBlock()
199199
{
200+
if (!isDataOn) {
201+
return nullptr;
202+
}
203+
200204
// query memory pool for a free block
201205
DataBlockContainerReference nextBlock = nullptr;
202206
try {

src/ReadoutVersion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
1111

12-
#define READOUT_VERSION "2.17.2"
12+
#define READOUT_VERSION "2.18.0"
1313

src/readoutConfigEditor.tcl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ set configurationParametersDescriptor {
3232
| consumer-data-sampling-* | address | string | ipc:///tmp/readout-pipe-1 | Address of the data sampling. |
3333
| consumer-FairMQChannel-* | checkResources | string | | Check beforehand if unmanaged region would fit in given list of resources. Comma-separated list of items to be checked: eg /dev/shm, MemFree, MemAvailable. (any filesystem path, and any /proc/meminfo entry).|
3434
| consumer-FairMQChannel-* | disableSending | int | 0 | If set, no data is output to FMQ channel. Used for performance test to create FMQ shared memory segment without pushing the data. |
35+
| consumer-FairMQChannel-* | dropIncomplete | int | 0 | If set, TF with incomplete HBF (i.e. HBF having missing packets) are discarded. |
3536
| consumer-FairMQChannel-* | enablePackedCopy | int | 1 | If set, the same superpage may be reused (space allowing) for the copy of multiple HBF (instead of a separate one for each copy). This allows a reduced memoryPoolNumberOfPages. |
3637
| consumer-FairMQChannel-* | enableRawFormat | int | 0 | If 0, data is pushed 1 STF header + 1 part per HBF. If 1, data is pushed in raw format without STF headers, 1 FMQ message per data page. If 2, format is 1 STF header + 1 part per data page.|
3738
| consumer-FairMQChannel-* | fmq-address | string | ipc:///tmp/pipe-readout | Address of the FMQ channel. Depends on transportType. c.f. FairMQ::FairMQChannel.h |

src/receiverFMQ.cxx

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,8 @@ int main(int argc, const char** argv)
309309
unsigned long long nMsgParts = 0;
310310
unsigned long long nBytes = 0;
311311
unsigned long long nTF = 0;
312-
unsigned long long lastTFid = 0;
312+
unsigned long long lastTFid = undefinedTimeframeId;
313+
bool flagLastTFMessage = 0;
313314
bool isMultiPart = false;
314315

315316
double copyRatio = 0;
@@ -382,12 +383,18 @@ int main(int argc, const char** argv)
382383
}
383384
}
384385
if (stf->timeframeId != lastTFid) {
385-
if ((lastTFid) && (stf->timeframeId != lastTFid + 1)) {
386-
theLog.log(LogWarningSupport_(3237), "Non-continuous TF id ordering: was %d now %d", (int)lastTFid, (int)stf->timeframeId );
387-
}
386+
if (lastTFid != undefinedTimeframeId) {
387+
if ((lastTFid) && (stf->timeframeId != lastTFid + 1)) {
388+
theLog.log(LogWarningSupport_(3237), "Non-continuous TF id ordering: was %d now %d", (int)lastTFid, (int)stf->timeframeId );
389+
}
390+
if (flagLastTFMessage != 1) {
391+
theLog.log(LogWarningSupport_(3237), "TF id changed without lastTFMessage set in TF %d", (int)lastTFid);
392+
}
393+
}
388394
lastTFid = stf->timeframeId;
389395
nTF++;
390396
}
397+
flagLastTFMessage = stf->lastTFMessage;
391398
} else {
392399
if ((numberOfHBF != 0) && (stf->isRdhFormat)) {
393400
// then we have 1 part per HBF
@@ -402,7 +409,7 @@ int main(int argc, const char** argv)
402409
RdhHandle h(((uint8_t*)data) + pageOffset);
403410

404411
if (dumpNext) {
405-
printf("Receiving TF %d CRU %d.%d link %d : %d HBf\n", (int)stf->timeframeId, (int)h.getCruId(), (int)h.getEndPointId(), (int)stf->linkId, numberOfHBF);
412+
printf("Receiving TF %d CRU %d.%d link %d : %d HBf %c\n", (int)stf->timeframeId, (int)h.getCruId(), (int)h.getEndPointId(), (int)stf->linkId, numberOfHBF, (int)stf->lastTFMessage ? '*' : '.' );
406413
dumpNext = false;
407414
}
408415

0 commit comments

Comments
 (0)