Skip to content

Commit 2294a51

Browse files
committed
New VQueueId
1 parent 6b835c2 commit 2294a51

File tree

26 files changed

+649
-577
lines changed

26 files changed

+649
-577
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/ingress-kafka/src/builder.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,9 @@ impl InvocationBuilder {
221221
};
222222

223223
let invocation_id = InvocationId::generate_or_else(&invocation_target, None, || {
224+
// todo: reconcile this with the world of scopes+limit keys.
225+
// In particular, the scatter-width of partition keys and how scopes can be assigned to
226+
// ingested items from kafka.
224227
partitioner::HashPartitioner::compute_partition_key(seed)
225228
});
226229

crates/invoker-impl/src/lib.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1823,7 +1823,7 @@ mod tests {
18231823
};
18241824
use restate_types::schema::service::ServiceMetadata;
18251825
use restate_types::service_protocol::ServiceProtocolVersion;
1826-
use restate_types::vqueue::{VQueueId, VQueueInstance, VQueueParent};
1826+
use restate_types::vqueue::VQueueId;
18271827
use tempfile::tempdir;
18281828
use test_log::test;
18291829
use tokio::sync::mpsc;
@@ -2385,10 +2385,9 @@ mod tests {
23852385

23862386
// Create an invocation state machine and register it with an in-flight notification proposal
23872387
let mut ism = InvocationStateMachine::create(
2388-
Some(VQueueId::new(
2389-
VQueueParent::default_unlimited(),
2388+
Some(VQueueId::custom(
23902389
invocation_id.partition_key(),
2391-
VQueueInstance::Default,
2390+
invocation_id.to_string(),
23922391
)),
23932392
Permit::new_empty(),
23942393
invocation_target.clone(),

crates/partition-store/src/tests/vqueue_table_test/mod.rs

Lines changed: 20 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,12 @@ use restate_storage_api::vqueue_table::{
2222
};
2323
use restate_types::clock::UniqueTimestamp;
2424
use restate_types::identifiers::PartitionKey;
25-
use restate_types::vqueue::{EffectivePriority, VQueueId, VQueueInstance, VQueueParent};
25+
use restate_types::vqueue::{EffectivePriority, VQueueId};
2626

2727
use crate::PartitionStore;
2828

2929
fn test_qid() -> VQueueId {
30-
VQueueId::new(
31-
VQueueParent::from_raw(1),
32-
PartitionKey::from(1000u64),
33-
VQueueInstance::from_raw(1),
34-
)
30+
VQueueId::custom(PartitionKey::from(1000u64), "1")
3531
}
3632

3733
fn entry_card(id: u8) -> EntryCard {
@@ -120,11 +116,7 @@ fn verify_priority_ordering(db: &crate::PartitionDb) {
120116

121117
/// Test: Within the same priority, items are ordered by visible_at then created_at.
122118
async fn ordering_within_same_priority<W: WriteVQueueTable>(txn: &mut W) {
123-
let qid = VQueueId::new(
124-
VQueueParent::from_raw(1),
125-
PartitionKey::from(2000u64),
126-
VQueueInstance::from_raw(1),
127-
);
119+
let qid = VQueueId::custom(2000, "1");
128120

129121
// All entries have UserDefault priority, but different visible_at and created_at
130122
// visible_at ordering: Now (0) < At(ts)
@@ -155,11 +147,7 @@ async fn ordering_within_same_priority<W: WriteVQueueTable>(txn: &mut W) {
155147
}
156148

157149
fn verify_ordering_within_same_priority(db: &crate::PartitionDb) {
158-
let qid = VQueueId::new(
159-
VQueueParent::from_raw(1),
160-
PartitionKey::from(2000u64),
161-
VQueueInstance::from_raw(1),
162-
);
150+
let qid = VQueueId::custom(2000, "1");
163151
let mut reader = db.new_inbox_reader(&qid);
164152
let items = collect_cursor(&mut reader);
165153

@@ -173,11 +161,7 @@ fn verify_ordering_within_same_priority(db: &crate::PartitionDb) {
173161

174162
/// Test: Running and inbox stages are separate namespaces.
175163
async fn running_and_inbox_are_separate<W: WriteVQueueTable>(txn: &mut W) {
176-
let qid = VQueueId::new(
177-
VQueueParent::from_raw(1),
178-
PartitionKey::from(3000u64),
179-
VQueueInstance::from_raw(1),
180-
);
164+
let qid = VQueueId::custom(3000, "1");
181165

182166
// Put one entry in Run stage
183167
let run_entry = entry_card(10);
@@ -191,11 +175,7 @@ async fn running_and_inbox_are_separate<W: WriteVQueueTable>(txn: &mut W) {
191175
}
192176

193177
fn verify_running_and_inbox_are_separate(db: &crate::PartitionDb) {
194-
let qid = VQueueId::new(
195-
VQueueParent::from_raw(1),
196-
PartitionKey::from(3000u64),
197-
VQueueInstance::from_raw(1),
198-
);
178+
let qid = VQueueId::custom(3000, "1");
199179

200180
// Running reader should only see the Run stage entry
201181
let mut run_reader = db.new_run_reader(&qid);
@@ -217,11 +197,7 @@ fn verify_running_and_inbox_are_separate(db: &crate::PartitionDb) {
217197

218198
/// Test: seek_after positions cursor strictly after the given item.
219199
async fn seek_after_works<W: WriteVQueueTable>(txn: &mut W) {
220-
let qid = VQueueId::new(
221-
VQueueParent::from_raw(1),
222-
PartitionKey::from(4000u64),
223-
VQueueInstance::from_raw(1),
224-
);
200+
let qid = VQueueId::custom(4000, "1");
225201

226202
// Insert entries in order
227203
let entries: Vec<_> = (1..=5).map(entry_card).collect();
@@ -231,11 +207,7 @@ async fn seek_after_works<W: WriteVQueueTable>(txn: &mut W) {
231207
}
232208

233209
fn verify_seek_after_works(db: &crate::PartitionDb) {
234-
let qid = VQueueId::new(
235-
VQueueParent::from_raw(1),
236-
PartitionKey::from(4000u64),
237-
VQueueInstance::from_raw(1),
238-
);
210+
let qid = VQueueId::custom(4000, "1");
239211

240212
let entries: Vec<_> = (1..=5).map(entry_card).collect();
241213

@@ -255,11 +227,7 @@ fn verify_seek_after_works(db: &crate::PartitionDb) {
255227

256228
/// Test: Empty queue returns None from peek.
257229
fn verify_empty_queue_returns_none(db: &crate::PartitionDb) {
258-
let qid = VQueueId::new(
259-
VQueueParent::from_raw(99),
260-
PartitionKey::from(9999u64), // unused partition key
261-
VQueueInstance::from_raw(99),
262-
);
230+
let qid = VQueueId::custom(9999, "99");
263231

264232
let mut run_reader = db.new_run_reader(&qid);
265233
run_reader.seek_to_first();
@@ -279,18 +247,9 @@ fn verify_empty_queue_returns_none(db: &crate::PartitionDb) {
279247
/// Test: Different vqueues (different parent/instance) are isolated.
280248
async fn vqueue_isolation<W: WriteVQueueTable>(txn: &mut W) {
281249
let pkey = PartitionKey::from(5000u64);
282-
283-
let qid1 = VQueueId::new(VQueueParent::from_raw(1), pkey, VQueueInstance::from_raw(1));
284-
let qid2 = VQueueId::new(
285-
VQueueParent::from_raw(1),
286-
pkey,
287-
VQueueInstance::from_raw(2), // different instance
288-
);
289-
let qid3 = VQueueId::new(
290-
VQueueParent::from_raw(2), // different parent
291-
pkey,
292-
VQueueInstance::from_raw(1),
293-
);
250+
let qid1 = VQueueId::custom(pkey, "1");
251+
let qid2 = VQueueId::custom(pkey, "2");
252+
let qid3 = VQueueId::custom(pkey, "3");
294253

295254
txn.put_inbox_entry(&qid1, Stage::Inbox, &entry_card(1));
296255
txn.put_inbox_entry(&qid2, Stage::Inbox, &entry_card(2));
@@ -300,9 +259,9 @@ async fn vqueue_isolation<W: WriteVQueueTable>(txn: &mut W) {
300259
fn verify_vqueue_isolation(db: &crate::PartitionDb) {
301260
let pkey = PartitionKey::from(5000u64);
302261

303-
let qid1 = VQueueId::new(VQueueParent::from_raw(1), pkey, VQueueInstance::from_raw(1));
304-
let qid2 = VQueueId::new(VQueueParent::from_raw(1), pkey, VQueueInstance::from_raw(2));
305-
let qid3 = VQueueId::new(VQueueParent::from_raw(2), pkey, VQueueInstance::from_raw(1));
262+
let qid1 = VQueueId::custom(pkey, "1");
263+
let qid2 = VQueueId::custom(pkey, "2");
264+
let qid3 = VQueueId::custom(pkey, "3");
306265

307266
// Each queue should only see its own entry
308267
let mut reader1 = db.new_inbox_reader(&qid1);
@@ -326,11 +285,7 @@ fn verify_vqueue_isolation(db: &crate::PartitionDb) {
326285
/// This verifies that the inbox reader (which uses a tailing iterator) can see
327286
/// items that were added after the reader was created, when re-seeking.
328287
async fn tailing_iterator_sees_new_items_on_reseek(rocksdb: &mut PartitionStore) {
329-
let qid = VQueueId::new(
330-
VQueueParent::from_raw(1),
331-
PartitionKey::from(6000u64),
332-
VQueueInstance::from_raw(1),
333-
);
288+
let qid = VQueueId::custom(6000, "1");
334289

335290
// Insert initial entries
336291
let entry1 = entry_card(1);
@@ -388,11 +343,7 @@ async fn tailing_iterator_sees_new_items_on_reseek(rocksdb: &mut PartitionStore)
388343
/// This verifies that when using seek_after to resume iteration, newly added
389344
/// items that sort after the seek position are visible.
390345
async fn tailing_iterator_sees_new_items_on_seek_after(rocksdb: &mut PartitionStore) {
391-
let qid = VQueueId::new(
392-
VQueueParent::from_raw(1),
393-
PartitionKey::from(7000u64),
394-
VQueueInstance::from_raw(1),
395-
);
346+
let qid = VQueueId::custom(7000, "1");
396347

397348
// Insert initial entries with different priorities
398349
let entry_high = entry_card_with_priority(1, EffectivePriority::System);
@@ -440,11 +391,7 @@ async fn tailing_iterator_sees_new_items_on_seek_after(rocksdb: &mut PartitionSt
440391
/// This verifies that when an item is deleted while the reader is open,
441392
/// re-seeking will not return the deleted item.
442393
async fn deleted_items_not_visible_after_reseek(rocksdb: &mut PartitionStore) {
443-
let qid = VQueueId::new(
444-
VQueueParent::from_raw(1),
445-
PartitionKey::from(8000u64),
446-
VQueueInstance::from_raw(1),
447-
);
394+
let qid = VQueueId::custom(8000, "1");
448395

449396
// Insert initial entries
450397
let entry1 = entry_card(1);
@@ -492,11 +439,7 @@ async fn deleted_items_not_visible_after_reseek(rocksdb: &mut PartitionStore) {
492439
/// This verifies that deleted items are not returned when using seek_after
493440
/// to resume iteration past a certain point.
494441
async fn deleted_items_not_visible_after_seek_after(rocksdb: &mut PartitionStore) {
495-
let qid = VQueueId::new(
496-
VQueueParent::from_raw(1),
497-
PartitionKey::from(8500u64),
498-
VQueueInstance::from_raw(1),
499-
);
442+
let qid = VQueueId::custom(8500, "1");
500443

501444
// Insert entries
502445
let entry1 = entry_card(1);
@@ -540,11 +483,7 @@ async fn deleted_items_not_visible_after_seek_after(rocksdb: &mut PartitionStore
540483
/// This tests a more complex scenario where items are both added and removed
541484
/// while the reader is open.
542485
async fn concurrent_enqueue_and_delete(rocksdb: &mut PartitionStore) {
543-
let qid = VQueueId::new(
544-
VQueueParent::from_raw(1),
545-
PartitionKey::from(9000u64),
546-
VQueueInstance::from_raw(1),
547-
);
486+
let qid = VQueueId::custom(9000, "1");
548487

549488
// Insert initial entries with different priorities
550489
let entry_high = entry_card_with_priority(10, EffectivePriority::System);

crates/partition-store/src/vqueue_table/entry.rs

Lines changed: 14 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use restate_storage_api::vqueue_table::{
1313
};
1414
use restate_types::clock::UniqueTimestamp;
1515
use restate_types::identifiers::{InvocationId, PartitionKey, WithPartitionKey as _};
16-
use restate_types::vqueue::{EffectivePriority, VQueueId, VQueueInstance, VQueueParent};
16+
use restate_types::vqueue::{EffectivePriority, VQueueId};
1717

1818
use crate::TableKind;
1919
use crate::keys::{EncodeTableKey, KeyKind, define_table_key};
@@ -61,25 +61,22 @@ impl From<&InvocationId> for EntryStateKey {
6161

6262
#[derive(Debug, Clone, PartialEq, bilrost::Message)]
6363
pub struct EntryStateHeader {
64+
#[bilrost(1)]
65+
pub qid: VQueueId,
6466
/// Unknown is an invalid state, this will be set to None when the invocation
6567
/// leaves the queue.
66-
#[bilrost(1)]
67-
pub stage: Stage,
6868
#[bilrost(2)]
69-
pub queue_parent: u32,
70-
#[bilrost(3)]
71-
pub queue_instance: u32,
69+
pub stage: Stage,
7270
// current entry card details
73-
#[bilrost(4)]
71+
#[bilrost(3)]
7472
pub effective_priority: EffectivePriority,
75-
#[bilrost(5)]
73+
#[bilrost(4)]
7674
pub visible_at: VisibleAt,
77-
#[bilrost(6)]
75+
#[bilrost(5)]
7876
pub created_at: UniqueTimestamp,
7977
}
8078

8179
pub struct OwnedHeader {
82-
pub(crate) partition_key: PartitionKey,
8380
pub(crate) kind: EntryKind,
8481
pub(crate) id: EntryId,
8582

@@ -91,24 +88,12 @@ impl AsEntryStateHeader for OwnedHeader {
9188
self.kind
9289
}
9390

94-
fn stage(&self) -> Stage {
95-
self.inner.stage
96-
}
97-
98-
fn queue_parent(&self) -> VQueueParent {
99-
VQueueParent::from_raw(self.inner.queue_parent)
100-
}
101-
102-
fn queue_instance(&self) -> VQueueInstance {
103-
VQueueInstance::from_raw(self.inner.queue_instance)
91+
fn vqueue_id(&self) -> &VQueueId {
92+
&self.inner.qid
10493
}
10594

106-
fn vqueue_id(&self) -> VQueueId {
107-
VQueueId::new(
108-
self.queue_parent(),
109-
self.partition_key,
110-
self.queue_instance(),
111-
)
95+
fn stage(&self) -> Stage {
96+
self.inner.stage
11297
}
11398

11499
fn current_entry_card(&self) -> EntryCard {
@@ -136,15 +121,7 @@ impl<E> AsEntryStateHeader for OwnedEntryState<E> {
136121
self.header.stage()
137122
}
138123

139-
fn queue_parent(&self) -> VQueueParent {
140-
self.header.queue_parent()
141-
}
142-
143-
fn queue_instance(&self) -> VQueueInstance {
144-
self.header.queue_instance()
145-
}
146-
147-
fn vqueue_id(&self) -> VQueueId {
124+
fn vqueue_id(&self) -> &VQueueId {
148125
self.header.vqueue_id()
149126
}
150127

@@ -180,20 +157,8 @@ impl<E> AsEntryState for OwnedEntryState<E> {
180157
// self.inner.stage
181158
// }
182159
//
183-
// fn queue_parent(&self) -> VQueueParent {
184-
// VQueueParent::from_raw(self.inner.queue_parent)
185-
// }
186-
//
187-
// fn queue_instance(&self) -> VQueueInstance {
188-
// VQueueInstance::from_raw(self.inner.queue_instance)
189-
// }
190-
//
191-
// fn vqueue_id(&self) -> VQueueId {
192-
// VQueueId::new(
193-
// self.queue_parent(),
194-
// self.partition_key,
195-
// self.queue_instance(),
196-
// )
160+
// fn vqueue_id(&self) -> &VQueueId {
161+
// self.vqueue_id
197162
// }
198163
//
199164
// fn current_entry_card(&self) -> EntryCard {

0 commit comments

Comments
 (0)