Skip to content

Commit 2883d05

Browse files
committed
Remove ResumeAlreadyRunning as it's not going to be feasible
1 parent 692f06b commit 2883d05

File tree

6 files changed

+14
-63
lines changed

6 files changed

+14
-63
lines changed

crates/vqueues/src/metric_definitions.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ pub const VQUEUE_GLOBAL_THROTTLE_WAIT_MS: &str =
2424
pub const VQUEUE_LOCAL_THROTTLE_WAIT_MS: &str = "restate.vqueue.scheduler.vqueue_throttle_ms.total";
2525

2626
pub const ACTION_YIELD: &str = "yield";
27-
pub const ACTION_RESUME: &str = "resume";
2827
pub const ACTION_RUN: &str = "run";
2928
pub const ACTION_START: &str = "start";
3029

@@ -72,14 +71,8 @@ pub fn describe_metrics() {
7271
);
7372
}
7473

75-
pub fn publish_scheduler_decision_metrics(
76-
num_start: u16,
77-
num_run: u16,
78-
num_yield: u16,
79-
num_resume: u16,
80-
) {
74+
pub fn publish_scheduler_decision_metrics(num_start: u16, num_run: u16, num_yield: u16) {
8175
counter!(VQUEUE_SCHEDULER_DECISION, "action" => ACTION_START).increment(num_start as u64);
8276
counter!(VQUEUE_SCHEDULER_DECISION, "action" => ACTION_RUN).increment(num_run as u64);
83-
counter!(VQUEUE_SCHEDULER_DECISION, "action" => ACTION_RESUME).increment(num_resume as u64);
8477
counter!(VQUEUE_SCHEDULER_DECISION, "action" => ACTION_YIELD).increment(num_yield as u64);
8578
}

crates/vqueues/src/scheduler.rs

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -237,9 +237,7 @@ impl<Item> AssignmentSegment<Item> {
237237
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
238238
pub enum Action {
239239
/// Items are in inbox, let's move them to running queue.
240-
MoveToRunning,
241-
/// Items are already in running queue, execute them now.
242-
ResumeAlreadyRunning,
240+
MoveToRun,
243241
/// Items was already in running queue and we want them to yield back to the inbox
244242
Yield,
245243
}
@@ -252,8 +250,6 @@ pub struct Decision<Item> {
252250
num_start: u16,
253251
/// running items previously started
254252
num_run: u16,
255-
/// running items that should continue to run
256-
num_resume: u16,
257253
/// Items in run queue that need to go back to waiting inbox
258254
num_yield: u16,
259255
}
@@ -264,7 +260,6 @@ impl<Item> Default for Decision<Item> {
264260
q: HashMap::default(),
265261
num_start: 0,
266262
num_run: 0,
267-
num_resume: 0,
268263
num_yield: 0,
269264
}
270265
}
@@ -281,10 +276,9 @@ impl<Item: VQueueEntry> Decision<Item> {
281276
let assignments = self.q.entry_ref(qid).or_default();
282277
assignments.set_latest_run_tb_zero_time(updated_zt);
283278
match action {
284-
Action::ResumeAlreadyRunning => self.num_resume += 1,
285279
Action::Yield => self.num_yield += 1,
286-
Action::MoveToRunning if entry.item.priority().is_new() => self.num_start += 1,
287-
Action::MoveToRunning => self.num_run += 1,
280+
Action::MoveToRun if entry.item.priority().is_new() => self.num_start += 1,
281+
Action::MoveToRun => self.num_run += 1,
288282
}
289283
assignments.push(action, entry);
290284
}
@@ -313,31 +307,18 @@ impl<Item: VQueueEntry> Decision<Item> {
313307
self.num_run as usize
314308
}
315309

316-
#[cfg(test)]
317-
pub fn num_resume(&self) -> usize {
318-
self.num_resume as usize
319-
}
320-
321310
#[cfg(test)]
322311
pub fn num_yield(&self) -> usize {
323312
self.num_yield as usize
324313
}
325314

326315
/// Total number of items in all queues
327316
pub fn total_items(&self) -> usize {
328-
self.num_start as usize
329-
+ self.num_run as usize
330-
+ self.num_resume as usize
331-
+ self.num_yield as usize
317+
self.num_start as usize + self.num_run as usize + self.num_yield as usize
332318
}
333319

334320
pub fn report_metrics(&self) {
335-
publish_scheduler_decision_metrics(
336-
self.num_start,
337-
self.num_run,
338-
self.num_yield,
339-
self.num_resume,
340-
);
321+
publish_scheduler_decision_metrics(self.num_start, self.num_run, self.num_yield);
341322
}
342323
}
343324

crates/vqueues/src/scheduler/drr.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,7 @@ mod tests {
646646

647647
decision.into_iter().for_each(|(qid, assignments)| {
648648
for (action, mut items) in assignments.into_iter_per_action() {
649-
assert_eq!(action, Action::MoveToRunning);
649+
assert_eq!(action, Action::MoveToRun);
650650
// for each vqueue, we should see the head item. The head item is the lowest
651651
// ID, so we should expect the item ID to the same as the queue ID.
652652
assert_eq!(items.len(), 1);
@@ -868,7 +868,7 @@ mod tests {
868868
// creation timestamp is identical (for the purpose of the test)
869869
for (action, entries) in assignments.iter() {
870870
assert_eq!(entries.len(), 2);
871-
assert_eq!(action, Action::MoveToRunning);
871+
assert_eq!(action, Action::MoveToRun);
872872
assert_eq!(entries[0].item.priority, EffectivePriority::UserDefault);
873873
assert_eq!(entries[0].item.id, EntryId::new([1; 16]));
874874
in_flight.push(entries[0].item.clone());
@@ -920,7 +920,7 @@ mod tests {
920920

921921
for (action, entries) in assignments.iter() {
922922
assert_eq!(entries.len(), 2);
923-
assert_eq!(action, Action::MoveToRunning);
923+
assert_eq!(action, Action::MoveToRun);
924924
// Verify first item is high priority
925925
assert_eq!(entries[0].item.priority, EffectivePriority::UserHigh);
926926
assert_eq!(entries[0].item.id, EntryId::new([125; 16]));
@@ -973,7 +973,7 @@ mod tests {
973973
for (_, assignments) in decision {
974974
for (action, entries) in assignments.iter() {
975975
assert_eq!(entries.len(), 2);
976-
assert_eq!(action, Action::MoveToRunning);
976+
assert_eq!(action, Action::MoveToRun);
977977
// Verify first item is high priority
978978
assert_eq!(entries[0].item.priority, EffectivePriority::UserDefault);
979979
assert_eq!(entries[0].item.id, EntryId::new([4; 16]));
@@ -994,7 +994,7 @@ mod tests {
994994
for (_, assignments) in decision {
995995
for (action, entries) in assignments.iter() {
996996
assert_eq!(entries.len(), 2);
997-
assert_eq!(action, Action::MoveToRunning);
997+
assert_eq!(action, Action::MoveToRun);
998998
// Verify first item is high priority
999999
assert_eq!(entries[0].item.priority, EffectivePriority::UserDefault);
10001000
assert_eq!(entries[0].item.id, EntryId::new([6; 16]));

crates/vqueues/src/scheduler/vqueue_state.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -255,11 +255,6 @@ impl<S: VQueueStore> VQueueState<S> {
255255
}
256256

257257
if is_running {
258-
// switch between these to change the behavior as needed
259-
// Action::ResumeAlreadyRunning;
260-
// Note that resumption requires acquiring concurrency permits similar to
261-
// MoveToRunning. This is currently not implemented since (at the moment) we
262-
// only support yielding.
263258
let result = Pop::Item {
264259
action: Action::Yield,
265260
permit: None,
@@ -337,7 +332,7 @@ impl<S: VQueueStore> VQueueState<S> {
337332
self.unconfirmed_assignments
338333
.insert(inbox_head.unique_hash());
339334
let result = Pop::Item {
340-
action: Action::MoveToRunning,
335+
action: Action::MoveToRun,
341336
permit,
342337
entry: Entry {
343338
item: inbox_head.clone(),

crates/worker/src/partition/leadership/leader_state.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,6 @@ impl LeaderState {
267267
pub async fn handle_action_effects(
268268
&mut self,
269269
action_effects: impl IntoIterator<Item = ActionEffect>,
270-
// invoker_tx: &mut impl restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>>,
271270
) -> Result<(), Error> {
272271
for effect in action_effects {
273272
match effect {
@@ -285,7 +284,7 @@ impl LeaderState {
285284
assignment.push(item, stats);
286285
}
287286
match action {
288-
scheduler::Action::MoveToRunning => {
287+
scheduler::Action::MoveToRun => {
289288
let command = vqueues::VQWaitingToRunning {
290289
assignment,
291290
meta_updates: vqueues::MetaUpdates {
@@ -306,21 +305,6 @@ impl LeaderState {
306305
Command::VQYieldRunning(command),
307306
));
308307
}
309-
scheduler::Action::ResumeAlreadyRunning => {
310-
todo!(
311-
"Unsupported: We don't support directly resuming at the moment"
312-
)
313-
// invoker_tx
314-
// .run(
315-
// self.partition_id,
316-
// self.leader_epoch,
317-
// inv_id,
318-
// // todo: fix/remove
319-
// SharedString::from_borrowed(""),
320-
// SharedString::from_borrowed(""),
321-
// )
322-
// .map_err(Error::Invoker)?;
323-
}
324308
}
325309
}
326310
}

crates/worker/src/partition/leadership/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -640,9 +640,7 @@ where
640640
// nothing to do :-)
641641
}
642642
State::Leader(leader_state) => {
643-
leader_state
644-
.handle_action_effects(action_effects /*, &mut self.invoker_tx */)
645-
.await?
643+
leader_state.handle_action_effects(action_effects).await?
646644
}
647645
}
648646

0 commit comments

Comments
 (0)