Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/reservation-core/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub(crate) const TAG_PLACE_HOLD: u8 = 2;
pub(crate) const TAG_CONFIRM_HOLD: u8 = 3;
pub(crate) const TAG_RELEASE_HOLD: u8 = 4;
pub(crate) const TAG_EXPIRE_HOLD: u8 = 5;
pub(crate) const TAG_EXTEND_HOLD: u8 = 6;

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CommandContext {
Expand Down Expand Up @@ -43,6 +44,10 @@ pub enum Command {
ReleaseHold {
hold_id: HoldId,
},
ExtendHold {
hold_id: HoldId,
deadline_slot: Slot,
},
ExpireHold {
hold_id: HoldId,
},
Expand Down
22 changes: 17 additions & 5 deletions crates/reservation-core/src/command_codec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::command::{
ClientRequest, Command, TAG_CONFIRM_HOLD, TAG_CREATE_POOL, TAG_EXPIRE_HOLD, TAG_PLACE_HOLD,
TAG_RELEASE_HOLD,
ClientRequest, Command, TAG_CONFIRM_HOLD, TAG_CREATE_POOL, TAG_EXPIRE_HOLD, TAG_EXTEND_HOLD,
TAG_PLACE_HOLD, TAG_RELEASE_HOLD,
};
use crate::ids::{ClientId, HoldId, OperationId, PoolId, Slot};

Expand Down Expand Up @@ -79,6 +79,14 @@ fn encode_command(bytes: &mut Vec<u8>, command: &Command) {
bytes.push(TAG_RELEASE_HOLD);
bytes.extend_from_slice(&hold_id.get().to_le_bytes());
}
Command::ExtendHold {
hold_id,
deadline_slot,
} => {
bytes.push(TAG_EXTEND_HOLD);
bytes.extend_from_slice(&hold_id.get().to_le_bytes());
bytes.extend_from_slice(&deadline_slot.get().to_le_bytes());
}
Command::ExpireHold { hold_id } => {
bytes.push(TAG_EXPIRE_HOLD);
bytes.extend_from_slice(&hold_id.get().to_le_bytes());
Expand All @@ -104,6 +112,10 @@ fn decode_command(cursor: &mut Cursor<'_>) -> Result<Command, CommandCodecError>
TAG_RELEASE_HOLD => Ok(Command::ReleaseHold {
hold_id: HoldId(cursor.read_u128()?),
}),
TAG_EXTEND_HOLD => Ok(Command::ExtendHold {
hold_id: HoldId(cursor.read_u128()?),
deadline_slot: Slot(cursor.read_u64()?),
}),
TAG_EXPIRE_HOLD => Ok(Command::ExpireHold {
hold_id: HoldId(cursor.read_u128()?),
}),
Expand Down Expand Up @@ -184,9 +196,9 @@ mod tests {

#[test]
fn internal_command_round_trips() {
let command = Command::CreatePool {
pool_id: PoolId(7),
total_capacity: 9,
let command = Command::ExtendHold {
hold_id: HoldId(7),
deadline_slot: Slot(9),
};

let decoded = decode_internal_command(&encode_internal_command(command)).unwrap();
Expand Down
136 changes: 136 additions & 0 deletions crates/reservation-core/src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,41 @@ mod tests {
}
}

fn create_pool_request() -> ClientRequest {
ClientRequest {
operation_id: OperationId(1),
client_id: ClientId(1),
command: Command::CreatePool {
pool_id: PoolId(11),
total_capacity: 5,
},
}
}

fn place_hold_request(deadline_slot: Slot) -> ClientRequest {
ClientRequest {
operation_id: OperationId(2),
client_id: ClientId(1),
command: Command::PlaceHold {
pool_id: PoolId(11),
hold_id: HoldId(21),
quantity: 5,
deadline_slot,
},
}
}

fn extend_hold_request(deadline_slot: Slot) -> ClientRequest {
ClientRequest {
operation_id: OperationId(3),
client_id: ClientId(1),
command: Command::ExtendHold {
hold_id: HoldId(21),
deadline_slot,
},
}
}

#[test]
fn recovery_replays_from_empty_snapshot() {
let snapshot_path = temp_path("snapshot-empty", "snapshot");
Expand Down Expand Up @@ -483,6 +518,107 @@ mod tests {
let _ = fs::remove_file(wal_path);
}

#[test]
fn recovery_preserves_extended_deadline_before_later_request() {
let snapshot_path = temp_path("snapshot-extend-live-match", "snapshot");
let wal_path = temp_path("wal-extend-live-match", "wal");
let snapshot_file = SnapshotFile::new(&snapshot_path, 4096);
let mut wal_file = WalFile::open(&wal_path, 1024).unwrap();

let prefix = [
Frame {
lsn: Lsn(1),
request_slot: Slot(1),
record_type: RecordType::ClientCommand,
payload: encode_client_request(create_pool_request()),
},
Frame {
lsn: Lsn(2),
request_slot: Slot(2),
record_type: RecordType::ClientCommand,
payload: encode_client_request(place_hold_request(Slot(5))),
},
Frame {
lsn: Lsn(3),
request_slot: Slot(3),
record_type: RecordType::ClientCommand,
payload: encode_client_request(extend_hold_request(Slot(10))),
},
];
for frame in prefix {
wal_file.append_frame(&frame).unwrap();
}
wal_file.sync().unwrap();

let mut live = ReservationDb::new(config()).unwrap();
let _ = live.apply_client(
CommandContext {
lsn: Lsn(1),
request_slot: Slot(1),
},
create_pool_request(),
);
let _ = live.apply_client(
CommandContext {
lsn: Lsn(2),
request_slot: Slot(2),
},
place_hold_request(Slot(5)),
);
let _ = live.apply_client(
CommandContext {
lsn: Lsn(3),
request_slot: Slot(3),
},
extend_hold_request(Slot(10)),
);

let mut recovered = recover_reservation(config(), &snapshot_file, &mut wal_file).unwrap();
let request = ClientRequest {
operation_id: OperationId(4),
client_id: ClientId(1),
command: Command::CreatePool {
pool_id: PoolId(12),
total_capacity: 1,
},
};
let context = CommandContext {
lsn: Lsn(4),
request_slot: Slot(8),
};

let live_outcome = live.apply_client(context, request);
let recovered_outcome = recovered.db.apply_client(context, request);

assert_eq!(recovered_outcome, live_outcome);
assert_eq!(recovered.db.snapshot(), live.snapshot());
assert_eq!(
recovered
.db
.snapshot()
.holds
.iter()
.find(|record| record.hold_id == HoldId(21))
.unwrap()
.state,
HoldState::Held
);
assert_eq!(
recovered
.db
.snapshot()
.holds
.iter()
.find(|record| record.hold_id == HoldId(21))
.unwrap()
.deadline_slot,
Slot(10)
);

let _ = fs::remove_file(snapshot_path);
let _ = fs::remove_file(wal_path);
}

#[test]
fn recovery_truncates_torn_expire_frame_without_fabricating_expired_state() {
let snapshot_path = temp_path("snapshot-torn-expire", "snapshot");
Expand Down
21 changes: 16 additions & 5 deletions crates/reservation-core/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::Command;
use crate::command::{
TAG_CONFIRM_HOLD, TAG_CREATE_POOL, TAG_EXPIRE_HOLD, TAG_PLACE_HOLD, TAG_RELEASE_HOLD,
TAG_CONFIRM_HOLD, TAG_CREATE_POOL, TAG_EXPIRE_HOLD, TAG_EXTEND_HOLD, TAG_PLACE_HOLD,
TAG_RELEASE_HOLD,
};
use crate::config::{Config, ConfigError};
use crate::fixed_map::FixedMapError;
Expand Down Expand Up @@ -365,6 +366,14 @@ fn encode_command(bytes: &mut Vec<u8>, command: Command) {
bytes.push(TAG_RELEASE_HOLD);
bytes.extend_from_slice(&hold_id.get().to_le_bytes());
}
Command::ExtendHold {
hold_id,
deadline_slot,
} => {
bytes.push(TAG_EXTEND_HOLD);
bytes.extend_from_slice(&hold_id.get().to_le_bytes());
bytes.extend_from_slice(&deadline_slot.get().to_le_bytes());
}
Command::ExpireHold { hold_id } => {
bytes.push(TAG_EXPIRE_HOLD);
bytes.extend_from_slice(&hold_id.get().to_le_bytes());
Expand All @@ -390,6 +399,10 @@ fn decode_command(bytes: &[u8], cursor: &mut usize) -> Result<Command, SnapshotE
TAG_RELEASE_HOLD => Ok(Command::ReleaseHold {
hold_id: HoldId(decode_u128(bytes, cursor)?),
}),
TAG_EXTEND_HOLD => Ok(Command::ExtendHold {
hold_id: HoldId(decode_u128(bytes, cursor)?),
deadline_slot: Slot(decode_u64(bytes, cursor)?),
}),
TAG_EXPIRE_HOLD => Ok(Command::ExpireHold {
hold_id: HoldId(decode_u128(bytes, cursor)?),
}),
Expand Down Expand Up @@ -540,11 +553,9 @@ mod tests {
operations: vec![OperationRecord {
client_id: ClientId(1),
operation_id: OperationId(1),
command: Command::PlaceHold {
pool_id: PoolId(11),
command: Command::ExtendHold {
hold_id: HoldId(21),
quantity: 2,
deadline_slot: Slot(5),
deadline_slot: Slot(7),
},
result_code: ResultCode::Ok,
result_pool_id: Some(PoolId(11)),
Expand Down
Loading
Loading