Skip to content

Commit 57acab7

Browse files
authored
Shard BPS per chain group to prevent oversending requests (#4176)
## Motivation

Right now we have a number of chain groups, all together trying to reach a certain BPS. They check whether the BPS was reached by reading a single shared atomic variable. However, you could reach a situation where one of the chain groups achieves the BPS, but the other `num_chain_groups - 1` chain groups all have block proposals in flight, meaning that in the worst case we could be overshooting our BPS by 2 * BPS - 1 🤦🏻‍♂️

## Proposal

Give each chain group its own share of the BPS to achieve, tracked by its own atomic variable. The BPS control task now sums all those atomic variables to check that we reached the desired BPS/TPS. This prevents the race condition that causes us to overshoot the BPS.

## Test Plan

Tested on a deployed network. Before this change we were overshooting by 1.5x; now we're back to normal.

## Release Plan

- Nothing to do / These changes follow the usual release cycle.
1 parent af5a870 commit 57acab7

File tree

1 file changed

+21
-8
lines changed

1 file changed

+21
-8
lines changed

linera-client/src/benchmark.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,9 @@ impl<Env: Environment> Benchmark<Env> {
203203
runtime_in_seconds: Option<u64>,
204204
delay_between_chain_groups_ms: Option<u64>,
205205
) -> Result<(), BenchmarkError> {
206-
let bps_count = Arc::new(AtomicUsize::new(0));
206+
let bps_counts = (0..num_chain_groups)
207+
.map(|_| Arc::new(AtomicUsize::new(0)))
208+
.collect::<Vec<_>>();
207209
let notifier = Arc::new(Notify::new());
208210
let barrier = Arc::new(Barrier::new(num_chain_groups + 1));
209211

@@ -213,7 +215,7 @@ impl<Env: Environment> Benchmark<Env> {
213215
let bps_control_task = Self::bps_control_task(
214216
&barrier,
215217
&shutdown_notifier,
216-
&bps_count,
218+
&bps_counts,
217219
&notifier,
218220
transactions_per_block,
219221
bps,
@@ -225,6 +227,8 @@ impl<Env: Environment> Benchmark<Env> {
225227
let (runtime_control_task, runtime_control_sender) =
226228
Self::runtime_control_task(&shutdown_notifier, runtime_in_seconds, num_chain_groups);
227229

230+
let bps_initial_share = bps / num_chain_groups;
231+
let mut bps_remainder = bps % num_chain_groups;
228232
let mut join_set = task::JoinSet::<Result<(), BenchmarkError>>::new();
229233
for (chain_group_index, (chain_group, chain_clients)) in blocks_infos
230234
.into_iter()
@@ -235,14 +239,20 @@ impl<Env: Environment> Benchmark<Env> {
235239
let committee = committee.clone();
236240
let barrier_clone = barrier.clone();
237241
let block_time_quantiles_sender = block_time_quantiles_sender.clone();
238-
let bps_count_clone = bps_count.clone();
242+
let bps_count_clone = bps_counts[chain_group_index].clone();
239243
let notifier_clone = notifier.clone();
240244
let runtime_control_sender_clone = runtime_control_sender.clone();
245+
let bps_share = if bps_remainder > 0 {
246+
bps_remainder -= 1;
247+
bps_initial_share + 1
248+
} else {
249+
bps_initial_share
250+
};
241251
join_set.spawn(
242252
async move {
243253
Box::pin(Self::run_benchmark_internal(
244254
chain_group_index,
245-
bps,
255+
bps_share,
246256
chain_group,
247257
chain_clients,
248258
shutdown_notifier_clone,
@@ -290,13 +300,13 @@ impl<Env: Environment> Benchmark<Env> {
290300
fn bps_control_task(
291301
barrier: &Arc<Barrier>,
292302
shutdown_notifier: &CancellationToken,
293-
bps_count: &Arc<AtomicUsize>,
303+
bps_counts: &[Arc<AtomicUsize>],
294304
notifier: &Arc<Notify>,
295305
transactions_per_block: usize,
296306
bps: usize,
297307
) -> task::JoinHandle<()> {
298308
let shutdown_notifier = shutdown_notifier.clone();
299-
let bps_count = bps_count.clone();
309+
let bps_counts = bps_counts.to_vec();
300310
let notifier = notifier.clone();
301311
let barrier = barrier.clone();
302312
task::spawn(
@@ -309,7 +319,10 @@ impl<Env: Environment> Benchmark<Env> {
309319
break;
310320
}
311321
one_second_interval.tick().await;
312-
let current_bps_count = bps_count.swap(0, Ordering::Relaxed);
322+
let current_bps_count: usize = bps_counts
323+
.iter()
324+
.map(|count| count.swap(0, Ordering::Relaxed))
325+
.sum();
313326
notifier.notify_waiters();
314327
let formatted_current_bps = current_bps_count.to_formatted_string(&Locale::en);
315328
let formatted_current_tps = (current_bps_count * transactions_per_block)
@@ -320,7 +333,7 @@ impl<Env: Environment> Benchmark<Env> {
320333
if current_bps_count >= bps {
321334
info!(
322335
"Achieved {} BPS/{} TPS",
323-
formatted_bps_goal, formatted_tps_goal
336+
formatted_current_bps, formatted_current_tps
324337
);
325338
} else {
326339
warn!(

0 commit comments

Comments
 (0)