Skip to content
This repository was archived by the owner on Mar 11, 2025. It is now read-only.

Commit 902ca60

Browse files
committed
Groom send_and_confirm_messages_with_spinner
1 parent 0737892 commit 902ca60

File tree

4 files changed

+89
-98
lines changed

4 files changed

+89
-98
lines changed

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

token/cli/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ version = "2.0.14"
1010

1111
[dependencies]
1212
clap = "2.33.3"
13-
chrono = "0.4"
14-
chrono-humanize = "0.2.1"
1513
console = "0.14.0"
1614
serde = "1.0.130"
1715
serde_derive = "1.0.103"

token/cli/src/bench.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use {
44
config::Config, owner_address_arg,
55
rpc_client_utils::send_and_confirm_messages_with_spinner, Error,
66
},
7-
chrono_humanize::{Accuracy, HumanTime, Tense},
87
clap::{value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand},
98
solana_clap_utils::{
109
input_parsers::pubkey_of_signer,
@@ -469,12 +468,11 @@ fn send_messages(
469468
}
470469
}
471470
let elapsed = Instant::now().duration_since(start);
471+
let tps = messages.len() as f64 / elapsed.as_secs_f64();
472472
println!(
473-
"Elapsed time: {}",
474-
HumanTime::from(chrono::Duration::from_std(elapsed).unwrap())
475-
.to_text_en(Accuracy::Precise, Tense::Present)
473+
"Average TPS: {:.2}\nElapsed time: {} seconds",
474+
tps,
475+
elapsed.as_secs_f64(),
476476
);
477-
let tps = messages.len() as f64 / elapsed.as_secs_f64();
478-
println!("Average TPS: {:.2}", tps);
479477
Ok(())
480478
}

token/cli/src/rpc_client_utils.rs

Lines changed: 85 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -24,60 +24,91 @@ pub fn send_and_confirm_messages_with_spinner<T: Signers>(
2424
messages: &[Message],
2525
signers: &T,
2626
) -> Result<Vec<Option<TransactionError>>, Box<dyn error::Error>> {
27-
let commitment = rpc_client.commitment();
2827
let progress_bar = new_spinner_progress_bar();
29-
let mut send_retries = 5;
28+
let mut expired_blockhash_retries = 5;
3029
let send_transaction_interval = Duration::from_millis(10); /* ~100 TPS */
3130

32-
let Fees {
33-
blockhash,
34-
fee_calculator: _,
35-
mut last_valid_block_height,
36-
} = rpc_client.get_fees()?;
37-
38-
let mut transactions = vec![];
39-
let mut transaction_errors = vec![None; messages.len()];
40-
for (i, message) in messages.iter().enumerate() {
41-
let mut transaction = Transaction::new_unsigned(message.clone());
42-
transaction.try_sign(signers, blockhash)?;
43-
transactions.push((i, transaction));
44-
}
45-
46-
progress_bar.set_message("Finding leader nodes...");
31+
progress_bar.set_message("Connecting...");
4732
let tpu_client = TpuClient::new(
4833
rpc_client.clone(),
4934
websocket_url,
5035
TpuClientConfig::default(),
5136
)?;
52-
loop {
53-
// Send all transactions
37+
38+
let mut transactions = messages
39+
.iter()
40+
.enumerate()
41+
.map(|(i, message)| (i, Transaction::new_unsigned(message.clone())))
42+
.collect::<Vec<_>>();
43+
let mut transaction_errors = vec![None; messages.len()];
44+
let set_message =
45+
|confirmed_transactions, block_height: u64, last_valid_block_height: u64, status: &str| {
46+
progress_bar.set_message(&format!(
47+
"{:>5.1}% | {:<40}[block height {}; block hash valid for {} blocks]",
48+
confirmed_transactions as f64 * 100. / messages.len() as f64,
49+
status,
50+
block_height,
51+
last_valid_block_height.saturating_sub(block_height),
52+
));
53+
};
54+
55+
let mut confirmed_transactions = 0;
56+
let mut block_height = rpc_client.get_block_height()?;
57+
while expired_blockhash_retries > 0 {
58+
let Fees {
59+
blockhash,
60+
fee_calculator: _,
61+
last_valid_block_height,
62+
} = rpc_client.get_fees()?;
63+
5464
let mut pending_transactions = HashMap::new();
55-
let num_transactions = transactions.len();
56-
for (i, transaction) in transactions {
57-
if !tpu_client.send_transaction(&transaction) {
58-
let _result = rpc_client
59-
.send_transaction_with_config(
60-
&transaction,
65+
for (i, mut transaction) in transactions {
66+
transaction.try_sign(signers, blockhash)?;
67+
pending_transactions.insert(transaction.signatures[0], (i, transaction));
68+
}
69+
70+
loop {
71+
// Send all pending transactions
72+
let num_transactions = pending_transactions.len();
73+
for (index, (_i, transaction)) in pending_transactions.values().enumerate() {
74+
if !tpu_client.send_transaction(transaction) {
75+
let _ = rpc_client.send_transaction_with_config(
76+
transaction,
6177
RpcSendTransactionConfig {
62-
preflight_commitment: Some(commitment.commitment),
78+
skip_preflight: true,
6379
..RpcSendTransactionConfig::default()
6480
},
65-
)
66-
.ok();
81+
);
82+
}
83+
set_message(
84+
confirmed_transactions,
85+
block_height,
86+
last_valid_block_height,
87+
&format!("Sending {}/{} transactions", index + 1, num_transactions,),
88+
);
89+
sleep(send_transaction_interval);
6790
}
68-
pending_transactions.insert(transaction.signatures[0], (i, transaction));
69-
progress_bar.set_message(&format!(
70-
"[{}/{}] Transactions sent",
71-
pending_transactions.len(),
72-
num_transactions
73-
));
7491

75-
sleep(send_transaction_interval);
76-
}
92+
// Wait for the next block before checking fro transaction statuses
93+
set_message(
94+
confirmed_transactions,
95+
block_height,
96+
last_valid_block_height,
97+
&format!("Waiting for next block, {} pending...", num_transactions),
98+
);
7799

78-
// Collect statuses for all the transactions, drop those that are confirmed
79-
loop {
80-
let mut block_height = 0;
100+
block_height = rpc_client.get_block_height()?;
101+
let mut new_block_height = block_height;
102+
while block_height == new_block_height {
103+
sleep(Duration::from_millis(200));
104+
new_block_height = rpc_client.get_block_height()?;
105+
}
106+
107+
if new_block_height > last_valid_block_height {
108+
break;
109+
}
110+
111+
// Collect statuses for the transactions, drop those that are confirmed
81112
let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
82113
for pending_signatures_chunk in
83114
pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
@@ -92,74 +123,40 @@ pub fn send_and_confirm_messages_with_spinner<T: Signers>(
92123
if *confirmation_status != TransactionConfirmationStatus::Processed
93124
{
94125
if let Some((i, _)) = pending_transactions.remove(signature) {
126+
confirmed_transactions += 1;
95127
transaction_errors[i] = status.err;
96128
}
97129
}
98130
} else if status.confirmations.is_none()
99131
|| status.confirmations.unwrap() > 1
100132
{
101133
if let Some((i, _)) = pending_transactions.remove(signature) {
134+
confirmed_transactions += 1;
102135
transaction_errors[i] = status.err;
103136
}
104137
}
105138
}
106139
}
107140
}
108-
109-
block_height = rpc_client.get_block_height()?;
110-
progress_bar.set_message(&format!(
111-
"[{}/{}] Transactions confirmed. Retrying in {} blocks",
112-
num_transactions - pending_transactions.len(),
113-
num_transactions,
114-
last_valid_block_height.saturating_sub(block_height)
115-
));
141+
set_message(
142+
confirmed_transactions,
143+
block_height,
144+
last_valid_block_height,
145+
"Checking transaction status...",
146+
);
116147
}
117148

118149
if pending_transactions.is_empty() {
119150
return Ok(transaction_errors);
120151
}
121-
122-
if block_height > last_valid_block_height {
123-
break;
124-
}
125-
126-
for (_i, transaction) in pending_transactions.values() {
127-
if !tpu_client.send_transaction(transaction) {
128-
let _result = rpc_client
129-
.send_transaction_with_config(
130-
transaction,
131-
RpcSendTransactionConfig {
132-
preflight_commitment: Some(commitment.commitment),
133-
..RpcSendTransactionConfig::default()
134-
},
135-
)
136-
.ok();
137-
}
138-
}
139-
140-
if cfg!(not(test)) {
141-
// Retry twice a second
142-
sleep(Duration::from_millis(500));
143-
}
144152
}
145153

146-
if send_retries == 0 {
147-
return Err("Transactions failed".into());
148-
}
149-
send_retries -= 1;
150-
151-
// Re-sign any failed transactions with a new blockhash and retry
152-
let Fees {
153-
blockhash,
154-
fee_calculator: _,
155-
last_valid_block_height: new_last_valid_block_height,
156-
} = rpc_client.get_fees()?;
157-
158-
last_valid_block_height = new_last_valid_block_height;
159-
transactions = vec![];
160-
for (_, (i, mut transaction)) in pending_transactions.into_iter() {
161-
transaction.try_sign(signers, blockhash)?;
162-
transactions.push((i, transaction));
163-
}
154+
transactions = pending_transactions.into_iter().map(|(_k, v)| v).collect();
155+
progress_bar.println(format!(
156+
"Blockhash expired. {} retries remaining",
157+
expired_blockhash_retries
158+
));
159+
expired_blockhash_retries -= 1;
164160
}
161+
Err("Max retries exceeded".into())
165162
}

0 commit comments

Comments
 (0)