Skip to content

Commit 0a67730

Browse files
Nikita Gusev, blazej-smorawski, jbrosenz, and dependabot[bot]
authored
Intel(R) oneAPI Collective Communications Library (oneCCL) 2021.15.6 (#184)
* Intel(R) oneAPI Collective Communications Library (oneCCL) 2021.15.6

* infra: bump cmake version

* Update README.md to provide notice of deprecation of legacy C++ API.

* Bump setuptools from 75.1.0 to 78.1.1 in /doc

Bumps [setuptools](https://github.com/pypa/setuptools) from 75.1.0 to 78.1.1.
- [Release notes](https://github.com/pypa/setuptools/releases)
- [Changelog](https://github.com/pypa/setuptools/blob/main/NEWS.rst)
- [Commits](pypa/setuptools@v75.1.0...v78.1.1)

---
updated-dependencies:
- dependency-name: setuptools
  dependency-version: 78.1.1
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Błażej Smorawski <blazej.smorawski@intel.com>
Co-authored-by: Joel Rosenzweig <joel.b.rosenzweig@intel.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
1 parent 52eee8d commit 0a67730

23 files changed

+349
-111
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ endif()
335335

336336
set(CCL_MAJOR_VERSION "2021")
337337
set(CCL_MINOR_VERSION "15")
338-
set(CCL_UPDATE_VERSION "5")
338+
set(CCL_UPDATE_VERSION "6")
339339
set(CCL_PRODUCT_STATUS "Gold")
340340
string(TIMESTAMP CCL_PRODUCT_BUILD_DATE "%Y-%m-%dT %H:%M:%SZ")
341341
get_vcs_properties("git")

man/doxconfig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
PROJECT_NAME = "Intel® oneAPI Collective Communications Library"
2-
PROJECT_NUMBER = "2021.15.5"
2+
PROJECT_NUMBER = "2021.15.6"
33

44
INPUT = ../src/common/env/vars.hpp ../src/common/env/vars_experimental.hpp
55

src/atl/ofi/atl_ofi_comm.cpp

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,63 @@ atl_ofi_comm::atl_ofi_comm(int comm_size,
4545
CCL_THROW_IF_NOT(init_transport(true) == ATL_STATUS_SUCCESS, "init transport failed");
4646
}
4747

48+
// Dissemination barrier over point-to-point send/recv on endpoint ep_idx.
// In round k each rank sends a 1-byte token to (rank + 2^k) % size and
// receives one from (rank - 2^k + size) % size; after ceil(log2(size))
// rounds every rank has transitively heard from every other rank.
// Blocks until all rounds complete, then marks `req` completed so the
// caller's completion check succeeds immediately.
// NOTE(review): this polls to completion internally, so the returned
// request is already finished — presumably by design for the OFI path.
atl_status_t atl_ofi_comm::barrier(size_t ep_idx, atl_req_t& req) {
    // Hold atl_status_t values directly instead of a ssize_t that is
    // only ever compared against atl_status_t constants.
    atl_status_t ret = ATL_STATUS_SUCCESS;

    req.is_completed = false;
    atl_ofi_req_t* ofi_req = ((atl_ofi_req_t*)req.internal);

    // Single-rank communicator: nothing to synchronize with.
    if (size == 1) {
        ofi_req->comp_state = ATL_OFI_COMP_COMPLETED;
        return ATL_STATUS_SUCCESS;
    }

    // Use the reserved max id when this comm has no assigned id, so tag
    // matching still disambiguates this barrier from other traffic.
    int tag_comm_id = (comm_id != atl_comm_id_storage::invalid_comm_id)
                          ? comm_id
                          : atl_comm_id_storage::max_comm_id;
    // Per-barrier counter keeps consecutive barriers from matching each
    // other's messages.
    int tagc = tag_counter++;
    int src, dst;
    const int len = 1;
    // Zero-initialize: the payload is a pure synchronization token, but
    // sending an indeterminate byte is an uninitialized read.
    char sendbuf[len] = { 0 };
    char recvbuf[len] = { 0 };
    int mask = 0x1;
    while (mask < size) {
        dst = (rank + mask) % size;
        src = (rank - mask + size) % size;
        atl_req send_req, recv_req;
        // Tag encodes the *sender's* rank, so the recv side below must
        // build its tag from `src`, not from our own rank.
        uint64_t op_tag = tag_creator->create(rank, tag_comm_id, tagc, 1);
        // Retry on ATL_STATUS_AGAIN (transient resource exhaustion),
        // yielding between attempts; hard-fail on ATL_STATUS_FAILURE.
        do {
            ret = send(ep_idx, sendbuf, len, dst, op_tag, send_req);
            CCL_THROW_IF_NOT(ret != ATL_STATUS_FAILURE, "send failed");
            if (ret == ATL_STATUS_AGAIN) {
                ccl_yield(ccl::global_data::env().yield_type);
            }
        } while (ret == ATL_STATUS_AGAIN);
        op_tag = tag_creator->create(src, tag_comm_id, tagc, 1);
        do {
            ret = recv(ep_idx, recvbuf, len, src, op_tag, recv_req);
            CCL_THROW_IF_NOT(ret != ATL_STATUS_FAILURE, "recv failed");
            if (ret == ATL_STATUS_AGAIN) {
                ccl_yield(ccl::global_data::env().yield_type);
            }
        } while (ret == ATL_STATUS_AGAIN);
        // Drive progress until both operations of this round complete;
        // check() also surfaces any completion-time failure.
        while (!send_req.is_completed || !recv_req.is_completed) {
            poll(ep_idx);
            if (!send_req.is_completed) {
                CCL_THROW_IF_NOT(check(ep_idx, send_req) != ATL_STATUS_FAILURE,
                                 "check send failed");
            }
            if (!recv_req.is_completed) {
                CCL_THROW_IF_NOT(check(ep_idx, recv_req) != ATL_STATUS_FAILURE,
                                 "check recv failed");
            }
        }
        mask <<= 1;
    }

    ofi_req->comp_state = ATL_OFI_COMP_COMPLETED;
    return ATL_STATUS_SUCCESS;
}
104+
48105
atl_status_t atl_ofi_comm::allgatherv(size_t ep_idx,
49106
const void* send_buf,
50107
size_t send_len,

src/atl/ofi/atl_ofi_comm.hpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,7 @@ class atl_ofi_comm : public atl_base_comm {
118118
return ATL_STATUS_UNSUPPORTED;
119119
}
120120

121-
atl_status_t barrier(size_t ep_idx, atl_req_t& req) override {
122-
return ATL_STATUS_UNSUPPORTED;
123-
}
121+
atl_status_t barrier(size_t ep_idx, atl_req_t& req) override;
124122

125123
atl_status_t bcast(size_t ep_idx, void* buf, size_t len, int root, atl_req_t& req) override {
126124
return ATL_STATUS_UNSUPPORTED;

src/coll/algorithms/allgatherv/sycl/allgatherv_large_sycl.cpp

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -70,27 +70,45 @@ ccl::event allgatherv_large(const void* send_buf,
7070
dep.wait();
7171
}
7272
}
73-
std::vector<void*> ptrs{ (void*)send_buf, recv_buf }; // index 0 and 1
74-
auto [sched, exchange_entry] = do_ipc_exchange(comm, global_stream, ptrs);
7573

76-
sycl_ptrs.xelink_ptrs_rd = get_ipc_ptrs<void, MAX_GPUS>(even_comm, 0, (void*)send_buf, sched);
77-
sycl_ptrs.xelink_ptrs_wr = get_ipc_ptrs<void, MAX_GPUS>(even_comm, 1, recv_buf, sched);
78-
// use full vector (>= 8 bytes) if remote buffers and data size are 4 byte aligned
79-
use_full_vector = use_full_vector &&
80-
all_aligned(sycl_ptrs.xelink_ptrs_rd.data(), even_comm->size(), send_count * dsize, 4) &&
81-
all_aligned(sycl_ptrs.xelink_ptrs_wr.data(), even_comm->size(), send_count * dsize, 4);
74+
if (is_arc_card(ccl::ze::get_device_family(global_stream->get_ze_device()))) {
75+
// only need output buffer
76+
std::vector<void*> ptrs{ recv_buf }; // index 0
77+
auto [sched, exchange_entry] = do_ipc_exchange(comm, global_stream, ptrs);
8278

83-
if (pair_comm->size() > 1) {
84-
assert(pair_comm->size() == MAX_TILES);
85-
int peer_pair_rank = pair_comm->rank() ? 0 : 1;
86-
sycl_ptrs.mdfi_ptr_rd =
87-
get_ipc_ptrs<void, MAX_TILES>(pair_comm, 0, (void*)send_buf, sched)[peer_pair_rank];
88-
sycl_ptrs.mdfi_ptr_wr = get_ipc_ptrs<void, MAX_TILES>(pair_comm, 1, recv_buf, sched)[peer_pair_rank];
89-
use_full_vector = use_full_vector && all_aligned(&sycl_ptrs.mdfi_ptr_rd, 1, send_count * dsize, 4) &&
90-
all_aligned(&sycl_ptrs.mdfi_ptr_wr, 1, send_count * dsize, 4);
79+
std::shared_ptr<ccl_comm> node_comm = comm->get_node_comm();
80+
sycl_ptrs.node_ptrs_wr = get_ipc_ptrs<void, MAX_NODE_RANKS>(node_comm, 0, recv_buf, sched);
81+
82+
delete exchange_entry;
83+
delete sched;
84+
}
85+
else {
86+
std::vector<void*> ptrs{ (void*)send_buf, recv_buf }; // index 0 and 1
87+
auto [sched, exchange_entry] = do_ipc_exchange(comm, global_stream, ptrs);
88+
89+
sycl_ptrs.xelink_ptrs_rd = get_ipc_ptrs<void, MAX_GPUS>(even_comm, 0, (void*)send_buf, sched);
90+
sycl_ptrs.xelink_ptrs_wr = get_ipc_ptrs<void, MAX_GPUS>(even_comm, 1, recv_buf, sched);
91+
// use full vector (>= 8 bytes) if remote buffers and data size are 4 byte aligned
92+
use_full_vector =
93+
use_full_vector &&
94+
all_aligned(sycl_ptrs.xelink_ptrs_rd.data(), even_comm->size(), send_count * dsize, 4) &&
95+
all_aligned(sycl_ptrs.xelink_ptrs_wr.data(), even_comm->size(), send_count * dsize, 4);
96+
97+
if (pair_comm->size() > 1) {
98+
assert(pair_comm->size() == MAX_TILES);
99+
int peer_pair_rank = pair_comm->rank() ? 0 : 1;
100+
sycl_ptrs.mdfi_ptr_rd =
101+
get_ipc_ptrs<void, MAX_TILES>(pair_comm, 0, (void*)send_buf, sched)[peer_pair_rank];
102+
sycl_ptrs.mdfi_ptr_wr =
103+
get_ipc_ptrs<void, MAX_TILES>(pair_comm, 1, recv_buf, sched)[peer_pair_rank];
104+
use_full_vector = use_full_vector &&
105+
all_aligned(&sycl_ptrs.mdfi_ptr_rd, 1, send_count * dsize, 4) &&
106+
all_aligned(&sycl_ptrs.mdfi_ptr_wr, 1, send_count * dsize, 4);
107+
}
108+
109+
delete exchange_entry;
110+
delete sched;
91111
}
92-
delete exchange_entry;
93-
delete sched;
94112

95113
// coll_init(comm, global_stream);
96114
}

src/coll/algorithms/allgatherv/sycl/allgatherv_large_sycl_impl.hpp

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,39 @@ ccl::event allgatherv_large_impl_ipc(const void* send_buf,
214214

215215
std::vector<sycl::event> dep_events = get_sycl_events(deps);
216216

217+
if (is_arc_card(ccl::ze::get_device_family(global_stream->get_ze_device()))) {
218+
sycl::event kernel_event;
219+
220+
sycl::event barrier_event1 = invoke_barrier(node_comm, q, dep_events, is_cpu_barrier);
221+
222+
int rank = comm->rank();
223+
for (int i = 0; i < N; i++) {
224+
// scatter the ranks and limit the amount to copy
225+
int peer = (rank + i) % N;
226+
// limit amount of write due to crash in KMD (read timeout error)
227+
const size_t max_chunk = 512 * 1024 * 1024;
228+
size_t left = send_count * dsize;
229+
size_t offset = 0;
230+
while (left > 0) {
231+
size_t chunk = left > max_chunk ? max_chunk : left;
232+
kernel_event = q.submit([=](sycl::handler& h) {
233+
h.depends_on(barrier_event1);
234+
h.memcpy(((char*)sycl_ptrs.node_ptrs_wr[peer] + rank * send_count * dsize) + offset,
235+
(char*)send_buf + offset,
236+
chunk);
237+
});
238+
left -= chunk;
239+
offset += chunk;
240+
// skip the barrier for the very last iterations
241+
if (i < N - 1 || left > 0)
242+
kernel_event = invoke_barrier(node_comm, q, { kernel_event }, is_cpu_barrier);
243+
}
244+
}
245+
246+
kernel_event = invoke_barrier(node_comm, q, { kernel_event }, is_cpu_barrier);
247+
return ccl::event::create_from_native(kernel_event);
248+
}
249+
217250
std::array<void*, MAX_GPUS> local_peer_even_ptrs, local_local_ptrs, local_peer_pair_ptrs;
218251
for (int i = 0; i < even_comm->size(); i++) {
219252
// offsets for read_write kernel

src/coll/algorithms/allgatherv/sycl/allgatherv_pcie.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ ccl::event allgatherv_ll_ring(const void *send_buf,
4646
uint32_t pattern = comm->get_rt_pattern(pattern_type::collective, -1);
4747

4848
auto lambda = [&]<typename T, template <typename, int> class Proto>(int NRanks) {
49+
const size_t *offs = offsets.empty() ? NULL : offsets.data();
50+
4951
T *peerbuf0[NRanks];
5052
T *peerbuf1[NRanks];
5153
for (int i = 0; i < NRanks; i++) {
@@ -57,6 +59,7 @@ ccl::event allgatherv_ll_ring(const void *send_buf,
5759
sycl::event e = AllGather<T, Proto, RingTransmit>::launch(NRanks,
5860
(T *)send_buf,
5961
(T *)recv_buf,
62+
offs,
6063
ipcbuf0,
6164
ipcbuf1,
6265
peerbuf0,

src/coll/algorithms/allgatherv/sycl/allgatherv_pcie.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ struct AllGather : public Transmit<T, Proto, SubGroupSize> {
3535
AllGather(int nranks,
3636
T* input,
3737
T* output,
38+
const size_t* offsets,
3839
size_t nelems,
3940
int rank,
4041
uint32_t seqNo,
@@ -46,6 +47,7 @@ struct AllGather : public Transmit<T, Proto, SubGroupSize> {
4647
: Transmit<T, Proto, SubGroupSize>(nranks,
4748
input,
4849
output,
50+
offsets,
4951
scatterBuf,
5052
gatherBuf,
5153
peerBuf0,
@@ -83,6 +85,7 @@ struct AllGather : public Transmit<T, Proto, SubGroupSize> {
8385
static sycl::event launch(int nranks,
8486
T* input,
8587
T* output,
88+
const size_t* offsets,
8689
T* ipcbuf0,
8790
T* ipcbuf1,
8891
T* const peerbuf0[],
@@ -94,7 +97,8 @@ struct AllGather : public Transmit<T, Proto, SubGroupSize> {
9497
bool p2p,
9598
bool& done) {
9699
sycl::event e;
97-
AllGather offload(nranks, input, output, nelems, rank, step, ipcbuf0, ipcbuf1, peerbuf0, peerbuf1, p2p);
100+
AllGather offload(
101+
nranks, input, output, offsets, nelems, rank, step, ipcbuf0, ipcbuf1, peerbuf0, peerbuf1, p2p);
98102
if (offload.workSize == 0) {
99103
done = false;
100104
return e;

src/coll/algorithms/allgatherv/sycl/allgatherv_sycl.cpp

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,58 @@ ccl::event allgather_sycl_single_node(sycl::queue& q,
7676
done = false;
7777
return e;
7878
}
79-
LOG_DEBUG("invoking allgatherv LL256 kernel, send_count:", send_count, " datatype: ", dtype);
80-
e = allgatherv_ll_ring(
81-
send_buf, send_count, recv_buf, recv_counts, offsets, dtype, comm, global_stream, deps, done);
82-
LOG_DEBUG("invoking allgatherv LL256 kernel, count:", send_count, " datatype: ", dtype, " done");
83-
return e;
79+
if (send_count * ccl_dtype.size() < 256 * 1024 || !ccl::global_data::env().sycl_ccl_barrier ||
80+
ccl::global_data::env().sycl_allgatherv_tmp_buf) {
81+
int node_size = comm->size();
82+
const int chunk_size = ccl::global_data::env().sycl_allgatherv_chunking_threshold;
83+
size_t max_pack_count;
84+
if (chunk_size == 0 || send_count * ccl_dtype.size() <= chunk_size) {
85+
max_pack_count = send_count;
86+
}
87+
else {
88+
max_pack_count = chunk_size;
89+
int typesize = std::max(4, (int)ccl_dtype.size());
90+
max_pack_count = max_pack_count / typesize * typesize;
91+
max_pack_count = max_pack_count / ccl_dtype.size();
92+
CCL_ASSERT(max_pack_count > 0);
93+
}
94+
95+
int send_offset = 0;
96+
int nchunks = (send_count + max_pack_count - 1) / max_pack_count;
97+
for (int iter = 0; iter < nchunks; iter++) {
98+
int pack_count = (iter < nchunks - 1) ? max_pack_count : send_count - send_offset;
99+
std::vector<size_t> scaleup_counts(node_size, pack_count);
100+
std::vector<size_t> scaleup_offsets(node_size);
101+
for (int r = 0; r < node_size; r++) {
102+
scaleup_offsets[r] = (offsets.empty() ? r * send_count * ccl_dtype.size() : offsets[r]) +
103+
send_offset * ccl_dtype.size();
104+
}
105+
#ifdef CCL_ENABLE_ITT
106+
ccl::profile::itt::task_begin("allgatherv_small", "send_size", pack_count * ccl_dtype.size());
107+
#endif // CCL_ENABLE_ITT
108+
LOG_DEBUG("invoking allgatherv LL256 kernel, send_count:", pack_count, " datatype: ", dtype);
109+
e = allgatherv_ll_ring((char*)send_buf + send_offset * ccl_dtype.size(),
110+
pack_count,
111+
recv_buf,
112+
scaleup_counts,
113+
scaleup_offsets,
114+
dtype,
115+
comm,
116+
global_stream,
117+
deps,
118+
done);
119+
LOG_DEBUG("invoking allgatherv LL256 kernel, count:", pack_count, " datatype: ", dtype, " done");
120+
#ifdef CCL_ENABLE_ITT
121+
ccl::profile::itt::task_end();
122+
#endif // CCL_ENABLE_ITT
123+
send_offset += pack_count;
124+
} // for
125+
return e;
126+
}
127+
CCL_THROW_IF_NOT(ccl::global_data::env().sycl_ccl_barrier,
128+
"To run on BMG, CCL_SYCL_CCL_BARRIER must be set to 1");
129+
CCL_THROW_IF_NOT(ccl::global_data::env().sycl_allgatherv_tmp_buf == 0,
130+
"To run on BMG, CCL_SYCL_ALLGATHERV_TMP_BUF must be set to 0");
84131
}
85132

86133
if (!ccl::global_data::env().sycl_esimd) {
@@ -281,8 +328,12 @@ ccl::event allgatherv_sycl_multi_node(sycl::queue& q,
281328
{
282329
std::vector<size_t> scaleup_counts(node_size, pack_count);
283330
for (int i = 0; i < r2r_size; i++) {
284-
std::vector<size_t> scaleup_offsets(global_offsets.begin() + i * node_size,
285-
global_offsets.begin() + (i + 1) * node_size);
331+
std::vector<size_t> scaleup_offsets(node_size);
332+
for (int r = 0; r < node_size; r++) {
333+
const int global_rank = r + i * node_size;
334+
scaleup_offsets[r] = (send_count * global_rank + send_offset) * ccl_dtype.size();
335+
}
336+
286337
ev = allgather_sycl_single_node(q,
287338
(char*)(scaleout_buf) + scaleout_offsets[i],
288339
recv_scaleout_counts[i],

src/coll/algorithms/allreduce/sycl/allreduce_ring_ll256.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,9 @@ sycl::event arc_ll256_allreduce(const void *src,
175175
/* To avoid pattern not changed when "iters" is 1 */
176176
pattern_t pattern_prefix = ++pattern_counter << 16;
177177

178+
size_t persist_buf_size = ccl::global_data::env().sycl_tmp_buf_size / 3;
179+
const int GATHER_BUF_OFFSET = persist_buf_size / 2;
180+
178181
sycl_e = q.submit([&](auto &h) {
179182
//using namespace sycl::ext::intel::experimental::esimd;
180183

0 commit comments

Comments
 (0)