From d8a87306a8295a155b03b19f4ad23a4248d4cd68 Mon Sep 17 00:00:00 2001 From: Cody Tapscott Date: Wed, 14 May 2025 08:47:48 -0400 Subject: [PATCH] Add missing `parallel=:threads` implementations These implementations are extremely basic, but they try to follow the patterns in the other parts of the Parallel module. My real motivation is to be able to move the Distributed implementations into an extension, so that Graphs.jl does not depend on Distributed. --- src/Parallel/distance.jl | 41 ++++++++++++++++++++-- src/Parallel/shortestpaths/dijkstra.jl | 44 ++++++++++++++++++++++-- src/Parallel/traversals/greedy_color.jl | 36 +++++++++++++++++-- test/parallel/distance.jl | 12 +++++-- test/parallel/shortestpaths/dijkstra.jl | 22 ++++++++---- test/parallel/traversals/greedy_color.jl | 18 ++++++++-- 6 files changed, 152 insertions(+), 21 deletions(-) diff --git a/src/Parallel/distance.jl b/src/Parallel/distance.jl index 3145339f3..f16df61ea 100644 --- a/src/Parallel/distance.jl +++ b/src/Parallel/distance.jl @@ -1,20 +1,55 @@ # used in shortest path calculations function eccentricity( + g::AbstractGraph, + vs=vertices(g), + distmx::AbstractMatrix{T}=weights(g); + parallel::Symbol=:distributed, +) where {T<:Number} + return if parallel === :threads + threaded_eccentricity(g, vs, distmx) + elseif parallel === :distributed + distr_eccentricity(g, vs, distmx) + else + throw( + ArgumentError( + "Unsupported parallel argument '$(repr(parallel))' (supported: ':threads' or ':distributed')", + ), + ) + end +end + +function distr_eccentricity( g::AbstractGraph, vs=vertices(g), distmx::AbstractMatrix{T}=weights(g) ) where {T<:Number} vlen = length(vs) eccs = SharedVector{T}(vlen) @sync @distributed for i in 1:vlen - eccs[i] = maximum(Graphs.dijkstra_shortest_paths(g, vs[i], distmx).dists) + local d = Graphs.dijkstra_shortest_paths(g, vs[i], distmx) + eccs[i] = maximum(d.dists) end d = sdata(eccs) maximum(d) == typemax(T) && @warn("Infinite path length detected") return d end -function eccentricity(g::AbstractGraph, distmx::AbstractMatrix) - return eccentricity(g, vertices(g), distmx) +function threaded_eccentricity( + g::AbstractGraph, vs=vertices(g), distmx::AbstractMatrix{T}=weights(g) +) where {T<:Number} + vlen = length(vs) + eccs = Vector{T}(undef, vlen) + Base.Threads.@threads for i in 1:vlen + d = Graphs.dijkstra_shortest_paths(g, vs[i], distmx) + eccs[i] = maximum(d.dists) + end + maximum(eccs) == typemax(T) && @warn("Infinite path length detected") + return eccs +end + +function eccentricity( + g::AbstractGraph, distmx::AbstractMatrix; parallel::Symbol=:distributed +) + return eccentricity(g, vertices(g), distmx; parallel) end function diameter(g::AbstractGraph, distmx::AbstractMatrix=weights(g)) diff --git a/src/Parallel/shortestpaths/dijkstra.jl b/src/Parallel/shortestpaths/dijkstra.jl index 70363cf93..7091e782b 100644 --- a/src/Parallel/shortestpaths/dijkstra.jl +++ b/src/Parallel/shortestpaths/dijkstra.jl @@ -9,14 +9,54 @@ struct MultipleDijkstraState{T<:Number,U<:Integer} <: AbstractPathState end """ - Parallel.dijkstra_shortest_paths(g, sources=vertices(g), distmx=weights(g)) + Parallel.dijkstra_shortest_paths(g, sources=vertices(g), distmx=weights(g), parallel=:distributed) Compute the shortest paths between all pairs of vertices in graph `g` by running [`dijkstra_shortest_paths`] for every vertex and using an optional list of source vertex `sources` and an optional distance matrix `distmx`. Return a [`Parallel.MultipleDijkstraState`](@ref) with relevant -traversal information. +traversal information. The `parallel` argument can be set to `:threads` or `:distributed` for multi- +threaded or multi-process parallelism, respectively. """ function dijkstra_shortest_paths( + g::AbstractGraph{U}, + sources=vertices(g), + distmx::AbstractMatrix{T}=weights(g); + parallel::Symbol=:distributed, +) where {T<:Number} where {U} + return if parallel === :threads + threaded_dijkstra_shortest_paths(g, sources, distmx) + elseif parallel === :distributed + distr_dijkstra_shortest_paths(g, sources, distmx) + else + throw( + ArgumentError( + "Unsupported parallel argument '$(repr(parallel))' (supported: ':threads' or ':distributed')", + ), + ) + end +end + +function threaded_dijkstra_shortest_paths( + g::AbstractGraph{U}, sources=vertices(g), distmx::AbstractMatrix{T}=weights(g) +) where {T<:Number} where {U} + n_v = nv(g) + r_v = length(sources) + + # TODO: remove `Int` once julialang/#23029 / #23032 are resolved + dists = Matrix{T}(undef, Int(r_v), Int(n_v)) + parents = Matrix{U}(undef, Int(r_v), Int(n_v)) + + Base.Threads.@threads for i in 1:r_v + state = Graphs.dijkstra_shortest_paths(g, sources[i], distmx) + dists[i, :] = state.dists + parents[i, :] = state.parents + end + + result = MultipleDijkstraState(dists, parents) + return result +end + +function distr_dijkstra_shortest_paths( g::AbstractGraph{U}, sources=vertices(g), distmx::AbstractMatrix{T}=weights(g) ) where {T<:Number} where {U} n_v = nv(g) diff --git a/src/Parallel/traversals/greedy_color.jl b/src/Parallel/traversals/greedy_color.jl index 6eb0c1e8e..b6ebf1880 100644 --- a/src/Parallel/traversals/greedy_color.jl +++ b/src/Parallel/traversals/greedy_color.jl @@ -1,4 +1,31 @@ -function random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integer} +function random_greedy_color( + g::AbstractGraph{T}, reps::Integer; parallel::Symbol=:distributed +) where {T<:Integer} + return if parallel === :threads + threaded_random_greedy_color(g, reps) + elseif parallel === :distributed + distr_random_greedy_color(g, reps) + else + throw( + ArgumentError( + "Unsupported parallel argument '$(repr(parallel))' (supported: ':threads' or ':distributed')", + ), + ) + end +end + +function threaded_random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integer} + local_best = Vector{Graphs.Coloring{T}}(undef, reps) + Base.Threads.@threads for i in 1:reps + seq = shuffle(vertices(g)) + local_best[i] = Graphs.perm_greedy_color(g, seq) + end + best = reduce(Graphs.best_color, local_best) + + return convert(Graphs.Coloring{T}, best) +end + +function distr_random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integer} best = @distributed (Graphs.best_color) for i in 1:reps seq = shuffle(vertices(g)) Graphs.perm_greedy_color(g, seq) @@ -8,11 +35,14 @@ function random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integ end function greedy_color( - g::AbstractGraph{U}; sort_degree::Bool=false, reps::Integer=1 + g::AbstractGraph{U}; + sort_degree::Bool=false, + reps::Integer=1, + parallel::Symbol=:distributed, ) where {U<:Integer} return if sort_degree Graphs.degree_greedy_color(g) else - Parallel.random_greedy_color(g, reps) + Parallel.random_greedy_color(g, reps; parallel) end end diff --git a/test/parallel/distance.jl b/test/parallel/distance.jl index 16db54e06..115114ae5 100644 --- a/test/parallel/distance.jl +++ b/test/parallel/distance.jl @@ -1,4 +1,4 @@ -@testset "Parallel.Distance" begin +@testset "Parallel.Distance" for parallel in [:threads, :distributed] g4 = path_digraph(5) adjmx1 = [0 1 0; 1 0 1; 0 1 0] # graph adjmx2 = [0 1 0; 1 0 1; 1 1 0] # digraph @@ -9,7 +9,7 @@ for g in testgraphs(a1) z = @inferred(Graphs.eccentricity(g, distmx1)) - y = @inferred(Parallel.eccentricity(g, distmx1)) + y = @inferred(Parallel.eccentricity(g, distmx1; parallel)) @test isapprox(y, z) @test @inferred(Graphs.diameter(y)) == @inferred(Parallel.diameter(g, distmx1)) == @@ -21,9 +21,15 @@ @test @inferred(Graphs.center(y)) == @inferred(Parallel.center(g, distmx1)) == [2] end + let g = testgraphs(a1)[1] + # An error should be reported if the parallel mode could not be understood + @test_throws ArgumentError Parallel.eccentricity(g, distmx1; parallel=:thread) + @test_throws ArgumentError Parallel.eccentricity(g, distmx1; parallel=:distriibuted) + end + for g in testdigraphs(a2) z = @inferred(Graphs.eccentricity(g, distmx2)) - y = @inferred(Parallel.eccentricity(g, distmx2)) + y = @inferred(Parallel.eccentricity(g, distmx2; parallel)) @test isapprox(y, z) @test @inferred(Graphs.diameter(y)) == @inferred(Parallel.diameter(g, distmx2)) == diff --git a/test/parallel/shortestpaths/dijkstra.jl b/test/parallel/shortestpaths/dijkstra.jl index 34f948232..ee26ce847 100644 --- a/test/parallel/shortestpaths/dijkstra.jl +++ b/test/parallel/shortestpaths/dijkstra.jl @@ -1,4 +1,4 @@ -@testset "Parallel.Dijkstra" begin +@testset "Parallel.Dijkstra" for parallel in [:threads, :distributed] g4 = path_digraph(5) d1 = float([0 1 2 3 4; 5 0 6 7 8; 9 10 0 11 12; 13 14 15 0 16; 17 18 19 20 0]) d2 = sparse(float([0 1 2 3 4; 5 0 6 7 8; 9 10 0 11 12; 13 14 15 0 16; 17 18 19 20 0])) @@ -8,7 +8,7 @@ for g in testgraphs(g3) z = floyd_warshall_shortest_paths(g, d) - zp = @inferred(Parallel.dijkstra_shortest_paths(g, collect(1:5), d)) + zp = @inferred(Parallel.dijkstra_shortest_paths(g, collect(1:5), d; parallel)) @test all(isapprox(z.dists, zp.dists)) for i in 1:5 @@ -21,7 +21,7 @@ end z = floyd_warshall_shortest_paths(g) - zp = @inferred(Parallel.dijkstra_shortest_paths(g)) + zp = @inferred(Parallel.dijkstra_shortest_paths(g; parallel)) @test all(isapprox(z.dists, zp.dists)) for i in 1:5 @@ -34,7 +34,7 @@ end z = floyd_warshall_shortest_paths(g) - zp = @inferred(Parallel.dijkstra_shortest_paths(g, [1, 2])) + zp = @inferred(Parallel.dijkstra_shortest_paths(g, [1, 2]; parallel)) @test all(isapprox(z.dists[1:2, :], zp.dists)) for i in 1:2 @@ -51,9 +51,17 @@ g3 = path_digraph(5) d = float([0 1 2 3 4; 5 0 6 7 8; 9 10 0 11 12; 13 14 15 0 16; 17 18 19 20 0]) + # An error should be reported if the parallel mode could not be understood + @test_throws ArgumentError Parallel.dijkstra_shortest_paths( + testdigraphs(g3)[1], collect(1:5), d; parallel=:thread + ) + @test_throws ArgumentError Parallel.dijkstra_shortest_paths( + testdigraphs(g3)[1], collect(1:5), d; parallel=:distriibuted + ) + for g in testdigraphs(g3) z = floyd_warshall_shortest_paths(g, d) - zp = @inferred(Parallel.dijkstra_shortest_paths(g, collect(1:5), d)) + zp = @inferred(Parallel.dijkstra_shortest_paths(g, collect(1:5), d; parallel)) @test all(isapprox(z.dists, zp.dists)) for i in 1:5 @@ -66,7 +74,7 @@ end z = floyd_warshall_shortest_paths(g) - zp = @inferred(Parallel.dijkstra_shortest_paths(g)) + zp = @inferred(Parallel.dijkstra_shortest_paths(g; parallel)) @test all(isapprox(z.dists, zp.dists)) for i in 1:5 @@ -79,7 +87,7 @@ end z = floyd_warshall_shortest_paths(g) - zp = @inferred(Parallel.dijkstra_shortest_paths(g, [1, 2])) + zp = @inferred(Parallel.dijkstra_shortest_paths(g, [1, 2]; parallel)) @test all(isapprox(z.dists[1:2, :], zp.dists)) for i in 1:2 diff --git a/test/parallel/traversals/greedy_color.jl b/test/parallel/traversals/greedy_color.jl index 65a43a514..65759ae23 100644 --- a/test/parallel/traversals/greedy_color.jl +++ b/test/parallel/traversals/greedy_color.jl @@ -1,8 +1,8 @@ -@testset "Parallel.Greedy Coloring" begin +@testset "Parallel.Greedy Coloring" for parallel in [:threads, :distributed] g3 = star_graph(10) for g in testgraphs(g3) for op_sort in (true, false) - C = @inferred(Parallel.greedy_color(g, reps=5, sort_degree=op_sort)) + C = @inferred(Parallel.greedy_color(g; reps=5, sort_degree=op_sort, parallel)) @test C.num_colors == 2 end end @@ -10,10 +10,22 @@ g4 = path_graph(20) g5 = complete_graph(20) + let g = testgraphs(g4)[1] + # An error should be reported if the parallel mode could not be understood + @test_throws ArgumentError Parallel.greedy_color( + g; reps=5, sort_degree=false, parallel=:thread + ) + @test_throws ArgumentError Parallel.greedy_color( + g; reps=5, sort_degree=false, parallel=:distriibuted + ) + end + for graph in [g4, g5] for g in testgraphs(graph) for op_sort in (true, false) - C = @inferred(Parallel.greedy_color(g, reps=5, sort_degree=op_sort)) + C = @inferred( + Parallel.greedy_color(g; reps=5, sort_degree=op_sort, parallel) + ) @test C.num_colors <= maximum(degree(g)) + 1 correct = true