3 changes: 3 additions & 0 deletions Project.toml
@@ -14,9 +14,11 @@ Polyester = "f517fe37-dbe3-4b94-8317-1923a5111588"
Unrolled = "9602ed7d-8fef-5bc8-8597-8f21381861e8"

[weakdeps]
Metal = "dde4c033-4e86-420c-a63e-0dd931031962"
oneAPI = "8f75cd03-7ff8-4ecb-9b8f-daf728133b1b"

[extensions]
PlatformDependentMetalExt = "Metal"
PlatformDependentoneAPIExt = "oneAPI"

[compat]
@@ -25,6 +27,7 @@ DocStringExtensions = "0.9"
GPUArraysCore = "0.1, 0.2"
KernelAbstractions = "0.9"
Markdown = "1"
Metal = "1.4.2"
OhMyThreads = "0.7"
Polyester = "0.7"
Unrolled = "0.1"
2 changes: 0 additions & 2 deletions README.md
@@ -139,8 +139,6 @@ Julia v1.11

[Metal](https://github.com/JuliaGPU/Metal.jl)

[Known Issue with `accumulate` Only](https://github.com/JuliaGPU/AcceleratedKernels.jl/issues/10)

</td>
<td>

32 changes: 32 additions & 0 deletions ext/PlatformDependentMetalExt.jl
@@ -0,0 +1,32 @@
module PlatformDependentMetalExt


using Metal
import AcceleratedKernels as AK


# On Metal, use the ScanPrefixes accumulation algorithm by default, as the DecoupledLookback
# algorithm cannot be supported due to Metal's weaker memory consistency guarantees.
function AK.accumulate!(
op, v::AbstractArray, backend::MetalBackend;
init,
inclusive::Bool=true,

# Algorithm choice
alg::AK.AccumulateAlgorithm=AK.ScanPrefixes(),

# GPU settings
block_size::Int=1024,
temp::Union{Nothing, AbstractArray}=nothing,
temp_flags::Union{Nothing, AbstractArray}=nothing,
)
AK._accumulate_impl!(
op, v, backend,
init=init, inclusive=inclusive,
alg=alg,
block_size=block_size, temp=temp, temp_flags=temp_flags,
)
end


end # module PlatformDependentMetalExt
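For context, a minimal usage sketch (not part of the diff) of what this extension enables: once `Metal` is loaded, the method above intercepts `AK.accumulate!` for `MetalBackend`, so plain calls on `MtlArray`s default to `ScanPrefixes()` and `block_size=1024`. The array size and the explicit `block_size=512` below are illustrative only.

```julia
using Metal                          # loading Metal activates PlatformDependentMetalExt
import AcceleratedKernels as AK

v = Metal.ones(Int32, 100_000)

# Defaults to alg=AK.ScanPrefixes() and block_size=1024 through the extension method above
v2 = AK.accumulate!(+, copy(v), init=zero(eltype(v)))
@assert Array(v2) == cumsum(Array(v))

# The algorithm and block size can still be chosen explicitly
AK.accumulate!(+, copy(v), init=zero(eltype(v)), alg=AK.ScanPrefixes(), block_size=512)
```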
2 changes: 1 addition & 1 deletion prototype/accumulate_benchmark.jl
@@ -7,7 +7,7 @@ Random.seed!(0)


function akacc(v)
va = AK.accumulate(+, v, init=zero(eltype(v)), block_size=512)
va = AK.accumulate(+, v, init=zero(eltype(v)), block_size=1024)
Metal.synchronize()
va
end
22 changes: 22 additions & 0 deletions prototype/accumulate_test_metal.jl
@@ -0,0 +1,22 @@

using Random
using BenchmarkTools
using Profile
using PProf

using KernelAbstractions
using Metal

import AcceleratedKernels as AK


Random.seed!(0)


v = Metal.ones(Int32, 100)

v2 = AK.accumulate!(+, copy(v), init=zero(eltype(v)), block_size=1024)

@assert Array(v2) == cumsum(Array(v))

v2
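As a possible follow-up sanity check (not in this PR), the exclusive variant could be exercised the same way, continuing the script above. The sketch assumes the usual exclusive-scan semantics, where element `i` becomes `init` combined with all elements before `i`.

```julia
# Hypothetical exclusive-scan check, mirroring the inclusive test above
# (assumes standard exclusive-scan semantics)
v = Metal.ones(Int32, 100)
v3 = AK.accumulate!(+, copy(v), init=zero(eltype(v)), inclusive=false, block_size=1024)

expected = [zero(Int32); cumsum(Array(v))[1:end-1]]   # 0, 1, 2, ..., 99
@assert Array(v3) == expected
```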
41 changes: 35 additions & 6 deletions src/accumulate/accumulate.jl
@@ -1,3 +1,10 @@
# Available accumulation algorithms
abstract type AccumulateAlgorithm end
struct DecoupledLookback <: AccumulateAlgorithm end
struct ScanPrefixes <: AccumulateAlgorithm end


# Implementations, then interfaces
include("accumulate_1d.jl")


@@ -7,6 +14,10 @@ include("accumulate_1d.jl")
init,
inclusive::Bool=true,
# Algorithm choice
alg::AccumulateAlgorithm=DecoupledLookback(),
# GPU settings
block_size::Int=256,
temp::Union{Nothing, AbstractArray}=nothing,
temp_flags::Union{Nothing, AbstractArray}=nothing,
@@ -22,13 +33,20 @@ element is included in the accumulation (or not).
The `block_size` should be a power of 2 and greater than 0. The temporaries `temp` and `temp_flags`
should both have at least
`(length(v) + 2 * block_size - 1) ÷ (2 * block_size)` elements; `eltype(v) === eltype(temp)`; the
elements in `temp_flags` can be any integers, but `Int8` is used by default to reduce memory usage.
The `alg` can be one of the following:
- `DecoupledLookback()`: the default algorithm, using opportunistic lookback to reuse earlier
blocks' results; requires device-level memory consistency guarantees, which Apple Metal does not
provide.
- `ScanPrefixes()`: a simpler algorithm that scans the prefixes of each block, with no lookback;
`temp_flags` is not used in this case.
# Platform-Specific Notes
Currently, Apple Metal GPUs do not have strong enough memory consistency guarantees to support the
industry-standard "decoupled lookback" algorithm for prefix sums - which means it currently may,
for very large arrays, produce incorrect results ~0.38% of the time. We are currently working on an
alternative algorithm without lookback ([issue](https://github.com/JuliaGPU/AcceleratedKernels.jl/issues/10)).
On Metal, the `alg=ScanPrefixes()` algorithm is used by default, as Apple Metal GPUs do not provide
strong enough memory consistency guarantees for the `DecoupledLookback()` algorithm, which on Metal
may produce incorrect results for large arrays roughly 0.38% of the time. Also, `block_size=1024`
is used by default on Metal to reduce the number of coupled lookbacks.
The CPU implementation currently defers to the single-threaded Base.accumulate!; we are waiting on a
multithreaded implementation in OhMyThreads.jl ([issue](https://github.com/JuliaFolds2/OhMyThreads.jl/issues/129)).
@@ -41,20 +59,28 @@ using oneAPI
v = oneAPI.ones(Int32, 100_000)
AK.accumulate!(+, v, init=0)
# Use a different algorithm
AK.accumulate!(+, v, alg=AK.ScanPrefixes())
```
"""
function accumulate!(
op, v::AbstractArray, backend::Backend=get_backend(v);
init,
inclusive::Bool=true,

# Algorithm choice
alg::AccumulateAlgorithm=DecoupledLookback(),

# GPU settings
block_size::Int=256,
temp::Union{Nothing, AbstractArray}=nothing,
temp_flags::Union{Nothing, AbstractArray}=nothing,
)
_accumulate_impl!(
op, v, backend,
init=init, inclusive=inclusive,
alg=alg,
block_size=block_size, temp=temp, temp_flags=temp_flags,
)
end
@@ -65,13 +91,16 @@ function _accumulate_impl!(
init,
inclusive::Bool=true,

alg::AccumulateAlgorithm=DecoupledLookback(),

# GPU settings
block_size::Int=256,
temp::Union{Nothing, AbstractArray}=nothing,
temp_flags::Union{Nothing, AbstractArray}=nothing,
)
if backend isa GPU
accumulate_1d!(
op, v, backend,
op, v, backend, alg,
init=init, inclusive=inclusive,
block_size=block_size, temp=temp, temp_flags=temp_flags,
)
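To make the temporary-buffer requirement from the docstring concrete, here is a hedged sketch (not part of the diff) of preallocating `temp` and `temp_flags` so repeated calls avoid allocations. The sizes follow the `(length(v) + 2 * block_size - 1) ÷ (2 * block_size)` formula above; the oneAPI array type and array size simply mirror the docstring example and are illustrative.

```julia
import AcceleratedKernels as AK
using oneAPI

v = oneAPI.ones(Int32, 100_000)
block_size = 256
num_blocks = (length(v) + 2 * block_size - 1) ÷ (2 * block_size)

# Reusable temporaries: temp matches eltype(v); temp_flags holds small integer flags
temp = similar(v, eltype(v), num_blocks)
temp_flags = similar(v, Int8, num_blocks)

AK.accumulate!(+, v, init=zero(eltype(v)),
               block_size=block_size, temp=temp, temp_flags=temp_flags)
```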
111 changes: 108 additions & 3 deletions src/accumulate/accumulate_1d.jl
@@ -126,8 +126,16 @@ end

# Write this block's final prefix to global array and set flag to "block prefix computed"
if bi == 0x2 * block_size - 0x1
prefixes[iblock + 0x1] = temp[bi + bank_offset_b + 0x1]
flags[iblock + 0x1] = ACC_FLAG_P

# Known at compile-time; used in the first pass of the ScanPrefixes algorithm
if !isnothing(prefixes)
prefixes[iblock + 0x1] = temp[bi + bank_offset_b + 0x1]
end

# Known at compile-time; used only in the DecoupledLookback algorithm
if !isnothing(flags)
flags[iblock + 0x1] = ACC_FLAG_P
end
end

if block_offset + ai < len
@@ -192,8 +200,52 @@ end
end


@kernel cpu=false inbounds=true function _accumulate_previous_coupled_preblocks!(op, v, prefixes)

# No decoupled lookback
len = length(v)
block_size = @groupsize()[1]

# NOTE: for many index calculations in this library, computation using zero-indexing leads to
# fewer operations (also code is transpiled to CUDA / ROCm / oneAPI / Metal code which do zero
# indexing). Internal calculations will be done using zero indexing except when actually
# accessing memory. As with C, the lower bound is inclusive, the upper bound exclusive.

# Group (block) and local (thread) indices
iblock = @index(Group, Linear) - 0x1 + 0x1 # Skipping first block
ithread = @index(Local, Linear) - 0x1
block_offset = iblock * block_size * 0x2 # Processing two elements per thread

# Each block looks back to find running prefix sum
running_prefix = prefixes[iblock - 0x1 + 0x1]

# The prefixes were pre-accumulated, which means (for block_size=N):
# - If there were N or fewer prefixes (so fewer than N*N elements in v to begin with), the
# prefixes were fully accumulated and we can use them directly.
# - If there were more than N prefixes, each chunk of N prefixes was accumulated, but not
# along the chunks. We need to accumulate the prefixes of the previous chunks into
# running_prefix.
num_preblocks = (iblock - 0x1) ÷ (block_size * 0x2)
for i in 0x1:num_preblocks
running_prefix = op(running_prefix, prefixes[i * block_size * 0x2])
end

# Now we have aggregate prefix of all previous blocks, add it to all our elements
ai = ithread
if block_offset + ai < len
v[block_offset + ai + 0x1] = op(running_prefix, v[block_offset + ai + 0x1])
end

bi = ithread + block_size
if block_offset + bi < len
v[block_offset + bi + 0x1] = op(running_prefix, v[block_offset + bi + 0x1])
end
end


# DecoupledLookback algorithm
function accumulate_1d!(
op, v::AbstractArray, backend::GPU;
op, v::AbstractArray, backend::GPU, ::DecoupledLookback;
init,
inclusive::Bool=true,

@@ -242,3 +294,56 @@ function accumulate_1d!(

return v
end


# ScanPrefixes algorithm
function accumulate_1d!(
op, v::AbstractArray, backend::GPU, ::ScanPrefixes;
init,
inclusive::Bool=true,

block_size::Int=256,
temp::Union{Nothing, AbstractArray}=nothing,
temp_flags::Union{Nothing, AbstractArray}=nothing,
)
# Correctness checks
@argcheck block_size > 0
@argcheck ispow2(block_size)

# Nothing to accumulate
if length(v) == 0
return v
end

# Each thread will process two elements
elems_per_block = block_size * 2
num_blocks = (length(v) + elems_per_block - 1) ÷ elems_per_block

if isnothing(temp)
prefixes = similar(v, eltype(v), num_blocks)
else
@argcheck eltype(temp) === eltype(v)
@argcheck length(temp) >= num_blocks
prefixes = temp
end

kernel1! = _accumulate_block!(backend, block_size)
kernel1!(op, v, init, inclusive, nothing, prefixes,
ndrange=num_blocks * block_size)

if num_blocks > 1

# Accumulate prefixes of all blocks
num_blocks_prefixes = (length(prefixes) + elems_per_block - 1) ÷ elems_per_block
kernel1!(op, prefixes, init, true, nothing, nothing,
ndrange=num_blocks_prefixes * block_size)

# Prefixes are pre-accumulated (completely accumulated if num_blocks_prefixes == 1, or
# partially, which we will account for in the coupled lookback)
kernel2! = _accumulate_previous_coupled_preblocks!(backend, block_size)
kernel2!(op, v, prefixes,
ndrange=(num_blocks - 1) * block_size)
end

return v
end
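For intuition only, the following plain-Julia sketch (not part of the diff) mirrors the three phases of the ScanPrefixes path above: block-local scans, a chunked scan of the block totals, then a coupled pass that adds the carried prefix. It ignores `init`/`inclusive` and all GPU details, and `scan_prefixes_reference!` is an invented name.

```julia
# Reference sketch of the ScanPrefixes idea (illustrative, CPU-only):
#   1. inclusive scan inside each block of 2*block_size elements, recording block totals;
#   2. inclusive scan of the block totals, one chunk of 2*block_size totals at a time;
#   3. every block after the first adds the scanned total of the previous block, plus the
#      grand totals of any full chunks of totals preceding it (the "coupled" pass).
function scan_prefixes_reference!(op, v; block_size=4)
    n = length(v)
    elems = 2 * block_size
    nblocks = cld(n, elems)

    function scan!(w)                       # sequential in-place inclusive scan
        for i in 2:length(w)
            w[i] = op(w[i - 1], w[i])
        end
        return w
    end

    prefixes = Vector{eltype(v)}(undef, nblocks)
    for b in 1:nblocks                      # step 1: block-local scans + block totals
        lo, hi = (b - 1) * elems + 1, min(b * elems, n)
        scan!(view(v, lo:hi))
        prefixes[b] = v[hi]
    end

    for c in 1:cld(nblocks, elems)          # step 2: scan the totals, chunk by chunk
        lo, hi = (c - 1) * elems + 1, min(c * elems, nblocks)
        scan!(view(prefixes, lo:hi))
    end

    for b in 2:nblocks                      # step 3: add the carried prefix
        running = prefixes[b - 1]
        for c in 1:((b - 2) ÷ elems)        # full chunks strictly before block b-1's chunk
            running = op(running, prefixes[c * elems])
        end
        lo, hi = (b - 1) * elems + 1, min(b * elems, n)
        v[lo:hi] .= op.(running, view(v, lo:hi))
    end
    return v
end

x = rand(1:10, 1_000)
y = copy(x)
scan_prefixes_reference!(+, y)
@assert y == accumulate(+, x)
```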
2 changes: 0 additions & 2 deletions src/reduce/mapreduce_nd.jl
@@ -23,8 +23,6 @@
iblock = @index(Group, Linear) - 0x1
ithread = @index(Local, Linear) - 0x1

tid = ithread + iblock * N

# Each thread handles one output element
tid = ithread + iblock * N
if tid < output_size
2 changes: 0 additions & 2 deletions src/reduce/reduce_nd.jl
@@ -23,8 +23,6 @@
iblock = @index(Group, Linear) - 0x1
ithread = @index(Local, Linear) - 0x1

tid = ithread + iblock * N

# Each thread handles one output element
tid = ithread + iblock * N
if tid < output_size
21 changes: 15 additions & 6 deletions test/runtests.jl
@@ -11,34 +11,34 @@ import Pkg
if "--CUDA" in ARGS
Pkg.add("CUDA")
using CUDA
display(CUDA.versioninfo())
CUDA.versioninfo()
const backend = CUDABackend()
elseif "--oneAPI" in ARGS
Pkg.add("oneAPI")
using oneAPI
display(oneAPI.versioninfo())
oneAPI.versioninfo()
const backend = oneAPIBackend()
elseif "--AMDGPU" in ARGS
Pkg.add("AMDGPU")
using AMDGPU
display(AMDGPU.versioninfo())
AMDGPU.versioninfo()
const backend = ROCBackend()
elseif "--Metal" in ARGS
Pkg.add("Metal")
using Metal
display(Metal.versioninfo())
Metal.versioninfo()
const backend = MetalBackend()
elseif "--OpenCL" in ARGS
Pkg.add(name="OpenCL", rev="master")
Pkg.add("pocl_jll")
using pocl_jll
using OpenCL
display(OpenCL.versioninfo())
OpenCL.versioninfo()
const backend = OpenCLBackend()
elseif !@isdefined(backend)
# Otherwise do CPU tests
using InteractiveUtils
display(InteractiveUtils.versioninfo())
InteractiveUtils.versioninfo()
const backend = CPU()
end

@@ -1059,6 +1059,15 @@ end
@test all(Array(y) .== accumulate(+, Array(x)))
end

# Stress-testing small block sizes -> many blocks
for _ in 1:100
num_elems = rand(1:100_000)
x = array_from_host(rand(1:1000, num_elems), Int32)
y = copy(x)
AK.accumulate!(+, y; init=0, block_size=16)
@test all(Array(y) .== accumulate(+, Array(x)))
end

# Testing different settings
AK.accumulate!(+, array_from_host(ones(Int32, 1000)), init=0, inclusive=false,
block_size=128,