Skip to content

Commit 430ae8b

Browse files
authored
Merge pull request #89 from skel84/issue-83-m9-bundle-commit
feat(core): implement atomic bundle commit
2 parents 90156eb + 6f776f0 commit 430ae8b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1846
-544
lines changed

crates/allocdb-bench/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,7 @@ fn open_hotspot_engine(
707707
shard_id: 0,
708708
max_resources: 1,
709709
max_reservations: hotspot_reservation_capacity(options.hotspot_rounds),
710+
max_bundle_size: 1,
710711
max_operations: hotspot_operation_capacity(
711712
options.hotspot_rounds,
712713
options.hotspot_contenders,
@@ -727,6 +728,7 @@ fn open_retry_engine(
727728
shard_id: 0,
728729
max_resources: retry_resource_capacity(options.retry_table_capacity),
729730
max_reservations: 1,
731+
max_bundle_size: 1,
730732
max_operations: options.retry_table_capacity,
731733
max_ttl_slots: RETRY_TTL_SLOTS,
732734
max_client_retry_window_slots: RETRY_WINDOW_SLOTS,

crates/allocdb-core/src/command.rs

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,20 @@ pub struct CommandContext {
66
pub request_slot: Slot,
77
}
88

9-
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
9+
#[derive(Clone, Debug, Eq, PartialEq)]
1010
pub struct ClientRequest {
1111
pub operation_id: OperationId,
1212
pub client_id: ClientId,
1313
pub command: Command,
1414
}
1515

16-
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
16+
impl AsRef<ClientRequest> for ClientRequest {
17+
fn as_ref(&self) -> &ClientRequest {
18+
self
19+
}
20+
}
21+
22+
#[derive(Clone, Debug, Eq, PartialEq)]
1723
pub enum Command {
1824
CreateResource {
1925
resource_id: ResourceId,
@@ -23,6 +29,11 @@ pub enum Command {
2329
holder_id: HolderId,
2430
ttl_slots: u64,
2531
},
32+
ReserveBundle {
33+
resource_ids: Vec<ResourceId>,
34+
holder_id: HolderId,
35+
ttl_slots: u64,
36+
},
2637
Confirm {
2738
reservation_id: ReservationId,
2839
holder_id: HolderId,
@@ -37,9 +48,20 @@ pub enum Command {
3748
},
3849
}
3950

51+
impl AsRef<Command> for Command {
52+
fn as_ref(&self) -> &Command {
53+
self
54+
}
55+
}
56+
4057
impl Command {
4158
#[must_use]
42-
pub fn fingerprint(self) -> u128 {
59+
///
60+
/// # Panics
61+
///
62+
/// Panics only if the in-memory bundle length cannot fit into `u128`, which cannot happen on
63+
/// supported targets because slice lengths are already bounded by `usize`.
64+
pub fn fingerprint(&self) -> u128 {
4365
let mut state = 0x6c62_272e_07bb_0142_62b8_2175_6295_c58du128;
4466

4567
match self {
@@ -55,29 +77,45 @@ impl Command {
5577
state = mix(state, 2);
5678
state = mix(state, resource_id.get());
5779
state = mix(state, holder_id.get());
58-
mix(state, u128::from(ttl_slots))
80+
mix(state, u128::from(*ttl_slots))
81+
}
82+
Self::ReserveBundle {
83+
resource_ids,
84+
holder_id,
85+
ttl_slots,
86+
} => {
87+
state = mix(state, 3);
88+
state = mix(
89+
state,
90+
u128::try_from(resource_ids.len()).expect("bundle len must fit u128"),
91+
);
92+
for resource_id in resource_ids {
93+
state = mix(state, resource_id.get());
94+
}
95+
state = mix(state, holder_id.get());
96+
mix(state, u128::from(*ttl_slots))
5997
}
6098
Self::Confirm {
6199
reservation_id,
62100
holder_id,
63101
} => {
64-
state = mix(state, 3);
102+
state = mix(state, 4);
65103
state = mix(state, reservation_id.get());
66104
mix(state, holder_id.get())
67105
}
68106
Self::Release {
69107
reservation_id,
70108
holder_id,
71109
} => {
72-
state = mix(state, 4);
110+
state = mix(state, 5);
73111
state = mix(state, reservation_id.get());
74112
mix(state, holder_id.get())
75113
}
76114
Self::Expire {
77115
reservation_id,
78116
deadline_slot,
79117
} => {
80-
state = mix(state, 5);
118+
state = mix(state, 6);
81119
state = mix(state, reservation_id.get());
82120
mix(state, u128::from(deadline_slot.get()))
83121
}

crates/allocdb-core/src/command_codec.rs

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@ pub enum CommandCodecError {
99
}
1010

1111
#[must_use]
12-
pub fn encode_client_request(request: ClientRequest) -> Vec<u8> {
12+
pub fn encode_client_request(request: impl AsRef<ClientRequest>) -> Vec<u8> {
13+
let request = request.as_ref();
1314
let mut bytes = Vec::new();
1415
bytes.extend_from_slice(&request.operation_id.get().to_le_bytes());
1516
bytes.extend_from_slice(&request.client_id.get().to_le_bytes());
16-
encode_command(&mut bytes, request.command);
17+
encode_command(&mut bytes, &request.command);
1718
bytes
1819
}
1920

@@ -36,7 +37,8 @@ pub fn decode_client_request(bytes: &[u8]) -> Result<ClientRequest, CommandCodec
3637
}
3738

3839
#[must_use]
39-
pub fn encode_internal_command(command: Command) -> Vec<u8> {
40+
pub fn encode_internal_command(command: impl AsRef<Command>) -> Vec<u8> {
41+
let command = command.as_ref();
4042
let mut bytes = Vec::new();
4143
encode_command(&mut bytes, command);
4244
bytes
@@ -54,7 +56,7 @@ pub fn decode_internal_command(bytes: &[u8]) -> Result<Command, CommandCodecErro
5456
Ok(command)
5557
}
5658

57-
fn encode_command(bytes: &mut Vec<u8>, command: Command) {
59+
fn encode_command(bytes: &mut Vec<u8>, command: &Command) {
5860
match command {
5961
Command::CreateResource { resource_id } => {
6062
bytes.push(1);
@@ -70,27 +72,44 @@ fn encode_command(bytes: &mut Vec<u8>, command: Command) {
7072
bytes.extend_from_slice(&holder_id.get().to_le_bytes());
7173
bytes.extend_from_slice(&ttl_slots.to_le_bytes());
7274
}
75+
Command::ReserveBundle {
76+
resource_ids,
77+
holder_id,
78+
ttl_slots,
79+
} => {
80+
bytes.push(3);
81+
bytes.extend_from_slice(
82+
&u32::try_from(resource_ids.len())
83+
.expect("bundle len must fit u32")
84+
.to_le_bytes(),
85+
);
86+
for resource_id in resource_ids {
87+
bytes.extend_from_slice(&resource_id.get().to_le_bytes());
88+
}
89+
bytes.extend_from_slice(&holder_id.get().to_le_bytes());
90+
bytes.extend_from_slice(&ttl_slots.to_le_bytes());
91+
}
7392
Command::Confirm {
7493
reservation_id,
7594
holder_id,
7695
} => {
77-
bytes.push(3);
96+
bytes.push(4);
7897
bytes.extend_from_slice(&reservation_id.get().to_le_bytes());
7998
bytes.extend_from_slice(&holder_id.get().to_le_bytes());
8099
}
81100
Command::Release {
82101
reservation_id,
83102
holder_id,
84103
} => {
85-
bytes.push(4);
104+
bytes.push(5);
86105
bytes.extend_from_slice(&reservation_id.get().to_le_bytes());
87106
bytes.extend_from_slice(&holder_id.get().to_le_bytes());
88107
}
89108
Command::Expire {
90109
reservation_id,
91110
deadline_slot,
92111
} => {
93-
bytes.push(5);
112+
bytes.push(6);
94113
bytes.extend_from_slice(&reservation_id.get().to_le_bytes());
95114
bytes.extend_from_slice(&deadline_slot.get().to_le_bytes());
96115
}
@@ -107,15 +126,28 @@ fn decode_command(cursor: &mut Cursor<'_>) -> Result<Command, CommandCodecError>
107126
holder_id: HolderId(cursor.read_u128()?),
108127
ttl_slots: cursor.read_u64()?,
109128
}),
110-
3 => Ok(Command::Confirm {
129+
3 => {
130+
let resource_count = usize::try_from(cursor.read_u32()?)
131+
.map_err(|_| CommandCodecError::InvalidLayout)?;
132+
let mut resource_ids = Vec::with_capacity(resource_count);
133+
for _ in 0..resource_count {
134+
resource_ids.push(ResourceId(cursor.read_u128()?));
135+
}
136+
Ok(Command::ReserveBundle {
137+
resource_ids,
138+
holder_id: HolderId(cursor.read_u128()?),
139+
ttl_slots: cursor.read_u64()?,
140+
})
141+
}
142+
4 => Ok(Command::Confirm {
111143
reservation_id: ReservationId(cursor.read_u128()?),
112144
holder_id: HolderId(cursor.read_u128()?),
113145
}),
114-
4 => Ok(Command::Release {
146+
5 => Ok(Command::Release {
115147
reservation_id: ReservationId(cursor.read_u128()?),
116148
holder_id: HolderId(cursor.read_u128()?),
117149
}),
118-
5 => Ok(Command::Expire {
150+
6 => Ok(Command::Expire {
119151
reservation_id: ReservationId(cursor.read_u128()?),
120152
deadline_slot: Slot(cursor.read_u64()?),
121153
}),
@@ -160,6 +192,10 @@ impl<'a> Cursor<'a> {
160192
Ok(u64::from_le_bytes(self.read_exact::<8>()?))
161193
}
162194

195+
fn read_u32(&mut self) -> Result<u32, CommandCodecError> {
196+
Ok(u32::from_le_bytes(self.read_exact::<4>()?))
197+
}
198+
163199
fn read_u128(&mut self) -> Result<u128, CommandCodecError> {
164200
Ok(u128::from_le_bytes(self.read_exact::<16>()?))
165201
}
@@ -187,7 +223,7 @@ mod tests {
187223
},
188224
};
189225

190-
let decoded = decode_client_request(&encode_client_request(request)).unwrap();
226+
let decoded = decode_client_request(&encode_client_request(&request)).unwrap();
191227
assert_eq!(decoded, request);
192228
}
193229

@@ -198,10 +234,26 @@ mod tests {
198234
deadline_slot: Slot(42),
199235
};
200236

201-
let decoded = decode_internal_command(&encode_internal_command(command)).unwrap();
237+
let decoded = decode_internal_command(&encode_internal_command(&command)).unwrap();
202238
assert_eq!(decoded, command);
203239
}
204240

241+
#[test]
242+
fn reserve_bundle_round_trips() {
243+
let request = ClientRequest {
244+
operation_id: OperationId(11),
245+
client_id: ClientId(12),
246+
command: Command::ReserveBundle {
247+
resource_ids: vec![ResourceId(13), ResourceId(14)],
248+
holder_id: HolderId(15),
249+
ttl_slots: 16,
250+
},
251+
};
252+
253+
let decoded = decode_client_request(&encode_client_request(&request)).unwrap();
254+
assert_eq!(decoded, request);
255+
}
256+
205257
#[test]
206258
fn decoder_rejects_truncated_payload() {
207259
let request = ClientRequest {
@@ -211,7 +263,7 @@ mod tests {
211263
resource_id: ResourceId(3),
212264
},
213265
};
214-
let mut bytes = encode_client_request(request);
266+
let mut bytes = encode_client_request(&request);
215267
bytes.pop();
216268

217269
assert_eq!(

crates/allocdb-core/src/config.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub struct Config {
33
pub shard_id: u64,
44
pub max_resources: u32,
55
pub max_reservations: u32,
6+
pub max_bundle_size: u32,
67
pub max_operations: u32,
78
pub max_ttl_slots: u64,
89
pub max_client_retry_window_slots: u64,
@@ -16,6 +17,8 @@ pub enum ConfigError {
1617
HistoryWindowTooLarge,
1718
OperationWindowTooLarge,
1819
BucketCapacityTooLarge,
20+
BundleSizeTooLarge,
21+
ReservationMemberTableTooLarge,
1922
WheelTooLarge,
2023
}
2124

@@ -33,6 +36,18 @@ impl Config {
3336
.expect("validated operation window must fit in u64")
3437
}
3538

39+
/// Returns the fixed-capacity reservation-member table size.
40+
///
41+
/// # Panics
42+
///
43+
/// Panics if called before [`Self::validate`] has confirmed that the derived table size fits
44+
/// `usize`.
45+
#[must_use]
46+
pub fn max_reservation_members(&self) -> usize {
47+
usize::try_from(u64::from(self.max_reservations) * u64::from(self.max_bundle_size))
48+
.expect("validated reservation-member table size must fit usize")
49+
}
50+
3651
/// Validates that the configured capacities and retention windows are internally consistent.
3752
///
3853
/// # Errors
@@ -52,6 +67,10 @@ impl Config {
5267
return Err(ConfigError::ZeroCapacity("max_operations"));
5368
}
5469

70+
if self.max_bundle_size == 0 {
71+
return Err(ConfigError::ZeroCapacity("max_bundle_size"));
72+
}
73+
5574
if self.max_ttl_slots == 0 {
5675
return Err(ConfigError::ZeroCapacity("max_ttl_slots"));
5776
}
@@ -60,6 +79,10 @@ impl Config {
6079
return Err(ConfigError::ZeroCapacity("max_expiration_bucket_len"));
6180
}
6281

82+
if self.max_bundle_size > self.max_resources {
83+
return Err(ConfigError::BundleSizeTooLarge);
84+
}
85+
6386
if self.reservation_history_window_slots > self.max_ttl_slots {
6487
return Err(ConfigError::HistoryWindowTooLarge);
6588
}
@@ -75,6 +98,16 @@ impl Config {
7598
return Err(ConfigError::BucketCapacityTooLarge);
7699
}
77100

101+
let Some(max_reservation_members) =
102+
u64::from(self.max_reservations).checked_mul(u64::from(self.max_bundle_size))
103+
else {
104+
return Err(ConfigError::ReservationMemberTableTooLarge);
105+
};
106+
107+
if usize::try_from(max_reservation_members).is_err() {
108+
return Err(ConfigError::ReservationMemberTableTooLarge);
109+
}
110+
78111
let Some(_) = self.max_ttl_slots.checked_add(1) else {
79112
return Err(ConfigError::WheelTooLarge);
80113
};

crates/allocdb-core/src/fixed_map.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ impl<K: FixedKey, V> FixedMap<K, V> {
270270
}
271271
}
272272

273-
fn hash_u128(value: u128) -> u64 {
273+
pub(crate) fn hash_u128(value: u128) -> u64 {
274274
let bytes = value.to_le_bytes();
275275
let lower = u64::from_le_bytes(bytes[..8].try_into().expect("slice has exact size"));
276276
let upper = u64::from_le_bytes(bytes[8..].try_into().expect("slice has exact size"));

crates/allocdb-core/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub use config::{Config, ConfigError};
1717
pub use ids::{ClientId, HolderId, Lsn, OperationId, ReservationId, ResourceId, Slot};
1818
pub use result::{CommandOutcome, ResultCode};
1919
pub use state_machine::{
20-
AllocDb, HealthMetrics, OperationRecord, ReservationLookupError, ReservationRecord,
21-
ReservationState, ResourceRecord, ResourceState, SlotOverflowError, SlotOverflowKind,
20+
AllocDb, HealthMetrics, OperationRecord, ReservationLookupError, ReservationMemberRecord,
21+
ReservationRecord, ReservationState, ResourceRecord, ResourceState, SlotOverflowError,
22+
SlotOverflowKind,
2223
};

0 commit comments

Comments
 (0)