Skip to content

Commit 6d221c5

Browse files
committed
added o2-readout-monitor-memory
1 parent c4cd564 commit 6d221c5

File tree

1 file changed

+328
-0
lines changed

1 file changed

+328
-0
lines changed

src/readoutMemoryMonitor.cxx

Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
13+
#include <InfoLogger/InfoLogger.hxx>
14+
#include <InfoLogger/InfoLoggerMacros.hxx>
15+
using namespace AliceO2::InfoLogger;
16+
17+
//#include "ZmqClient.hxx"
18+
//#include "TtyChecker.h"
19+
#include <zmq.h>
20+
#include "MemoryPagesPool.h"
21+
22+
#ifdef WITH_SDL
23+
#include <SDL2/SDL.h>
24+
//#include <SDL2/SDL_ttf.h>
25+
#endif
26+
27+
// set log environment before theLog is initialized
28+
// use console output, non-blocking input
29+
//TtyChecker theTtyChecker;
30+
31+
// log handle
32+
InfoLogger theLog;
33+
34+
#include <unistd.h>
35+
#include <inttypes.h>
36+
37+
int callback(void* msg, int msgSize)
38+
{
39+
(void)msg;
40+
printf("Block = %d\n",msgSize);
41+
return 0;
42+
}
43+
44+
int main(int argc, char** argv)
45+
{
46+
theLog.setContext(InfoLoggerContext({ { InfoLoggerContext::FieldName::Facility, (std::string) "readout/memview" } }));
47+
48+
std::string port = "tcp://127.0.0.1:50002"; // ZMQ server address
49+
int pageSize = 1024L * 1024L; // ZMQ RX buffer size, should be big enough to receive a full report
50+
int maxQueue = 1; // ZMQ input queue size
51+
52+
// parse options
53+
for (int i = 1; i < argc; i++) {
54+
const char* option = argv[i];
55+
std::string key(option);
56+
size_t separatorPosition = key.find('=');
57+
if (separatorPosition == std::string::npos) {
58+
theLog.log(LogErrorOps, "Failed to parse option '%s'\n", option);
59+
continue;
60+
}
61+
key.resize(separatorPosition);
62+
std::string value = &(option[separatorPosition + 1]);
63+
if (key == "port") {
64+
port = value;
65+
}
66+
if (key == "pageSize") {
67+
pageSize = atoi(value.c_str());
68+
}
69+
if (key == "maxQueue") {
70+
maxQueue = atoi(value.c_str());
71+
}
72+
}
73+
74+
void* context = nullptr;
75+
void* zh = nullptr;
76+
std::vector<char *> msgBuffer;
77+
std::vector<unsigned int> msgSize;
78+
79+
const int maxBlocks = 32; // maximum number of message parts
80+
81+
try {
82+
83+
msgBuffer.resize(maxBlocks+1);
84+
msgSize.resize(maxBlocks+1);
85+
for (int i=0; i <= maxBlocks; i++) {
86+
auto ptr = (char *)malloc(pageSize);
87+
if (ptr == nullptr) {
88+
theLog.log(LogErrorDevel, "Failed to allocate %d x %d buffer", maxBlocks, pageSize);
89+
throw __LINE__;
90+
}
91+
msgBuffer[i] = ptr;
92+
}
93+
94+
int linerr = 0;
95+
int zmqerr = 0;
96+
for (;;) {
97+
context = zmq_ctx_new();
98+
if (context == nullptr) {
99+
linerr = __LINE__;
100+
zmqerr = zmq_errno();
101+
break;
102+
}
103+
zh = zmq_socket(context, ZMQ_SUB);
104+
if (zh == nullptr) {
105+
linerr = __LINE__;
106+
zmqerr = zmq_errno();
107+
break;
108+
}
109+
int timeout = 1000;
110+
zmqerr = zmq_setsockopt(zh, ZMQ_RCVTIMEO, (void*)&timeout, sizeof(int));
111+
if (zmqerr) {
112+
linerr = __LINE__;
113+
break;
114+
}
115+
if (maxQueue >=0 ) {
116+
zmq_setsockopt(zh, ZMQ_RCVHWM, (void*)&maxQueue, sizeof(int));
117+
}
118+
zmqerr = zmq_connect(zh, port.c_str());
119+
if (zmqerr) {
120+
linerr = __LINE__;
121+
break;
122+
}
123+
// subscribe to all published messages
124+
zmqerr = zmq_setsockopt(zh, ZMQ_SUBSCRIBE, "", 0);
125+
if (zmqerr) {
126+
linerr = __LINE__;
127+
break;
128+
}
129+
break;
130+
}
131+
132+
if ((zmqerr) || (linerr)) {
133+
theLog.log(LogErrorDevel, "ZeroMQ error @%d : (%d) %s", linerr, zmqerr, zmq_strerror(zmqerr));
134+
throw __LINE__;
135+
}
136+
137+
}
138+
catch (...) {
139+
theLog.log(LogErrorDevel, "Failed to initialize client");
140+
return -1;
141+
}
142+
143+
#ifdef WITH_SDL
144+
SDL_Window* hWnd = NULL;
145+
SDL_Renderer* hRenderer = NULL;
146+
SDL_Event event;
147+
148+
int szx = 1920;
149+
int szy = 1080;
150+
151+
SDL_Init(SDL_INIT_VIDEO);
152+
hWnd = SDL_CreateWindow("FLP memory", SDL_WINDOWPOS_CENTERED, SDL_WINDOWPOS_CENTERED, szx, szy, SDL_WINDOW_SHOWN); // | SDL_WINDOW_RESIZABLE);
153+
if (!hWnd) return -1;
154+
hRenderer = SDL_CreateRenderer(hWnd, -1, 0);
155+
if ((!hWnd) || (!hRenderer)) {
156+
return -1;
157+
}
158+
SDL_SetRenderDrawColor(hRenderer, 0, 0, 0, 0);
159+
SDL_RenderClear(hRenderer);
160+
SDL_RenderPresent(hRenderer);
161+
162+
//TTF_Font* Sans = TTF_OpenFont("Sans.ttf", 24);
163+
#endif
164+
165+
auto doRx = [&]() {
166+
unsigned int bufIx;
167+
unsigned int rxBytes = 0;
168+
for (unsigned int i = 0; ;i++) {
169+
bufIx = i < maxBlocks ? i : maxBlocks;
170+
int nb = 0;
171+
nb = zmq_recv(zh, msgBuffer[bufIx], pageSize, 0);
172+
if (nb >= pageSize) {
173+
// buffer was too small to gt full message
174+
theLog.log(LogWarningDevel, "ZMQ message bigger than buffer, skipping");
175+
break;
176+
}
177+
if (nb<0) break;
178+
rxBytes += nb;
179+
msgSize[bufIx]=nb;
180+
//printf("Got %d = %d\n",bufIx, nb);
181+
int more;
182+
size_t size = sizeof(int);
183+
if (zmq_getsockopt(zh, ZMQ_RCVMORE, &more, &size) != 0) break;
184+
if (!more) break;
185+
}
186+
if (rxBytes) {
187+
bool isOk = 0;
188+
if ((bufIx>=2) && (bufIx<maxBlocks) && (msgSize[0] == 4) && (msgSize[bufIx]==4)) {
189+
uint32_t nPools = *((uint32_t *)msgBuffer[0]);
190+
uint32_t trailer = *((uint32_t *)msgBuffer[bufIx]);
191+
if ((trailer == 0xF00F)&&((nPools * 2 + 1) == bufIx)) {
192+
//printf("%d pools\n", (int)nPools);
193+
isOk = 1;
194+
195+
for (unsigned int p=0; p<nPools; p++) {
196+
if (msgSize[1+p*2]!=sizeof(MemoryPagesPool::Stats)) {
197+
isOk = 0;
198+
break;
199+
}
200+
auto stats = ((MemoryPagesPool::Stats *)msgBuffer[1+p*2]);
201+
unsigned int npages = msgSize[2+p*2] / sizeof(MemoryPagesPool::PageStat);
202+
printf("%d %d: %f - %f - %u\n",p, stats->id, stats->t0,stats->t1,npages);
203+
auto ps = (MemoryPagesPool::PageStat*)msgBuffer[2+p*2];
204+
int c = 0;
205+
for (unsigned int k = 0; k<npages; k++) {
206+
if (ps[k].state != MemoryPage::PageState::Idle) c++;
207+
}
208+
printf("busy pages = %d / %u\n",c,npages);
209+
}
210+
211+
if (isOk) {
212+
#ifdef WITH_SDL
213+
int szx, szy;
214+
//SDL_RenderSetViewport(hRenderer, NULL);
215+
SDL_GetRendererOutputSize(hRenderer, &szx, &szy);
216+
SDL_SetRenderDrawColor(hRenderer, 0, 0, 0, 0);
217+
SDL_RenderClear(hRenderer);
218+
219+
printf("%d,%d\n", szx,szy);
220+
221+
int border=10;
222+
// one column per pool
223+
int cw=(szx-(nPools+1)*border)/nPools;
224+
int cy=szy-2*border;
225+
printf("cw,cy= %d, %d\n",cw, cy);
226+
for (unsigned int p=0; p<nPools; p++) {
227+
int ox=border+(border+cw)*p;
228+
int oy=border;
229+
printf("ox,oy= %d, %d\n",ox, oy);
230+
SDL_SetRenderDrawColor(hRenderer, 0, 0, 255, 255);
231+
SDL_Rect r = {ox,oy,cw,cy};
232+
SDL_RenderDrawRect(hRenderer, &r);
233+
234+
auto stats = ((MemoryPagesPool::Stats *)msgBuffer[1+p*2]);
235+
unsigned int npages = msgSize[2+p*2] / sizeof(MemoryPagesPool::PageStat);
236+
auto ps = (MemoryPagesPool::PageStat*)msgBuffer[2+p*2];
237+
238+
unsigned int bb = 2; // internal border
239+
unsigned int sq=(cw-2*bb)*(cy-2*bb);
240+
unsigned int pxk = (int)sqrt(sq/npages);
241+
pxk=6;
242+
printf("pxk=%d\n",pxk);
243+
unsigned int npl = (cw-bb) / pxk;
244+
for (unsigned int k = 0; k<npages; k++) {
245+
switch (ps[k].state) {
246+
case MemoryPage::PageState::Idle:
247+
SDL_SetRenderDrawColor(hRenderer, 32, 32, 32, 255);
248+
break;
249+
case MemoryPage::PageState::InROC:
250+
SDL_SetRenderDrawColor(hRenderer, 0, 255, 255, 255);
251+
break;
252+
case MemoryPage::PageState::InFMQ:
253+
SDL_SetRenderDrawColor(hRenderer, 255, 128, 128, 255);
254+
break;
255+
case MemoryPage::PageState::InAggregator:
256+
SDL_SetRenderDrawColor(hRenderer, 255, 255, 0, 255);
257+
break;
258+
default:
259+
SDL_SetRenderDrawColor(hRenderer, 200,200,200, 255);
260+
break;
261+
}
262+
int rx = k % npl;
263+
int ry = k / npl;
264+
SDL_Rect r = {ox+bb+rx*pxk+1,oy+bb+ry*pxk+1,pxk-2,pxk-2};
265+
SDL_RenderFillRect(hRenderer, &r);
266+
}
267+
}
268+
269+
SDL_RenderPresent(hRenderer);
270+
#endif
271+
}
272+
273+
}
274+
}
275+
if (!isOk) {
276+
printf("Wrong message received\n");
277+
}
278+
}
279+
};
280+
281+
/*
282+
for(;;) {
283+
doRx();
284+
}
285+
return 0;
286+
*/
287+
288+
#ifdef WITH_SDL
289+
290+
int shutdown = 0;
291+
while (!shutdown) {
292+
293+
294+
if (SDL_PollEvent(&event)) {
295+
printf("event type=%d\n",(int)event.type);
296+
shutdown=1;
297+
switch (event.type) {
298+
case SDL_QUIT:
299+
printf("exiting\n");
300+
shutdown = 1;
301+
break;
302+
303+
case SDL_KEYDOWN:
304+
int key = event.key.keysym.sym;
305+
if (key == SDLK_ESCAPE) {
306+
shutdown = 1;
307+
}
308+
break;
309+
}
310+
}
311+
else {
312+
doRx();
313+
SDL_Delay(10);
314+
}
315+
}
316+
SDL_DestroyRenderer(hRenderer);
317+
SDL_DestroyWindow(hWnd);
318+
SDL_Quit();
319+
exit(0);
320+
#endif // WITH_SDL
321+
322+
for(;;) {
323+
doRx();
324+
}
325+
326+
return 0;
327+
}
328+

0 commit comments

Comments
 (0)