11#include " Consumer.h"
22#include " ReadoutUtils.h"
3- #include " MemoryHandler.h"
4-
3+ // #include "MemoryHandler.h"
4+ #include " MemoryBank.h"
5+ #include " MemoryBankManager.h"
6+ #include " MemoryPagesPool.h"
57
68#ifdef WITH_FAIRMQ
79
@@ -47,10 +49,12 @@ class ConsumerFMQchannel: public Consumer {
4749 FairMQUnmanagedRegionPtr memoryBuffer=nullptr ;
4850 bool disableSending=0 ;
4951
50- int memPoolNumberOfElements; // number of pages in memory pool
51- int memPoolElementSize; // size of each page
52- std::shared_ptr<MemoryHandler> mh; // a memory pool from which to allocate data pages
52+ std::shared_ptr<MemoryBank> memBank; // a dedicated memory bank allocated by FMQ mechanism
53+ std::shared_ptr<MemoryPagesPool> mp; // a memory pool from which to allocate data pages
5354
55+ int memoryPoolPageSize;
56+ int memoryPoolNumberOfPages;
57+
5458 public:
5559
5660
@@ -86,6 +90,9 @@ class ConsumerFMQchannel: public Consumer {
8690 transportFactory=FairMQTransportFactory::CreateTransportFactory (cfgTransportType, fair::mq::tools::Uuid (), &fmqOptions);
8791 sendingChannel=std::make_unique<FairMQChannel>(cfgChannelName, cfgChannelType, transportFactory);
8892
93+ std::string memoryBankName=" " ; // name of memory bank to create (if any) and use.
94+ cfg.getOptionalValue <std::string>(cfgEntryPoint + " .memoryBankName" , memoryBankName);
95+
8996 std::string cfgUnmanagedMemorySize=" " ;
9097 cfg.getOptionalValue <std::string>(cfgEntryPoint + " .unmanagedMemorySize" ,cfgUnmanagedMemorySize);
9198 long long mMemorySize =ReadoutUtils::getNumberOfBytesFromString (cfgUnmanagedMemorySize.c_str ());
@@ -100,25 +107,26 @@ class ConsumerFMQchannel: public Consumer {
100107 });
101108
102109 theLog.log (" Got FMQ unmanaged memory buffer size %lu @ %p" ,memoryBuffer->GetSize (),memoryBuffer->GetData ());
103- std::unique_ptr<MemoryRegion> m;
104- m=std::make_unique<MemoryRegion>();
105- m->name =" FMQ unmanaged memory buffer" ;
106- m->size =memoryBuffer->GetSize ();
107- m->ptr =memoryBuffer->GetData ();
108- m->usedSize =0 ;
109- bigBlock=std::move (m);
110+ memBank=std::make_shared<MemoryBank>(memoryBuffer->GetData (),memoryBuffer->GetSize (),nullptr ," FMQ unmanaged memory buffer from " + cfgEntryPoint);
111+ if (memoryBankName.length ()==0 ) {
112+ memoryBankName=cfgEntryPoint; // if no bank name defined, create one with the name of the consumer
113+ }
114+ theMemoryBankManager.addBank (memBank,memoryBankName);
115+ theLog.log (" Bank %s added" ,memoryBankName.c_str ());
110116 }
111117
112- // allocate a pool of pages for headers/data copies
113- cfg.getOptionalValue <int >(cfgEntryPoint + " .memPoolNumberOfElements" , memPoolNumberOfElements,100 );
114- std::string cfgMemPoolElementSize;
115- cfg.getOptionalValue <std::string>(cfgEntryPoint + " .memPoolElementSize" , cfgMemPoolElementSize);
116- memPoolElementSize=ReadoutUtils::getNumberOfBytesFromString (cfgMemPoolElementSize.c_str ());
117- if (memPoolElementSize<=0 ) {
118- memPoolElementSize=128 *1024 ;
118+ // allocate a pool of pages for headers and data frame copies
119+ memoryPoolPageSize=0 ;
120+ memoryPoolNumberOfPages=100 ;
121+ std::string cfgMemoryPoolPageSize=" 128k" ;
122+ cfg.getOptionalValue <std::string>(cfgEntryPoint + " .memoryPoolPageSize" ,cfgMemoryPoolPageSize);
123+ memoryPoolPageSize=(int )ReadoutUtils::getNumberOfBytesFromString (cfgMemoryPoolPageSize.c_str ());
124+ cfg.getOptionalValue <int >(cfgEntryPoint + " .memoryPoolNumberOfPages" , memoryPoolNumberOfPages);
125+ mp=theMemoryBankManager.getPagedPool (memoryPoolPageSize, memoryPoolNumberOfPages, memoryBankName);
126+ if (mp==nullptr ) {
127+ throw " ConsumerFMQ: failed to get memory pool from " + memoryBankName + " for " + std::to_string (memoryPoolNumberOfPages) + " pages x " + std::to_string (memoryPoolPageSize) + " bytes" ;
119128 }
120- mh=std::make_shared<MemoryHandler>(memPoolElementSize,memPoolNumberOfElements);
121-
129+ theLog.log (" Using memory pool %d pages x %d bytes" , memoryPoolNumberOfPages, memoryPoolPageSize);
122130
123131 sendingChannel->Bind (cfgChannelAddress);
124132
@@ -129,7 +137,6 @@ class ConsumerFMQchannel: public Consumer {
129137 }
130138
131139 ~ConsumerFMQchannel () {
132- bigBlock=nullptr ;
133140 }
134141
135142 int pushData (DataBlockContainerReference &) {
@@ -168,10 +175,10 @@ class ConsumerFMQchannel: public Consumer {
168175 // todo: replace by RDHv3 length
169176
170177 // we iterate a first time to count number of HB
171- if (memPoolElementSize <(int )sizeof (SubTimeframe)) {return -1 ;}
178+ if (memoryPoolPageSize <(int )sizeof (SubTimeframe)) {return -1 ;}
172179 DataBlockContainerReference headerBlock=nullptr ;
173180 try {
174- headerBlock=std::make_shared<DataBlockContainerFromMemoryHandler>(mh );
181+ headerBlock=mp-> getNewDataBlockContainer ( );
175182 }
176183 catch (...) {
177184 }
@@ -281,13 +288,13 @@ class ConsumerFMQchannel: public Consumer {
281288 // allocate
282289 // todo: same code as for header -> create func/lambda
283290 // todo: send empty message if no page left in buffer
284- if (memPoolElementSize <totalSize) {
285- printf (" error page size too small %d < %d\n " ,memPoolElementSize ,totalSize);
291+ if (memoryPoolPageSize <totalSize) {
292+ printf (" error page size too small %d < %d\n " ,memoryPoolPageSize ,totalSize);
286293 return ;
287294 }
288295 DataBlockContainerReference copyBlock=nullptr ;
289296 try {
290- copyBlock=std::make_shared<DataBlockContainerFromMemoryHandler>(mh );
297+ copyBlock=mp-> getNewDataBlockContainer ( );
291298 }
292299 catch (...) {
293300 }
0 commit comments