99// granted to it by virtue of its status as an Intergovernmental Organization
1010// or submit itself to any jurisdiction.
1111
12+ #define ENABLE_LOG_CODEWRONG
13+ #include " readoutInfoLogger.h"
14+
1215#include " Consumer.h"
1316#include " MemoryBank.h"
1417#include " MemoryBankManager.h"
@@ -62,8 +65,13 @@ static_assert(sizeof(DataBlockFMQStats) <= DataBlockHeaderUserSpace, "DataBlockF
6265// uint64_t ddsizemem=0;
6366
6467
65- void initDataBlockStats (DataBlock* b , uint64_t v_memorySizeAccounted = 0 )
68+ void initDataBlockStats (DataBlockContainerReference* blockRef , uint64_t v_memorySizeAccounted = 0 )
6669{
70+ if (blockRef == nullptr ) {return ;}
71+ if (*blockRef == nullptr ) {return ;}
72+ DataBlock* b = (*blockRef)->getData ();
73+ if (b == nullptr ) {return ;}
74+ if ((*blockRef)->isChildBlock ()) {LOG_CODEWRONG; return ;}
6775 DataBlockFMQStats* s = (DataBlockFMQStats*)&(b->header .userSpace );
6876 s->magic = 0xAA ;
6977 s->countRef = 0 ;
@@ -72,30 +80,55 @@ void initDataBlockStats(DataBlock* b, uint64_t v_memorySizeAccounted = 0)
7280 // printf ("TF %d adding mem sz %d\n", (int)b->header.timeframeId, (int) v_memorySizeAccounted);
7381}
7482
75- void incDataBlockStats (DataBlock* b , uint64_t dataSizeAccounted = 0 )
83+ void incDataBlockStats (DataBlockContainerReference* blockRef , uint64_t dataSizeAccounted = 0 )
7684{
77- // printf("inc %p\n",b);
85+ if (blockRef == nullptr ) {return ;}
86+ if (*blockRef == nullptr ) {return ;}
87+ DataBlockContainerReference parentBlock = nullptr ;
88+ if ((*blockRef)->isChildBlock ()) {
89+ // for a child block, update stats of the parent block
90+ parentBlock = (*blockRef)->getParent ();
91+ blockRef = &parentBlock;
92+ }
93+ DataBlock* b = (*blockRef)->getData ();
94+ if (b == nullptr ) {return ;}
95+ // printf("inc %p\n",b->data);
7896 DataBlockFMQStats* s = (DataBlockFMQStats*)&(b->header .userSpace );
97+
7998 if (s->magic != 0xAA )
8099 return ;
81100 if ((s->countRef ++) == 0 ) {
82101 s->t0 = timeNowMicrosec ();
83102 gReadoutStats .counters .pagesPendingFairMQ ++;
84103 gReadoutStats .counters .notify ++;
85- // printf("init %p -> pages locked = %lu\n",b,(unsigned long)gReadoutStats.counters.pagesPendingFairMQ);
104+ // printf("init %p -> pages locked = %lu\n",b->data ,(unsigned long)gReadoutStats.counters.pagesPendingFairMQ);
86105 gReadoutStats .counters .ddMemoryPendingBytes += s->memorySizeAccounted ;
87106 // printf("adding %d / %d\n", (int)dataSizeAccounted, (int)s->memorySizeAccounted);
88107 // ddsizemem+=s->memorySizeAccounted;
108+ // printf("page %p pool %p\n",b->data,(*blockRef)->memoryPagesPoolPtr);
109+ updatePageStateFromDataBlockContainerReference (*blockRef, MemoryPage::PageState::InFMQ);
89110 }
90111 s->dataSizeAccounted += dataSizeAccounted;
91112 gReadoutStats .counters .ddPayloadPendingBytes += dataSizeAccounted;
92113 // ddsizepayload += dataSizeAccounted;
93114}
94115
95- void decDataBlockStats (DataBlock* b )
116+ void decDataBlockStats (DataBlockContainerReference* blockRef )
96117{
118+ if (blockRef == nullptr ) {return ;}
119+ if (*blockRef == nullptr ) {return ;}
120+ DataBlockContainerReference parentBlock = nullptr ;
121+ if ((*blockRef)->isChildBlock ()) {
122+ // for a child block, update stats of the parent block
123+ parentBlock = (*blockRef)->getParent ();
124+ blockRef = &parentBlock;
125+ }
126+ DataBlock* b = (*blockRef)->getData ();
127+ if ((*blockRef)->isChildBlock ()) {
128+ b = (*blockRef)->getParent ()->getData ();
129+ }
130+ if (b == nullptr ) {return ;}
97131 DataBlockFMQStats* s = (DataBlockFMQStats*)&(b->header .userSpace );
98- // printf("dec %p\n",b);
99132 if (s->magic != 0xAA )
100133 return ;
101134 if ((--s->countRef ) == 0 ) {
@@ -317,7 +350,7 @@ class ConsumerFMQchannel : public Consumer
317350 DataBlockContainerReference* blockRef = (DataBlockContainerReference*)hint;
318351 // printf("ack hint=%p page %p\n",hint,(*blockRef)->getData());
319352 // printf("ptr %p: use_count=%d\n",blockRef, (int)blockRef->use_count());
320- decDataBlockStats ((* blockRef)-> getData () );
353+ decDataBlockStats (blockRef);
321354 delete blockRef;
322355 }
323356 },fair::mq::RegionConfig{false ,false }); // lock / zero - done later
@@ -445,6 +478,7 @@ class ConsumerFMQchannel : public Consumer
445478 totalPushError++;
446479 return -1 ;
447480 }
481+ (*blockRef)->memoryPagesPoolPtr = br->memoryPagesPoolPtr ; // keep ref to memoryPagesPool for state updates
448482 void * hint = (void *)blockRef;
449483 void * blobPtr = b->data ;
450484 size_t blobSize = (size_t )b->header .dataSize ;
@@ -478,6 +512,7 @@ class ConsumerFMQchannel : public Consumer
478512 totalPushError++;
479513 return -1 ;
480514 }
515+ (*ptr)->memoryPagesPoolPtr = br->memoryPagesPoolPtr ; // keep ref to memoryPagesPool for state updates
481516 std::unique_ptr<FairMQMessage> msgHeader (transportFactory->CreateMessage ((void *)&(br->getData ()->header ), (size_t )(br->getData ()->header .headerSize ), msgcleanupCallback, (void *)nullptr ));
482517 std::unique_ptr<FairMQMessage> msgBody (transportFactory->CreateMessage ((void *)(br->getData ()->data ), (size_t )(br->getData ()->header .dataSize ), msgcleanupCallback, (void *)(ptr)));
483518
@@ -505,6 +540,7 @@ class ConsumerFMQchannel : public Consumer
505540 totalPushError++;
506541 return -1 ;
507542 }
543+ (*blockRef)->memoryPagesPoolPtr = headerBlock->memoryPagesPoolPtr ; // keep ref to memoryPagesPool for state updates
508544 SubTimeframe* stfHeader = (SubTimeframe*)headerBlock->getData ()->data ;
509545 if (stfHeader == nullptr ) {
510546 totalPushError++;
@@ -550,6 +586,7 @@ class ConsumerFMQchannel : public Consumer
550586 totalPushError++;
551587 return -1 ;
552588 }
589+ (*blockRef)->memoryPagesPoolPtr = br->memoryPagesPoolPtr ; // keep ref to memoryPagesPool for state updates
553590 void * hint = (void *)blockRef;
554591 void * blobPtr = b->data ;
555592 size_t blobSize = (size_t )b->header .dataSize ;
@@ -728,6 +765,7 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
728765 totalPushError++;
729766 return -1 ;
730767 }
768+ (*blockRef)->memoryPagesPoolPtr = headerBlock->memoryPagesPoolPtr ; // keep ref to memoryPagesPool for state updates
731769 SubTimeframe* stfHeader = (SubTimeframe*)headerBlock->getData ()->data ;
732770 if (stfHeader == nullptr ) {
733771 totalPushError++;
@@ -820,8 +858,8 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
820858 assert (ddm.messagesToSend .empty ());
821859 if (memoryBuffer) {
822860 // printf("send H %p\n", blockRef);
823- initDataBlockStats ((* blockRef)-> getData () , headerBlock->getDataBufferSize ());
824- incDataBlockStats ((* blockRef)-> getData () , sizeof (SubTimeframe));
861+ initDataBlockStats (blockRef, headerBlock->getDataBufferSize ());
862+ incDataBlockStats (blockRef, sizeof (SubTimeframe));
825863 ddm.messagesToSend .emplace_back (sendingChannel->NewMessage (memoryBuffer, (void *)stfHeader, sizeof (SubTimeframe), (void *)(blockRef)));
826864 } else {
827865 ddm.messagesToSend .emplace_back (sendingChannel->NewMessage ((void *)stfHeader, sizeof (SubTimeframe), msgcleanupCallback, (void *)(blockRef)));
@@ -849,6 +887,10 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
849887 pf.HBid = id;
850888 // create a copy of the reference, in a newly allocated object, so that reference is kept alive until this new object is destroyed in the cleanupCallback
851889 pf.blockRef = new DataBlockContainerReference (br);
890+ if (pf.blockRef == nullptr ) {
891+ throw __LINE__;
892+ }
893+ (*pf.blockRef )->memoryPagesPoolPtr = br->memoryPagesPoolPtr ; // keep ref to memoryPagesPool for state updates
852894 pendingFrames.push_back (pf);
853895 };
854896
@@ -874,7 +916,7 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
874916 // create and queue a fmq message
875917 if (memoryBuffer) {
876918 // printf("send D %p\n", hint);
877- incDataBlockStats ((*( pendingFrames[0 ].blockRef ))-> getData () , l);
919+ incDataBlockStats (pendingFrames[0 ].blockRef , l);
878920// printf("mem1 sz = %d\n",(int)(*(pendingFrames[0].blockRef))->getData()->header.memorySize);
879921 ddm.messagesToSend .emplace_back (sendingChannel->NewMessage (memoryBuffer, (void *)(&(b->data [ix])), (size_t )(l), hint));
880922 } else {
@@ -917,6 +959,7 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
917959 isNewBlock = 1 ;
918960 if (copyBlockBuffer != nullptr ) {
919961 copyBlockMemSize = copyBlockBuffer->getDataBufferSize ();
962+ initDataBlockStats (©BlockBuffer, copyBlockMemSize);
920963 }
921964 nPagesUsedForRepack++;
922965 continue ;
@@ -934,6 +977,7 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
934977 isNewBlock = 1 ;
935978 if (copyBlock != nullptr ) {
936979 copyBlockMemSize = copyBlock->getDataBufferSize ();
980+ initDataBlockStats (©Block, copyBlockMemSize);
937981 }
938982 nPagesUsedForRepack++;
939983 }
@@ -951,6 +995,10 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
951995 }
952996 auto blockRef = new DataBlockContainerReference (copyBlock);
953997 char * newBlock = (char *)copyBlock->getData ()->data ;
998+ if (blockRef ==nullptr ) {
999+ throw __LINE__;
1000+ }
1001+ (*blockRef)->memoryPagesPoolPtr = copyBlock->memoryPagesPoolPtr ; // keep ref to memoryPagesPool for state updates
9541002
9551003 int newIx = 0 ;
9561004 for (auto & f : pendingFrames) {
@@ -972,8 +1020,7 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
9721020 // create and queue a fmq message
9731021 if (memoryBuffer) {
9741022 // printf("send D2 %p\n", blockRef);
975- initDataBlockStats ((*blockRef)->getData (), copyBlockMemSize);
976- incDataBlockStats ((*blockRef)->getData (), totalSize);
1023+ incDataBlockStats (blockRef, totalSize);
9771024 ddm.messagesToSend .emplace_back (sendingChannel->NewMessage (memoryBuffer, (void *)newBlock, totalSize, (void *)(blockRef)));
9781025 } else {
9791026 ddm.messagesToSend .emplace_back (sendingChannel->NewMessage ((void *)newBlock, totalSize, msgcleanupCallback, (void *)(blockRef)));
@@ -988,7 +1035,7 @@ int ConsumerFMQchannel::DDformatMessage(DataSetReference &bc, DDMessage &ddm) {
9881035 try {
9891036 for (auto & br : *bc) {
9901037 DataBlock* b = br->getData ();
991- initDataBlockStats (b , br->getDataBufferSize ());
1038+ initDataBlockStats (&br , br->getDataBufferSize ());
9921039
9931040 unsigned int HBstart = 0 ;
9941041 for (int offset = 0 ; offset + sizeof (o2::Header::RAWDataHeader) <= b->header .dataSize ;) {
0 commit comments