Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions ci/do-audit.sh
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,28 @@ cargo_audit_ignores=(
# Solution: Upgrade to >=0.17.12
# Dependency tree:
--ignore RUSTSEC-2025-0009

# Crate: time
# Version: 0.3.9
# Title: Denial of Service via Stack Exhaustion
# Date: 2026-02-05
# ID: RUSTSEC-2026-0009
# URL: https://rustsec.org/advisories/RUSTSEC-2026-0009
# Solution: Upgrade to >=0.3.47
# MSRV 1.88.0 required; rustc 1.86.0-nightly in use
--ignore RUSTSEC-2026-0009


# Crate: bytes
# Version: 1.10.1
# Title: Integer overflow in `BytesMut::reserve`
# Date: 2026-02-03
# ID: RUSTSEC-2026-0007
# URL: https://github.com/advisories/GHSA-434x-w66g-qw3r
# Solution: Upgrade to >=1.11.1
# TODO: remove ignore once workspace bytes is >=1.11.1 everywhere
--ignore RUSTSEC-2026-0007

)
scripts/cargo-for-all-lock-files.sh audit "${cargo_audit_ignores[@]}" | $dep_tree_filter
# we want the `cargo audit` exit code, not `$dep_tree_filter`'s
Expand Down
309 changes: 265 additions & 44 deletions core/src/banking_stage/transaction_scheduler/bam_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,25 +466,44 @@ impl<Tx: TransactionWithMeta> BamScheduler<Tx> {
}
}

/// Generates a `bundle_result::Result` based on the processed result of a single transaction.
fn generate_bundle_result(processed: &TransactionResult) -> atomic_txn_batch_result::Result {
match processed {
TransactionResult::Committed(result) => atomic_txn_batch_result::Result::Committed(
jito_protos::proto::bam_types::Committed {
transaction_results: vec![result.clone()],
},
),
TransactionResult::NotCommitted(reason) => {
let (index, not_commit_reason) = match reason {
NotCommittedReason::PohTimeout => (0, NotCommittedReason::PohTimeout),
NotCommittedReason::Error(err) => (0, NotCommittedReason::Error(err.clone())),
};
atomic_txn_batch_result::Result::NotCommitted(
jito_protos::proto::bam_types::NotCommitted {
reason: Some(Self::convert_reason_to_proto(index, not_commit_reason)),
},
)
/// Generates a `bundle_result::Result` for non-revert-on-error batches.
/// If all transactions committed, return all results. Otherwise return the
/// first non-committed reason with its index.
fn generate_non_revert_bundle_result(
processed_results: &[TransactionResult],
) -> atomic_txn_batch_result::Result {
if processed_results
.iter()
.all(|result| matches!(result, TransactionResult::Committed(_)))
{
let transaction_results = processed_results
.iter()
.filter_map(|result| {
if let TransactionResult::Committed(processed) = result {
Some(processed.clone())
} else {
None
}
})
.collect();
atomic_txn_batch_result::Result::Committed(jito_protos::proto::bam_types::Committed {
transaction_results,
})
} else {
let mut index = 0;
let mut not_commit_reason = NotCommittedReason::PohTimeout;
for (i, result) in processed_results.iter().enumerate() {
if let TransactionResult::NotCommitted(reason) = result {
index = i;
not_commit_reason = reason.clone();
break;
}
}
atomic_txn_batch_result::Result::NotCommitted(
jito_protos::proto::bam_types::NotCommitted {
reason: Some(Self::convert_reason_to_proto(index, not_commit_reason)),
},
)
}
}

Expand Down Expand Up @@ -752,33 +771,25 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for BamScheduler<Tx> {
.as_micros() as u64,
);

// Should never not be 1; but just in case
let len = if revert_on_error {
1
} else {
inflight_batch_info.batch_priority_ids.len()
};
for (i, priority_id) in inflight_batch_info
.batch_priority_ids
.iter()
.enumerate()
.take(len)
{
for (i, priority_id) in inflight_batch_info.batch_priority_ids.iter().enumerate() {
// If we got extra info, we can send back the result
if let Some(extra_info) = result.extra_info.as_ref() {
let bundle_result = if revert_on_error {
Self::generate_revert_on_error_bundle_result(&extra_info.processed_results)
} else {
let Some(txn_result) = extra_info.processed_results.get(i) else {
warn!(
"Processed results for batch {} are missing for index {}",
batch_id.0, i
);
continue;
};
Self::generate_bundle_result(txn_result)
};
self.send_back_result(priority_to_seq_id(priority_id.priority), bundle_result);
if revert_on_error {
let bundle_result = Self::generate_revert_on_error_bundle_result(
&extra_info.processed_results,
);
self.send_back_result(
priority_to_seq_id(priority_id.priority),
bundle_result,
);
} else if i == 0 {
let bundle_result =
Self::generate_non_revert_bundle_result(&extra_info.processed_results);
self.send_back_result(
priority_to_seq_id(priority_id.priority),
bundle_result,
);
}
}

// If in the same slot, unblock the transaction
Expand Down Expand Up @@ -823,7 +834,7 @@ mod tests {
itertools::Itertools,
jito_protos::proto::bam_types::{
atomic_txn_batch_result::Result::{Committed, NotCommitted},
TransactionCommittedResult,
TransactionCommittedResult, TransactionErrorReason,
},
smallvec::SmallVec,
solana_compute_budget_interface::ComputeBudgetInstruction,
Expand All @@ -837,6 +848,7 @@ mod tests {
solana_signer::Signer,
solana_system_interface::instruction::transfer_many,
solana_transaction::{sanitized::SanitizedTransaction, Transaction},
solana_transaction_error::TransactionError,
std::{
borrow::Borrow,
sync::{Arc, RwLock},
Expand Down Expand Up @@ -1290,6 +1302,215 @@ mod tests {
}
}

#[test]
fn test_non_revert_batch_reports_all_results() {
let (bank_forks, _) = test_bank_forks();
let TestScheduler {
mut scheduler,
consume_work_receivers,
finished_consume_work_sender,
response_receiver,
} = create_test_scheduler(1, &bank_forks);
scheduler.extra_checks_enabled = false;

let keypair_a = Keypair::new();
let recipient_a = Pubkey::new_unique();
let recipient_b = Pubkey::new_unique();

let tx_1 = prioritized_tranfers(&keypair_a, vec![recipient_a], 1000, 1);
let tx_2 = prioritized_tranfers(&keypair_a, vec![recipient_b], 2000, 1);

let mut container = TransactionStateContainer::with_capacity(100);
const TEST_TRANSACTION_COST: u64 = 5000;
let _batch_id = container
.insert_new_batch(
vec![(tx_1, MaxAge::MAX), (tx_2, MaxAge::MAX)],
seq_id_to_priority(5),
TEST_TRANSACTION_COST,
false,
u64::MAX,
)
.expect("batch should fit");

let decision = BufferedPacketsDecision::Consume(bank_forks.read().unwrap().working_bank());
scheduler
.receive_completed(&mut container, &decision)
.unwrap();

let result = scheduler
.schedule(
&mut container,
0,
|_, _| {},
|_| PreLockFilterAction::AttemptToSchedule,
)
.unwrap();
assert_eq!(result.num_scheduled, 2);

let work = consume_work_receivers[0].try_recv().unwrap();
assert_eq!(work.ids.len(), 2);

let finished_work = FinishedConsumeWork {
work,
retryable_indexes: vec![],
extra_info: Some(
crate::banking_stage::scheduler_messages::FinishedConsumeWorkExtraInfo {
processed_results: vec![
TransactionResult::Committed(TransactionCommittedResult {
cus_consumed: 100,
feepayer_balance_lamports: 1000,
loaded_accounts_data_size: 10,
execution_success: true,
}),
TransactionResult::Committed(TransactionCommittedResult {
cus_consumed: 200,
feepayer_balance_lamports: 900,
loaded_accounts_data_size: 20,
execution_success: true,
}),
],
},
),
};
let _ = finished_consume_work_sender.send(finished_work);

let (num_transactions, _) = scheduler
.receive_completed(&mut container, &decision)
.unwrap();
assert_eq!(num_transactions, 2);

let response = response_receiver.try_recv().unwrap();
let BamOutboundMessage::AtomicTxnBatchResult(bundle_result) = response else {
panic!("Expected AtomicTxnBatchResult message");
};
assert_eq!(bundle_result.seq_id, 5);
assert!(
bundle_result.result.is_some(),
"Bundle result should be present"
);
let result = bundle_result.result.unwrap();
match result {
Committed(committed) => {
assert_eq!(committed.transaction_results.len(), 2);
assert_eq!(committed.transaction_results[0].cus_consumed, 100);
assert_eq!(committed.transaction_results[1].cus_consumed, 200);
}
NotCommitted(not_committed) => {
panic!("Expected Committed result, got NotCommitted: {not_committed:?}");
}
}
}

#[test]
fn test_non_revert_batch_reports_first_error_index() {
let (bank_forks, _) = test_bank_forks();
let TestScheduler {
mut scheduler,
consume_work_receivers,
finished_consume_work_sender,
response_receiver,
} = create_test_scheduler(1, &bank_forks);
scheduler.extra_checks_enabled = false;

let keypair_a = Keypair::new();
let recipient_a = Pubkey::new_unique();
let recipient_b = Pubkey::new_unique();

let tx_1 = prioritized_tranfers(&keypair_a, vec![recipient_a], 1000, 1);
let tx_2 = prioritized_tranfers(&keypair_a, vec![recipient_b], 2000, 1);

let mut container = TransactionStateContainer::with_capacity(100);
const TEST_TRANSACTION_COST: u64 = 5000;
let _batch_id = container
.insert_new_batch(
vec![(tx_1, MaxAge::MAX), (tx_2, MaxAge::MAX)],
seq_id_to_priority(6),
TEST_TRANSACTION_COST,
false,
u64::MAX,
)
.expect("batch should fit");

let decision = BufferedPacketsDecision::Consume(bank_forks.read().unwrap().working_bank());
scheduler
.receive_completed(&mut container, &decision)
.unwrap();

let result = scheduler
.schedule(
&mut container,
0,
|_, _| {},
|_| PreLockFilterAction::AttemptToSchedule,
)
.unwrap();
assert_eq!(result.num_scheduled, 2);

let work = consume_work_receivers[0].try_recv().unwrap();
assert_eq!(work.ids.len(), 2);

let finished_work = FinishedConsumeWork {
work,
retryable_indexes: vec![],
extra_info: Some(
crate::banking_stage::scheduler_messages::FinishedConsumeWorkExtraInfo {
processed_results: vec![
TransactionResult::Committed(TransactionCommittedResult {
cus_consumed: 100,
feepayer_balance_lamports: 1000,
loaded_accounts_data_size: 10,
execution_success: true,
}),
TransactionResult::NotCommitted(NotCommittedReason::Error(
TransactionError::AccountInUse,
)),
],
},
),
};
let _ = finished_consume_work_sender.send(finished_work);

let (num_transactions, _) = scheduler
.receive_completed(&mut container, &decision)
.unwrap();
assert_eq!(num_transactions, 2);

let response = response_receiver.try_recv().unwrap();
let BamOutboundMessage::AtomicTxnBatchResult(bundle_result) = response else {
panic!("Expected AtomicTxnBatchResult message");
};
assert_eq!(bundle_result.seq_id, 6);
assert!(
bundle_result.result.is_some(),
"Bundle result should be present"
);
let result = bundle_result.result.unwrap();
match result {
Committed(_) => {
panic!("Expected NotCommitted result, got Committed");
}
NotCommitted(not_committed) => {
let reason = not_committed
.reason
.expect("NotCommitted reason should be present");
match reason {
jito_protos::proto::bam_types::not_committed::Reason::TransactionError(
transaction_error,
) => {
assert_eq!(transaction_error.index, 1);
assert_eq!(
transaction_error.reason,
TransactionErrorReason::AccountInUse as i32
);
}
other => {
panic!("Expected TransactionError reason, got: {other:?}");
}
}
}
}
}

#[test]
#[should_panic(expected = "node must exist")]
fn test_prio_graph_clears_on_slot_boundary() {
Expand Down