Skip to content

Commit 59eee75

Browse files
Speed up packet dedup and fix benches (backport solana-labs#22592) (solana-labs#22611)
* Speed up packet dedup and fix benches (solana-labs#22592) * Speed up packet dedup and fix benches * fix tests * allow int arithmetic in bench (cherry picked from commit a2d251c) # Conflicts: # perf/src/sigverify.rs * resolve conflicts Co-authored-by: Justin Starry <[email protected]>
1 parent c2dd9a0 commit 59eee75

File tree

3 files changed

+101
-32
lines changed

3 files changed

+101
-32
lines changed

bloom/src/bloom.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,16 @@ impl<T: BloomHashIndex> AtomicBloom<T> {
174174
(index as usize, mask)
175175
}
176176

177-
pub fn add(&self, key: &T) {
177+
/// Adds an item to the bloom filter and returns true if the item
178+
/// was not in the filter before.
179+
pub fn add(&self, key: &T) -> bool {
180+
let mut added = false;
178181
for k in &self.keys {
179182
let (index, mask) = self.pos(key, *k);
180-
self.bits[index].fetch_or(mask, Ordering::Relaxed);
183+
let prev_val = self.bits[index].fetch_or(mask, Ordering::Relaxed);
184+
added = added || prev_val & mask == 0u64;
181185
}
186+
added
182187
}
183188

184189
pub fn contains(&self, key: &T) -> bool {
@@ -189,6 +194,12 @@ impl<T: BloomHashIndex> AtomicBloom<T> {
189194
})
190195
}
191196

197+
pub fn clear_for_tests(&mut self) {
198+
self.bits.iter().for_each(|bit| {
199+
bit.store(0u64, Ordering::Relaxed);
200+
});
201+
}
202+
192203
// Only for tests and simulations.
193204
pub fn mock_clone(&self) -> Self {
194205
Self {
@@ -320,7 +331,9 @@ mod test {
320331
assert_eq!(bloom.keys.len(), 3);
321332
assert_eq!(bloom.num_bits, 6168);
322333
assert_eq!(bloom.bits.len(), 97);
323-
hash_values.par_iter().for_each(|v| bloom.add(v));
334+
hash_values.par_iter().for_each(|v| {
335+
bloom.add(v);
336+
});
324337
let bloom: Bloom<Hash> = bloom.into();
325338
assert_eq!(bloom.keys.len(), 3);
326339
assert_eq!(bloom.bits.len(), 6168);
@@ -362,7 +375,9 @@ mod test {
362375
}
363376
// Round trip, re-inserting the same hash values.
364377
let bloom: AtomicBloom<_> = bloom.into();
365-
hash_values.par_iter().for_each(|v| bloom.add(v));
378+
hash_values.par_iter().for_each(|v| {
379+
bloom.add(v);
380+
});
366381
for hash_value in &hash_values {
367382
assert!(bloom.contains(hash_value));
368383
}
@@ -380,7 +395,9 @@ mod test {
380395
let bloom: AtomicBloom<_> = bloom.into();
381396
assert_eq!(bloom.num_bits, 9731);
382397
assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
383-
more_hash_values.par_iter().for_each(|v| bloom.add(v));
398+
more_hash_values.par_iter().for_each(|v| {
399+
bloom.add(v);
400+
});
384401
for hash_value in &hash_values {
385402
assert!(bloom.contains(hash_value));
386403
}

perf/benches/dedup.rs

Lines changed: 77 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,98 @@
1+
#![allow(clippy::integer_arithmetic)]
12
#![feature(test)]
23

34
extern crate test;
45

56
use {
7+
rand::prelude::*,
68
solana_bloom::bloom::{AtomicBloom, Bloom},
7-
solana_perf::{packet::to_packet_batches, sigverify, test_tx::test_tx},
9+
solana_perf::{
10+
packet::{to_packet_batches, PacketBatch},
11+
sigverify,
12+
},
813
test::Bencher,
914
};
1015

16+
fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {
17+
// subtract 8 bytes because the length will get serialized as well
18+
(0..size.checked_sub(8).unwrap())
19+
.map(|_| rng.gen())
20+
.collect()
21+
}
22+
23+
fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) {
24+
// verify packets
25+
let mut bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
26+
bencher.iter(|| {
27+
// bench
28+
sigverify::dedup_packets(&bloom, &mut batches);
29+
30+
// reset
31+
bloom.clear_for_tests();
32+
batches.iter_mut().for_each(|batch| {
33+
batch.packets.iter_mut().for_each(|p| {
34+
p.meta.discard = false;
35+
})
36+
});
37+
})
38+
}
39+
1140
#[bench]
12-
fn bench_dedup_same(bencher: &mut Bencher) {
13-
let tx = test_tx();
41+
#[ignore]
42+
fn bench_dedup_same_small_packets(bencher: &mut Bencher) {
43+
let mut rng = rand::thread_rng();
44+
let small_packet = test_packet_with_size(128, &mut rng);
1445

15-
// generate packet vector
16-
let mut batches = to_packet_batches(
17-
&std::iter::repeat(tx).take(64 * 1024).collect::<Vec<_>>(),
46+
let batches = to_packet_batches(
47+
&std::iter::repeat(small_packet)
48+
.take(4096)
49+
.collect::<Vec<_>>(),
1850
128,
1951
);
20-
let packet_count = sigverify::count_packets_in_batches(&batches);
21-
let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
2252

23-
println!("packet_count {} {}", packet_count, batches.len());
53+
do_bench_dedup_packets(bencher, batches);
54+
}
2455

25-
// verify packets
26-
bencher.iter(|| {
27-
let _ans = sigverify::dedup_packets(&bloom, &mut batches);
28-
})
56+
#[bench]
57+
#[ignore]
58+
fn bench_dedup_same_big_packets(bencher: &mut Bencher) {
59+
let mut rng = rand::thread_rng();
60+
let big_packet = test_packet_with_size(1024, &mut rng);
61+
62+
let batches = to_packet_batches(
63+
&std::iter::repeat(big_packet).take(4096).collect::<Vec<_>>(),
64+
128,
65+
);
66+
67+
do_bench_dedup_packets(bencher, batches);
2968
}
3069

3170
#[bench]
32-
fn bench_dedup_diff(bencher: &mut Bencher) {
33-
// generate packet vector
34-
let mut batches =
35-
to_packet_batches(&(0..64 * 1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
36-
let packet_count = sigverify::count_packets_in_batches(&batches);
37-
let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
71+
#[ignore]
72+
fn bench_dedup_diff_small_packets(bencher: &mut Bencher) {
73+
let mut rng = rand::thread_rng();
3874

39-
println!("packet_count {} {}", packet_count, batches.len());
75+
let batches = to_packet_batches(
76+
&(0..4096)
77+
.map(|_| test_packet_with_size(128, &mut rng))
78+
.collect::<Vec<_>>(),
79+
128,
80+
);
4081

41-
// verify packets
42-
bencher.iter(|| {
43-
let _ans = sigverify::dedup_packets(&bloom, &mut batches);
44-
})
82+
do_bench_dedup_packets(bencher, batches);
83+
}
84+
85+
#[bench]
86+
#[ignore]
87+
fn bench_dedup_diff_big_packets(bencher: &mut Bencher) {
88+
let mut rng = rand::thread_rng();
89+
90+
let batches = to_packet_batches(
91+
&(0..4096)
92+
.map(|_| test_packet_with_size(1024, &mut rng))
93+
.collect::<Vec<_>>(),
94+
128,
95+
);
96+
97+
do_bench_dedup_packets(bencher, batches);
4598
}

perf/src/sigverify.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -396,12 +396,11 @@ fn dedup_packet(count: &AtomicU64, packet: &mut Packet, bloom: &AtomicBloom<&[u8
396396
return;
397397
}
398398

399-
if bloom.contains(&&packet.data[..]) {
399+
// If this packet was not newly added, it's a dup and should be discarded
400+
if !bloom.add(&&packet.data[0..packet.meta.size]) {
400401
packet.meta.discard = true;
401402
count.fetch_add(1, Ordering::Relaxed);
402-
return;
403403
}
404-
bloom.add(&&packet.data[..]);
405404
}
406405

407406
pub fn dedup_packets(bloom: &AtomicBloom<&[u8]>, batches: &mut [PacketBatch]) -> u64 {

0 commit comments

Comments
 (0)