Skip to content

Commit 94e03f2

Browse files
afckclaude
andauthored
Journaling fix (#5892) (#5935)
Port of #5892. ## Motivation #5891 added logs but now we would like to prevent a possible DB corruption happening with journaling. ## Proposal * Distinguish DB errors that may force a reload of the view * This is the case for certain journaling errors during save(). ## Test Plan CI ## Release Plan - Nothing to do. --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 41d2edb commit 94e03f2

File tree

9 files changed

+210
-56
lines changed

9 files changed

+210
-56
lines changed

linera-core/src/chain_worker/state.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ where
110110
chain_modes: Option<Arc<sync::RwLock<BTreeMap<ChainId, ListeningMode>>>>,
111111
delivery_notifier: DeliveryNotifier,
112112
knows_chain_is_active: bool,
113+
/// Set to `true` if a journal resolution failure has left storage potentially inconsistent.
114+
poisoned: bool,
113115
}
114116

115117
/// Whether the block was processed or skipped. Used for metrics.
@@ -170,6 +172,7 @@ where
170172
chain_modes,
171173
delivery_notifier,
172174
knows_chain_is_active: false,
175+
poisoned: false,
173176
})
174177
}
175178

@@ -189,8 +192,16 @@ where
189192
}
190193

191194
/// Rolls back any uncommitted changes to the chain state.
195+
/// Does nothing if the worker is poisoned (the view is in an inconsistent state).
192196
pub(crate) fn rollback(&mut self) {
193-
self.chain.rollback();
197+
if !self.poisoned {
198+
self.chain.rollback();
199+
}
200+
}
201+
202+
/// Returns `true` if the worker is poisoned due to a journal resolution failure.
203+
pub(crate) fn is_poisoned(&self) -> bool {
204+
self.poisoned
194205
}
195206

196207
/// Updates the last-access timestamp to the current time.
@@ -2130,11 +2141,23 @@ where
21302141
}
21312142

21322143
/// Stores the chain state in persistent storage.
2144+
///
2145+
/// If the save fails, the worker is marked as poisoned and must be reloaded.
21332146
#[instrument(skip_all, fields(
21342147
chain_id = %self.chain_id()
21352148
))]
21362149
async fn save(&mut self) -> Result<(), WorkerError> {
2137-
self.chain.save().await?;
2150+
if let Err(e) = self.chain.save().await {
2151+
if e.must_reload_view() {
2152+
tracing::error!(
2153+
error = ?e,
2154+
chain_id = %self.chain_id(),
2155+
"Journal resolution failed; marking worker as poisoned"
2156+
);
2157+
self.poisoned = true;
2158+
}
2159+
return Err(e.into());
2160+
}
21382161
Ok(())
21392162
}
21402163
}

linera-core/src/worker.rs

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,12 @@ impl WorkerError {
451451
WorkerError::ChainError(chain_error) => chain_error.is_local(),
452452
}
453453
}
454+
455+
/// Returns `true` if this error was caused by a journal resolution failure,
456+
/// which may leave storage in an inconsistent state requiring a view reload.
457+
pub fn must_reload_view(&self) -> bool {
458+
matches!(self, WorkerError::ViewError(e) if e.must_reload_view())
459+
}
454460
}
455461

456462
impl From<ChainError> for WorkerError {
@@ -767,11 +773,20 @@ where
767773
Fut: std::future::Future<Output = Result<R, WorkerError>>,
768774
{
769775
let state = self.get_or_create_chain_worker(chain_id).await?;
770-
Box::pin(wrap_future(async move {
776+
let result = Box::pin(wrap_future(async move {
771777
let guard = handle::read_lock(&state).await;
778+
if guard.is_poisoned() {
779+
return Err(Self::poisoned_worker_error());
780+
}
772781
f(guard).await
773782
}))
774-
.await
783+
.await;
784+
if let Err(e) = &result {
785+
if e.must_reload_view() {
786+
self.evict_poisoned_worker(chain_id);
787+
}
788+
}
789+
result
775790
}
776791

777792
/// Acquires a write lock on the chain worker and executes the given closure.
@@ -785,11 +800,35 @@ where
785800
Fut: std::future::Future<Output = Result<R, WorkerError>>,
786801
{
787802
let state = self.get_or_create_chain_worker(chain_id).await?;
788-
Box::pin(wrap_future(async move {
803+
let result = Box::pin(wrap_future(async move {
789804
let guard = handle::write_lock(&state).await;
805+
if guard.is_poisoned() {
806+
return Err(Self::poisoned_worker_error());
807+
}
790808
f(guard).await
791809
}))
792-
.await
810+
.await;
811+
if let Err(e) = &result {
812+
if e.must_reload_view() {
813+
self.evict_poisoned_worker(chain_id);
814+
}
815+
}
816+
result
817+
}
818+
819+
fn poisoned_worker_error() -> WorkerError {
820+
WorkerError::ViewError(ViewError::StoreError {
821+
backend: "journaling",
822+
error: "Worker is poisoned due to a journal resolution failure".into(),
823+
must_reload_view: true,
824+
})
825+
}
826+
827+
/// Evicts a poisoned chain worker from the cache so it gets reloaded on the next request.
828+
fn evict_poisoned_worker(&self, chain_id: ChainId) {
829+
tracing::warn!(%chain_id, "Evicting poisoned chain worker from cache");
830+
let pin = self.chain_workers.pin();
831+
pin.remove(&chain_id);
793832
}
794833

795834
/// Gets or creates a chain worker for the given chain.

linera-views/src/backends/dual.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,4 +413,12 @@ where
413413
E2: KeyValueStoreError,
414414
{
415415
const BACKEND: &'static str = "dual_store";
416+
417+
fn must_reload_view(&self) -> bool {
418+
match self {
419+
DualStoreError::First(e) => e.must_reload_view(),
420+
DualStoreError::Second(e) => e.must_reload_view(),
421+
_ => false,
422+
}
423+
}
416424
}

linera-views/src/backends/dynamo_db.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use crate::store::TestKeyValueDatabase;
4444
use crate::{
4545
batch::SimpleUnorderedBatch,
4646
common::get_uleb128_size,
47-
journaling::{JournalConsistencyError, JournalingKeyValueDatabase},
47+
journaling::JournalingKeyValueDatabase,
4848
lru_caching::{LruCachingConfig, LruCachingDatabase},
4949
store::{
5050
DirectWritableKeyValueStore, KeyValueDatabase, KeyValueStoreError, ReadableKeyValueStore,
@@ -994,10 +994,6 @@ pub enum DynamoDbStoreInternalError {
994994
#[error("The key prefix must have at most 1024 bytes")]
995995
KeyPrefixTooLong,
996996

997-
/// The journal is not coherent
998-
#[error(transparent)]
999-
JournalConsistencyError(#[from] JournalConsistencyError),
1000-
1001997
/// The length of the value should be at most 400 KB.
1002998
#[error("The DynamoDB value should be less than 400 KB")]
1003999
ValueLengthTooLarge,
@@ -1093,7 +1089,7 @@ impl KeyValueStoreError for DynamoDbStoreInternalError {
10931089

10941090
#[cfg(with_testing)]
10951091
impl TestKeyValueDatabase for JournalingKeyValueDatabase<DynamoDbDatabaseInternal> {
1096-
async fn new_test_config() -> Result<DynamoDbStoreInternalConfig, DynamoDbStoreInternalError> {
1092+
async fn new_test_config() -> Result<DynamoDbStoreInternalConfig, Self::Error> {
10971093
Ok(DynamoDbStoreInternalConfig {
10981094
use_dynamodb_local: true,
10991095
max_concurrent_queries: Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES),
@@ -1103,7 +1099,8 @@ impl TestKeyValueDatabase for JournalingKeyValueDatabase<DynamoDbDatabaseInterna
11031099
}
11041100

11051101
/// The combined error type for [`DynamoDbDatabase`].
1106-
pub type DynamoDbStoreError = ValueSplittingError<DynamoDbStoreInternalError>;
1102+
pub type DynamoDbStoreError =
1103+
ValueSplittingError<crate::journaling::JournalingError<DynamoDbStoreInternalError>>;
11071104

11081105
/// The config type for [`DynamoDbDatabase`]`
11091106
pub type DynamoDbStoreConfig = LruCachingConfig<DynamoDbStoreInternalConfig>;

0 commit comments

Comments
 (0)