forked from TimelyDataflow/differential-dataflow
-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathdeals.rs
More file actions
137 lines (102 loc) · 5.2 KB
/
deals.rs
File metadata and controls
137 lines (102 loc) · 5.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
use std::time::Instant;
use timely::dataflow::*;
use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, KeyBuilder, ValBatcher, ValBuilder};
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::operators::iterate::SemigroupVariable;
use differential_dataflow::difference::Present;
// An arranged collection in scope `G` whose trace is a `ValSpine` with keys `K`, values `V`,
// the scope's own timestamp, and difference type `R`. Both `tc` and `sg` take their edges
// in this form.
type EdgeArranged<G, K, V, R> = Arranged<G, TraceAgent<ValSpine<K, V, <G as ScopeParent>::Timestamp, R>>>;
// Node identifier; `main` parses these from the input file.
type Node = u32;
// A pair of nodes. `main` loads `(src, dst)` pairs, and `tc` / `sg` return collections of pairs.
type Edge = (Node, Node);
// Timestamp type given to `iterative::<Iter,_,_>` for the nested iteration scopes.
type Iter = u32;
// Loads an edge list from a file and evaluates one of two recursive graph queries over it.
//
// Command line: the first argument is the path of the edge file and the second is the
// query to run, either `tc` or `sg`. All arguments after the binary name are also handed
// to `timely::execute_from_args`.
//
// Input format: one edge per line, written as two whitespace-separated `u32` node ids
// (`src dst`). Blank lines and lines starting with `#` are ignored. Lines are dealt out
// by line index modulo the number of workers, so each worker parses only its own share.
//
// Output: the query result is collapsed to unit records, consolidated, and printed through
// `inspect`; worker 0 additionally reports the elapsed time after ingestion and after the
// computation has run to completion.
//
// Panics if an argument is missing, the file cannot be opened or read, a line is
// malformed, or the query name is not one of `tc` / `sg`.
fn main() {
    const USAGE: &str = "usage: <binary> <edge file> <tc|sg>";

    // The first argument names the input graph, the second selects the query.
    let filename = std::env::args().nth(1).expect(USAGE);
    let program = std::env::args().nth(2).expect(USAGE);

    timely::execute_from_args(std::env::args().skip(1), move |worker| {
        let peers = worker.peers();
        let index = worker.index();
        let timer = Instant::now();

        // Pinning is only a performance aid, so run unpinned when no cores are reported
        // instead of aborting (this also avoids a modulo-by-zero on an empty list).
        match core_affinity::get_core_ids() {
            Some(core_ids) if !core_ids.is_empty() => {
                core_affinity::set_for_current(core_ids[index % core_ids.len()]);
            }
            _ => eprintln!("worker {}: no core ids available; running unpinned", index),
        }

        // When false, the filter below discards every result before it is counted.
        let inspect = true;

        let mut input = worker.dataflow::<(),_,_>(|scope| {
            let (input, graph) = scope.new_collection();

            // Arrange the `(src, dst)` edges so they are keyed by `src`.
            let graph = graph.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

            match program.as_str() {
                "tc" => tc(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("tc count: {:?}", x)).probe(),
                "sg" => sg(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("sg count: {:?}", x)).probe(),
                _ => panic!("must specify one of 'tc', 'sg'.")
            };

            input
        });

        use std::fs::File;
        use std::io::{BufRead, BufReader};

        let file = BufReader::new(
            File::open(&filename)
                .unwrap_or_else(|e| panic!("failed to open {}: {}", filename, e)),
        );

        for (count, readline) in file.lines().enumerate() {
            let line = readline.expect("read error");

            // Each worker takes only the lines assigned to it.
            if count % peers != index {
                continue;
            }

            let line = line.trim();
            if line.is_empty() || line.starts_with('#') {
                continue;
            }

            let mut elts = line.split_whitespace();
            let src: u32 = elts
                .next()
                .and_then(|s| s.parse().ok())
                .unwrap_or_else(|| panic!("malformed src on line {}", count + 1));
            let dst: u32 = elts
                .next()
                .and_then(|s| s.parse().ok())
                .unwrap_or_else(|| panic!("malformed dst on line {}", count + 1));

            input.update((src, dst), Present);
        }

        if index == 0 { println!("{:?}\tData ingested", timer.elapsed()); }

        // Closing the input lets the dataflow run to completion.
        input.close();
        while worker.step() { }

        if index == 0 { println!("{:?}\tComputation complete", timer.elapsed()); }
    }).unwrap();
}
use timely::order::Product;
// Transitive closure: returns pairs (x, z) such that z can be reached from x by following
// one or more edges.
//
// The result is defined recursively inside an iterative scope as the given edges together
// with every known pair (x, y) extended by an edge (y, z). The final `threshold_semigroup`
// yields `Present` for a pair only when the closure's third argument is `None`
// (presumably "no prior output for this pair" - confirm against the operator's docs),
// so each pair is reported once.
fn tc<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> Collection<G, Edge, Present> {
    edges.stream.scope().iterative::<Iter,_,_>(|nested| {
        // Feedback variable holding the pairs discovered so far.
        let reach = SemigroupVariable::new(nested, Product::new(Default::default(), 1));
        let edges_in = edges.enter(&reach.scope());

        // Re-key each known pair (x, y) by y so it can be joined with edges leaving y.
        let by_target = reach
            .map(|(src, mid)| (mid, src))
            .arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

        // (x, y) joined with (y, z) gives (x, z).
        let extended = by_target.join_core(&edges_in, |_mid, &src, &dst| Some((src, dst)));

        // The edges themselves are the base case.
        let candidates = extended.concat(&edges_in.as_collection(|&src, &dst| (src, dst)));

        let closure = candidates
            .arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
            .threshold_semigroup(|_, _, prior| match prior {
                None => Some(Present),
                Some(_) => None,
            });

        reach.set(&closure);
        closure.leave()
    })
}
// Same generation: returns pairs (x, y) of nodes related through a common ancestor at equal depth.
//
// Base case: two distinct nodes that appear as values under the same key of `edges`
// (i.e. distinct destinations of a shared source). Recursive case: for a known pair
// (a, b), every pair formed from a destination of an edge leaving a and a destination
// of an edge leaving b. The final `threshold_semigroup` yields `Present` for a pair only
// when the closure's third argument is `None` (presumably "no prior output for this
// pair" - confirm against the operator's docs), so each pair is reported once.
fn sg<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> Collection<G, Edge, Present> {
    // Distinct destinations sharing a source seed the recursion.
    let siblings = edges
        .join_core(edges, |_, &left, &right| Some((left, right)))
        .filter(|&(left, right)| left != right);

    siblings.scope().iterative::<Iter,_,_>(|nested| {
        // Feedback variable holding the pairs discovered so far.
        let same_gen = SemigroupVariable::new(nested, Product::new(Default::default(), 1));
        let edges_in = edges.enter(&same_gen.scope());
        let seeds = siblings.enter(&same_gen.scope());

        // (a, b) joined with (a, a') on a gives (b, a').
        let first_hop = same_gen
            .arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
            .join_core(&edges_in, |_, &other, &child| Some((other, child)));

        // (b, a') joined with (b, b') on b gives (a', b').
        let second_hop = first_hop
            .arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>()
            .join_core(&edges_in, |_, &child_a, &child_b| Some((child_a, child_b)));

        let next = second_hop
            .concat(&seeds)
            .arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
            .threshold_semigroup(|_, _, prior| match prior {
                None => Some(Present),
                Some(_) => None,
            });

        same_gen.set(&next);
        next.leave()
    })
}