|
1 | 1 | use futures::future::join_all;
|
2 | 2 | use reqwest::{Client, Url};
|
3 |
| -use std::time::Duration; |
| 3 | +use std::{collections::HashMap, time::Duration}; |
4 | 4 | use timeboost::types::BundleVariant;
|
5 | 5 | use timeboost_crypto::prelude::{ThresholdEncKey, ThresholdEncKeyCell};
|
6 | 6 | use timeboost_utils::load_generation::{make_bundle, tps_to_millis};
|
@@ -85,26 +85,75 @@ async fn fetch_encryption_key(client: &Client, enckey_url: &Url) -> Option<Thres
|
85 | 85 | }
|
86 | 86 | }
|
87 | 87 |
|
| 88 | +/// helper struct to keep track of sufficient quorum of DKG keys |
| 89 | +struct ThresholdEncKeyCellAccumulator { |
| 90 | + client: Client, |
| 91 | + // DKG results on individual node |
| 92 | + results: HashMap<Url, Option<ThresholdEncKey>>, |
| 93 | + // (t+1)-agreed upon encryption key |
| 94 | + output: ThresholdEncKeyCell, |
| 95 | + // threshold for the accumulator to be considered as matured / finalized |
| 96 | + threshold: usize, |
| 97 | +} |
| 98 | + |
| 99 | +impl ThresholdEncKeyCellAccumulator { |
| 100 | + /// give a list of TimeboostApi's endpoint to query `/enckey` status |
| 101 | + pub fn new(client: Client, urls: impl Iterator<Item = Url>) -> Self { |
| 102 | + let results: HashMap<Url, Option<ThresholdEncKey>> = urls.map(|url| (url, None)).collect(); |
| 103 | + let output = ThresholdEncKeyCell::new(); |
| 104 | + let threshold = results.len().div_ceil(3); |
| 105 | + Self { |
| 106 | + client, |
| 107 | + results, |
| 108 | + output, |
| 109 | + threshold, |
| 110 | + } |
| 111 | + } |
| 112 | + |
| 113 | + /// try to get the threshold encryption key, only available after a threshold of nodes |
| 114 | + /// finish their DKG processes. |
| 115 | + pub async fn enc_key(&mut self) -> &ThresholdEncKeyCell { |
| 116 | + // if result is already available, directly return |
| 117 | + if self.output.get_ref().is_some() { |
| 118 | + &self.output |
| 119 | + } else { |
| 120 | + // first update DKG status for yet-finished nodes |
| 121 | + for (url, res) in self.results.iter_mut() { |
| 122 | + if res.is_none() { |
| 123 | + *res = fetch_encryption_key(&self.client, url).await; |
| 124 | + } |
| 125 | + } |
| 126 | + |
| 127 | + // count for each unique enc key from DKG results of different nodes |
| 128 | + let mut counts: HashMap<ThresholdEncKey, usize> = HashMap::new(); |
| 129 | + for v in self.results.values().flatten() { |
| 130 | + *counts.entry(v.to_owned()).or_insert(0) += 1; |
| 131 | + } |
| 132 | + |
| 133 | + // (t+1)-agreed enc_key is the output |
| 134 | + for (v, c) in counts.iter() { |
| 135 | + if *c >= self.threshold { |
| 136 | + self.output.set(v.to_owned()); |
| 137 | + } |
| 138 | + } |
| 139 | + &self.output |
| 140 | + } |
| 141 | + } |
| 142 | +} |
| 143 | + |
88 | 144 | pub async fn yap(addresses: &[Address], tps: u32) -> Result<()> {
|
89 | 145 | let c = Client::builder().timeout(Duration::from_secs(1)).build()?;
|
90 | 146 | let urls = setup_urls(addresses)?;
|
91 | 147 |
|
92 | 148 | let mut interval = interval(Duration::from_millis(tps_to_millis(tps)));
|
93 | 149 | interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
94 | 150 |
|
95 |
| - let enc_key = ThresholdEncKeyCell::new(); |
96 |
| - let enckey_url = &urls.first().expect("urls shouldn't be empty").2; |
97 |
| - loop { |
98 |
| - if enc_key.get_ref().is_none() { |
99 |
| - tracing::debug!("DKG ongoing ..."); |
100 |
| - if let Some(k) = fetch_encryption_key(&c, enckey_url).await { |
101 |
| - tracing::info!("DKG done"); |
102 |
| - enc_key.set(k) |
103 |
| - } |
104 |
| - } |
| 151 | + let mut acc = |
| 152 | + ThresholdEncKeyCellAccumulator::new(c.clone(), urls.iter().map(|url| url.2.clone())); |
105 | 153 |
|
| 154 | + loop { |
106 | 155 | // create a bundle for next `interval.tick()`, then send this bundle to each node
|
107 |
| - let Ok(b) = make_bundle(&enc_key) else { |
| 156 | + let Ok(b) = make_bundle(acc.enc_key().await) else { |
108 | 157 | warn!("failed to generate bundle");
|
109 | 158 | continue;
|
110 | 159 | };
|
|
0 commit comments