Skip to content

Commit 6c21086

Browse files
committed
added first page align
1 parent a9120c3 commit 6c21086

File tree

3 files changed

+65
-222
lines changed

3 files changed

+65
-222
lines changed

src/ReadoutEquipment.cxx

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,17 @@ ReadoutEquipment::ReadoutEquipment(ConfigFile &cfg, std::string cfgEntryPoint) {
3838
// disable output?
3939
cfg.getOptionalValue<int>(cfgEntryPoint + ".disableOutput", disableOutput);
4040

41-
41+
// memory alignment
42+
std::string cfgStringFirstPageOffset="0";
43+
cfg.getOptionalValue<std::string>(cfgEntryPoint + ".firstPageOffset", cfgStringFirstPageOffset);
44+
size_t cfgFirstPageOffset=(size_t)ReadoutUtils::getNumberOfBytesFromString(cfgStringFirstPageOffset.c_str());
45+
std::string cfgStringBlockAlign="2M";
46+
cfg.getOptionalValue<std::string>(cfgEntryPoint + ".blockAlign", cfgStringBlockAlign);
47+
size_t cfgBlockAlign=(size_t)ReadoutUtils::getNumberOfBytesFromString(cfgStringBlockAlign.c_str());
48+
4249
// log config summary
4350
theLog.log("Equipment %s: from config [%s], max rate=%lf Hz, idleSleepTime=%d us, outputFifoSize=%d", name.c_str(), cfgEntryPoint.c_str(), readoutRate, cfgIdleSleepTime, cfgOutputFifoSize);
44-
theLog.log("Equipment %s: memory pool %d pages x %d bytes from bank %s", name.c_str(),(int) memoryPoolNumberOfPages, (int) memoryPoolPageSize, memoryBankName.c_str());
51+
theLog.log("Equipment %s: requesting memory pool %d pages x %d bytes from bank %s, block aligned @ 0x%X, 1st page offset @ 0x%X", name.c_str(),(int) memoryPoolNumberOfPages, (int) memoryPoolPageSize, memoryBankName.c_str(),(int)cfgBlockAlign,(int)cfgFirstPageOffset);
4552
if (disableOutput) {
4653
theLog.log("Equipment %s: output DISABLED ! Data will be readout and dropped immediately",name.c_str());
4754
}
@@ -60,8 +67,14 @@ ReadoutEquipment::ReadoutEquipment(ConfigFile &cfg, std::string cfgEntryPoint) {
6067
if ((memoryPoolPageSize<=0)||(memoryPoolNumberOfPages<=0)) {
6168
theLog.log("Equipment %s: wrong memory pool settings", name.c_str());
6269
throw __LINE__;
70+
}
71+
pageSpaceReserved=sizeof(DataBlock); // reserve some data at beginning of each page for header, keep beginning of payload aligned as requested in config
72+
size_t firstPageOffset=0; // alignment of 1st page of memory pool
73+
if (cfgFirstPageOffset) {
74+
firstPageOffset=cfgFirstPageOffset-pageSpaceReserved;
6375
}
64-
mp=theMemoryBankManager.getPagedPool(memoryPoolPageSize, memoryPoolNumberOfPages, memoryBankName);
76+
theLog.log("pageSpaceReserved = %d, aligning 1st page @ 0x%X",(int)pageSpaceReserved,(int)firstPageOffset);
77+
mp=theMemoryBankManager.getPagedPool(memoryPoolPageSize, memoryPoolNumberOfPages, memoryBankName, firstPageOffset, cfgBlockAlign);
6578
if (mp==nullptr) {
6679
throw __LINE__;
6780
}
@@ -145,19 +158,6 @@ Thread::CallbackResult ReadoutEquipment::threadCallback(void *arg) {
145158
}
146159
}
147160

148-
// prepare next blocks
149-
Thread::CallbackResult statusPrepare=ptr->prepareBlocks();
150-
switch (statusPrepare) {
151-
case (Thread::CallbackResult::Ok):
152-
isActive=true;
153-
break;
154-
case (Thread::CallbackResult::Idle):
155-
break;
156-
default:
157-
// this is an abnormal situation, return corresponding status
158-
return statusPrepare;
159-
}
160-
161161
// check status of output FIFO
162162
ptr->equipmentStats[EquipmentStatsIndexes::fifoOccupancyOutBlocks].set(ptr->dataOut->getNumberOfUsedSlots());
163163

@@ -194,6 +194,21 @@ Thread::CallbackResult ReadoutEquipment::threadCallback(void *arg) {
194194
}
195195
ptr->equipmentStats[EquipmentStatsIndexes::nBlocksOut].increment(nPushedOut);
196196

197+
198+
// prepare next blocks
199+
Thread::CallbackResult statusPrepare=ptr->prepareBlocks();
200+
switch (statusPrepare) {
201+
case (Thread::CallbackResult::Ok):
202+
isActive=true;
203+
break;
204+
case (Thread::CallbackResult::Idle):
205+
break;
206+
default:
207+
// this is an abnormal situation, return corresponding status
208+
return statusPrepare;
209+
}
210+
211+
197212
// consider inactive if we have not pushed much compared to free space in output fifo
198213
// todo: instead, have dynamic 'inactive sleep time' as function of actual outgoing page rate to optimize polling interval
199214
if (nPushedOut<ptr->dataOut->getNumberOfFreeSlots()/4) {

src/ReadoutEquipment.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ class ReadoutEquipment {
9494
std::string memoryBankName=""; // memory bank to be used. by default, this uses the first memory bank available
9595

9696
int disableOutput=0; // when set true, data are dropped before pushing to output queue
97+
98+
size_t pageSpaceReserved=0; // amount of space reserved (in bytes) at beginning of each data page, possibly to store header
9799
};
98100

99101

src/ReadoutEquipmentRORC.cxx

Lines changed: 32 additions & 206 deletions
Original file line numberDiff line numberDiff line change
@@ -21,181 +21,6 @@ extern InfoLogger theLog;
2121

2222

2323

24-
25-
// a big block of memory for I/O
26-
class ReadoutMemoryHandler {
27-
public:
28-
size_t memorySize; // total size of buffer
29-
size_t pageSize; // size of each superpage in buffer (not the one of getpagesize())
30-
uint8_t * baseAddress; // base address of buffer
31-
32-
std::unique_ptr<AliceO2::Common::Fifo<long>> pagesAvailable; // a buffer to keep track of individual pages. storing offset (with respect to base address) of pages available
33-
34-
void *getBaseAddress() {
35-
return baseAddress;
36-
}
37-
size_t getSize() {
38-
return memorySize;
39-
}
40-
size_t getPageSize() {
41-
return pageSize;
42-
}
43-
void freePage(void *p) {
44-
int64_t offset=((uint8_t *)p)-baseAddress;
45-
if ((offset<0) || (offset>(int64_t)memorySize)) {
46-
throw __LINE__;
47-
}
48-
pagesAvailable->push(offset);
49-
}
50-
void *getPage() {
51-
long offset=0;
52-
int res=pagesAvailable->pop(offset);
53-
if (res==0) {
54-
uint8_t *pagePtr=&baseAddress[offset];
55-
return pagePtr;
56-
}
57-
return nullptr;
58-
}
59-
60-
private:
61-
std::unique_ptr<AliceO2::roc::MemoryMappedFile> mMemoryMappedFile;
62-
63-
public:
64-
65-
ReadoutMemoryHandler(size_t vMemorySize, int vPageSize, std::string const &uid, std::string const &hugePageSize="1GB"){
66-
67-
pagesAvailable=nullptr;
68-
mMemoryMappedFile=nullptr;
69-
70-
// select Huge Page type and define corresponding settings
71-
int hugePageSizeBytes; // size of page in bytes
72-
if (hugePageSize=="1GB") {
73-
hugePageSizeBytes=1024*1024*1024;
74-
} else if (hugePageSize=="2MB") {
75-
hugePageSizeBytes=2*1024*1024;
76-
} else {
77-
theLog.log("Wrong hugePageSize %s",hugePageSize.c_str());
78-
exit(1);
79-
}
80-
81-
// path to our memory segment
82-
std::string memoryMapBasePath="/var/lib/hugetlbfs/global/pagesize-"+hugePageSize+"/";
83-
std::string memoryMapFilePath=memoryMapBasePath + uid;
84-
85-
// must be multiple of hugepage size
86-
int r=vMemorySize % hugePageSizeBytes;
87-
if (r) {
88-
vMemorySize+=hugePageSizeBytes-r;
89-
}
90-
91-
theLog.log("Creating shared memory block - %s @ %s",ReadoutUtils::NumberOfBytesToString(vMemorySize,"Bytes").c_str(),memoryMapFilePath.c_str());
92-
93-
try {
94-
mMemoryMappedFile=std::make_unique<AliceO2::roc::MemoryMappedFile>(memoryMapFilePath,vMemorySize,false);
95-
}
96-
catch (const AliceO2::roc::MemoryMapException& e) {
97-
theLog.log("Failed to allocate memory buffer : %s\n",e.what());
98-
exit(1);
99-
}
100-
// todo: check consistent with what requested, alignment, etc
101-
memorySize=mMemoryMappedFile->getSize();
102-
baseAddress=(uint8_t *)mMemoryMappedFile->getAddress();
103-
104-
int baseAlignment=1024*1024;
105-
r=(long)baseAddress % baseAlignment;
106-
if (r!=0) {
107-
theLog.log("Unaligned base address %p, target alignment=%d, skipping first %d bytes",baseAddress,baseAlignment,baseAlignment-r);
108-
int skipBytes=baseAlignment-r;
109-
baseAddress+=skipBytes;
110-
memorySize-=skipBytes;
111-
// now baseAddress is aligned to baseAlignment bytes, but what guarantee do we have that PHYSICAL page addresses are aligned ???
112-
}
113-
114-
pageSize=vPageSize;
115-
long long nPages=memorySize/pageSize;
116-
theLog.log("Got %lld pages, each %s",nPages,ReadoutUtils::NumberOfBytesToString(pageSize,"Bytes").c_str());
117-
pagesAvailable=std::make_unique<AliceO2::Common::Fifo<long>>(nPages);
118-
119-
for (int i=0;i<nPages;i++) {
120-
long offset=i*pageSize;
121-
//void *page=&((uint8_t*)baseAddress)[offset];
122-
//printf("%d : 0x%p\n",i,page);
123-
pagesAvailable->push(offset);
124-
}
125-
126-
}
127-
~ReadoutMemoryHandler() {
128-
}
129-
130-
};
131-
// todo: locks for thread-safe access at init time
132-
//ReadoutMemoryHandler mReadoutMemoryHandler;
133-
134-
135-
136-
137-
class DataBlockContainerFromRORC : public DataBlockContainer {
138-
private:
139-
AliceO2::roc::ChannelFactory::DmaChannelSharedPtr mChannel;
140-
AliceO2::roc::Superpage mSuperpage;
141-
// std::shared_ptr<ReadoutMemoryHandler> mReadoutMemoryHandler; // todo: store this in superpage user data
142-
std::shared_ptr<MemoryHandler> mReadoutMemoryHandler; // a memory pool from which to allocate data pages
143-
144-
public:
145-
DataBlockContainerFromRORC(AliceO2::roc::ChannelFactory::DmaChannelSharedPtr channel, AliceO2::roc::Superpage const & superpage, //std::shared_ptr<ReadoutMemoryHandler> const & h) {
146-
std::shared_ptr<MemoryHandler> const & h) {
147-
148-
mSuperpage=superpage;
149-
mChannel=channel;
150-
mReadoutMemoryHandler=h;
151-
152-
data=nullptr;
153-
try {
154-
data=new DataBlock;
155-
} catch (...) {
156-
throw __LINE__;
157-
}
158-
159-
160-
data->header.blockType=DataBlockType::H_BASE;
161-
data->header.headerSize=sizeof(DataBlockHeaderBase);
162-
data->header.dataSize=superpage.getReceived();
163-
164-
uint32_t *ptr=(uint32_t *) & (((uint8_t *)h->getBaseAddress())[mSuperpage.getOffset()]);
165-
166-
data->header.id=*ptr;
167-
data->data=(char *)ptr;
168-
169-
/* for (int i=0;i<64;i++) {
170-
printf("%08X ",(((unsigned int *)ptr)[i]));
171-
}
172-
printf("\n");
173-
sleep(1);
174-
*/
175-
176-
//printf("container %p created for superpage @ offset %ld = %p\n",this,(long)superpage.getOffset(),ptr);
177-
}
178-
179-
~DataBlockContainerFromRORC() {
180-
// todo: add lock
181-
// if constructor fails, do we make page available again or leave it to caller?
182-
//mReadoutMemoryHandler->pagesAvailable->push(mSuperpage.getOffset());
183-
184-
void *ptr= & (((uint8_t *)mReadoutMemoryHandler->getBaseAddress())[mSuperpage.getOffset()]);
185-
mReadoutMemoryHandler->freePage(ptr);
186-
187-
//printf("released superpage %ld\n",mSuperpage.getOffset());
188-
if (data!=nullptr) {
189-
delete data;
190-
}
191-
}
192-
};
193-
194-
195-
196-
197-
198-
19924
class ReadoutEquipmentRORC : public ReadoutEquipment {
20025

20126
public:
@@ -209,8 +34,6 @@ class ReadoutEquipmentRORC : public ReadoutEquipment {
20934
Thread::CallbackResult populateFifoOut(); // the data readout loop function
21035

21136
AliceO2::roc::ChannelFactory::DmaChannelSharedPtr channel; // channel to ROC device
212-
//std::shared_ptr<ReadoutMemoryHandler> mReadoutMemoryHandler; // object to get memory from
213-
//std::shared_ptr<MemoryHandler> mReadoutMemoryHandler; // object to get memory from
21437

21538
DataBlockId currentId=0; // current data id, kept for auto-increment
21639

@@ -292,7 +115,7 @@ ReadoutEquipmentRORC::ReadoutEquipmentRORC(ConfigFile &cfg, std::string name) :
292115
//sleep((cfgChannelNumber+1)*2); // trick to avoid all channels open at once - fail to acquire lock
293116

294117
// define usable superpagesize
295-
superPageSize=mp->getPageSize()-sizeof(DataBlock); // Keep space at beginning for DataBlock object
118+
superPageSize=mp->getPageSize()-pageSpaceReserved; // Keep space at beginning for DataBlock object
296119
superPageSize-=superPageSize % (32*1024); // Must be a multiple of 32Kb for ROC
297120
theLog.log("Using superpage size %ld",superPageSize);
298121
if (superPageSize==0) {
@@ -307,10 +130,6 @@ ReadoutEquipmentRORC::ReadoutEquipmentRORC(ConfigFile &cfg, std::string name) :
307130
}
308131
readoutEquipmentRORCLock.unlock();
309132

310-
// create memory pool
311-
//mReadoutMemoryHandler=std::make_shared<ReadoutMemoryHandler>((long)mMemorySize,(int)mPageSize,uid,cfgHugePageSize);
312-
//mReadoutMemoryHandler=std::make_shared<MemoryHandler>(mPageSize,mMemorySize/mPageSize);
313-
314133
// open and configure ROC
315134
theLog.log("Opening ROC %s:%d",cardId.c_str(),cfgChannelNumber);
316135
AliceO2::roc::Parameters params;
@@ -331,20 +150,22 @@ ReadoutEquipmentRORC::ReadoutEquipmentRORC(ConfigFile &cfg, std::string name) :
331150
// card readout mode : experimental, not needed
332151
// params.setReadoutMode(AliceO2::roc::ReadoutMode::fromString(cfgReadoutMode));
333152

334-
// theLog.log("Loop DMA block %p:%lu", mp->getBaseBlockAddress(), mp->getBaseBlockSize()); //(void *)mReadoutMemoryHandler->getBaseAddress(),mReadoutMemoryHandler->getSize());
335-
// char *ptr=(char *)mp->getBaseBlockAddress();
336-
// for (size_t i=0;i<mp->getBaseBlockSize();i++) {
337-
// ptr[i]=0;
338-
//}
339-
340-
// register the memory block for DMA
341-
theLog.log("Register DMA block %p:%lu",(void *)mp->getBaseBlockAddress(),mp->getBaseBlockSize());
153+
/*
154+
theLog.log("Loop DMA block %p:%lu", mp->getBaseBlockAddress(), mp->getBaseBlockSize());
155+
char *ptr=(char *)mp->getBaseBlockAddress();
156+
for (size_t i=0;i<mp->getBaseBlockSize();i++) {
157+
ptr[i]=0;
158+
}
159+
*/
160+
161+
// register the memory block for DMA
162+
void *baseAddress=(void *)mp->getBaseBlockAddress();
163+
size_t blockSize=mp->getBaseBlockSize();
164+
theLog.log("Register DMA block %p:%lu",baseAddress,blockSize);
342165
params.setBufferParameters(AliceO2::roc::buffer_parameters::Memory {
343-
// (void *)mReadoutMemoryHandler->baseAddress, mReadoutMemoryHandler->memorySize
344-
// (void *)mReadoutMemoryHandler->getBaseAddress(), mReadoutMemoryHandler->getSize()
345-
mp->getBaseBlockAddress(), mp->getBaseBlockSize()
166+
baseAddress, blockSize
346167
});
347-
168+
348169
// clear locks if necessary
349170
params.setForcedUnlockEnabled(true);
350171

@@ -423,20 +244,16 @@ Thread::CallbackResult ReadoutEquipmentRORC::prepareBlocks(){
423244
}
424245

425246
// give free pages to the driver
426-
int nPushed=0; // number of free pages pushed this iteration
247+
int nPushed=0; // number of free pages pushed this iteration
427248
while (channel->getTransferQueueAvailable() != 0) {
428-
//long offset=0;
429-
//void *newPage=mReadoutMemoryHandler->getPage();
430249
void *newPage=mp->getPage();
431-
//if (mReadoutMemoryHandler->pagesAvailable->pop(offset)==0) {
432-
if (newPage!=nullptr) {
250+
if (newPage!=nullptr) {
251+
// todo: check page is aligned as expected
433252
AliceO2::roc::Superpage superpage;
434-
//superpage.offset=offset;
435-
//superpage.offset=(char *)newPage-(char *)mReadoutMemoryHandler->getBaseAddress();
436-
superpage.offset=(char *)newPage-(char *)mp->getBaseBlockAddress()+sizeof(DataBlock);
253+
superpage.offset=(char *)newPage-(char *)mp->getBaseBlockAddress()+pageSpaceReserved;
437254
superpage.size=superPageSize;
438-
superpage.userData=newPage; // &mReadoutMemoryHandler; // bad - looses shared_ptr // todo: keep track of datablock object instead?
439-
channel->pushSuperpage(superpage);
255+
superpage.userData=newPage;
256+
channel->pushSuperpage(superpage);
440257
isActive=1;
441258
nPushed++;
442259
} else {
@@ -492,8 +309,14 @@ DataBlockContainerReference ReadoutEquipmentRORC::getNextBlock() {
492309
if (superpage.isFilled()) {
493310
std::shared_ptr<DataBlockContainer>d=nullptr;
494311
try {
495-
//d=std::make_shared<DataBlockContainerFromRORC>(channel, superpage, mReadoutMemoryHandler);
496-
d=mp->getNewDataBlockContainer((void *)(superpage.userData));
312+
if (pageSpaceReserved>=sizeof(DataBlock)) {
313+
d=mp->getNewDataBlockContainer((void *)(superpage.userData));
314+
} else {
315+
// todo: allocate data block container elsewhere than beginning of page
316+
//d=mp->getNewDataBlockContainer(nullptr);
317+
//d=mp->getNewDataBlockContainer((void *)(superpage.userData));
318+
//d=std::make_shared<DataBlockContainer>(nullptr);
319+
}
497320
}
498321
catch (...) {
499322
// todo: increment a stats counter?
@@ -562,6 +385,9 @@ DataBlockContainerReference ReadoutEquipmentRORC::getNextBlock() {
562385
}
563386

564387
}
388+
else {
389+
// no data block container... what to do???
390+
}
565391
}
566392
}
567393
return nextBlock;

0 commit comments

Comments
 (0)