Skip to content

Commit 440cf8e

Browse files
committed
Add more IPC tests
1 parent 1807b67 commit 440cf8e

File tree

1 file changed

+177
-66
lines changed

1 file changed

+177
-66
lines changed

test/ipcFixtures.hpp

Lines changed: 177 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,18 @@ using ipcTestParams =
6868
struct umfIpcTest : umf_test::test,
6969
::testing::WithParamInterface<ipcTestParams> {
7070
umfIpcTest() {}
71+
size_t getOpenedIpcCacheSize() {
72+
const char *max_size_str = getenv("UMF_MAX_OPENED_IPC_HANDLES");
73+
if (max_size_str) {
74+
char *endptr;
75+
size_t max_size = strtoul(max_size_str, &endptr, 10);
76+
EXPECT_EQ(*endptr, '\0');
77+
if (*endptr == '\0') {
78+
return max_size;
79+
}
80+
}
81+
return 0;
82+
}
7183
void SetUp() override {
7284
test::SetUp();
7385
auto [pool_ops, pool_params_create, pool_params_destroy, provider_ops,
@@ -80,6 +92,7 @@ struct umfIpcTest : umf_test::test,
8092
providerParamsCreate = provider_params_create;
8193
providerParamsDestroy = provider_params_destroy;
8294
memAccessor = accessor;
95+
openedIpcCacheSize = getOpenedIpcCacheSize();
8396
}
8497

8598
void TearDown() override { test::TearDown(); }
@@ -160,6 +173,7 @@ struct umfIpcTest : umf_test::test,
160173
umf_memory_provider_ops_t *providerOps = nullptr;
161174
pfnProviderParamsCreate providerParamsCreate = nullptr;
162175
pfnProviderParamsDestroy providerParamsDestroy = nullptr;
176+
size_t openedIpcCacheSize = 0;
163177

164178
void concurrentGetConcurrentPutHandles(bool shuffle) {
165179
std::vector<void *> ptrs;
@@ -264,6 +278,158 @@ struct umfIpcTest : umf_test::test,
264278
pool.reset(nullptr);
265279
EXPECT_EQ(stat.putCount, stat.getCount);
266280
}
281+
282+
void concurrentOpenConcurrentCloseHandles(bool shuffle) {
283+
umf_result_t ret;
284+
std::vector<void *> ptrs;
285+
constexpr size_t ALLOC_SIZE = 100;
286+
constexpr size_t NUM_POINTERS = 100;
287+
umf::pool_unique_handle_t pool = makePool();
288+
ASSERT_NE(pool.get(), nullptr);
289+
290+
for (size_t i = 0; i < NUM_POINTERS; ++i) {
291+
void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
292+
EXPECT_NE(ptr, nullptr);
293+
ptrs.push_back(ptr);
294+
}
295+
296+
std::array<umf_ipc_handle_t, NUM_POINTERS> ipcHandles;
297+
for (size_t i = 0; i < NUM_POINTERS; ++i) {
298+
umf_ipc_handle_t ipcHandle;
299+
size_t handleSize;
300+
ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
301+
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
302+
ipcHandles[i] = ipcHandle;
303+
}
304+
305+
std::array<std::vector<void *>, NTHREADS> openedIpcHandles;
306+
umf_ipc_handler_handle_t ipcHandler = nullptr;
307+
ret = umfPoolGetIPCHandler(pool.get(), &ipcHandler);
308+
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
309+
ASSERT_NE(ipcHandler, nullptr);
310+
311+
umf_test::syncthreads_barrier syncthreads(NTHREADS);
312+
313+
auto openHandlesFn = [shuffle, &ipcHandles, &openedIpcHandles,
314+
&syncthreads, ipcHandler](size_t tid) {
315+
// Each thread gets a copy of the pointers to shuffle them
316+
std::array<umf_ipc_handle_t, NUM_POINTERS> localIpcHandles =
317+
ipcHandles;
318+
if (shuffle) {
319+
std::random_device rd;
320+
std::mt19937 g(rd());
321+
std::shuffle(localIpcHandles.begin(), localIpcHandles.end(), g);
322+
}
323+
syncthreads();
324+
for (auto ipcHandle : localIpcHandles) {
325+
void *ptr;
326+
umf_result_t ret =
327+
umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
328+
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
329+
openedIpcHandles[tid].push_back(ptr);
330+
}
331+
};
332+
333+
umf_test::parallel_exec(NTHREADS, openHandlesFn);
334+
335+
auto closeHandlesFn = [&openedIpcHandles, &syncthreads](size_t tid) {
336+
syncthreads();
337+
for (void *ptr : openedIpcHandles[tid]) {
338+
umf_result_t ret = umfCloseIPCHandle(ptr);
339+
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
340+
}
341+
};
342+
343+
umf_test::parallel_exec(NTHREADS, closeHandlesFn);
344+
345+
for (auto ipcHandle : ipcHandles) {
346+
ret = umfPutIPCHandle(ipcHandle);
347+
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
348+
}
349+
350+
for (void *ptr : ptrs) {
351+
ret = umfPoolFree(pool.get(), ptr);
352+
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
353+
}
354+
355+
pool.reset(nullptr);
356+
EXPECT_EQ(stat.getCount, stat.allocCount);
357+
EXPECT_EQ(stat.putCount, stat.getCount);
358+
EXPECT_EQ(stat.openCount, stat.allocCount);
359+
EXPECT_EQ(stat.openCount, stat.closeCount);
360+
}
361+
362+
void concurrentOpenCloseHandles(bool shuffle) {
363+
umf_result_t ret;
364+
std::vector<void *> ptrs;
365+
constexpr size_t ALLOC_SIZE = 100;
366+
constexpr size_t NUM_POINTERS = 100;
367+
umf::pool_unique_handle_t pool = makePool();
368+
ASSERT_NE(pool.get(), nullptr);
369+
370+
for (size_t i = 0; i < NUM_POINTERS; ++i) {
371+
void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
372+
EXPECT_NE(ptr, nullptr);
373+
ptrs.push_back(ptr);
374+
}
375+
376+
std::array<umf_ipc_handle_t, NUM_POINTERS> ipcHandles;
377+
for (size_t i = 0; i < NUM_POINTERS; ++i) {
378+
umf_ipc_handle_t ipcHandle;
379+
size_t handleSize;
380+
ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
381+
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
382+
ipcHandles[i] = ipcHandle;
383+
}
384+
385+
umf_ipc_handler_handle_t ipcHandler = nullptr;
386+
ret = umfPoolGetIPCHandler(pool.get(), &ipcHandler);
387+
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
388+
ASSERT_NE(ipcHandler, nullptr);
389+
390+
umf_test::syncthreads_barrier syncthreads(NTHREADS);
391+
392+
auto openCloseHandlesFn = [shuffle, &ipcHandles, &syncthreads,
393+
ipcHandler](size_t) {
394+
// Each thread gets a copy of the pointers to shuffle them
395+
std::array<umf_ipc_handle_t, NUM_POINTERS> localIpcHandles =
396+
ipcHandles;
397+
if (shuffle) {
398+
std::random_device rd;
399+
std::mt19937 g(rd());
400+
std::shuffle(localIpcHandles.begin(), localIpcHandles.end(), g);
401+
}
402+
syncthreads();
403+
for (auto ipcHandle : localIpcHandles) {
404+
void *ptr;
405+
umf_result_t ret =
406+
umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
407+
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
408+
ret = umfCloseIPCHandle(ptr);
409+
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
410+
}
411+
};
412+
413+
umf_test::parallel_exec(NTHREADS, openCloseHandlesFn);
414+
415+
for (auto ipcHandle : ipcHandles) {
416+
ret = umfPutIPCHandle(ipcHandle);
417+
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
418+
}
419+
420+
for (void *ptr : ptrs) {
421+
ret = umfPoolFree(pool.get(), ptr);
422+
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
423+
}
424+
425+
pool.reset(nullptr);
426+
EXPECT_EQ(stat.getCount, stat.allocCount);
427+
EXPECT_EQ(stat.putCount, stat.getCount);
428+
if (openedIpcCacheSize == 0) {
429+
EXPECT_EQ(stat.openCount, stat.allocCount);
430+
}
431+
EXPECT_EQ(stat.openCount, stat.closeCount);
432+
}
267433
};
268434

269435
TEST_P(umfIpcTest, GetIPCHandleSize) {
@@ -529,75 +695,20 @@ TEST_P(umfIpcTest, ConcurrentGetPutHandlesShuffled) {
529695
concurrentGetPutHandles(true);
530696
}
531697

532-
TEST_P(umfIpcTest, ConcurrentOpenCloseHandles) {
533-
umf_result_t ret;
534-
std::vector<void *> ptrs;
535-
constexpr size_t ALLOC_SIZE = 100;
536-
constexpr size_t NUM_POINTERS = 100;
537-
umf::pool_unique_handle_t pool = makePool();
538-
ASSERT_NE(pool.get(), nullptr);
539-
540-
for (size_t i = 0; i < NUM_POINTERS; ++i) {
541-
void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
542-
EXPECT_NE(ptr, nullptr);
543-
ptrs.push_back(ptr);
544-
}
545-
546-
std::array<umf_ipc_handle_t, NUM_POINTERS> ipcHandles;
547-
for (size_t i = 0; i < NUM_POINTERS; ++i) {
548-
umf_ipc_handle_t ipcHandle;
549-
size_t handleSize;
550-
ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
551-
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
552-
ipcHandles[i] = ipcHandle;
553-
}
554-
555-
std::array<std::vector<void *>, NTHREADS> openedIpcHandles;
556-
umf_ipc_handler_handle_t ipcHandler = nullptr;
557-
ret = umfPoolGetIPCHandler(pool.get(), &ipcHandler);
558-
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
559-
ASSERT_NE(ipcHandler, nullptr);
560-
561-
umf_test::syncthreads_barrier syncthreads(NTHREADS);
562-
563-
auto openHandlesFn = [&ipcHandles, &openedIpcHandles, &syncthreads,
564-
ipcHandler](size_t tid) {
565-
syncthreads();
566-
for (auto ipcHandle : ipcHandles) {
567-
void *ptr;
568-
umf_result_t ret = umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
569-
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
570-
openedIpcHandles[tid].push_back(ptr);
571-
}
572-
};
573-
574-
umf_test::parallel_exec(NTHREADS, openHandlesFn);
575-
576-
auto closeHandlesFn = [&openedIpcHandles, &syncthreads](size_t tid) {
577-
syncthreads();
578-
for (void *ptr : openedIpcHandles[tid]) {
579-
umf_result_t ret = umfCloseIPCHandle(ptr);
580-
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
581-
}
582-
};
583-
584-
umf_test::parallel_exec(NTHREADS, closeHandlesFn);
698+
TEST_P(umfIpcTest, ConcurrentOpenConcurrentCloseHandles) {
699+
concurrentOpenConcurrentCloseHandles(false);
700+
}
585701

586-
for (auto ipcHandle : ipcHandles) {
587-
ret = umfPutIPCHandle(ipcHandle);
588-
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
589-
}
702+
TEST_P(umfIpcTest, ConcurrentOpenConcurrentCloseHandlesShuffled) {
703+
concurrentOpenConcurrentCloseHandles(true);
704+
}
590705

591-
for (void *ptr : ptrs) {
592-
ret = umfPoolFree(pool.get(), ptr);
593-
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
594-
}
706+
TEST_P(umfIpcTest, ConcurrentOpenCloseHandles) {
707+
concurrentOpenCloseHandles(false);
708+
}
595709

596-
pool.reset(nullptr);
597-
EXPECT_EQ(stat.getCount, stat.allocCount);
598-
EXPECT_EQ(stat.putCount, stat.getCount);
599-
EXPECT_EQ(stat.openCount, stat.allocCount);
600-
EXPECT_EQ(stat.openCount, stat.closeCount);
710+
TEST_P(umfIpcTest, ConcurrentOpenCloseHandlesShuffled) {
711+
concurrentOpenCloseHandles(true);
601712
}
602713

603714
TEST_P(umfIpcTest, ConcurrentDestroyIpcHandlers) {

0 commit comments

Comments
 (0)