-
Notifications
You must be signed in to change notification settings - Fork 73
Closed
Closed
Copy link
Description
// Expand `hydro_lang::setup!()` only in builds where the `stageleft_runtime`
// cfg flag is set; in every other build this item is compiled out.
#[cfg(stageleft_runtime)]
hydro_lang::setup!();
use std::hash::Hash;
use hydro_lang::live_collections::stream::{NoOrder, Ordering};
use hydro_lang::location::{Location, NoTick};
use hydro_lang::prelude::*;
/// Splits a stream of keyed responses into acknowledged keys and keyed errors.
///
/// * Every `(key, Ok(()))` response yields `key` on the first returned stream,
///   which is passed through `weakest_ordering()` to fit the `NoOrder` return type.
/// * Every `(key, Err(e))` response yields `(key, e)` on the second returned
///   stream, which keeps the input's `Order` parameter.
///
/// Alongside the two outputs, the responses bound to `batch` are folded into a
/// `HashMap<K, (usize, usize)>` of per-key `(successes, errors)` counts, and that
/// map is assigned to the `state` declared through `use::state`.
///
/// NOTE(review): `count` is never read in this body, the tally is rebuilt from an
/// empty map by the fold rather than extending the previous `state`, and successes
/// are forwarded without any threshold check — confirm whether quorum filtering is
/// meant to happen here.
#[expect(clippy::type_complexity, reason = "stream types with ordering")]
pub fn collect_quorum<
    'a,
    L: Location<'a> + NoTick,
    Order: Ordering,
    K: Clone + Eq + Hash,
    E: Clone,
>(
    responses: Stream<(K, Result<(), E>), L, Unbounded, Order>,
    count: usize,
) -> (Stream<K, L, Unbounded, NoOrder>, Stream<(K, E), L, Unbounded, Order>) {
    let (acked, failed) = sliced! {
        let mut state = use::state(|l| {
            l.singleton(q!(std::collections::HashMap::<K, (usize, usize)>::new()))
        });
        let batch = use(responses, nondet!(#[doc = "batch boundaries don't affect correctness"]));

        // Error side: keep only the `Err` responses, paired with their key.
        let failed_in_batch = batch.clone().filter_map(q!(|(key, outcome)| {
            if let Err(cause) = outcome {
                Some((key, cause))
            } else {
                None
            }
        }));

        // Success side: keep only the keys of `Ok` responses.
        let acked_in_batch = batch.clone().filter_map(q!(|(key, outcome)| {
            if let Ok(()) = outcome {
                Some(key)
            } else {
                None
            }
        }));

        // Per-key `(successes, errors)` counters for the responses in `batch`.
        let tally = batch
            .assume_ordering(nondet!(#[doc = "fold is commutative for this operation"]))
            .fold(
                q!(|| std::collections::HashMap::<K, (usize, usize)>::new()),
                q!(|acc: &mut std::collections::HashMap<K, (usize, usize)>, (key, outcome)| {
                    let counters = acc.entry(key).or_insert((0, 0));
                    match outcome {
                        Ok(()) => counters.0 += 1,
                        Err(_) => counters.1 += 1,
                    }
                }),
            );
        state = tally;

        (acked_in_batch.weakest_ordering(), failed_in_batch)
    };

    (acked, failed)
}
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels