Skip to content

Commit cf40086

Browse files
committed
pager: fix panic when runtime is shut down
Historically, when making changes to the code we would forget to send something to the Pager Worker channel before closing it. This would cause the `recv().unwrap()` in QueryPager to panic, because the channel would be closed without having sent anything. To harden against such logic bugs, we added an abstraction of PageSendAttemptedProof, which enforces that at least one item is sent to the channel before it is closed, or else `worker.work()` can't return. With this abstraction in place, we long believed the `unwrap()` to be safe. However, there is two more cases when `recv()` can return None: 1) when the runtime is being shut down; 2) when the worker task which is owner of the channel's sending part panics. In both cases, the worker task terminates, and the channel is closed without sending anything. This commit handles both cases by executing special recovery logic when `recv()` returns None. This allows for graceful handling of runtime shutdown scenarios, without panicking, as well as correct panic propagation. We are sure that we won't introduce silent errors this way, because if we get None, the only possible explanation is that the runtime is indeed being shut down or that the worker task panicked. The logic bugs on the side of the Pager Worker are already prevented by the PageSendAttemptedProof abstraction. If panic is detected, it is propagated using `std::panic::resume_unwind`. If runtime shutdown is detected, we await a never-ending future to avoid returning from this function while the runtime is being shut down. To help debugging, we also emit a tracing info-level message in this case. Fixes: #1435
1 parent 78fb4de commit cf40086

File tree

1 file changed

+45
-8
lines changed

1 file changed

+45
-8
lines changed

scylla/src/client/pager.rs

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -999,14 +999,51 @@ If you are using this API, you are probably doing something wrong."
999999
worker_task: impl Future<Output = PageSendAttemptedProof> + Send + 'static,
10001000
mut receiver: mpsc::Receiver<Result<ReceivedPage, NextPageError>>,
10011001
) -> Result<Self, NextPageError> {
1002-
tokio::task::spawn(worker_task);
1003-
1004-
// This unwrap is safe because:
1005-
// - The future returned by worker.work sends at least one item
1006-
// to the channel (the PageSendAttemptedProof helps enforce this)
1007-
// - That future is polled in a tokio::task which isn't going to be
1008-
// cancelled
1009-
let page_received = receiver.recv().await.unwrap()?;
1002+
let worker_handle = tokio::task::spawn(worker_task);
1003+
1004+
let Some(page_received_res) = receiver.recv().await else {
1005+
// - The future returned by worker.work sends at least one item
1006+
// to the channel (the PageSendAttemptedProof helps enforce this);
1007+
// - That future is polled in a tokio::task which isn't going to be
1008+
// cancelled, **unless** the runtime is being shut down.
1009+
// - Another way for the worker task to terminate without sending
1010+
// anything could be panic.
1011+
// Therefore, there are two possible reasons for recv() to return None:
1012+
// 1. The runtime is being shut down.
1013+
// 2. The worker task panicked.
1014+
//
1015+
// Both cases are handled below, and in both cases we do not return
1016+
// from this function, but rather either propagate the panic,
1017+
// or hang indefinitely to avoid returning from here during runtime shutdown.
1018+
let worker_result = worker_handle.await;
1019+
match worker_result {
1020+
Ok(_send_attempted_proof) => {
1021+
unreachable!(
1022+
"Worker task completed without sending any page, despite having returned proof of having sent some"
1023+
)
1024+
}
1025+
Err(join_error) => {
1026+
let is_cancelled = join_error.is_cancelled();
1027+
if let Ok(panic_payload) = join_error.try_into_panic() {
1028+
// Worker task panicked. Propagate the panic.
1029+
std::panic::resume_unwind(panic_payload);
1030+
} else {
1031+
// This is not a panic, so it must be runtime shutdown.
1032+
assert!(
1033+
is_cancelled,
1034+
"PagerWorker task join error is neither a panic nor cancellation, which should be impossible"
1035+
);
1036+
// Let's await a never-ending future to avoid returning from here.
1037+
// But before, let's emit a message to indicate that we're in such a situation.
1038+
tracing::info!(
1039+
"Runtime is being shut down while QueryPager is being constructed; hanging the future indefinitely"
1040+
);
1041+
return futures::future::pending().await;
1042+
}
1043+
}
1044+
}
1045+
};
1046+
let page_received = page_received_res?;
10101047
let raw_rows_with_deserialized_metadata = page_received.rows.deserialize_metadata()?;
10111048

10121049
Ok(Self {

0 commit comments

Comments
 (0)