Skip to content

Commit f263c76

Browse files
authored
Merge pull request #188 from sy-c/master
v2.3.0
2 parents cdda52b + baa8202 commit f263c76

File tree

8 files changed

+35
-14
lines changed

8 files changed

+35
-14
lines changed

doc/releaseNotes.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,3 +340,8 @@ This file describes the main feature changes for each readout.exe released versi
340340

341341
## v2.2.0 - 07/05/2021
342342
- Added rate limit for eventDump: see consumer-zmq.maxRate and consumer-zmq.pagesPerBurst.
343+
344+
## v2.3.0 - 11/05/2021
345+
- consumer-stats: publish/print the current timeframe Id sent to STFB.
346+
- consumer-stats: ZMQ stats client cleanup timeout, to avoid blocking on exit.
347+
- auto-mute RDH warnings: verbosity reduced if many successive logs done in a short time.

src/ConsumerFMQchannel.cxx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,8 @@ class ConsumerFMQchannel : public Consumer
460460
// printf("offset %d - HBid=%d\n",offset,lastHBid);
461461
}
462462
if (stfHeader->linkId != rdh->linkId) {
463-
theLog.log(LogWarningSupport_(3004), "TF%d link Id mismatch %d != %d @ page offset %d", (int)stfHeader->timeframeId, (int)stfHeader->linkId, (int)rdh->linkId, (int)offset);
463+
static InfoLogger::AutoMuteToken token(LogWarningSupport_(3004));
464+
theLog.log(token, "TF%d equipment %d link Id mismatch %d != %d @ page offset %d", (int)stfHeader->timeframeId, (int)stfHeader->equipmentId, (int)stfHeader->linkId, (int)rdh->linkId, (int)offset);
464465
// dumpRDH(rdh);
465466
// printf("block %p : offset %d = %p\n",b,offset,rdh);
466467
}
@@ -646,6 +647,7 @@ class ConsumerFMQchannel : public Consumer
646647
messagesToSend.clear();
647648
gReadoutStats.counters.bytesFairMQ += messagesToSendSize;
648649
messagesToSendSize = 0;
650+
gReadoutStats.counters.timeframeIdFairMQ = stfHeader->timeframeId;
649651
} else {
650652
theLog.log(LogErrorSupport_(3233), "Sending failed");
651653
}

src/ConsumerStats.cxx

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ class ConsumerStats : public Consumer
127127
gReadoutStats.counters.pagesPendingFairMQtime = 0;
128128
gReadoutStats.counters.pagesPendingFairMQreleased = 0;
129129
unsigned long long nRfmq = snapshot.pagesPendingFairMQreleased.load();
130+
int tfidfmq = (int)snapshot.timeframeIdFairMQ.load();
130131
double avgTfmq = 0.0;
131132
double rRfmq = 0.0;
132133
if (nRfmq) {
@@ -157,6 +158,7 @@ class ConsumerStats : public Consumer
157158
sendMetricNoException({ (int)nRfmq, "readout.stfbMemoryPagesLocked"});
158159
sendMetricNoException({ rRfmq, "readout.stfbMemoryPagesReleaseRate"});
159160
sendMetricNoException({ avgTfmq, "readout.stfbMemoryPagesReleaseLatency"});
161+
sendMetricNoException({ tfidfmq, "readout.stfbTimeframeId"});
160162
}
161163

162164
#ifdef WITH_ZMQ
@@ -169,7 +171,7 @@ class ConsumerStats : public Consumer
169171
if (consoleUpdate) {
170172
if (deltaT > 0) {
171173
theLog.log(LogInfoOps_(3003), "Last interval (%.2fs): blocksRx=%llu, block rate=%.2lf, bytesRx=%llu, rate=%s", deltaT, (unsigned long long)counterBlocksDiff, counterBlocksDiff / deltaT, (unsigned long long)counterBytesDiff, NumberOfBytesToString(counterBytesDiff * 8 / deltaT, "b/s", 1000).c_str());
172-
theLog.log(LogInfoOps_(3003), "STFB locked pages: current=%llu, release rate=%.2lf Hz, latency=%.3lf s", nRfmq, rRfmq, avgTfmq);
174+
theLog.log(LogInfoOps_(3003), "STFB locked pages: current=%llu, release rate=%.2lf Hz, latency=%.3lf s, current TF = %d", nRfmq, rRfmq, avgTfmq, tfidfmq );
173175
}
174176
}
175177

@@ -256,6 +258,12 @@ class ConsumerStats : public Consumer
256258
throw __LINE__;
257259
}
258260

261+
const int cfgZmqLinger = 1000;
262+
zmqError = zmq_setsockopt(zmqHandle, ZMQ_LINGER, (void*)&cfgZmqLinger, sizeof(cfgZmqLinger)); // close timeout
263+
if (zmqError) {
264+
throw __LINE__;
265+
}
266+
259267
zmqError = zmq_connect(zmqHandle, cfgZmqPublishAddress.c_str());
260268
if (zmqError) {
261269
throw __LINE__;

src/ReadoutEquipment.cxx

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,8 @@ int ReadoutEquipment::processRdh(DataBlockContainerReference& block)
571571
int rdhIndexInPage = 0;
572572
int linkId = undefinedLinkId;
573573

574+
static InfoLogger::AutoMuteToken logRdhErrorsToken(LogWarningSupport_(3004), 30, 5);
575+
574576
for (size_t pageOffset = 0; pageOffset < blockSize;) {
575577
RdhHandle h(baseAddress + pageOffset);
576578
rdhIndexInPage++;
@@ -608,7 +610,7 @@ int ReadoutEquipment::processRdh(DataBlockContainerReference& block)
608610
}
609611
if (linkId != h.getLinkId()) {
610612
if (cfgRdhDumpWarningEnabled) {
611-
theLog.log(LogWarningSupport_(3004), "RDH #%d @ 0x%X : inconsistent link ids: %d != %d", rdhIndexInPage, (unsigned int)pageOffset, linkId, h.getLinkId());
613+
theLog.log(logRdhErrorsToken, "Equipment %d RDH #%d @ 0x%X : inconsistent link ids: %d != %d", id, rdhIndexInPage, (unsigned int)pageOffset, linkId, h.getLinkId());
612614
}
613615
statsRdhCheckStreamErr++;
614616
break; // stop checking this page
@@ -617,7 +619,7 @@ int ReadoutEquipment::processRdh(DataBlockContainerReference& block)
617619
// check no timeframe overlap in page
618620
if (((blockHeader.timeframeOrbitFirst < blockHeader.timeframeOrbitLast) && ((h.getTriggerOrbit() < blockHeader.timeframeOrbitFirst) || (h.getTriggerOrbit() > blockHeader.timeframeOrbitLast))) || ((blockHeader.timeframeOrbitFirst > blockHeader.timeframeOrbitLast) && ((h.getTriggerOrbit() < blockHeader.timeframeOrbitFirst) && (h.getTriggerOrbit() > blockHeader.timeframeOrbitLast)))) {
619621
if (cfgRdhDumpErrorEnabled) {
620-
theLog.log(LogWarningSupport_(3004), "RDH #%d @ 0x%X : TimeFrame ID change in page not allowed : orbit 0x%08X not in range [0x%08X,0x%08X]", rdhIndexInPage, (unsigned int)pageOffset, (int)h.getTriggerOrbit(), (int)blockHeader.timeframeOrbitFirst, (int)blockHeader.timeframeOrbitLast);
622+
theLog.log(logRdhErrorsToken, "Equipment %d RDH #%d @ 0x%X : TimeFrame ID change in page not allowed : orbit 0x%08X not in range [0x%08X,0x%08X]", id, rdhIndexInPage, (unsigned int)pageOffset, (int)h.getTriggerOrbit(), (int)blockHeader.timeframeOrbitFirst, (int)blockHeader.timeframeOrbitLast);
621623
}
622624
statsRdhCheckStreamErr++;
623625
break; // stop checking this page

src/ReadoutStats.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ void ReadoutStats::reset()
4141
counters.pagesPendingFairMQ = 0;
4242
counters.pagesPendingFairMQreleased = 0;
4343
counters.pagesPendingFairMQtime = 0;
44+
counters.timeframeIdFairMQ = 0;
4445
}
4546

4647
void ReadoutStats::print()

src/ReadoutStats.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ struct ReadoutStatsCounters {
2828
std::atomic<uint64_t> pagesPendingFairMQ; // number of pages pending in ConsumerFMQ
2929
std::atomic<uint64_t> pagesPendingFairMQreleased; // number of pages which have been released by ConsumerFMQ
3030
std::atomic<uint64_t> pagesPendingFairMQtime; // latency in FMQ, in microseconds, total for all released pages
31+
std::atomic<uint32_t> timeframeIdFairMQ; // last timeframe pushed to ConsumerFMQ
3132
};
3233

3334
// need to be able to easily transmit this struct as a whole

src/ReadoutVersion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
#define READOUT_VERSION "2.2.0"
1+
#define READOUT_VERSION "2.3.0"

src/readoutMonitor.cxx

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,11 @@ int main(int argc, const char** argv)
182182
double previousSampleTime = 0;
183183

184184
// header
185-
printf(" Time State nStf Readout Recorder STFB STFB STFB STFB\n");
186-
printf(" total total total memory memory memory\n");
187-
printf(" locked release release\n");
188-
printf(" rate latency\n");
189-
printf(" (bytes) (bytes) (bytes) (pages) (pages/s) (s)\n");
185+
printf(" Time State nStf Readout Recorder STFB STFB STFB STFB STFB\n");
186+
printf(" total total total memory memory memory tf \n");
187+
printf(" locked release release id \n");
188+
printf(" rate latency \n");
189+
printf(" (bytes) (bytes) (bytes) (pages) (pages/s) (s) \n");
190190

191191
for (; !ShutdownRequest;) {
192192
int nb = 0;
@@ -214,7 +214,7 @@ int main(int argc, const char** argv)
214214
avgTfmq = (counters->pagesPendingFairMQtime.load() / nRfmq) / (deltaT * 1000000.0);
215215
}
216216
if (cfgRawBytes) {
217-
printf("%s\t%s\t%llu\t%llu\t%llu\t%llu\t%llu\t%.2lf\t%.6lf\n",
217+
printf("%s\t%s\t%llu\t%llu\t%llu\t%llu\t%llu\t%.2lf\t%.6lf\t%d\n",
218218
t ? getStringTime(t).c_str() : "-",
219219
(char*)&state,
220220
(unsigned long long)counters->numberOfSubtimeframes.load(),
@@ -223,9 +223,10 @@ int main(int argc, const char** argv)
223223
(unsigned long long)counters->bytesFairMQ.load(),
224224
(unsigned long long)counters->pagesPendingFairMQ.load(),
225225
nRfmq,
226-
avgTfmq);
226+
avgTfmq,
227+
(int)counters->timeframeIdFairMQ.load());
227228
} else {
228-
printf("%s %s %8llu %s %s %s %6llu %7.2lf %6.4lf\n",
229+
printf("%s %s %8llu %s %s %s %6llu %7.2lf %6.4lf %8d\n",
229230
t ? getStringTime(t).c_str() : "-",
230231
(char*)&state,
231232
(unsigned long long)counters->numberOfSubtimeframes.load(),
@@ -234,7 +235,8 @@ int main(int argc, const char** argv)
234235
NumberOfBytesToString(counters->bytesFairMQ.load(),"").c_str(),
235236
(unsigned long long)counters->pagesPendingFairMQ.load(),
236237
nRfmq,
237-
avgTfmq);
238+
avgTfmq,
239+
(int)counters->timeframeIdFairMQ.load());
238240
}
239241
}
240242
previousSampleTime = t;

0 commit comments

Comments
 (0)