diff --git a/benchmarks/parallel_vs_serial.jl b/benchmarks/parallel_vs_serial.jl
index 7bd2dc2..261254d 100644
--- a/benchmarks/parallel_vs_serial.jl
+++ b/benchmarks/parallel_vs_serial.jl
@@ -1,7 +1,9 @@
 #=
 This file benchmarks the parallel and serial computation of a recurrence matrix
 =#
-using DynamicalSystemsBase, RecurrenceAnalysis
+using RecurrenceAnalysis
+
+using DynamicalSystemsBase, PredefinedDynamicalSystems
 using Statistics, BenchmarkTools, Base.Threads
 
 function pretty_time(b::BenchmarkTools.Trial)
@@ -17,12 +19,12 @@ function pretty_time(b::BenchmarkTools.Trial)
 end
 
 Ns = round.(Int, 10.0 .^ (2:0.5:4.5))
-ro = Systems.roessler()
+ro = PredefinedDynamicalSystems.roessler()
 
 println("I am using $(nthreads()) threads.")
 for N in Ns
     # set up datasets
-    tr = trajectory(ro, N*0.1; dt = 0.1, Ttr = 10.0)
+    tr, ts = trajectory(ro, N*0.1; Δt = 0.1, Ttr = 10.0)
     printstyled("For N = $(length(tr))..."; color = :blue, bold = true)
     println()
     x = tr[:, 1]
diff --git a/src/matrices/distance_matrix.jl b/src/matrices/distance_matrix.jl
index b588812..d387635 100644
--- a/src/matrices/distance_matrix.jl
+++ b/src/matrices/distance_matrix.jl
@@ -118,13 +118,13 @@ end
 # by utilizing the fact that the area is proportional to the square of the height.
 # It partitions the "triangle" which needs to be computed into "trapezoids",
 # which all have an equal area.
-function partition_indices(len)
-    indices = Vector{UnitRange{Int}}(undef, Threads.nthreads())
+function partition_indices(len, ntasks = Threads.nthreads())
+    indices = Vector{UnitRange{Int}}(undef, ntasks)
     length = len
     offset = 1
     # Here, we have to iterate in reverse, to get an equal area every time
-    for n in Threads.nthreads():-1:1
+    for n in ntasks:-1:1
         partition = round(Int, length / sqrt(n)) # length varies as square root of area
         indices[n] = offset:(partition .+ offset .- 1)
         length -= partition
diff --git a/src/matrices/recurrence_matrix_low.jl b/src/matrices/recurrence_matrix_low.jl
index 8cfd142..6d6d94c 100644
--- a/src/matrices/recurrence_matrix_low.jl
+++ b/src/matrices/recurrence_matrix_low.jl
@@ -60,30 +60,70 @@ end
 # Now, we define the parallel versions of these functions.
 
 # Core function
+
+function _full_task_worker!(rowvals, colvals, x::Vector_or_SSSet, y::Vector_or_SSSet, k, metric::Metric, ε::E) where E
+    for j in k
+        nzcol = 0
+        for i in eachindex(x)
+            @inbounds if evaluate(metric, x[i], y[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
+                push!(rowvals, i) # push to the task-local row array
+                nzcol += 1
+            end
+        end
+        append!(colvals, fill(j, (nzcol,)))
+    end
+    return nothing
+end
+
 function recurrence_matrix(x::Vector_or_SSSet, y::Vector_or_SSSet, metric::Metric, ε, ::Val{true})
     @assert ε isa Real || length(ε) == length(y)
-    # We create an `Array` of `Array`s, for each thread to have its
+    # We create an `Array` of `Array`s, for each task to have its
     # own array to push to. This avoids race conditions with
     # multiple threads pushing to the same `Array` (`Array`s are not atomic).
-    rowvals = [Vector{Int}() for _ in 1:Threads.nthreads()]
-    colvals = [Vector{Int}() for _ in 1:Threads.nthreads()]
+
+    # We spawn one task per CPU thread that we have available.
+    ntasks = Threads.nthreads()
+    chunks = ntuple(ntasks) do ichunk
+        # Adapted from JuliaFolds2/ChunkSplitters.jl, MIT license, credit to the authors.
+        n_per_chunk, n_remaining = divrem(length(y), ntasks)
+        first = 1 + (ichunk - 1) * n_per_chunk + ifelse(ichunk <= n_remaining, ichunk - 1, n_remaining)
+        last = (first - 1) + n_per_chunk + ifelse(ichunk <= n_remaining, 1, 0)
+        first:last
+    end
+
+    rowvals = [Vector{Int}() for _ in 1:ntasks]
+    colvals = [Vector{Int}() for _ in 1:ntasks]
     # This is the same logic as the serial function, but parallelized.
-    Threads.@threads for j in eachindex(y)
-        threadn = Threads.threadid()
+    # We create all the tasks in this array comprehension so that we have them stored;
+    # they are all launched at approximately the same time.
+    tasks = [
+        Threads.@spawn _full_task_worker!($(rowvals[i]), $(colvals[i]), $(x), $(y), $(k), $(metric), $(ε))
+        for (i, k) in enumerate(chunks)
+    ]
+    # The array comprehension above only scheduled the tasks; now we wait on them,
+    # to make sure they are all complete before we access the results.
+    foreach(fetch, tasks)
+    # Now that we know all tasks are complete, we can merge the results.
+    finalrows = reduce(vcat, rowvals) # merge into one array
+    finalcols = reduce(vcat, colvals) # merge into one array
+    nzvals = fill(true, (length(finalrows),))
+    return sparse(finalrows, finalcols, nzvals, length(x), length(y))
+end
+
+function _lower_triangular_task_worker!(rowvals, colvals, x::Vector_or_SSSet, k, metric::Metric, ε::E) where E
+    for j in k
         nzcol = 0
-        for i in eachindex(x)
-            @inbounds if evaluate(metric, x[i], y[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
-                push!(rowvals[threadn], i) # push to the thread-specific row array
+        for i in 1:j
+            @inbounds if evaluate(metric, x[i], x[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
+                push!(rowvals, i) # push to the task-local row array
                 nzcol += 1
             end
         end
-        append!(colvals[threadn], fill(j, (nzcol,)))
+        append!(colvals, fill(j, (nzcol,)))
     end
-    finalrows = vcat(rowvals...) # merge into one array
-    finalcols = vcat(colvals...) # merge into one array
-    nzvals = fill(true, (length(finalrows),))
-    return sparse(finalrows, finalcols, nzvals, length(x), length(y))
+    return nothing
 end
 
 function recurrence_matrix(x::Vector_or_SSSet, metric::Metric, ε, ::Val{true})
@@ -91,25 +131,20 @@ function recurrence_matrix(x::Vector_or_SSSet, metric::Metric, ε, ::Val{true})
     # We create an `Array` of `Array`s, for each thread to have its
     # own array to push to. This avoids race conditions with
     # multiple threads pushing to the same `Array` (`Array`s are not atomic).
-    rowvals = [Vector{Int}() for _ in 1:Threads.nthreads()]
-    colvals = [Vector{Int}() for _ in 1:Threads.nthreads()]
+
+    ntasks = Threads.nthreads()
+
+    rowvals = [Vector{Int}() for _ in 1:ntasks]
+    colvals = [Vector{Int}() for _ in 1:ntasks]
     # This is the same logic as the serial function, but parallelized.
-    Threads.@threads for k in partition_indices(length(x))
-        threadn = Threads.threadid()
-        for j in k
-            nzcol = 0
-            for i in 1:j
-                @inbounds if evaluate(metric, x[i], x[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
-                    push!(rowvals[threadn], i) # push to the thread-specific row array
-                    nzcol += 1
-                end
-            end
-            append!(colvals[threadn], fill(j, (nzcol,)))
-        end
-    end
-    finalrows = vcat(rowvals...) # merge into one array
-    finalcols = vcat(colvals...) # merge into one array
+    tasks = [
+        Threads.@spawn _lower_triangular_task_worker!($(rowvals[i]), $(colvals[i]), $(x), $(k), $(metric), $(ε))
+        for (i, k) in enumerate(partition_indices(length(x), ntasks))
+    ]
+    foreach(fetch, tasks)
+    finalrows = reduce(vcat, rowvals) # merge into one array
+    finalcols = reduce(vcat, colvals) # merge into one array
     nzvals = fill(true, (length(finalrows),))
     return Symmetric(sparse(finalrows, finalcols, nzvals, length(x), length(x)), :U)
 end
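
A quick way to sanity-check the chunking formula added in recurrence_matrix_low.jl is to run it in isolation. The sketch below is illustrative only: the helper name `even_chunks` is hypothetical and not part of the package, but its body reproduces the divrem-based split used inside `recurrence_matrix` above.

    # Split 1:n into ntasks contiguous ranges whose lengths differ by at most one.
    function even_chunks(n, ntasks)
        return ntuple(ntasks) do ichunk
            n_per_chunk, n_remaining = divrem(n, ntasks)
            # the first n_remaining chunks each get one extra element
            first = 1 + (ichunk - 1) * n_per_chunk + ifelse(ichunk <= n_remaining, ichunk - 1, n_remaining)
            last = (first - 1) + n_per_chunk + ifelse(ichunk <= n_remaining, 1, 0)
            first:last
        end
    end

    even_chunks(10, 3) # (1:4, 5:7, 8:10): the chunks cover 1:10 exactly once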