
Commit 726a357

[Libraries/MPI] Sample demonstrating notifications usage
1 parent d406423 commit 726a357

5 files changed: +325 −0 lines changed


Libraries/MPI/jacobian_solver/GNUmakefile

Lines changed: 3 additions & 0 deletions
@@ -2,13 +2,16 @@ all:
 	make -C src/01_jacobian_host_mpi_one-sided
 	make -C src/02_jacobian_device_mpi_one-sided_gpu_aware
 	make -C src/03_jacobian_device_mpi_one-sided_device_initiated
+	make -C src/04_jacobian_device_mpi_one-sided_device_initiated_notify
 
 debug:
 	make debug -C src/01_jacobian_host_mpi_one-sided
 	make debug -C src/02_jacobian_device_mpi_one-sided_gpu_aware
 	make debug -C src/03_jacobian_device_mpi_one-sided_device_initiated
+	make debug -C src/04_jacobian_device_mpi_one-sided_device_initiated_notify
 
 clean:
 	make clean -C src/01_jacobian_host_mpi_one-sided
 	make clean -C src/02_jacobian_device_mpi_one-sided_gpu_aware
 	make clean -C src/03_jacobian_device_mpi_one-sided_device_initiated
+	make clean -C src/04_jacobian_device_mpi_one-sided_device_initiated_notify

Libraries/MPI/jacobian_solver/README.md

Lines changed: 6 additions & 0 deletions
@@ -54,6 +54,12 @@ This program demonstrates how to initiate one-sided communications directly from
 
 To enable device-initiated communications, you must set an extra environment variable: `I_MPI_OFFLOAD_ONESIDED_DEVICE_INITIATED=1`.
 
+### `04_jacobian_device_mpi_one-sided_device_initiated_notify`
+
+This program demonstrates how to initiate one-sided communications directly from the offloaded code. The Intel® MPI Library allows calls to some communication primitives directly from the offloaded code (SYCL or OpenMP). In contrast to the prior example, this one demonstrates the use of one-sided communications with notifications (an extension of the MPI 4.1 standard).
+
+To enable device-initiated communications, you must set an extra environment variable: `I_MPI_OFFLOAD_ONESIDED_DEVICE_INITIATED=1`.
+
 ## Build the `Distributed Jacobian Solver SYCL/MPI` Sample
 
 > **Note**: If you have not already done so, set up your CLI
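In a nutshell, the pattern the new sample relies on is: attach a notification counter to an RMA window, let every `MPI_Put_notify` increment the counter on the target rank, and have the target poll that counter instead of calling a separate synchronization routine. Below is a minimal, illustrative host-only sketch of that pattern (it is not part of this commit; the two-rank setup, buffer size, and variable names are invented, while the notification calls mirror the ones used by the sample and by `mpix_compat.h` further down).

/* Illustrative sketch only (not from the commit): rank 0 puts N doubles into rank 1's
 * window and raises notification 0; rank 1 polls the counter to learn the data landed.
 * Run with at least two ranks. */
#include "mpi.h"
#ifndef MPI_ERR_INVALID_NOTIFICATION
#include "mpix_compat.h"   /* Intel MPI 2021.13/14: map to the MPIX_* entry points */
#endif
#include <cstdio>

int main(int argc, char *argv[])
{
    const int N = 8;
    double buf[N] = { 0.0 };  /* window memory on every rank */
    int rank, provided;
    MPI_Win win;

    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    MPI_Win_create(buf, sizeof(buf), sizeof(double), MPI_INFO_NULL, MPI_COMM_WORLD, &win);
    MPI_Win_notify_attach(&win, 1, MPI_INFO_NULL);  /* one notification counter, as in the sample */
    MPI_Win_lock_all(0, win);

    if (rank == 0) {
        double src[N];
        for (int i = 0; i < N; i++)
            src[i] = (double)i;
        /* Put into rank 1 at displacement 0 and raise notification index 0 there */
        MPI_Put_notify(src, N, MPI_DOUBLE, 1, 0, N, MPI_DOUBLE, 0, win);
    } else if (rank == 1) {
        MPI_Count c = 0;
        while (c < 1)  /* expect exactly one notified put */
            MPI_Win_notify_get_value(win, 0, &c);
        MPI_Win_notify_set_value(win, 0, 0);  /* reset the counter for possible reuse */
        printf("received %f .. %f\n", buf[0], buf[N - 1]);
    }

    MPI_Win_unlock_all(win);
    MPI_Win_free(&win);
    MPI_Finalize();
    return 0;
}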
Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
INCLUDES =
LDFLAGS = -lm
CFLAGS = -qopenmp -fopenmp-targets=spir64 -Wall -Wformat-security -Werror=format-security
CXXFLAGS = -fsycl -Wall -Wformat-security -Werror=format-security
# Use icx from DPC++ oneAPI toolkit to compile. Please source DPCPP's vars.sh before compilation.
CC = mpiicx
CXX = mpiicpx
example = mpi3_onesided_jacobian_gpu_sycl_device_initiated_notify

all: CFLAGS += -O2
all: CXXFLAGS += -O2
all: $(example)

debug: CFLAGS += -O0 -g
debug: CXXFLAGS += -O0 -g
debug: $(example)

% : %.c
	$(CC) $(CFLAGS) $(INCLUDES) -o $@ $< $(LDFLAGS)

% : %.cpp
	$(CXX) $(CXXFLAGS) $(INCLUDES) -o $@ $< $(LDFLAGS)

clean:
	-rm -f $(example).o $(example)

Lines changed: 261 additions & 0 deletions
@@ -0,0 +1,261 @@
/*==============================================================
 * Copyright © 2023 Intel Corporation
 *
 * SPDX-License-Identifier: MIT
 * ============================================================= */

/* Distributed Jacobian computation sample using SYCL GPU offload and MPI-3
 * one-sided communications with notifications.
 */
#include "mpi.h"

#ifndef MPI_ERR_INVALID_NOTIFICATION
/* For Intel MPI 2021.13/14 we have to use the API compatibility layer */
#include "mpix_compat.h"
#endif
#include <sycl.hpp>
#include <cmath>
#include <cstdio>
#include <vector>
#include <iostream>

const int Nx = 16384;        /* Grid size */
const int Ny = Nx;
const int Niter = 100;       /* Number of algorithm iterations */
const int NormIteration = 0; /* Recalculate the norm after this many iterations; 0 disables norm calculation */
const int PrintTime = 1;     /* Output overall time of the compute/communication part */

struct subarray {
    int rank, comm_size;    /* MPI rank and communicator size */
    int x_size, y_size;     /* Subarray size excluding border rows and columns */
    MPI_Aint l_nbh_offt;    /* Offset in the predecessor's window at which this rank's data is put */
};

#define ROW_SIZE(S) ((S).x_size + 2)
#define XY_2_IDX(X,Y,S) (((Y)+1)*ROW_SIZE(S)+((X)+1))

/* Subroutine to create and initialize the initial state of the input subarrays */
void InitDeviceArrays(double **A_dev_1, double **A_dev_2, sycl::queue q, struct subarray *sub)
{
    size_t total_size = (sub->x_size + 2) * (sub->y_size + 2);

    double *A = sycl::malloc_host<double>(total_size, q);
    *A_dev_1 = sycl::malloc_device<double>(total_size, q);
    *A_dev_2 = sycl::malloc_device<double>(total_size, q);

    for (int i = 0; i < (sub->y_size + 2); i++)
        for (int j = 0; j < (sub->x_size + 2); j++)
            A[i * (sub->x_size + 2) + j] = 0.0;

    if (sub->rank == 0)  /* set top boundary */
        for (int i = 1; i <= sub->x_size; i++)
            A[i] = 1.0;
    if (sub->rank == (sub->comm_size - 1))  /* set bottom boundary */
        for (int i = 1; i <= sub->x_size; i++)
            A[(sub->x_size + 2) * (sub->y_size + 1) + i] = 10.0;

    for (int i = 1; i <= sub->y_size; i++) {
        int row_offt = i * (sub->x_size + 2);
        A[row_offt] = 1.0;                   /* set left boundary */
        A[row_offt + sub->x_size + 1] = 1.0; /* set right boundary */
    }

    /* Move input arrays to the device */
    q.memcpy(*A_dev_1, A, sizeof(double) * total_size);
    q.memcpy(*A_dev_2, A, sizeof(double) * total_size);
    q.wait();
    sycl::free(A, q);
    A = NULL;
}

/* Set up the subarray size and layout processed by the current rank */
void GetMySubarray(struct subarray *sub)
{
    MPI_Comm_size(MPI_COMM_WORLD, &sub->comm_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &sub->rank);
    sub->y_size = Ny / sub->comm_size;
    sub->x_size = Nx;
    sub->l_nbh_offt = (sub->x_size + 2) * (sub->y_size + 1) + 1;

    /* Distribute the remainder rows of Ny among the first ranks */
    int tail = Ny % sub->comm_size;
    if (tail != 0) {
        if (sub->rank < tail)
            sub->y_size++;
        if ((sub->rank > 0) && ((sub->rank - 1) < tail))
            sub->l_nbh_offt += (sub->x_size + 2);
    }
}

int main(int argc, char *argv[])
{
    double t_start;
    struct subarray my_subarray = { };
    double *A_device[2] = { };
    MPI_Win win[2] = { MPI_WIN_NULL, MPI_WIN_NULL };
    int passed_iters = 0;
    double norm = 0.0;
    int provided;

    /* Initialization of the runtime and of the initial state of the data */
    sycl::queue q(sycl::gpu_selector_v);
    /* MPI_THREAD_MULTIPLE is required for device-initiated communications */
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    GetMySubarray(&my_subarray);
    InitDeviceArrays(&A_device[0], &A_device[1], q, &my_subarray);

#ifdef GROUP_SIZE_DEFAULT
    int work_group_size = GROUP_SIZE_DEFAULT;
#else
    int work_group_size =
        q.get_device().get_info<sycl::info::device::max_work_group_size>();
#endif

    if ((Nx % work_group_size) != 0) {
        if (my_subarray.rank == 0) {
            printf("For simplicity, sycl::info::device::max_work_group_size must evenly divide the X dimension of the array\n");
            printf("Please adjust the matrix size, or define GROUP_SIZE_DEFAULT\n");
            printf("sycl::info::device::max_work_group_size=%d Nx=%d (Nx %% work_group_size = %d)\n",
                   work_group_size, Nx, Nx % work_group_size);
        }
        MPI_Abort(MPI_COMM_WORLD, -1);
    }
    /* Create RMA windows using device memory */
    MPI_Win_create(A_device[0],
                   sizeof(double) * (my_subarray.x_size + 2) * (my_subarray.y_size + 2),
                   sizeof(double), MPI_INFO_NULL, MPI_COMM_WORLD, &win[0]);
    MPI_Win_create(A_device[1],
                   sizeof(double) * (my_subarray.x_size + 2) * (my_subarray.y_size + 2),
                   sizeof(double), MPI_INFO_NULL, MPI_COMM_WORLD, &win[1]);
    /* Attach one notification counter to each window */
    MPI_Win_notify_attach(&win[0], 1, MPI_INFO_NULL);
    MPI_Win_notify_attach(&win[1], 1, MPI_INFO_NULL);
    /* Start the RMA exposure epoch */
    MPI_Win_lock_all(0, win[0]);
    MPI_Win_lock_all(0, win[1]);

    if (PrintTime) {
        t_start = MPI_Wtime();
    }

    int iterations_batch = (NormIteration <= 0) ? Niter : NormIteration;
    for (passed_iters = 0; passed_iters < Niter; passed_iters += iterations_batch) {

        /* Submit a compute kernel to calculate the next "iterations_batch" steps */
        q.submit([&](auto &h) {
            h.parallel_for(sycl::nd_range<1>(work_group_size, work_group_size),
                           [=](sycl::nd_item<1> item) {
                int local_id = item.get_local_id();
                int col_per_wg = my_subarray.x_size / work_group_size;

                int my_x_lb = col_per_wg * local_id;
                int my_x_ub = my_x_lb + col_per_wg;

                for (int k = 0; k < iterations_batch; ++k)
                {
                    int i = passed_iters + k;
                    MPI_Win cwin = win[(i + 1) % 2];
                    MPI_Count c_expected = 0;
                    double *a = A_device[i % 2];
                    double *a_out = A_device[(i + 1) % 2];

                    /* Calculate values on the borders to initiate communications early */
                    for (int column = my_x_lb; column < my_x_ub; column++) {
                        int idx = XY_2_IDX(column, 0, my_subarray);
                        a_out[idx] = 0.25 * (a[idx - 1] + a[idx + 1]
                                             + a[idx - ROW_SIZE(my_subarray)]
                                             + a[idx + ROW_SIZE(my_subarray)]);
                        idx = XY_2_IDX(column, my_subarray.y_size - 1, my_subarray);
                        a_out[idx] = 0.25 * (a[idx - 1] + a[idx + 1]
                                             + a[idx - ROW_SIZE(my_subarray)]
                                             + a[idx + ROW_SIZE(my_subarray)]);
                    }

                    /* Perform the 1D halo-exchange with the neighbours */
                    if (my_subarray.rank != 0) {
                        int idx = XY_2_IDX(my_x_lb, 0, my_subarray);
                        MPI_Put_notify(&a_out[idx], col_per_wg, MPI_DOUBLE,
                                       my_subarray.rank - 1, my_subarray.l_nbh_offt + my_x_lb,
                                       col_per_wg, MPI_DOUBLE, 0, cwin);
                        c_expected += work_group_size;
                    }

                    if (my_subarray.rank != (my_subarray.comm_size - 1)) {
                        int idx = XY_2_IDX(my_x_lb, my_subarray.y_size - 1, my_subarray);
                        MPI_Put_notify(&a_out[idx], col_per_wg, MPI_DOUBLE,
                                       my_subarray.rank + 1, 1 + my_x_lb,
                                       col_per_wg, MPI_DOUBLE, 0, cwin);
                        c_expected += work_group_size;
                    }

                    /* Recalculate internal points in parallel with the communications */
                    for (int row = 1; row < my_subarray.y_size - 1; ++row) {
                        for (int column = my_x_lb; column < my_x_ub; column++) {
                            int idx = XY_2_IDX(column, row, my_subarray);
                            a_out[idx] = 0.25 * (a[idx - 1] + a[idx + 1]
                                                 + a[idx - ROW_SIZE(my_subarray)]
                                                 + a[idx + ROW_SIZE(my_subarray)]);
                        }
                    }

                    item.barrier(sycl::access::fence_space::global_space);
                    if (local_id == 0) {
                        /* Wait until all expected notified puts have arrived in this window,
                         * then reset the counter so it can be reused in a later iteration */
                        MPI_Count c = 0;
                        while (c < c_expected)
                            MPI_Win_notify_get_value(cwin, 0, &c);
                        MPI_Win_notify_set_value(cwin, 0, 0);
                    }
                    item.barrier(sycl::access::fence_space::global_space);
                }
            });
        }).wait();

        /* Calculate and report the norm value after the given number of iterations */
        int i = passed_iters + iterations_batch - 1;   /* index of the last iteration computed above */
        if ((NormIteration > 0) && ((NormIteration - 1) == i % NormIteration)) {
            double *a = A_device[i % 2];
            double *a_out = A_device[(i + 1) % 2];
            double rank_norm = 0.0;

            {
                sycl::buffer<double> norm_buf(&rank_norm, 1);
                q.submit([&](auto &h) {
                    auto sumr = sycl::reduction(norm_buf, h, sycl::plus<>());
                    h.parallel_for(sycl::range(my_subarray.x_size, my_subarray.y_size), sumr,
                                   [=](auto index, auto &v) {
                        int idx = XY_2_IDX(index[0], index[1], my_subarray);
                        double diff = a_out[idx] - a[idx];
                        v += (diff * diff);
                    });
                }).wait();
            }

            /* Get the global norm value */
            MPI_Reduce(&rank_norm, &norm, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
            if (my_subarray.rank == 0) {
                printf("NORM value on iteration %d: %f\n", i + 1, sqrt(norm));
            }
        }
    }

    if (PrintTime) {
        double avg_time;
        double rank_time = MPI_Wtime() - t_start;

        MPI_Reduce(&rank_time, &avg_time, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

        if (my_subarray.rank == 0) {
            avg_time = avg_time / my_subarray.comm_size;
            printf("Average solver time: %f(sec)\n", avg_time);
        }
    }

    if (my_subarray.rank == 0) {
        printf("[%d] SUCCESS\n", my_subarray.rank);
    }

    MPI_Win_unlock_all(win[1]);
    MPI_Win_unlock_all(win[0]);

    MPI_Win_free(&win[1]);
    MPI_Win_free(&win[0]);
    MPI_Finalize();

    sycl::free(A_device[0], q);
    sycl::free(A_device[1], q);

    return 0;
}
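One detail of the kernel above that is easy to misread is the `c_expected` arithmetic. The notification counter on `cwin` counts arriving MPI_Put_notify operations, and every work item of a neighboring rank issues exactly one put toward this rank per iteration; since a top neighbor exists exactly when `rank != 0` and a bottom neighbor exactly when `rank != comm_size - 1`, the kernel ends up expecting `work_group_size` notifications per existing neighbor. A hypothetical helper (not part of the commit) that states the same arithmetic explicitly, assuming every rank launches the same work-group size:

/* Hypothetical helper, for illustration only: the number of notified puts a rank
 * should observe on its window in one Jacobi iteration, assuming all ranks use the
 * same work_group_size and each work item sends one MPI_Put_notify per existing
 * neighbor, as in the kernel above. */
static int ExpectedNotifications(int rank, int comm_size, int work_group_size)
{
    int expected = 0;
    if (rank != 0)              /* a rank above exists and puts into my top halo row */
        expected += work_group_size;
    if (rank != comm_size - 1)  /* a rank below exists and puts into my bottom halo row */
        expected += work_group_size;
    return expected;
}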

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
/*==============================================================
 * Copyright © 2023 Intel Corporation
 *
 * SPDX-License-Identifier: MIT
 * ============================================================= */
#ifndef MPIX_COMPAT_H
#define MPIX_COMPAT_H

#define MPI_ERR_INVALID_NOTIFICATION MPI_ERR_OTHER

/* int MPI_Win_notify_attach(MPI_Win win, int notification_num, MPI_Info info); */
#define MPI_Win_notify_attach(win, notification_num, info) \
        MPIX_Win_create_notify(win, notification_num)

/* int MPI_Win_notify_detach(MPI_Win win); */
#define MPI_Win_notify_detach(win) \
        MPIX_Win_free_notify(win)

/* int MPI_Win_notify_get_value(MPI_Win win, int notification_idx, MPI_Count *value) */
#define MPI_Win_notify_get_value(win, notification_idx, value) \
        MPIX_Win_get_notify(win, notification_idx, value)

/* int MPI_Win_notify_set_value(MPI_Win win, int notification_idx, MPI_Count value) */
#define MPI_Win_notify_set_value(win, notification_idx, value) \
        MPIX_Win_set_notify(win, notification_idx, value)

#define MPI_Put_notify MPIX_Put_notify
#define MPI_Get_notify MPIX_Get_notify

#endif /* MPIX_COMPAT_H */
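With the shim above, the sample's notification calls compile unchanged on Intel MPI 2021.13/14, where only the MPIX_-prefixed entry points are available. A small illustrative wrapper (hypothetical, not part of the commit) showing the include pattern and the polling idiom; under the shim the two calls expand to MPIX_Win_get_notify and MPIX_Win_set_notify:

/* Illustration only: the same source builds against either spelling of the API. */
#include "mpi.h"
#ifndef MPI_ERR_INVALID_NOTIFICATION  /* native notified-RMA API not present */
#include "mpix_compat.h"              /* fall back to the MPIX_* names */
#endif

/* Busy-wait until notification counter `idx` on `win` reaches `expected`, then reset it. */
static void WaitNotifications(MPI_Win win, int idx, MPI_Count expected)
{
    MPI_Count c = 0;
    while (c < expected)
        MPI_Win_notify_get_value(win, idx, &c);
    MPI_Win_notify_set_value(win, idx, 0);
}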
