|
1 |
| -use anyhow::Result; |
| 1 | +use anyhow::{Result, bail}; |
| 2 | +use futures::FutureExt; |
| 3 | +use reqwest::Url; |
2 | 4 | use std::time::{Duration, Instant};
|
3 |
| -use tokio::signal; |
| 5 | +use timeboost_types::sailfish::RoundNumber; |
| 6 | +use tokio::time::sleep; |
| 7 | +use tokio::{select, signal}; |
| 8 | +use tracing::{error, info}; |
4 | 9 |
|
5 | 10 | const ROUND_TIMEOUT_SECS: u64 = 30;
|
6 | 11 | const MAX_ROUND_TIMEOUTS: u64 = 15;
|
7 | 12 |
|
8 |
| -pub async fn run_until(until: u64, timeout: u64, host: reqwest::Url) -> Result<()> { |
9 |
| - use futures::FutureExt; |
10 |
| - use tokio::time::sleep; |
| 13 | +#[derive(Debug)] |
| 14 | +pub struct Until { |
| 15 | + rounds: RoundNumber, |
| 16 | + duration: Duration, |
| 17 | + host: Url, |
| 18 | + require_decrypted: Option<RoundNumber>, |
| 19 | +} |
11 | 20 |
|
12 |
| - sleep(Duration::from_secs(1)).await; |
| 21 | +impl Until { |
| 22 | + pub fn new<N>(r: N, d: Duration, host: Url) -> Self |
| 23 | + where |
| 24 | + N: Into<RoundNumber>, |
| 25 | + { |
| 26 | + Self { |
| 27 | + rounds: r.into(), |
| 28 | + duration: d, |
| 29 | + host, |
| 30 | + require_decrypted: None, |
| 31 | + } |
| 32 | + } |
13 | 33 |
|
14 |
| - let mut timer = sleep(Duration::from_secs(timeout)).fuse().boxed(); |
| 34 | + pub fn require_decrypted<N>(&mut self, rounds: Option<N>) -> &mut Self |
| 35 | + where |
| 36 | + N: Into<RoundNumber>, |
| 37 | + { |
| 38 | + self.require_decrypted = rounds.map(|r| r.into()); |
| 39 | + self |
| 40 | + } |
15 | 41 |
|
16 |
| - let mut req_timer = sleep(Duration::from_secs(1)).fuse().boxed(); |
| 42 | + pub async fn run(self) -> Result<()> { |
| 43 | + let mut timer = sleep(self.duration).fuse().boxed(); |
17 | 44 |
|
18 |
| - let mut last_committed = 0; |
19 |
| - let mut last_committed_time = Instant::now(); |
20 |
| - // Deliberately run this on a timeout to avoid a runaway testing scenario. |
21 |
| - loop { |
22 |
| - tokio::select! { |
23 |
| - _ = &mut timer => { |
24 |
| - tracing::error!("watchdog timed out, shutting down"); |
25 |
| - anyhow::bail!("Watchdog timeout") |
26 |
| - } |
27 |
| - _ = &mut req_timer => { |
28 |
| - req_timer = sleep(Duration::from_secs(1)).fuse().boxed(); |
29 |
| - if let Ok(resp) = reqwest::get(host.join("i/metrics").expect("valid url")).await { |
30 |
| - if let Ok(text) = resp.text().await { |
31 |
| - let committed_round = text |
32 |
| - .lines() |
33 |
| - .find(|line| line.starts_with("committed_round")) |
34 |
| - .and_then(|line| line.split(' ').nth(1)) |
35 |
| - .and_then(|num| num.parse::<u64>().ok()) |
36 |
| - .unwrap_or(0); |
37 |
| - |
38 |
| - if committed_round > 0 && committed_round % 10 == 0 { |
39 |
| - tracing::info!("committed_round: {}", committed_round); |
40 |
| - } |
| 45 | + let mut req_timer = sleep(Duration::from_secs(1)).fuse().boxed(); |
41 | 46 |
|
42 |
| - let now = Instant::now(); |
43 |
| - if committed_round == last_committed |
44 |
| - && now.saturating_duration_since(last_committed_time) > Duration::from_secs(ROUND_TIMEOUT_SECS) |
45 |
| - { |
46 |
| - anyhow::bail!("Node stuck on round for more than {} seconds", ROUND_TIMEOUT_SECS) |
47 |
| - } else if committed_round > last_committed { |
48 |
| - last_committed = committed_round; |
49 |
| - last_committed_time = now; |
50 |
| - } |
| 47 | + let mut last_committed = 0; |
| 48 | + let mut last_committed_time = Instant::now(); |
51 | 49 |
|
52 |
| - let timeouts = text |
53 |
| - .lines() |
54 |
| - .find(|line| line.starts_with("rounds_timed_out")) |
55 |
| - .and_then(|line| line.split(' ').nth(1)) |
56 |
| - .and_then(|num| num.parse::<u64>().ok()) |
57 |
| - .unwrap_or(0); |
| 50 | + loop { |
| 51 | + select! { |
| 52 | + _ = &mut timer => { |
| 53 | + error!("watchdog timed out, shutting down"); |
| 54 | + bail!("watchdog timeout") |
| 55 | + } |
| 56 | + _ = &mut req_timer => { |
| 57 | + req_timer = sleep(Duration::from_secs(1)).fuse().boxed(); |
| 58 | + if let Ok(resp) = reqwest::get(self.host.join("i/metrics").expect("valid url")).await { |
| 59 | + if let Ok(text) = resp.text().await { |
| 60 | + let committed_round = text |
| 61 | + .lines() |
| 62 | + .find(|line| line.starts_with("committed_round")) |
| 63 | + .and_then(|line| line.split(' ').nth(1)) |
| 64 | + .and_then(|num| num.parse::<u64>().ok()) |
| 65 | + .unwrap_or(0); |
58 | 66 |
|
59 |
| - if timeouts >= MAX_ROUND_TIMEOUTS { |
60 |
| - anyhow::bail!("Node timed out too many rounds") |
61 |
| - } |
| 67 | + if committed_round > 0 && committed_round % 10 == 0 { |
| 68 | + info!(%committed_round); |
| 69 | + } |
62 | 70 |
|
63 |
| - let queued_encrypted = text |
64 |
| - .lines() |
65 |
| - .find(|line| line.starts_with("queued_encrypted_ilist")) |
66 |
| - .and_then(|line| line.split(' ').nth(1)) |
67 |
| - .and_then(|num| num.parse::<u64>().ok()) |
68 |
| - .unwrap_or(0); |
69 |
| - let output_decrypted = text |
70 |
| - .lines() |
71 |
| - .find(|line| line.starts_with("output_decrypted_ilist")) |
72 |
| - .and_then(|line| line.split(' ').nth(1)) |
73 |
| - .and_then(|num| num.parse::<u64>().ok()) |
74 |
| - .unwrap_or(0); |
75 |
| - |
76 |
| - if committed_round >= until && output_decrypted > 0 { |
77 |
| - tracing::info!("committed_round: {}", committed_round); |
78 |
| - tracing::info!("enqueued encrypted: {}, output decrypted: {}", queued_encrypted, output_decrypted); |
79 |
| - tracing::info!("watchdog completed successfully"); |
80 |
| - break; |
| 71 | + let now = Instant::now(); |
| 72 | + |
| 73 | + if committed_round == last_committed |
| 74 | + && now.saturating_duration_since(last_committed_time) > Duration::from_secs(ROUND_TIMEOUT_SECS) |
| 75 | + { |
| 76 | + bail!("node stuck on round for more than {} seconds", ROUND_TIMEOUT_SECS) |
| 77 | + } else if committed_round > last_committed { |
| 78 | + last_committed = committed_round; |
| 79 | + last_committed_time = now; |
| 80 | + } |
| 81 | + |
| 82 | + let timeouts = text |
| 83 | + .lines() |
| 84 | + .find(|line| line.starts_with("rounds_timed_out")) |
| 85 | + .and_then(|line| line.split(' ').nth(1)) |
| 86 | + .and_then(|num| num.parse::<u64>().ok()) |
| 87 | + .unwrap_or(0); |
| 88 | + |
| 89 | + if timeouts >= MAX_ROUND_TIMEOUTS { |
| 90 | + bail!("node timed out too many rounds") |
| 91 | + } |
| 92 | + |
| 93 | + let queued_encrypted = text |
| 94 | + .lines() |
| 95 | + .find(|line| line.starts_with("queued_encrypted_ilist")) |
| 96 | + .and_then(|line| line.split(' ').nth(1)) |
| 97 | + .and_then(|num| num.parse::<u64>().ok()) |
| 98 | + .unwrap_or(0); |
| 99 | + |
| 100 | + let output_decrypted = text |
| 101 | + .lines() |
| 102 | + .find(|line| line.starts_with("output_decrypted_ilist")) |
| 103 | + .and_then(|line| line.split(' ').nth(1)) |
| 104 | + .and_then(|num| num.parse::<u64>().ok()) |
| 105 | + .unwrap_or(0); |
| 106 | + |
| 107 | + if committed_round >= *self.rounds { |
| 108 | + if let Some(r) = self.require_decrypted { |
| 109 | + if output_decrypted > *r { |
| 110 | + info!(%committed_round); |
| 111 | + info!(%queued_encrypted, %output_decrypted); |
| 112 | + info!("watchdog completed successfully"); |
| 113 | + break; |
| 114 | + } |
| 115 | + } else { |
| 116 | + info!(%committed_round); |
| 117 | + info!("watchdog completed successfully"); |
| 118 | + break; |
| 119 | + } |
| 120 | + } |
81 | 121 | }
|
82 | 122 | }
|
83 | 123 | }
|
84 |
| - } |
85 |
| - _ = signal::ctrl_c() => { |
86 |
| - tracing::info!("ctrl-c received, shutting down"); |
87 |
| - break; |
| 124 | + _ = signal::ctrl_c() => { |
| 125 | + tracing::info!("ctrl-c received, shutting down"); |
| 126 | + break; |
| 127 | + } |
88 | 128 | }
|
89 | 129 | }
|
| 130 | + Ok(()) |
90 | 131 | }
|
91 |
| - Ok(()) |
92 | 132 | }
|
0 commit comments