Skip to content

Commit a8ef861

Browse files
committed
A src/list/raxos/protocal/src/commonly_used/history/generic.rs
1 parent 9001881 commit a8ef861

File tree

6 files changed

+125
-30
lines changed

6 files changed

+125
-30
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use std::collections::BTreeMap;
2+
use std::collections::HashMap;
3+
4+
use crate::apaxos::history::History;
5+
use crate::Types;
6+
7+
#[derive(Clone, Debug)]
8+
pub struct GenericHistory<T>
9+
where T: Types<History = Self>
10+
{
11+
maximals: HashMap<T::Time, ()>,
12+
time_events: HashMap<T::Time, T::Event>,
13+
}
14+
15+
impl<T> Default for GenericHistory<T>
16+
where T: Types<History = Self>
17+
{
18+
fn default() -> Self {
19+
Self {
20+
maximals: HashMap::new(),
21+
time_events: HashMap::new(),
22+
}
23+
}
24+
}
25+
26+
impl<T: Types> History<T> for GenericHistory<T>
27+
where T: Types<History = Self>
28+
{
29+
fn do_append(&mut self, time: T::Time, event: T::Event) {
30+
self.time_events.insert(time, event);
31+
for max in self.maximals.keys().copied().collect::<Vec<_>>() {
32+
if max <= time {
33+
self.maximals.remove(&max);
34+
}
35+
}
36+
}
37+
38+
fn get(&self, time: &T::Time) -> Option<&T::Event> {
39+
self.time_events.get(time)
40+
}
41+
42+
fn lower_bounds(&self, time: T::Time) -> Self {
43+
let time_events = self
44+
.time_events
45+
.iter()
46+
.filter(|(t, _ev)| t <= &time)
47+
.map(|(t, ev)| (*t, ev.clone()))
48+
.collect::<HashMap<_, _>>();
49+
50+
Self {
51+
maximals: build_maximals(&time_events),
52+
time_events,
53+
}
54+
}
55+
56+
fn maximals(&self) -> impl Iterator<Item = (T::Time, T::Event)> {
57+
self.maximals.keys().copied().map(move |t| (t, self.time_events[&t].clone()))
58+
}
59+
60+
fn do_merge(&mut self, other: Self) {
61+
for (time, event) in other.time_events {
62+
self.time_events.insert(time, event);
63+
}
64+
65+
for (time, _) in other.maximals {
66+
self.maximals.insert(time, ());
67+
}
68+
}
69+
}
70+
71+
/// Build a map of **maximal** times from a map of time events.
72+
fn build_maximals<T: Types>(time_events: &HashMap<T::Time, T::Event>) -> HashMap<T::Time, ()> {
73+
let mut maximals = HashMap::new();
74+
for time in time_events.keys() {
75+
for max in maximals.keys().copied().collect::<Vec<_>>() {
76+
if time > &max {
77+
maximals.remove(&max);
78+
}
79+
}
80+
81+
maximals.insert(*time, ());
82+
}
83+
maximals
84+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pub mod linear;
2+
pub mod generic;

src/list/raxos/protocal/src/commonly_used/time.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
pub mod vec_2d;
2+
13
pub struct BallotNumber {
24
pub round: u64,
35
pub leader: u64,
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#[derive(Clone, Debug, Copy)]
2+
#[derive(PartialOrd, PartialEq, Eq, Hash)]
3+
pub struct Vec2DTime {
4+
pub x: u64,
5+
pub y: u64,
6+
}

src/list/raxos/protocal/src/implementations/paxos.rs

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -42,49 +42,52 @@ mod tests {
4242

4343
#[test]
4444
fn test_paxos() -> anyhow::Result<()> {
45-
//
46-
4745
let ex = Arc::new(Executor::new());
4846

49-
let fu = do_test(ex.clone());
47+
let fu = async move {
48+
let acceptor_ids = [1, 2, 3];
5049

51-
futures_lite::future::block_on(ex.run(fu))?;
52-
Ok(())
50+
let mut acceptors = BTreeMap::new();
51+
for id in acceptor_ids {
52+
acceptors.insert(id, Acceptor::default());
53+
}
5354

54-
// TODO: rebuild from previous value
55-
}
55+
let quorum_set = Majority::new(acceptor_ids);
56+
let transport = DirectCall::new(acceptors.clone());
5657

57-
async fn do_test(ex: Arc<Executor<'_>>) -> anyhow::Result<()> {
58-
ex.spawn(async {
59-
println!("Inner task");
60-
})
61-
.detach();
58+
let mut apaxos = APaxos::<Paxos>::new(acceptor_ids, quorum_set, transport);
6259

63-
let acceptor_ids = [1, 2, 3];
60+
let mut proposer = Proposer::new(&mut apaxos, 5, "hello".to_string());
6461

65-
let mut acceptors = BTreeMap::new();
66-
for id in acceptor_ids {
67-
acceptors.insert(id, Acceptor::default());
68-
}
62+
let committed = proposer.run().await?;
6963

70-
let quorum_set = Majority::new(acceptor_ids);
71-
let transport = DirectCall::new(acceptors.clone());
64+
assert_eq!(committed.latest_time(), Some(5));
65+
assert_eq!(committed.latest_value(), Some(s("hello")));
7266

73-
let mut apaxos = APaxos::<Paxos>::new(acceptor_ids, quorum_set, transport);
67+
println!("Done");
7468

75-
let mut proposer = Proposer::new(&mut apaxos, 5, "hello".to_string());
76-
let committed = proposer.run().await?;
69+
Ok::<(), anyhow::Error>(())
70+
};
7771

78-
assert_eq!(committed.latest_time(), Some(5));
79-
assert_eq!(committed.latest_value(), Some(s("hello")));
72+
// let mut proposer = Proposer::new(&mut apaxos, 6, "world".to_string());
73+
// let committed = proposer.run().await?;
74+
//
75+
// assert_eq!(committed.latest_time(), Some(6));
76+
// assert_eq!(committed.latest_value(), Some(s("hello")));
77+
78+
ex.spawn(fu).detach();
8079

81-
let mut proposer = Proposer::new(&mut apaxos, 6, "world".to_string());
82-
let committed = proposer.run().await?;
80+
futures_lite::future::block_on(ex.tick());
81+
Ok(())
8382

84-
assert_eq!(committed.latest_time(), Some(6));
85-
assert_eq!(committed.latest_value(), Some(s("hello")));
83+
// TODO: rebuild from previous value
84+
}
8685

87-
println!("Done");
86+
async fn do_test(ex: Arc<Executor<'_>>) -> anyhow::Result<()> {
87+
ex.spawn(async {
88+
println!("Inner task");
89+
})
90+
.detach();
8891

8992
Ok(())
9093
}

src/list/raxos/protocal/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ where Self: Default + Debug + Clone + Sized + 'static
3232
/// - In Paxos, it is ballot number, which is `(round, proposer_id)`.
3333
/// - In Raft, it is `(term, Option<voted_for>)`.
3434
/// - In 2PC, it is mainly a vector of related data entry name.
35-
// TODO: explain 2pc time.
3635
type Time: Time;
3736

3837
/// The value to propose and to commit

0 commit comments

Comments
 (0)