Skip to content

Commit 0d56f2b

Browse files
authored
Merge pull request #1456 from wprzytula/fix-pager-panic
pager: fix panic when runtime is shut down
2 parents 78fb4de + cf40086 commit 0d56f2b

File tree

1 file changed

+45
-8
lines changed

1 file changed

+45
-8
lines changed

scylla/src/client/pager.rs

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -999,14 +999,51 @@ If you are using this API, you are probably doing something wrong."
999999
worker_task: impl Future<Output = PageSendAttemptedProof> + Send + 'static,
10001000
mut receiver: mpsc::Receiver<Result<ReceivedPage, NextPageError>>,
10011001
) -> Result<Self, NextPageError> {
1002-
tokio::task::spawn(worker_task);
1003-
1004-
// This unwrap is safe because:
1005-
// - The future returned by worker.work sends at least one item
1006-
// to the channel (the PageSendAttemptedProof helps enforce this)
1007-
// - That future is polled in a tokio::task which isn't going to be
1008-
// cancelled
1009-
let page_received = receiver.recv().await.unwrap()?;
1002+
let worker_handle = tokio::task::spawn(worker_task);
1003+
1004+
let Some(page_received_res) = receiver.recv().await else {
1005+
// - The future returned by worker.work sends at least one item
1006+
// to the channel (the PageSendAttemptedProof helps enforce this);
1007+
// - That future is polled in a tokio::task which isn't going to be
1008+
// cancelled, **unless** the runtime is being shut down.
1009+
// - Another way for the worker task to terminate without sending
1010+
// anything could be panic.
1011+
// Therefore, there are two possible reasons for recv() to return None:
1012+
// 1. The runtime is being shut down.
1013+
// 2. The worker task panicked.
1014+
//
1015+
// Both cases are handled below, and in both cases we do not return
1016+
// from this function, but rather either propagate the panic,
1017+
// or hang indefinitely to avoid returning from here during runtime shutdown.
1018+
let worker_result = worker_handle.await;
1019+
match worker_result {
1020+
Ok(_send_attempted_proof) => {
1021+
unreachable!(
1022+
"Worker task completed without sending any page, despite having returned proof of having sent some"
1023+
)
1024+
}
1025+
Err(join_error) => {
1026+
let is_cancelled = join_error.is_cancelled();
1027+
if let Ok(panic_payload) = join_error.try_into_panic() {
1028+
// Worker task panicked. Propagate the panic.
1029+
std::panic::resume_unwind(panic_payload);
1030+
} else {
1031+
// This is not a panic, so it must be runtime shutdown.
1032+
assert!(
1033+
is_cancelled,
1034+
"PagerWorker task join error is neither a panic nor cancellation, which should be impossible"
1035+
);
1036+
// Let's await a never-ending future to avoid returning from here.
1037+
// But before, let's emit a message to indicate that we're in such a situation.
1038+
tracing::info!(
1039+
"Runtime is being shut down while QueryPager is being constructed; hanging the future indefinitely"
1040+
);
1041+
return futures::future::pending().await;
1042+
}
1043+
}
1044+
}
1045+
};
1046+
let page_received = page_received_res?;
10101047
let raw_rows_with_deserialized_metadata = page_received.rows.deserialize_metadata()?;
10111048

10121049
Ok(Self {

0 commit comments

Comments
 (0)