Skip to content

Commit 17e86e8

Browse files
authored
fix(uptime): remove and prevent unwrap/panic (#462)
expect() calls are still allowed, since there are lots of places where they seem perfectly valid (poisoned locks in particular, but also lots of things in the startup path).
1 parent 87fe637 commit 17e86e8

File tree

12 files changed

+94
-80
lines changed

12 files changed

+94
-80
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

clippy.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
allow-unwrap-in-tests = true
2+
allow-panic-in-tests = true

src/app.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,17 @@ pub fn execute() -> io::Result<()> {
4242
}
4343
result = shutdown.recv_task_finished() => {
4444
match result {
45-
None => panic!("tasks_finished channel unexpectedly closed"),
45+
None => tracing::info!("system.recv_task_finished.channel_closed"),
4646
Some(Err(err)) => {
47-
panic!("Error in partition: {err:?}");
47+
tracing::error!(%err, "system.recv_task_finished.shutdown_error");
4848
},
49-
_ => panic!("Unexpected end of task"),
49+
Some(Ok(_)) => tracing::info!("system.recv_task_finished.end_of_task"),
5050
}
5151
}
5252
}
53-
shutdown.stop().await;
53+
if let Err(err) = shutdown.stop().await {
54+
tracing::error!(%err, "system.shutdown_error");
55+
}
5456
tracing::info!("system.shutdown");
5557
Ok(())
5658
}),

src/assertions/compiled.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,10 @@ pub fn extract_failure_data(
168168
crate::assertions::Op::Not { operand } => crate::assertions::Op::Not {
169169
operand: extract_failure_data(child, operand).into(),
170170
},
171-
_ => panic!(),
171+
_ => {
172+
tracing::error!("unexpected assert_op; this should be impossible");
173+
unreachable!();
174+
}
172175
},
173176
}
174177
}

src/check_config_provider/redis_config_provider.rs

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -158,17 +158,15 @@ impl RedisConfigProvider {
158158
.set(config_payloads.len() as f64);
159159

160160
for config_payload in config_payloads {
161-
let config: CheckConfig = rmp_serde::from_slice(&config_payload)
162-
.map_err(|err| {
163-
tracing::error!(?err, "config_consumer.invalid_config_message");
164-
})
165-
.unwrap();
166-
manager
167-
.get_service(partition.number)
168-
.get_config_store()
169-
.write()
170-
.unwrap()
171-
.add_config(config);
161+
match rmp_serde::from_slice(&config_payload) {
162+
Ok(config) => manager
163+
.get_service(partition.number)
164+
.get_config_store()
165+
.write()
166+
.expect("lock not poisoned")
167+
.add_config(config),
168+
Err(err) => tracing::error!(?err, "config_consumer.invalid_config_message"),
169+
}
172170
}
173171
let partition_loading_time = partition_start_loading.elapsed().as_secs_f64();
174172
metrics::histogram!(
@@ -252,22 +250,22 @@ impl RedisConfigProvider {
252250
.await;
253251

254252
for config_payload in config_payloads {
255-
let config: CheckConfig = rmp_serde::from_slice(&config_payload)
256-
.map_err(|err| {
257-
tracing::error!(?err, "config_consumer.invalid_config_message");
258-
})
259-
.unwrap();
260-
tracing::debug!(
261-
partition = partition.number,
262-
subscription_id = %config.subscription_id,
263-
"redis_config_provider.upserting_config"
264-
);
265-
manager
266-
.get_service(partition.number)
267-
.get_config_store()
268-
.write()
269-
.unwrap()
270-
.add_config(config);
253+
match rmp_serde::from_slice::<CheckConfig>(&config_payload) {
254+
Ok(config) => {
255+
tracing::debug!(
256+
partition = partition.number,
257+
subscription_id = %config.subscription_id,
258+
"redis_config_provider.upserting_config"
259+
);
260+
manager
261+
.get_service(partition.number)
262+
.get_config_store()
263+
.write()
264+
.expect("lock not poisoned")
265+
.add_config(config);
266+
}
267+
Err(err) => tracing::error!(?err, "config_consumer.invalid_config_message"),
268+
}
271269
}
272270
let partition_update_duration = partition_update_start.elapsed().as_secs_f64();
273271
metrics::histogram!(
@@ -329,6 +327,9 @@ pub fn run_config_provider(
329327
})
330328
}
331329

330+
// This function is allowed to panic, as an incorrect checker number represents a fatal
331+
// logic error for the uptime checker.
332+
#[allow(clippy::panic)]
332333
pub fn determine_owned_partitions(config: &Config) -> HashSet<u16> {
333334
// Determines which partitions this checker owns based on number of partitions,
334335
// number of checkers and checker number

src/check_executor.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -301,18 +301,19 @@ async fn executor_loop(
301301
job_producer,
302302
conf.region,
303303
);
304-
if conf.record_task_metrics {
304+
let check_task_result = if conf.record_task_metrics {
305305
if conf.checker_parallel {
306-
tokio::spawn(metrics_monitor.instrument(check_fut))
307-
.await
308-
.expect("The check task should not fail");
306+
tokio::spawn(metrics_monitor.instrument(check_fut)).await
309307
} else {
310308
metrics_monitor.instrument(check_fut).await;
309+
Ok(())
311310
}
312311
} else {
313-
tokio::spawn(check_fut)
314-
.await
315-
.expect("The check task should not fail");
312+
tokio::spawn(check_fut).await
313+
};
314+
315+
if let Err(err) = check_task_result {
316+
tracing::error!(%err, "executor.check_task_failed");
316317
}
317318
let num_running_val = num_running.fetch_sub(1, Ordering::Relaxed) - 1;
318319
num_running_gauge.set(num_running_val as f64);
@@ -351,9 +352,9 @@ pub(crate) async fn do_check(
351352
{
352353
Some(check_result) => check_result,
353354
None => {
354-
scheduled_check
355-
.record_result(None)
356-
.expect("Check recording channel should exist");
355+
if let Err(err) = scheduled_check.record_result(None) {
356+
tracing::error!(%err, "executor.do_check.robots_record_results_error");
357+
}
357358
return;
358359
}
359360
},

src/checker/reqwest_checker.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,7 @@ async fn do_request(
8888
check_config: &CheckConfig,
8989
sentry_trace: &str,
9090
) -> Result<(Response, RequestId), reqwest::Error> {
91-
let timeout = check_config
92-
.timeout
93-
.to_std()
94-
.expect("Timeout duration should be representable as a duration");
91+
let timeout = check_config.timeout.to_std().unwrap_or_default();
9592

9693
let url = check_config.url.as_str();
9794

@@ -457,16 +454,15 @@ fn to_errored_request_infos(
457454
// connection error_ will require some effort, so for now, just bill the full time to the part that
458455
// we failed on, leaving the others at zero.
459456

460-
let request_duration =
461-
TimeDelta::from_std(start.elapsed()).expect("duration shouldn't be large");
457+
let request_duration = TimeDelta::from_std(start.elapsed()).unwrap_or_default();
462458
let zero_timing = Timing {
463459
start_us: actual_check_time.timestamp_micros() as u128,
464460
duration_us: 0,
465461
};
466462

467463
let full_duration = Timing {
468464
start_us: actual_check_time.timestamp_micros() as u128,
469-
duration_us: request_duration.num_microseconds().unwrap() as u64,
465+
duration_us: request_duration.num_microseconds().unwrap_or(0) as u64,
470466
};
471467

472468
let mut dns_timing = zero_timing;
@@ -493,7 +489,7 @@ fn to_errored_request_infos(
493489
request_body_size_bytes: check.get_config().request_body.len() as u32,
494490
url: check.get_config().url.clone(),
495491
response_body_size_bytes: 0,
496-
request_duration_us: request_duration.num_microseconds().unwrap() as u64,
492+
request_duration_us: request_duration.num_microseconds().unwrap_or(0) as u64,
497493
durations: RequestDurations {
498494
dns_lookup: dns_timing,
499495
tcp_connection: connection_timing,
@@ -600,7 +596,7 @@ impl Checker for ReqwestChecker {
600596
);
601597

602598
// Our total duration includes the additional processing time, including running the assert.
603-
let duration = TimeDelta::from_std(start.elapsed()).expect("duration shouldn't be large");
599+
let duration = TimeDelta::from_std(start.elapsed()).unwrap_or_default();
604600

605601
let mut rinfos = rinfos;
606602

@@ -615,7 +611,7 @@ impl Checker for ReqwestChecker {
615611
}
616612
}
617613

618-
let final_req = rinfos.last().unwrap().clone();
614+
let request_info = rinfos.last().cloned();
619615

620616
let assertion_failure_data = if let Some(path) = check_result.assert_path {
621617
Assertion {
@@ -625,7 +621,7 @@ impl Checker for ReqwestChecker {
625621
.get_config()
626622
.assertion
627623
.as_ref()
628-
.expect("cannot have assertion failure data with an assertion")
624+
.expect("cannot have assertion failure data without an assertion")
629625
.root,
630626
),
631627
}
@@ -647,7 +643,7 @@ impl Checker for ReqwestChecker {
647643
actual_check_time_us: actual_check_time,
648644
duration: Some(duration),
649645
duration_us: Some(duration),
650-
request_info: Some(final_req),
646+
request_info,
651647
region,
652648
request_info_list: rinfos,
653649
assertion_failure_data,

src/endpoint/mod.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,14 @@ pub fn start_endpoint(
3737
let endpoint_port = config.webserver_port;
3838

3939
tokio::spawn(async move {
40-
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", endpoint_port)).await;
41-
42-
let Ok(listener) = listener else {
43-
tracing::error!(
44-
"Could not listen on webserver endpoint: {}",
45-
listener.err().unwrap().to_string()
46-
);
47-
return;
48-
};
40+
let listener =
41+
match tokio::net::TcpListener::bind(format!("0.0.0.0:{}", endpoint_port)).await {
42+
Ok(listener) => listener,
43+
Err(err) => {
44+
tracing::error!(%err, "endpoint.listen_error");
45+
return;
46+
}
47+
};
4948
let shutdown_fut = shutdown_signal.cancelled_owned();
5049
let result = axum::serve(listener, app)
5150
.with_graceful_shutdown(shutdown_fut)

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#![deny(clippy::unwrap_used, clippy::panic)]
12
// TODO: We might want to remove this once more stable, but it's just noisy for now.
23
#![allow(dead_code)]
34
mod app;

src/manager.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
sync::{Arc, RwLock},
88
};
99
use tokio::sync::mpsc::{self, UnboundedSender};
10-
use tokio::task::JoinHandle;
10+
use tokio::task::{JoinError, JoinHandle};
1111
use tokio_stream::wrappers::UnboundedReceiverStream;
1212
use tokio_util::sync::CancellationToken;
1313

@@ -104,8 +104,9 @@ impl PartitionedService {
104104
pub async fn stop(self) {
105105
self.shutdown_signal.cancel();
106106

107-
// Okay to unwrap here, since we're just shutting down.
108-
self.scheduler_join_handle.await.unwrap();
107+
if let Err(err) = self.scheduler_join_handle.await {
108+
tracing::error!(%err, "partionied_service.shutdown_error");
109+
}
109110
tracing::info!(partition = self.partition, "partitioned_service.shutdown");
110111
}
111112
}
@@ -130,20 +131,20 @@ pub struct ManagerHandle {
130131
}
131132

132133
impl ManagerHandle {
133-
pub async fn stop(self) {
134+
pub async fn stop(self) -> Result<(), JoinError> {
134135
self.shutdown_signal.cancel();
135-
// Unwrapping here because we're just shutting down; it's okay to fail badly
136-
// at this point.
137136

138-
self.endpoint_join_handle.await.unwrap();
137+
self.endpoint_join_handle.await?;
138+
139+
self.consumer_join_handle.await?;
139140

140-
self.consumer_join_handle.await.unwrap();
141+
self.results_worker.await?;
141142

142-
self.results_worker.await.unwrap();
143+
self.services_join_handle.await?;
143144

144-
self.services_join_handle.await.unwrap();
145+
self.executor_join_handle.await?;
145146

146-
self.executor_join_handle.await.unwrap();
147+
Ok(())
147148
}
148149

149150
pub async fn recv_task_finished(&mut self) -> Option<anyhow::Result<()>> {

0 commit comments

Comments (0)