Skip to content

Commit 920f3cf

Browse files
committed
Add metadata support
Add dgraph.txt index file for Graph IO format
1 parent 7d0090c commit 920f3cf

File tree

5 files changed

+355
-38
lines changed

5 files changed

+355
-38
lines changed

lib/DaggerGraphs/Project.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@ Dagger = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
88
DelimitedFiles = "8bb1440f-4735-579b-a4ab-409b98df4dab"
99
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
1010
Graphs = "86223c79-3864-5bf0-83f7-82e725a168b6"
11+
OffsetArrays = "6fe1bfb0-de20-5000-8ca7-80f57d26f881"
12+
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
1113
Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
1214

1315
[compat]
14-
Dagger = "0.17, 0.18"
16+
Dagger = "0.18"
1517
Graphs = "1"
18+
OffsetArrays = "1"
1619
Tables = "1"
1720
julia = "1"

lib/DaggerGraphs/src/DaggerGraphs.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ using Dagger
44
import Dagger: Chunk
55
using Distributed
66
using Graphs
7+
import OffsetArrays: OffsetArray
78
import Tables
89

910
export DGraph

lib/DaggerGraphs/src/dgraph.jl

Lines changed: 210 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
const ELTYPE = Union{Dagger.EagerThunk, Chunk}
2+
const META_ELTYPE = Union{ELTYPE,Nothing}
23

34
struct DGraphState{T,D}
45
# Whether the graph is "frozen" (immutable) or mutable
@@ -12,6 +13,10 @@ struct DGraphState{T,D}
1213
parts_ne::Vector{T}
1314
# The maximum number of nodes for each of `parts`
1415
parts_v_max::Int
16+
# The vertex metadata for each of `parts`
17+
parts_v_meta::Vector{META_ELTYPE}
18+
# The edge metadata for each of `parts`
19+
parts_e_meta::Vector{META_ELTYPE}
1520

1621
# A set of `AdjList` for each of `parts`
1722
# An edge is present here if either src or dst (but not both) is in
@@ -21,6 +26,22 @@ struct DGraphState{T,D}
2126
bg_adjs_ne::Vector{T}
2227
# The number of edges in each of `bg_adjs` where the source is this partition
2328
bg_adjs_ne_src::Vector{T}
29+
# The edge metadata for each of `bg_adjs`
30+
bg_adjs_e_meta::Vector{META_ELTYPE}
31+
end
32+
function DGraphState{T,D}(chunksize::Integer) where {T,D}
33+
return DGraphState{T,D}(
34+
Ref(false), # frozen
35+
ELTYPE[], # parts
36+
UnitRange{T}[], # parts_nv
37+
T[], # parts_ne
38+
chunksize, # parts_v_max
39+
META_ELTYPE[], # parts_v_meta
40+
META_ELTYPE[], # parts_e_meta
41+
ELTYPE[], # bg_adjs
42+
T[], # bg_adjs_ne
43+
T[], # bg_adjs_ne_src
44+
META_ELTYPE[]) # bg_adjs_e_meta
2445
end
2546
mutable struct DGraph{T,D} <: Graphs.AbstractGraph{T}
2647
# The internal graph state
@@ -32,14 +53,7 @@ mutable struct DGraph{T,D} <: Graphs.AbstractGraph{T}
3253
function DGraph{T}(;chunksize::Integer=8,
3354
directed::Bool=true) where {T}
3455
D = directed
35-
state = DGraphState{T,D}(Ref(false),
36-
ELTYPE[],
37-
UnitRange{T}[],
38-
T[],
39-
chunksize,
40-
ELTYPE[],
41-
T[],
42-
T[])
56+
state = DGraphState{T,D}(chunksize)
4357
return new{T,D}(Dagger.tochunk(state), Ref(false))
4458
end
4559
end
@@ -79,10 +93,13 @@ function DGraph(dg::DGraph{T,D}; chunksize::Integer=0, directed::Bool=D, freeze:
7993
push!(new_state.parts, Dagger.@spawn copy(state.parts[part]))
8094
push!(new_state.parts_nv, state.parts_nv[part])
8195
push!(new_state.parts_ne, state.parts_ne[part])
96+
push!(new_state.parts_v_meta, Dagger.@spawn copymeta(state.parts_v_meta[part]))
97+
push!(new_state.parts_e_meta, Dagger.@spawn copymeta(state.parts_e_meta[part]))
8298

8399
push!(new_state.bg_adjs, Dagger.@spawn copy(state.bg_adjs[part]))
84100
push!(new_state.bg_adjs_ne, state.bg_adjs_ne[part])
85101
push!(new_state.bg_adjs_ne_src, state.bg_adjs_ne_src[part])
102+
push!(new_state.bg_adjs_e_meta, Dagger.@spawn copymeta(state.bg_adj_e_meta[part]))
86103
end
87104
freeze && freeze!(g)
88105
return g
@@ -132,6 +149,23 @@ function freeze!(g::DGraphState)
132149
return false
133150
end
134151
g.frozen[] = true
152+
for part in nparts(g)
153+
if Dagger.istask(g.parts[part])
154+
g.parts[part] = fetch(g.parts[part]; raw=true)
155+
end
156+
if Dagger.istask(g.bg_adjs[part])
157+
g.bg_adjs[part] = fetch(g.bg_adjs[part]; raw=true)
158+
end
159+
if Dagger.istask(g.parts_v_meta[part])
160+
g.parts_v_meta[part] = fetch(g.parts_v_meta[part]; raw=true)
161+
end
162+
if Dagger.istask(g.parts_e_meta[part])
163+
g.parts_e_meta[part] = fetch(g.parts_e_meta[part]; raw=true)
164+
end
165+
if Dagger.istask(g.bg_adjs_e_meta[part])
166+
g.bg_adjs_e_meta[part] = fetch(g.bg_adjs_e_meta[part]; raw=true)
167+
end
168+
end
135169
return true
136170
end
137171
struct FrozenGraphException <: Exception end
@@ -143,8 +177,103 @@ function check_not_frozen(g)
143177
end
144178
end
145179

180+
has_metadata(g::DGraph) = with_state(g, has_metadata)
181+
has_vertex_metadata(g::DGraph) = with_state(g, has_vertex_metadata)
182+
has_edge_metadata(g::DGraph) = with_state(g, has_edge_metadata)
183+
has_metadata(g::DGraphState) =
184+
has_vertex_metadata(g) ||
185+
has_edge_metadata(g)
186+
has_vertex_metadata(g::DGraphState) =
187+
any(!isnothing, g.parts_v_meta)
188+
has_edge_metadata(g::DGraphState) =
189+
any(!isnothing, g.parts_e_meta) ||
190+
any(!isnothing, g.bg_adjs_e_meta)
191+
function set_vertex_metadata!(g::DGraph, meta)
192+
check_not_frozen(g)
193+
# Create vertex metadata for each partition, being careful not to transfer
194+
# `meta` itself, which may be large
195+
for part in 1:nparts(g)
196+
part_vs = partition_vertices(g, part)
197+
submeta = partition_vertex_metadata(meta, part_vs)
198+
with_state(g, set_vertex_metadata!, part, submeta)
199+
end
200+
end
201+
function set_vertex_metadata!(g::DGraphState, part::Integer, submeta)
202+
check_not_frozen(g)
203+
g.parts_v_meta[part] = Dagger.tochunk(submeta)
204+
return
205+
end
206+
function set_edge_metadata!(g::DGraph, meta)
207+
check_not_frozen(g)
208+
# Create edge metadata for each partition and background,
209+
# being careful not to transfer `meta` itself, which may be large
210+
for part in 1:nparts(g)
211+
part_edges, back_edges = partition_edges(g, part)
212+
part_submeta = partition_edge_metadata(meta, part_edges)
213+
back_submeta = partition_edge_metadata(meta, back_edges)
214+
with_state(g, set_edge_metadata!, part, part_submeta, back_submeta)
215+
end
216+
end
217+
function set_edge_metadata!(g::DGraphState, part::Integer, part_submeta, back_submeta)
218+
check_not_frozen(g)
219+
g.parts_e_meta[part] = Dagger.tochunk(part_submeta)
220+
g.bg_adjs_e_meta[part] = Dagger.tochunk(back_submeta)
221+
return
222+
end
223+
partition_vertex_metadata(meta, part_nv) = error("Must define `partition_vertex_metadata` for `$(typeof(meta))`")
224+
partition_edge_metadata(meta, edges) = error("Must define `partition_edge_metadata` for `$(typeof(meta))`")
225+
partition_vertex_metadata(meta::Vector, part_vs) =
226+
OffsetArray(meta[part_vs], part_vs)
227+
function partition_edge_metadata(meta::Matrix{T}, edges) where T
228+
if isempty(edges)
229+
return fill(one(T), 0, 0)
230+
end
231+
vs_min = minimum(x->min(src(x), dst(x)), edges; init=1)
232+
vs_max = maximum(x->max(src(x), dst(x)), edges; init=1)
233+
vs_span = vs_min:vs_max
234+
return OffsetArray(meta[vs_span,vs_span], vs_span, vs_span)
235+
end
236+
get_partition_vertex_metadata(g::DGraph, part::Integer) =
237+
fetch(with_state(g, get_partition_vertex_metadata, part))
238+
function get_partition_vertex_metadata(g::DGraphState, part::Integer)
239+
check_not_frozen(g)
240+
return g.parts_v_meta[part]
241+
end
242+
get_partition_edge_metadata(g::DGraph, part::Integer) =
243+
fetch(with_state(g, get_partition_edge_metadata, part))
244+
function get_partition_edge_metadata(g::DGraphState, part::Integer)
245+
check_not_frozen(g)
246+
return g.parts_e_meta[part]
247+
end
248+
get_background_edge_metadata(g::DGraph, part::Integer) =
249+
fetch(with_state(g, get_background_edge_metadata, part))
250+
function get_background_edge_metadata(g::DGraphState, part::Integer)
251+
check_not_frozen(g)
252+
return g.bg_adjs_e_meta[part]
253+
end
254+
copymeta(x) = x
255+
copymeta(x::AbstractArray) = copy(x)
256+
257+
function Graphs.weights(g::DGraph)
258+
if has_edge_metadata(g)
259+
return LazyWeights(g)
260+
end
261+
return Graphs.DefaultDistance(nv(g))
262+
end
263+
struct LazyWeights{D<:DGraph}
264+
g::D
265+
end
266+
function Base.collect(w::LazyWeights)
267+
W = ones(Float64, nv(w.g), nv(w.g))
268+
for (edge, w) in edges_with_weights(w.g)
269+
src, dst = Tuple(edge)
270+
W[src,dst] = w
271+
end
272+
return W
273+
end
274+
146275
function Base.show(io::IO, g::DGraph{T,D}) where {T,D}
147-
print(io, "{$(nv(g)), $(ne(g))} $(D ? "" : "un")directed Dagger $T graph$(isfrozen(g) ? " (frozen)" : "")")
276+
print(io, "{$(nv(g)), $(ne(g))} $(D ? "" : "un")directed Dagger $T $(has_metadata(g) ? "meta-" : "")graph$(isfrozen(g) ? " (frozen)" : "")")
148277
end
149278

150279
nparts(g::DGraph) = with_state(g, nparts)
@@ -184,6 +313,8 @@ end
184313
Graphs.is_directed(::DGraph{T,D}) where {T,D} = D
185314
Graphs.vertices(g::DGraph) = Base.OneTo(nv(g))
186315
Graphs.edges(g::DGraph) = DGraphEdgeIter(g)
316+
edges_with_metadata(f, g::DGraph) = DGraphEdgeIter(g; metadata=true, meta_f=f)
317+
edges_with_weights(g::DGraph) = edges_with_metadata(weights, g)
187318
Graphs.zero(::Type{<:DGraph}) = DGraph()
188319
function Graphs.add_vertex!(g::DGraph)
189320
check_not_frozen(g)
@@ -232,9 +363,12 @@ function add_partition!(g::DGraphState{T,D}, n::Integer) where {T,D}
232363
num_v = nv(g)
233364
push!(g.parts_nv, (num_v+1):(num_v+n))
234365
push!(g.parts_ne, 0)
366+
push!(g.parts_v_meta, nothing)
367+
push!(g.parts_e_meta, nothing)
235368
push!(g.bg_adjs, Dagger.@spawn AdjList{T,D}())
236369
push!(g.bg_adjs_ne, 0)
237370
push!(g.bg_adjs_ne_src, 0)
371+
push!(g.bg_adjs_e_meta, nothing)
238372
return length(g.parts)
239373
end
240374
function add_partition!(g::DGraph, sg::AbstractGraph)
@@ -250,6 +384,42 @@ function add_partition!(g::DGraphState{T,D}, sg::AbstractGraph; all::Bool=true)
250384
@assert !all || count == length(part_edges)
251385
return part
252386
end
387+
function add_partition!(g::DGraph, part_data::ELTYPE, back_data::ELTYPE,
388+
part_vert_meta_data::META_ELTYPE,
389+
part_edge_meta_data::META_ELTYPE,
390+
back_edge_meta_data::META_ELTYPE,
391+
n_verts::Integer, n_part_edges::Integer,
392+
n_back_edges::Integer, n_back_own_edges::Integer)
393+
check_not_frozen(g)
394+
return with_state(g, add_partition!, Ref(part_data), Ref(back_data),
395+
Ref(part_vert_meta_data),
396+
Ref(part_edge_meta_data),
397+
Ref(back_edge_meta_data),
398+
n_verts, n_part_edges,
399+
n_back_edges, n_back_own_edges)
400+
end
401+
function add_partition!(g::DGraphState{T,D}, part_data::Ref, back_data::Ref,
402+
part_vert_meta_data::Ref,
403+
part_edge_meta_data::Ref,
404+
back_edge_meta_data::Ref,
405+
n_verts::Integer, n_part_edges::Integer,
406+
n_back_edges::Integer, n_back_own_edges::Integer) where {T,D}
407+
check_not_frozen(g)
408+
if n_verts < 1
409+
throw(ArgumentError("n_verts must be >= 1"))
410+
end
411+
num_v = nv(g)
412+
push!(g.parts, part_data[])
413+
push!(g.parts_nv, (num_v+1):(num_v+n_verts))
414+
push!(g.parts_ne, n_part_edges)
415+
push!(g.parts_v_meta, part_vert_meta_data[])
416+
push!(g.parts_e_meta, part_edge_meta_data[])
417+
push!(g.bg_adjs, back_data[])
418+
push!(g.bg_adjs_ne, n_back_edges)
419+
push!(g.bg_adjs_ne_src, n_back_own_edges)
420+
push!(g.bg_adjs_e_meta, back_edge_meta_data[])
421+
return length(g.parts)
422+
end
253423
function Graphs.add_edge!(g::DGraph, src::Integer, dst::Integer)
254424
check_not_frozen(g)
255425
return with_state(g, add_edge!, src, dst)
@@ -398,11 +568,41 @@ function Graphs.outneighbors(g::DGraphState, v::Integer)
398568

399569
return neighbors
400570
end
401-
Graphs.weights(g::DGraph) = Graphs.DefaultDistance(nv(g))
402571

403572
get_partition(g::DGraph, part::Integer) =
404573
with_state(g, get_partition, part)
405574
get_partition(g::DGraphState, part::Integer) = fetch(g.parts[part])
406575
get_background(g::DGraph, part::Integer) =
407576
with_state(g, get_background, part)
408577
get_background(g::DGraphState, part::Integer) = fetch(g.bg_adjs[part])
578+
579+
partition_vertices(g::DGraph, part::Integer) =
580+
with_state(g, partition_vertices, part)
581+
partition_vertices(g::DGraphState, part::Integer) = g.parts_nv[part]
582+
583+
partition_edges(g::DGraph, part::Integer) =
584+
with_state(g, partition_edges, part)
585+
function partition_edges(g::DGraphState, part::Integer)
586+
shift = g.parts_nv[part].start - 1
587+
part_edges = map(edge->Edge(src(edge)+shift, dst(edge)+shift), exec_fast(edges, g.parts[part]))
588+
back_edges = exec_fast(edges, g.bg_adjs[part])
589+
return part_edges, back_edges
590+
end
591+
592+
partition_nv(g::DGraph, part::Integer) = length(partition_vertices(g, part))
593+
partition_ne(g::DGraph, part::Integer) = with_state(g, partition_ne, part)
594+
function partition_ne(g::DGraphState, part::Integer)
595+
return (g.parts_ne[part],
596+
g.bg_adjs_ne[part],
597+
g.bg_adjs_ne_src[part])
598+
end
599+
600+
partitioning(g::DGraph) = with_state(g, partitioning)
601+
function partitioning(g::DGraphState)
602+
c = fill(0, nv(g))
603+
for part in 1:nparts(g)
604+
span = g.parts_nv[part]
605+
c[span] .= part
606+
end
607+
return c
608+
end

0 commit comments

Comments
 (0)