Skip to content

Commit c752aff

Browse files
authored
refactor(hydroflow_plus): use max and min in Paxos and make client generic over ballots (#1443)
1 parent 486dfbe commit c752aff

File tree

4 files changed

+1812
-1498
lines changed

4 files changed

+1812
-1498
lines changed

hydroflow_plus/src/singleton.rs

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,18 @@ impl<'a, T, W, C, N: Location> Singleton<'a, T, W, C, N> {
136136
}
137137
}
138138

139+
impl<'a, T, C, N: Location> From<Singleton<'a, T, Bounded, C, N>>
140+
for Singleton<'a, T, Unbounded, C, N>
141+
{
142+
fn from(singleton: Singleton<'a, T, Bounded, C, N>) -> Self {
143+
Singleton::new(
144+
singleton.location_kind,
145+
singleton.ir_leaves,
146+
singleton.ir_node.into_inner(),
147+
)
148+
}
149+
}
150+
139151
impl<'a, T, N: Location> CycleComplete<'a, Tick> for Singleton<'a, T, Bounded, Tick, N> {
140152
fn complete(self, ident: syn::Ident) {
141153
self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
@@ -298,8 +310,8 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
298310
)
299311
}
300312

301-
pub fn latest(self) -> Optional<'a, T, Unbounded, NoTick, N> {
302-
Optional::new(
313+
pub fn latest(self) -> Singleton<'a, T, Unbounded, NoTick, N> {
314+
Singleton::new(
303315
self.location_kind,
304316
self.ir_leaves,
305317
HfPlusNode::Persist(Box::new(self.ir_node.into_inner())),
@@ -598,7 +610,7 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
598610
)
599611
}
600612

601-
pub fn or_else(
613+
pub fn unwrap_or(
602614
self,
603615
other: Singleton<'a, T, Bounded, Tick, N>,
604616
) -> Singleton<'a, T, Bounded, Tick, N> {
@@ -692,6 +704,18 @@ impl<'a, T, B, N: Location> Optional<'a, T, B, NoTick, N> {
692704
.latest()
693705
.tick_samples()
694706
}
707+
708+
pub fn unwrap_or(
709+
self,
710+
other: impl Into<Singleton<'a, T, Unbounded, NoTick, N>>,
711+
) -> Singleton<'a, T, Unbounded, NoTick, N> {
712+
let other = other.into();
713+
if self.location_kind != other.location_kind {
714+
panic!("or_else must be called on streams on the same node");
715+
}
716+
717+
self.latest_tick().unwrap_or(other.latest_tick()).latest()
718+
}
695719
}
696720

697721
impl<'a, T, N: Location> Optional<'a, T, Unbounded, NoTick, N> {

hydroflow_plus/src/stream.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,28 @@ impl<'a, T, N: Location> Stream<'a, T, Bounded, Tick, N> {
382382
)
383383
}
384384

385+
pub fn max(self) -> Optional<'a, T, Bounded, Tick, N>
386+
where
387+
T: Ord,
388+
{
389+
self.reduce(q!(|curr, new| {
390+
if new > *curr {
391+
*curr = new;
392+
}
393+
}))
394+
}
395+
396+
pub fn min(self) -> Optional<'a, T, Bounded, Tick, N>
397+
where
398+
T: Ord,
399+
{
400+
self.reduce(q!(|curr, new| {
401+
if new < *curr {
402+
*curr = new;
403+
}
404+
}))
405+
}
406+
385407
pub fn sort(self) -> Stream<'a, T, Bounded, Tick, N>
386408
where
387409
T: Ord,
@@ -497,6 +519,28 @@ impl<'a, T, N: Location> Stream<'a, T, Unbounded, NoTick, N> {
497519
})),
498520
)
499521
}
522+
523+
pub fn max(self) -> Optional<'a, T, Unbounded, NoTick, N>
524+
where
525+
T: Ord,
526+
{
527+
self.reduce(q!(|curr, new| {
528+
if new > *curr {
529+
*curr = new;
530+
}
531+
}))
532+
}
533+
534+
pub fn min(self) -> Optional<'a, T, Unbounded, NoTick, N>
535+
where
536+
T: Ord,
537+
{
538+
self.reduce(q!(|curr, new| {
539+
if new < *curr {
540+
*curr = new;
541+
}
542+
}))
543+
}
500544
}
501545

502546
impl<'a, T, C, N: Location> Stream<'a, T, Bounded, C, N> {

hydroflow_plus_test/src/cluster/paxos.rs

Lines changed: 38 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ pub fn paxos(
133133
// c_to_proposers.clone().for_each(q!(|payload: ClientPayload| println!("Client sent proposer payload: {:?}", payload)));
134134

135135
let p_received_max_ballot = p_max_ballot(
136+
flow,
137+
&proposers,
136138
a_to_proposers_p1b.clone(),
137139
a_to_proposers_p2b.clone(),
138140
p_to_proposers_i_am_leader.clone(),
@@ -195,10 +197,12 @@ pub fn paxos(
195197
.for_each(q!(|p1a| println!("Acceptor received P1a: {:?}", p1a)));
196198
// p_to_acceptors_p2a.clone().for_each(q!(|p2a: P2a| println!("Acceptor received P2a: {:?}", p2a)));
197199
let (a_to_proposers_p1b_new, a_to_proposers_p2b_new) = acceptor(
200+
flow,
198201
p_to_acceptors_p1a,
199202
p_to_acceptors_p2a,
200203
r_to_acceptors_checkpoint,
201204
&proposers,
205+
&acceptors,
202206
f,
203207
);
204208
a_to_proposers_p1b_complete_cycle.complete(a_to_proposers_p1b_new);
@@ -216,10 +220,12 @@ pub fn paxos(
216220

217221
#[allow(clippy::type_complexity)]
218222
fn acceptor<'a>(
223+
flow: &FlowBuilder<'a>,
219224
p_to_acceptors_p1a: Stream<'a, P1a, Unbounded, NoTick, Cluster<Acceptor>>,
220225
p_to_acceptors_p2a: Stream<'a, P2a, Unbounded, NoTick, Cluster<Acceptor>>,
221226
r_to_acceptors_checkpoint: Stream<'a, (u32, i32), Unbounded, NoTick, Cluster<Acceptor>>,
222227
proposers: &Cluster<Proposer>,
228+
acceptors: &Cluster<Acceptor>,
223229
f: usize,
224230
) -> (
225231
Stream<'a, (u32, P1b), Unbounded, NoTick, Cluster<Proposer>>,
@@ -244,14 +250,9 @@ fn acceptor<'a>(
244250
// Find the smallest checkpoint seq that everyone agrees to, track whenever it changes
245251
let a_new_checkpoint = a_checkpoint_largest_seqs
246252
.continue_if(a_checkpoints_quorum_reached)
247-
.fold(
248-
q!(|| -1),
249-
q!(|min_seq, (_sender, seq)| {
250-
if *min_seq == -1 || seq < *min_seq {
251-
*min_seq = seq;
252-
}
253-
}),
254-
)
253+
.map(q!(|(_sender, seq)| seq))
254+
.min()
255+
.unwrap_or(flow.singleton(acceptors, q!(-1)).latest_tick())
255256
.delta()
256257
.map(q!(|min_seq| (
257258
min_seq,
@@ -267,14 +268,11 @@ fn acceptor<'a>(
267268
)));
268269
// .inspect(q!(|(min_seq, p2a): &(i32, P2a)| println!("Acceptor new checkpoint: {:?}", min_seq)));
269270

270-
let a_max_ballot = p_to_acceptors_p1a.clone().fold(
271-
q!(|| Ballot { num: 0, id: 0 }),
272-
q!(|max_ballot, p1a| {
273-
if p1a.ballot > *max_ballot {
274-
*max_ballot = p1a.ballot;
275-
}
276-
}),
277-
);
271+
let a_max_ballot = p_to_acceptors_p1a
272+
.clone()
273+
.map(q!(|p1a| p1a.ballot))
274+
.max()
275+
.unwrap_or(flow.singleton(acceptors, q!(Ballot { num: 0, id: 0 })));
278276
let a_p2as_to_place_in_log = p_to_acceptors_p2a
279277
.clone()
280278
.tick_batch()
@@ -439,7 +437,7 @@ fn p_p2b<'a>(
439437
fn p_p2a<'a>(
440438
flow: &FlowBuilder<'a>,
441439
proposers: &Cluster<Proposer>,
442-
p_max_slot: Singleton<'a, i32, Bounded, Tick, Cluster<Proposer>>,
440+
p_max_slot: Optional<'a, i32, Bounded, Tick, Cluster<Proposer>>,
443441
c_to_proposers: Stream<'a, ClientPayload, Unbounded, NoTick, Cluster<Proposer>>,
444442
p_ballot_num: Singleton<'a, u32, Bounded, Tick, Cluster<Proposer>>,
445443
p_log_to_try_commit: Stream<'a, P2a, Bounded, Tick, Cluster<Proposer>>,
@@ -454,6 +452,7 @@ fn p_p2a<'a>(
454452
let (p_next_slot_complete_cycle, p_next_slot) =
455453
flow.tick_cycle::<Optional<i32, _, _, _>>(proposers);
456454
let p_next_slot_after_reconciling_p1bs = p_max_slot
455+
.unwrap_or(flow.singleton(proposers, q!(-1)).latest_tick())
457456
// .inspect(q!(|max_slot| println!("{} p_max_slot: {:?}", context.current_tick(), max_slot)))
458457
.continue_unless(p_next_slot.clone())
459458
.map(q!(|max_slot| max_slot + 1));
@@ -518,7 +517,7 @@ fn p_p1b<'a>(
518517
) -> (
519518
Optional<'a, bool, Bounded, Tick, Cluster<Proposer>>,
520519
Stream<'a, P2a, Bounded, Tick, Cluster<Proposer>>,
521-
Singleton<'a, i32, Bounded, Tick, Cluster<Proposer>>,
520+
Optional<'a, i32, Bounded, Tick, Cluster<Proposer>>,
522521
Stream<'a, P2a, Bounded, Tick, Cluster<Proposer>>,
523522
) {
524523
let p_id = flow.cluster_self_id(proposers);
@@ -580,14 +579,10 @@ fn p_p1b<'a>(
580579
None
581580
}
582581
));
583-
let p_max_slot = p_p1b_highest_entries_and_count.clone().fold(
584-
q!(|| -1),
585-
q!(|max_slot, (slot, (_count, _entry))| {
586-
if slot > *max_slot {
587-
*max_slot = slot;
588-
}
589-
}),
590-
);
582+
let p_max_slot = p_p1b_highest_entries_and_count
583+
.clone()
584+
.map(q!(|(slot, _)| slot))
585+
.max();
591586
let p_proposed_slots = p_p1b_highest_entries_and_count
592587
.clone()
593588
.map(q!(|(slot, _)| slot));
@@ -687,14 +682,10 @@ fn replica<'a>(
687682
// Send checkpoints to the acceptors when we've processed enough payloads
688683
let (r_checkpointed_seqs_complete_cycle, r_checkpointed_seqs) =
689684
flow.tick_cycle::<Optional<'a, i32, _, _, _>>(replicas);
690-
let r_max_checkpointed_seq = r_checkpointed_seqs.persist().fold(
691-
q!(|| -1),
692-
q!(|max_seq, seq| {
693-
if seq > *max_seq {
694-
*max_seq = seq;
695-
}
696-
}),
697-
);
685+
let r_max_checkpointed_seq = r_checkpointed_seqs
686+
.persist()
687+
.max()
688+
.unwrap_or(flow.singleton(replicas, q!(-1)).latest_tick());
698689
let r_checkpoint_seq_new = r_max_checkpointed_seq
699690
.cross_singleton(r_new_highest_seq)
700691
.filter_map(q!(
@@ -717,9 +708,9 @@ fn replica<'a>(
717708
}
718709

719710
// Clients. All relations for clients will be prefixed with c. All ClientPayloads will contain the virtual client number as key and the client's machine ID (to string) as value. Expects p_to_clients_leader_elected containing Ballots whenever the leader is elected, and r_to_clients_payload_applied containing ReplicaPayloads whenever a payload is committed. Outputs (leader address, ClientPayload) when a new leader is elected or when the previous payload is committed.
720-
fn client<'a>(
711+
fn client<'a, B: Address + Ord + std::fmt::Debug + Clone>(
721712
clients: &Cluster<Client>,
722-
p_to_clients_leader_elected: Stream<'a, Ballot, Unbounded, NoTick, Cluster<Client>>,
713+
p_to_clients_leader_elected: Stream<'a, B, Unbounded, NoTick, Cluster<Client>>,
723714
r_to_clients_payload_applied: Stream<
724715
'a,
725716
(u32, ReplicaPayload),
@@ -741,12 +732,7 @@ fn client<'a>(
741732
)));
742733
// r_to_clients_payload_applied.clone().inspect(q!(|payload: &(u32, ReplicaPayload)| println!("Client received payload: {:?}", payload)));
743734
// Only keep the latest leader
744-
let c_max_leader_ballot =
745-
p_to_clients_leader_elected.reduce(q!(|curr_max_ballot, new_ballot| {
746-
if new_ballot > *curr_max_ballot {
747-
*curr_max_ballot = new_ballot;
748-
}
749-
}));
735+
let c_max_leader_ballot = p_to_clients_leader_elected.max();
750736
let c_new_leader_ballot = c_max_leader_ballot.clone().latest_tick().delta();
751737
// Whenever the leader changes, make all clients send a message
752738
let c_new_payloads_when_leader_elected =
@@ -901,7 +887,7 @@ fn client<'a>(
901887
}
902888
}),
903889
);
904-
#[allow(clippy::type_complexity)]
890+
905891
c_stats_output_timer
906892
.cross_singleton(c_latencies)
907893
.cross_singleton(c_throughput)
@@ -928,6 +914,8 @@ fn client<'a>(
928914

929915
// Proposer logic to calculate the largest ballot received so far.
930916
fn p_max_ballot<'a>(
917+
flow: &FlowBuilder<'a>,
918+
proposers: &Cluster<Proposer>,
931919
a_to_proposers_p1b: Stream<'a, (u32, P1b), Unbounded, NoTick, Cluster<Proposer>>,
932920
a_to_proposers_p2b: Stream<'a, (u32, P2b), Unbounded, NoTick, Cluster<Proposer>>,
933921
p_to_proposers_i_am_leader: Stream<'a, Ballot, Unbounded, NoTick, Cluster<Proposer>>,
@@ -938,18 +926,11 @@ fn p_max_ballot<'a>(
938926
let p_received_p2b_ballots = a_to_proposers_p2b
939927
.clone()
940928
.map(q!(|(_, p2b)| p2b.max_ballot));
941-
let p_received_max_ballot = p_received_p1b_ballots
929+
p_received_p1b_ballots
942930
.union(p_received_p2b_ballots)
943931
.union(p_to_proposers_i_am_leader)
944-
.fold(
945-
q!(|| Ballot { num: 0, id: 0 }),
946-
q!(|curr_max_ballot, new_ballot| {
947-
if new_ballot > *curr_max_ballot {
948-
*curr_max_ballot = new_ballot;
949-
}
950-
}),
951-
);
952-
p_received_max_ballot
932+
.max()
933+
.unwrap_or(flow.singleton(proposers, q!(Ballot { num: 0, id: 0 })))
953934
}
954935

955936
// Proposer logic to calculate the next ballot number. Expects p_received_max_ballot, the largest ballot received so far. Outputs streams: ballot_num, and has_largest_ballot, which only contains a value if we have the largest ballot.
@@ -1017,9 +998,10 @@ fn p_p1a<'a>(
1017998
let p_id = flow.cluster_self_id(proposers);
1018999
let p_to_proposers_i_am_leader_new = p_ballot_num
10191000
.clone()
1020-
.latest()
1021-
.sample_every(q!(Duration::from_secs(i_am_leader_send_timeout)))
1022-
.tick_batch()
1001+
.continue_if(
1002+
flow.source_interval(proposers, q!(Duration::from_secs(i_am_leader_send_timeout)))
1003+
.latest_tick(),
1004+
)
10231005
.continue_if(p_is_leader.clone())
10241006
.map(q!(move |ballot_num| Ballot {
10251007
num: ballot_num,

0 commit comments

Comments
 (0)