Skip to content

Commit ba00547

Browse files
Add test case for partitioned communication
Signed-off-by: Axel Schneewind <[email protected]>
1 parent a7fd61b commit ba00547

File tree

4 files changed

+248
-0
lines changed

4 files changed

+248
-0
lines changed

Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ mpi_test_suite_SOURCES = \
124124
threaded/tst_threaded_ring.c \
125125
threaded/tst_threaded_ring_isend.c \
126126
threaded/tst_threaded_ring_persistent.c \
127+
threaded/tst_threaded_ring_partitioned.c \
127128
tst_comm.c \
128129
tst_comm.h \
129130
tst_file.c \

mpi_test_suite.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -917,6 +917,10 @@ extern int tst_threaded_ring_persistent_cleanup (struct tst_env * env);
917917
extern int tst_threaded_comm_dup_init (struct tst_env * env);
918918
extern int tst_threaded_comm_dup_run (struct tst_env * env);
919919
extern int tst_threaded_comm_dup_cleanup (struct tst_env * env);
920+
921+
extern int tst_threaded_ring_partitioned_init (struct tst_env * env);
922+
extern int tst_threaded_ring_partitioned_run (struct tst_env * env);
923+
extern int tst_threaded_ring_partitioned_cleanup (struct tst_env * env);
920924
#endif
921925

922926
#endif /* __MPI_TESTSUITE_H__ */
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
* File: tst_threaded_ring_partitioned.c
3+
*
4+
* Functionality:
5+
* Sends data through a ring using partitioned communication.
6+
* Each thread corresponds to a partition of the send/receive buffers.
7+
*
8+
* Author: Axel Schneewind
9+
*
10+
* Date: July 19th 2023
11+
*/
12+
#include <mpi.h>
13+
#include "mpi_test_suite.h"
14+
#include "tst_threads.h"
15+
#include "tst_output.h"
16+
#include "tst_comm.h"
17+
18+
#include <pthread.h>
19+
20+
// Rank within the test communicator that seeds the send buffer with the
// standard array, marks its partitions ready first, and reports the timing.
// It is also the rank passed to the final standard-array check.
#define TST_RANK_MASTER 0
21+
22+
// Barrier shared by all threads of this process. The master thread
// initializes it in the init function for (number of worker threads + 1)
// participants, every thread waits on it twice in the run function, and the
// master thread destroys it in the cleanup function.
pthread_barrier_t thread_barrier;
23+
24+
int tst_threaded_ring_partitioned_init(struct tst_env *env)
25+
{
26+
int comm_rank;
27+
MPI_Comm comm = tst_comm_getmastercomm(env->comm);
28+
MPI_CHECK(MPI_Comm_rank(comm, &comm_rank));
29+
30+
int thread_num = tst_thread_get_num();
31+
int num_worker_threads = tst_thread_num_threads();
32+
33+
tst_output_printf(DEBUG_LOG, TST_REPORT_MAX, "(Rank:%d, Thread:%d) env->comm:%d env->type:%d env->values_num:%d\n",
34+
tst_global_rank, thread_num, env->comm, env->type, env->values_num);
35+
36+
// each partition contains env->values_num values
37+
MPI_Aint type_extent = tst_type_gettypesize(env->type);
38+
size_t buffer_size = num_worker_threads * env->values_num * type_extent;
39+
40+
if (thread_num == TST_THREAD_MASTER)
41+
{
42+
// one request for sending, one for receiving
43+
tst_thread_alloc_global_requests(2);
44+
45+
// barrier syncs master and worker threads
46+
pthread_barrier_init(&thread_barrier, NULL, num_worker_threads + 1);
47+
48+
// global buffer holds send and recv buffer
49+
tst_thread_global_buffer_init(2 * buffer_size);
50+
}
51+
52+
53+
// wait until buffer is initialized by master thread (thread barrier not ready here)
54+
while (tst_thread_get_global_buffer_size() != 2 * buffer_size)
55+
usleep(2000);
56+
57+
// first half of shared buffer is send and second half is receive buffer
58+
env->send_buffer = tst_thread_get_global_buffer();
59+
env->recv_buffer = (char *) tst_thread_get_global_buffer() + buffer_size;
60+
61+
env->req_buffer = tst_thread_get_global_request(0);
62+
63+
// master thread of rank 0 initializes array values
64+
if (comm_rank == TST_RANK_MASTER && thread_num == TST_THREAD_MASTER)
65+
tst_type_setstandardarray(env->type, num_worker_threads * env->values_num, env->send_buffer, comm_rank);
66+
67+
return 0;
68+
}
69+
70+
// busy wait until partition arrived
71+
// returns 1 if the partition has arrived and 0 if waiting was interupted
72+
int wait_for_partition(MPI_Request *recv_request, int partition_num)
73+
{
74+
int flag = 0;
75+
do
76+
{
77+
MPI_CHECK(MPI_Parrived(*recv_request, partition_num, &flag));
78+
} while (flag == 0 && usleep(2000) == 0);
79+
80+
return flag;
81+
}
82+
83+
84+
int tst_threaded_ring_partitioned_run(struct tst_env *env)
85+
{
86+
int comm_size;
87+
int comm_rank;
88+
int send_to;
89+
int recv_from;
90+
91+
// only allow intra comm
92+
MPI_Comm comm = tst_comm_getmastercomm(env->comm);
93+
if (tst_comm_getcommclass(env->comm) & TST_MPI_INTRA_COMM)
94+
{
95+
MPI_CHECK(MPI_Comm_rank(comm, &comm_rank));
96+
MPI_CHECK(MPI_Comm_size(comm, &comm_size));
97+
98+
send_to = (comm_rank + 1) % comm_size;
99+
recv_from = (comm_rank + comm_size - 1) % comm_size;
100+
} else if (tst_comm_getcommclass(env->comm) & TST_MPI_COMM_SELF)
101+
{
102+
MPI_CHECK(MPI_Comm_rank(comm, &comm_rank));
103+
MPI_CHECK(MPI_Comm_size(comm, &comm_size));
104+
105+
send_to = comm_rank;
106+
recv_from = comm_rank;
107+
}
108+
else
109+
ERROR(EINVAL, "tst_threaded_ring_partitioned cannot run with this kind of communicator");
110+
111+
MPI_Datatype type = tst_type_getdatatype(env->type);
112+
MPI_Aint type_extent = tst_type_gettypesize(env->type);
113+
114+
MPI_Status *statuses = env->status_buffer;
115+
116+
MPI_Request *requests = env->req_buffer;
117+
MPI_Request *send_request = &env->req_buffer[0];
118+
MPI_Request *recv_request = &env->req_buffer[1];
119+
120+
int num_threads = 1 + tst_thread_num_threads(); /* we have to add 1 for the master thread */
121+
int num_worker_threads = tst_thread_num_threads();
122+
int thread_num = tst_thread_get_num();
123+
124+
MPI_CHECK(MPI_Comm_rank(comm, &comm_rank));
125+
MPI_CHECK(MPI_Comm_size(comm, &comm_size));
126+
127+
tst_output_printf(DEBUG_LOG, TST_REPORT_MAX, "(Rank:%d, Thread:%d) comm_rank:%d comm_size:%d "
128+
"send_to:%d recv_from:%d env->tag:%d\n",
129+
tst_global_rank, thread_num, comm_rank, comm_size,
130+
send_to, recv_from, env->tag);
131+
132+
// number of partitions and values per partition
133+
int num_partitions = num_worker_threads;
134+
int partition_size = env->values_num; // number of elements
135+
136+
// init send and recv and start both
137+
if (thread_num == TST_THREAD_MASTER)
138+
{
139+
tst_output_printf(DEBUG_LOG, TST_REPORT_MAX, "(Rank:%i, Thread:%i) initializing send to %i and recv from %i with %i partitions of size %i*%i bytes, at buffers { 0x%x, 0x%x }, requests { 0x%x, 0x%x }\n",
140+
comm_rank, thread_num,
141+
send_to, recv_from, num_partitions, partition_size, type_extent,
142+
env->send_buffer, env->recv_buffer,
143+
send_request, recv_request);
144+
145+
MPI_CHECK(MPI_Psend_init(env->send_buffer, num_partitions, partition_size, type, send_to,
146+
0, comm, MPI_INFO_NULL, send_request));
147+
MPI_CHECK(MPI_Precv_init(env->recv_buffer, num_partitions, partition_size, type, recv_from,
148+
0, comm, MPI_INFO_NULL, recv_request));
149+
150+
MPI_CHECK(MPI_Start(send_request));
151+
MPI_CHECK(MPI_Start(recv_request));
152+
153+
// wait for all ranks to become ready
154+
MPI_CHECK(MPI_Barrier(MPI_COMM_WORLD));
155+
};
156+
157+
pthread_barrier_wait(&thread_barrier);
158+
159+
// for measuring time
160+
double time_init;
161+
162+
if (comm_rank == TST_RANK_MASTER)
163+
{
164+
if (thread_num == TST_THREAD_MASTER)
165+
time_init = MPI_Wtime();
166+
167+
if (thread_num >= 0 && thread_num < num_partitions){
168+
// allow sending of this partition
169+
MPI_CHECK(MPI_Pready(thread_num, *send_request));
170+
}
171+
172+
if (thread_num >= 0 && thread_num < num_partitions){
173+
wait_for_partition(recv_request, thread_num);
174+
}
175+
}
176+
else
177+
{
178+
if (thread_num >= 0 && thread_num < num_partitions)
179+
{
180+
wait_for_partition(recv_request, thread_num);
181+
182+
// simply copy data from input to output buffer
183+
int begin_index = partition_size * thread_num * type_extent;
184+
int size = partition_size * type_extent;
185+
memcpy(&env->send_buffer[begin_index], &env->recv_buffer[begin_index], partition_size * type_extent);
186+
usleep(rand() % 8192);
187+
188+
// allow sending of this partition
189+
MPI_CHECK(MPI_Pready(thread_num, *send_request));
190+
}
191+
}
192+
193+
// wait until sends and recvs are done
194+
if (thread_num == TST_THREAD_MASTER)
195+
{
196+
MPI_CHECK(MPI_Waitall(2, requests, MPI_STATUSES_IGNORE));
197+
198+
if (comm_rank == TST_RANK_MASTER)
199+
{
200+
double time_final = MPI_Wtime();
201+
202+
// print timing
203+
tst_output_printf(DEBUG_LOG, TST_REPORT_MAX, "(Rank:%d) Sending through ring took %fs\n", comm_rank, time_final - time_init);
204+
}
205+
else
206+
tst_output_printf(DEBUG_LOG, TST_REPORT_MAX, "(Rank:%d) done\n", comm_rank);
207+
}
208+
209+
210+
pthread_barrier_wait(&thread_barrier);
211+
212+
// check that data was transmitted correctly
213+
if (thread_num == TST_THREAD_MASTER)
214+
return tst_test_checkstandardarray(env, env->recv_buffer, TST_RANK_MASTER);
215+
else
216+
return 0;
217+
}
218+
219+
int tst_threaded_ring_partitioned_cleanup(struct tst_env *env)
220+
{
221+
int thread_num = tst_thread_get_num();
222+
int num_worker_threads = tst_thread_num_threads();
223+
224+
if (thread_num == TST_THREAD_MASTER)
225+
{
226+
tst_thread_free_global_requests();
227+
228+
tst_thread_global_buffer_cleanup();
229+
230+
pthread_barrier_destroy(&thread_barrier);
231+
}
232+
233+
return 0;
234+
}

tst_tests.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1375,6 +1375,15 @@ static struct tst_test tst_tests[] = {
13751375
TST_MODE_RELAXED,
13761376
TST_NONE,
13771377
&tst_threaded_comm_dup_init, &tst_threaded_comm_dup_run, &tst_threaded_comm_dup_cleanup},
1378+
1379+
1380+
{TST_CLASS_THREADED, "Threaded ring partitioned",
1381+
TST_MPI_COMM_SELF | TST_MPI_INTRA_COMM,
1382+
1,
1383+
TST_MPI_ALL_C_TYPES,
1384+
TST_MODE_RELAXED,
1385+
TST_NONE,
1386+
&tst_threaded_ring_partitioned_init, &tst_threaded_ring_partitioned_run, &tst_threaded_ring_partitioned_cleanup},
13781387
#endif
13791388

13801389
{TST_CLASS_UNSPEC, "None",

0 commit comments

Comments
 (0)