Skip to content

Commit e98e19f

Browse files
authored
Merge pull request #226 from sy-c/master
v2.12.0
2 parents c2e2764 + 018e6e6 commit e98e19f

11 files changed

+175
-23
lines changed

doc/configurationParameters.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ The parameters related to 3rd-party libraries are described here for convenience
5252
| consumer-data-sampling-* | address | string | ipc:///tmp/readout-pipe-1 | Address of the data sampling. |
5353
| consumer-FairMQChannel-* | checkResources | string | | Check beforehand if unmanaged region would fit in given list of resources. Comma-separated list of items to be checked: eg /dev/shm, MemFree, MemAvailable. (any filesystem path, and any /proc/meminfo entry).|
5454
| consumer-FairMQChannel-* | disableSending | int | 0 | If set, no data is output to FMQ channel. Used for performance test to create FMQ shared memory segment without pushing the data. |
55+
| consumer-FairMQChannel-* | enablePackedCopy | int | 0 | If set, the same superpage may be reused (space allowing) for the copy of multiple HBF (instead of a separate one for each copy). This allows a reduced memoryPoolNumberOfPages. |
5556
| consumer-FairMQChannel-* | enableRawFormat | int | 0 | If 0, data is pushed 1 STF header + 1 part per HBF. If 1, data is pushed in raw format without STF headers, 1 FMQ message per data page. If 2, format is 1 STF header + 1 part per data page.|
5657
| consumer-FairMQChannel-* | fmq-address | string | ipc:///tmp/pipe-readout | Address of the FMQ channel. Depends on transportType. c.f. FairMQ::FairMQChannel.h |
5758
| consumer-FairMQChannel-* | fmq-name | string | readout | Name of the FMQ channel. c.f. FairMQ::FairMQChannel.h |

doc/releaseNotes.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,3 +471,9 @@ This file describes the main feature changes for each readout.exe released versi
471471

472472
## v2.11.1 - 16/06/2022
473473
- Minor release for osx compatibility.
474+
475+
## v2.12.0 - 09/08/2022
476+
- Updated configuration parameters:
477+
- added consumer-FairMQChannel-*.enablePackedCopy. If set, the same superpage may be reused (space allowing) for the copy of multiple HBF (instead of a separate one for each copy). This allows a reduced memoryPoolNumberOfPages.
478+
- ROC fifo health check enabled, a warning message is logged when in/out ROC FIFOs are empty/full.
479+

src/ConsumerFMQchannel.cxx

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ class ConsumerFMQchannel : public Consumer
103103
bool enableRawFormat = false;
104104
bool enableStfSuperpage = false; // optimized stf transport: minimize STF packets
105105
bool enableRawFormatDatablock = false;
106+
int enablePackedCopy = 0; // default mode for repacking of page overlapping HBF. 0 = one page per copy, 1 = change page on TF only
106107

107108
std::shared_ptr<MemoryBank> memBank; // a dedicated memory bank allocated by FMQ mechanism
108109
std::shared_ptr<MemoryPagesPool> mp; // a memory pool from which to allocate data pages
@@ -111,6 +112,11 @@ class ConsumerFMQchannel : public Consumer
111112
int memoryPoolNumberOfPages;
112113

113114
CounterStats repackSizeStats; // keep track of page size used when repacking
115+
uint64_t nPagesUsedForRepack = 0; // count pages used for repack
116+
uint64_t nPagesUsedInput = 0; // count pages received
117+
118+
DataBlockContainerReference copyBlockBuffer = nullptr; // current buffer used for packed copy
119+
uint64_t lastTimeframeId = undefinedTimeframeId; // keep track of latest TF id received
114120

115121
// custom log function for memory pool
116122
void mplog(const std::string &msg) {
@@ -245,6 +251,7 @@ class ConsumerFMQchannel : public Consumer
245251
if (hint != nullptr) {
246252
DataBlockContainerReference* blockRef = (DataBlockContainerReference*)hint;
247253
//printf("ack hint=%p page %p\n",hint,(*blockRef)->getData());
254+
//printf("ptr %p: use_count=%d\n",blockRef, (int)blockRef->use_count());
248255
decDataBlockStats((*blockRef)->getData());
249256
delete blockRef;
250257
}
@@ -291,17 +298,22 @@ class ConsumerFMQchannel : public Consumer
291298
}
292299
}
293300
theLog.log(LogInfoDevel_(3008), "Using memory pool [%d]: %d pages x %d bytes", mp->getId(), memoryPoolNumberOfPages, memoryPoolPageSize);
301+
302+
// configuration parameter: | consumer-FairMQChannel-* | enablePackedCopy | int | 0 | If set, the same superpage may be reused (space allowing) for the copy of multiple HBF (instead of a separate one for each copy). This allows a reduced memoryPoolNumberOfPages. |
303+
cfg.getOptionalValue<int>(cfgEntryPoint + ".enablePackedCopy", enablePackedCopy);
304+
theLog.log(LogInfoDevel_(3008), "Packed copy enabled = %d", enablePackedCopy);
294305
}
295306

296307
~ConsumerFMQchannel()
297308
{
298309
// log memory pool statistics
299310
if (mp!=nullptr) {
300311
theLog.log(LogInfoDevel_(3003), "Consumer %s - memory pool statistics ... %s", name.c_str(), mp->getStats().c_str());
301-
theLog.log(LogInfoDevel_(3003), "Consumer %s - STFB repacking statistics ... number: %" PRIu64 " average page size: %" PRIu64 " max page size: %" PRIu64, name.c_str(), repackSizeStats.getCount(), (uint64_t)repackSizeStats.getAverage(), repackSizeStats.getMaximum());
312+
theLog.log(LogInfoDevel_(3003), "Consumer %s - STFB repacking statistics ... number: %" PRIu64 " average page size: %" PRIu64 " max page size: %" PRIu64 " repacked/received = %" PRIu64 "/%" PRIu64 " = %.1f%%", name.c_str(), repackSizeStats.getCount(), (uint64_t)repackSizeStats.getAverage(), repackSizeStats.getMaximum(), nPagesUsedForRepack, nPagesUsedInput, nPagesUsedForRepack * 100.0 / nPagesUsedInput);
302313
}
303314

304315
// release in reverse order
316+
copyBlockBuffer = nullptr;
305317
mp = nullptr;
306318
memoryBuffer = nullptr; // warning: data range may still be referenced in memory bank manager
307319
sendingChannel = nullptr;
@@ -317,6 +329,8 @@ class ConsumerFMQchannel : public Consumer
317329
int pushData(DataSetReference& bc)
318330
{
319331

332+
nPagesUsedInput += bc->size();
333+
320334
if (disableSending) {
321335
totalPushSuccess++;
322336
return 0;
@@ -498,6 +512,15 @@ class ConsumerFMQchannel : public Consumer
498512
// set flag when this is last STF in timeframe
499513
if (b->header.flagEndOfTimeframe) {
500514
stfHeader->lastTFMessage = 1;
515+
//printf("end of TF %d eq %d link %d\n ", (int) b->header.timeframeId, (int) b->header.equipmentId, (int)b->header.linkId);
516+
copyBlockBuffer = nullptr;
517+
}
518+
519+
// detect changes of TF id
520+
if (b->header.timeframeId != lastTimeframeId) {
521+
lastTimeframeId = b->header.timeframeId;
522+
copyBlockBuffer = nullptr;
523+
//printf("TF %d start\n", (int) lastTimeframeId);
501524
}
502525

503526
if (isFirst) {
@@ -635,7 +658,26 @@ class ConsumerFMQchannel : public Consumer
635658
}
636659
DataBlockContainerReference copyBlock = nullptr;
637660
try {
638-
copyBlock = mp->getNewDataBlockContainer();
661+
if (enablePackedCopy) {
662+
for (int i = 0; i<=2; i++) {
663+
// allocate new buffer for copies if needed
664+
if (copyBlockBuffer == nullptr) {
665+
copyBlockBuffer = mp->getNewDataBlockContainer();
666+
nPagesUsedForRepack++;
667+
continue;
668+
}
669+
// try to allocate sub-block
670+
copyBlock = DataBlockContainer::getChildBlock(copyBlockBuffer, totalSize);
671+
if (copyBlock == nullptr) {
672+
copyBlockBuffer = nullptr;
673+
continue;
674+
}
675+
break;
676+
}
677+
} else {
678+
copyBlock = mp->getNewDataBlockContainer();
679+
nPagesUsedForRepack++;
680+
}
639681
} catch (...) {
640682
}
641683
if (copyBlock == nullptr) {

src/DataBlockContainer.h

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,19 @@
1818
#include <stdint.h>
1919
#include <stdlib.h>
2020

21+
//#define DATABLOCKCONTAINER_DEBUG
22+
#ifdef DATABLOCKCONTAINER_DEBUG
23+
#include <stdio.h>
24+
#endif
25+
2126
#include "DataBlock.h"
2227

2328
// A container class for data blocks.
2429
// In particular, allows to take care of the block release after use.
2530

31+
class DataBlockContainer;
32+
using DataBlockContainerReference = std::shared_ptr<DataBlockContainer>;
33+
2634
class DataBlockContainer
2735
{
2836

@@ -42,6 +50,9 @@ class DataBlockContainer
4250
if (releaseCallback != nullptr) {
4351
releaseCallback();
4452
}
53+
#ifdef DATABLOCKCONTAINER_DEBUG
54+
printf("Releasing DataBlockContainer @ %p\n", data);
55+
#endif
4556
};
4657

4758
DataBlock* getData()
@@ -54,10 +65,44 @@ class DataBlockContainer
5465
return dataBufferSize;
5566
};
5667

68+
69+
static DataBlockContainerReference getChildBlock(DataBlockContainerReference parentBlock, uint64_t v_dataBufferSizeNeeded, uint64_t roundUp = 0) {
70+
uint64_t bufferSize = v_dataBufferSizeNeeded + sizeof(DataBlock);
71+
if (roundUp) {
72+
uint64_t r = bufferSize % roundUp;
73+
if (r) {
74+
bufferSize += roundUp - r;
75+
}
76+
}
77+
if (bufferSize > parentBlock->getData()->header.dataSize - parentBlock->dataBufferUsed) {
78+
return nullptr;
79+
}
80+
DataBlock *b = (DataBlock *)&((char *)parentBlock->getData()->data)[parentBlock->dataBufferUsed];
81+
b->header = defaultDataBlockHeader;
82+
b->header.dataSize = v_dataBufferSizeNeeded;
83+
b->data = &(((char*)b)[sizeof(DataBlock)]);
84+
85+
#ifdef DATABLOCKCONTAINER_DEBUG
86+
printf("Block %p - creating child @ %p - payload size %d\n", parentBlock->getData(), b, (int)v_dataBufferSizeNeeded);
87+
#endif
88+
89+
DataBlockContainerReference childBlock = std::make_shared<DataBlockContainer>(b, bufferSize);
90+
if (childBlock == nullptr) {
91+
return nullptr;
92+
}
93+
parentBlock->dataBufferUsed += bufferSize;
94+
childBlock->parentBlock = parentBlock;
95+
return childBlock;
96+
};
97+
98+
5799
protected:
58100
DataBlock* data; // The DataBlock in use
59101
uint64_t dataBufferSize = 0; // Usable memory size pointed by data. Unspecified if zero.
60102
ReleaseCallback releaseCallback; // Function called on object destroy, to release dataBlock.
103+
104+
DataBlockContainerReference parentBlock = nullptr; // reference to parent block used, if any
105+
uint64_t dataBufferUsed = 0; // size of buffer used for child blocks.
61106
};
62107

63108
#endif

src/DataSet.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
// A container to store a vector of shared pointers to data block containers.
1818
// Can typically be used to pass a set of blocks to a function.
1919

20-
using DataBlockContainerReference = std::shared_ptr<DataBlockContainer>;
2120
using DataSet = std::vector<DataBlockContainerReference>;
2221
using DataSetReference = std::shared_ptr<DataSet>;
2322

src/MemoryPagesPool.cxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,9 @@ void MemoryPagesPool::log(const std::string &msg) {
298298
}
299299

300300
void MemoryPagesPool::updateBufferState() {
301+
if (theLogCallback == nullptr) {
302+
return;
303+
}
301304
double r = 1.0 - (getNumberOfPagesAvailable() * 1.0 / getTotalNumberOfPages());
302305
if ((r == 1.0) && (state != BufferState::full)) {
303306
state = BufferState::full;

src/ReadoutEquipmentRORC.cxx

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <ReadoutCard/Exception.h>
1616
#include <ReadoutCard/MemoryMappedFile.h>
1717
#include <ReadoutCard/Parameters.h>
18+
#include <Common/SimpleLog.h>
1819
#include <cstring>
1920
#include <mutex>
2021
#include <string>
@@ -64,6 +65,11 @@ class ReadoutEquipmentRORC : public ReadoutEquipment
6465

6566
int32_t lastPacketDropped = 0; // latest value of CRU packet dropped counter
6667
AliceO2::Common::Timer packetDroppedTimer; // a timer to set period of packet dropped checks
68+
69+
SimpleLog logRocCalls; // ROC debug log file, for ROC library traces
70+
bool logRocCallsEnable = false; // flag to enable ROC debug log file
71+
std::string logRocCallsPath = "/tmp/roclog.txt"; // path to ROC debug log file
72+
unsigned long logRocCallsMaxFileSize = 10000000; // max size (bytes) of ROC debug log file
6773
};
6874

6975
// std::mutex readoutEquipmentRORCLock;
@@ -174,6 +180,11 @@ ReadoutEquipmentRORC::ReadoutEquipmentRORC(ConfigFile& cfg, std::string name) :
174180

175181
// todo: log parameters ?
176182

183+
if (logRocCallsEnable) {
184+
logRocCalls.setLogFile(logRocCallsPath.c_str(), logRocCallsMaxFileSize);
185+
logRocCalls.info("Logs of ROC library calls by readout process");
186+
}
187+
177188
} catch (const std::exception& e) {
178189
theLog.log(LogErrorSupport_(3240), "Exception : %s", e.what());
179190
theLog.log(LogErrorSupport_(3240), "%s", boost::diagnostic_information(e).c_str());
@@ -216,12 +227,10 @@ Thread::CallbackResult ReadoutEquipmentRORC::prepareBlocks()
216227
} else {
217228
packetDroppedTimer.increment();
218229
// check CRU FIFO status - but only after first loop, otherwise would be empty yet
219-
if (0) { // feature disabled for the time being, spurious warnings on startup
220230
if (!channel->areSuperpageFifosHealthy()) {
221231
static InfoLogger::AutoMuteToken logToken(LogWarningSupport_(3235), 5, 60);
222232
theLog.log(logToken, "Equipment %s: ROC memory fifo not healthy", name.c_str());
223233
}
224-
}
225234
}
226235
}
227236

@@ -267,6 +276,9 @@ Thread::CallbackResult ReadoutEquipmentRORC::prepareBlocks()
267276
}
268277
}
269278
equipmentStats[EquipmentStatsIndexes::nPushedUp].increment(nPushed);
279+
if (logRocCallsEnable) {
280+
logRocCalls.info("pushed %d",nPushed);
281+
}
270282

271283
// check fifo occupancy ready queue size for stats
272284
equipmentStats[EquipmentStatsIndexes::fifoOccupancyReadyBlocks].set(channel->getReadyQueueSize());
@@ -284,6 +296,10 @@ Thread::CallbackResult ReadoutEquipmentRORC::prepareBlocks()
284296

285297
// this is to be called periodically for driver internal business
286298
channel->fillSuperpages();
299+
if (logRocCallsEnable) {
300+
logRocCalls.info("fillSuperpage()");
301+
}
302+
287303

288304
// readoutEquipmentRORCLock.unlock();
289305

@@ -317,6 +333,9 @@ DataBlockContainerReference ReadoutEquipmentRORC::getNextBlock()
317333
for(int loop = 0; loop < maxLoop; loop++) {
318334
bool doLoop = 0;
319335
if ((channel->getReadyQueueSize() > 0)) {
336+
if (logRocCallsEnable) {
337+
logRocCalls.info("pop (%d available)", channel->getReadyQueueSize());
338+
}
320339
// get next page from FIFO
321340
auto superpage = channel->popSuperpage();
322341
void* mpPageAddress = (void*)(superpage.getUserData());
@@ -370,6 +389,9 @@ void ReadoutEquipmentRORC::setDataOn()
370389
// start DMA
371390
theLog.log(LogInfoDevel_(3010), "Starting DMA for ROC %s", getName().c_str());
372391
channel->startDma();
392+
if (logRocCallsEnable) {
393+
logRocCalls.info("startdma()");
394+
}
373395

374396
// get FIFO depth (it should be fully empty when starting)
375397
// can not be done before startDma() - would return 0
@@ -415,6 +437,9 @@ void ReadoutEquipmentRORC::setDataOff()
415437
theLog.log(LogInfoDevel_(3010), "Stopping DMA for ROC %s", getName().c_str());
416438
try {
417439
channel->stopDma();
440+
if (logRocCallsEnable) {
441+
logRocCalls.info("stopdma()");
442+
}
418443
} catch (const std::exception& e) {
419444
theLog.log(LogErrorSupport_(3240), "Exception : %s", e.what());
420445
theLog.log(LogErrorSupport_(3240), "%s", boost::diagnostic_information(e).c_str());

src/ReadoutVersion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
1111

12-
#define READOUT_VERSION "2.11.1"
12+
#define READOUT_VERSION "2.12.0"
1313

src/mainReadout.cxx

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,26 @@ int Readout::configure(const boost::property_tree::ptree& properties)
565565
};
566566
mergeConfig(cfg.get(), properties);
567567

568+
// extract optional configuration parameters
569+
// configuration parameter: | readout | customCommands | string | | List of key=value pairs defining some custom shell commands to be executed at before/after state change commands. |
570+
if (customCommandsShellPid) {
571+
std::string cfgCustomCommandsList;
572+
customCommands.clear();
573+
cfg.getOptionalValue<std::string>("readout.customCommands", cfgCustomCommandsList);
574+
if (getKeyValuePairsFromString(cfgCustomCommandsList, customCommands)) {
575+
theLog.log(LogWarningDevel_(3102), "Failed to parse custom commands");
576+
customCommands.clear();
577+
} else {
578+
theLog.log(LogInfoDevel_(3013), "Registered custom commands:");
579+
for (const auto &kv : customCommands) {
580+
theLog.log(LogInfoDevel_(3013), "%s : %s", kv.first.c_str(), kv.second.c_str());
581+
}
582+
}
583+
}
584+
585+
// execute custom command
586+
executeCustomCommand("preCONFIGURE");
587+
568588
// try to prevent deep sleeps
569589
bool deepsleepDisabled = false;
570590
int maxLatency = 2;
@@ -584,23 +604,6 @@ int Readout::configure(const boost::property_tree::ptree& properties)
584604
theLog.log(LogInfoDevel, "CPU deep sleep not disabled for process");
585605
}
586606

587-
// extract optional configuration parameters
588-
// configuration parameter: | readout | customCommands | string | | List of key=value pairs defining some custom shell commands to be executed at before/after state change commands. |
589-
if (customCommandsShellPid) {
590-
std::string cfgCustomCommandsList;
591-
customCommands.clear();
592-
cfg.getOptionalValue<std::string>("readout.customCommands", cfgCustomCommandsList);
593-
if (getKeyValuePairsFromString(cfgCustomCommandsList, customCommands)) {
594-
theLog.log(LogWarningDevel_(3102), "Failed to parse custom commands");
595-
customCommands.clear();
596-
} else {
597-
theLog.log(LogInfoDevel_(3013), "Registered custom commands:");
598-
for (const auto &kv : customCommands) {
599-
theLog.log(LogInfoDevel_(3013), "%s : %s", kv.first.c_str(), kv.second.c_str());
600-
}
601-
}
602-
}
603-
604607
// configuration parameter: | readout | exitTimeout | double | -1 | Time in seconds after which the program exits automatically. -1 for unlimited. |
605608
cfgExitTimeout = -1;
606609
cfg.getOptionalValue<double>("readout.exitTimeout", cfgExitTimeout);
@@ -1027,6 +1030,9 @@ int Readout::configure(const boost::property_tree::ptree& properties)
10271030
}
10281031
theLog.log(LogInfoDevel, "Aggregator: %d equipments", nEquipmentsAggregated);
10291032

1033+
// execute custom command
1034+
executeCustomCommand("postCONFIGURE");
1035+
10301036
theLog.log(LogInfoSupport_(3005), "Readout completed CONFIGURE");
10311037
gReadoutStats.counters.state = stringToUint64("ready");
10321038
gReadoutStats.counters.notify++;

src/readoutConfigEditor.tcl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ set configurationParametersDescriptor {
3030
| consumer-data-sampling-* | address | string | ipc:///tmp/readout-pipe-1 | Address of the data sampling. |
3131
| consumer-FairMQChannel-* | checkResources | string | | Check beforehand if unmanaged region would fit in given list of resources. Comma-separated list of items to be checked: eg /dev/shm, MemFree, MemAvailable. (any filesystem path, and any /proc/meminfo entry).|
3232
| consumer-FairMQChannel-* | disableSending | int | 0 | If set, no data is output to FMQ channel. Used for performance test to create FMQ shared memory segment without pushing the data. |
33+
| consumer-FairMQChannel-* | enablePackedCopy | int | 0 | If set, the same superpage may be reused (space allowing) for the copy of multiple HBF (instead of a separate one for each copy). This allows a reduced memoryPoolNumberOfPages. |
3334
| consumer-FairMQChannel-* | enableRawFormat | int | 0 | If 0, data is pushed 1 STF header + 1 part per HBF. If 1, data is pushed in raw format without STF headers, 1 FMQ message per data page. If 2, format is 1 STF header + 1 part per data page.|
3435
| consumer-FairMQChannel-* | fmq-address | string | ipc:///tmp/pipe-readout | Address of the FMQ channel. Depends on transportType. c.f. FairMQ::FairMQChannel.h |
3536
| consumer-FairMQChannel-* | fmq-name | string | readout | Name of the FMQ channel. c.f. FairMQ::FairMQChannel.h |

0 commit comments

Comments
 (0)