@@ -79,8 +79,8 @@ class MessengerTests : public TerrierTest {
79
79
return db_main;
80
80
}
81
81
82
- /* * A dirty hack that sleeps for a little while so that sockets can clean up. */
83
- static void DirtySleep () { std::this_thread::sleep_for (std::chrono::seconds (5 )); }
82
+ /* * A dirty hack that sleeps for a little while so that sockets can wake up and clean up. */
83
+ static void DirtySleep () { std::this_thread::sleep_for (std::chrono::seconds (2 )); }
84
84
};
85
85
86
86
// NOLINTNEXTLINE
@@ -98,14 +98,25 @@ TEST_F(MessengerTests, BasicReplicationTest) {
98
98
// done[3] is shared memory (mmap) so that the forked processes can coordinate on when they are done.
99
99
// This is done instead of waitpid() because I can't find a way to stop googletest from freaking out on waitpid().
100
100
// done[0] : primary, done[1] : replica1, done[2] : replica2.
101
+ volatile bool *init = static_cast <volatile bool *>(
102
+ mmap (nullptr , 3 * sizeof (bool ), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1 , 0 ));
103
+ NOISEPAGE_ASSERT (MAP_FAILED != init, " mmap() failed." );
101
104
volatile bool *done = static_cast <volatile bool *>(
102
105
mmap (nullptr , 3 * sizeof (bool ), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1 , 0 ));
103
106
NOISEPAGE_ASSERT (MAP_FAILED != done, " mmap() failed." );
104
107
108
+ init[0 ] = false ;
109
+ init[1 ] = false ;
110
+ init[2 ] = false ;
105
111
done[0 ] = false ;
106
112
done[1 ] = false ;
107
113
done[2 ] = false ;
108
114
115
+ auto spin_until_init = [init]() {
116
+ while (!(init[0 ] && init[1 ] && init[2 ])) {
117
+ }
118
+ };
119
+
109
120
auto spin_until_done = [done]() {
110
121
while (!(done[0 ] && done[1 ] && done[2 ])) {
111
122
}
@@ -115,19 +126,28 @@ TEST_F(MessengerTests, BasicReplicationTest) {
115
126
auto primary = BuildDBMain (port_primary, port_messenger_primary, " primary" );
116
127
primary->GetNetworkLayer ()->GetServer ()->RunServer ();
117
128
129
+ init[0 ] = true ;
130
+ spin_until_init ();
131
+ DirtySleep ();
132
+
118
133
while (!(done[1 ] && done[2 ])) {
119
134
}
120
135
121
136
MESSENGER_LOG_TRACE (" Primary done." );
122
137
done[0 ] = true ;
123
138
spin_until_done ();
124
139
MESSENGER_LOG_TRACE (" Primary exit." );
140
+ primary->ForceShutdown ();
125
141
};
126
142
127
143
VoidFn replica1_fn = [=]() {
128
144
auto replica1 = BuildDBMain (port_replica1, port_messenger_replica1, " replica1" );
129
145
replica1->GetNetworkLayer ()->GetServer ()->RunServer ();
130
146
147
+ init[1 ] = true ;
148
+ spin_until_init ();
149
+ DirtySleep ();
150
+
131
151
// Set up a connection to the primary.
132
152
auto messenger = replica1->GetMessengerLayer ()->GetMessenger ();
133
153
ConnectionDestination dest_primary = Messenger::GetEndpointIPC (" primary" , port_messenger_primary);
@@ -168,12 +188,17 @@ TEST_F(MessengerTests, BasicReplicationTest) {
168
188
done[1 ] = true ;
169
189
spin_until_done ();
170
190
MESSENGER_LOG_TRACE (" Replica 1 exit." );
191
+ replica1->ForceShutdown ();
171
192
};
172
193
173
194
VoidFn replica2_fn = [=]() {
174
195
auto replica2 = BuildDBMain (port_replica2, port_messenger_replica2, " replica2" );
175
196
replica2->GetNetworkLayer ()->GetServer ()->RunServer ();
176
197
198
+ init[2 ] = true ;
199
+ spin_until_init ();
200
+ DirtySleep ();
201
+
177
202
// Set up a connection to the primary.
178
203
auto messenger = replica2->GetMessengerLayer ()->GetMessenger ();
179
204
ConnectionDestination dest_primary = Messenger::GetEndpointIPC (" primary" , port_messenger_primary);
@@ -214,6 +239,7 @@ TEST_F(MessengerTests, BasicReplicationTest) {
214
239
done[2 ] = true ;
215
240
spin_until_done ();
216
241
MESSENGER_LOG_TRACE (" Replica 2 exit." );
242
+ replica2->ForceShutdown ();
217
243
};
218
244
219
245
std::vector<pid_t > pids = ForkTests ({primary_fn, replica1_fn, replica2_fn});
@@ -223,9 +249,14 @@ TEST_F(MessengerTests, BasicReplicationTest) {
223
249
}
224
250
225
251
DirtySleep ();
226
-
227
- UNUSED_ATTRIBUTE int munmap_retval = munmap (static_cast <void *>(const_cast <bool *>(done)), 3 * sizeof (bool ));
228
- NOISEPAGE_ASSERT (-1 != munmap_retval, " munmap() failed." );
252
+ {
253
+ UNUSED_ATTRIBUTE int munmap_retval = munmap (static_cast <void *>(const_cast <bool *>(init)), 3 * sizeof (bool ));
254
+ NOISEPAGE_ASSERT (-1 != munmap_retval, " munmap() failed." );
255
+ }
256
+ {
257
+ UNUSED_ATTRIBUTE int munmap_retval = munmap (static_cast <void *>(const_cast <bool *>(done)), 3 * sizeof (bool ));
258
+ NOISEPAGE_ASSERT (-1 != munmap_retval, " munmap() failed." );
259
+ }
229
260
}
230
261
231
262
// NOLINTNEXTLINE
@@ -240,13 +271,23 @@ TEST_F(MessengerTests, BasicListenTest) {
240
271
241
272
uint16_t port_listen = 8030 ;
242
273
274
+ volatile bool *init = static_cast <volatile bool *>(
275
+ mmap (nullptr , 2 * sizeof (bool ), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1 , 0 ));
276
+ NOISEPAGE_ASSERT (MAP_FAILED != init, " mmap() failed." );
243
277
volatile bool *done = static_cast <volatile bool *>(
244
278
mmap (nullptr , 2 * sizeof (bool ), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1 , 0 ));
245
279
NOISEPAGE_ASSERT (MAP_FAILED != done, " mmap() failed." );
246
280
281
+ init[0 ] = false ;
282
+ init[1 ] = false ;
247
283
done[0 ] = false ;
248
284
done[1 ] = false ;
249
285
286
+ auto spin_until_init = [init]() {
287
+ while (!(init[0 ] && init[1 ])) {
288
+ }
289
+ };
290
+
250
291
auto spin_until_done = [done]() {
251
292
while (!(done[0 ] && done[1 ])) {
252
293
}
@@ -269,18 +310,26 @@ TEST_F(MessengerTests, BasicListenTest) {
269
310
}
270
311
});
271
312
313
+ init[0 ] = true ;
314
+ spin_until_init ();
315
+ DirtySleep ();
316
+
272
317
while (!done[1 ]) {
273
318
}
274
319
275
320
MESSENGER_LOG_TRACE (" Primary done." );
276
321
done[0 ] = true ;
277
322
spin_until_done ();
278
323
MESSENGER_LOG_TRACE (" Primary exit." );
324
+ primary->ForceShutdown ();
279
325
};
280
326
281
327
VoidFn replica1_fn = [=]() {
282
328
auto replica1 = BuildDBMain (port_replica1, port_messenger_replica1, " replica1" );
283
329
replica1->GetNetworkLayer ()->GetServer ()->RunServer ();
330
+ init[1 ] = true ;
331
+ spin_until_init ();
332
+ DirtySleep ();
284
333
285
334
// Set up a connection to the primary via the listen endpoint.
286
335
auto messenger = replica1->GetMessengerLayer ()->GetMessenger ();
@@ -316,6 +365,7 @@ TEST_F(MessengerTests, BasicListenTest) {
316
365
done[1 ] = true ;
317
366
spin_until_done ();
318
367
MESSENGER_LOG_TRACE (" Replica 1 exit." );
368
+ replica1->ForceShutdown ();
319
369
};
320
370
321
371
std::vector<pid_t > pids = ForkTests ({primary_fn, replica1_fn});
@@ -325,9 +375,14 @@ TEST_F(MessengerTests, BasicListenTest) {
325
375
}
326
376
327
377
DirtySleep ();
328
-
329
- UNUSED_ATTRIBUTE int munmap_retval = munmap (static_cast <void *>(const_cast <bool *>(done)), 2 * sizeof (bool ));
330
- NOISEPAGE_ASSERT (-1 != munmap_retval, " munmap() failed." );
378
+ {
379
+ UNUSED_ATTRIBUTE int munmap_retval = munmap (static_cast <void *>(const_cast <bool *>(init)), 2 * sizeof (bool ));
380
+ NOISEPAGE_ASSERT (-1 != munmap_retval, " munmap() failed." );
381
+ }
382
+ {
383
+ UNUSED_ATTRIBUTE int munmap_retval = munmap (static_cast <void *>(const_cast <bool *>(done)), 2 * sizeof (bool ));
384
+ NOISEPAGE_ASSERT (-1 != munmap_retval, " munmap() failed." );
385
+ }
331
386
}
332
387
333
388
} // namespace noisepage::messenger
0 commit comments