Skip to content

Commit 00854a0

Browse files
committed
Fix dangling endpoint to chain worker actor that failed to start (#3298)
Testnet is currently experiencing some panics when an attempt is made to send a request to a chain worker actor. The panic message says that the endpoint is closed. One possible cause is #3295, where a storage error could cause an endpoint to be added to the chain worker cache but its respective actor task wouldn't start.

Refactor the `ChainWorkerActor` so that it has a `run` method that loads the chain state from storage and then handles the incoming requests. If loading the state fails, then the error is reported to the next requester, and loading will be reattempted while the actor is running and receiving requests.

CI should catch any regressions caused by this refactor.

- These changes should be backported to the latest `devnet` branch, then
  - be released in a validator hotfix.
- These changes should be backported to the latest `testnet` branch, then
  - be released in a validator hotfix.

This needs to be hotfixed because it may fix a bug that's in production.

- [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
- Closes #3295
1 parent 1566fa9 commit 00854a0

File tree

5 files changed

+231
-127
lines changed

5 files changed

+231
-127
lines changed

linera-client/src/client_context.rs

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -295,7 +295,7 @@ where
295295
// Try processing the inbox optimistically without waiting for validator notifications.
296296
let (new_certificates, maybe_timeout) = {
297297
chain_client.synchronize_from_validators().await?;
298-
let result = chain_client.process_inbox_without_prepare().await;
298+
let result = Box::pin(chain_client.process_inbox_without_prepare()).await;
299299
self.update_wallet_from_client(chain_client).await?;
300300
if result.is_err() {
301301
self.save_wallet().await?;
@@ -314,7 +314,7 @@ where
314314

315315
loop {
316316
let (new_certificates, maybe_timeout) = {
317-
let result = chain_client.process_inbox().await;
317+
let result = Box::pin(chain_client.process_inbox()).await;
318318
self.update_wallet_from_client(chain_client).await?;
319319
if result.is_err() {
320320
self.save_wallet().await?;
@@ -591,7 +591,7 @@ where
591591

592592
for chain_id in key_pairs.keys() {
593593
let child_client = self.make_chain_client(*chain_id)?;
594-
child_client.process_inbox().await?;
594+
Box::pin(child_client.process_inbox()).await?;
595595
self.wallet.as_mut().update_from_state(&child_client).await;
596596
self.save_wallet().await?;
597597
}
@@ -650,7 +650,7 @@ where
650650
async move {
651651
for i in 0..5 {
652652
linera_base::time::timer::sleep(Duration::from_secs(i)).await;
653-
chain_client.process_inbox().await?;
653+
Box::pin(chain_client.process_inbox()).await?;
654654
let chain_state = chain_client.chain_state_view().await?;
655655
if chain_state
656656
.execution_state

linera-core/src/chain_worker/actor.rs

Lines changed: 98 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ use linera_execution::{
2626
};
2727
use linera_storage::Storage;
2828
use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard};
29-
use tracing::{instrument, trace, warn};
29+
use tracing::{debug, instrument, trace, warn};
3030

3131
use super::{config::ChainWorkerConfig, state::ChainWorkerState, DeliveryNotifier};
3232
use crate::{
@@ -141,9 +141,46 @@ impl<StorageClient> ChainWorkerActor<StorageClient>
141141
where
142142
StorageClient: Storage + Clone + Send + Sync + 'static,
143143
{
144-
/// Spawns a new task to run the [`ChainWorkerActor`], returning an endpoint for sending
145-
/// requests to the worker.
146-
#[tracing::instrument(level = "debug", skip_all, fields(?chain_id))]
144+
/// Runs the [`ChainWorkerActor`], first by loading the chain state from `storage` then
145+
/// handling all `incoming_requests` as they arrive.
146+
///
147+
/// If loading the chain state fails the next request will receive the error reported by the
148+
/// `storage`, and the actor will then try again to load the state.
149+
pub async fn run(
150+
config: ChainWorkerConfig,
151+
storage: StorageClient,
152+
certificate_value_cache: Arc<ValueCache<CryptoHash, HashedCertificateValue>>,
153+
tracked_chains: Option<Arc<RwLock<HashSet<ChainId>>>>,
154+
delivery_notifier: DeliveryNotifier,
155+
chain_id: ChainId,
156+
mut incoming_requests: mpsc::UnboundedReceiver<ChainWorkerRequest<StorageClient::Context>>,
157+
) {
158+
let actor = loop {
159+
let load_result = Self::load(
160+
config.clone(),
161+
storage.clone(),
162+
certificate_value_cache.clone(),
163+
tracked_chains.clone(),
164+
delivery_notifier.clone(),
165+
chain_id,
166+
)
167+
.await
168+
.inspect_err(|error| warn!("Failed to load chain state: {error:?}"));
169+
170+
match load_result {
171+
Ok(actor) => break actor,
172+
Err(error) => match incoming_requests.recv().await {
173+
Some(request) => request.send_error(error),
174+
None => return,
175+
},
176+
}
177+
};
178+
179+
actor.handle_requests(incoming_requests).await;
180+
}
181+
182+
/// Creates a [`ChainWorkerActor`], loading it with the chain state for the requested
183+
/// [`ChainId`].
147184
pub async fn load(
148185
config: ChainWorkerConfig,
149186
storage: StorageClient,
@@ -218,7 +255,7 @@ where
218255
skip_all,
219256
fields(chain_id = format!("{:.8}", self.worker.chain_id())),
220257
)]
221-
pub async fn run(
258+
pub async fn handle_requests(
222259
mut self,
223260
mut incoming_requests: mpsc::UnboundedReceiver<ChainWorkerRequest<StorageClient::Context>>,
224261
) {
@@ -455,3 +492,59 @@ where
455492
}
456493
}
457494
}
495+
496+
impl<Context> ChainWorkerRequest<Context>
497+
where
498+
Context: linera_views::context::Context + Clone + Send + Sync + 'static,
499+
{
500+
/// Responds to this request with an `error`.
501+
pub fn send_error(self, error: WorkerError) {
502+
debug!("Immediately sending error to chain worker request {self:?}");
503+
504+
let responded = match self {
505+
#[cfg(with_testing)]
506+
ChainWorkerRequest::ReadCertificate { callback, .. } => {
507+
callback.send(Err(error)).is_ok()
508+
}
509+
#[cfg(with_testing)]
510+
ChainWorkerRequest::FindBundleInInbox { callback, .. } => {
511+
callback.send(Err(error)).is_ok()
512+
}
513+
ChainWorkerRequest::GetChainStateView { callback } => callback.send(Err(error)).is_ok(),
514+
ChainWorkerRequest::QueryApplication { callback, .. } => {
515+
callback.send(Err(error)).is_ok()
516+
}
517+
ChainWorkerRequest::DescribeApplication { callback, .. } => {
518+
callback.send(Err(error)).is_ok()
519+
}
520+
ChainWorkerRequest::StageBlockExecution { callback, .. } => {
521+
callback.send(Err(error)).is_ok()
522+
}
523+
ChainWorkerRequest::ProcessTimeout { callback, .. } => {
524+
callback.send(Err(error)).is_ok()
525+
}
526+
ChainWorkerRequest::HandleBlockProposal { callback, .. } => {
527+
callback.send(Err(error)).is_ok()
528+
}
529+
ChainWorkerRequest::ProcessValidatedBlock { callback, .. } => {
530+
callback.send(Err(error)).is_ok()
531+
}
532+
ChainWorkerRequest::ProcessConfirmedBlock { callback, .. } => {
533+
callback.send(Err(error)).is_ok()
534+
}
535+
ChainWorkerRequest::ProcessCrossChainUpdate { callback, .. } => {
536+
callback.send(Err(error)).is_ok()
537+
}
538+
ChainWorkerRequest::ConfirmUpdatedRecipient { callback, .. } => {
539+
callback.send(Err(error)).is_ok()
540+
}
541+
ChainWorkerRequest::HandleChainInfoQuery { callback, .. } => {
542+
callback.send(Err(error)).is_ok()
543+
}
544+
};
545+
546+
if !responded {
547+
warn!("Callback for `ChainWorkerActor` was dropped before a response was sent");
548+
}
549+
}
550+
}

0 commit comments

Comments
 (0)