Skip to content

Commit 39c6369

Browse files
committed
test fmq memory: more options../doc/releaseNotes.md
1 parent 7e78440 commit 39c6369

File tree

1 file changed

+121
-41
lines changed

1 file changed

+121
-41
lines changed

src/testFmqMemory.cxx

Lines changed: 121 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,32 @@
1616
#include <sys/types.h>
1717
#include <sys/mman.h>
1818
#include <unistd.h>
19+
#include <stdarg.h>
1920

2021
#include <fairmq/FairMQDevice.h>
2122
#include <fairmq/FairMQTransportFactory.h>
2223
//#include <fairmq/tools/Unique.h>
2324

25+
// simple log function with good timestamp
26+
void log(const char *message, ...) {
27+
char buffer[1024] = "";
28+
size_t len = 0;
29+
va_list ap;
30+
va_start(ap, message);
31+
vsnprintf(&buffer[len], sizeof(buffer), message, ap);
32+
va_end(ap);
33+
using namespace std::chrono;
34+
uint64_t now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
35+
std::time_t seconds = now / 1000;
36+
int milliseconds = now % 1000;
37+
std::ostringstream oss;
38+
oss << std::put_time(std::gmtime(&seconds), "%Y-%m-%d %H:%M:%S.") << std::setfill('0') << std::setw(3) << milliseconds;
39+
std::string timestamp(oss.str());
40+
printf("%s\t%s\n", timestamp.c_str(), buffer);
41+
fflush(stdout);
42+
}
43+
44+
// print process memory stats from /proc
2445
void logMemoryUsage() {
2546
double memPageSize = getpagesize() / (1024.0*1024.0);
2647
const int maxpath = 256;
@@ -32,107 +53,166 @@ void logMemoryUsage() {
3253
if (fgets(buf, maxline, fp) != NULL) {
3354
int vsize, vresident, vshared, vtext, vlib, vdata, vdt;
3455
if (sscanf(buf, "%d %d %d %d %d %d %d", &vsize, &vresident, &vshared, &vtext, &vlib, &vdata, &vdt) == 7) {
35-
printf("Memory stats: size = %6.2f MB\tresident = %6.2f MB\tshared = %6.2f MB\n", vsize * memPageSize, vresident * memPageSize, vshared * memPageSize);
56+
log("Memory stats: size = %6.2f MB\tresident = %6.2f MB\tshared = %6.2f MB", vsize * memPageSize, vresident * memPageSize, vshared * memPageSize);
3657
}
3758
}
3859
fclose(fp);
3960
}
4061

41-
4262
#define GB *1073741824LLU
4363
#define SLEEPTIME 3
4464
//#define WAITHERE printf("Waiting %ds ",SLEEPTIME); for(int k=0; k<SLEEPTIME; k++) {printf(".");fflush(stdout);usleep(1000000);} printf("\n");
4565
#define WAITHERE logMemoryUsage();
4666

4767

4868
// memory settings
49-
const bool memlock = false; // lock the whole process memory
50-
const bool fmqMemLock = true; // lock FMQ region
51-
const bool fmqMemZero = false; // zero FMQ region
69+
int memLock = 0; // lock the whole process memory
70+
int memZero = 2; // write mode: 0=no write 1=memset 2=bzero 3=1 byte per page
71+
int fmqMemLock = 1; // lock FMQ region
72+
int fmqMemZero = 0; // zero FMQ region
73+
int nLoops = 1; // number of test loops
74+
int memWaitRelease = 0; // amount of time to keep the memory before releasing it
5275

53-
54-
int main(int argc, const char* argv[]) {
76+
int main(int argc, char* argv[]) {
5577

5678
unsigned int ngb = 1;
79+
int syncTime = 0;
5780
if (argc>=2) {
5881
ngb=(unsigned int)atoi(argv[1]);
5982
} else {
6083
printf("Usage: %s numberOfGigabytes\n",argv[0]);
6184
return -1;
6285
}
63-
64-
printf("Locking process memory: %s\n", memlock ? "yes" : "no");
65-
if (memlock) {
86+
// other options
87+
for (int i = 2; i < argc; i++) {
88+
char *k = argv[i];
89+
char *v = strchr(k, '=');
90+
if (v != nullptr) {
91+
*v = 0;
92+
v++;
93+
}
94+
if (strcmp(k, "syncTime") == 0) {
95+
syncTime = atoi(v);
96+
} else if (strcmp(k, "memLock") == 0) {
97+
memLock = atoi(v);
98+
} else if (strcmp(k, "memZero") == 0) {
99+
memZero = atoi(v);
100+
} else if (strcmp(k, "fmqMemLock") == 0) {
101+
fmqMemLock = atoi(v);
102+
} else if (strcmp(k, "fmqMemZero") == 0) {
103+
fmqMemZero = atoi(v);
104+
} else if (strcmp(k, "nLoops") == 0) {
105+
nLoops = atoi(v);
106+
} else if (strcmp(k, "memWaitRelease") == 0) {
107+
memWaitRelease = atoi(v);
108+
} else {
109+
printf("unknown option %s\n", k);
110+
return -1;
111+
}
112+
}
113+
// wait until scheduled startup time (given modulo round number of seconds)
114+
if (syncTime>0) {
115+
time_t t = time(NULL);
116+
time_t waitT = syncTime - (t % syncTime);
117+
log("Waiting sync time (%ds)", (int)waitT);
118+
t = t + waitT;
119+
while (time(NULL) != t) {
120+
usleep(10000);
121+
}
122+
}
123+
124+
log("Locking process memory: %s", memLock ? "yes" : "no");
125+
if (memLock) {
66126
if (mlockall(MCL_CURRENT | MCL_FUTURE) != 0) {
67-
printf("failed to lock memory\n");
127+
log("failed to lock memory");
68128
}
69129
}
70130

71-
printf("Startup pid %d\n",(int)getpid());
131+
log("Startup pid %d",(int)getpid());
132+
WAITHERE;
133+
134+
for(int nn=0; nn < nLoops; nn++) {
135+
log("Starting test loop %d / %d", nn + 1, nLoops);
72136
WAITHERE;
73137

74138
std::unique_ptr<FairMQChannel> sendingChannel;
75139
std::shared_ptr<FairMQTransportFactory> transportFactory;
76140
FairMQUnmanagedRegionPtr memoryBuffer = nullptr;
77141
FairMQProgOptions fmqOptions;
78142

79-
printf("Create FMQ channel\n");
143+
log("Create FMQ channel");
80144
// random name: use fair::mq::tools::Uuid()
81145
transportFactory = FairMQTransportFactory::CreateTransportFactory("shmem", "readout-test", &fmqOptions);
82146
sendingChannel = std::make_unique<FairMQChannel>("readout-test", "pair", transportFactory);
83147
WAITHERE;
84148

85-
printf("Get unmanaged memory (lock=%s, zero=%s)\n", fmqMemLock ? "yes" : "no", fmqMemZero ? "yes" : "no");
149+
log("Get unmanaged memory (lock=%s, zero=%s)", fmqMemLock ? "yes" : "no", fmqMemZero ? "yes" : "no");
86150
long long mMemorySize = ngb GB;
87151
auto t00 = std::chrono::steady_clock::now();
88152
try {
89153
// memoryBuffer = sendingChannel->Transport()->CreateUnmanagedRegion(mMemorySize, [](void* /*data*/, size_t /*size*/, void* /*hint*/) {});
90154
memoryBuffer = sendingChannel->Transport()->CreateUnmanagedRegion(mMemorySize, [](void* /*data*/, size_t /*size*/, void* /*hint*/) {},
91-
"",0,fair::mq::RegionConfig{fmqMemLock, fmqMemZero}); // lock / zero
155+
"",0,fair::mq::RegionConfig{(bool)fmqMemLock, (bool)fmqMemZero}); // lock / zero
92156
}
93157
catch(...) {
94-
printf("Failed to get buffer (exception)\n"); return 1;
158+
log("Failed to get buffer (exception)"); return 1;
95159
}
96-
if (memoryBuffer==nullptr) { printf("Failed to get buffer\n"); return 1; }
160+
if (memoryBuffer==nullptr) { log("Failed to get buffer"); return 1; }
97161
memoryBuffer->SetLinger(1);
98162
std::chrono::duration<double> tdiff0 = std::chrono::steady_clock::now() - t00;
99-
printf("Got %p : %llu - %.2lf GB/s\n", memoryBuffer->GetData(), (unsigned long long)memoryBuffer->GetSize(), ngb * 1.0/tdiff0.count());
163+
log("Got %p : %llu - %.2lf GB/s", memoryBuffer->GetData(), (unsigned long long)memoryBuffer->GetSize(), ngb * 1.0/tdiff0.count());
100164
WAITHERE;
101165

102-
printf("Write to memory, by chunks of 1GB\n");
103-
t00 = std::chrono::steady_clock::now();
104-
for (unsigned int i=0; i<ngb; i++) {
105-
auto t0 = std::chrono::steady_clock::now();
106-
char *ptr=(char *)memoryBuffer->GetData();
107-
ptr=&ptr[i GB];
108-
printf("#%u : writing @%p ... ",i+1, ptr);
109-
//memset(ptr,0,1 GB);
110-
//bzero(ptr,1 GB);
111-
// marginally faster to write one byte per memory page
112-
if (1) {
113-
for(size_t j=0; j<1 GB; j += getpagesize()) {
114-
ptr[j]=0;
166+
if (memZero) {
167+
log("Write to memory (mode %d), by chunks of 1GB", memZero);
168+
t00 = std::chrono::steady_clock::now();
169+
for (unsigned int i=0; i<ngb; i++) {
170+
auto t0 = std::chrono::steady_clock::now();
171+
char *ptr=(char *)memoryBuffer->GetData();
172+
ptr=&ptr[i GB];
173+
printf("#%u : writing @%p ... ",i+1, ptr);
174+
if (memZero == 1) {
175+
memset(ptr,0,1 GB);
176+
} else if (memZero == 2) {
177+
bzero(ptr,1 GB);
178+
} else if (memZero == 3) {
179+
// marginally faster to write one byte per memory page
180+
for(size_t j=0; j<1 GB; j += getpagesize()) {
181+
ptr[j]=0;
182+
}
115183
}
184+
std::chrono::duration<double> tdiff = std::chrono::steady_clock::now() - t0;
185+
printf(" %.2lf GB/s\n", 1.0/tdiff.count());
116186
}
117-
std::chrono::duration<double> tdiff = std::chrono::steady_clock::now() - t0;
118-
printf(" %.2lf GB/s\n", 1.0/tdiff.count());
187+
std::chrono::duration<double> tdiff1 = std::chrono::steady_clock::now() - t00;
188+
log("Done writing");
189+
log("Average: %.2lf GB/s (writing)", ngb * 1.0/tdiff1.count());
190+
log("Average: %.2lf GB/s (writing + malloc)", ngb * 1.0/(tdiff0.count() + tdiff1.count()));
191+
WAITHERE;
192+
}
193+
194+
if (memWaitRelease) {
195+
log("Waiting %ds before releasing", memWaitRelease);
196+
sleep(memWaitRelease);
119197
}
120-
std::chrono::duration<double> tdiff1 = std::chrono::steady_clock::now() - t00;
121-
printf("Done writing\n");
122-
printf("Average: %.2lf GB/s (writing)\n", ngb * 1.0/tdiff1.count());
123-
printf("Average: %.2lf GB/s (writing + malloc)\n", ngb * 1.0/(tdiff0.count() + tdiff1.count()));
124-
WAITHERE;
125198

126-
printf("Cleanup FMQ unmanaged region\n");
199+
log("Cleanup FMQ unmanaged region");
127200
memoryBuffer = nullptr;
128201
WAITHERE;
129202

130-
printf("Cleanup FMQ channel\n");
203+
log("Cleanup FMQ channel");
131204
sendingChannel.reset();
132205
transportFactory.reset();
133206
WAITHERE;
134207

135-
printf("Exit\n");
208+
log("Releasing FMQ variables");
209+
sendingChannel = nullptr;
210+
transportFactory = nullptr;
211+
WAITHERE;
212+
213+
}
214+
215+
log("Exit");
136216
return 0;
137217
}
138218

0 commit comments

Comments
 (0)