Skip to content

Commit eb65ad8

Browse files
committed
Add Rust implementation of FlatFAT + Reactive Aggregator
1 parent a585065 commit eb65ad8

File tree

4 files changed

+292
-0
lines changed

4 files changed

+292
-0
lines changed

rust/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,7 @@ pub mod soe;
4949

5050
/// Two-Stacks
5151
pub mod two_stacks;
52+
53+
/// Reactive-Aggregator
54+
pub mod reactive;
55+

rust/src/reactive/flat_fat.rs

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
use alga::general::AbstractMonoid;
2+
use alga::general::Operator;
3+
use std::collections::HashSet;
4+
5+
pub trait FAT<Value, BinOp>
6+
where
7+
Value: AbstractMonoid<BinOp> + Clone,
8+
BinOp: Operator,
9+
{
10+
/// Creates a new FAT from a batch of values
11+
fn new(batch: &[Value]) -> Self;
12+
13+
/// Creates a new FAT with uninitialized values
14+
fn with_capacity(capacity: usize) -> Self;
15+
16+
/// Updates a non-contiguous batch of leaves
17+
fn update<'a, I>(&mut self, batch: I)
18+
where
19+
I: IntoIterator<Item = &'a (usize, Value)>,
20+
Value: 'a;
21+
22+
/// Updates a contiguous batch of leaves
23+
fn update_ordered<'a, I>(&mut self, values: I)
24+
where
25+
I: IntoIterator<Item = &'a Value>,
26+
Value: 'a;
27+
28+
/// Aggregates all nodes in the FAT and returns the result
29+
fn aggregate(&self) -> Value;
30+
31+
/// Aggregates a prefix of nodes in the FAT and returns the result
32+
fn prefix(&self, i: usize) -> Value;
33+
34+
/// Aggregates a suffix of nodes in the FAT and returns the result
35+
fn suffix(&self, i: usize) -> Value;
36+
}
37+
38+
pub struct FlatFAT<Value, BinOp>
39+
where
40+
Value: AbstractMonoid<BinOp> + Clone,
41+
BinOp: Operator,
42+
{
43+
/// A flat binary tree, indexed as:
44+
/// 0
45+
/// / \
46+
/// / \
47+
/// 1 2
48+
/// / \ / \
49+
/// 3 4 5 6
50+
pub(crate) tree: Vec<Value>,
51+
/// Number of leaves which can be stored in the tree
52+
pub(crate) capacity: usize,
53+
binop: std::marker::PhantomData<BinOp>,
54+
}
55+
56+
impl<Value, BinOp> FlatFAT<Value, BinOp>
57+
where
58+
Value: AbstractMonoid<BinOp> + Clone,
59+
BinOp: Operator,
60+
{
61+
/// Returns all leaf nodes of the tree
62+
pub(crate) fn leaves(&self) -> &[Value] {
63+
&self.tree[self.leaf(0)..]
64+
}
65+
/// Returns the index of the root node
66+
fn root(&self) -> usize {
67+
0
68+
}
69+
/// Returns the index of a leaf node
70+
fn leaf(&self, i: usize) -> usize {
71+
i + self.capacity - 1
72+
}
73+
/// Returns the index of an node's left child
74+
fn left(&self, i: usize) -> usize {
75+
2 * (i + 1) - 1
76+
}
77+
/// Returns the index of an node's right child
78+
fn right(&self, i: usize) -> usize {
79+
2 * (i + 1)
80+
}
81+
/// Returns the index of an node's parent
82+
fn parent(&self, i: usize) -> usize {
83+
(i - 1) / 2
84+
}
85+
}
86+
87+
impl<Value, BinOp> FAT<Value, BinOp> for FlatFAT<Value, BinOp>
88+
where
89+
Value: AbstractMonoid<BinOp> + Clone,
90+
BinOp: Operator,
91+
{
92+
fn new(values: &[Value]) -> Self {
93+
let capacity = values.len();
94+
let mut new = Self::with_capacity(capacity);
95+
new.update_ordered(values);
96+
new
97+
}
98+
99+
fn with_capacity(capacity: usize) -> Self {
100+
assert_ne!(capacity, 0, "Capacity of window must be greater than 0");
101+
Self {
102+
tree: vec![Value::identity(); 2 * capacity - 1],
103+
binop: std::marker::PhantomData,
104+
capacity,
105+
}
106+
}
107+
108+
fn update<'a, I>(&mut self, batch: I)
109+
where
110+
I: IntoIterator<Item = &'a (usize, Value)>,
111+
Value: 'a,
112+
{
113+
let mut parents: HashSet<usize> = batch
114+
.into_iter()
115+
.map(|(idx, val)| {
116+
let leaf = self.leaf(*idx);
117+
self.tree[leaf] = val.clone();
118+
self.parent(leaf)
119+
})
120+
.collect();
121+
let mut new_parents: HashSet<usize> = HashSet::new();
122+
loop {
123+
parents.drain().for_each(|parent| {
124+
let left = self.left(parent);
125+
let right = self.right(parent);
126+
self.tree[parent] = self.tree[left].operate(&self.tree[right]);
127+
if parent != self.root() {
128+
new_parents.insert(self.parent(parent));
129+
}
130+
});
131+
if new_parents.is_empty() {
132+
break;
133+
} else {
134+
std::mem::swap(&mut parents, &mut new_parents);
135+
}
136+
}
137+
}
138+
139+
fn update_ordered<'a, I>(&mut self, values: I)
140+
where
141+
I: IntoIterator<Item = &'a Value>,
142+
Value: 'a,
143+
{
144+
values.into_iter().enumerate().for_each(|(idx, val)| {
145+
let leaf = self.leaf(idx);
146+
self.tree[leaf] = val.clone();
147+
});
148+
(0..self.leaf(0)).into_iter().rev().for_each(|parent| {
149+
let left = self.left(parent);
150+
let right = self.right(parent);
151+
self.tree[parent] = self.tree[left].operate(&self.tree[right]);
152+
});
153+
}
154+
155+
fn aggregate(&self) -> Value {
156+
self.tree[self.root()].clone()
157+
}
158+
159+
fn prefix(&self, idx: usize) -> Value {
160+
let mut node = self.leaf(idx);
161+
let mut agg = self.tree[node].clone();
162+
while node != self.root() {
163+
let parent = self.parent(node);
164+
if node == self.right(parent) {
165+
let left = self.left(parent);
166+
agg = self.tree[left].operate(&agg);
167+
}
168+
node = parent;
169+
}
170+
return agg;
171+
}
172+
173+
fn suffix(&self, i: usize) -> Value {
174+
let mut node = self.leaf(i);
175+
let mut agg = self.tree[node].clone();
176+
while node != self.root() {
177+
let parent = self.parent(node);
178+
if node == self.left(parent) {
179+
agg = agg.operate(&self.tree[self.right(parent)]);
180+
}
181+
node = parent;
182+
}
183+
return agg;
184+
}
185+
}

rust/src/reactive/mod.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
pub(crate) mod flat_fat;
2+
3+
use crate::reactive::flat_fat::{FlatFAT, FAT};
4+
use crate::FifoWindow;
5+
use alga::general::AbstractMonoid;
6+
use alga::general::Operator;
7+
8+
pub struct Reactive<Value, BinOp>
9+
where
10+
Value: AbstractMonoid<BinOp> + Clone,
11+
BinOp: Operator,
12+
{
13+
fat: FlatFAT<Value, BinOp>,
14+
size: usize,
15+
front: usize,
16+
back: usize,
17+
}
18+
19+
impl<Value, BinOp> Reactive<Value, BinOp>
20+
where
21+
Value: AbstractMonoid<BinOp> + Clone,
22+
BinOp: Operator,
23+
{
24+
/// Returns a Reactive Aggregator with a pre-allocated `capacity`
25+
pub fn with_capacity(capacity: usize) -> Self {
26+
Self {
27+
fat: FlatFAT::with_capacity(capacity),
28+
size: 0,
29+
front: 0,
30+
back: 0,
31+
}
32+
}
33+
fn inverted(&self) -> bool {
34+
return self.front > self.back;
35+
}
36+
fn resize(&mut self, capacity: usize) {
37+
let leaves = self.fat.leaves();
38+
let mut fat = FlatFAT::with_capacity(capacity);
39+
if self.inverted() {
40+
fat.update_ordered(
41+
leaves[self.front..]
42+
.iter()
43+
.chain(leaves[..self.back].iter()),
44+
);
45+
} else {
46+
fat.update_ordered(leaves[self.front..self.back].iter());
47+
}
48+
self.fat = fat;
49+
self.front = 0;
50+
self.back = self.size;
51+
}
52+
}
53+
54+
impl<Value, BinOp> FifoWindow<Value, BinOp> for Reactive<Value, BinOp>
55+
where
56+
Value: AbstractMonoid<BinOp> + Clone,
57+
BinOp: Operator,
58+
{
59+
fn new() -> Self {
60+
Self {
61+
fat: FlatFAT::with_capacity(8),
62+
size: 0,
63+
front: 0,
64+
back: 0,
65+
}
66+
}
67+
fn push(&mut self, v: Value) {
68+
self.fat.update(&[(self.back, v)]);
69+
self.size += 1;
70+
self.back += 1;
71+
if self.size > (3 * self.fat.capacity) / 4 {
72+
self.resize(self.fat.capacity * 2);
73+
}
74+
}
75+
fn pop(&mut self) {
76+
self.fat.update(&[(self.front, Value::identity())]);
77+
self.size -= 1;
78+
self.front += 1;
79+
if self.size <= self.fat.capacity / 4 {
80+
self.resize(self.fat.capacity / 2);
81+
}
82+
}
83+
fn query(&self) -> Value {
84+
if self.front > self.back {
85+
self.fat
86+
.suffix(self.front)
87+
.operate(&self.fat.prefix(self.back))
88+
} else {
89+
self.fat.aggregate()
90+
}
91+
}
92+
}

rust/tests/fifo-window.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use alga::general::Identity;
88
use alga::general::Operator;
99
use alga::general::TwoSidedInverse;
1010
use rand::Rng;
11+
use swag::reactive::*;
1112
use swag::recalc::*;
1213
use swag::soe::*;
1314
use swag::two_stacks::*;
@@ -122,3 +123,13 @@ fn test1_two_stacks() {
122123
fn test2_two_stacks() {
123124
test2::<TwoStacks<Value, Sum>>();
124125
}
126+
127+
#[test]
128+
fn test1_reactive() {
129+
test1::<Reactive<Value, Sum>>();
130+
}
131+
132+
#[test]
133+
fn test2_reactive() {
134+
test2::<Reactive<Value, Sum>>();
135+
}

0 commit comments

Comments
 (0)