Skip to content

Commit 6b835c2

Browse files
committed
Remove num_tokens_used from VQueueMeta
1 parent faf363b commit 6b835c2

8 files changed

Lines changed: 16 additions & 114 deletions

File tree

crates/storage-api/src/vqueue_table/metadata.rs

Lines changed: 6 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -74,21 +74,18 @@ pub struct VQueueMeta {
7474
/// being added to the vqueue. The capacity configuration will limit this value.
7575
#[bilrost(tag(2))]
7676
pub(crate) length: u32,
77-
/// Number of concurrency tokens being used
78-
#[bilrost(tag(3))]
79-
pub(crate) num_tokens_used: u32,
8077
/// The number of entries waiting to be dequeued. The vector index implies the priority
81-
#[bilrost(tag(4), encoding(packed))]
78+
#[bilrost(tag(3), encoding(packed))]
8279
pub(crate) num_waiting: [u32; EffectivePriority::NUM_PRIORITIES],
83-
#[bilrost(tag(5))]
80+
#[bilrost(tag(4))]
8481
pub(crate) num_running: u32,
85-
#[bilrost(tag(6))]
82+
#[bilrost(tag(5))]
8683
pub(crate) stats: VQueueStatistics,
87-
#[bilrost(tag(7))]
84+
#[bilrost(tag(6))]
8885
pub(crate) scope: Option<Scope>,
89-
#[bilrost(tag(8))]
86+
#[bilrost(tag(7))]
9087
pub(crate) limit_key: LimitKey<ReString>,
91-
#[bilrost(tag(9))]
88+
#[bilrost(tag(8))]
9289
lock_name: Option<LockName>,
9390
}
9491

@@ -101,7 +98,6 @@ impl VQueueMeta {
10198
Self {
10299
is_paused: false,
103100
length: 0,
104-
num_tokens_used: 0,
105101
num_waiting: [0; EffectivePriority::NUM_PRIORITIES],
106102
num_running: 0,
107103
stats: VQueueStatistics::new(WallClock::recent_ms()),
@@ -131,10 +127,6 @@ impl VQueueMeta {
131127
&self.limit_key
132128
}
133129

134-
pub fn tokens_used(&self) -> u32 {
135-
self.num_tokens_used
136-
}
137-
138130
pub fn len(&self) -> u32 {
139131
self.length
140132
}
@@ -199,14 +191,6 @@ impl VQueueMeta {
199191
self.num_waiting[priority as usize] -= 1;
200192
}
201193

202-
fn acquire_token(&mut self) {
203-
self.num_tokens_used += 1;
204-
}
205-
206-
fn release_token(&mut self) {
207-
self.num_tokens_used = self.num_tokens_used.saturating_sub(1);
208-
}
209-
210194
pub fn apply_update(&mut self, update: &Update) -> anyhow::Result<()> {
211195
debug_assert!(self.length >= self.total_waiting());
212196
let now = update.ts;
@@ -244,13 +228,8 @@ impl VQueueMeta {
244228
let latency_ms = now.to_unix_millis().saturating_sub_ms(visible_since);
245229
self.stats.update_avg_queue_duration(latency_ms);
246230
}
247-
248-
if !priority.token_held() {
249-
self.acquire_token();
250-
}
251231
}
252232
Action::Park {
253-
should_release_concurrency_token,
254233
priority,
255234
previous_stage,
256235
} => {
@@ -269,12 +248,6 @@ impl VQueueMeta {
269248
// do nothing.
270249
}
271250
}
272-
273-
if should_release_concurrency_token && priority.token_held() {
274-
// Release the token immediately on park if this entry doesn't require
275-
// holding while parked.
276-
self.release_token();
277-
}
278251
}
279252
Action::WakeUp { priority } => {
280253
debug_assert!(self.length > 0);
@@ -306,10 +279,6 @@ impl VQueueMeta {
306279
// do nothing.
307280
}
308281
}
309-
310-
if priority.token_held() {
311-
self.release_token();
312-
}
313282
}
314283
}
315284
Ok(())
@@ -379,7 +348,6 @@ pub enum Action {
379348
#[bilrost(tag(4), message)]
380349
Park {
381350
priority: EffectivePriority,
382-
should_release_concurrency_token: bool,
383351
previous_stage: Stage,
384352
},
385353
// Wake up after pause or suspend.

crates/vqueues/src/lib.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -433,15 +433,12 @@ where
433433

434434
/// Park an entry
435435
///
436-
/// If `should_release_concurrency_token` is true, the parked entry will release its token
437-
///
438436
/// Returns `true` if the entry was found in inbox and parked correctly, `false` otherwise.
439437
pub fn park(
440438
&mut self,
441439
at: UniqueTimestamp,
442440
card: &EntryCard,
443441
previous_stage: Stage,
444-
should_release_concurrency_token: bool,
445442
) -> Result<bool, StorageError> {
446443
let meta = self.cache.get_mut(self.cache_key).unwrap();
447444

@@ -457,7 +454,6 @@ where
457454
let update = metadata::Update::new(
458455
at,
459456
metadata::Action::Park {
460-
should_release_concurrency_token,
461457
priority: card.priority,
462458
previous_stage,
463459
},
@@ -485,16 +481,10 @@ where
485481
unreachable!("Cannot remove an item from a dormant vqueue");
486482
}
487483

488-
let mut modified_card = card.clone();
489-
if should_release_concurrency_token && card.priority.token_held() {
490-
// adjust the priority to reflect releasing the token
491-
modified_card.priority = EffectivePriority::Started;
492-
}
493-
494484
self.storage
495-
.put_inbox_entry(meta.vqueue_id(), Stage::Park, &modified_card);
485+
.put_inbox_entry(meta.vqueue_id(), Stage::Park, card);
496486
self.storage
497-
.put_vqueue_entry_state(meta.vqueue_id(), &modified_card, Stage::Park, ());
487+
.put_vqueue_entry_state(meta.vqueue_id(), card, Stage::Park, ());
498488

499489
if let Some(collector) = self.action_collector.as_deref_mut() {
500490
collector.push(A::from(event));

crates/vqueues/src/scheduler.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ pub struct VQueueSchedulerStatus {
5454
pub remaining_running: u32,
5555
/// Number of items waiting in the inbox stage.
5656
pub waiting_inbox: u32,
57-
/// Number of concurrency tokens currently in use by this vqueue.
58-
pub tokens_used: u32,
5957
/// The current scheduling status of this vqueue.
6058
pub status: SchedulingStatus,
6159
}

crates/vqueues/src/scheduler/drr.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,6 @@ impl<S: VQueueStore> DRRScheduler<S> {
352352
wait_stats: qstate.get_head_wait_stats(),
353353
remaining_running: qstate.num_remaining_in_running_stage(),
354354
waiting_inbox: qstate.num_waiting_inbox(),
355-
tokens_used: qstate.num_tokens_used(),
356355
status: self.eligible.get_status(qstate),
357356
};
358357

@@ -370,7 +369,6 @@ impl<S: VQueueStore> DRRScheduler<S> {
370369
wait_stats: qstate.get_head_wait_stats(),
371370
remaining_running: qstate.num_remaining_in_running_stage(),
372371
waiting_inbox: qstate.num_waiting_inbox(),
373-
tokens_used: qstate.num_tokens_used(),
374372
status: self.eligible.get_status(qstate),
375373
})
376374
}
@@ -895,7 +893,6 @@ mod tests {
895893
// 5 left in inbox from scheduler's perspective
896894
assert_eq!(status.waiting_inbox, 5);
897895
assert_eq!(status.remaining_running, 0);
898-
assert_eq!(status.tokens_used, 2);
899896

900897
// Now enqueue high priority item
901898
let mut txn = rocksdb.transaction();
@@ -948,7 +945,6 @@ mod tests {
948945
// we added one, and took 2 (5 + 1 - 2 = 4)
949946
assert_eq!(status.waiting_inbox, 4);
950947
assert_eq!(status.remaining_running, 0);
951-
assert_eq!(status.tokens_used, 4);
952948
// let's confirm all the items
953949
events.clear();
954950
let mut txn = rocksdb.transaction();
@@ -1144,14 +1140,12 @@ mod tests {
11441140
);
11451141
assert_eq!(status.waiting_inbox, 1);
11461142
assert_eq!(status.remaining_running, 0);
1147-
assert_eq!(status.tokens_used, 2);
11481143

11491144
// qid2 is exhausted
11501145
let status = scheduler.get_status(&qid2).unwrap();
11511146
assert_eq!(status.status, SchedulingStatus::Empty);
11521147
assert_eq!(status.waiting_inbox, 0);
11531148
assert_eq!(status.remaining_running, 0);
1154-
assert_eq!(status.tokens_used, 1);
11551149

11561150
// Pop the permit from the scheduler to release the concurrency token.
11571151
// Only inbox items acquire permits, running items yield without permits.

crates/vqueues/src/scheduler/vqueue_state.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -417,8 +417,4 @@ impl<S: VQueueStore> VQueueState<S> {
417417
.total_waiting()
418418
.saturating_sub(self.unconfirmed_assignments.len() as u32)
419419
}
420-
421-
pub fn num_tokens_used(&self) -> u32 {
422-
self.meta.tokens_used() + self.unconfirmed_assignments.len() as u32
423-
}
424420
}

crates/worker/src/partition/state_machine/lifecycle/paused.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
use crate::debug_if_leader;
1212
use crate::partition::state_machine::lifecycle::event::ApplyEventCommand;
13-
use crate::partition::state_machine::{CommandHandler, Error, ParkCause, StateMachineApplyContext};
13+
use crate::partition::state_machine::{CommandHandler, Error, StateMachineApplyContext};
1414
use restate_storage_api::invocation_status_table::{
1515
InvocationStatus, ReadInvocationStatusTable, WriteInvocationStatusTable,
1616
};
@@ -58,12 +58,8 @@ where
5858
debug_if_leader!(ctx.is_leader, "Paused the invocation");
5959

6060
if Configuration::pinned().common.experimental_enable_vqueues {
61-
ctx.vqueue_park_invocation(
62-
&self.invocation_id,
63-
&invoked_meta.invocation_target,
64-
ParkCause::Pause,
65-
)
66-
.await?;
61+
ctx.vqueue_park_invocation(&self.invocation_id, &invoked_meta.invocation_target)
62+
.await?;
6763
}
6864

6965
let mut invocation_status = InvocationStatus::Paused(invoked_meta);

crates/worker/src/partition/state_machine/lifecycle/suspend.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11-
use crate::partition::state_machine::{CommandHandler, Error, ParkCause, StateMachineApplyContext};
11+
use crate::partition::state_machine::{CommandHandler, Error, StateMachineApplyContext};
1212
use restate_storage_api::invocation_status_table::{InvocationStatus, WriteInvocationStatusTable};
1313
use restate_storage_api::journal_table_v2::ReadJournalTable;
1414
use restate_storage_api::lock_table::WriteLockTable;
@@ -88,7 +88,6 @@ where
8888
ctx.vqueue_park_invocation(
8989
&self.invocation_id,
9090
&in_flight_invocation_metadata.invocation_target,
91-
ParkCause::Suspend,
9291
)
9392
.await?;
9493
}

crates/worker/src/partition/state_machine/mod.rs

Lines changed: 4 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,8 @@ use restate_types::invocation::{
8484
AttachInvocationRequest, IngressInvocationResponseSink, InvocationMutationResponseSink,
8585
InvocationQuery, InvocationResponse, InvocationTarget, InvocationTargetType,
8686
InvocationTermination, JournalCompletionTarget, NotifySignalRequest, ResponseResult,
87-
ServiceInvocation, ServiceInvocationResponseSink, ServiceInvocationSpanContext, ServiceType,
88-
Source, SubmitNotificationSink, TerminationFlavor, VirtualObjectHandlerType,
89-
WorkflowHandlerType,
87+
ServiceInvocation, ServiceInvocationResponseSink, ServiceInvocationSpanContext, Source,
88+
SubmitNotificationSink, TerminationFlavor, VirtualObjectHandlerType, WorkflowHandlerType,
9089
};
9190
use restate_types::invocation::{InvocationInput, SpanRelation};
9291
use restate_types::journal::Completion;
@@ -4475,12 +4474,8 @@ impl<S> StateMachineApplyContext<'_, S> {
44754474
metadata.timestamps.update(self.record_created_at);
44764475

44774476
if Configuration::pinned().common.experimental_enable_vqueues {
4478-
self.vqueue_park_invocation(
4479-
&invocation_id,
4480-
&metadata.invocation_target,
4481-
ParkCause::Suspend,
4482-
)
4483-
.await?;
4477+
self.vqueue_park_invocation(&invocation_id, &metadata.invocation_target)
4478+
.await?;
44844479
}
44854480

44864481
self.storage
@@ -5115,7 +5110,6 @@ impl<S> StateMachineApplyContext<'_, S> {
51155110
&mut self,
51165111
invocation_id: &InvocationId,
51175112
invocation_target: &InvocationTarget,
5118-
cause: ParkCause,
51195113
) -> Result<(), Error>
51205114
where
51215115
S: WriteVQueueTable + WriteLockTable + ReadVQueueTable,
@@ -5150,34 +5144,10 @@ impl<S> StateMachineApplyContext<'_, S> {
51505144

51515145
let now = UniqueTimestamp::from_unix_millis_unchecked(self.record_created_at);
51525146

5153-
let should_release_concurrency_token = match cause {
5154-
ParkCause::Suspend => {
5155-
// Always hold on to your concurrency token until the invocation is completed if
5156-
// we are suspending for all types (services, VOs, workflows). This has the benefit
5157-
// of an easy to reason about concurrency model for our users. The downside is that
5158-
// callers might deadlock if they call a limited service which has no more
5159-
// concurrency tokens left and there is a cyclic dependency (e.g. a limited service
5160-
// calling itself).
5161-
false
5162-
}
5163-
ParkCause::Pause => {
5164-
// We release the concurrency token in case we are pausing a service because
5165-
// unpausing requires human intervention, and we don't want to block other service
5166-
// invocations.
5167-
//
5168-
// Note that we don't do this for paused VOs and workflows because they need to
5169-
// ensure that no other instance can run while they hold their lock. Technically,
5170-
// we still have the service_status_table which stores the locking information, and
5171-
// we need to keep things in sync until we decide what to do with this table.
5172-
matches!(invocation_target.service_ty(), ServiceType::Service)
5173-
}
5174-
};
5175-
51765147
vqueue.park(
51775148
now,
51785149
&entry_state_header.current_entry_card(),
51795150
entry_state_header.stage(),
5180-
should_release_concurrency_token,
51815151
)?;
51825152

51835153
Ok(())
@@ -5378,15 +5348,6 @@ impl<S> StateMachineApplyContext<'_, S> {
53785348
}
53795349
}
53805350

5381-
/// Cause for parking an invocation
5382-
#[derive(Debug)]
5383-
enum ParkCause {
5384-
/// The invocation suspends to await completion or signals
5385-
Suspend,
5386-
/// The invocation pauses because it depleted it retries or was manually paused
5387-
Pause,
5388-
}
5389-
53905351
// To write completions in the effects log
53915352
struct CompletionResultFmt<'a>(&'a CompletionResult);
53925353

0 commit comments

Comments
 (0)