Skip to content

Commit bac464b

Browse files
authored
Merge pull request #64 from skel84/issue-53-suffix-catchup-rejoin
M7-T05 Add suffix catch-up, snapshot transfer, and rejoin
2 parents 0bd0934 + c7a6bdc commit bac464b

File tree

6 files changed

+918
-48
lines changed

6 files changed

+918
-48
lines changed

crates/allocdb-node/src/replica.rs

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use allocdb_core::ids::{Lsn, Slot};
88
use log::{error, info, warn};
99

1010
use crate::engine::{
11-
EngineConfig, EngineOpenError, ReadError, RecoverEngineError, SingleNodeEngine,
12-
SubmissionError, SubmissionResult,
11+
CheckpointError, CheckpointResult, EngineConfig, EngineOpenError, ReadError,
12+
RecoverEngineError, SingleNodeEngine, SubmissionError, SubmissionResult,
1313
};
1414

1515
#[cfg(all(test, unix))]
@@ -361,6 +361,25 @@ impl From<ReplicaMetadataFileError> for RecoverReplicaError {
361361
}
362362
}
363363

364+
/// Failure modes of `ReplicaNode::checkpoint_local_state`.
#[derive(Debug)]
365+
pub enum ReplicaCheckpointError {
366+
/// The replica was not `Active` when the checkpoint was requested; carries the status that
/// was observed (the faulted status in `checkpoint_local_state`).
Inactive(ReplicaNodeStatus),
367+
/// The wrapped engine's checkpoint operation returned an error.
Checkpoint(CheckpointError),
368+
/// A replica metadata file error; in `checkpoint_local_state` this can arise from the
/// `persist_metadata` call that follows a successful engine checkpoint.
MetadataFile(ReplicaMetadataFileError),
369+
}
370+
371+
// Lets `?` lift an engine `CheckpointError` into `ReplicaCheckpointError::Checkpoint`.
impl From<CheckpointError> for ReplicaCheckpointError {
372+
fn from(error: CheckpointError) -> Self {
373+
Self::Checkpoint(error)
374+
}
375+
}
376+
377+
// Lets `?` lift a `ReplicaMetadataFileError` into `ReplicaCheckpointError::MetadataFile`.
impl From<ReplicaMetadataFileError> for ReplicaCheckpointError {
378+
fn from(error: ReplicaMetadataFileError) -> Self {
379+
Self::MetadataFile(error)
380+
}
381+
}
382+
364383
#[derive(Debug)]
365384
pub enum ReplicaProtocolError {
366385
Inactive(ReplicaNodeStatus),
@@ -735,6 +754,35 @@ impl ReplicaNode {
735754
&self.prepare_log_file.path
736755
}
737756

757+
/// Persists one local checkpoint through the wrapped single-node engine and updates the active
758+
/// snapshot anchor in replica metadata.
759+
///
760+
/// # Errors
761+
///
762+
/// Returns [`ReplicaCheckpointError`] if the replica is faulted, if the checkpoint operation
763+
/// fails, or if the updated metadata cannot be persisted durably.
764+
///
765+
/// # Panics
766+
///
767+
/// Panics only if an `active` replica no longer holds its required live wrapped engine.
768+
pub fn checkpoint_local_state(&mut self) -> Result<CheckpointResult, ReplicaCheckpointError> {
769+
// Guard: only an `Active` replica may checkpoint; a faulted one is rejected with its status.
match self.status {
770+
ReplicaNodeStatus::Active => {}
771+
status @ ReplicaNodeStatus::Faulted(_) => {
772+
return Err(ReplicaCheckpointError::Inactive(status));
773+
}
774+
}
775+
776+
// Run the engine checkpoint against the configured snapshot path; an engine failure is
// converted into `ReplicaCheckpointError::Checkpoint` by `?` and returned before any
// metadata is touched.
let result = self
777+
.engine
778+
.as_mut()
779+
.expect("active replica must keep one live engine")
780+
.checkpoint(&self.paths.snapshot_path)?;
781+
// Record the new snapshot LSN in the in-memory metadata, then persist it.
// NOTE(review): the in-memory anchor is updated before `persist_metadata()`; if persisting
// fails, this function returns the error without restoring the previous value here --
// confirm that `persist_metadata` or the caller handles that divergence.
self.metadata.active_snapshot_lsn = result.snapshot_lsn;
782+
self.persist_metadata()?;
783+
Ok(result)
784+
}
785+
738786
/// Moves one active replica into normal mode for the provided view.
739787
///
740788
/// # Errors

0 commit comments

Comments
 (0)