Skip to content

Commit 6596e93

Browse files
afckclaude
andauthored
[testnet] Fix poisoned worker race condition. (#5936) (#5940)
Backport of #5936. ## Motivation #5936 fixed a potential race condition in case of a DB journaling error. ## Proposal Ensure that the state that's removed from the map is really the poisoned one. Other minor cleanups to be closer to the `main` branch. ## Test Plan CI ## Release Plan - Validator hotfix. ## Links - PR to `main`: #5936 - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist) Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 0194776 commit 6596e93

File tree

3 files changed

+61
-46
lines changed

3 files changed

+61
-46
lines changed

linera-core/src/chain_worker/handle.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,6 @@ impl<S: Storage + Clone + 'static> DerefMut for RollbackGuard<S> {
8282

8383
impl<S: Storage + Clone + 'static> Drop for RollbackGuard<S> {
8484
fn drop(&mut self) {
85-
if self.0.is_poisoned() {
86-
// The view is in an inconsistent state due to a journal resolution failure.
87-
// Don't rollback — the worker will be evicted and reloaded.
88-
return;
89-
}
9085
self.0.rollback();
9186
}
9287
}

linera-core/src/chain_worker/state.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,11 @@ where
195195
self.chain.rollback();
196196
}
197197

198-
/// Returns whether this worker is poisoned (view is inconsistent).
199-
pub(crate) fn is_poisoned(&self) -> bool {
200-
self.poisoned
198+
/// Returns `WorkerError::PoisonedWorker` if the worker is poisoned due to a journal
199+
/// resolution failure.
200+
pub(crate) fn check_not_poisoned(&self) -> Result<(), WorkerError> {
201+
ensure!(!self.poisoned, WorkerError::PoisonedWorker);
202+
Ok(())
201203
}
202204

203205
/// Updates the last-access timestamp to the current time.
@@ -2131,16 +2133,16 @@ where
21312133
chain_id = %self.chain_id()
21322134
))]
21332135
async fn save(&mut self) -> Result<(), WorkerError> {
2134-
if let Err(e) = self.chain.save().await {
2135-
if e.must_reload_view() {
2136+
if let Err(error) = self.chain.save().await {
2137+
if error.must_reload_view() {
21362138
tracing::error!(
2137-
error = ?e,
2139+
?error,
21382140
chain_id = %self.chain_id(),
21392141
"Journal resolution failed; marking worker as poisoned"
21402142
);
21412143
self.poisoned = true;
21422144
}
2143-
return Err(e.into());
2145+
return Err(error.into());
21442146
}
21452147
Ok(())
21462148
}

linera-core/src/worker.rs

Lines changed: 52 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -412,8 +412,8 @@ pub enum WorkerError {
412412

413413
#[error("Fallback mode is not available on this network")]
414414
NoFallbackMode,
415-
#[error("Chain worker for {chain_id} is poisoned and must be reloaded")]
416-
WorkerPoisoned { chain_id: ChainId },
415+
#[error("Chain worker was poisoned by a journal resolution failure")]
416+
PoisonedWorker,
417417
}
418418

419419
impl WorkerError {
@@ -449,15 +449,22 @@ impl WorkerError {
449449
| WorkerError::Thread(_)
450450
| WorkerError::ReadCertificatesError(_)
451451
| WorkerError::IncorrectOutcome { .. }
452-
| WorkerError::WorkerPoisoned { .. } => true,
452+
| WorkerError::PoisonedWorker => true,
453453
WorkerError::ChainError(chain_error) => chain_error.is_local(),
454454
}
455455
}
456456

457-
/// Returns `true` if this error was caused by a journal resolution failure,
458-
/// which may leave storage in an inconsistent state requiring a view reload.
459-
pub fn must_reload_view(&self) -> bool {
460-
matches!(self, WorkerError::ViewError(e) if e.must_reload_view())
457+
/// Returns `true` if this error indicates that the chain worker's in-memory
458+
/// state may be inconsistent and must be evicted from the cache.
459+
fn must_reload_view(&self) -> bool {
460+
matches!(
461+
self,
462+
WorkerError::PoisonedWorker
463+
| WorkerError::ViewError(ViewError::StoreError {
464+
must_reload_view: true,
465+
..
466+
})
467+
)
461468
}
462469
}
463470

@@ -756,16 +763,19 @@ where
756763
Fut: std::future::Future<Output = Result<R, WorkerError>>,
757764
{
758765
let state = self.get_or_create_chain_worker(chain_id).await?;
759-
Box::pin(wrap_future(async move {
760-
let guard = handle::read_lock(&state).await;
761-
if guard.is_poisoned() {
762-
self.chain_workers.pin().remove(&chain_id);
763-
drop(guard);
764-
return Err(WorkerError::WorkerPoisoned { chain_id });
765-
}
766+
let state_ref = &state;
767+
let result = Box::pin(wrap_future(async move {
768+
let guard = handle::read_lock(state_ref).await;
769+
guard.check_not_poisoned()?;
766770
f(guard).await
767771
}))
768-
.await
772+
.await;
773+
if let Err(error) = &result {
774+
if error.must_reload_view() {
775+
self.evict_poisoned_worker(chain_id, &state);
776+
}
777+
}
778+
result
769779
}
770780

771781
/// Acquires a write lock on the chain worker and executes the given closure.
@@ -779,26 +789,34 @@ where
779789
Fut: std::future::Future<Output = Result<R, WorkerError>>,
780790
{
781791
let state = self.get_or_create_chain_worker(chain_id).await?;
782-
Box::pin(wrap_future(async move {
783-
let guard = handle::write_lock(&state).await;
784-
if guard.is_poisoned() {
785-
self.chain_workers.pin().remove(&chain_id);
786-
drop(guard);
787-
return Err(WorkerError::WorkerPoisoned { chain_id });
788-
}
789-
let result = f(guard).await;
790-
if result.is_err() {
791-
// Check if the operation poisoned the worker. If so, evict it
792-
// so the next request reloads from storage.
793-
let guard = state.read().await;
794-
if guard.is_poisoned() {
795-
self.chain_workers.pin().remove(&chain_id);
796-
drop(guard);
797-
}
798-
}
799-
result
792+
let state_ref = &state;
793+
let result = Box::pin(wrap_future(async move {
794+
let guard = handle::write_lock(state_ref).await;
795+
guard.check_not_poisoned()?;
796+
f(guard).await
800797
}))
801-
.await
798+
.await;
799+
if let Err(error) = &result {
800+
if error.must_reload_view() {
801+
self.evict_poisoned_worker(chain_id, &state);
802+
}
803+
}
804+
result
805+
}
806+
807+
/// Evicts a poisoned chain worker from the cache, but only if the entry still
808+
/// points to the same instance. This avoids removing a fresh replacement that
809+
/// another task may have already loaded.
810+
fn evict_poisoned_worker(&self, chain_id: ChainId, poisoned: &ChainWorkerArc<StorageClient>) {
811+
tracing::warn!(%chain_id, "Evicting poisoned chain worker from cache");
812+
let pin = self.chain_workers.pin();
813+
let weak_poisoned = Arc::downgrade(poisoned);
814+
let _ = pin.remove_if(&chain_id, |_key, future| {
815+
future
816+
.peek()
817+
.and_then(|r| r.clone().ok())
818+
.is_some_and(|weak| weak.ptr_eq(&weak_poisoned))
819+
});
802820
}
803821

804822
/// Gets or creates a chain worker for the given chain.

0 commit comments

Comments
 (0)