Skip to content

Commit 42013c5

Browse files
authored
core: add lease epoch fencing for holder commands (#90)
* core: add lease epoch fencing for holder commands * Address CodeRabbit feedback
1 parent 430ae8b commit 42013c5

29 files changed

+712
-52
lines changed

crates/allocdb-bench/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,6 +1128,7 @@ fn release_request(
11281128
command: Command::Release {
11291129
reservation_id,
11301130
holder_id,
1131+
lease_epoch: 1,
11311132
},
11321133
}
11331134
}

crates/allocdb-core/src/command.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,12 @@ pub enum Command {
3737
Confirm {
3838
reservation_id: ReservationId,
3939
holder_id: HolderId,
40+
lease_epoch: u64,
4041
},
4142
Release {
4243
reservation_id: ReservationId,
4344
holder_id: HolderId,
45+
lease_epoch: u64,
4446
},
4547
Expire {
4648
reservation_id: ReservationId,
@@ -98,18 +100,22 @@ impl Command {
98100
Self::Confirm {
99101
reservation_id,
100102
holder_id,
103+
lease_epoch,
101104
} => {
102105
state = mix(state, 4);
103106
state = mix(state, reservation_id.get());
104-
mix(state, holder_id.get())
107+
state = mix(state, holder_id.get());
108+
mix(state, u128::from(*lease_epoch))
105109
}
106110
Self::Release {
107111
reservation_id,
108112
holder_id,
113+
lease_epoch,
109114
} => {
110115
state = mix(state, 5);
111116
state = mix(state, reservation_id.get());
112-
mix(state, holder_id.get())
117+
state = mix(state, holder_id.get());
118+
mix(state, u128::from(*lease_epoch))
113119
}
114120
Self::Expire {
115121
reservation_id,

crates/allocdb-core/src/command_codec.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,18 +92,22 @@ fn encode_command(bytes: &mut Vec<u8>, command: &Command) {
9292
Command::Confirm {
9393
reservation_id,
9494
holder_id,
95+
lease_epoch,
9596
} => {
9697
bytes.push(4);
9798
bytes.extend_from_slice(&reservation_id.get().to_le_bytes());
9899
bytes.extend_from_slice(&holder_id.get().to_le_bytes());
100+
bytes.extend_from_slice(&lease_epoch.to_le_bytes());
99101
}
100102
Command::Release {
101103
reservation_id,
102104
holder_id,
105+
lease_epoch,
103106
} => {
104107
bytes.push(5);
105108
bytes.extend_from_slice(&reservation_id.get().to_le_bytes());
106109
bytes.extend_from_slice(&holder_id.get().to_le_bytes());
110+
bytes.extend_from_slice(&lease_epoch.to_le_bytes());
107111
}
108112
Command::Expire {
109113
reservation_id,
@@ -142,10 +146,12 @@ fn decode_command(cursor: &mut Cursor<'_>) -> Result<Command, CommandCodecError>
142146
4 => Ok(Command::Confirm {
143147
reservation_id: ReservationId(cursor.read_u128()?),
144148
holder_id: HolderId(cursor.read_u128()?),
149+
lease_epoch: decode_optional_legacy_epoch(cursor)?,
145150
}),
146151
5 => Ok(Command::Release {
147152
reservation_id: ReservationId(cursor.read_u128()?),
148153
holder_id: HolderId(cursor.read_u128()?),
154+
lease_epoch: decode_optional_legacy_epoch(cursor)?,
149155
}),
150156
6 => Ok(Command::Expire {
151157
reservation_id: ReservationId(cursor.read_u128()?),
@@ -199,6 +205,18 @@ impl<'a> Cursor<'a> {
199205
fn read_u128(&mut self) -> Result<u128, CommandCodecError> {
200206
Ok(u128::from_le_bytes(self.read_exact::<16>()?))
201207
}
208+
209+
fn remaining(&self) -> usize {
210+
self.bytes.len().saturating_sub(self.offset)
211+
}
212+
}
213+
214+
fn decode_optional_legacy_epoch(cursor: &mut Cursor<'_>) -> Result<u64, CommandCodecError> {
215+
match cursor.remaining() {
216+
0 => Ok(1),
217+
8 => cursor.read_u64(),
218+
_ => Err(CommandCodecError::InvalidLayout),
219+
}
202220
}
203221

204222
#[cfg(test)]
@@ -254,6 +272,59 @@ mod tests {
254272
assert_eq!(decoded, request);
255273
}
256274

275+
#[test]
276+
fn holder_commands_round_trip_with_lease_epoch() {
277+
let requests = [
278+
ClientRequest {
279+
operation_id: OperationId(11),
280+
client_id: ClientId(12),
281+
command: Command::Confirm {
282+
reservation_id: ReservationId(13),
283+
holder_id: HolderId(14),
284+
lease_epoch: 15,
285+
},
286+
},
287+
ClientRequest {
288+
operation_id: OperationId(21),
289+
client_id: ClientId(22),
290+
command: Command::Release {
291+
reservation_id: ReservationId(23),
292+
holder_id: HolderId(24),
293+
lease_epoch: 25,
294+
},
295+
},
296+
];
297+
298+
for request in requests {
299+
let decoded = decode_client_request(&encode_client_request(&request)).unwrap();
300+
assert_eq!(decoded, request);
301+
}
302+
}
303+
304+
#[test]
305+
fn decoder_accepts_legacy_holder_command_without_lease_epoch() {
306+
let mut bytes = Vec::new();
307+
bytes.extend_from_slice(&OperationId(11).get().to_le_bytes());
308+
bytes.extend_from_slice(&ClientId(12).get().to_le_bytes());
309+
bytes.push(4);
310+
bytes.extend_from_slice(&ReservationId(13).get().to_le_bytes());
311+
bytes.extend_from_slice(&HolderId(14).get().to_le_bytes());
312+
313+
let decoded = decode_client_request(&bytes).unwrap();
314+
assert_eq!(
315+
decoded,
316+
ClientRequest {
317+
operation_id: OperationId(11),
318+
client_id: ClientId(12),
319+
command: Command::Confirm {
320+
reservation_id: ReservationId(13),
321+
holder_id: HolderId(14),
322+
lease_epoch: 1,
323+
},
324+
}
325+
);
326+
}
327+
257328
#[test]
258329
fn decoder_rejects_truncated_payload() {
259330
let request = ClientRequest {

crates/allocdb-core/src/recovery_tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ fn recover_allocdb_skips_frames_covered_by_snapshot() {
133133
command: Command::Confirm {
134134
reservation_id: ReservationId(2),
135135
holder_id: HolderId(5),
136+
lease_epoch: 1,
136137
},
137138
};
138139

crates/allocdb-core/src/result.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ pub enum ResultCode {
1919
OperationConflict,
2020
InvalidState,
2121
HolderMismatch,
22+
StaleEpoch,
2223
SlotOverflow,
2324
}
2425

2526
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2627
pub struct CommandOutcome {
2728
pub result_code: ResultCode,
2829
pub reservation_id: Option<ReservationId>,
30+
pub lease_epoch: Option<u64>,
2931
pub deadline_slot: Option<Slot>,
3032
}
3133

@@ -35,6 +37,7 @@ impl CommandOutcome {
3537
Self {
3638
result_code,
3739
reservation_id: None,
40+
lease_epoch: None,
3841
deadline_slot: None,
3942
}
4043
}
@@ -48,6 +51,22 @@ impl CommandOutcome {
4851
Self {
4952
result_code,
5053
reservation_id: Some(reservation_id),
54+
lease_epoch: None,
55+
deadline_slot: Some(deadline_slot),
56+
}
57+
}
58+
59+
#[must_use]
60+
pub const fn with_reservation_epoch(
61+
result_code: ResultCode,
62+
reservation_id: ReservationId,
63+
lease_epoch: u64,
64+
deadline_slot: Slot,
65+
) -> Self {
66+
Self {
67+
result_code,
68+
reservation_id: Some(reservation_id),
69+
lease_epoch: Some(lease_epoch),
5170
deadline_slot: Some(deadline_slot),
5271
}
5372
}

crates/allocdb-core/src/snapshot.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ mod issue_30_tests;
1818
mod tests;
1919

2020
const MAGIC: u32 = 0x4144_4253;
21-
const VERSION: u16 = 3;
21+
const VERSION: u16 = 4;
2222

2323
#[derive(Clone, Debug, Eq, PartialEq)]
2424
pub struct Snapshot {

crates/allocdb-core/src/snapshot_codec.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ pub(super) fn encode_snapshot(snapshot: &Snapshot) -> Vec<u8> {
5454
bytes.extend_from_slice(&reservation.reservation_id.get().to_le_bytes());
5555
bytes.extend_from_slice(&reservation.resource_id.get().to_le_bytes());
5656
bytes.extend_from_slice(&reservation.holder_id.get().to_le_bytes());
57+
bytes.extend_from_slice(&reservation.lease_epoch.to_le_bytes());
5758
bytes.push(encode_reservation_state(reservation.state));
5859
bytes.extend_from_slice(&reservation.created_lsn.get().to_le_bytes());
5960
bytes.extend_from_slice(&reservation.deadline_slot.get().to_le_bytes());
@@ -76,6 +77,7 @@ pub(super) fn encode_snapshot(snapshot: &Snapshot) -> Vec<u8> {
7677
&mut bytes,
7778
operation.result_reservation_id.map(ReservationId::get),
7879
);
80+
encode_optional_u64(&mut bytes, operation.result_lease_epoch);
7981
encode_optional_u64(&mut bytes, operation.result_deadline_slot.map(Slot::get));
8082
bytes.extend_from_slice(&operation.applied_lsn.get().to_le_bytes());
8183
bytes.extend_from_slice(&operation.retire_after_slot.get().to_le_bytes());
@@ -103,7 +105,7 @@ pub(super) fn decode_snapshot(bytes: &[u8]) -> Result<Snapshot, SnapshotError> {
103105
}
104106

105107
let version = cursor.read_u16()?;
106-
if version != 1 && version != 2 && version != VERSION {
108+
if version != 1 && version != 2 && version != 3 && version != VERSION {
107109
return Err(SnapshotError::InvalidVersion(version));
108110
}
109111

@@ -143,6 +145,11 @@ pub(super) fn decode_snapshot(bytes: &[u8]) -> Result<Snapshot, SnapshotError> {
143145
command_fingerprint: cursor.read_u128()?,
144146
result_code: decode_result_code(cursor.read_u8()?)?,
145147
result_reservation_id: cursor.read_optional_u128()?.map(ReservationId),
148+
result_lease_epoch: if version >= 4 {
149+
cursor.read_optional_u64()?
150+
} else {
151+
None
152+
},
146153
result_deadline_slot: cursor.read_optional_u64()?.map(Slot),
147154
applied_lsn: Lsn(cursor.read_u64()?),
148155
retire_after_slot: Slot(cursor.read_u64()?),
@@ -235,6 +242,7 @@ fn decode_snapshot_reservations(
235242
reservation_id: ReservationId(cursor.read_u128()?),
236243
resource_id: ResourceId(cursor.read_u128()?),
237244
holder_id: HolderId(cursor.read_u128()?),
245+
lease_epoch: if version >= 4 { cursor.read_u64()? } else { 1 },
238246
state: decode_reservation_state(cursor.read_u8()?)?,
239247
created_lsn: Lsn(cursor.read_u64()?),
240248
deadline_slot: Slot(cursor.read_u64()?),
@@ -358,7 +366,8 @@ fn decode_result_code(tag: u8) -> Result<ResultCode, SnapshotError> {
358366
14 => Ok(ResultCode::OperationConflict),
359367
15 => Ok(ResultCode::InvalidState),
360368
16 => Ok(ResultCode::HolderMismatch),
361-
17 => Ok(ResultCode::SlotOverflow),
369+
17 => Ok(ResultCode::StaleEpoch),
370+
18 => Ok(ResultCode::SlotOverflow),
362371
_ => Err(SnapshotError::InvalidStateTag(tag)),
363372
}
364373
}
@@ -382,6 +391,7 @@ fn encode_result_code(code: ResultCode) -> u8 {
382391
ResultCode::OperationConflict => 14,
383392
ResultCode::InvalidState => 15,
384393
ResultCode::HolderMismatch => 16,
385-
ResultCode::SlotOverflow => 17,
394+
ResultCode::StaleEpoch => 17,
395+
ResultCode::SlotOverflow => 18,
386396
}
387397
}

crates/allocdb-core/src/snapshot_issue_30_tests.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ fn expired_reservation(reservation_id: u128) -> ReservationRecord {
3636
reservation_id: ReservationId(reservation_id),
3737
resource_id: ResourceId(11),
3838
holder_id: HolderId(5),
39+
lease_epoch: 2,
3940
state: ReservationState::Expired,
4041
created_lsn: Lsn(reservation_lsn),
4142
deadline_slot: Slot(5),
@@ -53,6 +54,7 @@ fn operation_record(operation_id: u128) -> OperationRecord {
5354
command_fingerprint: operation_id + 10,
5455
result_code: crate::result::ResultCode::Ok,
5556
result_reservation_id: None,
57+
result_lease_epoch: None,
5658
result_deadline_slot: None,
5759
applied_lsn: Lsn(operation_lsn),
5860
retire_after_slot: Slot(operation_lsn + 8),

0 commit comments

Comments
 (0)