From 6d0dc54b27ff840118858772a2cd4ba62cbdfa85 Mon Sep 17 00:00:00 2001 From: Helios De Rosario Date: Sat, 12 Apr 2025 09:45:45 +0200 Subject: [PATCH 1/7] try Channel storage with threaded `recurrence_matrix` --- src/matrices/recurrence_matrix_low.jl | 51 ++++++++++++++++++--------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/src/matrices/recurrence_matrix_low.jl b/src/matrices/recurrence_matrix_low.jl index 8cfd142..c194de6 100644 --- a/src/matrices/recurrence_matrix_low.jl +++ b/src/matrices/recurrence_matrix_low.jl @@ -62,54 +62,73 @@ end # Core function function recurrence_matrix(x::Vector_or_SSSet, y::Vector_or_SSSet, metric::Metric, ε, ::Val{true}) @assert ε isa Real || length(ε) == length(y) - # We create an `Array` of `Array`s, for each thread to have its + # We create a Channel for `Array`s of `Array`s, for each thread to have its # own array to push to. This avoids race conditions with # multiple threads pushing to the same `Array` (`Array`s are not atomic). - rowvals = [Vector{Int}() for _ in 1:Threads.nthreads()] - colvals = [Vector{Int}() for _ in 1:Threads.nthreads()] + nbuffers = Threads.nthreads() + threadchannel = Channel{NTuple{2, Vector{Int}}}(nbuffers) # for rows and columns + foreach(1:nbuffers) do _ + put!(threadchannel, (Int[], Int[])) + end # This is the same logic as the serial function, but parallelized. Threads.@threads for j in eachindex(y) - threadn = Threads.threadid() + rowvals, colvals = take!(threadchannel) nzcol = 0 for i in eachindex(x) @inbounds if evaluate(metric, x[i], y[j]) ≤ ( (ε isa Real) ? ε : ε[j] ) - push!(rowvals[threadn], i) # push to the thread-specific row array + push!(rowvals, i) # push to the thread-specific row array nzcol += 1 end end - append!(colvals[threadn], fill(j, (nzcol,))) + append!(colvals, fill(j, (nzcol,))) + put!(threadchannel, (rowvals, colvals)) + end + # merge into one array + finalrows = Int[] + finalcols = Int[] + foreach(1:nbuffers) do _ + rowvals, colvals = take!(threadchannel) + append!(finalrows, rowvals) + append!(finalcols, colvals) end - finalrows = vcat(rowvals...) # merge into one array - finalcols = vcat(colvals...) # merge into one array nzvals = fill(true, (length(finalrows),)) return sparse(finalrows, finalcols, nzvals, length(x), length(y)) end function recurrence_matrix(x::Vector_or_SSSet, metric::Metric, ε, ::Val{true}) @assert ε isa Real || length(ε) == length(x) - # We create an `Array` of `Array`s, for each thread to have its + # We create a Channel for `Array`s of `Array`s, for each thread to have its # own array to push to. This avoids race conditions with # multiple threads pushing to the same `Array` (`Array`s are not atomic). - rowvals = [Vector{Int}() for _ in 1:Threads.nthreads()] - colvals = [Vector{Int}() for _ in 1:Threads.nthreads()] + nbuffers = Threads.nthreads() + threadchannel = Channel{NTuple{2, Vector{Int}}}(nbuffers) # for rows and columns + foreach(1:nbuffers) do _ + put!(threadchannel, (Int[], Int[])) + end # This is the same logic as the serial function, but parallelized. Threads.@threads for k in partition_indices(length(x)) - threadn = Threads.threadid() + rowvals, colvals = take!(threadchannel) for j in k nzcol = 0 for i in 1:j @inbounds if evaluate(metric, x[i], x[j]) ≤ ( (ε isa Real) ? ε : ε[j] ) - push!(rowvals[threadn], i) # push to the thread-specific row array + push!(rowvals, i) # push to the thread-specific row array nzcol += 1 end end - append!(colvals[threadn], fill(j, (nzcol,))) + append!(colvals, fill(j, (nzcol,))) end end - finalrows = vcat(rowvals...) # merge into one array - finalcols = vcat(colvals...) # merge into one array + # merge into one array + finalrows = Int[] + finalcols = Int[] + foreach(1:nbuffers) do _ + rowvals, colvals = take!(threadchannel) + append!(finalrows, rowvals) + append!(finalcols, colvals) + end nzvals = fill(true, (length(finalrows),)) return Symmetric(sparse(finalrows, finalcols, nzvals, length(x), length(x)), :U) end From 3b26b1a2958e3009bcb7d778b8abbc30ab028354 Mon Sep 17 00:00:00 2001 From: Helios De Rosario Date: Sat, 12 Apr 2025 10:31:30 +0200 Subject: [PATCH 2/7] add missing line to put! data back into Channel. --- src/matrices/recurrence_matrix_low.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/matrices/recurrence_matrix_low.jl b/src/matrices/recurrence_matrix_low.jl index c194de6..b3184dd 100644 --- a/src/matrices/recurrence_matrix_low.jl +++ b/src/matrices/recurrence_matrix_low.jl @@ -120,6 +120,7 @@ function recurrence_matrix(x::Vector_or_SSSet, metric::Metric, ε, ::Val{true}) end append!(colvals, fill(j, (nzcol,))) end + put!(threadchannel, (rowvals, colvals)) end # merge into one array finalrows = Int[] From 916e3bdbdbc6f81d677c1745310d228a25bb0bbd Mon Sep 17 00:00:00 2001 From: Helios De Rosario Date: Sun, 13 Apr 2025 17:49:05 +0200 Subject: [PATCH 3/7] restrict versions of StateSpaceSets --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 0355663..a5cbbc3 100644 --- a/Project.toml +++ b/Project.toml @@ -20,7 +20,7 @@ DelimitedFiles = "1" Distances = "0.8, 0.9, 0.10" Graphs = "1.6" Reexport = "1" -StateSpaceSets = "2" +StateSpaceSets = "2.0, 2.1, 2.2, 2.3" Statistics = "1" UnicodePlots = "0.3, 1, 2, 3" julia = "1.5" From 12c315efdfd9593967df5f47ee9d22c80525fd65 Mon Sep 17 00:00:00 2001 From: Helios De Rosario Date: Sun, 13 Apr 2025 18:07:30 +0200 Subject: [PATCH 4/7] fix `_distancematrix` according to StateSpaceSets v2.4 --- Project.toml | 2 +- src/matrices/distance_matrix.jl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Project.toml b/Project.toml index a5cbbc3..0355663 100644 --- a/Project.toml +++ b/Project.toml @@ -20,7 +20,7 @@ DelimitedFiles = "1" Distances = "0.8, 0.9, 0.10" Graphs = "1.6" Reexport = "1" -StateSpaceSets = "2.0, 2.1, 2.2, 2.3" +StateSpaceSets = "2" Statistics = "1" UnicodePlots = "0.3, 1, 2, 3" julia = "1.5" diff --git a/src/matrices/distance_matrix.jl b/src/matrices/distance_matrix.jl index b588812..7df5508 100644 --- a/src/matrices/distance_matrix.jl +++ b/src/matrices/distance_matrix.jl @@ -97,7 +97,7 @@ end # Again, we'll define the serial version first: function _distancematrix(x::Array_or_SSSet, metric::Metric, ::Val{false}) - d = zeros(eltype(x), length(x), length(x)) + d = zeros(eltype(eltype(x)), length(x), length(x)) for j in 2:length(x) for i in 1:j-1 # all else is zero @inbounds d[i, j] = evaluate(metric, x[i], x[j]) From 3b567e9db9235e4709d3fc0b4e6f057cd9acbe3f Mon Sep 17 00:00:00 2001 From: Helios De Rosario Date: Fri, 18 Apr 2025 10:43:31 +0200 Subject: [PATCH 5/7] change channels to keep vector indices instead of vectors --- src/matrices/recurrence_matrix_low.jl | 60 +++++++++++++-------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/src/matrices/recurrence_matrix_low.jl b/src/matrices/recurrence_matrix_low.jl index b3184dd..4e21de4 100644 --- a/src/matrices/recurrence_matrix_low.jl +++ b/src/matrices/recurrence_matrix_low.jl @@ -62,74 +62,74 @@ end # Core function function recurrence_matrix(x::Vector_or_SSSet, y::Vector_or_SSSet, metric::Metric, ε, ::Val{true}) @assert ε isa Real || length(ε) == length(y) - # We create a Channel for `Array`s of `Array`s, for each thread to have its + # We create an `Array` of `Array`s, for each thread to have its # own array to push to. This avoids race conditions with # multiple threads pushing to the same `Array` (`Array`s are not atomic). + rowvals = [Vector{Int}() for _ in 1:Threads.nthreads()] + colvals = [Vector{Int}() for _ in 1:Threads.nthreads()] + # Channel to manage `Array`s to be used in each iteration nbuffers = Threads.nthreads() - threadchannel = Channel{NTuple{2, Vector{Int}}}(nbuffers) # for rows and columns - foreach(1:nbuffers) do _ - put!(threadchannel, (Int[], Int[])) + threadchannel = Channel{Int}(nbuffers) # for rows and columns + for i in 1:nbuffers + put!(threadchannel, i) end # This is the same logic as the serial function, but parallelized. Threads.@threads for j in eachindex(y) - rowvals, colvals = take!(threadchannel) + threadn = take!(threadchannel) nzcol = 0 for i in eachindex(x) @inbounds if evaluate(metric, x[i], y[j]) ≤ ( (ε isa Real) ? ε : ε[j] ) - push!(rowvals, i) # push to the thread-specific row array + push!(rowvals[threadn], i) # push to the thread-specific row array nzcol += 1 end end - append!(colvals, fill(j, (nzcol,))) - put!(threadchannel, (rowvals, colvals)) + append!(colvals[threadn], fill(j, (nzcol,))) + put!(threadchannel, threadn) end + close(threadchannel) + # merge into one array - finalrows = Int[] - finalcols = Int[] - foreach(1:nbuffers) do _ - rowvals, colvals = take!(threadchannel) - append!(finalrows, rowvals) - append!(finalcols, colvals) - end + finalrows = vcat(rowvals...) # merge into one array + finalcols = vcat(colvals...) # merge into one array nzvals = fill(true, (length(finalrows),)) return sparse(finalrows, finalcols, nzvals, length(x), length(y)) end function recurrence_matrix(x::Vector_or_SSSet, metric::Metric, ε, ::Val{true}) @assert ε isa Real || length(ε) == length(x) - # We create a Channel for `Array`s of `Array`s, for each thread to have its + # We create an `Array` of `Array`s, for each thread to have its # own array to push to. This avoids race conditions with # multiple threads pushing to the same `Array` (`Array`s are not atomic). + rowvals = [Vector{Int}() for _ in 1:Threads.nthreads()] + colvals = [Vector{Int}() for _ in 1:Threads.nthreads()] + # Channel to manage `Array`s to be used in each iteration nbuffers = Threads.nthreads() - threadchannel = Channel{NTuple{2, Vector{Int}}}(nbuffers) # for rows and columns - foreach(1:nbuffers) do _ - put!(threadchannel, (Int[], Int[])) + threadchannel = Channel{Int}(nbuffers) # for rows and columns + for i in 1:nbuffers + put!(threadchannel, i) end # This is the same logic as the serial function, but parallelized. Threads.@threads for k in partition_indices(length(x)) - rowvals, colvals = take!(threadchannel) + threadn = take!(threadchannel) for j in k nzcol = 0 for i in 1:j @inbounds if evaluate(metric, x[i], x[j]) ≤ ( (ε isa Real) ? ε : ε[j] ) - push!(rowvals, i) # push to the thread-specific row array + push!(rowvals[threadn], i) # push to the thread-specific row array nzcol += 1 end end - append!(colvals, fill(j, (nzcol,))) + append!(colvals[threadn], fill(j, (nzcol,))) end - put!(threadchannel, (rowvals, colvals)) + put!(threadchannel, threadn) end + close(threadchannel) + # merge into one array - finalrows = Int[] - finalcols = Int[] - foreach(1:nbuffers) do _ - rowvals, colvals = take!(threadchannel) - append!(finalrows, rowvals) - append!(finalcols, colvals) - end + finalrows = vcat(rowvals...) # merge into one array + finalcols = vcat(colvals...) # merge into one array nzvals = fill(true, (length(finalrows),)) return Symmetric(sparse(finalrows, finalcols, nzvals, length(x), length(x)), :U) end From ea7d265fe8809f5f9bb757f471ecf7a9ebfe6ac0 Mon Sep 17 00:00:00 2001 From: Helios De Rosario Date: Fri, 18 Apr 2025 23:55:23 +0200 Subject: [PATCH 6/7] change vcat+splat by reduce+vcat --- src/matrices/recurrence_matrix_low.jl | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/matrices/recurrence_matrix_low.jl b/src/matrices/recurrence_matrix_low.jl index 4e21de4..80320e0 100644 --- a/src/matrices/recurrence_matrix_low.jl +++ b/src/matrices/recurrence_matrix_low.jl @@ -89,9 +89,8 @@ function recurrence_matrix(x::Vector_or_SSSet, y::Vector_or_SSSet, metric::Metri end close(threadchannel) - # merge into one array - finalrows = vcat(rowvals...) # merge into one array - finalcols = vcat(colvals...) # merge into one array + finalrows = reduce(vcat, rowvals) # merge into one array + finalcols = reduce(vcat, colvals) # merge into one array nzvals = fill(true, (length(finalrows),)) return sparse(finalrows, finalcols, nzvals, length(x), length(y)) end @@ -127,9 +126,8 @@ function recurrence_matrix(x::Vector_or_SSSet, metric::Metric, ε, ::Val{true}) end close(threadchannel) - # merge into one array - finalrows = vcat(rowvals...) # merge into one array - finalcols = vcat(colvals...) # merge into one array + finalrows = reduce(vcat, rowvals) # merge into one array + finalcols = reduce(vcat, colvals) # merge into one array nzvals = fill(true, (length(finalrows),)) return Symmetric(sparse(finalrows, finalcols, nzvals, length(x), length(x)), :U) end From 62590d38c9bf13e637d7483eeeb8866e4e9c14c3 Mon Sep 17 00:00:00 2001 From: George Datseris Date: Mon, 19 May 2025 08:06:44 +0100 Subject: [PATCH 7/7] Update Project.toml --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 0355663..ef5b88e 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "RecurrenceAnalysis" uuid = "639c3291-70d9-5ea2-8c5b-839eba1ee399" repo = "https://github.com/JuliaDynamics/RecurrenceAnalysis.jl.git" -version = "2.1.1" +version = "2.1.2" [deps] DelimitedFiles = "8bb1440f-4735-579b-a4ab-409b98df4dab"