diff --git a/.gitignore b/.gitignore index 2e31d35..49a32b0 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,6 @@ Manifest.toml # Local environment files .vscode/settings.json + +# Profile files +profile.pb.gz diff --git a/Project.toml b/Project.toml index c38978b..8f4ccb9 100644 --- a/Project.toml +++ b/Project.toml @@ -1,15 +1,13 @@ name = "AcceleratedKernels" uuid = "6a4ca0a5-0e36-4168-a932-d9be78d558f1" authors = ["Andrei-Leonard Nicusan and contributors"] -version = "0.3.4" +version = "0.4.0" [deps] ArgCheck = "dce04be8-c92d-5529-be00-80e4d2c0e197" GPUArraysCore = "46192b85-c4d5-4398-a991-12ede77f4527" KernelAbstractions = "63c18a36-062a-441e-b654-da1e3ab1ce7c" Markdown = "d6f4376e-aef5-505a-96c1-9c027394607a" -OhMyThreads = "67456a42-1dca-4109-a031-0a68de7e3ad5" -Polyester = "f517fe37-dbe3-4b94-8317-1923a5111588" [weakdeps] Metal = "dde4c033-4e86-420c-a63e-0dd931031962" @@ -25,7 +23,5 @@ GPUArraysCore = "0.2.0" KernelAbstractions = "0.9.34" Markdown = "1" Metal = "1" -OhMyThreads = "0.7, 0.8" -Polyester = "0.7" -julia = "1.10" oneAPI = "1, 2" +julia = "1.10" diff --git a/README.md b/README.md index 9431da6..9a07bc9 100644 --- a/README.md +++ b/README.md @@ -190,6 +190,44 @@ Julia v1.11 ## 1. What's Different? As far as I am aware, this is the first cross-architecture parallel standard library *from a unified codebase* - that is, the code is written as [KernelAbstractions.jl](https://github.com/JuliaGPU/KernelAbstractions.jl) backend-agnostic kernels, which are then **transpiled** to a GPU backend; that means we benefit from all the optimisations available on the native platform and official compiler stacks. For example, unlike open standards like OpenCL that require GPU vendors to implement that API for their hardware, we target the existing official compilers. And while performance-portability libraries like [Kokkos](https://github.com/kokkos/kokkos) and [RAJA](https://github.com/LLNL/RAJA) are powerful for large C++ codebases, they require US National Lab-level development and maintenance efforts to effectively forward calls from a single API to other OpenMP, CUDA Thrust, ROCm rocThrust, oneAPI DPC++ libraries developed separately. +As a simple example, this is how a normal Julia `for`-loop can be converted to an accelerated kernel - for both multithreaded CPUs and Nvidia / AMD / Intel / Apple GPUs, **with native performance** - by changing a single line: + + + + + + + + + + +
CPU Code Multithreaded / GPU code
+ +```julia +# Copy kernel testing throughput + +function cpu_copy!(dst, src) + for i in eachindex(src) + dst[i] = src[i] + end +end +``` + + + +```julia +import AcceleratedKernels as AK + +function ak_copy!(dst, src) + AK.foreachindex(src) do i + dst[i] = src[i] + end +end +``` + +
+ + Again, this is only possible because of the unique Julia compilation model, the [JuliaGPU](https://juliagpu.org/) organisation work for reusable GPU backend infrastructure, and especially the [KernelAbstractions.jl](https://github.com/JuliaGPU/KernelAbstractions.jl) backend-agnostic kernel language. Thank you. @@ -299,6 +337,11 @@ Leave out to test the CPU backend: $> julia -e 'import Pkg; Pkg.test("AcceleratedKernels.jl")' ``` +Start Julia with multiple threads to run the tests on a multithreaded CPU backend: +```bash +$> julia --threads=4 -e 'import Pkg; Pkg.test("AcceleratedKernels.jl")' +``` + ## 8. Issues and Debugging As the compilation pipeline of GPU kernels is different to that of base Julia, error messages also look different - for example, where Julia would insert an exception when a variable name was not defined (e.g. we had a typo), a GPU kernel throwing exceptions cannot be compiled and instead you'll see some cascading errors like `"[...] compiling [...] resulted in invalid LLVM IR"` caused by `"Reason: unsupported use of an undefined name"` resulting in `"Reason: unsupported dynamic function invocation"`, etc. @@ -322,7 +365,6 @@ Help is very welcome for any of the below: switch_below=(1, 10, 100, 1000, 10000) end ``` -- We need multithreaded implementations of `sort`, N-dimensional `mapreduce` (in `OhMyThreads.tmapreduce`) and `accumulate` (again, probably in `OhMyThreads`). - Any way to expose the warp-size from the backends? Would be useful in reductions. - Add a performance regressions runner. - **Other ideas?** Post an issue, or open a discussion on the Julia Discourse. diff --git a/docs/src/api/using_backends.md b/docs/src/api/using_backends.md index cdbfeea..fad7a62 100644 --- a/docs/src/api/using_backends.md +++ b/docs/src/api/using_backends.md @@ -30,6 +30,4 @@ v = Vector(-1000:1000) # Normal CPU array AK.reduce(+, v, max_tasks=Threads.nthreads()) ``` -Note the `reduce` and `mapreduce` CPU implementations forward arguments to [OhMyThreads.jl](https://github.com/JuliaFolds2/OhMyThreads.jl), an excellent package for multithreading. The focus of AcceleratedKernels.jl is to provide a unified interface to high-performance implementations of common algorithmic kernels, for both CPUs and GPUs - if you need fine-grained control over threads, scheduling, communication for specialised algorithms (e.g. with highly unequal workloads), consider using [OhMyThreads.jl](https://github.com/JuliaFolds2/OhMyThreads.jl) or [KernelAbstractions.jl](https://github.com/JuliaGPU/KernelAbstractions.jl) directly. - -There is ongoing work on multithreaded CPU `sort` and `accumulate` implementations - at the moment, they fall back to single-threaded algorithms; the rest of the library is fully parallelised for both CPUs and GPUs. +By default all algorithms use the number of threads Julia was started with. diff --git a/ext/AcceleratedKernelsMetalExt.jl b/ext/AcceleratedKernelsMetalExt.jl index 245806b..2f10894 100644 --- a/ext/AcceleratedKernelsMetalExt.jl +++ b/ext/AcceleratedKernelsMetalExt.jl @@ -14,6 +14,10 @@ function AK.accumulate!( dims::Union{Nothing, Int}=nothing, inclusive::Bool=true, + # CPU settings - not used + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, + # Algorithm choice alg::AK.AccumulateAlgorithm=AK.ScanPrefixes(), @@ -39,6 +43,10 @@ function AK.accumulate!( dims::Union{Nothing, Int}=nothing, inclusive::Bool=true, + # CPU settings - not used + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, + # Algorithm choice alg::AK.AccumulateAlgorithm=AK.ScanPrefixes(), @@ -63,6 +71,10 @@ function AK.cumsum( neutral=zero(eltype(src)), dims::Union{Nothing, Int}=nothing, + # CPU settings - not used + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, + # Algorithm choice alg::AK.AccumulateAlgorithm=AK.ScanPrefixes(), @@ -93,6 +105,10 @@ function AK.cumprod( neutral=one(eltype(src)), dims::Union{Nothing, Int}=nothing, + # CPU settings - not used + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, + # Algorithm choice alg::AK.AccumulateAlgorithm=AK.ScanPrefixes(), diff --git a/prototype/parallel_accumulate/Project.toml b/prototype/parallel_accumulate/Project.toml new file mode 100644 index 0000000..88b0c9f --- /dev/null +++ b/prototype/parallel_accumulate/Project.toml @@ -0,0 +1,3 @@ +[deps] +AcceleratedKernels = "6a4ca0a5-0e36-4168-a932-d9be78d558f1" +BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" diff --git a/prototype/parallel_accumulate/benchmark.jl b/prototype/parallel_accumulate/benchmark.jl new file mode 100644 index 0000000..94440fd --- /dev/null +++ b/prototype/parallel_accumulate/benchmark.jl @@ -0,0 +1,30 @@ + +import AcceleratedKernels as AK +using BenchmarkTools + + + +v = rand(1_000_000) +init = eltype(v)(0) + +r1 = Base.accumulate(+, v; init=init) +r2 = AK.accumulate(+, v; init=init) + +@assert r1 == r2 + + +v = rand(1_000_000) +init = eltype(v)(0) + +println("1D Benchmark - Base vs. AK") +display(@benchmark Base.accumulate(+, v; init=init)) +display(@benchmark AK.accumulate(+, v; init=init)) + + +v = rand(100, 100, 100) +init = eltype(v)(0) + +println("3D Benchmark - Base vs. AK") +display(@benchmark Base.accumulate(+, v; init=init, dims=2)) +display(@benchmark AK.accumulate(+, v; init=init, dims=2)) + diff --git a/prototype/parallel_mapreduce/Project.toml b/prototype/parallel_mapreduce/Project.toml new file mode 100644 index 0000000..3957638 --- /dev/null +++ b/prototype/parallel_mapreduce/Project.toml @@ -0,0 +1,3 @@ +[deps] +AcceleratedKernels = "6a4ca0a5-0e36-4168-a932-d9be78d558f1" +OhMyThreads = "67456a42-1dca-4109-a031-0a68de7e3ad5" diff --git a/prototype/parallel_mapreduce/benchmark.jl b/prototype/parallel_mapreduce/benchmark.jl new file mode 100644 index 0000000..657801c --- /dev/null +++ b/prototype/parallel_mapreduce/benchmark.jl @@ -0,0 +1,24 @@ + +import AcceleratedKernels as AK +using BenchmarkTools + + +v = rand(1_000_000) +f(x) = x^2 +op(x, y) = x + y +init = eltype(v)(0) + +println("1D Benchmark - Base vs. AK") +display(@benchmark Base.mapreduce(f, op, v; init=init)) +display(@benchmark AK.mapreduce(f, op, v; init=init, neutral=init)) + + +v = rand(100, 100, 100) +f(x) = x^2 +op(x, y) = x + y +init = eltype(v)(0) + +println("3D Benchmark - Base vs. AK") +display(@benchmark Base.mapreduce(f, op, v; init=init, dims=2)) +display(@benchmark AK.mapreduce(f, op, v; init=init, neutral=init, dims=2)) + diff --git a/prototype/parallel_mapreduce/mapreduce_vs_omt.jl b/prototype/parallel_mapreduce/mapreduce_vs_omt.jl new file mode 100644 index 0000000..815ac5f --- /dev/null +++ b/prototype/parallel_mapreduce/mapreduce_vs_omt.jl @@ -0,0 +1,38 @@ + +import AcceleratedKernels as AK +import OhMyThreads as OMT +using BenchmarkTools + + +# Turns out we have the same performance as tmapreduce with just AK base threading +function mapreduce_omt(f, op, v; init) + # MapReduce using OhMyThreads + return OMT.tmapreduce(f, op, v; init=init) +end + + +function mapreduce_ak(f, op, v; init, max_tasks=Threads.nthreads()) + # MapReduce using AcceleratedKernels + if max_tasks == 1 + return Base.mapreduce(f, op, v; init=init) + end + + shared = Vector{typeof(init)}(undef, max_tasks) + AK.itask_partition(length(v), max_tasks) do itask, irange + @inbounds begin + shared[itask] = Base.mapreduce(f, op, @view(v[irange]); init=init) + end + end + return Base.reduce(op, shared; init=init) +end + + +v = rand(1_000_000) +f(x) = x^2 +op(x, y) = x + y +init = eltype(v)(0) + +@assert mapreduce_omt(f, op, v; init=init) == mapreduce_ak(f, op, v; init=init) + +display(@benchmark mapreduce_omt(f, op, v; init=init)) +display(@benchmark mapreduce_ak(f, op, v; init=init)) diff --git a/prototype/parallel_sample_sort/ak_test.jl b/prototype/parallel_sample_sort/ak_test.jl index 8c98223..ceb6b6b 100644 --- a/prototype/parallel_sample_sort/ak_test.jl +++ b/prototype/parallel_sample_sort/ak_test.jl @@ -10,3 +10,13 @@ AK.sort!(v) v = rand(1:100, 1_000_000) ix = AK.sortperm(v) @assert issorted(v[ix]) + + +for _ in 1:1000 + num_elems = rand(1:100_000) + v = array_from_host(rand(Int32, num_elems)) + AK.sample_sort!(v) + vh = Array(v) + @assert issorted(vh) +end + diff --git a/prototype/parallel_sample_sort/benchmark.jl b/prototype/parallel_sample_sort/benchmark.jl new file mode 100644 index 0000000..b09bbc7 --- /dev/null +++ b/prototype/parallel_sample_sort/benchmark.jl @@ -0,0 +1,45 @@ + +import AcceleratedKernels as AK +using BenchmarkTools + +using Profile +using PProf + +using Random +Random.seed!(0) + + +# Compile +v = rand(1_000_000) +AK.sort!(v) + + +# Collect a profile +Profile.clear() +# v = rand(1_000_000) +# @profile AK.sort!(v) + +v = rand(UInt32(0):UInt32(1_000_000), 1_000_000) +ix = Vector{Int}(undef, 1_000_000) +@profile AK.sortperm!(ix, v) +pprof() + + +println("\nBase vs AK sort (Int):") +display(@benchmark Base.sort!(v) setup=(v = rand(1:1_000_000, 1_000_000))) +display(@benchmark AK.sort!(v) setup=(v = rand(1:1_000_000, 1_000_000))) + + +println("\nBase vs AK sort (Float64):") +display(@benchmark Base.sort!(v) setup=(v = rand(Float64, 1_000_000))) +display(@benchmark AK.sort!(v) setup=(v = rand(Float64, 1_000_000))) + + +println("\nBase vs AK sortperm (UInt32):") +display(@benchmark Base.sortperm!(ix, v) setup=(v = rand(UInt32(0):UInt32(1_000_000), 1_000_000); ix = Vector{Int}(undef, 1_000_000))) +display(@benchmark AK.sortperm!(ix, v) setup=(v = rand(UInt32(0):UInt32(1_000_000), 1_000_000); ix = Vector{Int}(undef, 1_000_000))) + + +println("\nBase vs AK sortperm (Float64):") +display(@benchmark Base.sortperm!(ix, v) setup=(v = rand(Float64, 1_000_000); ix = Vector{Int}(undef, 1_000_000))) +display(@benchmark AK.sortperm!(ix, v) setup=(v = rand(Float64, 1_000_000); ix = Vector{Int}(undef, 1_000_000))) diff --git a/src/AcceleratedKernels.jl b/src/AcceleratedKernels.jl index 69928de..f97c9d8 100644 --- a/src/AcceleratedKernels.jl +++ b/src/AcceleratedKernels.jl @@ -12,10 +12,8 @@ module AcceleratedKernels # Internal dependencies using ArgCheck: @argcheck -using GPUArraysCore: AbstractGPUVector, AbstractGPUArray, @allowscalar +using GPUArraysCore: AbstractGPUArray, @allowscalar using KernelAbstractions -using Polyester: @batch -import OhMyThreads as OMT # Exposed functions from upstream packages diff --git a/src/accumulate/accumulate.jl b/src/accumulate/accumulate.jl index 6160785..1738a37 100644 --- a/src/accumulate/accumulate.jl +++ b/src/accumulate/accumulate.jl @@ -22,9 +22,9 @@ end # Implementations, then interfaces -include("accumulate_1d.jl") +include("accumulate_1d_cpu.jl") +include("accumulate_1d_gpu.jl") include("accumulate_nd.jl") -include("accumulate_cpu.jl") """ @@ -35,6 +35,10 @@ include("accumulate_cpu.jl") dims::Union{Nothing, Int}=nothing, inclusive::Bool=true, + # CPU settings + max_tasks::Int=Threads.nthreads(), + min_elems::Int=2, + # Algorithm choice alg::AccumulateAlgorithm=DecoupledLookback(), @@ -51,6 +55,10 @@ include("accumulate_cpu.jl") dims::Union{Nothing, Int}=nothing, inclusive::Bool=true, + # CPU settings + max_tasks::Int=Threads.nthreads(), + min_elems::Int=2, + # Algorithm choice alg::AccumulateAlgorithm=DecoupledLookback(), @@ -72,8 +80,7 @@ we do not need the constraint of `dst` and `src` being different; to minimise me recommend using the single-array interface (the first one above). ## CPU -The CPU implementation is currently single-threaded; we are waiting on a multithreaded -implementation in OhMyThreads.jl ([issue](https://github.com/JuliaFolds2/OhMyThreads.jl/issues/129)). +Use at most `max_tasks` threads with at least `min_elems` elements per task. ## GPU For the 1D case (`dims=nothing`), the `alg` can be one of the following: @@ -121,6 +128,10 @@ function accumulate!( dims::Union{Nothing, Int}=nothing, inclusive::Bool=true, + # CPU settings + max_tasks::Int=Threads.nthreads(), + min_elems::Int=2, + # Algorithm choice alg::AccumulateAlgorithm=DecoupledLookback(), @@ -132,6 +143,7 @@ function accumulate!( _accumulate_impl!( op, v, backend, init=init, neutral=neutral, dims=dims, inclusive=inclusive, + max_tasks=max_tasks, min_elems=min_elems, alg=alg, block_size=block_size, temp=temp, temp_flags=temp_flags, ) @@ -145,6 +157,10 @@ function accumulate!( dims::Union{Nothing, Int}=nothing, inclusive::Bool=true, + # CPU settings + max_tasks::Int=Threads.nthreads(), + min_elems::Int=2, + # Algorithm choice alg::AccumulateAlgorithm=DecoupledLookback(), @@ -157,6 +173,7 @@ function accumulate!( _accumulate_impl!( op, dst, backend, init=init, neutral=neutral, dims=dims, inclusive=inclusive, + max_tasks=max_tasks, min_elems=min_elems, alg=alg, block_size=block_size, temp=temp, temp_flags=temp_flags, ) @@ -172,37 +189,29 @@ function _accumulate_impl!( alg::AccumulateAlgorithm=DecoupledLookback(), + # CPU settings + max_tasks::Int=Threads.nthreads(), + min_elems::Int=2, + # GPU settings block_size::Int=256, temp::Union{Nothing, AbstractArray}=nothing, temp_flags::Union{Nothing, AbstractArray}=nothing, ) - if backend isa GPU - if isnothing(dims) - return accumulate_1d!( - op, v, backend, alg, - init=init, neutral=neutral, inclusive=inclusive, - block_size=block_size, temp=temp, temp_flags=temp_flags, - ) - else - return accumulate_nd!( - op, v, backend, - init=init, neutral=neutral, dims=dims, inclusive=inclusive, - block_size=block_size, - ) - end + if isnothing(dims) + return accumulate_1d!( + op, v, backend, alg, + init=init, neutral=neutral, inclusive=inclusive, + max_tasks=max_tasks, min_elems=min_elems, + block_size=block_size, temp=temp, temp_flags=temp_flags, + ) else - if isnothing(dims) - return accumulate_1d!( - op, v, - init=init, inclusive=inclusive, - ) - else - return accumulate_nd!( - op, v, - init=init, dims=dims, inclusive=inclusive, - ) - end + return accumulate_nd!( + op, v, backend, + init=init, neutral=neutral, dims=dims, inclusive=inclusive, + max_tasks=max_tasks, min_elems=min_elems, + block_size=block_size, + ) end end @@ -215,6 +224,10 @@ end dims::Union{Nothing, Int}=nothing, inclusive::Bool=true, + # CPU settings + max_tasks::Int=Threads.nthreads(), + min_elems::Int=2, + # Algorithm choice alg::AccumulateAlgorithm=DecoupledLookback(), @@ -233,6 +246,10 @@ function accumulate( dims::Union{Nothing, Int}=nothing, inclusive::Bool=true, + # CPU settings + max_tasks::Int=Threads.nthreads(), + min_elems::Int=2, + # Algorithm choice alg::AccumulateAlgorithm=DecoupledLookback(), @@ -246,16 +263,10 @@ function accumulate( copyto!(vcopy, v) accumulate!( op, vcopy, backend; - init=init, - neutral=neutral, - dims=dims, - inclusive=inclusive, - + init=init, neutral=neutral, dims=dims, inclusive=inclusive, + max_tasks=max_tasks, min_elems=min_elems, alg=alg, - - block_size=block_size, - temp=temp, - temp_flags=temp_flags, + block_size=block_size, temp=temp, temp_flags=temp_flags, ) vcopy end diff --git a/src/accumulate/accumulate_1d_cpu.jl b/src/accumulate/accumulate_1d_cpu.jl new file mode 100644 index 0000000..2066697 --- /dev/null +++ b/src/accumulate/accumulate_1d_cpu.jl @@ -0,0 +1,89 @@ +function accumulate_1d!( + op, v::AbstractArray, backend::CPU, alg; + init, + neutral, + inclusive::Bool=true, + + # CPU settings + max_tasks::Int=Threads.nthreads(), + min_elems::Int=2, + + # GPU settings - not used + block_size::Int=256, + temp::Union{Nothing, AbstractArray}=nothing, + temp_flags::Union{Nothing, AbstractArray}=nothing, +) + # Trivial case + if length(v) == 0 + return v + end + + # Sanity checks - for exclusive accumulation, each task section / chunk must have at least 2 + # elements to be correct (otherwise we have to include more complicated logic in the threaded + # code); it makes no sense to have each task accumulate only 1 element anyways + @argcheck min_elems >= 2 + + # First accumulate chunks independently + tp = TaskPartitioner(length(v), max_tasks, min_elems) + if tp.num_tasks == 1 + return _accumulate_1d_cpu_section!(op, v; init, inclusive) + end + + # Save each task's final accumulated value + shared = Vector{eltype(v)}(undef, tp.num_tasks) + itask_partition(tp) do itask, irange + @inbounds begin + if itask == 1 + _accumulate_1d_cpu_section!( + op, @view(v[irange]); + init=init, + inclusive=inclusive, + ) + else + # Later sections should always be inclusively accumulated + _accumulate_1d_cpu_section!( + op, @view(v[irange]); + init=neutral, + inclusive=true, + ) + end + shared[itask] = v[irange.stop] + end + end + + # Now accumulate the final values of each task; the number of tasks is small enough (even for + # 144-thread HPC nodes) that there is no need to do decoupled lookbacks + _accumulate_1d_cpu_section!(op, shared; init=neutral, inclusive=true) + + # Now add the final values of each task, except the first one + itask_partition(tp) do itask, irange + @inbounds begin + if itask != 1 + for i in irange + v[i] = op(v[i], shared[itask - 1]) + end + end + end + end + + return v +end + + +function _accumulate_1d_cpu_section!(op, v; init, inclusive) + @inbounds begin + if inclusive + running = init + for i in eachindex(v) + running = op(running, v[i]) + v[i] = running + end + else + running = init + for i in eachindex(v) + v[i], running = running, op(running, v[i]) + end + end + end + return v +end diff --git a/src/accumulate/accumulate_1d.jl b/src/accumulate/accumulate_1d_gpu.jl similarity index 98% rename from src/accumulate/accumulate_1d.jl rename to src/accumulate/accumulate_1d_gpu.jl index c54c244..b66f443 100644 --- a/src/accumulate/accumulate_1d.jl +++ b/src/accumulate/accumulate_1d_gpu.jl @@ -254,6 +254,11 @@ function accumulate_1d!( neutral, inclusive::Bool=true, + # CPU settings - not used + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, + + # GPU settings block_size::Int=256, temp::Union{Nothing, AbstractArray}=nothing, temp_flags::Union{Nothing, AbstractArray}=nothing, @@ -308,6 +313,11 @@ function accumulate_1d!( neutral, inclusive::Bool=true, + # CPU settings - not used + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, + + # GPU settings block_size::Int=256, temp::Union{Nothing, AbstractArray}=nothing, temp_flags::Union{Nothing, AbstractArray}=nothing, diff --git a/src/accumulate/accumulate_cpu.jl b/src/accumulate/accumulate_cpu.jl deleted file mode 100644 index 71e13f5..0000000 --- a/src/accumulate/accumulate_cpu.jl +++ /dev/null @@ -1,89 +0,0 @@ -function accumulate_1d!( - op, v::AbstractArray; - init, - inclusive::Bool=true, -) - # Simple single-threaded CPU implementation - FIXME: implement taccumulate in OhMyThreads.jl - if length(v) == 0 - return v - end - - if inclusive - running = init - for i in firstindex(v):lastindex(v) - running = op(running, v[i]) - v[i] = running - end - else - running = init - for i in eachindex(v) - v[i], running = running, op(running, v[i]) - end - end - return v -end - - -function accumulate_nd!( - op, v::AbstractArray; - init, - dims::Int, - inclusive::Bool=true, -) - @argcheck firstindex(v) == 1 - - # Invalid dims - if dims < 1 - throw(ArgumentError("region dimension(s) must be ≥ 1, got $dims")) - end - - # Nothing to accumulate - vsizes = size(v) - if length(v) == 0 || dims > length(vsizes) - return v - end - for s in vsizes - s == 0 && return v - end - - vstrides = strides(v) - ndims = length(vsizes) - - length_dims = vsizes[dims] - length_outer = length(v) ÷ length_dims - - # TODO: is there any more cache-friendly way to do this? We should prefer going along the - # fastest-varying axis first, regardless of the dimension we are accumulating over - for i in 1:length_outer - - # Compute the base index in v for this iteration - input_base_idx = 0 - tmp = i - KernelAbstractions.Extras.@unroll for i in 1:ndims - if i != dims - input_base_idx += (tmp % vsizes[i]) * vstrides[i] - tmp = tmp ÷ vsizes[i] - end - end - - # Go over each element in the accumulated dimension; this implementation assumes that there - # are so many outer elements (each processed by an independent thread) that we afford to - # loop sequentially over the accumulated dimension (e.g. reduce(+, rand(3, 1000), dims=1)) - if inclusive - running = init - for i in 0:length_dims - 1 - v_idx = input_base_idx + i * vstrides[dims] - running = op(running, v[v_idx + 1]) - v[v_idx + 1] = running - end - else - running = init - for i in 0:length_dims - 1 - v_idx = input_base_idx + i * vstrides[dims] - v[v_idx + 1], running = running, op(running, v[v_idx + 1]) - end - end - end - - return v -end diff --git a/src/accumulate/accumulate_nd.jl b/src/accumulate/accumulate_nd.jl index 605e043..219f7b0 100644 --- a/src/accumulate/accumulate_nd.jl +++ b/src/accumulate/accumulate_nd.jl @@ -1,3 +1,125 @@ +function accumulate_nd!( + op, v::AbstractArray, backend::Backend; + init, + neutral=neutral_element(op, eltype(v)), + dims::Int, + inclusive::Bool=true, + + # CPU settings + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, + + # GPU settings + block_size::Int=256, +) + # Correctness checks + @argcheck block_size > 0 + @argcheck ispow2(block_size) + + # Degenerate cases begin; order of priority matters + + # Invalid dims + if dims < 1 + throw(ArgumentError("region dimension(s) must be ≥ 1, got $dims")) + end + + # Nothing to accumulate + vsizes = size(v) + if length(v) == 0 || dims > length(vsizes) + return v + end + for s in vsizes + s == 0 && return v + end + + # Degenerate cases end + + if backend isa CPU + _accumulate_nd_cpu_sections!(op, v; init, dims, inclusive, max_tasks, min_elems) + else + # On GPUs we have two parallelisation approaches, based on which dimension has more elements: + # - If the dimension we are accumulating along has more elements than the "outer" dimensions, + # (e.g. accumulate(+, rand(3, 1000), dims=2)), we use a block of threads per outer + # dimension - thus, a block of threads reduces the dims axis + # - If the other dimensions have more elements (e.g. reduce(+, rand(3, 1000), dims=1)), we + # use a single thread per outer dimension - thus, a thread reduces the dims axis + # sequentially, while the other dimensions are processed in parallel, independently + length_dims = vsizes[dims] + length_outer = length(v) ÷ length_dims + + if length_outer >= length_dims + # One thread per outer dimension + blocks = (length_outer + block_size - 1) ÷ block_size + kernel1! = _accumulate_nd_by_thread!(backend, block_size) + kernel1!( + v, op, init, dims, inclusive, + ndrange=(block_size * blocks,), + ) + else + # One block per outer dimension + blocks = length_outer + kernel2! = _accumulate_nd_by_block!(backend, block_size) + kernel2!( + v, op, init, neutral, dims, inclusive, + ndrange=(block_size, blocks), + ) + end + end + + return v +end + + +function _accumulate_nd_cpu_sections!( + op, v::AbstractArray; + init, dims, inclusive, + max_tasks, min_elems, +) + vsizes = size(v) + vstrides = strides(v) + + ndims = length(vsizes) + + length_dims = vsizes[dims] + length_outer = length(v) ÷ length_dims + + # Each thread handles a section of the output array - i.e. reducing along the dims, for + # multiple output strides + foreachindex(1:length_outer, CPU(), max_tasks=max_tasks, min_elems=min_elems) do idst + + @inbounds begin + # Compute the base index in v for this outer axis + input_base_idx = 0 + tmp = idst + KernelAbstractions.Extras.@unroll for i in 1:ndims + if i != dims + input_base_idx += (tmp % vsizes[i]) * vstrides[i] + tmp = tmp ÷ vsizes[i] + end + end + + # Go over each element in the accumulated dimension + if inclusive + running = init + for i in 0:length_dims - 1 + v_idx = input_base_idx + i * vstrides[dims] + running = op(running, v[v_idx + 1]) + v[v_idx + 1] = running + end + else + running = init + for i in 0:length_dims - 1 + v_idx = input_base_idx + i * vstrides[dims] + v[v_idx + 1], running = running, op(running, v[v_idx + 1]) + end + end + end + end + + v +end + + @kernel inbounds=true cpu=false unsafe_indices=true function _accumulate_nd_by_thread!( v, op, init, dims, inclusive, ) @@ -249,66 +371,3 @@ end ichunk += 0x1 end end - - -function accumulate_nd!( - op, v::AbstractArray, backend::GPU; - init, - neutral=neutral_element(op, eltype(v)), - dims::Int, - inclusive::Bool=true, - - block_size::Int=256, -) - # Correctness checks - @argcheck block_size > 0 - @argcheck ispow2(block_size) - - # Degenerate cases begin; order of priority matters - - # Invalid dims - if dims < 1 - throw(ArgumentError("region dimension(s) must be ≥ 1, got $dims")) - end - - # Nothing to accumulate - vsizes = size(v) - if length(v) == 0 || dims > length(vsizes) - return v - end - for s in vsizes - s == 0 && return v - end - - # Degenerate cases end - - # We have two parallelisation approaches, based on which dimension has more elements: - # - If the dimension we are accumulating along has more elements than the "outer" dimensions, - # (e.g. accumulate(+, rand(3, 1000), dims=2)), we use a block of threads per outer - # dimension - thus, a block of threads reduces the dims axis - # - If the other dimensions have more elements (e.g. reduce(+, rand(3, 1000), dims=1)), we - # use a single thread per outer dimension - thus, a thread reduces the dims axis - # sequentially, while the other dimensions are processed in parallel, independently - length_dims = vsizes[dims] - length_outer = length(v) ÷ length_dims - - if length_outer >= length_dims - # One thread per outer dimension - blocks = (length_outer + block_size - 1) ÷ block_size - kernel! = _accumulate_nd_by_thread!(backend, block_size) - kernel!( - v, op, init, dims, inclusive, - ndrange=(block_size * blocks,), - ) - else - # One block per outer dimension - blocks = length_outer - kernel! = _accumulate_nd_by_block!(backend, block_size) - kernel!( - v, op, init, neutral, dims, inclusive, - ndrange=(block_size, blocks), - ) - end - - return v -end diff --git a/src/arithmetics.jl b/src/arithmetics.jl index 8c72ac2..18f85e1 100644 --- a/src/arithmetics.jl +++ b/src/arithmetics.jl @@ -5,7 +5,6 @@ dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, max_tasks=Threads.nthreads(), min_elems=1, @@ -48,7 +47,6 @@ function sum( dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, max_tasks=Threads.nthreads(), min_elems=1, @@ -62,7 +60,6 @@ function sum( init=init, dims=dims, - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, @@ -80,7 +77,6 @@ end dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, max_tasks=Threads.nthreads(), min_elems=1, @@ -123,7 +119,6 @@ function prod( dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, max_tasks=Threads.nthreads(), min_elems=1, @@ -137,7 +132,6 @@ function prod( init=init, dims=dims, - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, @@ -155,7 +149,6 @@ end dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, max_tasks=Threads.nthreads(), min_elems=1, @@ -198,7 +191,6 @@ function maximum( dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, max_tasks=Threads.nthreads(), min_elems=1, @@ -212,7 +204,6 @@ function maximum( init=init, dims=dims, - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, @@ -230,7 +221,6 @@ end dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, max_tasks=Threads.nthreads(), min_elems=1, @@ -273,7 +263,6 @@ function minimum( dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, max_tasks=Threads.nthreads(), min_elems=1, @@ -287,7 +276,6 @@ function minimum( init=init, dims=dims, - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, @@ -305,7 +293,6 @@ end dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, max_tasks=Threads.nthreads(), min_elems=1, @@ -354,7 +341,6 @@ function count( dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, max_tasks=Threads.nthreads(), min_elems=1, @@ -369,7 +355,6 @@ function count( neutral=zero(typeof(init)), dims=dims, - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, @@ -386,7 +371,6 @@ function count( dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, max_tasks=Threads.nthreads(), min_elems=1, @@ -401,7 +385,6 @@ function count( neutral=zero(typeof(init)), dims=dims, - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, diff --git a/src/foreachindex.jl b/src/foreachindex.jl index afb2027..8923e04 100644 --- a/src/foreachindex.jl +++ b/src/foreachindex.jl @@ -27,14 +27,7 @@ function _forindices_gpu( end -function _forindices_polyester(f, indices, min_elems) - @batch minbatch=min_elems per=thread for i in indices - @inline f(i) - end -end - - -function _forindices_threads(f, indices, max_tasks, min_elems) +function _forindices_threads(f, indices; max_tasks, min_elems) task_partition(length(indices), max_tasks, min_elems) do irange # Task partition returns static ranges indexed from 1:length(indices); use those to index # into indices, which supports arbitrary indices (and gets compiled away when using 1-based @@ -47,34 +40,11 @@ function _forindices_threads(f, indices, max_tasks, min_elems) end -@inline function _forindices_cpu( - f, - indices, - backend::CPU; - - scheduler=:threads, - max_tasks=Threads.nthreads(), - min_elems=1, -) - # CPU implementation - if scheduler === :threads - _forindices_threads(f, indices, max_tasks, min_elems) - elseif scheduler === :polyester - _forindices_polyester(f, indices, min_elems) - else - throw(ArgumentError("`scheduler` must be `:threads` or `:polyester`. Received $scheduler")) - end - - nothing -end - - """ foreachindex( f, itr, backend::Backend=get_backend(itr); # CPU settings - scheduler=:threads, max_tasks=Threads.nthreads(), min_elems=1, @@ -90,9 +60,7 @@ MtlArray, oneArray - with one GPU thread per index. On CPUs at most `max_tasks` threads are launched, or fewer such that each thread processes at least `min_elems` indices; if a single task ends up being needed, `f` is inlined and no thread is launched. Tune it to your function - the more expensive it is, the fewer elements are needed to -amortise the cost of launching a thread (which is a few μs). The scheduler can be `:polyester` -to use Polyester.jl cheap threads or `:threads` to use normal Julia threads; either can be faster -depending on the function, but in general the latter is more composable. +amortise the cost of launching a thread (which is a few μs). # Examples Normally you would write a for loop like this: @@ -155,7 +123,6 @@ function foreachindex( f, itr, backend::Backend=get_backend(itr); # CPU settings - scheduler=:threads, max_tasks=Threads.nthreads(), min_elems=1, @@ -163,17 +130,9 @@ function foreachindex( block_size=256, ) if backend isa GPU - _forindices_gpu( - f, eachindex(itr), backend; - block_size=block_size, - ) + _forindices_gpu(f, eachindex(itr), backend; block_size) else - _forindices_cpu( - f, eachindex(itr), backend; - scheduler=scheduler, - max_tasks=max_tasks, - min_elems=min_elems, - ) + _forindices_threads(f, eachindex(itr); max_tasks, min_elems) end end @@ -183,7 +142,6 @@ end f, itr, dims::Union{Nothing, <:Integer}=nothing, backend::Backend=get_backend(itr); # CPU settings - scheduler=:threads, max_tasks=Threads.nthreads(), min_elems=1, @@ -199,9 +157,7 @@ MtlArray, oneArray - with one GPU thread per index. On CPUs at most `max_tasks` threads are launched, or fewer such that each thread processes at least `min_elems` indices; if a single task ends up being needed, `f` is inlined and no thread is launched. Tune it to your function - the more expensive it is, the fewer elements are needed to -amortise the cost of launching a thread (which is a few μs). The scheduler can be `:polyester` -to use Polyester.jl cheap threads or `:threads` to use normal Julia threads; either can be faster -depending on the function, but in general the latter is more composable. +amortise the cost of launching a thread (which is a few μs). # Examples Normally you would write a for loop like this: @@ -260,7 +216,6 @@ function foraxes( f, itr, dims::Union{Nothing, <:Integer}=nothing, backend::Backend=get_backend(itr); # CPU settings - scheduler=:threads, max_tasks=Threads.nthreads(), min_elems=1, @@ -270,7 +225,6 @@ function foraxes( if isnothing(dims) return foreachindex( f, itr, backend; - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, block_size=block_size, @@ -278,16 +232,8 @@ function foraxes( end if backend isa GPU - _forindices_gpu( - f, axes(itr, dims), backend; - block_size=block_size, - ) + _forindices_gpu(f, axes(itr, dims), backend; block_size) else - _forindices_cpu( - f, axes(itr, dims), backend; - scheduler=scheduler, - max_tasks=max_tasks, - min_elems=min_elems, - ) + _forindices_threads(f, axes(itr, dims); max_tasks, min_elems) end end diff --git a/src/map.jl b/src/map.jl index 762f746..ed7fe8a 100644 --- a/src/map.jl +++ b/src/map.jl @@ -3,7 +3,6 @@ f, dst::AbstractArray, src::AbstractArray, backend::Backend=get_backend(src); # CPU settings - scheduler=:threads, max_tasks=Threads.nthreads(), min_elems=1, @@ -31,7 +30,6 @@ function map!( f, dst::AbstractArray, src::AbstractArray, backend::Backend=get_backend(src); # CPU settings - scheduler=:threads, max_tasks=Threads.nthreads(), min_elems=1, @@ -41,7 +39,6 @@ function map!( @argcheck length(dst) == length(src) foreachindex( src, backend, - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, block_size=block_size, @@ -57,7 +54,6 @@ end f, src::AbstractArray, backend::Backend=get_backend(src); # CPU settings - scheduler=:threads, max_tasks=Threads.nthreads(), min_elems=1, @@ -73,7 +69,6 @@ function map( f, src::AbstractArray, backend::Backend=get_backend(src); # CPU settings - scheduler=:threads, max_tasks=Threads.nthreads(), min_elems=1, @@ -83,7 +78,6 @@ function map( dst = similar(src) map!( f, dst, src, backend, - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, block_size=block_size, diff --git a/src/reduce/mapreduce_1d_cpu.jl b/src/reduce/mapreduce_1d_cpu.jl new file mode 100644 index 0000000..5384b1d --- /dev/null +++ b/src/reduce/mapreduce_1d_cpu.jl @@ -0,0 +1,33 @@ +function mapreduce_1d( + f, op, src::AbstractArray, backend::CPU; + init, + neutral=neutral_element(op, eltype(src)), + + # CPU settings + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, + + # GPU settings - ignored here + block_size::Int=256, + temp::Union{Nothing, AbstractArray}=nothing, + switch_below::Int=0, +) + if max_tasks == 1 + return op(init, Base.mapreduce(f, op, src; init=neutral)) + end + + tp = TaskPartitioner(length(src), max_tasks, min_elems) + if tp.num_tasks == 1 + return op(init, Base.mapreduce(f, op, src; init=neutral)) + end + + # Each task reduces an independent chunk of the array + shared = Vector{typeof(init)}(undef, tp.num_tasks) + itask_partition(tp) do itask, irange + @inbounds begin + # This shared buffer is only modified once per task, so false sharing is not a problem + shared[itask] = Base.mapreduce(f, op, @view(src[irange]); init=neutral) + end + end + return op(init, Base.reduce(op, shared; init=neutral)) +end diff --git a/src/reduce/mapreduce_1d.jl b/src/reduce/mapreduce_1d_gpu.jl similarity index 98% rename from src/reduce/mapreduce_1d.jl rename to src/reduce/mapreduce_1d_gpu.jl index 564b06a..cc3c4d6 100644 --- a/src/reduce/mapreduce_1d.jl +++ b/src/reduce/mapreduce_1d_gpu.jl @@ -104,6 +104,11 @@ function mapreduce_1d( init, neutral=neutral_element(op, eltype(src)), + # CPU settings - ignored here + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, + + # GPU settings block_size::Int=256, temp::Union{Nothing, AbstractArray}=nothing, switch_below::Int=0, diff --git a/src/reduce/mapreduce_nd.jl b/src/reduce/mapreduce_nd.jl index 9c828d1..bb5e644 100644 --- a/src/reduce/mapreduce_nd.jl +++ b/src/reduce/mapreduce_nd.jl @@ -1,10 +1,201 @@ -@kernel inbounds=true cpu=false unsafe_indices=true function _mapreduce_nd_by_thread!( - @Const(src), - dst, - f, - op, +function mapreduce_nd( + f, op, src::AbstractArray, backend::Backend; + init, + neutral=neutral_element(op, eltype(src)), + dims::Int, + + # CPU settings - ignored here + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, + + # GPU settings + block_size::Int=256, + temp::Union{Nothing, AbstractArray}=nothing, +) + @argcheck 1 <= block_size <= 1024 + + # Degenerate cases begin; order of priority matters + + # Invalid dims + if dims < 1 + throw(ArgumentError("region dimension(s) must be ≥ 1, got $dims")) + end + + # If dims > number of dimensions, just map each element through f and add init, e.g.: + # julia> x = rand(Float64, 3, 5); + # julia> mapreduce(x -> -x, +, x, dims=3, init=Float32(0)) + # 3×5 Matrix{Float32} # Negative numbers + src_sizes = size(src) + if dims > length(src_sizes) + if isnothing(temp) + dst = KernelAbstractions.allocate(backend, typeof(init), src_sizes) + else + @argcheck get_backend(temp) == backend + @argcheck size(temp) == src_sizes + @argcheck eltype(temp) == typeof(init) + dst = temp + end + _mapreduce_nd_apply_init!(f, op, dst, src, backend; init, max_tasks, min_elems, block_size) + return dst + end + + # The per-dimension sizes of the destination array; construct tuple without allocations + dst_sizes = unrolled_map_index(src_sizes) do i + i == dims ? 1 : src_sizes[i] + end + + # If any dimension except dims is zero, return empty similar array except with the dims + # dimension = 1. Weird, see example below: + # julia> x = rand(3, 0, 5); + # julia> reduce(+, x, dims=3) + # 3×0×1 Array{Float64, 3} + for isize in eachindex(src_sizes) + isize == dims && continue + if src_sizes[isize] == 0 + if isnothing(temp) + dst = KernelAbstractions.allocate(backend, typeof(init), dst_sizes) + else + @argcheck size(temp) == dst_sizes + @argcheck eltype(temp) == typeof(init) + dst = temp + end + return dst + end + end + + # If sizes[dims] == 0, return array filled with init; same shape except sizes[dims] = 1: + # julia> x = rand(3, 0, 5); + # julia> mapreduce(+, x, dims=2) + # 3×1×5 Array{Float64, 3}: + # [:, :, 1] = + # 0.0 + # 0.0 + # 0.0 + # [...] + len = src_sizes[dims] + if len == 0 + if isnothing(temp) + dst = KernelAbstractions.allocate(backend, typeof(init), dst_sizes) + else + @argcheck get_backend(temp) == backend + @argcheck size(temp) == dst_sizes + @argcheck eltype(temp) == typeof(init) + dst = temp + end + fill!(dst, init) + return dst + end + + # If sizes[dims] == 1, just map each element through f. Again, keep same type as init + if len == 1 + if isnothing(temp) + dst = KernelAbstractions.allocate(backend, typeof(init), src_sizes) + else + @argcheck get_backend(temp) == backend + @argcheck size(temp) == src_sizes + @argcheck eltype(temp) == typeof(init) + dst = temp + end + _mapreduce_nd_apply_init!(f, op, dst, src, backend; init, max_tasks, min_elems, block_size) + return dst + end + + # Degenerate cases end + + # Allocate destination array + if isnothing(temp) + dst = KernelAbstractions.allocate(backend, typeof(init), dst_sizes) + else + @argcheck get_backend(temp) == backend + @argcheck size(temp) == dst_sizes + @argcheck eltype(temp) == typeof(init) + dst = temp + end + dst_size = length(dst) + + if backend isa CPU + _mapreduce_nd_cpu_sections!( + f, op, dst, src; + init, + dims, + max_tasks=max_tasks, + min_elems=min_elems, + ) + else + # On GPUs we have two parallelisation approaches, based on which dimension has more elements: + # - If the dimension we are reducing has more elements, (e.g. reduce(+, rand(3, 1000), dims=2)), + # we use a block of threads per dst element - thus, a block of threads reduces the dims axis + # - If the other dimensions have more elements (e.g. reduce(+, rand(3, 1000), dims=1)), we + # use a single thread per dst element - thus, a thread reduces the dims axis sequentially, + # while the other dimensions are processed in parallel, independently + if dst_size >= src_sizes[dims] + blocks = (dst_size + block_size - 1) ÷ block_size + kernel1! = _mapreduce_nd_by_thread!(backend, block_size) + kernel1!( + src, dst, f, op, init, dims, + ndrange=(block_size * blocks,), + ) + else + # One block per output element + blocks = dst_size + kernel2! = _mapreduce_nd_by_block!(backend, block_size) + kernel2!( + src, dst, f, op, init, neutral, dims, + ndrange=(block_size * blocks,), + ) + end + end + + return dst +end + + +function _mapreduce_nd_cpu_sections!( + f, op, dst, src; init, dims, + max_tasks, min_elems, +) + src_sizes = size(src) + src_strides = strides(src) + dst_strides = strides(dst) + + reduce_size = src_sizes[dims] + ndims = length(src_sizes) + + # Each thread handles a section of the output array - i.e. reducing along the dims, for + # multiple output strides + foreachindex(dst, max_tasks=max_tasks, min_elems=min_elems) do idst + + @inbounds begin + # Compute the base index in src (excluding the reduced axis) + input_base_idx = 0 + tmp = idst - 1 + KernelAbstractions.Extras.@unroll for i in ndims:-1:1 + if i != dims + input_base_idx += (tmp ÷ dst_strides[i]) * src_strides[i] + end + tmp = tmp % dst_strides[i] + end + + # Go over each element in the reduced dimension + res = init + for i in 0:reduce_size - 1 + src_idx = input_base_idx + i * src_strides[dims] + res = op(res, f(src[src_idx + 1])) + end + dst[idst] = res + end + end + + dst +end + + +@kernel inbounds=true cpu=false unsafe_indices=true function _mapreduce_nd_by_thread!( + @Const(src), dst, + f, op, + init, dims, ) # One thread per output element, when there are more outer elements than in the reduced dim # e.g. reduce(+, rand(3, 1000), dims=1) => only 3 elements in the reduced dim @@ -71,12 +262,9 @@ end @kernel inbounds=true cpu=false unsafe_indices=true function _mapreduce_nd_by_block!( - @Const(src), - dst, - f, - op, - init, - neutral, + @Const(src), dst, + f, op, + init, neutral, dims, ) # One block per output element, when there are more elements in the reduced dim than in outer @@ -183,166 +371,3 @@ end dst[iblock + 0x1] = op(init, sdata[0x1]) end end - - -function mapreduce_nd( - f, op, src::AbstractArray, backend::GPU; - init, - neutral=neutral_element(op, eltype(src)), - dims::Int, - block_size::Int=256, - temp::Union{Nothing, AbstractArray}=nothing, -) - @argcheck 1 <= block_size <= 1024 - - # Degenerate cases begin; order of priority matters - - # Invalid dims - if dims < 1 - throw(ArgumentError("region dimension(s) must be ≥ 1, got $dims")) - end - - # If dims > number of dimensions, just map each element through f and add init, e.g.: - # julia> x = rand(Float64, 3, 5); - # julia> mapreduce(x -> -x, +, x, dims=3, init=Float32(0)) - # 3×5 Matrix{Float32} # Negative numbers - src_sizes = size(src) - if dims > length(src_sizes) - if isnothing(temp) - dst = KernelAbstractions.allocate(backend, typeof(init), src_sizes) - else - @argcheck get_backend(temp) == backend - @argcheck size(temp) == src_sizes - @argcheck eltype(temp) == typeof(init) - dst = temp - end - _mapreduce_nd_apply_init!(f, op, dst, src, backend, init, block_size) - return dst - end - - # The per-dimension sizes of the destination array; construct tuple without allocations - dst_sizes = unrolled_map_index(src_sizes) do i - i == dims ? 1 : src_sizes[i] - end - - # If any dimension except dims is zero, return empty similar array except with the dims - # dimension = 1. Weird, see example below: - # julia> x = rand(3, 0, 5); - # julia> reduce(+, x, dims=3) - # 3×0×1 Array{Float64, 3} - for isize in eachindex(src_sizes) - isize == dims && continue - if src_sizes[isize] == 0 - if isnothing(temp) - dst = KernelAbstractions.allocate(backend, typeof(init), dst_sizes) - else - @argcheck size(temp) == dst_sizes - @argcheck eltype(temp) == typeof(init) - dst = temp - end - return dst - end - end - - # If sizes[dims] == 0, return array filled with init; same shape except sizes[dims] = 1: - # julia> x = rand(3, 0, 5); - # julia> mapreduce(+, x, dims=2) - # 3×1×5 Array{Float64, 3}: - # [:, :, 1] = - # 0.0 - # 0.0 - # 0.0 - # [...] - len = src_sizes[dims] - if len == 0 - if isnothing(temp) - dst = KernelAbstractions.allocate(backend, typeof(init), dst_sizes) - else - @argcheck get_backend(temp) == backend - @argcheck size(temp) == dst_sizes - @argcheck eltype(temp) == typeof(init) - dst = temp - end - fill!(dst, init) - return dst - end - - # If sizes[dims] == 1, just map each element through f. Again, keep same type as init - if len == 1 - if isnothing(temp) - dst = KernelAbstractions.allocate(backend, typeof(init), src_sizes) - else - @argcheck get_backend(temp) == backend - @argcheck size(temp) == src_sizes - @argcheck eltype(temp) == typeof(init) - dst = temp - end - _mapreduce_nd_apply_init!(f, op, dst, src, backend, init, block_size) - return dst - end - - # Degenerate cases end - - # Allocate destination array - if isnothing(temp) - dst = KernelAbstractions.allocate(backend, typeof(init), dst_sizes) - else - @argcheck get_backend(temp) == backend - @argcheck size(temp) == dst_sizes - @argcheck eltype(temp) == typeof(init) - dst = temp - end - dst_size = length(dst) - - # We have two parallelisation approaches, based on which dimension has more elements: - # - If the dimension we are reducing has more elements, (e.g. reduce(+, rand(3, 1000), dims=2)), - # we use a block of threads per dst element - thus, a block of threads reduces the dims axis - # - If the other dimensions have more elements (e.g. reduce(+, rand(3, 1000), dims=1)), we - # use a single thread per dst element - thus, a thread reduces the dims axis sequentially, - # while the other dimensions are processed in parallel, independently - if dst_size >= src_sizes[dims] - blocks = (dst_size + block_size - 1) ÷ block_size - kernel! = _mapreduce_nd_by_thread!(backend, block_size) - kernel!( - src, dst, f, op, init, dims, - ndrange=(block_size * blocks,), - ) - else - # One block per output element - blocks = dst_size - kernel! = _mapreduce_nd_by_block!(backend, block_size) - kernel!( - src, dst, f, op, init, neutral, dims, - ndrange=(block_size * blocks,), - ) - end - - return dst -end - - -function _mapreduce_nd_apply_init!(f, op, dst, src, backend, init, block_size) - foreachindex( - dst, backend, - block_size=block_size, - ) do i - dst[i] = op(init, f(src[i])) - end -end - - -# Unrolled map constructing a tuple -@inline function unrolled_map_index(f, tuple_vector::Tuple) - _unrolled_map_index(f, tuple_vector, (), 1) -end - - -@inline function _unrolled_map_index(f, rest::Tuple{}, acc, i) - acc -end - - -@inline function _unrolled_map_index(f, rest::Tuple, acc, i) - result = f(i) - _unrolled_map_index(f, Base.tail(rest), (acc..., result), i + 1) -end diff --git a/src/reduce/reduce.jl b/src/reduce/reduce.jl index b9af75e..d6a3c73 100644 --- a/src/reduce/reduce.jl +++ b/src/reduce/reduce.jl @@ -1,19 +1,7 @@ -# neutral_element moved over from GPUArrays.jl -neutral_element(op, T) = - error("""AcceleratedKernels.jl needs to know the neutral element for your operator `$op`. - Please pass it as an explicit keyword argument `neutral`.""") -neutral_element(::typeof(Base.:(|)), T) = zero(T) -neutral_element(::typeof(Base.:(+)), T) = zero(T) -neutral_element(::typeof(Base.add_sum), T) = zero(T) -neutral_element(::typeof(Base.:(&)), T) = one(T) -neutral_element(::typeof(Base.:(*)), T) = one(T) -neutral_element(::typeof(Base.mul_prod), T) = one(T) -neutral_element(::typeof(Base.min), T) = typemax(T) -neutral_element(::typeof(Base.max), T) = typemin(T) -neutral_element(::typeof(Base._extrema_rf), ::Type{<:NTuple{2,T}}) where {T} = typemax(T), typemin(T) - - -include("mapreduce_1d.jl") +# Backend implementations +include("utilities.jl") +include("mapreduce_1d_cpu.jl") +include("mapreduce_1d_gpu.jl") include("mapreduce_nd.jl") @@ -25,9 +13,8 @@ include("mapreduce_nd.jl") dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, - max_tasks=Threads.nthreads(), - min_elems=1, + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, # GPU settings block_size::Int=256, @@ -39,13 +26,12 @@ Reduce `src` along dimensions `dims` using the binary operator `op`. If `dims` i `src` to a scalar. If `dims` is an integer, reduce `src` along that dimension. The `init` value is used as the initial value for the reduction; `neutral` is the neutral element for the operator `op`. -## CPU settings -The `scheduler` can be one of the [OhMyThreads.jl schedulers](https://juliafolds2.github.io/OhMyThreads.jl/dev/refs/api/#Schedulers), -i.e. `:static`, `:dynamic`, `:greedy` or `:serial`. Assuming the workload is uniform (as the GPU -algorithm prefers), `:static` is used by default; if you need fine-grained control over your -threads, consider using [`OhMyThreads.jl`](https://github.com/JuliaFolds2/OhMyThreads.jl) directly. +The returned type is the same as `init` - to control output precision, specify `init` explicitly. -Use at most `max_tasks` threads with at least `min_elems` elements per task. +## CPU settings +Use at most `max_tasks` threads with at least `min_elems` elements per task. For N-dimensional +arrays (`dims::Int`) multithreading currently only becomes faster for `max_tasks >= 4`; all other +cases are scaling linearly with the number of threads. ## GPU settings The `block_size` parameter controls the number of threads per block. @@ -60,10 +46,6 @@ zero, check against `Base.reduce` for CPU arrays for exact behavior. The `switch_below` parameter controls the threshold below which the reduction is performed on the CPU and is only used for 1D reductions (i.e. `dims=nothing`). -# Platform-Specific Notes -N-dimensional reductions on the CPU are not parallel yet ([issue](https://github.com/JuliaFolds2/OhMyThreads.jl/issues/128)), -and defer to `Base.reduce`. - # Examples Computing a sum, reducing down to a scalar that is copied to host: ```julia @@ -91,9 +73,8 @@ function reduce( dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, - max_tasks=Threads.nthreads(), - min_elems=1, + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, # GPU settings block_size::Int=256, @@ -105,7 +86,6 @@ function reduce( init=init, neutral=neutral, dims=dims, - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, block_size=block_size, @@ -122,9 +102,8 @@ function _reduce_impl( dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, - max_tasks=Threads.nthreads(), - min_elems=1, + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, # GPU settings block_size::Int=256, @@ -136,7 +115,6 @@ function _reduce_impl( init=init, neutral=neutral, dims=dims, - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, block_size=block_size, @@ -156,9 +134,8 @@ end dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, - max_tasks=Threads.nthreads(), - min_elems=1, + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, # GPU settings block_size::Int=256, @@ -173,13 +150,12 @@ dimension. The `init` value is used as the initial value for the reduction (i.e. The `neutral` value is the neutral element (zero) for the operator `op`, which is needed for an efficient GPU implementation that also allows a nonzero `init`. -## CPU settings -The `scheduler` can be one of the [OhMyThreads.jl schedulers](https://juliafolds2.github.io/OhMyThreads.jl/dev/refs/api/#Schedulers), -i.e. `:static`, `:dynamic`, `:greedy` or `:serial`. Assuming the workload is uniform (as the GPU -algorithm prefers), `:static` is used by default; if you need fine-grained control over your -threads, consider using [`OhMyThreads.jl`](https://github.com/JuliaFolds2/OhMyThreads.jl) directly. +The returned type is the same as `init` - to control output precision, specify `init` explicitly. -Use at most `max_tasks` threads with at least `min_elems` elements per task. +## CPU settings +Use at most `max_tasks` threads with at least `min_elems` elements per task. For N-dimensional +arrays (`dims::Int`) multithreading currently only becomes faster for `max_tasks >= 4`; all other +cases are scaling linearly with the number of threads. ## GPU settings The `block_size` parameter controls the number of threads per block. @@ -222,9 +198,8 @@ function mapreduce( dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, - max_tasks=Threads.nthreads(), - min_elems=1, + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, # GPU settings block_size::Int=256, @@ -236,7 +211,6 @@ function mapreduce( init=init, neutral=neutral, dims=dims, - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, block_size=block_size, @@ -253,51 +227,35 @@ function _mapreduce_impl( dims::Union{Nothing, Int}=nothing, # CPU settings - scheduler=:static, - max_tasks=Threads.nthreads(), - min_elems=1, + max_tasks::Int=Threads.nthreads(), + min_elems::Int=1, # GPU settings block_size::Int=256, temp::Union{Nothing, AbstractArray}=nothing, switch_below::Int=0, ) - if backend isa GPU - if isnothing(dims) - return mapreduce_1d( - f, op, src, backend; - init=init, - neutral=neutral, - block_size=block_size, - temp=temp, - switch_below=switch_below, - ) - else - return mapreduce_nd( - f, op, src, backend; - init=init, - neutral=neutral, - dims=dims, - block_size=block_size, - temp=temp, - ) - end + if isnothing(dims) + return mapreduce_1d( + f, op, src, backend; + init=init, + neutral=neutral, + max_tasks=max_tasks, + min_elems=min_elems, + block_size=block_size, + temp=temp, + switch_below=switch_below, + ) else - if isnothing(dims) - num_elems = length(src) - num_tasks = min(max_tasks, num_elems ÷ min_elems) - if num_tasks <= 1 - return Base.mapreduce(f, op, src; init=init) - end - return op(init, OMT.tmapreduce( - f, op, src; init=neutral, - scheduler=scheduler, - outputtype=typeof(init), - nchunks=num_tasks, - )) - else - # FIXME: waiting on OhMyThreads.jl for n-dimensional reduction - return Base.mapreduce(f, op, src; init=init, dims=dims) - end + return mapreduce_nd( + f, op, src, backend; + init=init, + neutral=neutral, + dims=dims, + max_tasks=max_tasks, + min_elems=min_elems, + block_size=block_size, + temp=temp, + ) end end diff --git a/src/reduce/utilities.jl b/src/reduce/utilities.jl new file mode 100644 index 0000000..bd3eed4 --- /dev/null +++ b/src/reduce/utilities.jl @@ -0,0 +1,44 @@ +# neutral_element moved over from GPUArrays.jl +neutral_element(op, T) = + error("""AcceleratedKernels.jl needs to know the neutral element for your operator `$op`. + Please pass it as an explicit keyword argument `neutral`.""") +neutral_element(::typeof(Base.:(|)), T) = zero(T) +neutral_element(::typeof(Base.:(+)), T) = zero(T) +neutral_element(::typeof(Base.add_sum), T) = zero(T) +neutral_element(::typeof(Base.:(&)), T) = one(T) +neutral_element(::typeof(Base.:(*)), T) = one(T) +neutral_element(::typeof(Base.mul_prod), T) = one(T) +neutral_element(::typeof(Base.min), T) = typemax(T) +neutral_element(::typeof(Base.max), T) = typemin(T) +neutral_element(::typeof(Base._extrema_rf), ::Type{<:NTuple{2,T}}) where {T} = typemax(T), typemin(T) + + +# Unrolled map constructing a tuple +@inline function unrolled_map_index(f, tuple_vector::Tuple) + _unrolled_map_index(f, tuple_vector, (), 1) +end + + +@inline function _unrolled_map_index(f, rest::Tuple{}, acc, i) + acc +end + + +@inline function _unrolled_map_index(f, rest::Tuple, acc, i) + result = f(i) + _unrolled_map_index(f, Base.tail(rest), (acc..., result), i + 1) +end + + +# Apply op(init, f(x)) to each element of src, storing the result in dst. +function _mapreduce_nd_apply_init!( + f, op, dst, src, backend; + init, + max_tasks=Threads.nthreads(), + min_elems=1, + block_size=256, +) + foreachindex(dst, backend; max_tasks, min_elems, block_size) do i + dst[i] = op(init, f(src[i])) + end +end diff --git a/src/searchsorted.jl b/src/searchsorted.jl index 5395757..ac16ba7 100644 --- a/src/searchsorted.jl +++ b/src/searchsorted.jl @@ -78,7 +78,6 @@ end by=identity, lt=isless, rev::Bool=false, # CPU settings - scheduler=:threads, max_tasks::Int=Threads.nthreads(), min_elems::Int=1000, @@ -98,7 +97,6 @@ function searchsortedfirst!( by=identity, lt=isless, rev::Bool=false, # CPU settings - scheduler=:threads, max_tasks::Int=Threads.nthreads(), min_elems::Int=1000, @@ -114,7 +112,7 @@ function searchsortedfirst!( foreachindex( x, backend, - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, + max_tasks=max_tasks, min_elems=min_elems, block_size=block_size, ) do i @inbounds ix[i] = _searchsortedfirst(v, x[i], firstindex(v), lastindex(v), comp) @@ -131,7 +129,6 @@ end by=identity, lt=isless, rev::Bool=false, # CPU settings - scheduler=:threads, max_tasks::Int=Threads.nthreads(), min_elems::Int=1000, @@ -150,7 +147,6 @@ function searchsortedfirst( by=identity, lt=isless, rev::Bool=false, # CPU settings - scheduler=:threads, max_tasks::Int=Threads.nthreads(), min_elems::Int=1000, @@ -161,7 +157,7 @@ function searchsortedfirst( searchsortedfirst!( ix, v, x, backend; by=by, lt=lt, rev=rev, - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, + max_tasks=max_tasks, min_elems=min_elems, block_size=block_size, ) ix @@ -178,7 +174,6 @@ end by=identity, lt=isless, rev::Bool=false, # CPU settings - scheduler=:threads, max_tasks::Int=Threads.nthreads(), min_elems::Int=1000, @@ -198,7 +193,6 @@ function searchsortedlast!( by=identity, lt=isless, rev::Bool=false, # CPU settings - scheduler=:threads, max_tasks::Int=Threads.nthreads(), min_elems::Int=1000, @@ -215,7 +209,7 @@ function searchsortedlast!( foreachindex( x, backend, - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, + max_tasks=max_tasks, min_elems=min_elems, block_size=block_size, ) do i @inbounds ix[i] = _searchsortedlast(v, x[i], firstindex(v), lastindex(v), comp) @@ -232,7 +226,6 @@ end by=identity, lt=isless, rev::Bool=false, # CPU settings - scheduler=:threads, max_tasks::Int=Threads.nthreads(), min_elems::Int=1000, @@ -251,7 +244,6 @@ function searchsortedlast( by=identity, lt=isless, rev::Bool=false, # CPU settings - scheduler=:threads, max_tasks::Int=Threads.nthreads(), min_elems::Int=1000, @@ -262,7 +254,7 @@ function searchsortedlast( searchsortedlast!( ix, v, x, backend; by=by, lt=lt, rev=rev, - scheduler=scheduler, max_tasks=max_tasks, min_elems=min_elems, + max_tasks=max_tasks, min_elems=min_elems, block_size=block_size, ) ix diff --git a/src/sort/cpu_sample_sort.jl b/src/sort/cpu_sample_sort.jl index 378ac7e..c583e5e 100644 --- a/src/sort/cpu_sample_sort.jl +++ b/src/sort/cpu_sample_sort.jl @@ -14,7 +14,10 @@ end function _sample_sort_compute_offsets!(histograms, max_tasks) + # Not worth parallelising this, as the number of tasks is much smaller than the number of + # elements - in profiling this does not show up @inbounds begin + # Sum up histograms and compute global offsets for each task offsets = @view histograms[1:max_tasks, max_tasks + 1] for itask in 1:max_tasks @@ -22,7 +25,7 @@ function _sample_sort_compute_offsets!(histograms, max_tasks) offsets[j] += histograms[j, itask] end end - accumulate!(+, offsets, init=0, inclusive=false) + accumulate!(+, offsets, init=0, inclusive=false, max_tasks=1) # Compute each task's local offset into each bucket for itask in 1:max_tasks @@ -30,6 +33,7 @@ function _sample_sort_compute_offsets!(histograms, max_tasks) +, @view(histograms[itask, 1:max_tasks]), init=0, inclusive=false, + max_tasks=1, ) end end @@ -39,7 +43,7 @@ end function _sample_sort_move_buckets!( - v, dest, ord, + v, temp, ord, splitters, global_offsets, task_offsets, itask, max_tasks, irange, ) @@ -57,7 +61,7 @@ function _sample_sort_move_buckets!( ibucket = 1 + _searchsortedlast(splitters, v[i], 1, length(splitters), ord) # Get the current destination index for this element, then increment - dest[offsets[ibucket]] = v[i] + temp[offsets[ibucket]] = v[i] offsets[ibucket] += 1 end end @@ -67,29 +71,29 @@ end function _sample_sort_sort_bucket!( - v, dest, offsets, itask, max_tasks; + v, temp, offsets, itask, max_tasks; lt, by, rev, order ) @inbounds begin istart = offsets[itask] + 1 - istop = itask == max_tasks ? length(dest) : offsets[itask + 1] + istop = itask == max_tasks ? length(temp) : offsets[itask + 1] if istart == istop - v[istart] = dest[istart] + v[istart] = temp[istart] return elseif istart > istop return end - # At the end we will have to move elements from dest back to v anyways; for every + # At the end we will have to move elements from temp back to v anyways; for every # odd-numbered itask, move elements first, to avoid false sharing from threads if isodd(itask) - copyto!(v, istart, dest, istart, istop - istart + 1) + copyto!(v, istart, temp, istart, istop - istart + 1) Base.sort!(view(v, istart:istop), lt=lt, by=by, rev=rev, order=order) else # For even-numbered itasks, sort first, then move elements back to v - Base.sort!(view(dest, istart:istop), lt=lt, by=by, rev=rev, order=order) - copyto!(v, istart, dest, istart, istop - istart + 1) + Base.sort!(view(temp, istart:istop), lt=lt, by=by, rev=rev, order=order) + copyto!(v, istart, temp, istart, istop - istart + 1) end end @@ -98,7 +102,7 @@ end function _sample_sort_parallel!( - v, dest, ord, + v, temp, ord, splitters, histograms, max_tasks; lt, by, rev, order, @@ -114,13 +118,13 @@ function _sample_sort_parallel!( end # Compute the global and local (per-bucket) offsets for each task - offsets = @view histograms[1:max_tasks, max_tasks + 1] _sample_sort_compute_offsets!(histograms, max_tasks) + offsets = @view histograms[1:max_tasks, max_tasks + 1] # Move the elements into the destination buffer itask_partition(tp) do itask, irange _sample_sort_move_buckets!( - v, dest, ord, + v, temp, ord, splitters, offsets, histograms, itask, max_tasks, irange, ) @@ -129,7 +133,7 @@ function _sample_sort_parallel!( # Sort each bucket in parallel itask_partition(tp) do itask, irange _sample_sort_sort_bucket!( - v, dest, offsets, itask, max_tasks; + v, temp, offsets, itask, max_tasks; lt=lt, by=by, rev=rev, order=order, ) end @@ -149,14 +153,14 @@ function _sample_sort_parallel!( # for itask in 1:max_tasks # irange = tp[itask] # _sample_sort_move_buckets!( - # v, dest, ord, + # v, temp, ord, # splitters, offsets, histograms, # itask, max_tasks, irange, # ) # end # for itask in 1:max_tasks # _sample_sort_sort_bucket!( - # v, dest, offsets, itask, max_tasks; + # v, temp, offsets, itask, max_tasks; # lt=lt, by=by, rev=rev, order=order, # ) # end @@ -284,17 +288,31 @@ function sample_sortperm!( @argcheck length(ix) == length(v) # Initialise indices that will be sorted by the keys in v - foreachindex(ix, max_tasks=max_tasks) do i + foreachindex(ix, max_tasks=max_tasks, min_elems=min_elems) do i @inbounds ix[i] = i end - # Construct custom comparator indexing into global array v for every index comparison - ilt = (ix, iy) -> lt(v[ix], v[iy]) + # The Order may have a type instability for `rev=true`, so we keep this function barrier + ord = Base.Order.ord(lt, by, rev, order) + _sample_sort_barrier!( + ix, v, ord; + max_tasks=max_tasks, + min_elems=min_elems, + temp=temp, + ) +end + - # Sort with custom comparator +function _sample_sort_barrier!(ix, v, ord; max_tasks, min_elems, temp) + # Construct custom comparator indexing into global array v for every index comparison + comp = (ix, iy) -> Base.Order.lt(ord, v[ix], v[iy]) sample_sort!( ix; - lt=ilt, by=by, rev=rev, order=order, + lt=comp, + + # Leave defaults - we already have a custom comparator + # by=identity, rev=nothing, order=Base.Order.Forward, + max_tasks=max_tasks, min_elems=min_elems, temp=temp, diff --git a/src/task_partitioner.jl b/src/task_partitioner.jl index d9436e4..b957a31 100644 --- a/src/task_partitioner.jl +++ b/src/task_partitioner.jl @@ -16,6 +16,7 @@ elements per task. - `min_elems::Int` : (user-defined) minimum number of elements per task. - `num_tasks::Int` : (computed) number of tasks actually needed. - `task_istarts::Vector{Int}` : (computed) element starting index for each task. +- `tasks::Vector{Task}` : (computed) array of tasks; can be reused. # Examples @@ -51,6 +52,10 @@ tp[i] = 6:10 tp[i] = 11:15 tp[i] = 16:20 ``` + +The `TaskPartitioner` is used internally by [`task_partition`](@ref) and [`itask_partition`](@ref); +you can construct one manually if you want to reuse the same partitioning for multiple tasks - this +also reuses the `tasks` array and minimises allocations. """ struct TaskPartitioner num_elems::Int @@ -60,6 +65,7 @@ struct TaskPartitioner # Computed num_tasks::Int task_istarts::Vector{Int} + tasks::Vector{Task} end @@ -73,7 +79,7 @@ function TaskPartitioner(num_elems, max_tasks=Threads.nthreads(), min_elems=1) num_tasks = min(max_tasks, num_elems ÷ min_elems) if num_tasks <= 1 num_tasks = 1 - return TaskPartitioner(num_elems, max_tasks, min_elems, num_tasks, Int[]) + return TaskPartitioner(num_elems, max_tasks, min_elems, num_tasks, Int[], Task[]) end # Each task gets at least (num_elems ÷ num_tasks) elements; the remaining are redistributed @@ -88,7 +94,10 @@ function TaskPartitioner(num_elems, max_tasks=Threads.nthreads(), min_elems=1) istart += i <= remaining ? per_task + 1 : per_task end - TaskPartitioner(num_elems, max_tasks, min_elems, num_tasks, task_istarts) + # Pre-allocate tasks array; this can be reused + tasks = Vector{Task}(undef, num_tasks) + + TaskPartitioner(num_elems, max_tasks, min_elems, num_tasks, task_istarts, tasks) end @@ -144,6 +153,19 @@ task_partition(4) do irange some_long_computation(param1, param2, irange) end ``` + +The [`TaskPartitioner`](@ref) form allows you to reuse the same partitioning for multiple tasks - +this also reuses the `tasks` array and minimises allocations: +```julia +tp = TaskPartitioner(4) +task_partition(tp) do irange + some_long_computation(param1, param2, irange) +end +# Reuse same partitioning and tasks array +task_partition(tp) do irange + some_other_long_computation(param1, param2, irange) +end +``` """ function task_partition(f, num_elems, max_tasks=Threads.nthreads(), min_elems=1) num_elems >= 0 || throw(ArgumentError("num_elems must be >= 0")) @@ -151,7 +173,7 @@ function task_partition(f, num_elems, max_tasks=Threads.nthreads(), min_elems=1) min_elems > 0 || throw(ArgumentError("min_elems must be > 0")) if min(max_tasks, num_elems ÷ min_elems) <= 1 - @inline f(1:num_elems) + f(1:num_elems) else # Compiler should decide if this should be inlined; threading adds quite a bit of code, it # is faster (as seen in Cthulhu) to keep it in a separate self-contained function @@ -163,19 +185,11 @@ end function task_partition(f, tp::TaskPartitioner) - tasks = Vector{Task}(undef, tp.num_tasks - 1) - - # Launch first N - 1 tasks - for i in 1:tp.num_tasks - 1 - tasks[i] = Threads.@spawn f(@inbounds(tp[i])) + for i in 1:tp.num_tasks + tp.tasks[i] = Threads.@spawn f(@inbounds(tp[i])) end - - # Execute task N on this main thread - f(@inbounds(tp[tp.num_tasks])) - - # Wait for the tasks to finish - @inbounds for i in 1:tp.num_tasks - 1 - wait(tasks[i]) + @inbounds for i in 1:tp.num_tasks + wait(tp.tasks[i]) end end @@ -210,6 +224,19 @@ task_partition(4) do itask, irange some_long_computation_needing_itask(param1, param2, irange) end ``` + +The [`TaskPartitioner`](@ref) form allows you to reuse the same partitioning for multiple tasks - +this also reuses the `tasks` array and minimises allocations: +```julia +tp = TaskPartitioner(4) +itask_partition(tp) do itask, irange + some_long_computation_needing_itask(param1, param2, irange) +end +# Reuse same partitioning and tasks array +itask_partition(tp) do itask, irange + some_other_long_computation_needing_itask(param1, param2, irange) +end +``` """ function itask_partition(f, num_elems, max_tasks=Threads.nthreads(), min_elems=1) num_elems >= 0 || throw(ArgumentError("num_elems must be >= 0")) @@ -217,7 +244,7 @@ function itask_partition(f, num_elems, max_tasks=Threads.nthreads(), min_elems=1 min_elems > 0 || throw(ArgumentError("min_elems must be > 0")) if min(max_tasks, num_elems ÷ min_elems) <= 1 - @inline f(1, 1:num_elems) + f(1, 1:num_elems) else # Compiler should decide if this should be inlined; threading adds quite a bit of code, it # is faster (as seen in Cthulhu) to keep it in a separate self-contained function @@ -229,18 +256,10 @@ end function itask_partition(f, tp::TaskPartitioner) - tasks = Vector{Task}(undef, tp.num_tasks - 1) - - # Launch first N - 1 tasks - for i in 1:tp.num_tasks - 1 - tasks[i] = Threads.@spawn f(i, @inbounds(tp[i])) + for i in 1:tp.num_tasks + tp.tasks[i] = Threads.@spawn f(i, @inbounds(tp[i])) end - - # Execute task N on this main thread - f(tp.num_tasks, @inbounds(tp[tp.num_tasks])) - - # Wait for the tasks to finish - @inbounds for i in 1:tp.num_tasks - 1 - wait(tasks[i]) + @inbounds for i in 1:tp.num_tasks + wait(tp.tasks[i]) end end diff --git a/test/looping.jl b/test/looping.jl index 41a9872..746fe8c 100644 --- a/test/looping.jl +++ b/test/looping.jl @@ -23,13 +23,7 @@ @test all(x .== 1:length(x)) x = zeros(Int, 1000) - AK.foreachindex(x, max_tasks=10, min_elems=10, scheduler=:threads) do i - x[i] = i - end - @test all(x .== 1:length(x)) - - x = zeros(Int, 1000) - AK.foreachindex(x, max_tasks=10, min_elems=10, scheduler=:polyester) do i + AK.foreachindex(x, max_tasks=10, min_elems=10) do i x[i] = i end @test all(x .== 1:length(x)) @@ -70,15 +64,10 @@ end @test all(xh .== (1:10) .+ (1:1000)') x = array_from_host(zeros(UInt32, 10, 1000)) - f1(x, scheduler=:threads, max_tasks=2, min_elems=100, block_size=64) + f1(x, max_tasks=2, min_elems=100, block_size=64) xh = Array(x) @test all(xh .== (1:10) .+ (1:1000)') - x = array_from_host(zeros(Float32, 10, 1000)) - f1(x, scheduler=:polyester, max_tasks=4, min_elems=500, block_size=128) - xh = Array(x) - @test all(xh .≈ (1:10) .+ (1:1000)') - f2(x; kwargs...) = AK.foraxes(x, 2; kwargs...) do j for i in axes(x, 1) x[i, j] = i + j @@ -91,15 +80,10 @@ end @test all(xh .== (1:10) .+ (1:1000)') x = array_from_host(zeros(UInt32, 10, 1000)) - f2(x, scheduler=:threads, max_tasks=2, min_elems=100, block_size=64) + f2(x, max_tasks=2, min_elems=100, block_size=64) xh = Array(x) @test all(xh .== (1:10) .+ (1:1000)') - x = array_from_host(zeros(Float32, 10, 1000)) - f2(x, scheduler=:polyester, max_tasks=4, min_elems=500, block_size=128) - xh = Array(x) - @test all(xh .≈ (1:10) .+ (1:1000)') - # dims are nothing, behaving like foreachindex f3(x; kwargs...) = AK.foraxes(x, nothing; kwargs...) do i x[i] = i diff --git a/test/map.jl b/test/map.jl index d5e4d0b..95637d1 100644 --- a/test/map.jl +++ b/test/map.jl @@ -17,13 +17,13 @@ @test y == map(i -> i^2, x) x = rand(Float32, 1000) - y = AK.map(x, scheduler=:threads, max_tasks=2, min_elems=100) do i + y = AK.map(x, max_tasks=2, min_elems=100) do i i > 0.5 ? i : 0 end @test y == map(i -> i > 0.5 ? i : 0, x) x = rand(Float32, 1000) - y = AK.map(x, scheduler=:polyester, max_tasks=4, min_elems=500) do i + y = AK.map(x, max_tasks=4, min_elems=500) do i i > 0.5 ? i : 0 end @test y == map(i -> i > 0.5 ? i : 0, x) diff --git a/test/reduce.jl b/test/reduce.jl index 37e9c58..ec24f6a 100644 --- a/test/reduce.jl +++ b/test/reduce.jl @@ -127,7 +127,6 @@ Base.zero(::Type{Point}) = Point(0.0f0, 0.0f0) block_size=64, temp=array_from_host(zeros(Int32, 10_000)), switch_below=50, - scheduler=:dynamic, max_tasks=10, min_elems=100, ) @@ -136,7 +135,6 @@ Base.zero(::Type{Point}) = Point(0.0f0, 0.0f0) rand(Int32, 10_000), init=Int32(0), neutral=Int64(0), - scheduler=:greedy, max_tasks=16, min_elems=1000, ) @@ -227,7 +225,6 @@ end block_size=64, temp=array_from_host(zeros(Int32, 3, 1, 5)), switch_below=50, - scheduler=:dynamic, max_tasks=10, min_elems=100, ) @@ -240,7 +237,6 @@ end block_size=64, temp=array_from_host(zeros(Int32, 3, 4, 1)), switch_below=50, - scheduler=:greedy, max_tasks=16, min_elems=1000, ) @@ -343,7 +339,6 @@ end block_size=64, temp=temp, switch_below=50, - scheduler=:dynamic, max_tasks=10, min_elems=100, ) @@ -456,7 +451,6 @@ end block_size=64, temp=array_from_host(zeros(Int32, 3, 1, 5)), switch_below=50, - scheduler=:dynamic, max_tasks=10, min_elems=100, ) @@ -470,7 +464,6 @@ end block_size=64, temp=array_from_host(zeros(Int32, 3, 4, 1)), switch_below=50, - scheduler=:greedy, max_tasks=16, min_elems=1000, )