8 changes: 5 additions & 3 deletions benchmarks/parallel_vs_serial.jl
@@ -1,7 +1,9 @@
 #=
 This file benchmarks the parallel and serial computation of a recurrence matrix
 =#
-using DynamicalSystemsBase, RecurrenceAnalysis
+using RecurrenceAnalysis
+
+using DynamicalSystemsBase, PredefinedDynamicalSystems
 using Statistics, BenchmarkTools, Base.Threads

 function pretty_time(b::BenchmarkTools.Trial)
@@ -17,12 +19,12 @@ function pretty_time(b::BenchmarkTools.Trial)
 end

 Ns = round.(Int, 10.0 .^ (2:0.5:4.5))
-ro = Systems.roessler()
+ro = PredefinedDynamicalSystems.roessler()

 println("I am using $(nthreads()) threads.")
 for N in Ns
     # set up datasets
-    tr = trajectory(ro, N*0.1; dt = 0.1, Tr = 10.0)
+    tr, ts = trajectory(ro, N*0.1; Δt = 0.1, Ttr = 10.0)
     printstyled("For N = $(length(tr))..."; color = :blue, bold = true)
     println()
     x = tr[:, 1]
6 changes: 3 additions & 3 deletions src/matrices/distance_matrix.jl
@@ -118,13 +118,13 @@ end
 # by utilizing the fact that the area is proportional to the square of the height.
 # It partitions the "triangle" which needs to be computed into "trapezoids",
 # which all have an equal area.
-function partition_indices(len)
-    indices = Vector{UnitRange{Int}}(undef, Threads.nthreads())
+function partition_indices(len, ntasks = Threads.nthreads())
+    indices = Vector{UnitRange{Int}}(undef, ntasks)
     length = len
     offset = 1

     # Here, we have to iterate in reverse, to get an equal area every time
     for n in Threads.nthreads():-1:1
-    for n in Threads.nthreads():-1:1
+    for n in ntasks:-1:1
         partition = round(Int, length / sqrt(n)) # length varies as square root of area
         indices[n] = offset:(partition .+ offset .- 1)
         length -= partition
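To see what this partitioning buys: in the lower-triangle computation, column `j` costs `j` inner iterations, so equally wide column ranges would leave the task holding the last columns with far more work than the first. The `sqrt` rule widens the ranges over the cheap early columns instead. The sketch below is self-contained for experimentation: the loop tail (`offset += partition` and the `return`) is cut off by the truncated hunk above and therefore assumed, `remaining` stands in for the original's `length` (which shadows `Base.length`), and the printing harness is illustrative only.

```julia
# Self-contained sketch of the sqrt-based partitioning shown above.
# Assumed: the truncated loop tail (`offset += partition`) and the return.
function partition_indices_sketch(len, ntasks)
    indices = Vector{UnitRange{Int}}(undef, ntasks)
    remaining = len
    offset = 1
    for n in ntasks:-1:1
        partition = round(Int, remaining / sqrt(n)) # wider ranges over cheaper columns
        indices[n] = offset:(partition + offset - 1)
        remaining -= partition
        offset += partition
    end
    return indices
end

# Column j costs ~j work in the lower triangle, so sum(k) estimates the
# work a range k receives; print it to inspect the balance:
for k in partition_indices_sketch(1000, 4)
    println(k, " => ", sum(k))
end
```

The split is a heuristic rather than exact: for `(1000, 4)` the per-range work still varies by roughly a factor of three, but that is noticeably flatter than the sevenfold spread of four equal-width ranges.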
95 changes: 65 additions & 30 deletions src/matrices/recurrence_matrix_low.jl
@@ -60,56 +60,91 @@
 # Now, we define the parallel versions of these functions.

 # Core function
+
+function _full_task_worker!(rowvals, colvals, x::Vector_or_SSSet, y::Vector_or_SSSet, k, metric::Metric, ε::E) where E
+    for j in k
+        nzcol = 0
+        for i in eachindex(x)
+            @inbounds if evaluate(metric, x[i], y[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
+                push!(rowvals, i) # push to this task's row array
+                nzcol += 1
+            end
+        end
+        append!(colvals, fill(j, (nzcol,)))
+    end
+    return nothing
+end
+
 function recurrence_matrix(x::Vector_or_SSSet, y::Vector_or_SSSet, metric::Metric, ε, ::Val{true})
     @assert ε isa Real || length(ε) == length(y)
-    # We create an `Array` of `Array`s, for each thread to have its
+    # We create an `Array` of `Array`s, for each task to have its
     # own array to push to. This avoids race conditions with
     # multiple threads pushing to the same `Array` (`Array`s are not atomic).
-    rowvals = [Vector{Int}() for _ in 1:Threads.nthreads()]
-    colvals = [Vector{Int}() for _ in 1:Threads.nthreads()]
-
-    # This is the same logic as the serial function, but parallelized.
-    Threads.@threads for j in eachindex(y)
-        threadn = Threads.threadid()
-        nzcol = 0
-        for i in eachindex(x)
-            @inbounds if evaluate(metric, x[i], y[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
-                push!(rowvals[threadn], i) # push to the thread-specific row array
-                nzcol += 1
-            end
-        end
-        append!(colvals[threadn], fill(j, (nzcol,)))
-    end
-    finalrows = vcat(rowvals...) # merge into one array
-    finalcols = vcat(colvals...) # merge into one array
+
+    # We use one task per available CPU thread; using more tasks per thread
+    # could further improve load balancing.
+    ntasks = Threads.nthreads()
+    chunks = ntuple(ntasks) do ichunk
+        # Adapted from JuliaFolds2/ChunkSplitters.jl, MIT license, credit to the authors.
+        n_per_chunk, n_remaining = divrem(length(y), ntasks)
+        first = 1 + (ichunk - 1) * n_per_chunk + ifelse(ichunk <= n_remaining, ichunk - 1, n_remaining)
+        last = (first - 1) + n_per_chunk + ifelse(ichunk <= n_remaining, 1, 0)
+        first:last
+    end
+
+    rowvals = [Vector{Int}() for _ in 1:ntasks]
+    colvals = [Vector{Int}() for _ in 1:ntasks]
+
+    # We create all tasks in this array comprehension so we have them stored,
+    # but they are launched at approximately the same time.
+    tasks = [
+        Threads.@spawn _full_task_worker!($(rowvals[i]), $(colvals[i]), $(x), $(y), $(k), $(metric), $(ε))
+        for (i, k) in enumerate(chunks)
+    ]
+    # The array comprehension above scheduled the tasks; now we have to wait on
+    # them to make sure they are all complete before we access the results.
+    foreach(fetch, tasks)
+    # Now that we know all tasks are complete, we can merge the results.
+    finalrows = reduce(vcat, rowvals) # merge into one array
+    finalcols = reduce(vcat, colvals) # merge into one array
     nzvals = fill(true, (length(finalrows),))
     return sparse(finalrows, finalcols, nzvals, length(x), length(y))
 end

+function _lower_triangular_task_worker!(rowvals, colvals, x::Vector_or_SSSet, k, metric::Metric, ε::E) where E
+    for j in k
+        nzcol = 0
+        for i in 1:j
+            @inbounds if evaluate(metric, x[i], x[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
+                push!(rowvals, i) # push to this task's row array
+                nzcol += 1
+            end
+        end
+        append!(colvals, fill(j, (nzcol,)))
+    end
+    return nothing
+end
+
 function recurrence_matrix(x::Vector_or_SSSet, metric::Metric, ε, ::Val{true})
     @assert ε isa Real || length(ε) == length(x)
     # We create an `Array` of `Array`s, for each thread to have its
     # own array to push to. This avoids race conditions with
     # multiple threads pushing to the same `Array` (`Array`s are not atomic).
-    rowvals = [Vector{Int}() for _ in 1:Threads.nthreads()]
-    colvals = [Vector{Int}() for _ in 1:Threads.nthreads()]
-
-    # This is the same logic as the serial function, but parallelized.
-    Threads.@threads for k in partition_indices(length(x))
-        threadn = Threads.threadid()
-        for j in k
-            nzcol = 0
-            for i in 1:j
-                @inbounds if evaluate(metric, x[i], x[j]) ≤ ( (ε isa Real) ? ε : ε[j] )
-                    push!(rowvals[threadn], i) # push to the thread-specific row array
-                    nzcol += 1
-                end
-            end
-            append!(colvals[threadn], fill(j, (nzcol,)))
-        end
-    end
-    finalrows = vcat(rowvals...) # merge into one array
-    finalcols = vcat(colvals...) # merge into one array
+
+    ntasks = Threads.nthreads()
+
+    rowvals = [Vector{Int}() for _ in 1:ntasks]
+    colvals = [Vector{Int}() for _ in 1:ntasks]
+
+    tasks = [
+        Threads.@spawn _lower_triangular_task_worker!($(rowvals[i]), $(colvals[i]), $(x), $(k), $(metric), $(ε))
+        for (i, k) in enumerate(partition_indices(length(x), ntasks))
+    ]
+    foreach(fetch, tasks)
+    finalrows = reduce(vcat, rowvals) # merge into one array
+    finalcols = reduce(vcat, colvals) # merge into one array
     nzvals = fill(true, (length(finalrows),))
     return Symmetric(sparse(finalrows, finalcols, nzvals, length(x), length(x)), :U)
 end
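Both parallel methods now share one pattern: split the columns into chunks, hand each task a private pair of index buffers, `Threads.@spawn` one worker per chunk, wait on all of them, and merge once at the end. The toy below strips that pattern to its skeleton on a scalar series with a plain `abs` predicate; `toy_worker!`, `toy_recurrence`, the even chunking, and the predicate are all illustrative stand-ins, not package API.

```julia
using Base.Threads, SparseArrays

# Toy worker: scan columns `k`, pushing recurrent index pairs into this
# task's private buffers (no sharing between tasks, hence no locks).
function toy_worker!(rows, cols, x, k, ε)
    for j in k, i in eachindex(x)
        if abs(x[i] - x[j]) ≤ ε
            push!(rows, i)
            push!(cols, j)
        end
    end
    return nothing
end

# The spawn/fetch/merge pattern from the PR in miniature.
function toy_recurrence(x::AbstractVector, ε::Real; ntasks = nthreads())
    # Simple even chunking; the PR's full-matrix method uses the divrem-based
    # splitter and the triangular one uses partition_indices.
    bounds = round.(Int, range(0, length(x); length = ntasks + 1))
    chunks = [bounds[i]+1:bounds[i+1] for i in 1:ntasks]
    rows = [Int[] for _ in 1:ntasks]
    cols = [Int[] for _ in 1:ntasks]
    tasks = [
        # `$` freezes the current values into the task at scheduling time.
        Threads.@spawn toy_worker!($(rows[i]), $(cols[i]), $x, $k, $ε)
        for (i, k) in enumerate(chunks)
    ]
    foreach(fetch, tasks) # all buffers are complete after this line
    allrows = reduce(vcat, rows)
    allcols = reduce(vcat, cols)
    nzvals = fill(true, length(allrows))
    return sparse(allrows, allcols, nzvals, length(x), length(x))
end

R = toy_recurrence(rand(200), 0.05)
```

The `$` interpolation in `Threads.@spawn` snapshots the current values of `rows[i]`, `cols[i]`, and `k` into the task when it is scheduled, the same guard the PR uses, so a later rebinding in the enclosing scope cannot leak into an already-scheduled task.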