#include <math.h>

1212namespace device {
13+
14+ constexpr int WorkGroupSize = 256 ;
15+ constexpr int ItemsPerWorkItem = 16 ;
1316template <typename T>
1417struct Sum {
1518 T defaultValue{0 };
@@ -44,70 +47,106 @@ __forceinline__ __device__ T shuffledown(T value, int offset) {
4447}
4548#endif
4649
47- // a rather "dumb", but general reduction kernel
48- // (not intended for intensive use; there's the thrust libraries instead)
50+ // Warp reduce operation similar to SYCL
51+ template <typename T, typename OperationT>
52+ __device__ __forceinline__ T warpReduce (T value, OperationT operation) {
4953
50- template <typename AccT, typename VecT, typename OperationT>
51- __launch_bounds__ (1024 ) void __global__ kernel_reduce (
52- AccT* result, const VecT* vector, size_t size, bool overrideResult, OperationT operation) {
53- __shared__ AccT shmem[256 ];
54- const auto warpCount = blockDim.x / warpSize;
55- const auto currentWarp = threadIdx.x / warpSize;
56- const auto threadInWarp = threadIdx.x % warpSize;
57- const auto warpsNeeded = (size + warpSize - 1 ) / warpSize;
58-
59- auto value = operation.defaultValue ;
60- auto acc = operation.defaultValue ;
61-
62- #pragma unroll 4
63- for (std::size_t i = currentWarp; i < warpsNeeded; i += warpCount) {
64- const auto id = threadInWarp + i * warpSize;
65- const auto valueNew =
66- (id < size) ? static_cast <AccT>(ntload (&vector[id])) : operation.defaultValue ;
67-
68- value = operation (value, valueNew);
69- }
70-
71- for (int offset = 1 ; offset < warpSize; offset *= 2 ) {
54+ for (int offset = warpSize / 2 ; offset > 0 ; offset /= 2 ) {
7255 value = operation (value, shuffledown (value, offset));
7356 }
57+ return value;
58+ }
59+
60+ // Helper function for Generic Atomic Update
61+ // Fallback to atomicCAS-based implementation if atomic instruction is not available
62+ // Picked from: https://docs.nvidia.com/cuda/cuda-c-programming-guide/#atomic-functions
63+ template <typename T, typename OperationT>
64+ __device__ __forceinline__ void genericAtomicUpdate (T* address, T val, OperationT operation) {
65+ unsigned long long * address_as_ull = (unsigned long long *)address;
66+ unsigned long long old = *address_as_ull, assumed;
67+ do {
68+ assumed = old;
69+ T calculatedRes = operation (*(T*)&assumed, val);
70+ old = atomicCAS (address_as_ull, assumed, *(unsigned long long *)&calculatedRes);
71+ } while (assumed != old);
72+ }
7473
75- acc = operation (acc, value);
74+ // Native atomics
75+ template <>
76+ __device__ void
77+ atomicUpdate<int , device::Sum<int >>(int * address, int val, device::Sum<int > operation) {
78+ atomicAdd (address, val);
79+ }
80+ template <>
81+ __device__ void atomicUpdate<float , device::Sum<float >>(float * address,
82+ float val,
83+ device::Sum<float > operation) {
84+ atomicAdd (address, val);
85+ }
86+ #if __CUDA_ARCH__ >= 600
87+ template <>
88+ __device__ void atomicUpdate<double , device::Sum<double >>(double * address,
89+ double val,
90+ device::Sum<double > operation) {
91+ atomicAdd (address, val);
92+ }
93+ #endif
7694
77- if (threadInWarp == 0 ) {
78- shmem[currentWarp] = acc;
79- }
95+ // Block Reduce
96+ template <typename T, typename OperationT>
97+ __device__ __forceinline__ T blockReduce (T val, T* shmem, OperationT operation) {
98+
99+ const int laneId = threadIdx.x % warpSize;
100+ const int warpId = threadIdx.x / warpSize;
80101
102+ val = warpReduce (val, operation);
103+ if (laneId == 0 )
104+ shmem[warpId] = val;
81105 __syncthreads ();
82106
83- if (currentWarp == 0 ) {
84- const auto lastWarpsNeeded = (warpCount + warpSize - 1 ) / warpSize ;
107+ const int numWarps = WorkGroupSize / warpSize;
108+ val = (threadIdx. x < numWarps) ? shmem[laneId] : operation. defaultValue ;
85109
86- auto value = operation. defaultValue ;
87- auto lastAcc = operation. defaultValue ;
110+ if (warpId == 0 )
111+ val = warpReduce (val, operation) ;
88112
89- #pragma unroll 2
90- for (int i = 0 ; i < lastWarpsNeeded; ++i) {
91- const auto id = threadInWarp + i * warpSize;
92- const auto valueNew = (id < warpCount) ? shmem[id] : operation.defaultValue ;
113+ return val;
114+ }
93115
94- value = operation (value, valueNew);
95- }
116+ // Init Kernel to handle overrideResult safely across multiple blocks
117+ template <typename T, typename OperationT>
118+ __global__ void initKernel (T* result, OperationT operation) {
119+ if (threadIdx.x == 0 ) {
120+ *result = operation.defaultValue ;
121+ }
122+ }
96123
97- for ( int offset = 1 ; offset < warpSize; offset *= 2 ) {
98- value = operation (value, shuffledown (value, offset));
99- }
124+ template < typename AccT, typename VecT, typename OperationT>
125+ __launch_bounds__ (WorkGroupSize) void __global__ kernel_reduce (
126+ AccT* result, const VecT* vector, size_t size, bool overrideResult, OperationT operation) {
100127
101- lastAcc = operation (lastAcc, value);
128+ // Maximum block size 1024, warp size 32 so 1024/32 = 32 chosen
129+ // For AMD, warp size 64, 1024/64 = 16, but 32 should work with a few idle memory addresses
130+ __shared__ AccT shmem[32 ];
102131
103- if (threadIdx.x == 0 ) {
104- if (overrideResult) {
105- ntstore (result, lastAcc);
106- } else {
107- ntstore (result, operation (ntload (result), lastAcc));
108- }
132+ AccT threadAcc = operation.defaultValue ;
133+ size_t blockBaseIdx = blockIdx.x * (WorkGroupSize * ItemsPerWorkItem);
134+ size_t threadBaseIdx = blockBaseIdx + threadIdx.x ;
135+
136+ #pragma unroll
137+ for (int i = 0 ; i < ItemsPerWorkItem; i++) {
138+ size_t idx = threadBaseIdx + i * WorkGroupSize;
139+ if (idx < size) {
140+ threadAcc = operation (threadAcc, static_cast <AccT>(ntload (&vector[idx])));
109141 }
110142 }
143+
144+ AccT blockAcc = blockReduce<AccT, OperationT>(threadAcc, shmem, operation);
145+
146+ if (threadIdx.x == 0 ) {
147+ (void )overrideResult; // to silence unused parameter warning for non-Add reductions
148+ atomicUpdate (result, blockAcc, operation);
149+ }
111150}
112151
113152template <typename AccT, typename VecT>
@@ -119,22 +158,36 @@ void Algorithms::reduceVector(AccT* result,
119158 void * streamPtr) {
120159 auto * stream = reinterpret_cast <internals::DeviceStreamT>(streamPtr);
121160
122- dim3 grid (1 , 1 , 1 );
123- dim3 block (1024 , 1 , 1 );
161+ size_t totalItems = WorkGroupSize * ItemsPerWorkItem;
162+ size_t numBlocks = (size + totalItems - 1 ) / totalItems;
163+
164+ if (overrideResult) {
165+ switch (type) {
166+ case ReductionType::Add:
167+ initKernel<<<1 , 1 , 0 , stream>>>(result, device::Sum<AccT>());
168+ break ;
169+ case ReductionType::Max:
170+ initKernel<<<1 , 1 , 0 , stream>>>(result, device::Max<AccT>());
171+ break ;
172+ case ReductionType::Min:
173+ initKernel<<<1 , 1 , 0 , stream>>>(result, device::Min<AccT>());
174+ break ;
175+ }
176+ }
124177
125178 switch (type) {
126179 case ReductionType::Add: {
127- kernel_reduce<<<grid, block , 0 , stream>>>(
180+ kernel_reduce<<<numBlocks, WorkGroupSize , 0 , stream>>>(
128181 result, buffer, size, overrideResult, device::Sum<AccT>());
129182 break ;
130183 }
131184 case ReductionType::Max: {
132- kernel_reduce<<<grid, block , 0 , stream>>>(
185+ kernel_reduce<<<numBlocks, WorkGroupSize , 0 , stream>>>(
133186 result, buffer, size, overrideResult, device::Max<AccT>());
134187 break ;
135188 }
136189 case ReductionType::Min: {
137- kernel_reduce<<<grid, block , 0 , stream>>>(
190+ kernel_reduce<<<numBlocks, WorkGroupSize , 0 , stream>>>(
138191 result, buffer, size, overrideResult, device::Min<AccT>());
139192 break ;
140193 }
0 commit comments