From 1af90015faa2fd48110230651b54aa964fbdf5cc Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Wed, 12 Mar 2025 11:12:07 -0700 Subject: [PATCH 1/4] refactor(movement-full-node): code reuse Make MovementPartialNode::try_from_config call MovementPartialNode::try_executor_from_config. The latter uses the config struct passed by reference. --- .../src/admin/mcr/force_commitment.rs | 2 +- .../movement/movement-full-node/src/node/partial.rs | 10 +++------- .../movement-full-node/src/state/node/commitment.rs | 2 +- .../src/state/settlement/accepted_commitment.rs | 2 +- .../src/state/settlement/commitment.rs | 2 +- 5 files changed, 7 insertions(+), 11 deletions(-) diff --git a/networks/movement/movement-full-node/src/admin/mcr/force_commitment.rs b/networks/movement/movement-full-node/src/admin/mcr/force_commitment.rs index ccda3d533..dd71d71c3 100644 --- a/networks/movement/movement-full-node/src/admin/mcr/force_commitment.rs +++ b/networks/movement/movement-full-node/src/admin/mcr/force_commitment.rs @@ -34,7 +34,7 @@ impl ForceCommitment { futures::channel::mpsc::channel::>(EXECUTOR_CHANNEL_SIZE); let executor = - MovementPartialNode::try_executor_from_config(config, mempool_tx_exec_result_sender) + MovementPartialNode::try_executor_from_config(&config, mempool_tx_exec_result_sender) .await .context("Failed to create the executor")?; diff --git a/networks/movement/movement-full-node/src/node/partial.rs b/networks/movement/movement-full-node/src/node/partial.rs index 4084903dc..59f398d6f 100644 --- a/networks/movement/movement-full-node/src/node/partial.rs +++ b/networks/movement/movement-full-node/src/node/partial.rs @@ -87,7 +87,7 @@ where impl MovementPartialNode { pub async fn try_executor_from_config( - config: Config, + config: &Config, mempool_tx_exec_result_sender: futures::channel::mpsc::Sender>, ) -> Result { let executor = Executor::try_from_config( @@ -156,12 +156,8 @@ impl MovementPartialNode { }; debug!("Creating the executor"); - let executor = Executor::try_from_config( - config.execution_config.maptos_config.clone(), - mempool_tx_exec_result_sender, - ) - .await - .context("Failed to create the inner executor")?; + let executor = + Self::try_executor_from_config(&config, mempool_tx_exec_result_sender).await?; let (settlement_manager, commitment_events) = if config.mcr.should_settle() { debug!("Creating the settlement client"); diff --git a/networks/movement/movement-full-node/src/state/node/commitment.rs b/networks/movement/movement-full-node/src/state/node/commitment.rs index 838d1aef4..7bca5ad81 100644 --- a/networks/movement/movement-full-node/src/state/node/commitment.rs +++ b/networks/movement/movement-full-node/src/state/node/commitment.rs @@ -29,7 +29,7 @@ impl Commitment { futures::channel::mpsc::channel::>(EXECUTOR_CHANNEL_SIZE); let executor = - MovementPartialNode::try_executor_from_config(config, mempool_tx_exec_result_sender) + MovementPartialNode::try_executor_from_config(&config, mempool_tx_exec_result_sender) .await .context("Failed to create the executor")?; diff --git a/networks/movement/movement-full-node/src/state/settlement/accepted_commitment.rs b/networks/movement/movement-full-node/src/state/settlement/accepted_commitment.rs index 4ff9e4408..65a13faa9 100644 --- a/networks/movement/movement-full-node/src/state/settlement/accepted_commitment.rs +++ b/networks/movement/movement-full-node/src/state/settlement/accepted_commitment.rs @@ -34,7 +34,7 @@ impl AcceptedCommitment { futures::channel::mpsc::channel::>(EXECUTOR_CHANNEL_SIZE); let executor = - MovementPartialNode::try_executor_from_config(config, mempool_tx_exec_result_sender) + MovementPartialNode::try_executor_from_config(&config, mempool_tx_exec_result_sender) .await .context("Failed to create the executor")?; diff --git a/networks/movement/movement-full-node/src/state/settlement/commitment.rs b/networks/movement/movement-full-node/src/state/settlement/commitment.rs index 7114751be..c61f19fa3 100644 --- a/networks/movement/movement-full-node/src/state/settlement/commitment.rs +++ b/networks/movement/movement-full-node/src/state/settlement/commitment.rs @@ -34,7 +34,7 @@ impl Commitment { futures::channel::mpsc::channel::>(EXECUTOR_CHANNEL_SIZE); let executor = - MovementPartialNode::try_executor_from_config(config, mempool_tx_exec_result_sender) + MovementPartialNode::try_executor_from_config(&config, mempool_tx_exec_result_sender) .await .context("Failed to create the executor")?; From a2d06e7ff6d2dcaa6bacd5d974cbe5bd07898cf0 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Wed, 12 Mar 2025 11:14:10 -0700 Subject: [PATCH 2/4] fix(opt-executor): reduce cloning in execute_block --- .../opt-executor/src/executor/execution.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/protocol-units/execution/maptos/opt-executor/src/executor/execution.rs b/protocol-units/execution/maptos/opt-executor/src/executor/execution.rs index 152df0ce7..12f0b0f82 100644 --- a/protocol-units/execution/maptos/opt-executor/src/executor/execution.rs +++ b/protocol-units/execution/maptos/opt-executor/src/executor/execution.rs @@ -25,23 +25,23 @@ impl Executor { block: ExecutableBlock, ) -> Result { let (block_metadata, block, senders_and_sequence_numbers) = { - // get the block metadata transaction - let metadata_access_block = block.transactions.clone(); - let metadata_access_transactions = metadata_access_block.into_txns(); + // Break down the block into parts that will be used in execution + let block_id = block.block_id; + let transactions = block.transactions.into_txns(); - let first_signed = metadata_access_transactions + let first_signed = transactions .first() .ok_or(anyhow::anyhow!("Block must contain a block metadata transaction"))?; // cloning is cheaper than moving the array let block_metadata = match first_signed.clone().into_inner() { - Transaction::BlockMetadata(metadata) => metadata.clone(), + Transaction::BlockMetadata(metadata) => metadata, _ => { anyhow::bail!("First transaction in block must be a block metadata transaction") } }; // senders and sequence numbers - let senders_and_sequence_numbers = metadata_access_transactions + let senders_and_sequence_numbers = transactions .iter() .map(|transaction| match transaction.clone().into_inner() { Transaction::UserTransaction(transaction) => ( @@ -54,10 +54,8 @@ impl Executor { .collect::>(); // reconstruct the block - let block = ExecutableBlock::new( - block.block_id.clone(), - ExecutableTransactions::Unsharded(metadata_access_transactions), - ); + let block = + ExecutableBlock::new(block_id, ExecutableTransactions::Unsharded(transactions)); (block_metadata, block, senders_and_sequence_numbers) }; From 8a0246493c294626019250a3b740dc1d2f728bfe Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Fri, 21 Mar 2025 15:15:55 +0200 Subject: [PATCH 3/4] chore(opt-executor): remove unused enum --- .../maptos/opt-executor/src/background/transaction_pipe.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs b/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs index 66438e2d5..9091d0385 100644 --- a/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs +++ b/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs @@ -54,11 +54,6 @@ pub struct TransactionPipe { whitelisted_accounts: Option>, } -enum SequenceNumberValidity { - Valid(u64), - Invalid(SubmissionStatus), -} - impl TransactionPipe { pub(crate) fn new( mempool_commit_tx_receiver: futures_mpsc::Receiver>, // Sender, seq number) From a77e85f159fae60e6bd3b752f7bd7902068c8675 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Mon, 24 Mar 2025 19:21:41 +0200 Subject: [PATCH 4/4] test(opt-executor): don't drop executor In transaction_pipe tests, keep the instance of executor which keeps the confirmed tx channel alive. --- .../src/background/transaction_pipe.rs | 94 +++++++++++++++---- 1 file changed, 74 insertions(+), 20 deletions(-) diff --git a/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs b/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs index 9091d0385..ef2670a3b 100644 --- a/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs +++ b/protocol-units/execution/maptos/opt-executor/src/background/transaction_pipe.rs @@ -374,21 +374,33 @@ mod tests { use futures::channel::oneshot; use maptos_execution_util::config::chain::Config; use tempfile::TempDir; + use tokio::time::sleep; + + struct TestHarness { + context: Context, + executor: Executor, + transaction_pipe: TransactionPipe, + tx_receiver: mpsc::Receiver<(u64, SignedTransaction)>, + tempdir: TempDir, + } - async fn setup() -> (Context, TransactionPipe, mpsc::Receiver<(u64, SignedTransaction)>, TempDir) - { - let (tx_sender, tx_receiver) = mpsc::channel(16); - let (mempool_tx_exec_result_sender, mempool_commit_tx_receiver) = - futures_mpsc::channel::>(EXECUTOR_CHANNEL_SIZE); - - let (executor, tempdir) = - Executor::try_test_default(GENESIS_KEYPAIR.0.clone(), mempool_tx_exec_result_sender) - .await - .unwrap(); - let (context, background) = - executor.background(tx_sender, mempool_commit_tx_receiver).unwrap(); - let transaction_pipe = background.into_transaction_pipe(); - (context, transaction_pipe, tx_receiver, tempdir) + impl TestHarness { + async fn setup() -> Self { + let (tx_sender, tx_receiver) = mpsc::channel(16); + let (mempool_tx_exec_result_sender, mempool_commit_tx_receiver) = + futures_mpsc::channel::>(EXECUTOR_CHANNEL_SIZE); + + let (executor, tempdir) = Executor::try_test_default( + GENESIS_KEYPAIR.0.clone(), + mempool_tx_exec_result_sender, + ) + .await + .unwrap(); + let (context, background) = + executor.background(tx_sender, mempool_commit_tx_receiver).unwrap(); + let transaction_pipe = background.into_transaction_pipe(); + Self { context, executor, transaction_pipe, tx_receiver, tempdir } + } } fn create_signed_transaction(sequence_number: u64, chain_config: &Config) -> SignedTransaction { @@ -406,7 +418,13 @@ mod tests { async fn test_pipe_mempool() -> Result<(), anyhow::Error> { // set up let maptos_config = Config::default(); - let (context, mut transaction_pipe, mut tx_receiver, _tempdir) = setup().await; + let TestHarness { + context, + mut transaction_pipe, + mut tx_receiver, + executor: _executor, + tempdir: _tempdir, + } = TestHarness::setup().await; let user_transaction = create_signed_transaction(1, &maptos_config); // send transaction to mempool @@ -423,6 +441,12 @@ mod tests { let (status, _vm_status_code) = callback.await??; assert_eq!(status.code, MempoolStatusCode::Accepted); + // Wait for mempool interval to elapse + sleep(MEMPOOL_INTERVAL + Duration::from_millis(100)).await; + + // tick the transaction pipe + transaction_pipe.tick().await?; + // receive the transaction let received_transaction = tx_receiver.recv().await.unwrap(); assert_eq!(received_transaction.1, user_transaction); @@ -434,7 +458,13 @@ mod tests { async fn test_pipe_mempool_cancellation() -> Result<(), anyhow::Error> { // set up let maptos_config = Config::default(); - let (context, mut transaction_pipe, _tx_receiver, _tempdir) = setup().await; + let TestHarness { + context, + mut transaction_pipe, + tx_receiver: _tx_receiver, + executor: _executor, + tempdir: _tempdir, + } = TestHarness::setup().await; let user_transaction = create_signed_transaction(1, &maptos_config); // send transaction to mempool @@ -457,7 +487,13 @@ mod tests { async fn test_pipe_mempool_with_duplicate_transaction() -> Result<(), anyhow::Error> { // set up let maptos_config = Config::default(); - let (context, mut transaction_pipe, mut tx_receiver, _tempdir) = setup().await; + let TestHarness { + context, + mut transaction_pipe, + mut tx_receiver, + executor: _executor, + tempdir: _tempdir, + } = TestHarness::setup().await; let mut mempool_client_sender = context.mempool_client_sender(); let user_transaction = create_signed_transaction(1, &maptos_config); @@ -498,7 +534,13 @@ mod tests { #[tokio::test] async fn test_pipe_mempool_from_api() -> Result<(), anyhow::Error> { - let (context, mut transaction_pipe, mut tx_receiver, _tempdir) = setup().await; + let TestHarness { + context, + mut transaction_pipe, + mut tx_receiver, + executor: _executor, + tempdir: _tempdir, + } = TestHarness::setup().await; let service = Service::new(&context); #[allow(unreachable_code)] @@ -525,7 +567,13 @@ mod tests { #[tokio::test] async fn test_repeated_pipe_mempool_from_api() -> Result<(), anyhow::Error> { - let (context, mut transaction_pipe, mut tx_receiver, _tempdir) = setup().await; + let TestHarness { + context, + mut transaction_pipe, + mut tx_receiver, + executor: _executor, + tempdir: _tempdir, + } = TestHarness::setup().await; let service = Service::new(&context); #[allow(unreachable_code)] @@ -565,7 +613,13 @@ mod tests { async fn test_cannot_submit_too_new() -> Result<(), anyhow::Error> { // set up let maptos_config = Config::default(); - let (_context, mut transaction_pipe, _tx_receiver, _tempdir) = setup().await; + let TestHarness { + context: _context, + mut transaction_pipe, + tx_receiver: _tx_receiver, + executor: _executor, + tempdir: _tempdir, + } = TestHarness::setup().await; // submit a transaction with a valid sequence number let user_transaction = create_signed_transaction(0, &maptos_config);