Skip to content

Commit da7cae2

Browse files
committed
Adds high-level restate-types Scope, ServiceName, and LockName
ServiceName uses the new `InternedReString` for thread-local checked interning when it makes sense. LockName will be used for representing the new service locking mechanism for vqueues that will be introduced in the next commit(s).
1 parent 3cf37cf commit da7cae2

File tree

5 files changed

+389
-18
lines changed

5 files changed

+389
-18
lines changed

crates/types/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ restate-memory = { workspace = true }
3030
restate-serde-util = { workspace = true }
3131
restate-test-util = { workspace = true, optional = true }
3232
restate-time-util = { workspace = true, features = ["serde", "serde_with"] }
33+
restate-util-string = { workspace = true, features = ["serde", "bilrost"] }
3334
restate-utoipa = { workspace = true }
3435

3536
adaptive-timeout = { workspace = true, features = ["serde"] }
@@ -89,7 +90,6 @@ serde_path_to_error = { version = "0.1" }
8990
serde_with = { workspace = true, features = ["json"] }
9091
sha2 = { workspace = true }
9192
smallvec = { workspace = true }
92-
restate-util-string = { workspace = true, features = ["serde"] }
9393
static_assertions = { workspace = true }
9494
strum = { workspace = true }
9595
tempfile = { workspace = true, optional = true }

crates/types/src/identifiers.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
//! Restate uses many identifiers to uniquely identify its services and entities.
1212
13+
use std::cell::RefCell;
1314
use std::fmt::{self, Display, Formatter};
1415
use std::hash::Hash;
1516
use std::mem::size_of;
@@ -34,6 +35,12 @@ use crate::invocation::{InvocationTarget, InvocationTargetType, WorkflowHandlerT
3435
use crate::journal_v2::SignalId;
3536
use crate::time::MillisSinceEpoch;
3637

38+
thread_local! {
39+
// a thread-local xxh3 hashing state to reuse its allocation since its internal buffer is quite
40+
// large (~500 bytes)
41+
static PARTITION_KEY_HASHER: RefCell<xxhash_rust::xxh3::Xxh3> = const { RefCell::new(xxhash_rust::xxh3::Xxh3::new()) };
42+
}
43+
3744
/// Identifying the leader epoch of a partition processor
3845
#[derive(
3946
PartialEq,
@@ -711,7 +718,7 @@ impl WithPartitionKey for IdempotencyId {
711718
pub type ServiceRevision = u32;
712719

713720
pub mod partitioner {
714-
use super::PartitionKey;
721+
use super::{PARTITION_KEY_HASHER, PartitionKey};
715722

716723
use std::hash::{Hash, Hasher};
717724

@@ -720,9 +727,12 @@ pub mod partitioner {
720727

721728
impl HashPartitioner {
722729
pub fn compute_partition_key(value: impl Hash) -> PartitionKey {
723-
let mut hasher = xxhash_rust::xxh3::Xxh3::default();
724-
value.hash(&mut hasher);
725-
hasher.finish()
730+
PARTITION_KEY_HASHER.with_borrow_mut(|hasher| {
731+
value.hash(hasher);
732+
let key = hasher.finish();
733+
hasher.reset();
734+
key
735+
})
726736
}
727737
}
728738
}

crates/types/src/invocation/mod.rs

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,31 @@
1212
1313
pub mod client;
1414

15-
use crate::errors::InvocationError;
16-
use crate::identifiers::{
17-
DeploymentId, EntryIndex, IdempotencyId, InvocationId, PartitionKey,
18-
PartitionProcessorRpcRequestId, ServiceId, SubscriptionId, WithInvocationId, WithPartitionKey,
19-
};
20-
use crate::journal_v2::{CompletionId, GetInvocationOutputResult, Signal};
21-
use crate::time::MillisSinceEpoch;
22-
use crate::{GenerationalNodeId, RestateVersion};
23-
24-
use bytes::Bytes;
25-
use bytestring::ByteString;
26-
use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceState};
27-
use serde_with::{DisplayFromStr, FromInto, serde_as};
2815
use std::borrow::Cow;
2916
use std::hash::Hash;
3017
use std::ops::Deref;
3118
use std::str::FromStr;
3219
use std::time::Duration;
3320
use std::{cmp, fmt};
3421

22+
use bytes::Bytes;
23+
use bytestring::ByteString;
24+
use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceState};
3525
// Re-exporting opentelemetry [`TraceId`] to avoid having to import opentelemetry in all crates.
3626
pub use opentelemetry::trace::TraceId;
27+
use serde_with::{DisplayFromStr, FromInto, serde_as};
28+
29+
use restate_util_string::ReString;
30+
31+
use crate::Scope;
32+
use crate::errors::InvocationError;
33+
use crate::identifiers::{
34+
DeploymentId, EntryIndex, IdempotencyId, InvocationId, PartitionKey,
35+
PartitionProcessorRpcRequestId, ServiceId, SubscriptionId, WithInvocationId, WithPartitionKey,
36+
};
37+
use crate::journal_v2::{CompletionId, GetInvocationOutputResult, Signal};
38+
use crate::time::MillisSinceEpoch;
39+
use crate::{GenerationalNodeId, LockName, RestateVersion, ServiceName};
3740

3841
#[derive(Eq, Hash, PartialEq, Clone, Copy, Debug, serde::Serialize, serde::Deserialize)]
3942
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
@@ -220,6 +223,25 @@ impl InvocationTarget {
220223
}
221224
}
222225

226+
pub fn scope(&self) -> Option<Scope> {
227+
// todo: This needs to be filled
228+
None
229+
}
230+
231+
pub fn lock_name(&self) -> Option<LockName> {
232+
match self {
233+
InvocationTarget::Service { .. } => None,
234+
InvocationTarget::VirtualObject { name, key, .. } => Some(LockName::new(
235+
ServiceName::new(name.as_ref()),
236+
ReString::from(key.as_ref()),
237+
)),
238+
InvocationTarget::Workflow { name, key, .. } => Some(LockName::new(
239+
ServiceName::new(name.as_ref()),
240+
ReString::from(key.as_ref()),
241+
)),
242+
}
243+
}
244+
223245
pub fn key(&self) -> Option<&ByteString> {
224246
match self {
225247
InvocationTarget::Service { .. } => None,

crates/types/src/lib.rs

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
1313
mod base62_util;
1414
mod id_util;
15+
mod locking;
1516
mod macros;
1617
mod node_id;
1718
mod restate_version;
@@ -58,12 +59,18 @@ pub mod timer;
5859
pub mod vqueue;
5960

6061
pub use id_util::IdResourceType;
62+
pub use locking::*;
6163
pub use node_id::*;
64+
use restate_encoding::BilrostNewType;
65+
use restate_util_string::InternedReString;
6266
pub use restate_version::*;
6367
pub use version::*;
6468

6569
// Re-export of the old time module by delegating to the restate-clock crate.
6670
pub use restate_clock::time;
71+
72+
use self::identifiers::partitioner::HashPartitioner;
73+
use self::identifiers::{PartitionKey, WithPartitionKey};
6774
pub mod clock {
6875
pub use restate_clock::*;
6976
}
@@ -76,6 +83,151 @@ pub mod memory {
7683
// Re-export metrics' SharedString (Space-efficient Cow + RefCounted variant)
7784
pub type SharedString = metrics::SharedString;
7885

86+
/// An interned service name
87+
#[derive(
88+
derive_more::Display,
89+
derive_more::Debug,
90+
derive_more::AsRef,
91+
derive_more::From,
92+
Clone,
93+
Eq,
94+
PartialEq,
95+
Ord,
96+
PartialOrd,
97+
Hash,
98+
BilrostNewType,
99+
)]
100+
#[debug("{}", _0)]
101+
#[display("{}", _0)]
102+
#[repr(transparent)]
103+
pub struct ServiceName(InternedReString);
104+
105+
impl ServiceName {
106+
#[inline]
107+
pub fn new(value: &str) -> Self {
108+
assert!(!value.is_empty());
109+
Self(InternedReString::new(value))
110+
}
111+
112+
#[inline]
113+
pub const fn from_static(value: &'static str) -> Self {
114+
assert!(!value.is_empty());
115+
Self(InternedReString::from_static(value))
116+
}
117+
118+
#[inline]
119+
pub fn as_str(&self) -> &str {
120+
self.0.as_str()
121+
}
122+
123+
#[inline]
124+
pub fn as_bytes(&self) -> &[u8] {
125+
self.0.as_bytes()
126+
}
127+
128+
#[inline]
129+
pub fn len(&self) -> usize {
130+
self.0.len()
131+
}
132+
133+
#[inline]
134+
pub fn is_empty(&self) -> bool {
135+
self.0.is_empty()
136+
}
137+
}
138+
139+
impl AsRef<str> for ServiceName {
140+
#[inline]
141+
fn as_ref(&self) -> &str {
142+
self.0.as_ref()
143+
}
144+
}
145+
146+
impl std::borrow::Borrow<str> for ServiceName {
147+
#[inline]
148+
fn borrow(&self) -> &str {
149+
self.0.as_ref()
150+
}
151+
}
152+
153+
/// An interned scope
154+
///
155+
/// A scope defines the partitioning boundary for sets of service instances and
156+
/// invocations.
157+
#[derive(
158+
derive_more::Display,
159+
derive_more::Debug,
160+
derive_more::AsRef,
161+
derive_more::From,
162+
Clone,
163+
Eq,
164+
PartialEq,
165+
Ord,
166+
PartialOrd,
167+
Hash,
168+
BilrostNewType,
169+
)]
170+
#[debug("{}", _0)]
171+
#[display("{}", _0)]
172+
#[repr(transparent)]
173+
pub struct Scope(InternedReString);
174+
175+
impl Scope {
176+
#[inline]
177+
pub fn new(value: &str) -> Self {
178+
assert!(!value.is_empty());
179+
Self(InternedReString::new(value))
180+
}
181+
182+
#[inline]
183+
pub const fn from_static(value: &'static str) -> Self {
184+
assert!(!value.is_empty());
185+
Self(InternedReString::from_static(value))
186+
}
187+
188+
#[inline]
189+
pub fn as_str(&self) -> &str {
190+
self.0.as_str()
191+
}
192+
193+
#[inline]
194+
pub fn as_bytes(&self) -> &[u8] {
195+
self.0.as_bytes()
196+
}
197+
198+
#[inline]
199+
pub fn len(&self) -> usize {
200+
self.0.len()
201+
}
202+
203+
#[inline]
204+
pub fn is_empty(&self) -> bool {
205+
self.0.is_empty()
206+
}
207+
}
208+
209+
impl AsRef<str> for Scope {
210+
#[inline]
211+
fn as_ref(&self) -> &str {
212+
self.0.as_ref()
213+
}
214+
}
215+
216+
impl std::borrow::Borrow<str> for Scope {
217+
#[inline]
218+
fn borrow(&self) -> &str {
219+
self.0.as_ref()
220+
}
221+
}
222+
223+
impl WithPartitionKey for Scope {
224+
#[inline]
225+
fn partition_key(&self) -> PartitionKey {
226+
// the partition key is calculated directly from the scope value
227+
HashPartitioner::compute_partition_key(&self.0)
228+
}
229+
}
230+
79231
/// Trait for merging two attributes
80232
pub trait Merge {
81233
/// Return true if the value was mutated as a result of the merge

0 commit comments

Comments
 (0)