Skip to content

Commit c5263ec

Browse files
committed
Support for partitioned identifiers + state mutation request id
1 parent 9c5a751 commit c5263ec

File tree

5 files changed

+280
-0
lines changed

5 files changed

+280
-0
lines changed

crates/types/src/id_util.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ prefixed_ids! {
5555
Awakeable("prom"),
5656
Signal("sign"),
5757
Snapshot("snap"),
58+
StateMutation("mut"),
59+
// used for testing
60+
#[cfg(test)]
61+
Test("tst"),
5862
}
5963
}
6064

crates/types/src/identifiers.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010

1111
//! Restate uses many identifiers to uniquely identify its services and entities.
1212
13+
mod partitioned;
14+
15+
pub use partitioned::PartitionedResourceId;
16+
1317
use std::cell::RefCell;
1418
use std::fmt::{self, Display, Formatter};
1519
use std::hash::Hash;
@@ -27,6 +31,7 @@ use ulid::Ulid;
2731

2832
use restate_encoding::{BilrostNewType, NetSerde};
2933

34+
use self::partitioned::partitioned_resource_id;
3035
use crate::base62_util::{base62_encode_fixed_width_u128, base62_max_length_for_type};
3136
use crate::errors::IdDecodeError;
3237
use crate::id_util::IdResourceType;
@@ -1089,6 +1094,11 @@ ulid_backed_id!(Subscription @with_resource_id);
10891094
ulid_backed_id!(PartitionProcessorRpcRequest);
10901095
ulid_backed_id!(Snapshot @with_resource_id);
10911096

1097+
partitioned::partitioned_resource_id!(
1098+
/// StateMutation request identifier
1099+
StateMutation
1100+
);
1101+
10921102
impl SnapshotId {
10931103
pub const INVALID: Self = Self::from_parts(0, 0);
10941104
}
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
// Resource IDs with a partition key
12+
pub trait PartitionedResourceId: std::fmt::Display + std::fmt::Debug {
13+
fn from_partition_key_and_slice(
14+
partition_key: crate::PartitionKey,
15+
remainder: &[u8],
16+
) -> Result<Self, IdDecodeError>
17+
where
18+
Self: Sized;
19+
20+
fn partition_key(&self) -> crate::PartitionKey;
21+
/// The remainder byte-slice portion of the ID that doesn't include the partition key
22+
fn remainder_slice(&self) -> &[u8];
23+
}
24+
25+
/// Generate an identifier internally backed by ULID + partition key (with resource id)
26+
///
27+
/// This generates the Id struct and some associated methods: `generate`, `from_parts`, `from_slice`, `from_bytes`, `to_bytes`,
28+
/// plus implements `Display`, `Debug`, `FromStr`, `JsonSchema` and `TimestampAwareId`.
29+
///
30+
/// To use:
31+
///
32+
/// ```ignore
33+
/// partitioned_resource_id!(MyResource);
34+
/// ```
35+
///
36+
/// The difference between the two will be the usage of ResourceId for serde and string representations.
37+
macro_rules! partitioned_resource_id {
38+
(
39+
$(#[$m:meta])*
40+
$res_name:ident
41+
) => {
42+
partitioned_resource_id!(@common $(#[$m])* $res_name);
43+
44+
paste::paste! {
45+
impl $crate::identifiers::ResourceId for [< $res_name Id >] {
46+
const RAW_BYTES_LEN: usize = 24;
47+
const RESOURCE_TYPE: $crate::id_util::IdResourceType = $crate::id_util::IdResourceType::$res_name;
48+
49+
type StrEncodedLen = ::generic_array::ConstArrayLength<
50+
// prefix + separator + version + suffix
51+
{
52+
Self::RESOURCE_TYPE.as_str().len()
53+
+ 2
54+
// partition key
55+
+ $crate::base62_util::base62_max_length_for_type::<u64>()
56+
// ts + random
57+
+ $crate::base62_util::base62_max_length_for_type::<u128>()
58+
},
59+
>;
60+
61+
fn push_to_encoder(&self, encoder: &mut $crate::id_util::IdEncoder<Self>) {
62+
encoder.push_u64(self.partition_key);
63+
encoder.push_u128(u128::from_be_bytes(self.remainder));
64+
}
65+
}
66+
67+
impl ::std::str::FromStr for [< $res_name Id >] {
68+
type Err = $crate::errors::IdDecodeError;
69+
70+
fn from_str(input: &str) -> ::std::result::Result<Self, Self::Err> {
71+
if input.len() < <Self as $crate::identifiers::ResourceId>::str_encoded_len() {
72+
return Err($crate::errors::IdDecodeError::Length);
73+
}
74+
75+
let mut decoder = $crate::id_util::IdDecoder::new(input)?;
76+
// Ensure we are decoding the correct resource type
77+
if decoder.resource_type != <Self as $crate::identifiers::ResourceId>::RESOURCE_TYPE {
78+
return Err($crate::errors::IdDecodeError::TypeMismatch);
79+
}
80+
81+
// partition key (u64)
82+
let partition_key = decoder.cursor.decode_next()?;
83+
84+
// ulid (u128)
85+
let raw_ulid: u128 = decoder.cursor.decode_next()?;
86+
assert_eq!(0, decoder.cursor.remaining());
87+
88+
Ok(Self { partition_key, remainder: raw_ulid.to_be_bytes() })
89+
}
90+
}
91+
92+
impl ::std::fmt::Display for [< $res_name Id >] {
93+
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
94+
use $crate::identifiers::ResourceId;
95+
let mut encoder = $crate::id_util::IdEncoder::new();
96+
self.push_to_encoder(&mut encoder);
97+
f.write_str(encoder.as_str())
98+
}
99+
}
100+
}
101+
};
102+
(@common $(#[$meta:meta])* $res_name:ident) => {
103+
paste::paste! {
104+
$(#[$meta])*
105+
#[derive(
106+
PartialEq,
107+
Eq,
108+
Clone,
109+
Hash,
110+
::serde_with::SerializeDisplay,
111+
::serde_with::DeserializeFromStr,
112+
::bilrost::Message,
113+
)]
114+
pub struct [< $res_name Id >] {
115+
#[bilrost(tag(1))]
116+
partition_key: $crate::PartitionKey,
117+
#[bilrost(tag(2), encoding(plainbytes))]
118+
remainder: [u8; 16],
119+
}
120+
121+
impl [< $res_name Id >] {
122+
/// Length of the raw bytes needed when serializing this resource identifiers using
123+
/// `to_bytes()`. This does not apply to `to_string()` or `serde` implementations.
124+
pub const fn serialized_length_fixed() -> usize {
125+
std::mem::size_of::<$crate::PartitionKey>() + 16
126+
}
127+
128+
pub fn generate(partition_key: $crate::PartitionKey) -> Self {
129+
Self { partition_key, remainder: ::ulid::Ulid::new().to_bytes() }
130+
}
131+
132+
pub const fn from_partition_key_and_bytes(partition_key: $crate::PartitionKey, remainder: [u8;16]) -> Self {
133+
Self { partition_key, remainder }
134+
}
135+
136+
pub const fn from_parts(partition_key: $crate::PartitionKey, timestamp_ms: u64, random: u128) -> Self {
137+
Self {
138+
partition_key,
139+
remainder: ::ulid::Ulid::from_parts(timestamp_ms, random).to_bytes(),
140+
}
141+
}
142+
143+
pub fn from_slice(b: &[u8]) -> Result<Self, $crate::errors::IdDecodeError> {
144+
if b.len() < Self::serialized_length_fixed() {
145+
return Err($crate::errors::IdDecodeError::Length);
146+
}
147+
// read pkey as u64 (big-endian)
148+
let partition_key = u64::from_be_bytes(b[..size_of::<$crate::PartitionKey>()].try_into().unwrap());
149+
let remainder = b[size_of::<$crate::PartitionKey>()..].try_into().map_err(|_| $crate::errors::IdDecodeError::Length)?;
150+
Ok(Self{ partition_key, remainder })
151+
}
152+
153+
pub fn from_bytes(b: [u8; 24]) -> Self {
154+
let partition_key = u64::from_be_bytes(b[..size_of::<$crate::PartitionKey>()].try_into().unwrap());
155+
let remainder = b[size_of::<$crate::PartitionKey>()..].try_into().unwrap();
156+
Self{ partition_key, remainder }
157+
}
158+
159+
pub fn to_bytes(&self) -> [u8; 24] {
160+
let mut buf = [0u8; 24];
161+
buf[..size_of::<$crate::PartitionKey>()].copy_from_slice(&self.partition_key.to_be_bytes());
162+
buf[size_of::<$crate::PartitionKey>()..].copy_from_slice(&self.remainder);
163+
buf
164+
}
165+
166+
#[inline]
167+
pub fn to_remainder_bytes(self) -> [u8; 16] {
168+
self.remainder
169+
}
170+
171+
#[inline]
172+
pub fn as_remainder_bytes(&self) -> &[u8; 16] {
173+
&self.remainder
174+
}
175+
}
176+
177+
impl $crate::identifiers::TimestampAwareId for [< $res_name Id >] {
178+
fn timestamp(&self) -> ::restate_clock::time::MillisSinceEpoch {
179+
let ulid = ::ulid::Ulid::from_bytes(self.remainder);
180+
ulid.timestamp_ms().into()
181+
}
182+
}
183+
184+
impl $crate::PartitionedResourceId for [< $res_name Id >] {
185+
#[inline]
186+
fn from_partition_key_and_slice(
187+
partition_key: crate::PartitionKey,
188+
remainder: &[u8],
189+
) -> Result<Self, crate::errors::IdDecodeError> {
190+
let remainder = remainder.try_into().map_err(|_| crate::errors::IdDecodeError::Length)?;
191+
Ok(Self { partition_key, remainder })
192+
}
193+
194+
#[inline]
195+
fn partition_key(&self) -> $crate::PartitionKey {
196+
self.partition_key
197+
}
198+
199+
#[inline]
200+
fn remainder_slice(&self) -> &[u8] {
201+
&self.remainder
202+
}
203+
}
204+
205+
impl $crate::WithPartitionKey for [< $res_name Id >] {
206+
#[inline]
207+
fn partition_key(&self) -> $crate::PartitionKey {
208+
self.partition_key
209+
}
210+
}
211+
212+
impl ::std::fmt::Debug for [< $res_name Id >] {
213+
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
214+
// use the same formatting for debug and display to show a consistent representation
215+
::std::fmt::Display::fmt(self, f)
216+
}
217+
}
218+
219+
#[cfg(feature = "schemars")]
220+
impl ::schemars::JsonSchema for [< $res_name Id >] {
221+
fn schema_name() -> ::std::borrow::Cow<'static, str> {
222+
<String as ::schemars::JsonSchema>::schema_name()
223+
}
224+
225+
fn json_schema(g: &mut ::schemars::SchemaGenerator) -> ::schemars::Schema {
226+
<String as ::schemars::JsonSchema>::json_schema(g)
227+
}
228+
}
229+
}
230+
};
231+
}
232+
233+
pub(crate) use partitioned_resource_id;
234+
235+
use crate::errors::IdDecodeError;
236+
237+
#[cfg(test)]
238+
mod tests {
239+
use crate::identifiers::WithPartitionKey;
240+
241+
partitioned_resource_id!(Test);
242+
243+
#[test]
244+
fn test_roundtrip() {
245+
let id = TestId::generate(13);
246+
assert_eq!(id.partition_key(), 13);
247+
let encoded = id.to_string();
248+
assert!(encoded.starts_with("tst_1"));
249+
assert_eq!(encoded.len(), 38);
250+
println!("{encoded}");
251+
let decoded: TestId = encoded.parse().unwrap();
252+
assert_eq!(id, decoded);
253+
assert_eq!(decoded.partition_key(), 13);
254+
255+
// bytes roundtrip
256+
let bytes = id.to_bytes();
257+
assert_eq!(bytes.len(), 24);
258+
let decoded = TestId::from_bytes(bytes);
259+
assert_eq!(id, decoded);
260+
assert_eq!(decoded.partition_key(), 13);
261+
}
262+
}

crates/types/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ pub mod timer;
5959
pub mod vqueue;
6060

6161
pub use id_util::IdResourceType;
62+
pub use identifiers::PartitionedResourceId;
6263
pub use locking::*;
6364
pub use node_id::*;
6465
use restate_encoding::BilrostNewType;

crates/types/src/macros.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,15 @@ macro_rules! prefixed_ids {
3535
pub const fn as_str(&self) -> &'static str {
3636
match self {
3737
$(
38+
$(#[$variant_meta])*
3839
$typename::$variant => $variant_prefix,
3940
)+
4041
}
4142
}
4243
pub fn iter() -> ::core::slice::Iter<'static, $typename> {
4344
static VARIANTS: &'static [$typename] = &[
4445
$(
46+
$(#[$variant_meta])*
4547
$typename::$variant,
4648
)+
4749
];
@@ -57,6 +59,7 @@ macro_rules! prefixed_ids {
5759
fn from_str(value: &str) -> Result<Self, Self::Err> {
5860
match value {
5961
$(
62+
$(#[$variant_meta])*
6063
$variant_prefix => Ok($typename::$variant),
6164
)+
6265
_ => Err(crate::errors::IdDecodeError::UnrecognizedType(value.to_string())),

0 commit comments

Comments
 (0)