1010#include < math.h>
1111
1212namespace device {
// Launch configuration shared by the reduction kernels below:
// WorkGroupSize is the number of threads per block, and ItemsPerWorkItem is
// the number of elements each thread accumulates before the block reduction.
constexpr int WorkGroupSize = 256;
constexpr int ItemsPerWorkItem = 16;
1316template <typename T>
1417struct Sum {
1518 T defaultValue{0 };
@@ -44,70 +47,104 @@ __forceinline__ __device__ T shuffledown(T value, int offset) {
4447}
4548#endif
4649
// Warp reduce operation similar to SYCL.
// Butterfly-style reduction across the lanes of a warp using shuffledown with
// halving offsets; after the loop, lane 0 holds the reduction over all lanes
// (the other lanes hold partial results).
template <typename T, typename OperationT>
__device__ __forceinline__ T warpReduce(T value, OperationT operation) {
  int offset = warpSize / 2;
  while (offset > 0) {
    value = operation(value, shuffledown(value, offset));
    offset /= 2;
  }
  return value;
}
59+
// Helper function for a generic atomic update.
// Falls back to an atomicCAS-based loop when no native atomic instruction is
// available for the (type, operation) pair; see the specializations below.
// Pattern adapted from:
// https://docs.nvidia.com/cuda/cuda-c-programming-guide/#atomic-functions
//
// BUG FIX: the previous implementation unconditionally reinterpreted
// `address` as `unsigned long long*`, which reads/writes 8 bytes even for
// 4-byte types (e.g. Max<float>, Min<int>) — an out-of-bounds access that
// corrupts the word adjacent to *address. The CAS word width is now selected
// from sizeof(T) at compile time.

// Tag type used to pick the CAS word width.
template <int Bytes>
struct AtomicWidthTag {};

// 32-bit CAS loop (T is a 4-byte type, e.g. int or float).
template <typename T, typename OperationT>
__device__ __forceinline__ void
atomicUpdateImpl(T* address, T val, OperationT operation, AtomicWidthTag<4>) {
  unsigned int* address_as_uint = (unsigned int*)address;
  unsigned int old = *address_as_uint, assumed;
  do {
    assumed = old;
    T calculatedRes = operation(*(T*)&assumed, val);
    old = atomicCAS(address_as_uint, assumed, *(unsigned int*)&calculatedRes);
  } while (assumed != old);
}

// 64-bit CAS loop (T is an 8-byte type, e.g. double).
template <typename T, typename OperationT>
__device__ __forceinline__ void
atomicUpdateImpl(T* address, T val, OperationT operation, AtomicWidthTag<8>) {
  unsigned long long* address_as_ull = (unsigned long long*)address;
  unsigned long long old = *address_as_ull, assumed;
  do {
    assumed = old;
    T calculatedRes = operation(*(T*)&assumed, val);
    old = atomicCAS(address_as_ull, assumed, *(unsigned long long*)&calculatedRes);
  } while (assumed != old);
}

template <typename T, typename OperationT>
__device__ __forceinline__ void atomicUpdate(T* address, T val, OperationT operation) {
  static_assert(sizeof(T) == 4 || sizeof(T) == 8,
                "atomicUpdate only supports 4- and 8-byte types");
  atomicUpdateImpl(address, val, operation, AtomicWidthTag<(int)sizeof(T)>{});
}
// Native atomics: dispatch Sum reductions to hardware atomicAdd where
// available instead of the generic CAS loop.
template <>
__device__ __forceinline__ void
atomicUpdate<int, device::Sum<int>>(int* address, int val, device::Sum<int> operation) {
  atomicAdd(address, val);
}
template <>
__device__ __forceinline__ void atomicUpdate<float, device::Sum<float>>(
    float* address, float val, device::Sum<float> operation) {
  atomicAdd(address, val);
}
// atomicAdd(double*) requires SM60+. The guard must also admit the host
// compilation pass (where __CUDA_ARCH__ is undefined) so that both nvcc
// passes agree on which specializations exist.
#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 600
template <>
__device__ __forceinline__ void atomicUpdate<double, device::Sum<double>>(
    double* address, double val, device::Sum<double> operation) {
  atomicAdd(address, val);
}
#endif
// Block Reduce
// Reduces `val` across all threads of the block in two stages. Assumes the
// block was launched with exactly WorkGroupSize threads and that `shmem` has
// room for at least WorkGroupSize / warpSize elements. The full result is
// produced within warp 0 (thread 0 in particular); other warps return
// partial values.
template <typename T, typename OperationT>
__device__ __forceinline__ T blockReduce(T val, T* shmem, OperationT operation) {
  const int lane = threadIdx.x % warpSize;
  const int warp = threadIdx.x / warpSize;

  // Stage 1: every warp reduces its own values; lane 0 publishes the partial.
  val = warpReduce(val, operation);
  if (lane == 0) {
    shmem[warp] = val;
  }
  __syncthreads();

  // Stage 2: the first numWarps threads (all inside warp 0, since
  // numWarps <= warpSize) pick up the per-warp partials, and warp 0 combines
  // them with one more warp reduction.
  const int numWarps = WorkGroupSize / warpSize;
  val = (threadIdx.x < numWarps) ? shmem[lane] : operation.defaultValue;
  if (warp == 0) {
    val = warpReduce(val, operation);
  }
  return val;
}
// Init kernel to handle overrideResult safely across multiple blocks:
// resetting *result to the operation's identity in a separate launch avoids
// a cross-block race in the reduction kernel itself. Launched with a single
// thread; the guard keeps it correct for larger launches too.
template <typename T, typename OperationT>
__global__ void initKernel(T* result, OperationT operation) {
  const bool isFirstThread = (threadIdx.x == 0);
  if (isFirstThread) {
    *result = operation.defaultValue;
  }
}
// Grid-wide reduction kernel.
// Each thread accumulates ItemsPerWorkItem block-strided elements, the block
// combines the partials via blockReduce, and thread 0 of every block folds
// the block result into *result with an atomic update. When overrideResult
// is requested, *result must have been reset beforehand (see initKernel).
template <typename AccT, typename VecT, typename OperationT>
__launch_bounds__(WorkGroupSize) void __global__ kernel_reduce(
    AccT* result, const VecT* vector, size_t size, bool overrideResult, OperationT operation) {

  // Maximum block size 1024, warp size 32 so 1024/32 = 32 chosen
  // For AMD, warp size 64, 1024/64 = 16, but 32 should work with a few idle memory addresses
  __shared__ AccT shmem[32];

  AccT threadAcc = operation.defaultValue;
  // BUG FIX: cast blockIdx.x up front — the product was previously evaluated
  // in 32-bit unsigned arithmetic before the size_t assignment, silently
  // wrapping for inputs beyond 2^32 elements.
  const size_t blockBaseIdx =
      static_cast<size_t>(blockIdx.x) * (WorkGroupSize * ItemsPerWorkItem);
  const size_t threadBaseIdx = blockBaseIdx + threadIdx.x;

#pragma unroll
  for (int i = 0; i < ItemsPerWorkItem; i++) {
    const size_t idx = threadBaseIdx + static_cast<size_t>(i) * WorkGroupSize;
    if (idx < size) {
      threadAcc = operation(threadAcc, static_cast<AccT>(ntload(&vector[idx])));
    }
  }

  AccT blockAcc = blockReduce<AccT, OperationT>(threadAcc, shmem, operation);

  if (threadIdx.x == 0) {
    // overrideResult is handled by initKernel before this launch; the
    // parameter is kept for interface compatibility with existing callers.
    (void)overrideResult;
    atomicUpdate(result, blockAcc, operation);
  }
}
112149
113150template <typename AccT, typename VecT>
@@ -119,22 +156,36 @@ void Algorithms::reduceVector(AccT* result,
119156 void * streamPtr) {
120157 auto * stream = reinterpret_cast <internals::DeviceStreamT>(streamPtr);
121158
122- dim3 grid (1 , 1 , 1 );
123- dim3 block (1024 , 1 , 1 );
159+ size_t totalItems = WorkGroupSize * ItemsPerWorkItem;
160+ size_t numBlocks = (size + totalItems - 1 ) / totalItems;
161+
162+ if (overrideResult) {
163+ switch (type) {
164+ case ReductionType::Add:
165+ initKernel<<<1 , 1 , 0 , stream>>>(result, device::Sum<AccT>());
166+ break ;
167+ case ReductionType::Max:
168+ initKernel<<<1 , 1 , 0 , stream>>>(result, device::Max<AccT>());
169+ break ;
170+ case ReductionType::Min:
171+ initKernel<<<1 , 1 , 0 , stream>>>(result, device::Min<AccT>());
172+ break ;
173+ }
174+ }
124175
125176 switch (type) {
126177 case ReductionType::Add: {
127- kernel_reduce<<<grid, block , 0 , stream>>>(
178+ kernel_reduce<<<numBlocks, WorkGroupSize , 0 , stream>>>(
128179 result, buffer, size, overrideResult, device::Sum<AccT>());
129180 break ;
130181 }
131182 case ReductionType::Max: {
132- kernel_reduce<<<grid, block , 0 , stream>>>(
183+ kernel_reduce<<<numBlocks, WorkGroupSize , 0 , stream>>>(
133184 result, buffer, size, overrideResult, device::Max<AccT>());
134185 break ;
135186 }
136187 case ReductionType::Min: {
137- kernel_reduce<<<grid, block , 0 , stream>>>(
188+ kernel_reduce<<<numBlocks, WorkGroupSize , 0 , stream>>>(
138189 result, buffer, size, overrideResult, device::Min<AccT>());
139190 break ;
140191 }
0 commit comments