From c061d9f19d38790884b316b7fba1e1ff5ca01b7a Mon Sep 17 00:00:00 2001
From: Anshul Singhvi
Date: Thu, 10 Apr 2025 13:33:02 -0400
Subject: [PATCH 1/4] Update parallel vs serial benchmarks

---
 benchmarks/parallel_vs_serial.jl | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/benchmarks/parallel_vs_serial.jl b/benchmarks/parallel_vs_serial.jl
index 7bd2dc2..261254d 100644
--- a/benchmarks/parallel_vs_serial.jl
+++ b/benchmarks/parallel_vs_serial.jl
@@ -1,7 +1,9 @@
 #=
 This file benchmarks the parallel and serial computation of a recurrence matrix
 =#
-using DynamicalSystemsBase, RecurrenceAnalysis
+using RecurrenceAnalysis
+
+using DynamicalSystemsBase, PredefinedDynamicalSystems
 using Statistics, BenchmarkTools, Base.Threads
 
 function pretty_time(b::BenchmarkTools.Trial)
@@ -17,12 +19,12 @@
 end
 
 Ns = round.(Int, 10.0 .^ (2:0.5:4.5))
-ro = Systems.roessler()
+ro = PredefinedDynamicalSystems.roessler()
 
 println("I am using $(nthreads()) threads.")
 for N in Ns
     # set up datasets
-    tr = trajectory(ro, N*0.1; dt = 0.1, Tr = 10.0)
+    tr, ts = trajectory(ro, N*0.1; Δt = 0.1, Ttr = 10.0)
     printstyled("For N = $(length(tr))..."; color = :blue, bold = true)
     println()
     x = tr[:, 1]

From ec10f7f91b5ff1b613e311b9b9f08a66c4394ddd Mon Sep 17 00:00:00 2001
From: Anshul Singhvi
Date: Thu, 10 Apr 2025 13:33:21 -0400
Subject: [PATCH 2/4] Make `partition_indices` take an `ntasks` argument

---
 src/matrices/distance_matrix.jl | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/matrices/distance_matrix.jl b/src/matrices/distance_matrix.jl
index b588812..d387635 100644
--- a/src/matrices/distance_matrix.jl
+++ b/src/matrices/distance_matrix.jl
@@ -118,13 +118,13 @@
 end
 # by utilizing the fact that the area is proportional to the square of the height.
 # It partitions the "triangle" which needs to be computed into "trapezoids",
 # which all have an equal area.
-function partition_indices(len)
-    indices = Vector{UnitRange{Int}}(undef, Threads.nthreads())
+function partition_indices(len, ntasks = Threads.nthreads())
+    indices = Vector{UnitRange{Int}}(undef, ntasks)
     length = len
     offset = 1
     # Here, we have to iterate in reverse, to get an equal area every time
-    for n in Threads.nthreads():-1:1
+    for n in ntasks:-1:1
         partition = round(Int, length / sqrt(n)) # length varies as square root of area
         indices[n] = offset:(partition .+ offset .- 1)
         length -= partition

From 8ab9541c115c64113e30b53c5dd5c2fd8fa1687f Mon Sep 17 00:00:00 2001
From: Anshul Singhvi
Date: Thu, 10 Apr 2025 13:43:20 -0400
Subject: [PATCH 3/4] Use a task-spawning-based approach and per-task arrays

---
 src/matrices/recurrence_matrix_low.jl | 79 +++++++++++++++++----------
 1 file changed, 51 insertions(+), 28 deletions(-)

diff --git a/src/matrices/recurrence_matrix_low.jl b/src/matrices/recurrence_matrix_low.jl
index 8cfd142..63a513e 100644
--- a/src/matrices/recurrence_matrix_low.jl
+++ b/src/matrices/recurrence_matrix_low.jl
@@ -62,26 +62,42 @@
 end
 # Core function
 function recurrence_matrix(x::Vector_or_SSSet, y::Vector_or_SSSet, metric::Metric, ε, ::Val{true})
     @assert ε isa Real || length(ε) == length(y)
-    # We create an `Array` of `Array`s, for each thread to have its
+    # We create an `Array` of `Array`s, for each task to have its
     # own array to push to. This avoids race conditions with
     # multiple threads pushing to the same `Array` (`Array`s are not atomic).
-    rowvals = [Vector{Int}() for _ in 1:Threads.nthreads()]
-    colvals = [Vector{Int}() for _ in 1:Threads.nthreads()]
+
+    # For load balancing reasons, we will use 2 tasks per CPU thread that we have
+    # available.
+    ntasks = Threads.nthreads() * 2
+    rowvals = [Vector{Int}() for _ in 1:ntasks]
+    colvals = [Vector{Int}() for _ in 1:ntasks]
     # This is the same logic as the serial function, but parallelized.
-    Threads.@threads for j in eachindex(y)
-        threadn = Threads.threadid()
-        nzcol = 0
-        for i in eachindex(x)
-            @inbounds if evaluate(metric, x[i], y[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
-                push!(rowvals[threadn], i) # push to the thread-specific row array
-                nzcol += 1
+    # We create all tasks in this array comprehension so we have them stored,
+    # but they are launched at the same time approximately.
+    #
+    tasks = [
+        Threads.@spawn begin
+            for j in eachindex(y)
+                taskn = $i
+                nzcol = 0
+                for i in eachindex(x)
+                    @inbounds if evaluate(metric, x[i], y[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
+                        push!(rowvals[taskn], i) # push to the thread-specific row array
+                        nzcol += 1
+                    end
+                end
+                append!(colvals[taskn], fill(j, (nzcol,)))
             end
         end
-        append!(colvals[threadn], fill(j, (nzcol,)))
-    end
-    finalrows = vcat(rowvals...) # merge into one array
-    finalcols = vcat(colvals...) # merge into one array
+        for i in 1:ntasks
+    ]
+    # The array comprehension above scheduled the tasks...now we have to wait on them to make
+    # sure they are all complete before we access the results.
+    foreach(wait, tasks)
+    # Now that we know all tasks are complete, we can merge the results.
+    finalrows = reduce(vcat, rowvals) # merge into one array
+    finalcols = reduce(vcat, colvals) # merge into one array
     nzvals = fill(true, (length(finalrows),))
     return sparse(finalrows, finalcols, nzvals, length(x), length(y))
 end
@@ -91,25 +107,32 @@ function recurrence_matrix(x::Vector_or_SSSet, metric::Metric, ε, ::Val{true})
     @assert ε isa Real || length(ε) == length(x)
     # We create an `Array` of `Array`s, for each thread to have its
     # own array to push to. This avoids race conditions with
     # multiple threads pushing to the same `Array` (`Array`s are not atomic).
-    rowvals = [Vector{Int}() for _ in 1:Threads.nthreads()]
-    colvals = [Vector{Int}() for _ in 1:Threads.nthreads()]
+
+    ntasks = Threads.nthreads() * 2
+
+    rowvals = [Vector{Int}() for _ in 1:ntasks]
+    colvals = [Vector{Int}() for _ in 1:ntasks]
     # This is the same logic as the serial function, but parallelized.
-    Threads.@threads for k in partition_indices(length(x))
-        threadn = Threads.threadid()
-        for j in k
-            nzcol = 0
-            for i in 1:j
-                @inbounds if evaluate(metric, x[i], x[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
-                    push!(rowvals[threadn], i) # push to the thread-specific row array
-                    nzcol += 1
+    tasks = [
+        Threads.@spawn begin
+            threadn = $i # note the `$` that does "interpolation" into the task
+            for j in $k
+                nzcol = 0
+                for i in 1:j
+                    @inbounds if evaluate(metric, x[i], x[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
+                        push!(rowvals[threadn], i) # push to the thread-specific row array
+                        nzcol += 1
+                    end
                 end
+                append!(colvals[threadn], fill(j, (nzcol,)))
             end
-            append!(colvals[threadn], fill(j, (nzcol,)))
         end
-    end
-    finalrows = vcat(rowvals...) # merge into one array
-    finalcols = vcat(colvals...) # merge into one array
+        for (i, k) in enumerate(partition_indices(length(x), ntasks))
+    ]
+    foreach(wait, tasks)
+    finalrows = reduce(vcat, rowvals) # merge into one array
+    finalcols = reduce(vcat, colvals) # merge into one array
     nzvals = fill(true, (length(finalrows),))
     return Symmetric(sparse(finalrows, finalcols, nzvals, length(x), length(x)), :U)
 end

From a9aa0a5abf2e121eb3e847077a4ffe6931da9651 Mon Sep 17 00:00:00 2001
From: Anshul Singhvi
Date: Thu, 10 Apr 2025 14:42:50 -0400
Subject: [PATCH 4/4] Move to worker functions? The runtime with threads is just outrageous... not sure why

---
 src/matrices/recurrence_matrix_low.jl | 76 ++++++++++++++++-----------
 1 file changed, 44 insertions(+), 32 deletions(-)

diff --git a/src/matrices/recurrence_matrix_low.jl b/src/matrices/recurrence_matrix_low.jl
index 63a513e..6d6d94c 100644
--- a/src/matrices/recurrence_matrix_low.jl
+++ b/src/matrices/recurrence_matrix_low.jl
@@ -60,6 +60,21 @@
 end
 # Now, we define the parallel versions of these functions.
 # Core function
+
+function _full_task_worker!(rowvals, colvals, x::Vector_or_SSSet, y::Vector_or_SSSet, k, metric::Metric, ε::E) where E
+    for j in k
+        nzcol = 0
+        for i in eachindex(x)
+            @inbounds if evaluate(metric, x[i], y[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
+                push!(rowvals, i) # push to the thread-specific row array
+                nzcol += 1
+            end
+        end
+        append!(colvals, fill(j, (nzcol,)))
+    end
+    return nothing
+end
+
 function recurrence_matrix(x::Vector_or_SSSet, y::Vector_or_SSSet, metric::Metric, ε, ::Val{true})
     @assert ε isa Real || length(ε) == length(y)
     # We create an `Array` of `Array`s, for each task to have its
@@ -68,33 +83,28 @@ function recurrence_matrix(x::Vector_or_SSSet, y::Vector_or_SSSet, metric::Metri
     # For load balancing reasons, we will use 2 tasks per CPU thread that we have
     # available.
-    ntasks = Threads.nthreads() * 2
+    ntasks = Threads.nthreads()
+    chunks = ntuple(ntasks) do ichunk
+        # Adapted from JuliaFolds2/ChunkSplitters.jl, MIT license, credit to the authors.
+        n_per_chunk, n_remaining = divrem(length(y), ntasks)
+        first = 1 + (ichunk - 1) * n_per_chunk + ifelse(ichunk <= n_remaining, ichunk - 1, n_remaining)
+        last = (first - 1) + n_per_chunk + ifelse(ichunk <= n_remaining, 1, 0)
+        first:last
+    end
+
     rowvals = [Vector{Int}() for _ in 1:ntasks]
     colvals = [Vector{Int}() for _ in 1:ntasks]
     # This is the same logic as the serial function, but parallelized.
     # We create all tasks in this array comprehension so we have them stored,
     # but they are launched at the same time approximately.
-    #
     tasks = [
-        Threads.@spawn begin
-            for j in eachindex(y)
-                taskn = $i
-                nzcol = 0
-                for i in eachindex(x)
-                    @inbounds if evaluate(metric, x[i], y[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
-                        push!(rowvals[taskn], i) # push to the thread-specific row array
-                        nzcol += 1
-                    end
-                end
-                append!(colvals[taskn], fill(j, (nzcol,)))
-            end
-        end
-        for i in 1:ntasks
+        Threads.@spawn _full_task_worker!($(rowvals[i]), $(colvals[i]), $(x), $(y), $(k), $(metric), $(ε))
+        for (i, k) in enumerate(chunks)
     ]
     # The array comprehension above scheduled the tasks...now we have to wait on them to make
     # sure they are all complete before we access the results.
-    foreach(wait, tasks)
+    foreach(fetch, tasks)
     # Now that we know all tasks are complete, we can merge the results.
     finalrows = reduce(vcat, rowvals) # merge into one array
     finalcols = reduce(vcat, colvals) # merge into one array
@@ -102,35 +112,37 @@ function recurrence_matrix(x::Vector_or_SSSet, y::Vector_or_SSSet, metric::Metri
     return sparse(finalrows, finalcols, nzvals, length(x), length(y))
 end
+function _lower_triangular_task_worker!(rowvals, colvals, x::Vector_or_SSSet, k, metric::Metric, ε::E) where E
+    for j in k
+        nzcol = 0
+        for i in 1:j
+            @inbounds if evaluate(metric, x[i], x[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
+                push!(rowvals, i) # push to the thread-specific row array
+                nzcol += 1
+            end
+        end
+        append!(colvals, fill(j, (nzcol,)))
+    end
+    return nothing
+end
+
 function recurrence_matrix(x::Vector_or_SSSet, metric::Metric, ε, ::Val{true})
     @assert ε isa Real || length(ε) == length(x)
     # We create an `Array` of `Array`s, for each thread to have its
     # own array to push to. This avoids race conditions with
     # multiple threads pushing to the same `Array` (`Array`s are not atomic).
 
-    ntasks = Threads.nthreads() * 2
+    ntasks = Threads.nthreads()
 
     rowvals = [Vector{Int}() for _ in 1:ntasks]
    colvals = [Vector{Int}() for _ in 1:ntasks]
     # This is the same logic as the serial function, but parallelized.
     tasks = [
-        Threads.@spawn begin
-            threadn = $i # note the `$` that does "interpolation" into the task
-            for j in $k
-                nzcol = 0
-                for i in 1:j
-                    @inbounds if evaluate(metric, x[i], x[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
-                        push!(rowvals[threadn], i) # push to the thread-specific row array
-                        nzcol += 1
-                    end
-                end
-                append!(colvals[threadn], fill(j, (nzcol,)))
-            end
-        end
+        Threads.@spawn _lower_triangular_task_worker!($(rowvals[i]), $(colvals[i]), $(x), $(k), $(metric), $(ε))
         for (i, k) in enumerate(partition_indices(length(x), ntasks))
     ]
-    foreach(wait, tasks)
+    foreach(fetch, tasks)
     finalrows = reduce(vcat, rowvals) # merge into one array
     finalcols = reduce(vcat, colvals) # merge into one array
     nzvals = fill(true, (length(finalrows),))
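
Note on the pattern the series converges to in patch 4: split the columns into contiguous chunks, give every task its own row/column buffers, Threads.@spawn one worker per chunk, fetch them all, then merge the buffers and build the sparse matrix. Below is a minimal, self-contained sketch of that pattern, not the package's actual API: recurrence_sparse and is_recurrent are hypothetical names, and a plain absolute difference stands in for evaluate(metric, ...).

using SparseArrays, Base.Threads

# Hypothetical stand-in for `evaluate(metric, x[i], y[j]) ≤ ε` from the patches.
is_recurrent(a, b, ε) = abs(a - b) ≤ ε

function recurrence_sparse(x::AbstractVector, y::AbstractVector, ε; ntasks = Threads.nthreads())
    # Contiguous column chunks, one per task (same spirit as the ChunkSplitters-style code in patch 4).
    base, rem = divrem(length(y), ntasks)
    chunks = [(1 + (t - 1) * base + min(t - 1, rem)):(t * base + min(t, rem)) for t in 1:ntasks]
    # One pair of buffers per task, so no two tasks ever push! to the same array.
    rowvals = [Int[] for _ in 1:ntasks]
    colvals = [Int[] for _ in 1:ntasks]
    tasks = map(enumerate(chunks)) do (t, k)
        Threads.@spawn for j in k, i in eachindex(x)
            if is_recurrent(x[i], y[j], ε)
                push!(rowvals[t], i)
                push!(colvals[t], j)
            end
        end
    end
    foreach(fetch, tasks)  # wait for every task; fetch also rethrows any task-local error
    rows = reduce(vcat, rowvals)
    cols = reduce(vcat, colvals)
    return sparse(rows, cols, fill(true, length(rows)), length(x), length(y))
end

# Example usage: recurrence_sparse(rand(500), rand(500), 0.05)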
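The equal-area reasoning behind partition_indices (patch 2) can also be checked outside the package. The snippet below is a standalone illustration with hypothetical names (len, N, bounds, chunks, work); it only assumes that column j of the lower-triangular loop `for i in 1:j` costs about j operations, so boundaries at len*sqrt(n/N) give each chunk roughly len^2/(2N) work.

# Standalone check (not from the patches) of the "equal trapezoid area" idea.
len, N = 10_000, 8
bounds = [round(Int, len * sqrt(n / N)) for n in 0:N]   # bounds[1] == 0, bounds[end] == len
chunks = [bounds[n]+1:bounds[n+1] for n in 1:N]         # contiguous column ranges covering 1:len
work = sum.(chunks)                                     # cost of a chunk ≈ sum of its column indices
println(round.(work ./ (len^2 / (2N)); digits = 3))     # every entry is ≈ 1.0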