diff --git a/Project.toml b/Project.toml
index 0bc18e4..cbe940c 100644
--- a/Project.toml
+++ b/Project.toml
@@ -14,9 +14,11 @@ Polyester = "f517fe37-dbe3-4b94-8317-1923a5111588"
 Unrolled = "9602ed7d-8fef-5bc8-8597-8f21381861e8"
 
 [weakdeps]
+Metal = "dde4c033-4e86-420c-a63e-0dd931031962"
 oneAPI = "8f75cd03-7ff8-4ecb-9b8f-daf728133b1b"
 
 [extensions]
+PlatformDependentMetalExt = "Metal"
 PlatformDependentoneAPIExt = "oneAPI"
 
 [compat]
@@ -25,6 +27,7 @@ DocStringExtensions = "0.9"
 GPUArraysCore = "0.1, 0.2"
 KernelAbstractions = "0.9"
 Markdown = "1"
+Metal = "1.4.2"
 OhMyThreads = "0.7"
 Polyester = "0.7"
 Unrolled = "0.1"
diff --git a/README.md b/README.md
index 8b506ee..b6e9263 100644
--- a/README.md
+++ b/README.md
@@ -139,8 +139,6 @@ Julia v1.11
 
 [Metal](https://github.com/JuliaGPU/Metal.jl)
 
-[Known Issue with `accumulate` Only](https://github.com/JuliaGPU/AcceleratedKernels.jl/issues/10)
-
 
 
diff --git a/ext/PlatformDependentMetalExt.jl b/ext/PlatformDependentMetalExt.jl
new file mode 100644
index 0000000..263f3b3
--- /dev/null
+++ b/ext/PlatformDependentMetalExt.jl
@@ -0,0 +1,32 @@
+module PlatformDependentMetalExt
+
+
+using Metal
+import AcceleratedKernels as AK
+
+
+# On Metal use the ScanPrefixes accumulation algorithm by default as the DecoupledLookback algorithm
+# cannot be supported due to Metal's weaker memory consistency guarantees.
+function AK.accumulate!(
+    op, v::AbstractArray, backend::MetalBackend;
+    init,
+    inclusive::Bool=true,
+
+    # Algorithm choice
+    alg::AK.AccumulateAlgorithm=AK.ScanPrefixes(),
+
+    # GPU settings
+    block_size::Int=1024,
+    temp::Union{Nothing, AbstractArray}=nothing,
+    temp_flags::Union{Nothing, AbstractArray}=nothing,
+)
+    AK._accumulate_impl!(
+        op, v, backend,
+        init=init, inclusive=inclusive,
+        alg=alg,
+        block_size=block_size, temp=temp, temp_flags=temp_flags,
+    )
+end
+
+
+end # module PlatformDependentMetalExt
\ No newline at end of file
diff --git a/prototype/accumulate_benchmark.jl b/prototype/accumulate_benchmark.jl
index 6a2caaf..b4115aa 100644
--- a/prototype/accumulate_benchmark.jl
+++ b/prototype/accumulate_benchmark.jl
@@ -7,7 +7,7 @@ Random.seed!(0)
 
 
 function akacc(v)
-    va = AK.accumulate(+, v, init=zero(eltype(v)), block_size=512)
+    va = AK.accumulate(+, v, init=zero(eltype(v)), block_size=1024)
     Metal.synchronize()
     va
 end
diff --git a/prototype/accumulate_test_metal.jl b/prototype/accumulate_test_metal.jl
new file mode 100644
index 0000000..d7fcd5b
--- /dev/null
+++ b/prototype/accumulate_test_metal.jl
@@ -0,0 +1,22 @@
+
+using Random
+using BenchmarkTools
+using Profile
+using PProf
+
+using KernelAbstractions
+using Metal
+
+import AcceleratedKernels as AK
+
+
+Random.seed!(0)
+
+
+v = Metal.ones(Int32, 100)
+
+v2 = AK.accumulate!(+, copy(v), init=zero(eltype(v)), block_size=1024)
+
+@assert Array(v2) == cumsum(Array(v))
+
+v2
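For reference, a minimal usage sketch of what the new extension changes in practice (illustrative only, not part of the patch; it assumes Apple-silicon hardware, so that loading `Metal` activates `PlatformDependentMetalExt`):

```julia
using Metal                        # loading Metal makes the extension available
import AcceleratedKernels as AK

v = MtlArray(rand(Float32, 1_000_000))

# On a MetalBackend array the extension's defaults apply automatically:
# alg=AK.ScanPrefixes() and block_size=1024.
AK.accumulate!(+, v, init=0f0)

# The algorithm can still be chosen explicitly.
AK.accumulate!(+, v, init=0f0, alg=AK.ScanPrefixes())
```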
diff --git a/src/accumulate/accumulate.jl b/src/accumulate/accumulate.jl
index c9b14f2..a668945 100644
--- a/src/accumulate/accumulate.jl
+++ b/src/accumulate/accumulate.jl
@@ -1,3 +1,10 @@
+# Available accumulation algorithms
+abstract type AccumulateAlgorithm end
+struct DecoupledLookback <: AccumulateAlgorithm end
+struct ScanPrefixes <: AccumulateAlgorithm end
+
+
+# Implementations, then interfaces
 include("accumulate_1d.jl")
 
 
@@ -7,6 +14,10 @@ include("accumulate_1d.jl")
     init,
     inclusive::Bool=true,
 
+    # Algorithm choice
+    alg::AccumulateAlgorithm=DecoupledLookback(),
+
+    # GPU settings
     block_size::Int=256,
     temp::Union{Nothing, AbstractArray}=nothing,
     temp_flags::Union{Nothing, AbstractArray}=nothing,
@@ -22,13 +33,20 @@ element is included in the accumulation (or not).
 The `block_size` should be a power of 2 and greater than 0. The temporaries `temp` and
 `temp_flags` should both have at least `(length(v) + 2 * block_size - 1) ÷ (2 * block_size)`
 elements; `eltype(v) === eltype(temp)`; the
-elements in `temp_flags` can be any integers, but `Int8` is used by default to reduce memory usage.
+elements in `temp_flags` can be any integers, but `Int8` is used by default to reduce memory usage.
+
+The `alg` can be one of the following:
+- `DecoupledLookback()`: the default algorithm, using opportunistic lookback to reuse earlier
+  blocks' results; requires device-level memory consistency guarantees, which Apple Metal does not
+  provide.
+- `ScanPrefixes()`: a simpler algorithm that scans the prefixes of each block, with no lookback;
+  `temp_flags` is not used in this case.
 
 # Platform-Specific Notes
-Currently, Apple Metal GPUs do not have strong enough memory consistency guarantees to support the
-industry-standard "decoupled lookback" algorithm for prefix sums - which means it currently may,
-for very large arrays, produce incorrect results ~0.38% of the time. We are currently working on an
-alternative algorithm without lookback ([issue](https://github.com/JuliaGPU/AcceleratedKernels.jl/issues/10)).
+On Metal, the `alg=ScanPrefixes()` algorithm is used by default, as Apple Metal GPUs do not have
+strong enough memory consistency guarantees for the `DecoupledLookback()` algorithm, which can
+produce incorrect results for very large arrays roughly 0.38% of the time. Also, `block_size=1024`
+is used by default on Metal to reduce the number of per-block prefixes that have to be combined.
 
 The CPU implementation currently defers to the single-threaded Base.accumulate!; we are waiting on
 a multithreaded implementation in OhMyThreads.jl ([issue](https://github.com/JuliaFolds2/OhMyThreads.jl/issues/129)).
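As an aside from this edit (not part of the patch), a small worked sketch of the temporary-sizing rule quoted in the docstring above; `oneAPI` arrays are used only because the docstring's own examples use them:

```julia
using oneAPI
import AcceleratedKernels as AK

# Each block processes 2 * block_size elements, so one per-block prefix
# (and, for DecoupledLookback, one flag) is needed per block.
n          = 1_000_000
block_size = 256
num_blocks = (n + 2 * block_size - 1) ÷ (2 * block_size)    # same as cld(n, 2 * block_size)

v          = oneAPI.ones(Int32, n)
temp       = similar(v, eltype(v), num_blocks)    # running prefix per block
temp_flags = similar(v, Int8, num_blocks)         # only consulted by DecoupledLookback()

AK.accumulate!(+, v, init=Int32(0),
               block_size=block_size, temp=temp, temp_flags=temp_flags)
```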
@@ -41,6 +59,9 @@ using oneAPI
 
 v = oneAPI.ones(Int32, 100_000)
 AK.accumulate!(+, v, init=0)
+
+# Use a different algorithm
+AK.accumulate!(+, v, init=0, alg=AK.ScanPrefixes())
 ```
 """
 function accumulate!(
@@ -48,6 +69,10 @@ function accumulate!(
     init,
     inclusive::Bool=true,
 
+    # Algorithm choice
+    alg::AccumulateAlgorithm=DecoupledLookback(),
+
+    # GPU settings
     block_size::Int=256,
     temp::Union{Nothing, AbstractArray}=nothing,
     temp_flags::Union{Nothing, AbstractArray}=nothing,
@@ -55,6 +80,7 @@ function accumulate!(
     _accumulate_impl!(
         op, v, backend,
         init=init, inclusive=inclusive,
+        alg=alg,
         block_size=block_size, temp=temp, temp_flags=temp_flags,
     )
 end
@@ -65,13 +91,16 @@ function _accumulate_impl!(
     init,
     inclusive::Bool=true,
 
+    alg::AccumulateAlgorithm=DecoupledLookback(),
+
+    # GPU settings
     block_size::Int=256,
     temp::Union{Nothing, AbstractArray}=nothing,
     temp_flags::Union{Nothing, AbstractArray}=nothing,
 )
     if backend isa GPU
         accumulate_1d!(
-            op, v, backend,
+            op, v, backend, alg,
             init=init, inclusive=inclusive,
             block_size=block_size, temp=temp, temp_flags=temp_flags,
         )
diff --git a/src/accumulate/accumulate_1d.jl b/src/accumulate/accumulate_1d.jl
index 4e7913c..7ff8b96 100644
--- a/src/accumulate/accumulate_1d.jl
+++ b/src/accumulate/accumulate_1d.jl
@@ -126,8 +126,16 @@
 
     # Write this block's final prefix to global array and set flag to "block prefix computed"
     if bi == 0x2 * block_size - 0x1
-        prefixes[iblock + 0x1] = temp[bi + bank_offset_b + 0x1]
-        flags[iblock + 0x1] = ACC_FLAG_P
+
+        # Known at compile-time; used in the first pass of the ScanPrefixes algorithm
+        if !isnothing(prefixes)
+            prefixes[iblock + 0x1] = temp[bi + bank_offset_b + 0x1]
+        end
+
+        # Known at compile-time; used only in the DecoupledLookback algorithm
+        if !isnothing(flags)
+            flags[iblock + 0x1] = ACC_FLAG_P
+        end
     end
 
     if block_offset + ai < len
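The two "Known at compile-time" comments above rely on Julia specializing the kernel on the argument types; a tiny CPU-side sketch of the same idea (illustrative only, not part of the patch):

```julia
# Branches on `isnothing(x)` are resolved at compile time once the argument is
# typed `Nothing`, so the unused write disappears from the specialized code.
write_flag!(flags, i) = isnothing(flags) ? nothing : (flags[i] = 0x1)

buf = zeros(UInt8, 4)
write_flag!(buf, 1)        # specialization for Vector{UInt8}: performs the write
write_flag!(nothing, 1)    # specialization for Nothing: compiles to a no-op
```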
@@ -192,8 +200,52 @@
     end
 end
 
 
+@kernel cpu=false inbounds=true function _accumulate_previous_coupled_preblocks!(op, v, prefixes)
+
+    # No decoupled lookback
+    len = length(v)
+    block_size = @groupsize()[1]
+
+    # NOTE: for many index calculations in this library, computation using zero-indexing leads to
+    # fewer operations (also code is transpiled to CUDA / ROCm / oneAPI / Metal code which do zero
+    # indexing). Internal calculations will be done using zero indexing except when actually
+    # accessing memory. As with C, the lower bound is inclusive, the upper bound exclusive.
+
+    # Group (block) and local (thread) indices
+    iblock = @index(Group, Linear) - 0x1 + 0x1      # Skipping first block
+    ithread = @index(Local, Linear) - 0x1
+    block_offset = iblock * block_size * 0x2        # Processing two elements per thread
+
+    # Each block looks back to find running prefix sum
+    running_prefix = prefixes[iblock - 0x1 + 0x1]
+
+    # The prefixes were pre-accumulated, which means (for N = 2 * block_size prefixes per chunk):
+    #   - If there were N or fewer prefixes (so fewer than N*N elements in v to begin with), the
+    #     prefixes were fully accumulated and we can use them directly.
+    #   - If there were more than N prefixes, each chunk of N prefixes was accumulated, but not
+    #     along the chunks. We need to accumulate the prefixes of the previous chunks into
+    #     running_prefix.
+    num_preblocks = (iblock - 0x1) ÷ (block_size * 0x2)
+    for i in 0x1:num_preblocks
+        running_prefix = op(running_prefix, prefixes[i * block_size * 0x2])
+    end
+
+    # Now we have aggregate prefix of all previous blocks, add it to all our elements
+    ai = ithread
+    if block_offset + ai < len
+        v[block_offset + ai + 0x1] = op(running_prefix, v[block_offset + ai + 0x1])
+    end
+
+    bi = ithread + block_size
+    if block_offset + bi < len
+        v[block_offset + bi + 0x1] = op(running_prefix, v[block_offset + bi + 0x1])
+    end
+end
+
+
+# DecoupledLookback algorithm
 function accumulate_1d!(
-    op, v::AbstractArray, backend::GPU;
+    op, v::AbstractArray, backend::GPU, ::DecoupledLookback;
     init,
     inclusive::Bool=true,
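To make the pre-accumulated-prefixes reasoning in the kernel above concrete, here is a plain-Julia, CPU-side sketch of the same combination step (illustrative only, not part of the patch; `chunk` stands for the `2 * block_size` prefixes scanned per block in the second `kernel1!` pass below):

```julia
# Given per-block prefixes that were scanned chunk-by-chunk (each run of `chunk`
# prefixes accumulated independently), compute the running prefix of everything
# before data block `iblock` (1-based, iblock >= 2) -- mirroring the kernel's loop.
function running_prefix_before(op, prefixes::AbstractVector, iblock::Int, chunk::Int)
    acc = prefixes[iblock - 1]                  # within-chunk total up to the previous block
    num_prechunks = (iblock - 2) ÷ chunk        # earlier, fully-scanned chunks
    for i in 1:num_prechunks
        acc = op(acc, prefixes[i * chunk])      # add each earlier chunk's full total
    end
    return acc
end
```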
@@ -242,3 +294,56 @@
 
     return v
 end
+
+
+# ScanPrefixes algorithm
+function accumulate_1d!(
+    op, v::AbstractArray, backend::GPU, ::ScanPrefixes;
+    init,
+    inclusive::Bool=true,
+
+    block_size::Int=256,
+    temp::Union{Nothing, AbstractArray}=nothing,
+    temp_flags::Union{Nothing, AbstractArray}=nothing,
+)
+    # Correctness checks
+    @argcheck block_size > 0
+    @argcheck ispow2(block_size)
+
+    # Nothing to accumulate
+    if length(v) == 0
+        return v
+    end
+
+    # Each thread will process two elements
+    elems_per_block = block_size * 2
+    num_blocks = (length(v) + elems_per_block - 1) ÷ elems_per_block
+
+    if isnothing(temp)
+        prefixes = similar(v, eltype(v), num_blocks)
+    else
+        @argcheck eltype(temp) === eltype(v)
+        @argcheck length(temp) >= num_blocks
+        prefixes = temp
+    end
+
+    kernel1! = _accumulate_block!(backend, block_size)
+    kernel1!(op, v, init, inclusive, nothing, prefixes,
+             ndrange=num_blocks * block_size)
+
+    if num_blocks > 1
+
+        # Accumulate prefixes of all blocks
+        num_blocks_prefixes = (length(prefixes) + elems_per_block - 1) ÷ elems_per_block
+        kernel1!(op, prefixes, init, true, nothing, nothing,
+                 ndrange=num_blocks_prefixes * block_size)
+
+        # Prefixes are pre-accumulated (completely accumulated if num_blocks_prefixes == 1, or
+        # partially, which we will account for in the coupled lookback)
+        kernel2! = _accumulate_previous_coupled_preblocks!(backend, block_size)
+        kernel2!(op, v, prefixes,
+                 ndrange=(num_blocks - 1) * block_size)
+    end
+
+    return v
+end
diff --git a/src/reduce/mapreduce_nd.jl b/src/reduce/mapreduce_nd.jl
index 59bbf8f..614c954 100644
--- a/src/reduce/mapreduce_nd.jl
+++ b/src/reduce/mapreduce_nd.jl
@@ -23,8 +23,6 @@
     iblock = @index(Group, Linear) - 0x1
     ithread = @index(Local, Linear) - 0x1
 
-    tid = ithread + iblock * N
-
     # Each thread handles one output element
     tid = ithread + iblock * N
     if tid < output_size
diff --git a/src/reduce/reduce_nd.jl b/src/reduce/reduce_nd.jl
index 4d566ab..0be9247 100644
--- a/src/reduce/reduce_nd.jl
+++ b/src/reduce/reduce_nd.jl
@@ -23,8 +23,6 @@
     iblock = @index(Group, Linear) - 0x1
     ithread = @index(Local, Linear) - 0x1
 
-    tid = ithread + iblock * N
-
     # Each thread handles one output element
     tid = ithread + iblock * N
    if tid < output_size
diff --git a/test/runtests.jl b/test/runtests.jl
index e32e900..18295bc 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -11,34 +11,34 @@ import Pkg
 
 if "--CUDA" in ARGS
     Pkg.add("CUDA")
     using CUDA
-    display(CUDA.versioninfo())
+    CUDA.versioninfo()
     const backend = CUDABackend()
 elseif "--oneAPI" in ARGS
     Pkg.add("oneAPI")
     using oneAPI
-    display(oneAPI.versioninfo())
+    oneAPI.versioninfo()
     const backend = oneAPIBackend()
 elseif "--AMDGPU" in ARGS
     Pkg.add("AMDGPU")
     using AMDGPU
-    display(AMDGPU.versioninfo())
+    AMDGPU.versioninfo()
     const backend = ROCBackend()
 elseif "--Metal" in ARGS
     Pkg.add("Metal")
     using Metal
-    display(Metal.versioninfo())
+    Metal.versioninfo()
     const backend = MetalBackend()
 elseif "--OpenCL" in ARGS
     Pkg.add(name="OpenCL", rev="master")
     Pkg.add("pocl_jll")
     using pocl_jll
     using OpenCL
-    display(OpenCL.versioninfo())
+    OpenCL.versioninfo()
     const backend = OpenCLBackend()
 elseif !@isdefined(backend)
     # Otherwise do CPU tests
     using InteractiveUtils
-    display(InteractiveUtils.versioninfo())
+    InteractiveUtils.versioninfo()
     const backend = CPU()
 end
@@ -1059,6 +1059,15 @@
         @test all(Array(y) .== accumulate(+, Array(x)))
     end
 
+    # Stress-testing small block sizes -> many blocks
+    for _ in 1:100
+        num_elems = rand(1:100_000)
+        x = array_from_host(rand(1:1000, num_elems), Int32)
+        y = copy(x)
+        AK.accumulate!(+, y; init=0, block_size=16)
+        @test all(Array(y) .== accumulate(+, Array(x)))
+    end
+
     # Testing different settings
     AK.accumulate!(+, array_from_host(ones(Int32, 1000)),
                    init=0, inclusive=false, block_size=128,
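Finally, a hedged note on exercising the new Metal path locally (not part of the patch; it assumes an Apple-silicon machine, and uses `Pkg.test`'s standard `test_args` forwarding to set the `--Metal` flag consulted by `test/runtests.jl` above):

```julia
import Pkg

# Run the AcceleratedKernels test suite against the Metal backend;
# test/runtests.jl reads the backend choice from these ARGS.
Pkg.test("AcceleratedKernels", test_args=["--Metal"])
```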