Skip to content

Commit 3c1c291

Browse files
authored
[Sharding] Change the partitioner to return analyzed txn instead of txn (aptos-labs#9187)
1 parent dfc012a commit 3c1c291

File tree

31 files changed

+181
-237
lines changed

31 files changed

+181
-237
lines changed

aptos-move/aptos-transaction-benchmarks/src/transactions.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use aptos_language_e2e_tests::{
1414
gas_costs::TXN_RESERVED,
1515
};
1616
use aptos_types::{
17-
block_executor::partitioner::BlockExecutorTransactions,
1817
block_metadata::BlockMetadata,
1918
on_chain_config::{OnChainConfig, ValidatorSet},
2019
transaction::{analyzed_transaction::AnalyzedTransaction, Transaction},
@@ -349,7 +348,7 @@ where
349348
NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>,
350349
>(
351350
Arc::clone(&RAYON_EXEC_POOL),
352-
BlockExecutorTransactions::Unsharded(transactions),
351+
transactions,
353352
self.state_view.as_ref(),
354353
1,
355354
maybe_block_gas_limit,
@@ -394,7 +393,7 @@ where
394393
NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>,
395394
>(
396395
Arc::clone(&RAYON_EXEC_POOL),
397-
BlockExecutorTransactions::Unsharded(transactions),
396+
transactions,
398397
self.state_view.as_ref(),
399398
concurrency_level_per_shard,
400399
maybe_block_gas_limit,

aptos-move/aptos-vm/src/aptos_vm.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,15 @@ use aptos_state_view::StateView;
3232
use aptos_types::{
3333
account_config,
3434
account_config::new_block_event_key,
35-
block_executor::partitioner::{BlockExecutorTransactions, SubBlocksForShard},
35+
block_executor::partitioner::SubBlocksForShard,
3636
block_metadata::BlockMetadata,
3737
fee_statement::FeeStatement,
3838
on_chain_config::{new_epoch_event_key, FeatureFlag, TimedFeatureOverride},
3939
transaction::{
40-
EntryFunction, ExecutionError, ExecutionStatus, ModuleBundle, Multisig,
41-
MultisigTransactionPayload, SignatureCheckedTransaction, SignedTransaction, Transaction,
42-
TransactionOutput, TransactionPayload, TransactionStatus, VMValidatorResult,
43-
WriteSetPayload,
40+
analyzed_transaction::AnalyzedTransaction, EntryFunction, ExecutionError, ExecutionStatus,
41+
ModuleBundle, Multisig, MultisigTransactionPayload, SignatureCheckedTransaction,
42+
SignedTransaction, Transaction, TransactionOutput, TransactionPayload, TransactionStatus,
43+
VMValidatorResult, WriteSetPayload,
4444
},
4545
vm_status::{AbortLocation, StatusCode, VMStatus},
4646
write_set::WriteSet,
@@ -1514,7 +1514,7 @@ impl VMExecutor for AptosVM {
15141514
NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>,
15151515
>(
15161516
Arc::clone(&RAYON_EXEC_POOL),
1517-
BlockExecutorTransactions::Unsharded(transactions),
1517+
transactions,
15181518
state_view,
15191519
Self::get_concurrency_level(),
15201520
maybe_block_gas_limit,
@@ -1529,7 +1529,7 @@ impl VMExecutor for AptosVM {
15291529

15301530
fn execute_block_sharded<S: StateView + Sync + Send + 'static>(
15311531
sharded_block_executor: &ShardedBlockExecutor<S>,
1532-
transactions: Vec<SubBlocksForShard<Transaction>>,
1532+
transactions: Vec<SubBlocksForShard<AnalyzedTransaction>>,
15331533
state_view: Arc<S>,
15341534
maybe_block_gas_limit: Option<u64>,
15351535
) -> Result<Vec<TransactionOutput>, VMStatus> {

aptos-move/aptos-vm/src/block_executor/mod.rs

Lines changed: 10 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@ use aptos_block_executor::{
2626
use aptos_infallible::Mutex;
2727
use aptos_state_view::{StateView, StateViewId};
2828
use aptos_types::{
29-
block_executor::partitioner::{
30-
BlockExecutorTransactions, SubBlock, SubBlocksForShard, TransactionWithDependencies,
31-
},
3229
executable::ExecutableTestType,
3330
fee_statement::FeeStatement,
3431
state_store::state_key::StateKey,
@@ -145,59 +142,20 @@ impl BlockExecutorTransactionOutput for AptosTransactionOutput {
145142
pub struct BlockAptosVM();
146143

147144
impl BlockAptosVM {
148-
fn verify_transactions(
149-
transactions: BlockExecutorTransactions<Transaction>,
150-
) -> BlockExecutorTransactions<PreprocessedTransaction> {
151-
match transactions {
152-
BlockExecutorTransactions::Unsharded(transactions) => {
153-
let signature_verified_txns = transactions
154-
.into_par_iter()
155-
.with_min_len(25)
156-
.map(preprocess_transaction::<AptosVM>)
157-
.collect();
158-
BlockExecutorTransactions::Unsharded(signature_verified_txns)
159-
},
160-
BlockExecutorTransactions::Sharded(sub_blocks) => {
161-
let shard_id = sub_blocks.shard_id;
162-
let signature_verified_sub_blocks = sub_blocks
163-
.into_sub_blocks()
164-
.into_par_iter()
165-
.map(|sub_block| {
166-
let start_index = sub_block.start_index;
167-
let verified_txns = sub_block
168-
.into_transactions_with_deps()
169-
.into_par_iter()
170-
.with_min_len(25)
171-
.map(|txn_with_deps| {
172-
let TransactionWithDependencies {
173-
txn,
174-
cross_shard_dependencies,
175-
} = txn_with_deps;
176-
let preprocessed_txn = preprocess_transaction::<AptosVM>(txn);
177-
TransactionWithDependencies::new(
178-
preprocessed_txn,
179-
cross_shard_dependencies,
180-
)
181-
})
182-
.collect();
183-
SubBlock::new(start_index, verified_txns)
184-
})
185-
.collect();
186-
187-
BlockExecutorTransactions::Sharded(SubBlocksForShard::new(
188-
shard_id,
189-
signature_verified_sub_blocks,
190-
))
191-
},
192-
}
145+
fn verify_transactions(transactions: Vec<Transaction>) -> Vec<PreprocessedTransaction> {
146+
transactions
147+
.into_par_iter()
148+
.with_min_len(25)
149+
.map(preprocess_transaction::<AptosVM>)
150+
.collect()
193151
}
194152

195153
pub fn execute_block<
196154
S: StateView + Sync,
197155
L: TransactionCommitHook<Output = AptosTransactionOutput>,
198156
>(
199157
executor_thread_pool: Arc<ThreadPool>,
200-
transactions: BlockExecutorTransactions<Transaction>,
158+
transactions: Vec<Transaction>,
201159
state_view: &S,
202160
concurrency_level: usize,
203161
maybe_block_gas_limit: Option<u64>,
@@ -214,21 +172,13 @@ impl BlockAptosVM {
214172
executor_thread_pool.install(|| Self::verify_transactions(transactions));
215173
drop(signature_verification_timer);
216174

217-
let is_sharded_execution = matches!(
218-
signature_verified_block,
219-
BlockExecutorTransactions::Sharded(_)
220-
);
221-
let num_txns = signature_verified_block.num_txns();
222-
if !is_sharded_execution && state_view.id() != StateViewId::Miscellaneous {
175+
let num_txns = signature_verified_block.len();
176+
if state_view.id() != StateViewId::Miscellaneous {
223177
// Speculation is disabled in Miscellaneous context, which is used by testing and
224178
// can even lead to concurrent execute_block invocations, leading to errors on flush.
225179
init_speculative_logs(num_txns);
226180
}
227181

228-
if is_sharded_execution {
229-
aptos_vm_logging::disable_speculative_logging();
230-
}
231-
232182
BLOCK_EXECUTOR_CONCURRENCY.set(concurrency_level as i64);
233183
let executor = BlockExecutor::<
234184
PreprocessedTransaction,
@@ -254,7 +204,7 @@ impl BlockAptosVM {
254204
// Flush the speculative logs of the committed transactions.
255205
let pos = output_vec.partition_point(|o| !o.status().is_retry());
256206

257-
if !is_sharded_execution && state_view.id() != StateViewId::Miscellaneous {
207+
if state_view.id() != StateViewId::Miscellaneous {
258208
// Speculation is disabled in Miscellaneous context, which is used by testing and
259209
// can even lead to concurrent execute_block invocations, leading to errors on flush.
260210
flush_speculative_logs(pos);

aptos-move/aptos-vm/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,10 @@ use crate::sharded_block_executor::ShardedBlockExecutor;
127127
use aptos_state_view::StateView;
128128
use aptos_types::{
129129
block_executor::partitioner::SubBlocksForShard,
130-
transaction::{SignedTransaction, Transaction, TransactionOutput, VMValidatorResult},
130+
transaction::{
131+
analyzed_transaction::AnalyzedTransaction, SignedTransaction, Transaction,
132+
TransactionOutput, VMValidatorResult,
133+
},
131134
vm_status::VMStatus,
132135
};
133136
use std::{marker::Sync, sync::Arc};
@@ -160,7 +163,7 @@ pub trait VMExecutor: Send + Sync {
160163
/// Executes a block of transactions using a sharded block executor and returns the results.
161164
fn execute_block_sharded<S: StateView + Sync + Send + 'static>(
162165
sharded_block_executor: &ShardedBlockExecutor<S>,
163-
block: Vec<SubBlocksForShard<Transaction>>,
166+
block: Vec<SubBlocksForShard<AnalyzedTransaction>>,
164167
state_view: Arc<S>,
165168
maybe_block_gas_limit: Option<u64>,
166169
) -> Result<Vec<TransactionOutput>, VMStatus>;

aptos-move/aptos-vm/src/sharded_block_executor/block_executor_client.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@ use crate::block_executor::{AptosTransactionOutput, BlockAptosVM};
44
use aptos_block_executor::txn_commit_hook::NoOpTransactionCommitHook;
55
use aptos_state_view::StateView;
66
use aptos_types::{
7-
block_executor::partitioner::{BlockExecutorTransactions, SubBlocksForShard},
8-
transaction::{Transaction, TransactionOutput},
7+
block_executor::partitioner::SubBlocksForShard,
8+
transaction::{analyzed_transaction::AnalyzedTransaction, TransactionOutput},
99
};
1010
use move_core_types::vm_status::VMStatus;
1111
use std::sync::Arc;
1212

1313
pub trait BlockExecutorClient {
1414
fn execute_block<S: StateView + Sync + Send>(
1515
&self,
16-
transactions: SubBlocksForShard<Transaction>,
16+
transactions: SubBlocksForShard<AnalyzedTransaction>,
1717
state_view: &S,
1818
concurrency_level: usize,
1919
maybe_block_gas_limit: Option<u64>,
@@ -23,17 +23,22 @@ pub trait BlockExecutorClient {
2323
impl BlockExecutorClient for VMExecutorClient {
2424
fn execute_block<S: StateView + Sync + Send>(
2525
&self,
26-
sub_blocks: SubBlocksForShard<Transaction>,
26+
sub_blocks: SubBlocksForShard<AnalyzedTransaction>,
2727
state_view: &S,
2828
concurrency_level: usize,
2929
maybe_block_gas_limit: Option<u64>,
3030
) -> Result<Vec<Vec<TransactionOutput>>, VMStatus> {
31+
let txns = sub_blocks
32+
.into_txns()
33+
.into_iter()
34+
.map(|txn| txn.into_txn())
35+
.collect();
3136
Ok(vec![BlockAptosVM::execute_block::<
3237
_,
3338
NoOpTransactionCommitHook<AptosTransactionOutput, VMStatus>,
3439
>(
3540
self.executor_thread_pool.clone(),
36-
BlockExecutorTransactions::Sharded(sub_blocks),
41+
txns,
3742
state_view,
3843
concurrency_level,
3944
maybe_block_gas_limit,

aptos-move/aptos-vm/src/sharded_block_executor/cross_shard_client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use aptos_state_view::StateView;
1414
use aptos_types::{
1515
block_executor::partitioner::{RoundId, ShardId, SubBlock},
1616
state_store::state_key::StateKey,
17-
transaction::Transaction,
17+
transaction::analyzed_transaction::AnalyzedTransaction,
1818
write_set::TransactionWrite,
1919
};
2020
use std::{
@@ -66,7 +66,7 @@ impl CrossShardCommitSender {
6666
pub fn new(
6767
shard_id: ShardId,
6868
message_txs: Arc<Vec<Vec<Mutex<Sender<CrossShardMsg>>>>>,
69-
sub_block: &SubBlock<Transaction>,
69+
sub_block: &SubBlock<AnalyzedTransaction>,
7070
) -> Self {
7171
let mut dependent_edges = HashMap::new();
7272
let mut num_dependent_edges = 0;

aptos-move/aptos-vm/src/sharded_block_executor/mod.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use aptos_logger::{error, info, trace};
77
use aptos_state_view::StateView;
88
use aptos_types::{
99
block_executor::partitioner::SubBlocksForShard,
10-
transaction::{Transaction, TransactionOutput},
10+
transaction::{analyzed_transaction::AnalyzedTransaction, TransactionOutput},
1111
};
1212
use block_executor_client::BlockExecutorClient;
1313
use move_core_types::vm_status::VMStatus;
@@ -40,7 +40,12 @@ pub struct ShardedBlockExecutor<S: StateView + Sync + Send + 'static> {
4040
}
4141

4242
pub enum ExecutorShardCommand<S> {
43-
ExecuteSubBlocks(Arc<S>, SubBlocksForShard<Transaction>, usize, Option<u64>),
43+
ExecuteSubBlocks(
44+
Arc<S>,
45+
SubBlocksForShard<AnalyzedTransaction>,
46+
usize,
47+
Option<u64>,
48+
),
4449
Stop,
4550
}
4651

@@ -81,7 +86,7 @@ impl<S: StateView + Sync + Send + 'static> ShardedBlockExecutor<S> {
8186
pub fn execute_block(
8287
&self,
8388
state_view: Arc<S>,
84-
block: Vec<SubBlocksForShard<Transaction>>,
89+
block: Vec<SubBlocksForShard<AnalyzedTransaction>>,
8590
concurrency_level_per_shard: usize,
8691
maybe_block_gas_limit: Option<u64>,
8792
) -> Result<Vec<TransactionOutput>, VMStatus> {

aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_client.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@ use aptos_block_partitioner::sharded_block_partitioner::MAX_ALLOWED_PARTITIONING
1414
use aptos_logger::{info, trace};
1515
use aptos_state_view::StateView;
1616
use aptos_types::{
17-
block_executor::partitioner::{
18-
BlockExecutorTransactions, ShardId, SubBlock, SubBlocksForShard,
19-
},
20-
transaction::{Transaction, TransactionOutput},
17+
block_executor::partitioner::{ShardId, SubBlock, SubBlocksForShard},
18+
transaction::{analyzed_transaction::AnalyzedTransaction, TransactionOutput},
2119
};
2220
use futures::{channel::oneshot, executor::block_on};
2321
use move_core_types::vm_status::VMStatus;
@@ -102,7 +100,7 @@ impl ShardedExecutorClient {
102100
fn create_cross_shard_state_view<'a, S: StateView + Sync + Send>(
103101
&self,
104102
base_view: &'a S,
105-
sub_block: &SubBlock<Transaction>,
103+
sub_block: &SubBlock<AnalyzedTransaction>,
106104
) -> CrossShardStateView<'a, S> {
107105
let mut cross_shard_state_key = HashSet::new();
108106
for txn in &sub_block.transactions {
@@ -117,7 +115,7 @@ impl ShardedExecutorClient {
117115

118116
fn execute_sub_block<S: StateView + Sync + Send>(
119117
&self,
120-
sub_block: SubBlock<Transaction>,
118+
sub_block: SubBlock<AnalyzedTransaction>,
121119
round: usize,
122120
state_view: &S,
123121
concurrency_level: usize,
@@ -158,7 +156,11 @@ impl ShardedExecutorClient {
158156
s.spawn(move |_| {
159157
let ret = BlockAptosVM::execute_block(
160158
self.executor_thread_pool.clone(),
161-
BlockExecutorTransactions::Unsharded(sub_block.into_txns()),
159+
sub_block
160+
.into_txns()
161+
.into_iter()
162+
.map(|txn| txn.into_txn())
163+
.collect(),
162164
cross_shard_state_view.as_ref(),
163165
concurrency_level,
164166
maybe_block_gas_limit,
@@ -188,7 +190,7 @@ impl ShardedExecutorClient {
188190
impl BlockExecutorClient for ShardedExecutorClient {
189191
fn execute_block<S: StateView + Sync + Send>(
190192
&self,
191-
transactions: SubBlocksForShard<Transaction>,
193+
transactions: SubBlocksForShard<AnalyzedTransaction>,
192194
state_view: &S,
193195
concurrency_level: usize,
194196
maybe_block_gas_limit: Option<u64>,

aptos-move/aptos-vm/src/sharded_block_executor/tests.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,10 @@ fn sharded_block_executor_with_conflict(concurrency: usize) {
171171
let partitioner = ShardedBlockPartitioner::new(num_shards);
172172
let partitioned_txns = partitioner.partition(transactions.clone(), 8, 0.9);
173173

174-
let execution_ordered_txns = SubBlocksForShard::flatten(partitioned_txns.clone());
174+
let execution_ordered_txns = SubBlocksForShard::flatten(partitioned_txns.clone())
175+
.into_iter()
176+
.map(|t| t.into_txn())
177+
.collect::<Vec<_>>();
175178

176179
let executor_clients =
177180
ShardedExecutorClient::create_sharded_executor_clients(num_shards, Some(concurrency));
@@ -231,7 +234,10 @@ fn sharded_block_executor_with_random_transfers(concurrency: usize) {
231234
let partitioner = ShardedBlockPartitioner::new(num_shards);
232235
let partitioned_txns = partitioner.partition(transactions.clone(), 8, 0.9);
233236

234-
let execution_ordered_txns = SubBlocksForShard::flatten(partitioned_txns.clone());
237+
let execution_ordered_txns = SubBlocksForShard::flatten(partitioned_txns.clone())
238+
.into_iter()
239+
.map(|t| t.into_txn())
240+
.collect::<Vec<_>>();
235241

236242
let executor_clients =
237243
ShardedExecutorClient::create_sharded_executor_clients(num_shards, Some(concurrency));

0 commit comments

Comments
 (0)