Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl ForceCommitment {
futures::channel::mpsc::channel::<Vec<TxExecutionResult>>(EXECUTOR_CHANNEL_SIZE);

let executor =
MovementPartialNode::try_executor_from_config(config, mempool_tx_exec_result_sender)
MovementPartialNode::try_executor_from_config(&config, mempool_tx_exec_result_sender)
.await
.context("Failed to create the executor")?;

Expand Down
10 changes: 3 additions & 7 deletions networks/movement/movement-full-node/src/node/partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ where

impl MovementPartialNode<Executor> {
pub async fn try_executor_from_config(
config: Config,
config: &Config,
mempool_tx_exec_result_sender: futures::channel::mpsc::Sender<Vec<TxExecutionResult>>,
) -> Result<Executor, anyhow::Error> {
let executor = Executor::try_from_config(
Expand Down Expand Up @@ -156,12 +156,8 @@ impl MovementPartialNode<Executor> {
};

debug!("Creating the executor");
let executor = Executor::try_from_config(
config.execution_config.maptos_config.clone(),
mempool_tx_exec_result_sender,
)
.await
.context("Failed to create the inner executor")?;
let executor =
Self::try_executor_from_config(&config, mempool_tx_exec_result_sender).await?;

let (settlement_manager, commitment_events) = if config.mcr.should_settle() {
debug!("Creating the settlement client");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Commitment {
futures::channel::mpsc::channel::<Vec<TxExecutionResult>>(EXECUTOR_CHANNEL_SIZE);

let executor =
MovementPartialNode::try_executor_from_config(config, mempool_tx_exec_result_sender)
MovementPartialNode::try_executor_from_config(&config, mempool_tx_exec_result_sender)
.await
.context("Failed to create the executor")?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl AcceptedCommitment {
futures::channel::mpsc::channel::<Vec<TxExecutionResult>>(EXECUTOR_CHANNEL_SIZE);

let executor =
MovementPartialNode::try_executor_from_config(config, mempool_tx_exec_result_sender)
MovementPartialNode::try_executor_from_config(&config, mempool_tx_exec_result_sender)
.await
.context("Failed to create the executor")?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl Commitment {
futures::channel::mpsc::channel::<Vec<TxExecutionResult>>(EXECUTOR_CHANNEL_SIZE);

let executor =
MovementPartialNode::try_executor_from_config(config, mempool_tx_exec_result_sender)
MovementPartialNode::try_executor_from_config(&config, mempool_tx_exec_result_sender)
.await
.context("Failed to create the executor")?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ pub struct TransactionPipe {
whitelisted_accounts: Option<HashSet<AccountAddress>>,
}

enum SequenceNumberValidity {
Valid(u64),
Invalid(SubmissionStatus),
}

impl TransactionPipe {
pub(crate) fn new(
mempool_commit_tx_receiver: futures_mpsc::Receiver<Vec<TxExecutionResult>>, // Sender, seq number)
Expand Down Expand Up @@ -379,21 +374,33 @@ mod tests {
use futures::channel::oneshot;
use maptos_execution_util::config::chain::Config;
use tempfile::TempDir;
use tokio::time::sleep;

struct TestHarness {
context: Context,
executor: Executor,
transaction_pipe: TransactionPipe,
tx_receiver: mpsc::Receiver<(u64, SignedTransaction)>,
tempdir: TempDir,
}

async fn setup() -> (Context, TransactionPipe, mpsc::Receiver<(u64, SignedTransaction)>, TempDir)
{
let (tx_sender, tx_receiver) = mpsc::channel(16);
let (mempool_tx_exec_result_sender, mempool_commit_tx_receiver) =
futures_mpsc::channel::<Vec<TxExecutionResult>>(EXECUTOR_CHANNEL_SIZE);

let (executor, tempdir) =
Executor::try_test_default(GENESIS_KEYPAIR.0.clone(), mempool_tx_exec_result_sender)
.await
.unwrap();
let (context, background) =
executor.background(tx_sender, mempool_commit_tx_receiver).unwrap();
let transaction_pipe = background.into_transaction_pipe();
(context, transaction_pipe, tx_receiver, tempdir)
impl TestHarness {
async fn setup() -> Self {
let (tx_sender, tx_receiver) = mpsc::channel(16);
let (mempool_tx_exec_result_sender, mempool_commit_tx_receiver) =
futures_mpsc::channel::<Vec<TxExecutionResult>>(EXECUTOR_CHANNEL_SIZE);

let (executor, tempdir) = Executor::try_test_default(
GENESIS_KEYPAIR.0.clone(),
mempool_tx_exec_result_sender,
)
.await
.unwrap();
let (context, background) =
executor.background(tx_sender, mempool_commit_tx_receiver).unwrap();
let transaction_pipe = background.into_transaction_pipe();
Self { context, executor, transaction_pipe, tx_receiver, tempdir }
}
}

fn create_signed_transaction(sequence_number: u64, chain_config: &Config) -> SignedTransaction {
Expand All @@ -411,7 +418,13 @@ mod tests {
async fn test_pipe_mempool() -> Result<(), anyhow::Error> {
// set up
let maptos_config = Config::default();
let (context, mut transaction_pipe, mut tx_receiver, _tempdir) = setup().await;
let TestHarness {
context,
mut transaction_pipe,
mut tx_receiver,
executor: _executor,
tempdir: _tempdir,
} = TestHarness::setup().await;
let user_transaction = create_signed_transaction(1, &maptos_config);

// send transaction to mempool
Expand All @@ -428,6 +441,12 @@ mod tests {
let (status, _vm_status_code) = callback.await??;
assert_eq!(status.code, MempoolStatusCode::Accepted);

// Wait for mempool interval to elapse
sleep(MEMPOOL_INTERVAL + Duration::from_millis(100)).await;

// tick the transaction pipe
transaction_pipe.tick().await?;

// receive the transaction
let received_transaction = tx_receiver.recv().await.unwrap();
assert_eq!(received_transaction.1, user_transaction);
Expand All @@ -439,7 +458,13 @@ mod tests {
async fn test_pipe_mempool_cancellation() -> Result<(), anyhow::Error> {
// set up
let maptos_config = Config::default();
let (context, mut transaction_pipe, _tx_receiver, _tempdir) = setup().await;
let TestHarness {
context,
mut transaction_pipe,
tx_receiver: _tx_receiver,
executor: _executor,
tempdir: _tempdir,
} = TestHarness::setup().await;
let user_transaction = create_signed_transaction(1, &maptos_config);

// send transaction to mempool
Expand All @@ -462,7 +487,13 @@ mod tests {
async fn test_pipe_mempool_with_duplicate_transaction() -> Result<(), anyhow::Error> {
// set up
let maptos_config = Config::default();
let (context, mut transaction_pipe, mut tx_receiver, _tempdir) = setup().await;
let TestHarness {
context,
mut transaction_pipe,
mut tx_receiver,
executor: _executor,
tempdir: _tempdir,
} = TestHarness::setup().await;
let mut mempool_client_sender = context.mempool_client_sender();
let user_transaction = create_signed_transaction(1, &maptos_config);

Expand Down Expand Up @@ -503,7 +534,13 @@ mod tests {

#[tokio::test]
async fn test_pipe_mempool_from_api() -> Result<(), anyhow::Error> {
let (context, mut transaction_pipe, mut tx_receiver, _tempdir) = setup().await;
let TestHarness {
context,
mut transaction_pipe,
mut tx_receiver,
executor: _executor,
tempdir: _tempdir,
} = TestHarness::setup().await;
let service = Service::new(&context);

#[allow(unreachable_code)]
Expand All @@ -530,7 +567,13 @@ mod tests {

#[tokio::test]
async fn test_repeated_pipe_mempool_from_api() -> Result<(), anyhow::Error> {
let (context, mut transaction_pipe, mut tx_receiver, _tempdir) = setup().await;
let TestHarness {
context,
mut transaction_pipe,
mut tx_receiver,
executor: _executor,
tempdir: _tempdir,
} = TestHarness::setup().await;
let service = Service::new(&context);

#[allow(unreachable_code)]
Expand Down Expand Up @@ -570,7 +613,13 @@ mod tests {
async fn test_cannot_submit_too_new() -> Result<(), anyhow::Error> {
// set up
let maptos_config = Config::default();
let (_context, mut transaction_pipe, _tx_receiver, _tempdir) = setup().await;
let TestHarness {
context: _context,
mut transaction_pipe,
tx_receiver: _tx_receiver,
executor: _executor,
tempdir: _tempdir,
} = TestHarness::setup().await;

// submit a transaction with a valid sequence number
let user_transaction = create_signed_transaction(0, &maptos_config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,23 @@ impl Executor {
block: ExecutableBlock,
) -> Result<BlockCommitment, anyhow::Error> {
let (block_metadata, block, senders_and_sequence_numbers) = {
// get the block metadata transaction
let metadata_access_block = block.transactions.clone();
let metadata_access_transactions = metadata_access_block.into_txns();
// Break down the block into parts that will be used in execution
let block_id = block.block_id;
let transactions = block.transactions.into_txns();

let first_signed = metadata_access_transactions
let first_signed = transactions
.first()
.ok_or(anyhow::anyhow!("Block must contain a block metadata transaction"))?;
// cloning is cheaper than moving the array
let block_metadata = match first_signed.clone().into_inner() {
Transaction::BlockMetadata(metadata) => metadata.clone(),
Transaction::BlockMetadata(metadata) => metadata,
_ => {
anyhow::bail!("First transaction in block must be a block metadata transaction")
}
};

// senders and sequence numbers
let senders_and_sequence_numbers = metadata_access_transactions
let senders_and_sequence_numbers = transactions
.iter()
.map(|transaction| match transaction.clone().into_inner() {
Transaction::UserTransaction(transaction) => (
Expand All @@ -54,10 +54,8 @@ impl Executor {
.collect::<Vec<(HashValue, AccountAddress, u64)>>();

// reconstruct the block
let block = ExecutableBlock::new(
block.block_id.clone(),
ExecutableTransactions::Unsharded(metadata_access_transactions),
);
let block =
ExecutableBlock::new(block_id, ExecutableTransactions::Unsharded(transactions));

(block_metadata, block, senders_and_sequence_numbers)
};
Expand Down
Loading