Skip to content

Commit 3d59b0e

Browse files
authored
Merge pull request #118 from skel84/feat/reservation-core-expiry-recovery
feat: add reservation-core logical-slot expiry recovery proof
2 parents ddc271e + 0409fcb commit 3d59b0e

File tree

3 files changed

+362
-8
lines changed

3 files changed

+362
-8
lines changed

crates/reservation-core/src/recovery.rs

Lines changed: 214 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,16 +265,18 @@ fn validate_replay_order(
265265
#[cfg(test)]
266266
mod tests {
267267
use std::fs;
268+
use std::fs::OpenOptions;
269+
use std::io::Write;
268270
use std::path::PathBuf;
269271
use std::time::{SystemTime, UNIX_EPOCH};
270272

271273
use crate::{
272-
command::{ClientRequest, Command},
274+
command::{ClientRequest, Command, CommandContext},
273275
command_codec::encode_client_request,
274276
config::Config,
275-
ids::{ClientId, Lsn, OperationId, PoolId, Slot},
277+
ids::{ClientId, HoldId, Lsn, OperationId, PoolId, Slot},
276278
snapshot_file::SnapshotFile,
277-
state_machine::{PoolRecord, ReservationDb},
279+
state_machine::{HoldState, PoolRecord, ReservationDb},
278280
wal::{Frame, RecordType},
279281
wal_file::WalFile,
280282
};
@@ -371,4 +373,213 @@ mod tests {
371373
let _ = fs::remove_file(snapshot_path);
372374
let _ = fs::remove_file(wal_path);
373375
}
376+
377+
#[test]
378+
fn recovery_matches_live_path_when_next_request_expires_overdue_hold() {
379+
let snapshot_path = temp_path("snapshot-expiry-live-match", "snapshot");
380+
let wal_path = temp_path("wal-expiry-live-match", "wal");
381+
let snapshot_file = SnapshotFile::new(&snapshot_path, 4096);
382+
let mut wal_file = WalFile::open(&wal_path, 1024).unwrap();
383+
384+
let prefix = [
385+
Frame {
386+
lsn: Lsn(1),
387+
request_slot: Slot(1),
388+
record_type: RecordType::ClientCommand,
389+
payload: encode_client_request(ClientRequest {
390+
operation_id: OperationId(1),
391+
client_id: ClientId(1),
392+
command: Command::CreatePool {
393+
pool_id: PoolId(11),
394+
total_capacity: 5,
395+
},
396+
}),
397+
},
398+
Frame {
399+
lsn: Lsn(2),
400+
request_slot: Slot(2),
401+
record_type: RecordType::ClientCommand,
402+
payload: encode_client_request(ClientRequest {
403+
operation_id: OperationId(2),
404+
client_id: ClientId(1),
405+
command: Command::PlaceHold {
406+
pool_id: PoolId(11),
407+
hold_id: HoldId(21),
408+
quantity: 5,
409+
deadline_slot: Slot(5),
410+
},
411+
}),
412+
},
413+
];
414+
for frame in prefix {
415+
wal_file.append_frame(&frame).unwrap();
416+
}
417+
wal_file.sync().unwrap();
418+
419+
let mut live = ReservationDb::new(config()).unwrap();
420+
let _ = live.apply_client(
421+
CommandContext {
422+
lsn: Lsn(1),
423+
request_slot: Slot(1),
424+
},
425+
ClientRequest {
426+
operation_id: OperationId(1),
427+
client_id: ClientId(1),
428+
command: Command::CreatePool {
429+
pool_id: PoolId(11),
430+
total_capacity: 5,
431+
},
432+
},
433+
);
434+
let _ = live.apply_client(
435+
CommandContext {
436+
lsn: Lsn(2),
437+
request_slot: Slot(2),
438+
},
439+
ClientRequest {
440+
operation_id: OperationId(2),
441+
client_id: ClientId(1),
442+
command: Command::PlaceHold {
443+
pool_id: PoolId(11),
444+
hold_id: HoldId(21),
445+
quantity: 5,
446+
deadline_slot: Slot(5),
447+
},
448+
},
449+
);
450+
451+
let mut recovered = recover_reservation(config(), &snapshot_file, &mut wal_file).unwrap();
452+
let request = ClientRequest {
453+
operation_id: OperationId(3),
454+
client_id: ClientId(1),
455+
command: Command::CreatePool {
456+
pool_id: PoolId(12),
457+
total_capacity: 1,
458+
},
459+
};
460+
let context = CommandContext {
461+
lsn: Lsn(3),
462+
request_slot: Slot(20),
463+
};
464+
465+
let live_outcome = live.apply_client(context, request);
466+
let recovered_outcome = recovered.db.apply_client(context, request);
467+
468+
assert_eq!(recovered_outcome, live_outcome);
469+
assert_eq!(recovered.db.snapshot(), live.snapshot());
470+
assert_eq!(
471+
recovered
472+
.db
473+
.snapshot()
474+
.holds
475+
.iter()
476+
.find(|record| record.hold_id == HoldId(21))
477+
.unwrap()
478+
.state,
479+
HoldState::Expired
480+
);
481+
482+
let _ = fs::remove_file(snapshot_path);
483+
let _ = fs::remove_file(wal_path);
484+
}
485+
486+
#[test]
487+
fn recovery_truncates_torn_expire_frame_without_fabricating_expired_state() {
488+
let snapshot_path = temp_path("snapshot-torn-expire", "snapshot");
489+
let wal_path = temp_path("wal-torn-expire", "wal");
490+
let snapshot_file = SnapshotFile::new(&snapshot_path, 4096);
491+
let mut wal_file = WalFile::open(&wal_path, 1024).unwrap();
492+
493+
let create_pool = ClientRequest {
494+
operation_id: OperationId(1),
495+
client_id: ClientId(1),
496+
command: Command::CreatePool {
497+
pool_id: PoolId(11),
498+
total_capacity: 5,
499+
},
500+
};
501+
let place_hold = ClientRequest {
502+
operation_id: OperationId(2),
503+
client_id: ClientId(1),
504+
command: Command::PlaceHold {
505+
pool_id: PoolId(11),
506+
hold_id: HoldId(21),
507+
quantity: 5,
508+
deadline_slot: Slot(5),
509+
},
510+
};
511+
wal_file
512+
.append_frame(&Frame {
513+
lsn: Lsn(1),
514+
request_slot: Slot(1),
515+
record_type: RecordType::ClientCommand,
516+
payload: encode_client_request(create_pool),
517+
})
518+
.unwrap();
519+
wal_file
520+
.append_frame(&Frame {
521+
lsn: Lsn(2),
522+
request_slot: Slot(2),
523+
record_type: RecordType::ClientCommand,
524+
payload: encode_client_request(place_hold),
525+
})
526+
.unwrap();
527+
wal_file.sync().unwrap();
528+
529+
let torn_expire = Frame {
530+
lsn: Lsn(3),
531+
request_slot: Slot(6),
532+
record_type: RecordType::InternalCommand,
533+
payload: vec![3, 21],
534+
}
535+
.encode();
536+
let mut raw = OpenOptions::new().append(true).open(&wal_path).unwrap();
537+
raw.write_all(&torn_expire[..torn_expire.len() - 2])
538+
.unwrap();
539+
raw.sync_all().unwrap();
540+
541+
let mut recovered = recover_reservation(config(), &snapshot_file, &mut wal_file).unwrap();
542+
assert_eq!(
543+
recovered
544+
.db
545+
.snapshot()
546+
.holds
547+
.iter()
548+
.find(|record| record.hold_id == HoldId(21))
549+
.unwrap()
550+
.state,
551+
HoldState::Held
552+
);
553+
554+
let outcome = recovered.db.apply_client(
555+
CommandContext {
556+
lsn: Lsn(3),
557+
request_slot: Slot(6),
558+
},
559+
ClientRequest {
560+
operation_id: OperationId(3),
561+
client_id: ClientId(1),
562+
command: Command::CreatePool {
563+
pool_id: PoolId(12),
564+
total_capacity: 1,
565+
},
566+
},
567+
);
568+
569+
assert_eq!(outcome.result_code, crate::result::ResultCode::Ok);
570+
assert_eq!(
571+
recovered
572+
.db
573+
.snapshot()
574+
.holds
575+
.iter()
576+
.find(|record| record.hold_id == HoldId(21))
577+
.unwrap()
578+
.state,
579+
HoldState::Expired
580+
);
581+
582+
let _ = fs::remove_file(snapshot_path);
583+
let _ = fs::remove_file(wal_path);
584+
}
374585
}

crates/reservation-core/src/snapshot.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -226,11 +226,13 @@ impl ReservationDb {
226226

227227
let mut hold_retire_entries = Vec::new();
228228
for (ordinal, record) in holds.into_iter().enumerate() {
229-
hold_retire_entries.push((
230-
record.hold_id,
231-
record.deadline_slot,
232-
u64::try_from(ordinal).expect("hold ordinal must fit u64"),
233-
));
229+
if record.state == HoldState::Held {
230+
hold_retire_entries.push((
231+
record.hold_id,
232+
record.deadline_slot,
233+
u64::try_from(ordinal).expect("hold ordinal must fit u64"),
234+
));
235+
}
234236
match db.restore_hold(record) {
235237
Ok(()) => {}
236238
Err(FixedMapError::DuplicateKey) => {

0 commit comments

Comments
 (0)