Skip to content

Commit f2280de

Browse files
committed
Introduces independent lock storage and vqueue scheduler resource manager
1 parent 480a054 commit f2280de

40 files changed

+1575
-712
lines changed

crates/partition-store/src/tests/vqueue_table_test/mod.rs

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -280,11 +280,7 @@ fn verify_empty_queue_returns_none(db: &crate::PartitionDb) {
280280
async fn vqueue_isolation<W: WriteVQueueTable>(txn: &mut W) {
281281
let pkey = PartitionKey::from(5000u64);
282282

283-
let qid1 = VQueueId::new(
284-
VQueueParent::from_raw(1),
285-
pkey,
286-
VQueueInstance::from_raw(1),
287-
);
283+
let qid1 = VQueueId::new(VQueueParent::from_raw(1), pkey, VQueueInstance::from_raw(1));
288284
let qid2 = VQueueId::new(
289285
VQueueParent::from_raw(1),
290286
pkey,
@@ -304,21 +300,9 @@ async fn vqueue_isolation<W: WriteVQueueTable>(txn: &mut W) {
304300
fn verify_vqueue_isolation(db: &crate::PartitionDb) {
305301
let pkey = PartitionKey::from(5000u64);
306302

307-
let qid1 = VQueueId::new(
308-
VQueueParent::from_raw(1),
309-
pkey,
310-
VQueueInstance::from_raw(1),
311-
);
312-
let qid2 = VQueueId::new(
313-
VQueueParent::from_raw(1),
314-
pkey,
315-
VQueueInstance::from_raw(2),
316-
);
317-
let qid3 = VQueueId::new(
318-
VQueueParent::from_raw(2),
319-
pkey,
320-
VQueueInstance::from_raw(1),
321-
);
303+
let qid1 = VQueueId::new(VQueueParent::from_raw(1), pkey, VQueueInstance::from_raw(1));
304+
let qid2 = VQueueId::new(VQueueParent::from_raw(1), pkey, VQueueInstance::from_raw(2));
305+
let qid3 = VQueueId::new(VQueueParent::from_raw(2), pkey, VQueueInstance::from_raw(1));
322306

323307
// Each queue should only see its own entry
324308
let mut reader1 = db.new_inbox_reader(&qid1);

crates/partition-store/src/vqueue_table/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,23 @@ impl WriteVQueueTable for PartitionStoreTransaction<'_> {
332332
}
333333
}
334334

335+
fn delete_inbox_entry(&mut self, qid: &VQueueId, stage: Stage, card: &EntryCard) {
336+
let key_buffer = InboxKey {
337+
partition_key: qid.partition_key(),
338+
parent: qid.parent,
339+
instance: qid.instance,
340+
stage,
341+
visible_at: card.visible_at,
342+
priority: card.priority,
343+
created_at: card.created_at,
344+
kind: card.kind,
345+
id: card.id,
346+
}
347+
.to_bytes();
348+
349+
self.raw_delete_cf(KeyKind::VQueueInbox, key_buffer);
350+
}
351+
335352
fn mark_vqueue_as_active(&mut self, qid: &restate_types::vqueue::VQueueId) {
336353
let mut key_buffer = [0u8; ActiveKey::serialized_length_fixed()];
337354
ActiveKey {

crates/storage-api/src/vqueue_table/entry.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ impl From<Lsn> for EntryId {
9696

9797
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
9898
pub struct EntryCard {
99+
// pub has_lock: bool,
99100
pub priority: EffectivePriority,
100101
pub visible_at: VisibleAt,
101102
/// The unique timestamp of the initial creation of the entry.
@@ -120,6 +121,11 @@ impl EntryCard {
120121
+ std::mem::size_of::<EntryId>()
121122
}
122123

124+
pub fn has_lock(&self) -> bool {
125+
// todo: change
126+
self.priority.token_held()
127+
}
128+
123129
/// A unique hash of the entry card.
124130
///
125131
/// Do not use this for any stored data as it changes across version/restarts.

crates/storage-api/src/vqueue_table/metadata.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ use smallvec::SmallVec;
1313
use restate_clock::WallClock;
1414
use restate_clock::time::MillisSinceEpoch;
1515
use restate_limiter::LimitKey;
16-
use restate_types::Scope;
1716
use restate_types::clock::UniqueTimestamp;
1817
use restate_types::vqueue::EffectivePriority;
19-
use restate_util_string::{InternedReString, ReString};
18+
use restate_types::{LockName, Scope};
19+
use restate_util_string::ReString;
2020

2121
use super::{Stage, VisibleAt};
2222

@@ -88,10 +88,16 @@ pub struct VQueueMeta {
8888
pub(crate) scope: Option<Scope>,
8989
#[bilrost(tag(8))]
9090
pub(crate) limit_key: LimitKey<ReString>,
91+
#[bilrost(tag(9))]
92+
lock_name: Option<LockName>,
9193
}
9294

9395
impl VQueueMeta {
94-
pub fn new(scope: Option<Scope>, limit_key: LimitKey<ReString>) -> Self {
96+
pub fn new(
97+
scope: Option<Scope>,
98+
limit_key: LimitKey<ReString>,
99+
lock_name: Option<LockName>,
100+
) -> Self {
95101
Self {
96102
is_paused: false,
97103
length: 0,
@@ -101,13 +107,26 @@ impl VQueueMeta {
101107
stats: VQueueStatistics::new(WallClock::recent_ms()),
102108
scope,
103109
limit_key,
110+
lock_name,
104111
}
105112
}
106113

107-
pub fn scope(&self) -> &Option<InternedReString> {
114+
pub fn scope(&self) -> &Option<Scope> {
108115
&self.scope
109116
}
110117

118+
pub fn scope_ref(&self) -> &Option<Scope> {
119+
&self.scope
120+
}
121+
122+
pub fn requires_locking(&self) -> bool {
123+
self.lock_name.is_some()
124+
}
125+
126+
pub fn lock_name(&self) -> &Option<LockName> {
127+
&self.lock_name
128+
}
129+
111130
pub fn limit_key(&self) -> &LimitKey<ReString> {
112131
&self.limit_key
113132
}

crates/storage-api/src/vqueue_table/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ pub struct WaitStats {
101101
pub vqueue_start_throttling_ms: u32,
102102
/// Total milliseconds the item was throttled on global "run" token bucket
103103
#[bilrost(tag(3))]
104-
pub global_throttling_ms: u32,
104+
pub global_invoker_throttling_ms: u32,
105105
/// Total milliseconds the item spent waiting on invoker memory pool
106106
#[bilrost(tag(4))]
107107
pub blocked_on_invoker_memory_ms: u32,

crates/storage-api/src/vqueue_table/store.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub trait VQueueEntry: PartialOrd + PartialEq + Eq + Clone + std::fmt::Debug {
3030
fn priority(&self) -> EffectivePriority;
3131
fn visible_at(&self) -> VisibleAt;
3232
fn kind(&self) -> EntryKind;
33-
fn is_token_held(&self) -> bool;
33+
fn has_lock(&self) -> bool;
3434
// Weight is as a proxy for _costing_ dequeueing an item. Lower weight means
3535
// lower cost and higher chances to be dequeued.
3636
fn weight(&self) -> NonZeroU16;
@@ -88,7 +88,8 @@ impl VQueueEntry for EntryCard {
8888
}
8989

9090
#[inline(always)]
91-
fn is_token_held(&self) -> bool {
91+
fn has_lock(&self) -> bool {
92+
// todo: change to use has_lock directly
9293
self.priority.token_held()
9394
}
9495
}

crates/storage-api/src/vqueue_table/tables.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ pub trait WriteVQueueTable {
5959
/// successful or not.
6060
fn pop_inbox_entry(&mut self, qid: &VQueueId, stage: Stage, card: &EntryCard) -> Result<bool>;
6161

62+
/// Deletes entry from inbox unconditionally
63+
fn delete_inbox_entry(&mut self, qid: &VQueueId, stage: Stage, card: &EntryCard);
64+
6265
/// Adds a vqueue to the list of active vqueues
6366
///
6467
/// A vqueue is considered active when it's of interest to the scheduler.

crates/vqueues/src/cache.rs

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,14 @@
1010

1111
use hashbrown::{HashMap, hash_map};
1212
use slotmap::SlotMap;
13+
use tokio::task::JoinHandle;
1314
use tracing::{debug, trace};
1415

1516
use restate_storage_api::StorageError;
1617
use restate_storage_api::vqueue_table::metadata::{self, VQueueMeta};
1718
use restate_storage_api::vqueue_table::{ReadVQueueTable, ScanVQueueTable};
1819
use restate_types::vqueue::VQueueId;
1920

20-
use crate::vqueue_config::ConfigPool;
21-
2221
type Result<T> = std::result::Result<T, StorageError>;
2322

2423
slotmap::new_key_type! { pub struct VQueueCacheKey; }
@@ -35,10 +34,6 @@ impl<'a> VQueuesMeta<'a> {
3534
Self { inner: cache }
3635
}
3736

38-
pub(crate) fn config_pool(&'a self) -> &'a ConfigPool {
39-
&self.inner.config
40-
}
41-
4237
pub fn get(&self, key: VQueueCacheKey) -> Option<&Slot> {
4338
self.inner.slab.get(key)
4439
}
@@ -102,7 +97,6 @@ impl Slot {
10297
// Needs rewriting after the workload pattern becomes more clear.
10398
#[derive(Clone)]
10499
pub struct VQueuesMetaCache {
105-
config: ConfigPool,
106100
queues: HashMap<VQueueId, VQueueCacheKey>,
107101
slab: SlotMap<VQueueCacheKey, Slot>,
108102
}
@@ -121,31 +115,36 @@ impl VQueuesMetaCache {
121115
Self {
122116
slab: Default::default(),
123117
queues: Default::default(),
124-
config: ConfigPool::default(),
125118
}
126119
}
127120

128121
/// Initializes the vqueue cache by loading all active vqueues into the cache.
129122
///
130123
/// From this point on, the cache remains in-sync with the storage state by
131124
/// using the "apply_updates" method.
132-
pub async fn create<S: ScanVQueueTable>(storage: &S) -> Result<Self> {
133-
let mut cache = Self {
134-
slab: Default::default(),
135-
queues: Default::default(),
136-
config: ConfigPool::default(),
137-
};
138-
// find and load all active vqueues.
139-
storage.scan_active_vqueues(|qid, meta| {
140-
let key = cache.slab.insert(Slot {
141-
qid: qid.clone(),
142-
meta,
143-
});
144-
// SAFETY: at batch load time we are guaranteed to observe every vqueue id only once.
145-
unsafe { cache.queues.insert_unique_unchecked(qid, key) };
146-
})?;
147-
148-
Ok(cache)
125+
pub async fn create<S: ScanVQueueTable + Send + Sync + 'static>(storage: S) -> Result<Self> {
126+
let handle: JoinHandle<Result<_>> = tokio::task::spawn_blocking({
127+
move || {
128+
let mut slab = SlotMap::default();
129+
let mut queues = HashMap::default();
130+
// find and load all active vqueues.
131+
storage.scan_active_vqueues(|qid, meta| {
132+
let key = slab.insert(Slot {
133+
qid: qid.clone(),
134+
meta,
135+
});
136+
// SAFETY: at batch load time we are guaranteed to observe every vqueue id only once.
137+
unsafe { queues.insert_unique_unchecked(qid, key) };
138+
})?;
139+
Ok((slab, queues))
140+
}
141+
});
142+
143+
let (slab, queues) = handle
144+
.await
145+
.map_err(|e| StorageError::Generic(e.into()))??;
146+
147+
Ok(Self { slab, queues })
149148
}
150149

151150
pub async fn load<S: ReadVQueueTable>(

0 commit comments

Comments
 (0)