Skip to content

Commit 0bd0934

Browse files
authored
Merge pull request #63 from skel84/issue-52-view-change-fail-closed
Add view change and fail-closed reads on quorum loss
2 parents 8538cc1 + ce7a08b commit 0bd0934

File tree

7 files changed

+805
-63
lines changed

7 files changed

+805
-63
lines changed

crates/allocdb-node/src/replica.rs

Lines changed: 163 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ mod non_unix_tests;
2222

2323
const MAX_REPLICA_METADATA_BYTES: u64 = 256;
2424
const REPLICA_METADATA_MAGIC: [u8; 4] = *b"RPLM";
25-
const REPLICA_METADATA_VERSION: u8 = 1;
25+
const REPLICA_METADATA_VERSION: u8 = 2;
2626
const REPLICA_PREPARE_LOG_MAGIC: [u8; 4] = *b"RPLP";
2727
const REPLICA_PREPARE_LOG_VERSION: u8 = 1;
2828

@@ -52,6 +52,7 @@ pub struct ReplicaIdentity {
5252
pub enum ReplicaRole {
5353
Primary,
5454
Backup,
55+
ViewUncertain,
5556
Recovering,
5657
Faulted,
5758
}
@@ -368,6 +369,10 @@ pub enum ReplicaProtocolError {
368369
expected: ReplicaRole,
369370
found: ReplicaRole,
370371
},
372+
RoleSetMismatch {
373+
expected: &'static str,
374+
found: ReplicaRole,
375+
},
371376
ViewRegression {
372377
current_view: u64,
373378
requested_view: u64,
@@ -746,9 +751,10 @@ impl ReplicaNode {
746751
if !matches!(role, ReplicaRole::Primary | ReplicaRole::Backup) {
747752
return Err(ReplicaProtocolError::UnsupportedNormalRole(role));
748753
}
749-
if current_view < self.metadata.current_view {
754+
let highest_known_view = self.highest_known_view();
755+
if current_view < highest_known_view {
750756
return Err(ReplicaProtocolError::ViewRegression {
751-
current_view: self.metadata.current_view,
757+
current_view: highest_known_view,
752758
requested_view: current_view,
753759
});
754760
}
@@ -761,6 +767,64 @@ impl ReplicaNode {
761767
Ok(())
762768
}
763769

770+
/// Records one durable vote for a higher view without entering normal mode.
771+
///
772+
/// # Errors
773+
///
774+
/// Returns [`ReplicaProtocolError`] if the replica is faulted, the requested view would move
775+
/// backwards relative to the highest persisted local view knowledge, or the updated metadata
776+
/// cannot be persisted durably.
777+
pub fn record_durable_vote(
778+
&mut self,
779+
voted_view: u64,
780+
voted_for: ReplicaId,
781+
) -> Result<(), ReplicaProtocolError> {
782+
self.ensure_active()?;
783+
let highest_known_view = self.highest_known_view();
784+
if voted_view < highest_known_view {
785+
return Err(ReplicaProtocolError::ViewRegression {
786+
current_view: highest_known_view,
787+
requested_view: voted_view,
788+
});
789+
}
790+
791+
self.metadata.durable_vote = Some(DurableVote {
792+
view: voted_view,
793+
voted_for,
794+
});
795+
self.persist_metadata()?;
796+
Ok(())
797+
}
798+
799+
/// Moves one active replica into explicit view-uncertain mode.
800+
///
801+
/// # Errors
802+
///
803+
/// Returns [`ReplicaProtocolError`] if the replica is faulted or if the metadata update cannot
804+
/// be persisted durably.
805+
pub fn enter_view_uncertain(&mut self) -> Result<(), ReplicaProtocolError> {
806+
self.ensure_active()?;
807+
if self.metadata.role == ReplicaRole::ViewUncertain {
808+
return Ok(());
809+
}
810+
811+
self.metadata.role = ReplicaRole::ViewUncertain;
812+
self.persist_metadata()?;
813+
Ok(())
814+
}
815+
816+
/// Discards any uncommitted prepared suffix from local replica state.
817+
///
818+
/// # Errors
819+
///
820+
/// Returns [`ReplicaProtocolError`] if the replica is faulted or if the prepared-entry sidecar
821+
/// cannot be updated durably.
822+
pub fn discard_uncommitted_suffix(&mut self) -> Result<(), ReplicaProtocolError> {
823+
self.ensure_active()?;
824+
self.prepared_entries.clear();
825+
self.persist_prepared_entries()
826+
}
827+
764828
/// Validates and durably prepares one client request on the current primary.
765829
///
766830
/// # Errors
@@ -838,56 +902,34 @@ impl ReplicaNode {
838902
commit_lsn: Lsn,
839903
) -> Result<Option<SubmissionResult>, ReplicaProtocolError> {
840904
self.ensure_normal_role()?;
841-
let first_pending = self
842-
.metadata
843-
.commit_lsn
844-
.and_then(|lsn| lsn.get().checked_add(1))
845-
.unwrap_or(1);
846-
if commit_lsn.get() < first_pending {
847-
return Ok(None);
848-
}
849-
850-
let entries = (first_pending..=commit_lsn.get())
851-
.map(|lsn| {
852-
self.prepared_entries
853-
.get(&lsn)
854-
.cloned()
855-
.ok_or(ReplicaProtocolError::MissingPreparedEntry { lsn: Lsn(lsn) })
856-
})
857-
.collect::<Result<Vec<_>, _>>()?;
858-
859-
let mut last_result = None;
860-
for entry in &entries {
861-
if entry.view != self.metadata.current_view {
862-
return Err(ReplicaProtocolError::ViewMismatch {
863-
expected: self.metadata.current_view,
864-
found: entry.view,
865-
});
866-
}
867-
let result = self
868-
.engine_mut()?
869-
.submit_encoded(entry.request_slot, &entry.payload)?;
870-
if result.applied_lsn != entry.lsn {
871-
return Err(ReplicaProtocolError::AppliedLsnMismatch {
872-
expected: entry.lsn,
873-
found: result.applied_lsn,
874-
});
875-
}
876-
self.metadata.commit_lsn = Some(entry.lsn);
877-
self.metadata.active_snapshot_lsn = self
878-
.engine
879-
.as_ref()
880-
.and_then(SingleNodeEngine::active_snapshot_lsn);
881-
last_result = Some(result);
882-
}
905+
self.commit_prepared_through_impl(commit_lsn)
906+
}
883907

884-
for entry in &entries {
885-
self.prepared_entries.remove(&entry.lsn.get());
908+
/// Applies prepared entries through one already reconstructed committed prefix while the
909+
/// replica remains outside normal mode.
910+
///
911+
/// # Errors
912+
///
913+
/// Returns [`ReplicaProtocolError`] if the replica is faulted, if the local role is not
914+
/// `backup` or `view_uncertain`, if any committed LSN is missing from the prepared buffer, if
915+
/// the entry view no longer matches the local view, if the single-node executor rejects the
916+
/// payload, if the applied LSN diverges from the prepared position, or if the updated metadata
917+
/// / prepared-entry sidecar cannot be persisted.
918+
pub fn reconstruct_committed_prefix_through(
919+
&mut self,
920+
commit_lsn: Lsn,
921+
) -> Result<Option<SubmissionResult>, ReplicaProtocolError> {
922+
self.ensure_active()?;
923+
if !matches!(
924+
self.metadata.role,
925+
ReplicaRole::Backup | ReplicaRole::ViewUncertain
926+
) {
927+
return Err(ReplicaProtocolError::RoleSetMismatch {
928+
expected: "backup or view_uncertain",
929+
found: self.metadata.role,
930+
});
886931
}
887-
888-
self.persist_prepared_entries()?;
889-
self.persist_metadata()?;
890-
Ok(last_result)
932+
self.commit_prepared_through_impl(commit_lsn)
891933
}
892934

893935
/// Enforces the first replicated-release read rule: only the current primary may serve reads.
@@ -943,6 +985,14 @@ impl ReplicaNode {
943985
}
944986
}
945987

988+
fn highest_known_view(&self) -> u64 {
989+
self.metadata
990+
.durable_vote
991+
.map_or(self.metadata.current_view, |vote| {
992+
self.metadata.current_view.max(vote.view)
993+
})
994+
}
995+
946996
fn next_prepared_lsn(&self) -> Result<Lsn, ReplicaProtocolError> {
947997
let last_lsn = self
948998
.prepared_entries
@@ -965,6 +1015,62 @@ impl ReplicaNode {
9651015
.map_err(|error| ReplicaProtocolError::PrepareStorage(error.kind()))
9661016
}
9671017

1018+
fn commit_prepared_through_impl(
1019+
&mut self,
1020+
commit_lsn: Lsn,
1021+
) -> Result<Option<SubmissionResult>, ReplicaProtocolError> {
1022+
let first_pending = self
1023+
.metadata
1024+
.commit_lsn
1025+
.and_then(|lsn| lsn.get().checked_add(1))
1026+
.unwrap_or(1);
1027+
if commit_lsn.get() < first_pending {
1028+
return Ok(None);
1029+
}
1030+
1031+
let entries = (first_pending..=commit_lsn.get())
1032+
.map(|lsn| {
1033+
self.prepared_entries
1034+
.get(&lsn)
1035+
.cloned()
1036+
.ok_or(ReplicaProtocolError::MissingPreparedEntry { lsn: Lsn(lsn) })
1037+
})
1038+
.collect::<Result<Vec<_>, _>>()?;
1039+
1040+
let mut last_result = None;
1041+
for entry in &entries {
1042+
if entry.view != self.metadata.current_view {
1043+
return Err(ReplicaProtocolError::ViewMismatch {
1044+
expected: self.metadata.current_view,
1045+
found: entry.view,
1046+
});
1047+
}
1048+
let result = self
1049+
.engine_mut()?
1050+
.submit_encoded(entry.request_slot, &entry.payload)?;
1051+
if result.applied_lsn != entry.lsn {
1052+
return Err(ReplicaProtocolError::AppliedLsnMismatch {
1053+
expected: entry.lsn,
1054+
found: result.applied_lsn,
1055+
});
1056+
}
1057+
self.metadata.commit_lsn = Some(entry.lsn);
1058+
self.metadata.active_snapshot_lsn = self
1059+
.engine
1060+
.as_ref()
1061+
.and_then(SingleNodeEngine::active_snapshot_lsn);
1062+
last_result = Some(result);
1063+
}
1064+
1065+
for entry in &entries {
1066+
self.prepared_entries.remove(&entry.lsn.get());
1067+
}
1068+
1069+
self.persist_prepared_entries()?;
1070+
self.persist_metadata()?;
1071+
Ok(last_result)
1072+
}
1073+
9681074
fn engine_mut(&mut self) -> Result<&mut SingleNodeEngine, ReplicaProtocolError> {
9691075
self.ensure_active()?;
9701076
Ok(self
@@ -1164,17 +1270,19 @@ const fn encode_role(role: ReplicaRole) -> u8 {
11641270
match role {
11651271
ReplicaRole::Primary => 1,
11661272
ReplicaRole::Backup => 2,
1167-
ReplicaRole::Recovering => 3,
1168-
ReplicaRole::Faulted => 4,
1273+
ReplicaRole::ViewUncertain => 3,
1274+
ReplicaRole::Recovering => 4,
1275+
ReplicaRole::Faulted => 5,
11691276
}
11701277
}
11711278

11721279
fn decode_role(value: u8) -> Result<ReplicaRole, ReplicaMetadataDecodeError> {
11731280
match value {
11741281
1 => Ok(ReplicaRole::Primary),
11751282
2 => Ok(ReplicaRole::Backup),
1176-
3 => Ok(ReplicaRole::Recovering),
1177-
4 => Ok(ReplicaRole::Faulted),
1283+
3 => Ok(ReplicaRole::ViewUncertain),
1284+
4 => Ok(ReplicaRole::Recovering),
1285+
5 => Ok(ReplicaRole::Faulted),
11781286
_ => Err(ReplicaMetadataDecodeError::InvalidRole(value)),
11791287
}
11801288
}

0 commit comments

Comments
 (0)