1+ use timely:: dataflow:: operators:: probe:: Handle ;
2+
3+ use differential_dataflow:: {
4+ input:: InputSession ,
5+ operators:: { Join , Iterate , Reduce , Threshold } ,
6+ } ;
7+
8+ // Types for representing an AST as a collection of data.
9+ type AstName = usize ;
10+ type AstNode = ( String , Vec < AstName > ) ;
11+
12+ fn main ( ) {
13+ // Define a timely dataflow runtime
14+ timely:: execute_from_args ( std:: env:: args ( ) , move |worker| {
15+
16+ // Input AST as pairs of `name` and `node`.
17+ let mut nodes = InputSession :: < _ , ( AstName , AstNode ) , _ > :: new ( ) ;
18+ // Exogenous equivalences associating AST nodes by name.
19+ let mut equiv = InputSession :: < _ , ( AstName , AstName ) , _ > :: new ( ) ;
20+
21+ // Probe to determine progress / completion.
22+ let mut probe = Handle :: new ( ) ;
23+
24+ // Set up a new computation
25+ worker. dataflow ( |scope| {
26+
27+ let nodes = nodes. to_collection ( scope) ;
28+ let equiv = equiv. to_collection ( scope) ;
29+
30+ // Iteratively develop a map from `Name` to `Name` that closes `equiv` under congruence.
31+ // Specifically, pairs `(a, b)` where a >= b and b names the equivalence class of a.
32+ nodes
33+ . map ( |( name, _) | ( name, name) )
34+ . iterate ( |eq_class| {
35+
36+ // Collection is loop invariant, but must be brought in scope.
37+ let nodes = nodes. enter ( & eq_class. scope ( ) ) ;
38+ let equiv = equiv. enter ( & eq_class. scope ( ) ) ;
39+
40+ // Separate AST node operators and their arguments.
41+ let ops = nodes. map ( |( name, ( op, _) ) | ( name, op) ) ;
42+ let args = nodes. flat_map ( |( name, ( _, args) ) | args. into_iter ( ) . enumerate ( ) . map ( move |( index, arg) | ( arg, ( name, index) ) ) ) ;
43+
44+ // Update argument identifiers, and then equate `(Ops, Args)` tuples to inform equivalences.
45+ let equivalent_asts =
46+ args. join_map ( eq_class, |_child, & ( node, index) , & eq_class| ( node, ( index, eq_class) ) )
47+ . reduce ( |_node, input, output| {
48+ let mut args = Vec :: new ( ) ;
49+ for ( ( _index, eq_class) , _) in input. iter ( ) {
50+ args. push ( * eq_class) ;
51+ }
52+ output. push ( ( args, 1isize ) ) ;
53+ } )
54+ . join_map ( & ops, |node, children, op| ( ( children. clone ( ) , op. clone ( ) ) , * node) )
55+ . concat ( & nodes. filter ( |( _, ( _, args) ) | args. is_empty ( ) ) . map ( |( node, ( op, _) ) | ( ( vec ! [ ] , op) , node) ) )
56+ . reduce ( |_key, input, output| {
57+ for node in input. iter ( ) {
58+ output. push ( ( ( * ( node. 0 ) , * input[ 0 ] . 0 ) , 1 ) ) ;
59+ }
60+ } )
61+ . map ( |( _key, ( node, eq_class) ) | ( node, eq_class) ) ;
62+
63+ // Blend exogenous and endogenous equivalence; find connected components.
64+ // NB: don't *actually* write connected components this way
65+ let edges = equivalent_asts. concat ( & equiv) ;
66+ let symms = edges. map ( |( x, y) |( y, x) ) . concat ( & edges) ;
67+ symms. iterate ( |reach|
68+ reach. join_map ( & reach, |_b, a, c| ( * a, * c) )
69+ . distinct ( )
70+ )
71+ . reduce ( |_a, input, output| output. push ( ( * input[ 0 ] . 0 , 1 ) ) )
72+
73+ } )
74+ . consolidate ( )
75+ . inspect ( |x| println ! ( "{:?}" , x) )
76+ . probe_with ( & mut probe) ;
77+ } ) ;
78+
79+ nodes. advance_to ( 0 ) ;
80+ equiv. advance_to ( 0 ) ;
81+
82+ println ! ( "Insert `(a x 2) / 2`" ) ;
83+ nodes. insert ( ( 0 , ( "a" . to_string ( ) , vec ! [ ] ) ) ) ;
84+ nodes. insert ( ( 1 , ( "2" . to_string ( ) , vec ! [ ] ) ) ) ;
85+ nodes. insert ( ( 2 , ( "mul" . to_string ( ) , vec ! [ 0 , 1 ] ) ) ) ;
86+ nodes. insert ( ( 3 , ( "div" . to_string ( ) , vec ! [ 2 , 1 ] ) ) ) ;
87+
88+ nodes. advance_to ( 1 ) ; nodes. flush ( ) ;
89+ equiv. advance_to ( 1 ) ; equiv. flush ( ) ;
90+
91+ worker. step_while ( || probe. less_than ( & nodes. time ( ) ) ) ;
92+ println ! ( "" ) ;
93+
94+
95+ println ! ( "Insert `a x (2 / 2)`" ) ;
96+ nodes. insert ( ( 4 , ( "2" . to_string ( ) , vec ! [ ] ) ) ) ;
97+ nodes. insert ( ( 5 , ( "div" . to_string ( ) , vec ! [ 4 , 4 ] ) ) ) ;
98+ nodes. insert ( ( 6 , ( "a" . to_string ( ) , vec ! [ ] ) ) ) ;
99+ nodes. insert ( ( 7 , ( "mul" . to_string ( ) , vec ! [ 6 , 5 ] ) ) ) ;
100+ println ! ( "Equate with the prior term" ) ;
101+ equiv. insert ( ( 3 , 7 ) ) ;
102+
103+ nodes. advance_to ( 2 ) ; nodes. flush ( ) ;
104+ equiv. advance_to ( 2 ) ; equiv. flush ( ) ;
105+
106+ worker. step_while ( || probe. less_than ( & nodes. time ( ) ) ) ;
107+ println ! ( "" ) ;
108+
109+
110+ println ! ( "Insert `(2 / 2)` and `1` and equate them." ) ;
111+ nodes. insert ( ( 8 , ( "2" . to_string ( ) , vec ! [ ] ) ) ) ;
112+ nodes. insert ( ( 9 , ( "div" . to_string ( ) , vec ! [ 8 , 8 ] ) ) ) ;
113+ nodes. insert ( ( 10 , ( "1" . to_string ( ) , vec ! [ ] ) ) ) ;
114+ equiv. insert ( ( 9 , 10 ) ) ;
115+
116+ nodes. advance_to ( 3 ) ; nodes. flush ( ) ;
117+ equiv. advance_to ( 3 ) ; equiv. flush ( ) ;
118+
119+ worker. step_while ( || probe. less_than ( & nodes. time ( ) ) ) ;
120+ println ! ( "" ) ;
121+
122+
123+ println ! ( "Insert `a * 1` and `a` and equate them." ) ;
124+ nodes. insert ( ( 11 , ( "a" . to_string ( ) , vec ! [ ] ) ) ) ;
125+ nodes. insert ( ( 12 , ( "1" . to_string ( ) , vec ! [ ] ) ) ) ;
126+ nodes. insert ( ( 13 , ( "mul" . to_string ( ) , vec ! [ 11 , 12 ] ) ) ) ;
127+ equiv. insert ( ( 11 , 13 ) ) ;
128+
129+ nodes. advance_to ( 4 ) ; nodes. flush ( ) ;
130+ equiv. advance_to ( 4 ) ; equiv. flush ( ) ;
131+
132+ worker. step_while ( || probe. less_than ( & nodes. time ( ) ) ) ;
133+ println ! ( "" ) ;
134+
135+
136+ println ! ( "Oh shoot; '2' could equal zero; undo '2'/'2' == '1')" ) ;
137+ equiv. remove ( ( 9 , 10 ) ) ;
138+
139+ } ) . expect ( "Computation terminated abnormally" ) ;
140+ }
0 commit comments