Skip to content

Commit 20a5aa2

Browse files
authored
Merge pull request #238 from sy-c/master
update
2 parents 9313ca8 + c555672 commit 20a5aa2

22 files changed

+555
-45
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ target_include_directories(objReadoutAggregator PRIVATE ${READOUT_INCLUDE_DIRS})
239239
# utilities, including memory handling
240240
add_library(
241241
objReadoutUtils OBJECT
242+
${SOURCE_DIR}/readoutInfoLogger.cxx
242243
${SOURCE_DIR}/ReadoutUtils.cxx
243244
${SOURCE_DIR}/RdhUtils.cxx
244245
${SOURCE_DIR}/CounterStats.cxx

doc/configurationParameters.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ The parameters related to 3rd-party libraries are described here for convenience
6969
| consumer-FairMQChannel-* | unmanagedMemorySize | bytes | | Size of the memory region to be created. c.f. FairMQ::FairMQUnmanagedRegion.h. If not set, no special FMQ memory region is created. |
7070
| consumer-fileRecorder-* | bytesMax | bytes | 0 | Maximum number of bytes to write to each file. Data pages are never truncated, so if writing the full page would exceed this limit, no data from that page is written at all and file is closed. If zero (default), no maximum size set.|
7171
| consumer-fileRecorder-* | dataBlockHeaderEnabled | int | 0 | Enable (1) or disable (0) the writing to file of the internal readout header (Readout DataBlock.h) between the data pages, to easily navigate through the file without RDH decoding. If disabled, the raw data pages received from CRU are written without further formatting. |
72-
| consumer-fileRecorder-* | dropEmptyHBFrames | int | 0 | If 1, memory pages are scanned and empty HBframes are discarded, i.e. couples of packets which contain only RDH, the first one with pagesCounter=0 and the second with stop bit set. This setting does not change the content of in-memory data pages, other consumers would still get full data pages with empty packets. This setting is meant to reduce the amount of data recorded for continuous detectors in triggered mode.|
72+
| consumer-fileRecorder-* | dropEmptyHBFrames | int | 0 | If 1, memory pages are scanned and empty HBframes are discarded, i.e. couples of packets which contain only RDH, the first one with pagesCounter=0 and the second with stop bit set. This setting does not change the content of in-memory data pages, other consumers would still get full data pages with empty packets. This setting is meant to reduce the amount of data recorded for continuous detectors in triggered mode. Use with dropEmptyHBFramesTriggerMask, if some empty frames with specific trigger types need to be kept (eg TF or SOC). |
73+
| consumer-fileRecorder-* | dropEmptyHBFramesTriggerMask | int | 0 | (when using dropEmptyHBFrames = 1) empty HB frames are kept if any bit in RDH TriggerType field matches this pattern (RDHTriggerType & TriggerMask != 0). To be provided as a decimal value: eg 2048 (TF triggers, bit 11), 3584 (TF + SOC + EOC bits 9,10,11). |
7374
| consumer-fileRecorder-* | fileName | string | | Path to the file where to record data. The following variables are replaced at runtime: ${XXX} -> get variable XXX from environment, %t -> unix timestamp (seconds since epoch), %T -> formatted date/time, %i -> equipment ID of each data chunk (used to write data from different equipments to different output files), %l -> link ID (used to write data from different links to different output files). |
7475
| consumer-fileRecorder-* | filesMax | int | 1 | If 1 (default), file splitting is disabled: file is closed whenever a limit is reached on a given recording stream. Otherwise, file splitting is enabled: whenever the current file reaches a limit, it is closed an new one is created (with an incremental name). If <=0, an unlimited number of incremental chunks can be created. If non-zero, it defines the maximum number of chunks. The file name is suffixed with chunk number (by default, ".001, .002, ..." at the end of the file name. One may use "%f" in the file name to define where this incremental file counter is printed. |
7576
| consumer-fileRecorder-* | pagesMax | int | 0 | Maximum number of data pages accepted by recorder. If zero (default), no maximum set.|

doc/releaseNotes.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,3 +504,10 @@ This file describes the main feature changes for each readout.exe released versi
504504

505505
## v2.15.2 - 08/12/2022
506506
- Added ROC link Id for some log messages, as provided from new superpage metadata link field from ROC library.
507+
508+
## next version
509+
- Superpage internal metadata not stored at beginning of page but separately in heap, by default. Behavior can be adjusted with MemoryPool::headerReservedSpace.
510+
- equipment-CruEmulator: set TF trigger type (bit 11) on new TF. Used for dropEmptyHBFramesTriggerMask tests.
511+
- Updated configuration parameters:
512+
- added consumer-fileRecorder-*.dropEmptyHBFramesTriggerMask: when using dropEmptyHBFrames = 1, keep some empty HB frames with trigger type matching given (decimal) mask.
513+
- Added support for RDHv7. Readout already accepts data from equipments with v7, but still generates data (internal sw generator) with v6, for compatibility with current software downstream.

src/ConsumerFMQchannel.cxx

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
1111

12+
#define ENABLE_LOG_CODEWRONG
13+
#include "readoutInfoLogger.h"
14+
1215
#include "Consumer.h"
1316
#include "MemoryBank.h"
1417
#include "MemoryBankManager.h"
@@ -62,8 +65,13 @@ static_assert(sizeof(DataBlockFMQStats) <= DataBlockHeaderUserSpace, "DataBlockF
6265
//uint64_t ddsizemem=0;
6366

6467

65-
void initDataBlockStats(DataBlock* b, uint64_t v_memorySizeAccounted = 0)
68+
void initDataBlockStats(DataBlockContainerReference* blockRef, uint64_t v_memorySizeAccounted = 0)
6669
{
70+
if (blockRef == nullptr) {return;}
71+
if (*blockRef == nullptr) {return;}
72+
DataBlock* b = (*blockRef)->getData();
73+
if (b == nullptr) {return;}
74+
if ((*blockRef)->isChildBlock()) {LOG_CODEWRONG; return;}
6775
DataBlockFMQStats* s = (DataBlockFMQStats*)&(b->header.userSpace);
6876
s->magic = 0xAA;
6977
s->countRef = 0;
@@ -72,30 +80,55 @@ void initDataBlockStats(DataBlock* b, uint64_t v_memorySizeAccounted = 0)
7280
//printf ("TF %d adding mem sz %d\n", (int)b->header.timeframeId, (int) v_memorySizeAccounted);
7381
}
7482

75-
void incDataBlockStats(DataBlock* b, uint64_t dataSizeAccounted = 0)
83+
void incDataBlockStats(DataBlockContainerReference* blockRef, uint64_t dataSizeAccounted = 0)
7684
{
77-
//printf("inc %p\n",b);
85+
if (blockRef == nullptr) {return;}
86+
if (*blockRef == nullptr) {return;}
87+
DataBlockContainerReference parentBlock = nullptr;
88+
if ((*blockRef)->isChildBlock()) {
89+
// for a child block, update stats of the parent block
90+
parentBlock = (*blockRef)->getParent();
91+
blockRef = &parentBlock;
92+
}
93+
DataBlock* b = (*blockRef)->getData();
94+
if (b == nullptr) {return;}
95+
//printf("inc %p\n",b->data);
7896
DataBlockFMQStats* s = (DataBlockFMQStats*)&(b->header.userSpace);
97+
7998
if (s->magic != 0xAA)
8099
return;
81100
if ((s->countRef++) == 0) {
82101
s->t0 = timeNowMicrosec();
83102
gReadoutStats.counters.pagesPendingFairMQ++;
84103
gReadoutStats.counters.notify++;
85-
// printf("init %p -> pages locked = %lu\n",b,(unsigned long)gReadoutStats.counters.pagesPendingFairMQ);
104+
// printf("init %p -> pages locked = %lu\n",b->data,(unsigned long)gReadoutStats.counters.pagesPendingFairMQ);
86105
gReadoutStats.counters.ddMemoryPendingBytes += s->memorySizeAccounted;
87106
//printf("adding %d / %d\n", (int)dataSizeAccounted, (int)s->memorySizeAccounted);
88107
//ddsizemem+=s->memorySizeAccounted;
108+
//printf("page %p pool %p\n",b->data,(*blockRef)->memoryPagesPoolPtr);
109+
updatePageStateFromDataBlockContainerReference(*blockRef, MemoryPage::PageState::InFMQ);
89110
}
90111
s->dataSizeAccounted += dataSizeAccounted;
91112
gReadoutStats.counters.ddPayloadPendingBytes += dataSizeAccounted;
92113
//ddsizepayload += dataSizeAccounted;
93114
}
94115

95-
void decDataBlockStats(DataBlock* b)
116+
void decDataBlockStats(DataBlockContainerReference* blockRef)
96117
{
118+
if (blockRef == nullptr) {return;}
119+
if (*blockRef == nullptr) {return;}
120+
DataBlockContainerReference parentBlock = nullptr;
121+
if ((*blockRef)->isChildBlock()) {
122+
// for a child block, update stats of the parent block
123+
parentBlock = (*blockRef)->getParent();
124+
blockRef = &parentBlock;
125+
}
126+
DataBlock* b = (*blockRef)->getData();
127+
if ((*blockRef)->isChildBlock()) {
128+
b = (*blockRef)->getParent()->getData();
129+
}
130+
if (b == nullptr) {return;}
97131
DataBlockFMQStats* s = (DataBlockFMQStats*)&(b->header.userSpace);
98-
//printf("dec %p\n",b);
99132
if (s->magic != 0xAA)
100133
return;
101134
if ((--s->countRef) == 0) {
@@ -317,7 +350,7 @@ class ConsumerFMQchannel : public Consumer
317350
DataBlockContainerReference* blockRef = (DataBlockContainerReference*)hint;
318351
//printf("ack hint=%p page %p\n",hint,(*blockRef)->getData());
319352
//printf("ptr %p: use_count=%d\n",blockRef, (int)blockRef->use_count());
320-
decDataBlockStats((*blockRef)->getData());
353+
decDataBlockStats(blockRef);
321354
delete blockRef;
322355
}
323356
},fair::mq::RegionConfig{false,false}); // lock / zero - done later
@@ -445,6 +478,7 @@ class ConsumerFMQchannel : public Consumer
445478
totalPushError++;
446479
return -1;
447480
}
481+
(*blockRef)->memoryPagesPoolPtr = br->memoryPagesPoolPtr; // keep ref to memoryPagesPool for state updates
448482
void* hint = (void*)blockRef;
449483
void* blobPtr = b->data;
450484
size_t blobSize = (size_t)b->header.dataSize;
@@ -478,6 +512,7 @@ class ConsumerFMQchannel : public Consumer
478512
totalPushError++;
479513
return -1;
480514
}
515+
(*ptr)->memoryPagesPoolPtr = br->memoryPagesPoolPtr; // keep ref to memoryPagesPool for state updates
481516
std::unique_ptr<FairMQMessage> msgHeader(transportFactory->CreateMessage((void*)&(br->getData()->header), (size_t)(br->getData()->header.headerSize), msgcleanupCallback, (void*)nullptr));
482517
std::unique_ptr<FairMQMessage> msgBody(transportFactory->CreateMessage((void*)(br->getData()->data), (size_t)(br->getData()->header.dataSize), msgcleanupCallback, (void*)(ptr)));
483518

@@ -505,6 +540,7 @@ class ConsumerFMQchannel : public Consumer
505540
totalPushError++;
506541
return -1;
507542
}
543+
(*blockRef)->memoryPagesPoolPtr = headerBlock->memoryPagesPoolPtr; // keep ref to memoryPagesPool for state updates
508544
SubTimeframe* stfHeader = (SubTimeframe*)headerBlock->getData()->data;
509545
if (stfHeader == nullptr) {
510546
totalPushError++;
@@ -550,6 +586,7 @@ class ConsumerFMQchannel : public Consumer
550586
totalPushError++;
551587
return -1;
552588
}
589+
(*blockRef)->memoryPagesPoolPtr = br->memoryPagesPoolPtr; // keep ref to memoryPagesPool for state updates
553590
void* hint = (void*)blockRef;
554591
void* blobPtr = b->data;
555592
size_t blobSize = (size_t)b->header.dataSize;
@@ -728,6 +765,7 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
728765
totalPushError++;
729766
return -1;
730767
}
768+
(*blockRef)->memoryPagesPoolPtr = headerBlock->memoryPagesPoolPtr; // keep ref to memoryPagesPool for state updates
731769
SubTimeframe* stfHeader = (SubTimeframe*)headerBlock->getData()->data;
732770
if (stfHeader == nullptr) {
733771
totalPushError++;
@@ -820,8 +858,8 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
820858
assert(ddm.messagesToSend.empty());
821859
if (memoryBuffer) {
822860
// printf("send H %p\n", blockRef);
823-
initDataBlockStats((*blockRef)->getData(), headerBlock->getDataBufferSize());
824-
incDataBlockStats((*blockRef)->getData(), sizeof(SubTimeframe));
861+
initDataBlockStats(blockRef, headerBlock->getDataBufferSize());
862+
incDataBlockStats(blockRef, sizeof(SubTimeframe));
825863
ddm.messagesToSend.emplace_back(sendingChannel->NewMessage(memoryBuffer, (void*)stfHeader, sizeof(SubTimeframe), (void*)(blockRef)));
826864
} else {
827865
ddm.messagesToSend.emplace_back(sendingChannel->NewMessage((void*)stfHeader, sizeof(SubTimeframe), msgcleanupCallback, (void*)(blockRef)));
@@ -849,6 +887,10 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
849887
pf.HBid = id;
850888
// create a copy of the reference, in a newly allocated object, so that reference is kept alive until this new object is destroyed in the cleanupCallback
851889
pf.blockRef = new DataBlockContainerReference(br);
890+
if (pf.blockRef == nullptr) {
891+
throw __LINE__;
892+
}
893+
(*pf.blockRef)->memoryPagesPoolPtr = br->memoryPagesPoolPtr; // keep ref to memoryPagesPool for state updates
852894
pendingFrames.push_back(pf);
853895
};
854896

@@ -874,7 +916,7 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
874916
// create and queue a fmq message
875917
if (memoryBuffer) {
876918
// printf("send D %p\n", hint);
877-
incDataBlockStats((*(pendingFrames[0].blockRef))->getData(), l);
919+
incDataBlockStats(pendingFrames[0].blockRef, l);
878920
// printf("mem1 sz = %d\n",(int)(*(pendingFrames[0].blockRef))->getData()->header.memorySize);
879921
ddm.messagesToSend.emplace_back(sendingChannel->NewMessage(memoryBuffer, (void*)(&(b->data[ix])), (size_t)(l), hint));
880922
} else {
@@ -917,6 +959,7 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
917959
isNewBlock = 1;
918960
if (copyBlockBuffer != nullptr) {
919961
copyBlockMemSize = copyBlockBuffer->getDataBufferSize();
962+
initDataBlockStats(&copyBlockBuffer, copyBlockMemSize);
920963
}
921964
nPagesUsedForRepack++;
922965
continue;
@@ -934,6 +977,7 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
934977
isNewBlock = 1;
935978
if (copyBlock != nullptr) {
936979
copyBlockMemSize = copyBlock->getDataBufferSize();
980+
initDataBlockStats(&copyBlock, copyBlockMemSize);
937981
}
938982
nPagesUsedForRepack++;
939983
}
@@ -951,6 +995,10 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
951995
}
952996
auto blockRef = new DataBlockContainerReference(copyBlock);
953997
char* newBlock = (char*)copyBlock->getData()->data;
998+
if (blockRef ==nullptr) {
999+
throw __LINE__;
1000+
}
1001+
(*blockRef)->memoryPagesPoolPtr = copyBlock->memoryPagesPoolPtr; // keep ref to memoryPagesPool for state updates
9541002

9551003
int newIx = 0;
9561004
for (auto& f : pendingFrames) {
@@ -972,8 +1020,7 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
9721020
// create and queue a fmq message
9731021
if (memoryBuffer) {
9741022
// printf("send D2 %p\n", blockRef);
975-
initDataBlockStats((*blockRef)->getData(), copyBlockMemSize);
976-
incDataBlockStats((*blockRef)->getData(), totalSize);
1023+
incDataBlockStats(blockRef, totalSize);
9771024
ddm.messagesToSend.emplace_back(sendingChannel->NewMessage(memoryBuffer, (void*)newBlock, totalSize, (void*)(blockRef)));
9781025
} else {
9791026
ddm.messagesToSend.emplace_back(sendingChannel->NewMessage((void*)newBlock, totalSize, msgcleanupCallback, (void*)(blockRef)));
@@ -988,7 +1035,7 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
9881035
try {
9891036
for (auto& br : *bc) {
9901037
DataBlock* b = br->getData();
991-
initDataBlockStats(b, br->getDataBufferSize());
1038+
initDataBlockStats(&br, br->getDataBufferSize());
9921039

9931040
unsigned int HBstart = 0;
9941041
for (int offset = 0; offset + sizeof(o2::Header::RAWDataHeader) <= b->header.dataSize;) {

src/ConsumerFileRecorder.cxx

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ class ConsumerFileRecorder : public Consumer
211211
}
212212
}
213213

214-
// configuration parameter: | consumer-fileRecorder-* | dropEmptyHBFrames | int | 0 | If 1, memory pages are scanned and empty HBframes are discarded, i.e. couples of packets which contain only RDH, the first one with pagesCounter=0 and the second with stop bit set. This setting does not change the content of in-memory data pages, other consumers would still get full data pages with empty packets. This setting is meant to reduce the amount of data recorded for continuous detectors in triggered mode.|
214+
// configuration parameter: | consumer-fileRecorder-* | dropEmptyHBFrames | int | 0 | If 1, memory pages are scanned and empty HBframes are discarded, i.e. couples of packets which contain only RDH, the first one with pagesCounter=0 and the second with stop bit set. This setting does not change the content of in-memory data pages, other consumers would still get full data pages with empty packets. This setting is meant to reduce the amount of data recorded for continuous detectors in triggered mode. Use with dropEmptyHBFramesTriggerMask, if some empty frames with specific trigger types need to be kept (eg TF or SOC). |
215215
cfg.getOptionalValue(cfgEntryPoint + ".dropEmptyHBFrames", dropEmptyHBFrames, 0);
216216
if (dropEmptyHBFrames) {
217217
if (recordWithDataBlockHeader) {
@@ -220,6 +220,13 @@ class ConsumerFileRecorder : public Consumer
220220
}
221221
theLog.log(LogInfoSupport_(3002), "Some packets with RDH-only payload will not be recorded to file, option dropEmptyHBFrames is enabled");
222222
}
223+
224+
// configuration parameter: | consumer-fileRecorder-* | dropEmptyHBFramesTriggerMask | int | 0 | (when using dropEmptyHBFrames = 1) empty HB frames are kept if any bit in RDH TriggerType field matches this pattern (RDHTriggerType & TriggerMask != 0). To be provided as a decimal value: eg 2048 (TF triggers, bit 11), 3584 (TF + SOC + EOC bits 9,10,11). |
225+
cfg.getOptionalValue(cfgEntryPoint + ".dropEmptyHBFramesTriggerMask", dropEmptyHBFramesTriggerMask, 0);
226+
if ((dropEmptyHBFrames)&&(dropEmptyHBFramesTriggerMask)) {
227+
theLog.log(LogInfoSupport_(3002), "Some packets with RDH-only payload will be recorded when their trigger type matches mask 0x%X", dropEmptyHBFramesTriggerMask);
228+
}
229+
223230
}
224231

225232
~ConsumerFileRecorder() {}
@@ -504,14 +511,14 @@ class ConsumerFileRecorder : public Consumer
504511
};
505512

506513
auto isEmptyHBstop = [&](RdhHandle& h) {
507-
if ((h.getStopBit()) && (h.getHeaderSize() == h.getMemorySize())) {
514+
if ((h.getStopBit()) && (h.getHeaderSize() == h.getMemorySize()) && ((h.getTriggerType() & (uint32_t)dropEmptyHBFramesTriggerMask) == 0) ) {
508515
return true;
509516
}
510517
return false;
511518
};
512519

513520
auto isEmptyHBstart = [&](RdhHandle& h) {
514-
if ((h.getPagesCounter() == 0) && (h.getHeaderSize() == h.getMemorySize())) {
521+
if ((h.getPagesCounter() == 0) && (h.getHeaderSize() == h.getMemorySize()) && ((h.getTriggerType() & (uint32_t)dropEmptyHBFramesTriggerMask) == 0)) {
515522
return true;
516523
}
517524
return false;
@@ -642,6 +649,7 @@ class ConsumerFileRecorder : public Consumer
642649
int maxFileTF = 0; // maximum number of TF to write (in each file)
643650
int filesMax = 0; // maximum number of files to write (for each stream)
644651
int dropEmptyHBFrames = 0; // if set, some empty packets are discarded (see logic in code)
652+
int dropEmptyHBFramesTriggerMask = 0; // (when using dropEmptyHBFrames = 1) empty HB frames are kept if any bit in RDH TriggerType field matches this pattern. (TriggerType & TriggerMask != 0)
645653

646654
class Packet
647655
{

src/ConsumerStats.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ class ConsumerStats : public Consumer
194194
theLog.log(LogInfoOps_(3003), "Last interval (%.2fs): blocksRx=%llu, block rate=%.2lf, block size = %.1lfkB, bytesRx=%llu, rate=%s", deltaT, (unsigned long long)counterBlocksDiff, counterBlocksDiff / deltaT, counterBytesDiff / (1024.0*counterBlocksDiff), (unsigned long long)counterBytesDiff, NumberOfBytesToString(counterBytesDiff * 8 / deltaT, "b/s", 1000).c_str());
195195
if (gReadoutStats.isFairMQ) {
196196
theLog.log(LogInfoOps_(3003), "STFB locked pages: current=%llu, released = %llu, release rate=%.2lf Hz, latency=%.3lf s, current TF = %d", (unsigned long long) snapshot.pagesPendingFairMQ.load(), nRfmq, rRfmq, avgTfmq, tfidfmq );
197-
theLog.log(LogInfoOps_(3003), "STFB HBF repacking = %.1lf Hz, copy overhead = %.1lf MB/s", ddHBFRepackedRate, ddBytesCopiedRate);
197+
theLog.log(LogInfoOps_(3003), "STFB HBF repacking = %.1lf Hz, copy overhead = %.1lf MB/s = %.2f%%", ddHBFRepackedRate, ddBytesCopiedRate, ddBytesCopiedRate * 1024.0 * 1024.0 * 100.0 * deltaT / counterBytesDiff);
198198
theLog.log(LogInfoOps_(3003), "STFB memory efficiency = %.1lf %%, data buffered = %.1lf MB, real memory used %.1lf MB",
199199
ddMemoryEfficiency, snapshot.ddPayloadPendingBytes / (1024.0*1024.0), snapshot.ddMemoryPendingBytes / (1024.0*1024.0) );
200200
}

0 commit comments

Comments
 (0)