Skip to content

Commit a7a5618

Browse files
committed
init_shm: add MPIDU_Init_shm_comm_alloc
Add a routine to support allocating shared memory by a comm, which allows: (1) creating shared memory by a comm smaller than comm_world; (2) attaching to the shared memory by later processes; and (3) potentially allowing shm communication with dynamic processes - for that, we need a way to discover and attach to Init_shm (via an intercomm), and the initial shared memory needs to be pre-allocated to account for new processes. For now, we need this to support MPIDI_POSIX_comm_set_vcis.
1 parent 1e90bb0 commit a7a5618

File tree

3 files changed

+170
-14
lines changed

3 files changed

+170
-14
lines changed

src/mpid/common/shm/mpidu_init_shm.c

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ int MPIDU_Init_shm_query(int local_rank, void **target_addr)
5555
/* Control header placed at the start of the init shared-memory segment; the per-rank blocks
 * of MPIDU_INIT_SHM_BLOCK_SIZE bytes begin at offset sizeof(Init_shm_barrier_t).
 * The field order is part of the layout shared between processes. */
typedef struct Init_shm_barrier {
5656
MPL_atomic_int_t val;   /* barrier state; presumably consumed by Init_shm_barrier() - TODO confirm */
5757
MPL_atomic_int_t wait;  /* barrier state; presumably consumed by Init_shm_barrier() - TODO confirm */
58+
/* fields that support async shm alloc */
59+
MPL_atomic_int_t lock;  /* spin lock (0 = free, 1 = held) guarding serialized_hnd */
60+
MPL_atomic_int_t alloc_count;   /* set to 1 once a handle has been stored in serialized_hnd */
61+
char serialized_hnd[MPIDU_INIT_SHM_BLOCK_SIZE]; /* bytes written by MPIDU_Init_shm_atomic_put */
5862
} Init_shm_barrier_t;
5963

6064
static size_t init_shm_len;
@@ -123,17 +127,15 @@ int MPIDU_Init_shm_init(void)
123127
MPIDU_Init_shm_local_rank = MPIR_Process.local_rank;
124128

125129
if (MPIDU_Init_shm_local_size == 1) {
126-
/* We'll special case this trivial case */
127-
128130
/* All processes need call MPIR_pmi_bcast. This is because we may need call MPIR_pmi_barrier
129131
* inside depend on PMI versions, and all processes need participate.
130132
*/
131133
int dummy;
132134
mpi_errno = MPIR_pmi_bcast(&dummy, sizeof(int), MPIR_PMI_DOMAIN_LOCAL);
133135
MPIR_ERR_CHECK(mpi_errno);
134136
} else {
135-
size_t segment_len = MPIDU_SHM_CACHE_LINE_LEN +
136-
sizeof(MPIDU_Init_shm_block_t) * MPIDU_Init_shm_local_size;
137+
size_t segment_len = sizeof(Init_shm_barrier_t) +
138+
MPIDU_INIT_SHM_BLOCK_SIZE * MPIDU_Init_shm_local_size;
137139

138140
char *serialized_hnd = NULL;
139141
int serialized_hnd_size = 0;
@@ -190,7 +192,7 @@ int MPIDU_Init_shm_init(void)
190192
MPIR_ERR_CHKANDJUMP(mpl_err, mpi_errno, MPI_ERR_OTHER, "**remove_shar_mem");
191193
}
192194

193-
baseaddr = init_shm_addr + MPIDU_SHM_CACHE_LINE_LEN;
195+
baseaddr = init_shm_addr + sizeof(Init_shm_barrier_t);
194196

195197
mpi_errno = Init_shm_barrier();
196198
}
@@ -251,8 +253,8 @@ int MPIDU_Init_shm_put(void *orig, size_t len)
251253
MPIR_FUNC_ENTER;
252254

253255
if (MPIDU_Init_shm_local_size > 1) {
254-
MPIR_Assert(len <= sizeof(MPIDU_Init_shm_block_t));
255-
MPIR_Memcpy((char *) baseaddr + MPIDU_Init_shm_local_rank * sizeof(MPIDU_Init_shm_block_t),
256+
MPIR_Assert(len <= MPIDU_INIT_SHM_BLOCK_SIZE);
257+
MPIR_Memcpy((char *) baseaddr + MPIDU_Init_shm_local_rank * MPIDU_INIT_SHM_BLOCK_SIZE,
256258
orig, len);
257259
}
258260

@@ -270,8 +272,8 @@ int MPIDU_Init_shm_get(int local_rank, size_t len, void *target)
270272
/* a single process should not get its own put */
271273
MPIR_Assert(MPIDU_Init_shm_local_size > 1);
272274

273-
MPIR_Assert(local_rank < MPIDU_Init_shm_local_size && len <= sizeof(MPIDU_Init_shm_block_t));
274-
MPIR_Memcpy(target, (char *) baseaddr + local_rank * sizeof(MPIDU_Init_shm_block_t), len);
275+
MPIR_Assert(local_rank < MPIDU_Init_shm_local_size && len <= MPIDU_INIT_SHM_BLOCK_SIZE);
276+
MPIR_Memcpy(target, (char *) baseaddr + local_rank * MPIDU_INIT_SHM_BLOCK_SIZE, len);
275277

276278
MPIR_FUNC_EXIT;
277279

@@ -288,11 +290,42 @@ int MPIDU_Init_shm_query(int local_rank, void **target_addr)
288290
MPIR_Assert(MPIDU_Init_shm_local_size > 1);
289291

290292
MPIR_Assert(local_rank < MPIDU_Init_shm_local_size);
291-
*target_addr = (char *) baseaddr + local_rank * sizeof(MPIDU_Init_shm_block_t);
293+
*target_addr = (char *) baseaddr + local_rank * MPIDU_INIT_SHM_BLOCK_SIZE;
292294

293295
MPIR_FUNC_EXIT;
294296

295297
return mpi_errno;
296298
}
297299

300+
bool MPIDU_Init_shm_atomic_key_exist(void)
301+
{
302+
return (MPL_atomic_load_int(&barrier->alloc_count) > 0);
303+
}
304+
305+
int MPIDU_Init_shm_atomic_put(void *orig, size_t len)
306+
{
307+
/* get spin lock */
308+
while (MPL_atomic_cas_int(&barrier->lock, 0, 1)) {
309+
}
310+
/* set the data */
311+
MPIR_Assert(len <= MPIDU_INIT_SHM_BLOCK_SIZE);
312+
MPIR_Memcpy(barrier->serialized_hnd, orig, len);
313+
MPL_atomic_store_int(&barrier->alloc_count, 1);
314+
/* unlock */
315+
MPL_atomic_store_int(&barrier->lock, 0);
316+
return MPI_SUCCESS;
317+
}
318+
319+
int MPIDU_Init_shm_atomic_get(void *target, size_t len)
320+
{
321+
/* get spin lock */
322+
while (MPL_atomic_cas_int(&barrier->lock, 0, 1)) {
323+
}
324+
/* copy the data */
325+
MPIR_Memcpy(target, barrier->serialized_hnd, len);
326+
/* unlock */
327+
MPL_atomic_store_int(&barrier->lock, 0);
328+
return MPI_SUCCESS;
329+
}
330+
298331
#endif /* ENABLE_NO_LOCAL */

src/mpid/common/shm/mpidu_init_shm.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@
1717
shared memory will not work properly. Consider disable it with --enable-nolocal.
1818
#endif
1919

20-
typedef struct MPIDU_Init_shm_block {
21-
char block[MPIDU_INIT_SHM_BLOCK_SIZE];
22-
} MPIDU_Init_shm_block_t;
23-
2420
int MPIDU_Init_shm_init(void);
2521
int MPIDU_Init_shm_finalize(void);
2622
int MPIDU_Init_shm_barrier(void);
@@ -32,4 +28,13 @@ int MPIDU_Init_shm_alloc(size_t len, void **ptr);
3228
int MPIDU_Init_shm_free(void *ptr);
3329
int MPIDU_Init_shm_is_symm(void *ptr);
3430

31+
/* support routines for MPIDU_Init_shm_comm_alloc */
32+
bool MPIDU_Init_shm_atomic_key_exist(void);
33+
int MPIDU_Init_shm_atomic_put(void *orig, size_t len);
34+
int MPIDU_Init_shm_atomic_get(void *target, size_t len);
35+
36+
/* The comm root allocates shared memory and publishes the serialized handle if none was published before;
37+
* otherwise it retrieves the existing handle and attaches. The handle is then broadcast to the rest of the comm. */
38+
int MPIDU_Init_shm_comm_alloc(MPIR_Comm * comm, size_t len, void **ptr);
39+
3540
#endif /* MPIDU_INIT_SHM_H_INCLUDED */

src/mpid/common/shm/mpidu_init_shm_alloc.c

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,124 @@ int MPIDU_Init_shm_alloc(size_t len, void **ptr)
138138
/* --END ERROR HANDLING-- */
139139
}
140140

141+
/* Allocate, or attach to, a shared memory segment on behalf of `comm`.
 *
 * With a single local process the memory comes from MPL_aligned_alloc() and no shared
 * segment is involved. Otherwise the root (rank 0 of comm->node_comm, or the caller itself
 * when comm has no node_comm) either
 *   - creates the segment and stores its serialized handle with MPIDU_Init_shm_atomic_put()
 *     when MPIDU_Init_shm_atomic_key_exist() reports no key, or
 *   - fetches the stored handle with MPIDU_Init_shm_atomic_get().
 * The handle is then broadcast over node_comm and every process that did not create the
 * segment attaches to it. After a barrier on node_comm the creator removes the segment name.
 *
 * comm - communicator whose node_comm (if any) is used for the broadcast and barrier
 * len  - segment length in bytes; asserted to be > 0 on the multi-process path
 * ptr  - receives the base address of the memory on success
 *
 * Returns MPI_SUCCESS or an MPI error code.
 *
 * NOTE(review): the single-process path returns without adding a node to the memory list;
 * confirm that MPIDU_Init_shm_free() copes with such a pointer.
 * NOTE(review): the key_exist() test and the later put() are separate steps, so two roots
 * could both decide to create - confirm callers serialize this.
 */
int MPIDU_Init_shm_comm_alloc(MPIR_Comm * comm, size_t len, void **ptr)
{
    int mpi_errno = MPI_SUCCESS, mpl_err = 0;
    struct memory_seg *memory = NULL;
    memory_list_t *memory_node = NULL;
    MPIR_Comm *node_comm = NULL;
    char *serialized_hnd = NULL;
    size_t hnd_len = 0;
    char hnd_buf[MPIDU_INIT_SHM_BLOCK_SIZE];
    /* All state read by the cleanup label is declared and initialized up front, because
     * error jumps can happen before any of the later steps run. */
    bool is_root = true;
    bool hnd_ready = false;
    bool need_attach = false;
    bool need_remove = false;
    MPIR_CHKPMEM_DECL();

    MPIR_FUNC_ENTER;

    if (MPIDU_Init_shm_local_size == 1) {
        *ptr = MPL_aligned_alloc(MPL_CACHELINE_SIZE, len, MPL_MEM_SHM);
        MPIR_ERR_CHKANDJUMP(!*ptr, mpi_errno, MPI_ERR_OTHER, "**nomem");
        goto fn_exit;
    }

    node_comm = comm->node_comm;
    if (node_comm) {
        is_root = (node_comm->rank == 0);
    }

    MPIR_Assert(len > 0);
    MPIR_CHKPMEM_MALLOC(memory, sizeof(*memory), MPL_MEM_OTHER);
    mpl_err = MPL_shm_hnd_init(&(memory->hnd));
    MPIR_ERR_CHKANDJUMP(mpl_err, mpi_errno, MPI_ERR_OTHER, "**alloc_shar_mem");
    hnd_ready = true;

    memory->segment_len = len;

    if (is_root) {
        if (!MPIDU_Init_shm_atomic_key_exist()) {
            /* We need to create the shm segment */
            mpl_err = MPL_shm_seg_create_and_attach(memory->hnd, memory->segment_len,
                                                    (void **) &(memory->base_addr), 0);
            MPIR_ERR_CHKANDJUMP(mpl_err, mpi_errno, MPI_ERR_OTHER, "**alloc_shar_mem");

            mpl_err = MPL_shm_hnd_get_serialized_by_ref(memory->hnd, &serialized_hnd);
            MPIR_ERR_CHKANDJUMP(mpl_err, mpi_errno, MPI_ERR_OTHER, "**alloc_shar_mem");
            hnd_len = strlen(serialized_hnd) + 1;       /* add 1 for null char */
            MPIR_ERR_CHKANDJUMP(hnd_len > sizeof(hnd_buf), mpi_errno, MPI_ERR_OTHER,
                                "**alloc_shar_mem");

            /* Stage the handle in a full-size, zero-padded buffer. The broadcast below
             * always sends MPIDU_INIT_SHM_BLOCK_SIZE bytes, and the by-reference string is
             * not known to be backed by that much storage. */
            memset(hnd_buf, 0, sizeof(hnd_buf));
            MPIR_Memcpy(hnd_buf, serialized_hnd, hnd_len);

            mpi_errno = MPIDU_Init_shm_atomic_put(hnd_buf, hnd_len);
            MPIR_ERR_CHECK(mpi_errno);
            need_remove = true;
        } else {
            /* Just retrieve the existing serialized handle */
            mpi_errno = MPIDU_Init_shm_atomic_get(hnd_buf, MPIDU_INIT_SHM_BLOCK_SIZE);
            MPIR_ERR_CHECK(mpi_errno);
            need_attach = true;
        }
    } else {
        /* non-root processes always receive the handle and attach */
        need_attach = true;
    }

    if (node_comm) {
        mpi_errno = MPIR_Bcast_impl(hnd_buf, MPIDU_INIT_SHM_BLOCK_SIZE,
                                    MPIR_BYTE_INTERNAL, 0, node_comm, MPIR_ERR_NONE);
        MPIR_ERR_CHECK(mpi_errno);
    }

    if (need_attach) {
        /* A valid handle carries its terminator inside the block; forcing the last byte
         * keeps strlen() bounded even if the stored bytes were not a proper string. */
        hnd_buf[sizeof(hnd_buf) - 1] = '\0';

        mpl_err = MPL_shm_hnd_deserialize(memory->hnd, hnd_buf, strlen(hnd_buf));
        MPIR_ERR_CHKANDJUMP(mpl_err, mpi_errno, MPI_ERR_OTHER, "**alloc_shar_mem");

        mpl_err = MPL_shm_seg_attach(memory->hnd, memory->segment_len,
                                     (void **) &memory->base_addr, 0);
        MPIR_ERR_CHKANDJUMP(mpl_err, mpi_errno, MPI_ERR_OTHER, "**attach_shar_mem");
    }

    /* Everyone in node_comm has attached before the creator removes the segment. */
    if (node_comm) {
        mpi_errno = MPIR_Barrier_impl(node_comm, MPIR_ERR_NONE);
        MPIR_ERR_CHECK(mpi_errno);
    }
    if (need_remove) {
        /* memory->hnd no longer needed */
        /* NOTE(review): a root of a later comm that fetches the stored handle may not be
         * able to attach after this removal - confirm MPL_shm_seg_remove semantics. */
        mpl_err = MPL_shm_seg_remove(memory->hnd);
        MPIR_ERR_CHKANDJUMP(mpl_err, mpi_errno, MPI_ERR_OTHER, "**remove_shar_mem");
    }

    memory->symmetrical = false;
    memory->is_shm = true;

    mpi_errno = check_alloc(memory);
    MPIR_ERR_CHECK(mpi_errno);

    /* hand the base of the shared memory segment back to the caller */
    *ptr = memory->base_addr;

    MPIR_CHKPMEM_MALLOC(memory_node, sizeof(*memory_node), MPL_MEM_OTHER);
    memory_node->ptr = *ptr;
    memory_node->memory = memory;
    LL_APPEND(memory_head, memory_tail, memory_node);

  fn_exit:
    MPIR_FUNC_EXIT;
    return mpi_errno;
  fn_fail:
    /* Only touch the handle if it was initialized: `memory` is still NULL when the
     * single-process allocation or the memory_seg allocation itself failed. */
    if (hnd_ready) {
        MPL_shm_seg_remove(memory->hnd);
        MPL_shm_hnd_finalize(&(memory->hnd));
    }
    MPIR_CHKPMEM_REAP();
    goto fn_exit;
}
258+
141259
/* MPIDU_SHM_Seg_free() free the shared memory segment */
142260
int MPIDU_Init_shm_free(void *ptr)
143261
{

0 commit comments

Comments
 (0)