Skip to content

Commit 27402a0

Browse files
authored
Merge pull request #220 from sy-c/master
v2.10.2
2 parents 6f16762 + 298d714 commit 27402a0

15 files changed

+134
-17
lines changed

doc/releaseNotes.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,3 +451,6 @@ This file describes the main feature changes for each readout.exe released versi
451451
## v2.10.1 - 20/04/2022
452452
- Updated configuration parameters:
453453
- equipment-file-*.updateOrbits: when set to zero, RDH orbits are not updated in file loop replay. This is needed for some reconstruction tests. Note, however, that it creates a stream of data with inconsistent orbit IDs and mismatched timeframe information.
454+
455+
## v2.10.2 - 04/05/2022
456+
- Added some buffer monitoring counters. They are logged in console stats and in monitoring (readout.bufferUsage, one per buffer, from 0% to 100%).

src/ConsumerFMQchannel.cxx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,11 @@ class ConsumerFMQchannel : public Consumer
286286
throw "ConsumerFMQ: failed to get memory pool from " + memoryBankName + " for " + std::to_string(memoryPoolNumberOfPages) + " pages x " + std::to_string(memoryPoolPageSize) + " bytes";
287287
} else {
288288
mp -> setWarningCallback(std::bind(&ConsumerFMQchannel::mplog, this, std::placeholders::_1));
289+
if ((mp->getId() >= 0) && (mp->getId() < ReadoutStatsMaxItems)) {
290+
mp -> setBufferStateVariable(&gReadoutStats.counters.bufferUsage[mp->getId()]);
291+
}
289292
}
290-
theLog.log(LogInfoDevel_(3008), "Using memory pool %d pages x %d bytes", memoryPoolNumberOfPages, memoryPoolPageSize);
293+
theLog.log(LogInfoDevel_(3008), "Using memory pool [%d]: %d pages x %d bytes", mp->getId(), memoryPoolNumberOfPages, memoryPoolPageSize);
291294
}
292295

293296
~ConsumerFMQchannel()

src/ConsumerStats.cxx

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,14 @@ class ConsumerStats : public Consumer
156156
sendMetricNoException({ rRfmq, "readout.stfbMemoryPagesReleaseRate"});
157157
sendMetricNoException({ avgTfmq, "readout.stfbMemoryPagesReleaseLatency"});
158158
sendMetricNoException({ tfidfmq, "readout.stfbTimeframeId"});
159+
160+
// buffer stats
161+
for (int i = 0; i < ReadoutStatsMaxItems; i++) {
162+
double r = snapshot.bufferUsage[i].load();
163+
if (r >= 0) {
164+
sendMetricNoException(Metric{(int)(r*100), "readout.bufferUsage"}.addTag(tags::Key::ID, i));
165+
}
166+
}
159167
}
160168

161169
#ifdef WITH_ZMQ
@@ -171,6 +179,16 @@ class ConsumerStats : public Consumer
171179
if (gReadoutStats.isFairMQ) {
172180
theLog.log(LogInfoOps_(3003), "STFB locked pages: current=%llu, released = %llu, release rate=%.2lf Hz, latency=%.3lf s, current TF = %d", (unsigned long long) snapshot.pagesPendingFairMQ.load(), nRfmq, rRfmq, avgTfmq, tfidfmq );
173181
}
182+
std::string bufferReport;
183+
for (int i = 0; i < ReadoutStatsMaxItems; i++) {
184+
double r = snapshot.bufferUsage[i].load();
185+
if (r >= 0) {
186+
bufferReport += "["+ std::to_string(i) + "]=" + std::to_string((int)(r*100)) + "% ";
187+
}
188+
}
189+
if (bufferReport.length()) {
190+
theLog.log(LogInfoOps_(3003), "Memory buffers usage: %s", bufferReport.c_str());
191+
}
174192
}
175193
}
176194

src/MemoryBankManager.cxx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ std::shared_ptr<MemoryPagesPool> MemoryBankManager::getPagedPool(size_t pageSize
4141
void* baseAddress = nullptr; // base address of bank from which the block is taken
4242
size_t offset = 0; // offset of new block (relative to baseAddress)
4343
size_t blockSize = 0; // size of new block (in bytes)
44+
int newId = 0;
4445

4546
// disable concurrent execution of this block
4647
// automatic release of lock when going out of scope
@@ -109,11 +110,12 @@ std::shared_ptr<MemoryPagesPool> MemoryBankManager::getPagedPool(size_t pageSize
109110

110111
// keep track of this new block
111112
banks[ix].rangesInUse.push_back({ offset, blockSize });
113+
newId = ++poolIndex;
112114
}
113115
// end of locked block
114116

115117
// create pool of pages from new block
116-
return std::make_shared<MemoryPagesPool>(pageSize, pageNumber, &(((char*)baseAddress)[offset]), blockSize, nullptr, firstPageOffset);
118+
return std::make_shared<MemoryPagesPool>(pageSize, pageNumber, &(((char*)baseAddress)[offset]), blockSize, nullptr, firstPageOffset, newId);
117119
}
118120

119121
// a global MemoryBankManager instance
@@ -140,5 +142,6 @@ void MemoryBankManager::reset()
140142
theLog.log(LogInfoDevel_(3008), "Releasing bank %s%s", it.name.c_str(), (useCount == 1) ? "" : "warning - still in use elsewhere !");
141143
}
142144
banks.clear();
145+
poolIndex = -1;
143146
}
144147

src/MemoryBankManager.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class MemoryBankManager
6868
private:
6969
std::vector<bankDescriptor> banks; // list of registered memory banks
7070
std::mutex bankMutex; // instance mutex to handle concurrent access to public methods
71+
int poolIndex = -1; // an increasing index used to assign a unique id to memory pools
7172
};
7273

7374
// a global MemoryBankManager instance

src/MemoryPagesPool.cxx

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
int MemoryPagesPoolStatsEnabled = 0; // flag to control memory stats
1818

19-
MemoryPagesPool::MemoryPagesPool(size_t vPageSize, size_t vNumberOfPages, void* vBaseAddress, size_t vBaseSize, ReleaseCallback vCallback, size_t firstPageOffset)
19+
MemoryPagesPool::MemoryPagesPool(size_t vPageSize, size_t vNumberOfPages, void* vBaseAddress, size_t vBaseSize, ReleaseCallback vCallback, size_t firstPageOffset, int vId)
2020
{
2121
// initialize members from parameters
2222
pageSize = vPageSize;
@@ -25,6 +25,7 @@ MemoryPagesPool::MemoryPagesPool(size_t vPageSize, size_t vNumberOfPages, void*
2525
baseBlockSize = vBaseSize;
2626
releaseBaseBlockCallback = vCallback;
2727
state = BufferState::empty;
28+
id = vId;
2829

2930
// check page / header sizes
3031
assert(headerReservedSpace >= sizeof(DataBlock));
@@ -77,6 +78,9 @@ MemoryPagesPool::MemoryPagesPool(size_t vPageSize, size_t vNumberOfPages, void*
7778
t3.enableHistogram(64, 1, 100000000);
7879
t4.enableHistogram(64, 1, 100000000);
7980
}
81+
82+
// update buffer state
83+
updateBufferState();
8084
}
8185

8286
MemoryPagesPool::~MemoryPagesPool()
@@ -192,6 +196,9 @@ void MemoryPagesPool::releasePage(void* address)
192196
}
193197
}
194198

199+
// disable concurrent execution of this function
200+
std::unique_lock<std::mutex> lock(pagesAvailableMutexPush);
201+
195202
// put back page in list of available pages
196203
pagesAvailable->push(address);
197204

@@ -302,4 +309,17 @@ void MemoryPagesPool::updateBufferState() {
302309
state = BufferState::empty;
303310
log("buffer usage back to reasonable level");
304311
}
312+
if (pBufferState != nullptr) {
313+
pBufferState->store(r);
314+
}
315+
}
316+
317+
int MemoryPagesPool::getId() {
318+
return id;
319+
}
320+
321+
// Registers an external variable that receives the buffer usage ratio.
// - bufferStateVar: variable that updateBufferState() writes to while the pointer is set.
//   Passing nullptr detaches any previously registered variable.
// The pointer is stored as-is and is not owned by the pool.
// NOTE(review): the pointed variable presumably has to outlive the pool (or be detached first) - confirm with callers.
void MemoryPagesPool::setBufferStateVariable(std::atomic<double> *bufferStateVar) {
  pBufferState = bufferStateVar;

  // publish the current usage right away (updateBufferState() skips the store when the pointer is null)
  updateBufferState();

  // the trace below reads through the pointer: skip it when no variable is attached,
  // otherwise a nullptr argument would be dereferenced
  if (pBufferState != nullptr) {
    // %p requires a void* argument
    printf("buffer usage = %lf @ 0x%p\n", pBufferState->load(), static_cast<void*>(pBufferState));
  }
}

src/MemoryPagesPool.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <map>
1919
#include <memory>
2020
#include <string>
21+
#include <mutex>
2122

2223
#include "CounterStats.h"
2324
#include "DataBlockContainer.h"
@@ -43,7 +44,8 @@ class MemoryPagesPool
4344
// - size of memory block in bytes (if zero, assuming it is big enough for page number * page size - not taking into account firstPageOffset is set)
4445
// - a release callback to be called at destruction time
4546
// - firstPageOffset is the offset of first page from base address. This is to control alignment. All pages are created contiguous from this point. If non-zero, this may reduce number of pages created compared to request (as to fit in base size)
46-
MemoryPagesPool(size_t pageSize, size_t numberOfPages, void* baseAddress, size_t baseSize = 0, ReleaseCallback callback = nullptr, size_t firstPageOffset = 0);
47+
// - id: an optional identifier
48+
MemoryPagesPool(size_t pageSize, size_t numberOfPages, void* baseAddress, size_t baseSize = 0, ReleaseCallback callback = nullptr, size_t firstPageOffset = 0, int id = -1);
4749

4850
// destructor
4951
~MemoryPagesPool();
@@ -59,6 +61,7 @@ class MemoryPagesPool
5961
size_t getNumberOfPagesAvailable(); // get number of pages currently available
6062
void* getBaseBlockAddress(); // get the base address of memory pool block
6163
size_t getBaseBlockSize(); // get the size of memory pool block. All pages guaranteed to be within &baseBlockAddress[0] and &baseBlockAddress[baseBlockSize]
64+
int getId(); // get pool identifier, as set on creation
6265

6366
// get an empty data block container from the pool
6467
// parameters:
@@ -77,6 +80,7 @@ class MemoryPagesPool
7780
typedef std::function<void(const std::string &)> LogCallback;
7881

7982
void setWarningCallback(const LogCallback& cb, double thHigh = 0.9, double thOk = 0.8);
83+
void setBufferStateVariable(std::atomic<double> *bufferStateVar); // the provided variable is updated continuously with the buffer usage ratio (0.0 empty -> 1.0 full)
8084

8185
private:
8286

@@ -87,8 +91,10 @@ class MemoryPagesPool
8791
enum BufferState {empty, high, full};
8892
BufferState state = BufferState::empty;
8993
void updateBufferState();
94+
std::atomic<double> *pBufferState = nullptr; // when set, the pointed variable is updated everytime updateBufferState() is called
9095

9196
std::unique_ptr<AliceO2::Common::Fifo<void*>> pagesAvailable; // a buffer to keep track of individual pages
97+
std::mutex pagesAvailableMutexPush; // a lock to avoid concurrent push-back of free pages to fifo
9298

9399
size_t numberOfPages; // number of pages
94100
size_t pageSize; // size of each page, in bytes
@@ -124,6 +130,7 @@ class MemoryPagesPool
124130
CounterStats t1, t2, t3, t4;
125131

126132
CounterStats poolStats; // keep track of number of free pages in the pool
133+
int id = -1; // unique identifier for this pool
127134
};
128135

129136
#endif // #ifndef _MEMORYPAGESPOOL_H

src/ReadoutEquipment.cxx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,10 @@ ReadoutEquipment::ReadoutEquipment(ConfigFile& cfg, std::string cfgEntryPoint, b
203203
throw __LINE__;
204204
} else {
205205
mp -> setWarningCallback(std::bind(&ReadoutEquipment::mplog, this, std::placeholders::_1));
206+
if ((mp->getId() >= 0) && (mp->getId() < ReadoutStatsMaxItems)) {
207+
mp -> setBufferStateVariable(&gReadoutStats.counters.bufferUsage[mp->getId()]);
208+
}
209+
theLog.log(LogInfoDevel_(3008), "Using memory pool [%d]: %d pages x %d bytes", mp->getId(), memoryPoolNumberOfPages, memoryPoolPageSize);
206210
}
207211
// todo: move page align to MemoryPool class
208212
assert(pageSpaceReserved == mp->getPageSize() - mp->getDataBlockMaxSize());

src/ReadoutEquipmentRORC.cxx

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,10 +210,18 @@ Thread::CallbackResult ReadoutEquipmentRORC::prepareBlocks()
210210
}
211211
}
212212
lastPacketDropped = currentPacketDropped;
213+
213214
if (isWaitingFirstLoop) {
214215
packetDroppedTimer.reset(1000000); // 1 sec interval
215216
} else {
216217
packetDroppedTimer.increment();
218+
// check CRU FIFO status - but only after first loop, otherwise would be empty yet
219+
if (0) { // feature disabled for the time being, spurious warnings on startup
220+
if (!channel->areSuperpageFifosHealthy()) {
221+
static InfoLogger::AutoMuteToken logToken(LogWarningSupport_(3235), 5, 60);
222+
theLog.log(logToken, "Equipment %s: ROC memory fifo not healthy", name.c_str());
223+
}
224+
}
217225
}
218226
}
219227

src/ReadoutStats.cxx

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ ReadoutStats::~ReadoutStats() {
4646
zmqCleanup();
4747
}
4848

49-
void ReadoutStats::reset()
49+
void ReadoutStats::reset(bool lightReset)
5050
{
5151
counters.notify = 0;
5252

@@ -65,6 +65,12 @@ void ReadoutStats::reset()
6565
counters.timeframeIdFairMQ = 0;
6666

6767
counters.firstOrbit = undefinedOrbit;
68+
69+
if (!lightReset) {
70+
for (unsigned int i = 0; i < ReadoutStatsMaxItems; i++) {
71+
counters.bufferUsage[i] = -1.0;
72+
}
73+
}
6874
}
6975

7076
void ReadoutStats::print()

0 commit comments

Comments
 (0)