diff --git a/Project.toml b/Project.toml index 0355663..ef5b88e 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "RecurrenceAnalysis" uuid = "639c3291-70d9-5ea2-8c5b-839eba1ee399" repo = "https://github.com/JuliaDynamics/RecurrenceAnalysis.jl.git" -version = "2.1.1" +version = "2.1.2" [deps] DelimitedFiles = "8bb1440f-4735-579b-a4ab-409b98df4dab" diff --git a/src/matrices/distance_matrix.jl b/src/matrices/distance_matrix.jl index b588812..7df5508 100644 --- a/src/matrices/distance_matrix.jl +++ b/src/matrices/distance_matrix.jl @@ -97,7 +97,7 @@ end # Again, we'll define the serial version first: function _distancematrix(x::Array_or_SSSet, metric::Metric, ::Val{false}) - d = zeros(eltype(x), length(x), length(x)) + d = zeros(eltype(eltype(x)), length(x), length(x)) for j in 2:length(x) for i in 1:j-1 # all else is zero @inbounds d[i, j] = evaluate(metric, x[i], x[j]) diff --git a/src/matrices/recurrence_matrix_low.jl b/src/matrices/recurrence_matrix_low.jl index 8cfd142..80320e0 100644 --- a/src/matrices/recurrence_matrix_low.jl +++ b/src/matrices/recurrence_matrix_low.jl @@ -67,10 +67,16 @@ function recurrence_matrix(x::Vector_or_SSSet, y::Vector_or_SSSet, metric::Metri # multiple threads pushing to the same `Array` (`Array`s are not atomic). rowvals = [Vector{Int}() for _ in 1:Threads.nthreads()] colvals = [Vector{Int}() for _ in 1:Threads.nthreads()] + # Channel to manage `Array`s to be used in each iteration + nbuffers = Threads.nthreads() + threadchannel = Channel{Int}(nbuffers) # for rows and columns + for i in 1:nbuffers + put!(threadchannel, i) + end # This is the same logic as the serial function, but parallelized. Threads.@threads for j in eachindex(y) - threadn = Threads.threadid() + threadn = take!(threadchannel) nzcol = 0 for i in eachindex(x) @inbounds if evaluate(metric, x[i], y[j]) ≤ ( (ε isa Real) ? ε : ε[j] ) @@ -79,9 +85,12 @@ function recurrence_matrix(x::Vector_or_SSSet, y::Vector_or_SSSet, metric::Metri end end append!(colvals[threadn], fill(j, (nzcol,))) + put!(threadchannel, threadn) end - finalrows = vcat(rowvals...) # merge into one array - finalcols = vcat(colvals...) # merge into one array + close(threadchannel) + + finalrows = reduce(vcat, rowvals) # merge into one array + finalcols = reduce(vcat, colvals) # merge into one array nzvals = fill(true, (length(finalrows),)) return sparse(finalrows, finalcols, nzvals, length(x), length(y)) end @@ -93,10 +102,16 @@ function recurrence_matrix(x::Vector_or_SSSet, metric::Metric, ε, ::Val{true}) # multiple threads pushing to the same `Array` (`Array`s are not atomic). rowvals = [Vector{Int}() for _ in 1:Threads.nthreads()] colvals = [Vector{Int}() for _ in 1:Threads.nthreads()] + # Channel to manage `Array`s to be used in each iteration + nbuffers = Threads.nthreads() + threadchannel = Channel{Int}(nbuffers) # for rows and columns + for i in 1:nbuffers + put!(threadchannel, i) + end # This is the same logic as the serial function, but parallelized. Threads.@threads for k in partition_indices(length(x)) - threadn = Threads.threadid() + threadn = take!(threadchannel) for j in k nzcol = 0 for i in 1:j @@ -107,9 +122,12 @@ function recurrence_matrix(x::Vector_or_SSSet, metric::Metric, ε, ::Val{true}) end append!(colvals[threadn], fill(j, (nzcol,))) end + put!(threadchannel, threadn) end - finalrows = vcat(rowvals...) # merge into one array - finalcols = vcat(colvals...) # merge into one array + close(threadchannel) + + finalrows = reduce(vcat, rowvals) # merge into one array + finalcols = reduce(vcat, colvals) # merge into one array nzvals = fill(true, (length(finalrows),)) return Symmetric(sparse(finalrows, finalcols, nzvals, length(x), length(x)), :U) end