1
1
#include " messenger/messenger.h"
2
2
3
+ #include < pthread.h>
3
4
#include < sys/mman.h>
4
5
#include < unistd.h>
5
6
@@ -112,30 +113,56 @@ TEST_F(MessengerTests, BasicReplicationTest) {
112
113
done[1 ] = false ;
113
114
done[2 ] = false ;
114
115
115
- auto spin_until_init = [init]() {
116
+ auto *pmutex = static_cast <pthread_mutex_t *>(
117
+ mmap (nullptr , sizeof (pthread_mutex_t ), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1 , 0 ));
118
+ NOISEPAGE_ASSERT (MAP_FAILED != pmutex, " mmap() failed." );
119
+ pthread_mutexattr_t mutexattr;
120
+ pthread_mutexattr_init (&mutexattr);
121
+ pthread_mutexattr_setpshared (&mutexattr, PTHREAD_PROCESS_SHARED);
122
+ pthread_mutex_init (pmutex, &mutexattr);
123
+
124
+ auto *pcond = static_cast <pthread_cond_t *>(
125
+ mmap (nullptr , sizeof (pthread_cond_t ), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1 , 0 ));
126
+ NOISEPAGE_ASSERT (MAP_FAILED != pcond, " mmap() failed." );
127
+ pthread_condattr_t condattr;
128
+ pthread_condattr_init (&condattr);
129
+ pthread_condattr_setpshared (&condattr, PTHREAD_PROCESS_SHARED);
130
+ pthread_cond_init (pcond, &condattr);
131
+
132
+ auto sleep_until_init = [init, pmutex, pcond]() {
116
133
while (!(init[0 ] && init[1 ] && init[2 ])) {
134
+ pthread_mutex_lock (pmutex);
135
+ pthread_cond_wait (pcond, pmutex);
136
+ pthread_mutex_unlock (pmutex);
117
137
}
118
138
};
119
139
120
- auto spin_until_done = [done]() {
140
+ auto sleep_until_done = [done, pmutex, pcond ]() {
121
141
while (!(done[0 ] && done[1 ] && done[2 ])) {
142
+ pthread_mutex_lock (pmutex);
143
+ pthread_cond_wait (pcond, pmutex);
144
+ pthread_mutex_unlock (pmutex);
122
145
}
123
146
};
124
147
148
+ auto wake_all = [pcond]() { pthread_cond_broadcast (pcond); };
149
+
125
150
VoidFn primary_fn = [=]() {
126
151
auto primary = BuildDBMain (port_primary, port_messenger_primary, " primary" );
127
152
primary->GetNetworkLayer ()->GetServer ()->RunServer ();
128
153
129
154
init[0 ] = true ;
130
- spin_until_init ();
155
+ wake_all ();
156
+ sleep_until_init ();
131
157
DirtySleep ();
132
158
133
159
while (!(done[1 ] && done[2 ])) {
134
160
}
135
161
136
162
MESSENGER_LOG_TRACE (" Primary done." );
137
163
done[0 ] = true ;
138
- spin_until_done ();
164
+ wake_all ();
165
+ sleep_until_done ();
139
166
MESSENGER_LOG_TRACE (" Primary exit." );
140
167
primary->ForceShutdown ();
141
168
};
@@ -145,7 +172,8 @@ TEST_F(MessengerTests, BasicReplicationTest) {
145
172
replica1->GetNetworkLayer ()->GetServer ()->RunServer ();
146
173
147
174
init[1 ] = true ;
148
- spin_until_init ();
175
+ wake_all ();
176
+ sleep_until_init ();
149
177
DirtySleep ();
150
178
151
179
// Set up a connection to the primary.
@@ -186,7 +214,8 @@ TEST_F(MessengerTests, BasicReplicationTest) {
186
214
187
215
MESSENGER_LOG_TRACE (" Replica 1 done." );
188
216
done[1 ] = true ;
189
- spin_until_done ();
217
+ wake_all ();
218
+ sleep_until_done ();
190
219
MESSENGER_LOG_TRACE (" Replica 1 exit." );
191
220
replica1->ForceShutdown ();
192
221
};
@@ -196,7 +225,8 @@ TEST_F(MessengerTests, BasicReplicationTest) {
196
225
replica2->GetNetworkLayer ()->GetServer ()->RunServer ();
197
226
198
227
init[2 ] = true ;
199
- spin_until_init ();
228
+ wake_all ();
229
+ sleep_until_init ();
200
230
DirtySleep ();
201
231
202
232
// Set up a connection to the primary.
@@ -237,16 +267,21 @@ TEST_F(MessengerTests, BasicReplicationTest) {
237
267
238
268
MESSENGER_LOG_TRACE (" Replica 2 done." );
239
269
done[2 ] = true ;
240
- spin_until_done ();
270
+ wake_all ();
271
+ sleep_until_done ();
241
272
MESSENGER_LOG_TRACE (" Replica 2 exit." );
242
273
replica2->ForceShutdown ();
243
274
};
244
275
245
276
std::vector<pid_t > pids = ForkTests ({primary_fn, replica1_fn, replica2_fn});
246
277
247
278
// Spin until all done.
248
- while (!(done[0 ] && done[1 ] && done[2 ])) {
249
- }
279
+ sleep_until_done ();
280
+
281
+ pthread_mutex_destroy (pmutex);
282
+ pthread_mutexattr_destroy (&mutexattr);
283
+ pthread_cond_destroy (pcond);
284
+ pthread_condattr_destroy (&condattr);
250
285
251
286
DirtySleep ();
252
287
{
@@ -257,6 +292,14 @@ TEST_F(MessengerTests, BasicReplicationTest) {
257
292
UNUSED_ATTRIBUTE int munmap_retval = munmap (static_cast <void *>(const_cast <bool *>(done)), 3 * sizeof (bool ));
258
293
NOISEPAGE_ASSERT (-1 != munmap_retval, " munmap() failed." );
259
294
}
295
+ {
296
+ UNUSED_ATTRIBUTE int munmap_retval = munmap (static_cast <void *>(pmutex), sizeof (pthread_mutex_t ));
297
+ NOISEPAGE_ASSERT (-1 != munmap_retval, " munmap() failed." );
298
+ }
299
+ {
300
+ UNUSED_ATTRIBUTE int munmap_retval = munmap (static_cast <void *>(pcond), sizeof (pthread_cond_t ));
301
+ NOISEPAGE_ASSERT (-1 != munmap_retval, " munmap() failed." );
302
+ }
260
303
}
261
304
262
305
// NOLINTNEXTLINE
@@ -283,16 +326,34 @@ TEST_F(MessengerTests, BasicListenTest) {
283
326
done[0 ] = false ;
284
327
done[1 ] = false ;
285
328
286
- auto spin_until_init = [init]() {
329
+ auto *pmutex = static_cast <pthread_mutex_t *>(
330
+ mmap (nullptr , sizeof (pthread_mutex_t ), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1 , 0 ));
331
+ NOISEPAGE_ASSERT (MAP_FAILED != pmutex, " mmap() failed." );
332
+ pthread_mutexattr_t mutexattr;
333
+ pthread_mutexattr_init (&mutexattr);
334
+ pthread_mutexattr_setpshared (&mutexattr, PTHREAD_PROCESS_SHARED);
335
+ pthread_mutex_init (pmutex, &mutexattr);
336
+
337
+ auto *pcond = static_cast <pthread_cond_t *>(
338
+ mmap (nullptr , sizeof (pthread_cond_t ), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1 , 0 ));
339
+ NOISEPAGE_ASSERT (MAP_FAILED != pcond, " mmap() failed." );
340
+ pthread_condattr_t condattr;
341
+ pthread_condattr_init (&condattr);
342
+ pthread_condattr_setpshared (&condattr, PTHREAD_PROCESS_SHARED);
343
+ pthread_cond_init (pcond, &condattr);
344
+
345
+ auto sleep_until_init = [init]() {
287
346
while (!(init[0 ] && init[1 ])) {
288
347
}
289
348
};
290
349
291
- auto spin_until_done = [done]() {
350
+ auto sleep_until_done = [done]() {
292
351
while (!(done[0 ] && done[1 ])) {
293
352
}
294
353
};
295
354
355
+ auto wake_all = [pcond]() { pthread_cond_broadcast (pcond); };
356
+
296
357
VoidFn primary_fn = [=]() {
297
358
auto primary = BuildDBMain (port_primary, port_messenger_primary, " primary" );
298
359
primary->GetNetworkLayer ()->GetServer ()->RunServer ();
@@ -311,15 +372,17 @@ TEST_F(MessengerTests, BasicListenTest) {
311
372
});
312
373
313
374
init[0 ] = true ;
314
- spin_until_init ();
375
+ wake_all ();
376
+ sleep_until_init ();
315
377
DirtySleep ();
316
378
317
379
while (!done[1 ]) {
318
380
}
319
381
320
382
MESSENGER_LOG_TRACE (" Primary done." );
321
383
done[0 ] = true ;
322
- spin_until_done ();
384
+ wake_all ();
385
+ sleep_until_done ();
323
386
MESSENGER_LOG_TRACE (" Primary exit." );
324
387
primary->ForceShutdown ();
325
388
};
@@ -328,7 +391,8 @@ TEST_F(MessengerTests, BasicListenTest) {
328
391
auto replica1 = BuildDBMain (port_replica1, port_messenger_replica1, " replica1" );
329
392
replica1->GetNetworkLayer ()->GetServer ()->RunServer ();
330
393
init[1 ] = true ;
331
- spin_until_init ();
394
+ wake_all ();
395
+ sleep_until_init ();
332
396
DirtySleep ();
333
397
334
398
// Set up a connection to the primary via the listen endpoint.
@@ -363,16 +427,21 @@ TEST_F(MessengerTests, BasicListenTest) {
363
427
364
428
MESSENGER_LOG_TRACE (" Replica 1 done." );
365
429
done[1 ] = true ;
366
- spin_until_done ();
430
+ wake_all ();
431
+ sleep_until_done ();
367
432
MESSENGER_LOG_TRACE (" Replica 1 exit." );
368
433
replica1->ForceShutdown ();
369
434
};
370
435
371
436
std::vector<pid_t > pids = ForkTests ({primary_fn, replica1_fn});
372
437
373
438
// Spin until all done.
374
- while (!(done[0 ] && done[1 ])) {
375
- }
439
+ sleep_until_done ();
440
+
441
+ pthread_mutex_destroy (pmutex);
442
+ pthread_mutexattr_destroy (&mutexattr);
443
+ pthread_cond_destroy (pcond);
444
+ pthread_condattr_destroy (&condattr);
376
445
377
446
DirtySleep ();
378
447
{
@@ -383,6 +452,14 @@ TEST_F(MessengerTests, BasicListenTest) {
383
452
UNUSED_ATTRIBUTE int munmap_retval = munmap (static_cast <void *>(const_cast <bool *>(done)), 2 * sizeof (bool ));
384
453
NOISEPAGE_ASSERT (-1 != munmap_retval, " munmap() failed." );
385
454
}
455
+ {
456
+ UNUSED_ATTRIBUTE int munmap_retval = munmap (static_cast <void *>(pmutex), sizeof (pthread_mutex_t ));
457
+ NOISEPAGE_ASSERT (-1 != munmap_retval, " munmap() failed." );
458
+ }
459
+ {
460
+ UNUSED_ATTRIBUTE int munmap_retval = munmap (static_cast <void *>(pcond), sizeof (pthread_cond_t ));
461
+ NOISEPAGE_ASSERT (-1 != munmap_retval, " munmap() failed." );
462
+ }
386
463
}
387
464
388
465
} // namespace noisepage::messenger
0 commit comments