Skip to content

Commit 0d6829d

Browse files
authored
[Block Executor] Follow up fix-ups (aptos-labs#12468)
1 parent 93ed9ae commit 0d6829d

File tree

5 files changed

+79
-61
lines changed

5 files changed

+79
-61
lines changed

aptos-move/block-executor/src/executor.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ where
383383
scheduler.queueing_commits_arm();
384384
}
385385

386-
Ok(SchedulerTask::NoTask)
386+
Ok(SchedulerTask::Retry)
387387
}
388388
}
389389

@@ -739,7 +739,7 @@ where
739739
drop(init_timer);
740740

741741
let _timer = WORK_WITH_TASK_SECONDS.start_timer();
742-
let mut scheduler_task = SchedulerTask::NoTask;
742+
let mut scheduler_task = SchedulerTask::Retry;
743743

744744
let drain_commit_queue = || -> Result<(), PanicError> {
745745
while let Ok(txn_idx) = scheduler.pop_from_commit_queue() {
@@ -813,16 +813,19 @@ where
813813
scheduler.finish_execution(txn_idx, incarnation, updates_outside)?
814814
},
815815
SchedulerTask::ExecutionTask(_, _, ExecutionTaskType::Wakeup(condvar)) => {
816-
let (lock, cvar) = &*condvar;
817-
// Mark dependency resolved.
818-
let mut lock = lock.lock();
819-
*lock = DependencyStatus::Resolved;
820-
// Wake up the process waiting for dependency.
821-
cvar.notify_one();
816+
{
817+
let (lock, cvar) = &*condvar;
818+
819+
// Mark dependency resolved.
820+
let mut lock = lock.lock();
821+
*lock = DependencyStatus::Resolved;
822+
// Wake up the process waiting for dependency.
823+
cvar.notify_one();
824+
}
822825

823826
scheduler.next_task()
824827
},
825-
SchedulerTask::NoTask => scheduler.next_task(),
828+
SchedulerTask::Retry => scheduler.next_task(),
826829
SchedulerTask::Done => {
827830
drain_commit_queue()?;
828831
break Ok(());

aptos-move/block-executor/src/proptest_types/tests.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ proptest! {
9999
#[test]
100100
fn no_early_termination(
101101
universe in vec(any::<[u8; 32]>(), 100),
102-
transaction_gen in vec(any::<TransactionGen<[u8;32]>>(), 5000).no_shrink(),
102+
transaction_gen in vec(any::<TransactionGen<[u8;32]>>(), 4000).no_shrink(),
103103
abort_transactions in vec(any::<Index>(), 0),
104104
skip_rest_transactions in vec(any::<Index>(), 0),
105105
) {
@@ -108,8 +108,8 @@ proptest! {
108108

109109
#[test]
110110
fn abort_only(
111-
universe in vec(any::<[u8; 32]>(), 100),
112-
transaction_gen in vec(any::<TransactionGen<[u8;32]>>(), 5000).no_shrink(),
111+
universe in vec(any::<[u8; 32]>(), 80),
112+
transaction_gen in vec(any::<TransactionGen<[u8;32]>>(), 300).no_shrink(),
113113
abort_transactions in vec(any::<Index>(), 5),
114114
skip_rest_transactions in vec(any::<Index>(), 0),
115115
) {
@@ -118,8 +118,8 @@ proptest! {
118118

119119
#[test]
120120
fn skip_rest_only(
121-
universe in vec(any::<[u8; 32]>(), 100),
122-
transaction_gen in vec(any::<TransactionGen<[u8;32]>>(), 5000).no_shrink(),
121+
universe in vec(any::<[u8; 32]>(), 80),
122+
transaction_gen in vec(any::<TransactionGen<[u8;32]>>(), 300).no_shrink(),
123123
abort_transactions in vec(any::<Index>(), 0),
124124
skip_rest_transactions in vec(any::<Index>(), 5),
125125
) {

aptos-move/block-executor/src/scheduler.rs

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,17 @@ pub enum ExecutionTaskType {
7979
Wakeup(DependencyCondvar),
8080
}
8181

82-
/// A holder for potential task returned from the Scheduler. ExecutionTask and ValidationTask
83-
/// each contain a version of transaction that must be executed or validated, respectively.
84-
/// NoTask holds no task (similar None if we wrapped tasks in Option), and Done implies that
85-
/// there are no more tasks and the scheduler is done.
82+
/// Task type that the parallel execution workers get from the scheduler.
8683
#[derive(Debug)]
8784
pub enum SchedulerTask {
85+
/// Execution task with a version of the transaction, and whether it's waking up an already
86+
/// executing worker (suspended / waiting on a dependency).
8887
ExecutionTask(TxnIndex, Incarnation, ExecutionTaskType),
88+
/// Validation task with a version of the transaction, and the validation wave information.
8989
ValidationTask(TxnIndex, Incarnation, Wave),
90-
NoTask,
90+
/// Retry holds no task (similar None if we wrapped tasks in Option)
91+
Retry,
92+
/// Done implies that there are no more tasks and the scheduler is done.
9193
Done,
9294
}
9395

@@ -453,7 +455,7 @@ impl Scheduler {
453455
&& !self.never_executed(idx_to_validate);
454456

455457
if !prefer_validate && idx_to_execute >= self.num_txns {
456-
return SchedulerTask::NoTask;
458+
return SchedulerTask::Retry;
457459
}
458460

459461
if prefer_validate {
@@ -511,7 +513,7 @@ impl Scheduler {
511513
/// After txn is executed, schedule its dependencies for re-execution.
512514
/// If revalidate_suffix is true, decrease validation_idx to schedule all higher transactions
513515
/// for (re-)validation. Otherwise, in some cases (if validation_idx not already lower),
514-
/// return a validation task of the transaction to the caller (otherwise NoTask).
516+
/// return a validation task of the transaction to the caller (otherwise Retry).
515517
pub fn finish_execution(
516518
&self,
517519
txn_idx: TxnIndex,
@@ -521,10 +523,10 @@ impl Scheduler {
521523
// Note: It is preferable to hold the validation lock throughout the finish_execution,
522524
// in particular before updating execution status. The point was that we don't want
523525
// any validation to come before the validation status is correspondingly updated.
524-
// It may be possible to make work more granular, but shouldn't make performance
525-
// difference and like this correctness argument is much easier to see, in fact also
526-
// the reason why we grab write lock directly, and never release it during the whole function.
527-
// So even validation status readers have to wait if they somehow end up at the same index.
526+
// It may be possible to reduce granularity, but shouldn't make performance difference
527+
// and like this correctness argument is much easier to see, which is also why we grab
528+
// the write lock directly, and never release it during the whole function. This way,
529+
// even validation status readers have to wait if they somehow end up at the same index.
528530
let mut validation_status = self.txn_status[txn_idx as usize].1.write();
529531
self.set_executed_status(txn_idx, incarnation)?;
530532

@@ -552,7 +554,7 @@ impl Scheduler {
552554
));
553555
}
554556

555-
Ok(SchedulerTask::NoTask)
557+
Ok(SchedulerTask::Retry)
556558
}
557559

558560
pub fn finish_execution_during_commit(&self, txn_idx: TxnIndex) -> Result<(), PanicError> {
@@ -567,7 +569,7 @@ impl Scheduler {
567569
}
568570

569571
/// Finalize a validation task of version (txn_idx, incarnation). In some cases,
570-
/// may return a re-execution task back to the caller (otherwise, NoTask).
572+
/// may return a re-execution task back to the caller (otherwise, Retry).
571573
pub fn finish_abort(
572574
&self,
573575
txn_idx: TxnIndex,
@@ -611,7 +613,7 @@ impl Scheduler {
611613
}
612614
}
613615

614-
Ok(SchedulerTask::NoTask)
616+
Ok(SchedulerTask::Retry)
615617
}
616618

617619
/// This function can halt the BlockSTM early, even if there are unfinished tasks.
@@ -674,13 +676,12 @@ impl TWaitForDependency for Scheduler {
674676
// mutexes. Thus, acquisitions always happen in the same order (here), may not deadlock.
675677

676678
if self.is_executed(dep_txn_idx, true).is_some() {
677-
// Current status of dep_txn_idx is 'executed', so the dependency got resolved.
678-
// To avoid zombie dependency (and losing liveness), must return here and
679-
// not add a (stale) dependency.
679+
// Current status of dep_txn_idx is 'executed' (or even committed), so the dependency
680+
// got resolved. To avoid zombie dependency (and losing liveness), must return here
681+
// and not add a (stale) dependency.
680682

681683
// Note: acquires (a different, status) mutex, while holding (dependency) mutex.
682-
// Only place in scheduler where a thread may hold >1 mutexes, hence, such
683-
// acquisitions always happens in the same order (this function), may not deadlock.
684+
// For status lock this only happens here, thus the order is always higher index to lower.
684685
return Ok(DependencyResult::Resolved);
685686
}
686687

@@ -706,17 +707,31 @@ impl TWaitForDependency for Scheduler {
706707

707708
/// Private functions of the Scheduler
708709
impl Scheduler {
709-
/// Helper function to be called from Scheduler::halt(); Sets the transaction status to Halted.
710-
/// If the transaction is suspended, it will wake it up.
710+
/// Helper function to be called from Scheduler::halt(); Sets the transaction status to Halted and
711+
/// notifies the waiting thread, if applicable. The guarantee is that if halt(txn_idx) is called,
712+
/// then no thread can remain suspended on some dependency while executing transaction txn_idx.
713+
///
714+
/// Proof sketch as a result of code invariants that can be checked:
715+
/// 1. Status is replaced with ExecutionHalted, and ExecutionHalted status can never change.
716+
/// 2. In order for wait_for_dependency by txn_idx to return a CondVar to wait on, suspend must
717+
/// be successful, which implies the status at that point may not be ExecutionHalted and that
718+
/// the status at that point would be set to Suspended(_, CondVar).
719+
/// 3. Suspended status can turn into Ready or Executing, all containing the CondVar, unless a
720+
/// worker with a ExecutionTaskType::WakeUp actually wakes up the suspending thread.
721+
/// 4. The waking up consists of acquiring the CondVar lock, setting the status to ExecutionHalted
722+
/// or Resolved (if the worker with WakeUp task did it), and also calling notify_one. This
723+
/// ensures that a thread that waits until the condition variable changes from Unresolved will
724+
/// get released in all cases.
711725
fn halt_transaction_execution(&self, txn_idx: TxnIndex) {
712726
let mut status = self.txn_status[txn_idx as usize].0.write();
713727

714-
// Replace status to sure that the txn never gets suspended.
728+
// Always replace the status.
715729
match std::mem::replace(&mut *status, ExecutionStatus::ExecutionHalted) {
716730
ExecutionStatus::Suspended(_, condvar)
717731
| ExecutionStatus::Ready(_, ExecutionTaskType::Wakeup(condvar))
718732
| ExecutionStatus::Executing(_, ExecutionTaskType::Wakeup(condvar)) => {
719-
let (lock, cvar) = &*(condvar.clone());
733+
// Condvar lock must always be taken inner-most.
734+
let (lock, cvar) = &*condvar;
720735

721736
let mut lock = lock.lock();
722737
*lock = DependencyStatus::ExecutionHalted;

aptos-move/block-executor/src/unit_tests/mod.rs

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ fn scheduler_tasks() {
538538
for i in 0..5 {
539539
// Validation index is at 0, so transactions will be validated and no
540540
// need to return a validation task to the caller.
541-
assert_matches!(s.finish_execution(i, 0, false), Ok(SchedulerTask::NoTask));
541+
assert_matches!(s.finish_execution(i, 0, false), Ok(SchedulerTask::Retry));
542542
}
543543

544544
for i in 0..5 {
@@ -584,7 +584,7 @@ fn scheduler_tasks() {
584584
))
585585
);
586586

587-
assert_matches!(s.finish_execution(4, 1, true), Ok(SchedulerTask::NoTask));
587+
assert_matches!(s.finish_execution(4, 1, true), Ok(SchedulerTask::Retry));
588588
assert_matches!(
589589
s.finish_execution(1, 1, false),
590590
Ok(SchedulerTask::ValidationTask(1, 1, 1))
@@ -628,7 +628,7 @@ fn scheduler_first_wave() {
628628

629629
// validation index will not increase for the first execution wave
630630
// until the status becomes executed.
631-
assert_matches!(s.finish_execution(0, 0, false), Ok(SchedulerTask::NoTask));
631+
assert_matches!(s.finish_execution(0, 0, false), Ok(SchedulerTask::Retry));
632632

633633
// Now we can validate version (0, 0).
634634
assert_matches!(s.next_task(), SchedulerTask::ValidationTask(0, 0, 0));
@@ -638,17 +638,17 @@ fn scheduler_first_wave() {
638638
);
639639
// Since (1, 0) is not EXECUTED, no validation tasks, and execution index
640640
// is already at the limit, so no tasks immediately available.
641-
assert_matches!(s.next_task(), SchedulerTask::NoTask);
641+
assert_matches!(s.next_task(), SchedulerTask::Retry);
642642

643-
assert_matches!(s.finish_execution(2, 0, false), Ok(SchedulerTask::NoTask));
643+
assert_matches!(s.finish_execution(2, 0, false), Ok(SchedulerTask::Retry));
644644
// There should be no tasks, but finishing (1,0) should enable validating
645645
// (1, 0) then (2,0).
646-
assert_matches!(s.next_task(), SchedulerTask::NoTask);
646+
assert_matches!(s.next_task(), SchedulerTask::Retry);
647647

648-
assert_matches!(s.finish_execution(1, 0, false), Ok(SchedulerTask::NoTask));
648+
assert_matches!(s.finish_execution(1, 0, false), Ok(SchedulerTask::Retry));
649649
assert_matches!(s.next_task(), SchedulerTask::ValidationTask(1, 0, 0));
650650
assert_matches!(s.next_task(), SchedulerTask::ValidationTask(2, 0, 0));
651-
assert_matches!(s.next_task(), SchedulerTask::NoTask);
651+
assert_matches!(s.next_task(), SchedulerTask::Retry);
652652
}
653653

654654
#[test]
@@ -665,7 +665,7 @@ fn scheduler_dependency() {
665665

666666
// validation index will not increase for the first execution wave
667667
// until the status becomes executed.
668-
assert_matches!(s.finish_execution(0, 0, false), Ok(SchedulerTask::NoTask));
668+
assert_matches!(s.finish_execution(0, 0, false), Ok(SchedulerTask::Retry));
669669
// Now we can validate version (0, 0).
670670
assert_matches!(s.next_task(), SchedulerTask::ValidationTask(0, 0, 0));
671671
// Current status of 0 is executed - hence, no dependency added.
@@ -676,7 +676,7 @@ fn scheduler_dependency() {
676676
Ok(DependencyResult::Dependency(_))
677677
);
678678

679-
assert_matches!(s.finish_execution(2, 0, false), Ok(SchedulerTask::NoTask));
679+
assert_matches!(s.finish_execution(2, 0, false), Ok(SchedulerTask::Retry));
680680

681681
// resumed task doesn't bump incarnation
682682
assert_matches!(
@@ -696,7 +696,7 @@ fn incarnation_one_scheduler(num_txns: TxnIndex) -> Scheduler {
696696
s.next_task(),
697697
SchedulerTask::ExecutionTask(j, 0, ExecutionTaskType::Execution) if j == i
698698
);
699-
assert_matches!(s.finish_execution(i, 0, false), Ok(SchedulerTask::NoTask));
699+
assert_matches!(s.finish_execution(i, 0, false), Ok(SchedulerTask::Retry));
700700
assert_matches!(
701701
s.next_task(),
702702
SchedulerTask::ValidationTask(j, 0, 0) if i == j
@@ -732,7 +732,7 @@ fn scheduler_incarnation() {
732732
Ok(SchedulerTask::ValidationTask(2, 1, 1))
733733
);
734734
// Here since validation index is lower, wave doesn't increase and no task returned.
735-
assert_matches!(s.finish_execution(4, 1, true), Ok(SchedulerTask::NoTask));
735+
assert_matches!(s.finish_execution(4, 1, true), Ok(SchedulerTask::Retry));
736736

737737
assert_matches!(s.next_task(), SchedulerTask::ValidationTask(4, 1, 1));
738738

@@ -755,7 +755,7 @@ fn scheduler_incarnation() {
755755
);
756756
// execution index = 1
757757

758-
assert_matches!(s.finish_abort(4, 1), Ok(SchedulerTask::NoTask));
758+
assert_matches!(s.finish_abort(4, 1), Ok(SchedulerTask::Retry));
759759

760760
assert_matches!(
761761
s.next_task(),
@@ -785,7 +785,7 @@ fn scheduler_incarnation() {
785785
);
786786

787787
// validation index is 4, so finish execution doesn't return validation task, next task does.
788-
assert_matches!(s.finish_execution(4, 2, false), Ok(SchedulerTask::NoTask));
788+
assert_matches!(s.finish_execution(4, 2, false), Ok(SchedulerTask::Retry));
789789
assert_matches!(s.next_task(), SchedulerTask::ValidationTask(4, 2, 2));
790790
}
791791

@@ -802,11 +802,11 @@ fn scheduler_basic() {
802802
}
803803

804804
// Finish executions & dispatch validation tasks.
805-
assert_matches!(s.finish_execution(0, 0, true), Ok(SchedulerTask::NoTask));
806-
assert_matches!(s.finish_execution(1, 0, true), Ok(SchedulerTask::NoTask));
805+
assert_matches!(s.finish_execution(0, 0, true), Ok(SchedulerTask::Retry));
806+
assert_matches!(s.finish_execution(1, 0, true), Ok(SchedulerTask::Retry));
807807
assert_matches!(s.next_task(), SchedulerTask::ValidationTask(0, 0, 0));
808808
assert_matches!(s.next_task(), SchedulerTask::ValidationTask(1, 0, 0));
809-
assert_matches!(s.finish_execution(2, 0, true), Ok(SchedulerTask::NoTask));
809+
assert_matches!(s.finish_execution(2, 0, true), Ok(SchedulerTask::Retry));
810810
assert_matches!(s.next_task(), SchedulerTask::ValidationTask(2, 0, 0));
811811

812812
for i in 0..3 {
@@ -834,11 +834,11 @@ fn scheduler_drain_idx() {
834834
}
835835

836836
// Finish executions & dispatch validation tasks.
837-
assert_matches!(s.finish_execution(0, 0, true), Ok(SchedulerTask::NoTask));
838-
assert_matches!(s.finish_execution(1, 0, true), Ok(SchedulerTask::NoTask));
837+
assert_matches!(s.finish_execution(0, 0, true), Ok(SchedulerTask::Retry));
838+
assert_matches!(s.finish_execution(1, 0, true), Ok(SchedulerTask::Retry));
839839
assert_matches!(s.next_task(), SchedulerTask::ValidationTask(0, 0, 0));
840840
assert_matches!(s.next_task(), SchedulerTask::ValidationTask(1, 0, 0));
841-
assert_matches!(s.finish_execution(2, 0, true), Ok(SchedulerTask::NoTask));
841+
assert_matches!(s.finish_execution(2, 0, true), Ok(SchedulerTask::Retry));
842842
assert_matches!(s.next_task(), SchedulerTask::ValidationTask(2, 0, 0));
843843

844844
for i in 0..3 {
@@ -914,7 +914,7 @@ fn rolling_commit_wave() {
914914
assert_eq!(s.commit_state(), (2, 0));
915915

916916
// No validation task because index is already 2.
917-
assert_matches!(s.finish_execution(2, 1, false), Ok(SchedulerTask::NoTask,));
917+
assert_matches!(s.finish_execution(2, 1, false), Ok(SchedulerTask::Retry,));
918918
// finish validating with a lower wave.
919919
s.finish_validation(2, 0);
920920
assert!(s.try_commit().is_none());
@@ -967,7 +967,7 @@ fn no_conflict_task_count() {
967967
// false means a validation task.
968968
tasks.insert(rng.gen::<u32>(), (false, txn_idx));
969969
},
970-
SchedulerTask::NoTask => break,
970+
SchedulerTask::Retry => break,
971971
// Unreachable because we never call try_commit.
972972
SchedulerTask::Done => unreachable!(),
973973
}
@@ -993,7 +993,7 @@ fn no_conflict_task_count() {
993993
assert_eq!(wave, 0);
994994
tasks.insert(rng.gen::<u32>(), (false, txn_idx));
995995
} else {
996-
assert_matches!(task_res, Ok(SchedulerTask::NoTask));
996+
assert_matches!(task_res, Ok(SchedulerTask::Retry));
997997
}
998998
},
999999
(_, (false, txn_idx)) => {

aptos-move/block-executor/src/view.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ fn wait_for_dependency(
426426
// eventually finish and lead to unblocking txn_idx, contradiction.
427427
let (lock, cvar) = &*dep_condition;
428428
let mut dep_resolved = lock.lock();
429-
while let DependencyStatus::Unresolved = *dep_resolved {
429+
while matches!(*dep_resolved, DependencyStatus::Unresolved) {
430430
dep_resolved = cvar.wait(dep_resolved).unwrap();
431431
}
432432
// dep resolved status is either resolved or execution halted.

0 commit comments

Comments
 (0)