Skip to content

Commit 5ab08cd

Browse files
afckclaude
andauthored
Cleanups; fix potential race condition (#5936)
## Motivation In #5935 a few questions were raised after it was already merged. Whether we `rollback()` is irrelevant if the state is poisoned because we drop that view anyway. Also, the port needlessly introduced a few changes compared to `testnet_conway`. ## Proposal Bring the code more in line with `testnet_conway`. Make sure worker states only get evicted if they are really poisoned. ## Test Plan CI ## Release Plan - Backport the worker fix. ## Links - Addresses some comments from #5935. - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist) --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ebc90d8 commit 5ab08cd

File tree

2 files changed

+55
-49
lines changed

2 files changed

+55
-49
lines changed

linera-core/src/chain_worker/state.rs

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -114,13 +114,6 @@ where
114114
poisoned: bool,
115115
}
116116

117-
/// Whether the block was processed or skipped. Used for metrics.
118-
pub enum BlockOutcome {
119-
Processed,
120-
Preprocessed,
121-
Skipped,
122-
}
123-
124117
/// The result of processing a cross-chain update.
125118
pub(crate) enum CrossChainUpdateResult {
126119
/// The update was applied and the chain was saved up to the given height.
@@ -136,6 +129,13 @@ pub(crate) enum CrossChainUpdateResult {
136129
},
137130
}
138131

132+
/// Whether the block was processed or skipped. Used for metrics.
133+
pub enum BlockOutcome {
134+
Processed,
135+
Preprocessed,
136+
Skipped,
137+
}
138+
139139
impl<StorageClient> ChainWorkerState<StorageClient>
140140
where
141141
StorageClient: Storage + Clone + 'static,
@@ -192,16 +192,15 @@ where
192192
}
193193

194194
/// Rolls back any uncommitted changes to the chain state.
195-
/// Does nothing if the worker is poisoned (the view is in an inconsistent state).
196195
pub(crate) fn rollback(&mut self) {
197-
if !self.poisoned {
198-
self.chain.rollback();
199-
}
196+
self.chain.rollback();
200197
}
201198

202-
/// Returns `true` if the worker is poisoned due to a journal resolution failure.
203-
pub(crate) fn is_poisoned(&self) -> bool {
204-
self.poisoned
199+
/// Returns `WorkerError::PoisonedWorker` if the worker is poisoned due to a journal
200+
/// resolution failure.
201+
pub(crate) fn check_not_poisoned(&self) -> Result<(), WorkerError> {
202+
ensure!(!self.poisoned, WorkerError::PoisonedWorker);
203+
Ok(())
205204
}
206205

207206
/// Updates the last-access timestamp to the current time.
@@ -345,7 +344,6 @@ where
345344
Ok(maybe_blobs.into_iter().collect())
346345
}
347346

348-
/// Loads pending cross-chain requests, and adds `NewRound` notifications where appropriate.
349347
/// Creates cross-chain requests for a single recipient from its outbox.
350348
#[instrument(skip_all, fields(
351349
chain_id = %self.chain_id()
@@ -2147,16 +2145,16 @@ where
21472145
chain_id = %self.chain_id()
21482146
))]
21492147
async fn save(&mut self) -> Result<(), WorkerError> {
2150-
if let Err(e) = self.chain.save().await {
2151-
if e.must_reload_view() {
2148+
if let Err(error) = self.chain.save().await {
2149+
if error.must_reload_view() {
21522150
tracing::error!(
2153-
error = ?e,
2151+
?error,
21542152
chain_id = %self.chain_id(),
21552153
"Journal resolution failed; marking worker as poisoned"
21562154
);
21572155
self.poisoned = true;
21582156
}
2159-
return Err(e.into());
2157+
return Err(error.into());
21602158
}
21612159
Ok(())
21622160
}

linera-core/src/worker.rs

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,8 @@ pub enum WorkerError {
412412
MissingNetworkDescription,
413413
#[error("thread error: {0}")]
414414
Thread(#[from] web_thread_pool::Error),
415+
#[error("Chain worker was poisoned by a journal resolution failure")]
416+
PoisonedWorker,
415417
}
416418

417419
impl WorkerError {
@@ -447,15 +449,23 @@ impl WorkerError {
447449
| WorkerError::MissingNetworkDescription
448450
| WorkerError::Thread(_)
449451
| WorkerError::ReadCertificatesError(_)
450-
| WorkerError::IncorrectOutcome { .. } => true,
452+
| WorkerError::IncorrectOutcome { .. }
453+
| WorkerError::PoisonedWorker => true,
451454
WorkerError::ChainError(chain_error) => chain_error.is_local(),
452455
}
453456
}
454457

455-
/// Returns `true` if this error was caused by a journal resolution failure,
456-
/// which may leave storage in an inconsistent state requiring a view reload.
457-
pub fn must_reload_view(&self) -> bool {
458-
matches!(self, WorkerError::ViewError(e) if e.must_reload_view())
458+
/// Returns `true` if this error indicates that the chain worker's in-memory
459+
/// state may be inconsistent and must be evicted from the cache.
460+
fn must_reload_view(&self) -> bool {
461+
matches!(
462+
self,
463+
WorkerError::PoisonedWorker
464+
| WorkerError::ViewError(ViewError::StoreError {
465+
must_reload_view: true,
466+
..
467+
})
468+
)
459469
}
460470
}
461471

@@ -773,17 +783,16 @@ where
773783
Fut: std::future::Future<Output = Result<R, WorkerError>>,
774784
{
775785
let state = self.get_or_create_chain_worker(chain_id).await?;
786+
let state_ref = &state;
776787
let result = Box::pin(wrap_future(async move {
777-
let guard = handle::read_lock(&state).await;
778-
if guard.is_poisoned() {
779-
return Err(Self::poisoned_worker_error());
780-
}
788+
let guard = handle::read_lock(state_ref).await;
789+
guard.check_not_poisoned()?;
781790
f(guard).await
782791
}))
783792
.await;
784-
if let Err(e) = &result {
785-
if e.must_reload_view() {
786-
self.evict_poisoned_worker(chain_id);
793+
if let Err(error) = &result {
794+
if error.must_reload_view() {
795+
self.evict_poisoned_worker(chain_id, &state);
787796
}
788797
}
789798
result
@@ -800,35 +809,34 @@ where
800809
Fut: std::future::Future<Output = Result<R, WorkerError>>,
801810
{
802811
let state = self.get_or_create_chain_worker(chain_id).await?;
812+
let state_ref = &state;
803813
let result = Box::pin(wrap_future(async move {
804-
let guard = handle::write_lock(&state).await;
805-
if guard.is_poisoned() {
806-
return Err(Self::poisoned_worker_error());
807-
}
814+
let guard = handle::write_lock(state_ref).await;
815+
guard.check_not_poisoned()?;
808816
f(guard).await
809817
}))
810818
.await;
811-
if let Err(e) = &result {
812-
if e.must_reload_view() {
813-
self.evict_poisoned_worker(chain_id);
819+
if let Err(error) = &result {
820+
if error.must_reload_view() {
821+
self.evict_poisoned_worker(chain_id, &state);
814822
}
815823
}
816824
result
817825
}
818826

819-
fn poisoned_worker_error() -> WorkerError {
820-
WorkerError::ViewError(ViewError::StoreError {
821-
backend: "journaling",
822-
error: "Worker is poisoned due to a journal resolution failure".into(),
823-
must_reload_view: true,
824-
})
825-
}
826-
827-
/// Evicts a poisoned chain worker from the cache so it gets reloaded on the next request.
828-
fn evict_poisoned_worker(&self, chain_id: ChainId) {
827+
/// Evicts a poisoned chain worker from the cache, but only if the entry still
828+
/// points to the same instance. This avoids removing a fresh replacement that
829+
/// another task may have already loaded.
830+
fn evict_poisoned_worker(&self, chain_id: ChainId, poisoned: &ChainWorkerArc<StorageClient>) {
829831
tracing::warn!(%chain_id, "Evicting poisoned chain worker from cache");
830832
let pin = self.chain_workers.pin();
831-
pin.remove(&chain_id);
833+
let weak_poisoned = Arc::downgrade(poisoned);
834+
let _ = pin.remove_if(&chain_id, |_key, future| {
835+
future
836+
.peek()
837+
.and_then(|r| r.clone().ok())
838+
.is_some_and(|weak| weak.ptr_eq(&weak_poisoned))
839+
});
832840
}
833841

834842
/// Gets or creates a chain worker for the given chain.

0 commit comments

Comments
 (0)