Skip to content
115 changes: 74 additions & 41 deletions algorithms/sycl/Reduction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,61 +29,94 @@ namespace {
return T(0);
}

template<ReductionType Type, typename AtomicRef, typename AccT>
void atomicUpdate(AtomicRef& atomic, AccT value){
  // Folds `value` into `atomic` according to the reduction kind.
  // Relaxed ordering is sufficient everywhere: the reduction needs
  // atomicity only, not inter-thread ordering guarantees.
  constexpr auto MO = sycl::memory_order::relaxed;

  if constexpr(Type == ReductionType::Add) {
    // Explicitly pass MO to fetch_add
    atomic.fetch_add(value, MO);
  }
  else if constexpr(Type == ReductionType::Max) {
    // sm 60 does not have a fetch max instruction.
    // Using our own CAS loop instead of fetch_max.
    // Seeding `expected` with the identity element means the first
    // compare_exchange either succeeds outright or refreshes `expected`
    // with the currently stored value (compare_exchange_weak updates
    // `expected` on failure, and may also fail spuriously — the loop
    // condition re-checks and retries in both cases).
    AccT expected = neutral<Type, AccT>();
    while(expected < value &&
          !atomic.compare_exchange_weak(expected, value, MO, MO)) {
      // retry until the stored value is already >= `value`
    }
    // atomic.fetch_max(value);
  }
  else if constexpr(Type == ReductionType::Min) {
    // sm 60 does not have a fetch min instruction.
    // Same CAS-loop emulation as the Max branch, with the comparison
    // inverted: stop once the stored value is already <= `value`.
    AccT expected = neutral<Type, AccT>();
    while(expected > value &&
          !atomic.compare_exchange_weak(expected, value, MO, MO)) {
      // retry on spurious or contended failure
    }
    // atomic.fetch_min(value);
  }
}

template <ReductionType Type, typename AccT, typename VecT, typename OpT> void launchReduction(AccT* result, const VecT *buffer, size_t size, OpT operation, bool overrideResult, void* streamPtr) {

constexpr auto DefaultValue = neutral<Type, AccT>();
constexpr size_t workGroupSize = 256;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 256 and not 1024? (or are you at the bandwidth already like that?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and for higher numbers, there is no real improvement on PVC at least.

constexpr size_t itemsPerWorkItem = 8;

if(overrideResult){
((sycl::queue *) streamPtr)->submit([&](sycl::handler &cgh) {
sycl::local_accessor<AccT, 1> shmem(256, cgh);
cgh.parallel_for(sycl::nd_range<1> { 1024, 1024 },
[=](sycl::nd_item<1> idx) {
const auto subgroup = idx.get_sub_group();
const auto sgSize = subgroup.get_local_range().size();
cgh.single_task([=](){
// Initialize the global result to identity value.
*result = DefaultValue;
});
});
}

const auto warpCount = subgroup.get_group_range().size();
const int currentWarp = subgroup.get_group_id();
const int threadInWarp = subgroup.get_local_id();
const auto warpsNeeded = (size + sgSize - 1) / sgSize;
((sycl::queue *) streamPtr)->submit([&](sycl::handler &cgh) {

auto acc = DefaultValue;

#pragma unroll 4
for (std::size_t i = currentWarp; i < warpsNeeded; i += warpCount) {
const auto id = threadInWarp + i * sgSize;
auto value = (id < size) ? static_cast<AccT>(ntload(&buffer[id])) : DefaultValue;
size_t numWorkGroups = (size + (workGroupSize * itemsPerWorkItem) - 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const

/ (workGroupSize * itemsPerWorkItem);

value = sycl::reduce_over_group(subgroup, value, operation);
cgh.parallel_for(sycl::nd_range<1> { numWorkGroups*itemsPerWorkItem, workGroupSize },
[=](sycl::nd_item<1> idx) {

acc = operation(acc, value);
}
const auto localId = idx.get_local_id(0);
const auto groupId = idx.get_group(0);

if (threadInWarp == 0) {
shmem[currentWarp] = acc;
//Thread-local reduction
AccT threadAcc = DefaultValue;
size_t baseIdx = groupId*(workGroupSize*itemsPerWorkItem) + localId;

#pragma unroll
for (std::size_t i = 0; i < itemsPerWorkItem*workGroupSize; i += workGroupSize) {
const auto id = baseIdx + i;
if(id < size){
threadAcc = operation(threadAcc, static_cast<AccT>(ntload(&buffer[id])));
}
}

idx.barrier();

if (currentWarp == 0) {
const auto lastWarpsNeeded = (warpCount + sgSize - 1) / sgSize;
auto lastAcc = DefaultValue;
#pragma unroll 2
for (int i = 0; i < lastWarpsNeeded; ++i) {
const auto id = threadInWarp + i * sgSize;
auto value = (id < warpCount) ? shmem[id] : DefaultValue;
idx.barrier(sycl::access::fence_space::local_space);

value = sycl::reduce_over_group(subgroup, value, operation);
auto reducedValue = sycl::reduce_over_group(idx.get_group(), threadAcc, operation);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const


lastAcc = operation(lastAcc, value);
}

if (threadInWarp == 0) {
if (overrideResult) {
ntstore(result, lastAcc);
}
else {
ntstore(result, operation(ntload(result), lastAcc));
}
}
if(localId == 0){
sycl::atomic_ref<AccT, sycl::memory_order::relaxed,
sycl::memory_scope::device,
sycl::access::address_space::global_space> atomicRes(*result);
atomicUpdate<Type>(atomicRes, reducedValue);
}
});
});
Expand Down
Loading