1
- /* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve .
1
+ /* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved .
2
2
3
3
Licensed under the Apache License, Version 2.0 (the "License");
4
4
you may not use this file except in compliance with the License.
@@ -25,6 +25,26 @@ using paddle::framework::CloseChannel;
25
25
using paddle::framework::details::Buffered;
26
26
using paddle::framework::details::UnBuffered;
27
27
28
+ void RecevingOrderEqualToSendingOrder (Channel<int > *ch) {
29
+ unsigned sum_send = 0 ;
30
+ std::thread t ([&]() {
31
+ for (int i = 0 ; i < 5 ; i++) {
32
+ EXPECT_EQ (ch->Send (&i), true );
33
+ sum_send += i;
34
+ }
35
+ });
36
+ for (int i = 0 ; i < 5 ; i++) {
37
+ int recv;
38
+ EXPECT_EQ (ch->Receive (&recv), true );
39
+ EXPECT_EQ (recv, i);
40
+ }
41
+
42
+ CloseChannel (ch);
43
+ t.join ();
44
+ EXPECT_EQ (sum_send, 10U );
45
+ delete ch;
46
+ }
47
+
28
48
TEST (Channel, MakeAndClose) {
29
49
using paddle::framework::details::Buffered;
30
50
using paddle::framework::details::UnBuffered;
@@ -137,9 +157,7 @@ TEST(Channel, ReceiveFromBufferedChannelReturnResidualValuesTest) {
137
157
138
158
for (size_t i = 0 ; i < buffer_size; ++i) {
139
159
EXPECT_EQ (ch->Receive (&out),
140
- false ); // after receiving residual values, return zeros.
141
- // Note: we cannot check EXPECT_EQ(out, 0), because C++ doesn't
142
- // define zero values like Go does.
160
+ false ); // receiving on closed channel should return false
143
161
}
144
162
delete ch;
145
163
}
@@ -158,39 +176,25 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) {
158
176
sum += i;
159
177
}
160
178
});
161
- std::this_thread::sleep_for (std::chrono::milliseconds (100 )); // wait 0.5 sec
179
+ std::this_thread::sleep_for (std::chrono::milliseconds (100 )); // wait 0.1 sec
162
180
EXPECT_EQ (sum, 45U );
163
181
164
182
CloseChannel (ch);
165
183
t.join ();
166
184
delete ch;
167
185
}
168
186
169
- TEST (Channel, SimpleUnbufferedChannelTest ) {
187
+ TEST (Channel, RecevingOrderEqualToSendingOrderWithUnBufferedChannel ) {
170
188
auto ch = MakeChannel<int >(0 );
171
- unsigned sum_send = 0 ;
172
- std::thread t ([&]() {
173
- for (int i = 0 ; i < 5 ; i++) {
174
- EXPECT_EQ (ch->Send (&i), true );
175
- sum_send += i;
176
- }
177
- });
178
- for (int i = 0 ; i < 5 ; i++) {
179
- int recv;
180
- EXPECT_EQ (ch->Receive (&recv), true );
181
- EXPECT_EQ (recv, i);
182
- }
189
+ RecevingOrderEqualToSendingOrder (ch);
190
+ }
183
191
184
- CloseChannel (ch);
185
- t.join ();
186
- EXPECT_EQ (sum_send, 10U );
187
- delete ch;
192
+ TEST (Channel, RecevingOrderEqualToSendingOrderWithBufferedChannel) {
193
+ auto ch = MakeChannel<int >(10 );
194
+ RecevingOrderEqualToSendingOrder (ch);
188
195
}
189
196
190
- // This tests that closing a buffered channel also unblocks
191
- // any receivers waiting on the channel
192
- TEST (Channel, BufferedChannelCloseUnblocksReceiversTest) {
193
- auto ch = MakeChannel<int >(1 );
197
+ void ChannelCloseUnblocksReceiversTest (Channel<int > *ch) {
194
198
size_t num_threads = 5 ;
195
199
std::thread t[num_threads];
196
200
bool thread_ended[num_threads];
@@ -201,15 +205,14 @@ TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
201
205
t[i] = std::thread (
202
206
[&](bool *p) {
203
207
int data;
204
- // All reads should return false
205
208
EXPECT_EQ (ch->Receive (&data), false );
206
209
*p = true ;
207
210
},
208
211
&thread_ended[i]);
209
212
}
210
- std::this_thread::sleep_for (std::chrono::milliseconds (100 )); // wait
213
+ std::this_thread::sleep_for (std::chrono::milliseconds (100 )); // wait 0.1 sec
211
214
212
- // Verify that all threads are blocked
215
+ // Verify that all the threads are blocked
213
216
for (size_t i = 0 ; i < num_threads; i++) {
214
217
EXPECT_EQ (thread_ended[i], false );
215
218
}
@@ -218,21 +221,20 @@ TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
218
221
// This should unblock all receivers
219
222
CloseChannel (ch);
220
223
221
- std::this_thread::sleep_for (std::chrono::milliseconds (200 )); // wait
224
+ std::this_thread::sleep_for (std::chrono::milliseconds (100 )); // wait 0.1 sec
222
225
223
226
// Verify that all threads got unblocked
224
227
for (size_t i = 0 ; i < num_threads; i++) {
225
228
EXPECT_EQ (thread_ended[i], true );
226
229
}
227
230
228
231
for (size_t i = 0 ; i < num_threads; i++) t[i].join ();
229
- delete ch;
230
232
}
231
233
232
- // This tests that closing a buffered channel also unblocks
233
- // any senders waiting for channel to have write space
234
- TEST (Channel, BufferedChannelCloseUnblocksSendersTest) {
235
- auto ch = MakeChannel< int >( 1 );
234
+ void ChannelCloseUnblocksSendersTest (Channel< int > *ch) {
235
+ using paddle::framework::details::Buffered;
236
+ using paddle::framework::details::UnBuffered;
237
+
236
238
size_t num_threads = 5 ;
237
239
std::thread t[num_threads];
238
240
bool thread_ended[num_threads];
@@ -252,116 +254,72 @@ TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
252
254
}
253
255
std::this_thread::sleep_for (std::chrono::milliseconds (100 )); // wait
254
256
255
- // Verify that atleast 4 threads are blocked
256
- int ct = 0 ;
257
- for (size_t i = 0 ; i < num_threads; i++) {
258
- if (thread_ended[i] == false ) ct++;
257
+ if (dynamic_cast <Buffered<int > *>(ch)) {
258
+ // If ch is Buffered, atleast 4 threads must be blocked.
259
+ int ct = 0 ;
260
+ for (size_t i = 0 ; i < num_threads; i++) {
261
+ if (!thread_ended[i]) ct++;
262
+ }
263
+ EXPECT_GE (ct, 4 );
264
+ } else {
265
+ // If ch is UnBuffered, all the threads should be blocked.
266
+ for (size_t i = 0 ; i < num_threads; i++) {
267
+ EXPECT_EQ (thread_ended[i], false );
268
+ }
259
269
}
260
- // Atleast 4 threads must be blocked
261
- EXPECT_GE (ct, 4 );
262
-
263
270
// Explicitly close the thread
264
271
// This should unblock all senders
265
272
CloseChannel (ch);
266
273
267
- std::this_thread::sleep_for (std::chrono::milliseconds (200 )); // wait
274
+ std::this_thread::sleep_for (std::chrono::milliseconds (100 )); // wait
268
275
269
276
// Verify that all threads got unblocked
270
277
for (size_t i = 0 ; i < num_threads; i++) {
271
278
EXPECT_EQ (thread_ended[i], true );
272
279
}
273
280
274
- // Verify that only 1 send was successful
275
- ct = 0 ;
276
- for (size_t i = 0 ; i < num_threads; i++) {
277
- if (send_success[i]) ct++;
281
+ if (dynamic_cast <Buffered<int > *>(ch)) {
282
+ // Verify that only 1 send was successful
283
+ int ct = 0 ;
284
+ for (size_t i = 0 ; i < num_threads; i++) {
285
+ if (send_success[i]) ct++;
286
+ }
287
+ // Only 1 send must be successful
288
+ EXPECT_EQ (ct, 1 );
278
289
}
279
- // Only 1 send must be successful
280
- EXPECT_EQ (ct, 1 );
281
290
282
291
for (size_t i = 0 ; i < num_threads; i++) t[i].join ();
292
+ }
293
+
294
+ // This tests that closing a buffered channel also unblocks
295
+ // any receivers waiting on the channel
296
+ TEST (Channel, BufferedChannelCloseUnblocksReceiversTest) {
297
+ auto ch = MakeChannel<int >(1 );
298
+ ChannelCloseUnblocksReceiversTest (ch);
299
+ delete ch;
300
+ }
301
+
302
+ // This tests that closing a buffered channel also unblocks
303
+ // any senders waiting for channel to have write space
304
+ TEST (Channel, BufferedChannelCloseUnblocksSendersTest) {
305
+ auto ch = MakeChannel<int >(1 );
306
+ ChannelCloseUnblocksSendersTest (ch);
283
307
delete ch;
284
308
}
285
309
286
310
// This tests that closing an unbuffered channel also unblocks
287
311
// unblocks any receivers waiting for senders
288
312
TEST (Channel, UnbufferedChannelCloseUnblocksReceiversTest) {
289
313
auto ch = MakeChannel<int >(0 );
290
- size_t num_threads = 5 ;
291
- std::thread t[num_threads];
292
- bool thread_ended[num_threads];
293
-
294
- // Launches threads that try to read and are blocked becausew of no writers
295
- for (size_t i = 0 ; i < num_threads; i++) {
296
- thread_ended[i] = false ;
297
- t[i] = std::thread (
298
- [&](bool *p) {
299
- int data;
300
- EXPECT_EQ (ch->Receive (&data), false );
301
- *p = true ;
302
- },
303
- &thread_ended[i]);
304
- }
305
- std::this_thread::sleep_for (std::chrono::milliseconds (500 )); // wait 0.5 sec
306
-
307
- // Verify that all the threads are blocked
308
- for (size_t i = 0 ; i < num_threads; i++) {
309
- EXPECT_EQ (thread_ended[i], false );
310
- }
311
-
312
- // Explicitly close the thread
313
- // This should unblock all receivers
314
- CloseChannel (ch);
315
-
316
- std::this_thread::sleep_for (std::chrono::milliseconds (500 )); // wait 0.5 sec
317
-
318
- // Verify that all threads got unblocked
319
- for (size_t i = 0 ; i < num_threads; i++) {
320
- EXPECT_EQ (thread_ended[i], true );
321
- }
322
-
323
- for (size_t i = 0 ; i < num_threads; i++) t[i].join ();
314
+ ChannelCloseUnblocksReceiversTest (ch);
324
315
delete ch;
325
316
}
326
317
327
318
// This tests that closing an unbuffered channel also unblocks
328
319
// unblocks any senders waiting for senders
329
320
TEST (Channel, UnbufferedChannelCloseUnblocksSendersTest) {
330
321
auto ch = MakeChannel<int >(0 );
331
- size_t num_threads = 5 ;
332
- std::thread t[num_threads];
333
- bool thread_ended[num_threads];
334
-
335
- // Launches threads that try to read and are blocked becausew of no writers
336
- for (size_t i = 0 ; i < num_threads; i++) {
337
- thread_ended[i] = false ;
338
- t[i] = std::thread (
339
- [&](bool *p) {
340
- int data = 10 ;
341
- EXPECT_EQ (ch->Send (&data), false );
342
- *p = true ;
343
- },
344
- &thread_ended[i]);
345
- }
346
- std::this_thread::sleep_for (std::chrono::milliseconds (500 )); // wait 0.5 sec
347
-
348
- // Verify that all the threads are blocked
349
- for (size_t i = 0 ; i < num_threads; i++) {
350
- EXPECT_EQ (thread_ended[i], false );
351
- }
352
-
353
- // Explicitly close the thread
354
- // This should unblock all receivers
355
- CloseChannel (ch);
356
-
357
- std::this_thread::sleep_for (std::chrono::milliseconds (500 )); // wait 0.5 sec
358
-
359
- // Verify that all threads got unblocked
360
- for (size_t i = 0 ; i < num_threads; i++) {
361
- EXPECT_EQ (thread_ended[i], true );
362
- }
363
-
364
- for (size_t i = 0 ; i < num_threads; i++) t[i].join ();
322
+ ChannelCloseUnblocksReceiversTest (ch);
365
323
delete ch;
366
324
}
367
325
0 commit comments