Skip to content

Commit 9b90455

Browse files
authored
moves gossip CrdsValue hash calculations to CrdsValue::deserialize (#3593)
CrdsValue hashes are calculated in Crds::insert while holding an exclusive lock on gossip CRDS table: https://github.com/anza-xyz/agave/blob/d2e7488c1/gossip/src/crds.rs#L252 Additionally in order to calculate the hash, the CrdsValue needs to be bincode serialized, which can exacerbate lock contention on gossip CRDS table: https://github.com/anza-xyz/agave/blob/d2e7488c1/gossip/src/crds.rs#L159 This commit instead moves hash calculation to CrdsValue::deserialize, which is done in a thread-pool and without acquiring any locks: https://github.com/anza-xyz/agave/blob/d2e7488c1/gossip/src/cluster_info.rs#L2317-L2326
1 parent 11d16f2 commit 9b90455

File tree

4 files changed

+73
-44
lines changed

4 files changed

+73
-44
lines changed

gossip/src/crds.rs

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,13 @@ use {
3535
crds_value::{CrdsValue, CrdsValueLabel},
3636
},
3737
assert_matches::debug_assert_matches,
38-
bincode::serialize,
3938
indexmap::{
4039
map::{rayon::ParValues, Entry, IndexMap},
4140
set::IndexSet,
4241
},
4342
lru::LruCache,
4443
rayon::{prelude::*, ThreadPool},
45-
solana_sdk::{
46-
clock::Slot,
47-
hash::{hash, Hash},
48-
pubkey::Pubkey,
49-
signature::Signature,
50-
},
44+
solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signature},
5145
std::{
5246
cmp::Ordering,
5347
collections::{hash_map, BTreeMap, HashMap, VecDeque},
@@ -131,8 +125,6 @@ pub struct VersionedCrdsValue {
131125
pub value: CrdsValue,
132126
/// local time when updated
133127
pub(crate) local_timestamp: u64,
134-
/// value hash
135-
pub(crate) value_hash: Hash,
136128
/// None -> value upserted by GossipRoute::{LocalMessage,PullRequest}
137129
/// Some(0) -> value upserted by GossipRoute::PullResponse
138130
/// Some(k) if k > 0 -> value upserted by GossipRoute::PushMessage w/ k - 1 push duplicates
@@ -156,7 +148,6 @@ impl Cursor {
156148

157149
impl VersionedCrdsValue {
158150
fn new(value: CrdsValue, cursor: Cursor, local_timestamp: u64, route: GossipRoute) -> Self {
159-
let value_hash = hash(&serialize(&value).unwrap());
160151
let num_push_recv = match route {
161152
GossipRoute::LocalMessage => None,
162153
GossipRoute::PullRequest => None,
@@ -168,7 +159,6 @@ impl VersionedCrdsValue {
168159
ordinal: cursor.ordinal(),
169160
value,
170161
local_timestamp,
171-
value_hash,
172162
num_push_recv,
173163
}
174164
}
@@ -223,10 +213,7 @@ fn overrides(value: &CrdsValue, other: &VersionedCrdsValue) -> bool {
223213
// Ties should be broken in a deterministic way across the cluster.
224214
// For backward compatibility this is done by comparing hash of
225215
// serialized values.
226-
Ordering::Equal => {
227-
let value_hash = hash(&serialize(&value).unwrap());
228-
other.value_hash < value_hash
229-
}
216+
Ordering::Equal => other.value.hash() < value.hash(),
230217
}
231218
}
232219

@@ -310,7 +297,7 @@ impl Crds {
310297
// does not need to be updated.
311298
debug_assert_eq!(entry.get().value.pubkey(), pubkey);
312299
self.cursor.consume(value.ordinal);
313-
self.purged.push_back((entry.get().value_hash, now));
300+
self.purged.push_back((*entry.get().value.hash(), now));
314301
entry.insert(value);
315302
Ok(())
316303
}
@@ -323,8 +310,8 @@ impl Crds {
323310
);
324311
// Identify if the message is outdated (as opposed to
325312
// duplicate) by comparing value hashes.
326-
if entry.get().value_hash != value.value_hash {
327-
self.purged.push_back((value.value_hash, now));
313+
if entry.get().value.hash() != value.value.hash() {
314+
self.purged.push_back((*value.value.hash(), now));
328315
Err(CrdsError::InsertFailed)
329316
} else if matches!(route, GossipRoute::PushMessage(_)) {
330317
let entry = entry.get_mut();
@@ -560,7 +547,7 @@ impl Crds {
560547
let Some((index, _ /*label*/, value)) = self.table.swap_remove_full(key) else {
561548
return;
562549
};
563-
self.purged.push_back((value.value_hash, now));
550+
self.purged.push_back((*value.value.hash(), now));
564551
self.shards.remove(index, &value);
565552
match value.value.data() {
566553
CrdsData::ContactInfo(_) => {
@@ -837,7 +824,7 @@ mod tests {
837824
&Pubkey::default(),
838825
0,
839826
)));
840-
let value_hash = hash(&serialize(&original).unwrap());
827+
let value_hash = *original.hash();
841828
assert_matches!(crds.insert(original, 0, GossipRoute::LocalMessage), Ok(()));
842829
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
843830
&Pubkey::default(),
@@ -857,7 +844,7 @@ mod tests {
857844
&Pubkey::default(),
858845
0,
859846
)));
860-
let val1_hash = hash(&serialize(&val1).unwrap());
847+
let val1_hash = *val1.hash();
861848
assert_eq!(
862849
crds.insert(val1.clone(), 0, GossipRoute::LocalMessage),
863850
Ok(())
@@ -912,7 +899,7 @@ mod tests {
912899
let other = NodeInstance::new(&mut rng, pubkey, now - 1);
913900
let other = other.with_wallclock(now + 1);
914901
let other = make_crds_value(other);
915-
let value_hash = hash(&serialize(&other).unwrap());
902+
let value_hash = *other.hash();
916903
assert_eq!(
917904
crds.insert(other, now, GossipRoute::LocalMessage),
918905
Err(CrdsError::InsertFailed)
@@ -924,7 +911,7 @@ mod tests {
924911
for _ in 0..100 {
925912
let other = NodeInstance::new(&mut rng, pubkey, now);
926913
let other = make_crds_value(other);
927-
let value_hash = hash(&serialize(&other).unwrap());
914+
let value_hash = *other.hash();
928915
match crds.insert(other, now, GossipRoute::LocalMessage) {
929916
Ok(()) => num_overrides += 1,
930917
Err(CrdsError::InsertFailed) => {
@@ -1433,7 +1420,7 @@ mod tests {
14331420
let purged: HashSet<_> = crds.purged.iter().map(|(hash, _)| hash).copied().collect();
14341421
values
14351422
.into_iter()
1436-
.filter(|v| purged.contains(&v.value_hash))
1423+
.filter(|v| purged.contains(v.value.hash()))
14371424
.collect()
14381425
};
14391426
assert_eq!(purged.len() + crds.table.len(), num_values);
@@ -1515,10 +1502,10 @@ mod tests {
15151502

15161503
assert_eq!(v1.value.label(), v2.value.label());
15171504
assert_eq!(v1.value.wallclock(), v2.value.wallclock());
1518-
assert_ne!(v1.value_hash, v2.value_hash);
1505+
assert_ne!(v1.value.hash(), v2.value.hash());
15191506
assert!(v1 != v2);
15201507
assert!(!(v1 == v2));
1521-
if v1.value_hash > v2.value_hash {
1508+
if v1.value.hash() > v2.value.hash() {
15221509
assert!(overrides(&v1.value, &v2));
15231510
assert!(!overrides(&v2.value, &v1));
15241511
} else {

gossip/src/crds_gossip_pull.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use {
3030
rayon::{prelude::*, ThreadPool},
3131
solana_bloom::bloom::{Bloom, ConcurrentBloom},
3232
solana_sdk::{
33-
hash::{hash, Hash},
33+
hash::Hash,
3434
native_token::LAMPORTS_PER_SOL,
3535
packet::PACKET_DATA_SIZE,
3636
pubkey::Pubkey,
@@ -349,7 +349,7 @@ impl CrdsGossipPull {
349349
let failed_inserts = responses
350350
.into_iter()
351351
.filter_map(upsert)
352-
.map(|resp| hash(&bincode::serialize(&resp).unwrap()))
352+
.map(|resp| *resp.hash())
353353
.collect();
354354
(active_values, expired_values, failed_inserts)
355355
}
@@ -427,7 +427,7 @@ impl CrdsGossipPull {
427427
thread_pool.install(|| {
428428
crds.par_values()
429429
.with_min_len(PAR_MIN_LENGTH)
430-
.map(|v| v.value_hash)
430+
.map(|v| *v.value.hash())
431431
.chain(crds.purged().with_min_len(PAR_MIN_LENGTH))
432432
.chain(
433433
failed_inserts
@@ -472,13 +472,13 @@ impl CrdsGossipPull {
472472
}
473473
let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
474474
let pred = |entry: &&VersionedCrdsValue| {
475-
debug_assert!(filter.test_mask(&entry.value_hash));
475+
debug_assert!(filter.test_mask(entry.value.hash()));
476476
// Skip values that are too new.
477477
if entry.value.wallclock() > caller_wallclock {
478478
total_skipped.fetch_add(1, Ordering::Relaxed);
479479
false
480480
} else {
481-
!filter.filter_contains(&entry.value_hash)
481+
!filter.filter_contains(entry.value.hash())
482482
}
483483
};
484484
let out: Vec<_> = crds
@@ -811,7 +811,11 @@ pub(crate) mod tests {
811811
assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(4));
812812
let crds = crds.read().unwrap();
813813
let purged: Vec<_> = thread_pool.install(|| crds.purged().collect());
814-
let hash_values: Vec<_> = crds.values().map(|v| v.value_hash).chain(purged).collect();
814+
let hash_values: Vec<_> = crds
815+
.values()
816+
.map(|v| *v.value.hash())
817+
.chain(purged)
818+
.collect();
815819
// CrdsValue::new_rand may generate exact same value twice in which
816820
// case its hash-value is not added to purged values.
817821
assert!(
@@ -1246,10 +1250,7 @@ pub(crate) mod tests {
12461250
node_crds
12471251
.insert(old.clone(), 0, GossipRoute::LocalMessage)
12481252
.unwrap();
1249-
let value_hash = {
1250-
let entry: &VersionedCrdsValue = node_crds.get(&old.label()).unwrap();
1251-
entry.value_hash
1252-
};
1253+
let value_hash = *node_crds.get::<&CrdsValue>(&old.label()).unwrap().hash();
12531254
//verify self is valid
12541255
assert_eq!(
12551256
node_crds.get::<&CrdsValue>(&node_label).unwrap().label(),

gossip/src/crds_shards.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ impl CrdsShards {
2525
}
2626

2727
pub fn insert(&mut self, index: usize, value: &VersionedCrdsValue) -> bool {
28-
let hash = CrdsFilter::hash_as_u64(&value.value_hash);
28+
let hash = CrdsFilter::hash_as_u64(value.value.hash());
2929
self.shard_mut(hash).insert(index, hash).is_none()
3030
}
3131

3232
pub fn remove(&mut self, index: usize, value: &VersionedCrdsValue) -> bool {
33-
let hash = CrdsFilter::hash_as_u64(&value.value_hash);
33+
let hash = CrdsFilter::hash_as_u64(value.value.hash());
3434
self.shard_mut(hash).swap_remove(&index).is_some()
3535
}
3636

@@ -94,7 +94,7 @@ impl CrdsShards {
9494
assert_eq!(indices, (0..crds.len()).collect::<Vec<_>>());
9595
for (shard_index, shard) in self.shards.iter().enumerate() {
9696
for (&index, &hash) in shard {
97-
assert_eq!(hash, CrdsFilter::hash_as_u64(&crds[index].value_hash));
97+
assert_eq!(hash, CrdsFilter::hash_as_u64(crds[index].value.hash()));
9898
assert_eq!(
9999
shard_index as u64,
100100
hash.checked_shr(64 - self.shard_bits).unwrap_or(0)
@@ -155,7 +155,7 @@ mod test {
155155
// Returns true if the first mask_bits most significant bits of hash is the
156156
// same as the given bit mask.
157157
fn check_mask(value: &VersionedCrdsValue, mask: u64, mask_bits: u32) -> bool {
158-
let hash = CrdsFilter::hash_as_u64(&value.value_hash);
158+
let hash = CrdsFilter::hash_as_u64(value.value.hash());
159159
let ones = (!0u64).checked_shr(mask_bits).unwrap_or(0u64);
160160
(hash | ones) == (mask | ones)
161161
}
@@ -217,7 +217,7 @@ mod test {
217217
}
218218
// Existing hash values.
219219
for (index, value) in values.iter().enumerate() {
220-
let mask = CrdsFilter::hash_as_u64(&value.value_hash);
220+
let mask = CrdsFilter::hash_as_u64(value.value.hash());
221221
let hits: Vec<_> = shards.find(mask, 64).collect();
222222
assert_eq!(hits, vec![index]);
223223
}

gossip/src/crds_value.rs

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ use {
77
},
88
bincode::serialize,
99
rand::Rng,
10+
serde::de::{Deserialize, Deserializer},
1011
solana_sanitize::{Sanitize, SanitizeError},
1112
solana_sdk::{
13+
hash::Hash,
1214
pubkey::Pubkey,
1315
signature::{Keypair, Signable, Signature, Signer},
1416
},
@@ -17,10 +19,12 @@ use {
1719

1820
/// CrdsValue that is replicated across the cluster
1921
#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
20-
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
22+
#[derive(Serialize, Clone, Debug, PartialEq, Eq)]
2123
pub struct CrdsValue {
2224
signature: Signature,
2325
data: CrdsData,
26+
#[serde(skip_serializing)]
27+
hash: Hash, // Sha256 hash of [signature, data].
2428
}
2529

2630
impl Sanitize for CrdsValue {
@@ -98,14 +102,23 @@ impl CrdsValue {
98102
pub fn new(data: CrdsData, keypair: &Keypair) -> Self {
99103
let bincode_serialized_data = bincode::serialize(&data).unwrap();
100104
let signature = keypair.sign_message(&bincode_serialized_data);
101-
Self { signature, data }
105+
let hash = solana_sdk::hash::hashv(&[signature.as_ref(), &bincode_serialized_data]);
106+
Self {
107+
signature,
108+
data,
109+
hash,
110+
}
102111
}
103112

104113
#[cfg(test)]
105114
pub(crate) fn new_unsigned(data: CrdsData) -> Self {
115+
let bincode_serialized_data = bincode::serialize(&data).unwrap();
116+
let signature = Signature::default();
117+
let hash = solana_sdk::hash::hashv(&[signature.as_ref(), &bincode_serialized_data]);
106118
Self {
107-
signature: Signature::default(),
119+
signature,
108120
data,
121+
hash,
109122
}
110123
}
111124

@@ -134,6 +147,11 @@ impl CrdsValue {
134147
&self.data
135148
}
136149

150+
#[inline]
151+
pub(crate) fn hash(&self) -> &Hash {
152+
&self.hash
153+
}
154+
137155
/// Totally unsecure unverifiable wallclock of the node that generated this message
138156
/// Latest wallclock is always picked.
139157
/// This is used to time out push messages.
@@ -194,6 +212,29 @@ impl CrdsValue {
194212
}
195213
}
196214

215+
// Manual implementation of Deserialize for CrdsValue in order to populate
216+
// CrdsValue.hash which is skipped in serialization.
217+
impl<'de> Deserialize<'de> for CrdsValue {
218+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
219+
where
220+
D: Deserializer<'de>,
221+
{
222+
#[derive(Deserialize)]
223+
struct CrdsValue {
224+
signature: Signature,
225+
data: CrdsData,
226+
}
227+
let CrdsValue { signature, data } = CrdsValue::deserialize(deserializer)?;
228+
let bincode_serialized_data = bincode::serialize(&data).unwrap();
229+
let hash = solana_sdk::hash::hashv(&[signature.as_ref(), &bincode_serialized_data]);
230+
Ok(Self {
231+
signature,
232+
data,
233+
hash,
234+
})
235+
}
236+
}
237+
197238
#[cfg(test)]
198239
mod test {
199240
use {

0 commit comments

Comments
 (0)