Skip to content

Commit 2145d9e

Browse files
authored
refactor: add batch content validation (#1869)
1 parent 8c40c4f commit 2145d9e

File tree

13 files changed

+479
-423
lines changed

13 files changed

+479
-423
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/portalnet/src/overlay/protocol.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -227,11 +227,7 @@ impl<
227227
content_key: TContentKey,
228228
content_value: RawContentValue,
229229
) -> PutContentInfo {
230-
let should_we_store = match self
231-
.store
232-
.lock()
233-
.is_key_within_radius_and_unavailable(&content_key)
234-
{
230+
let should_we_store = match self.store.lock().should_we_store(&content_key) {
235231
Ok(should_we_store) => matches!(should_we_store, ShouldWeStoreContent::Store),
236232
Err(err) => {
237233
warn!(
@@ -492,12 +488,14 @@ impl<
492488
Ok(Response::Content(found_content)) => {
493489
match found_content {
494490
Content::Content(content) => {
495-
match self.validate_content(&content_key, &content).await {
496-
Ok(_) => Ok((Content::Content(content), false)),
497-
Err(msg) => Err(OverlayRequestError::FailedValidation(format!(
498-
"Network: {:?}, Reason: {msg:?}",
491+
let validation_result = self.validate_content(&content_key, &content).await;
492+
if validation_result.is_valid() {
493+
Ok((Content::Content(content), false))
494+
} else {
495+
Err(OverlayRequestError::FailedValidation(format!(
496+
"Network: {:?}, Reason: {validation_result:?}",
499497
self.protocol
500-
))),
498+
)))
501499
}
502500
}
503501
Content::Enrs(_) => Ok((found_content, false)),
@@ -514,12 +512,14 @@ impl<
514512
};
515513
let content = RawContentValue::from(bytes);
516514

517-
match self.validate_content(&content_key, &content).await {
518-
Ok(_) => Ok((Content::Content(content), true)),
519-
Err(msg) => Err(OverlayRequestError::FailedValidation(format!(
520-
"Network: {:?}, Reason: {msg:?}",
515+
let validation_result = self.validate_content(&content_key, &content).await;
516+
if validation_result.is_valid() {
517+
Ok((Content::Content(content), true))
518+
} else {
519+
Err(OverlayRequestError::FailedValidation(format!(
520+
"Network: {:?}, Reason: {validation_result:?}",
521521
self.protocol
522-
))),
522+
)))
523523
}
524524
}
525525
}
@@ -533,13 +533,18 @@ impl<
533533
&self,
534534
content_key: &TContentKey,
535535
content: &[u8],
536-
) -> anyhow::Result<ValidationResult<TContentKey>> {
536+
) -> ValidationResult {
537537
let validation_result = self.validator.validate_content(content_key, content).await;
538-
self.metrics.report_validation(validation_result.is_ok());
539538

540-
validation_result.map_err(|err| {
541-
anyhow!("Content validation failed for content key {content_key:?} with error: {err:?}")
542-
})
539+
self.metrics.report_validation(validation_result.is_valid());
540+
if !validation_result.is_valid() {
541+
warn!(
542+
"Content validation failed for content key {}: {validation_result:?}",
543+
content_key.to_bytes(),
544+
)
545+
}
546+
547+
validation_result
543548
}
544549

545550
/// Initialize FindContent uTP stream with remote node

crates/portalnet/src/overlay/service/find_content.rs

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -525,37 +525,36 @@ impl<
525525
.await;
526526
utp_processing
527527
.metrics
528-
.report_validation(validation_result.is_ok());
529-
530-
let validation_result = match validation_result {
531-
Ok(validation_result) => validation_result,
532-
Err(err) => {
533-
warn!(
534-
error = ?err,
535-
content.id = %hex_encode_compact(content_id),
536-
content.key = %content_key,
537-
"Error validating content"
538-
);
539-
// Indicate to the query that the content is invalid
540-
let _ = valid_content_callback.send(None);
541-
if let Some(query_trace_events_tx) = query_trace_events_tx {
542-
let _ = query_trace_events_tx.send(QueryTraceEvent::Failure(
543-
query_id,
544-
sending_peer,
545-
QueryFailureKind::InvalidContent,
546-
));
547-
}
548-
return;
528+
.report_validation(validation_result.is_valid());
529+
530+
if !validation_result.is_valid() {
531+
warn!(
532+
content.id = %hex_encode_compact(content_id),
533+
content.key = %content_key,
534+
?validation_result,
535+
"Error validating content"
536+
);
537+
// Indicate to the query that the content is invalid
538+
let _ = valid_content_callback.send(None);
539+
if let Some(query_trace_events_tx) = query_trace_events_tx {
540+
let _ = query_trace_events_tx.send(QueryTraceEvent::Failure(
541+
query_id,
542+
sending_peer,
543+
QueryFailureKind::InvalidContent,
544+
));
549545
}
550-
};
546+
return;
547+
}
551548

552-
// skip storing if content is not valid for storing, the content
553-
// is already stored or if there's an error reading the store
554-
let should_store = validation_result.valid_for_storing
549+
// store content that:
550+
// - is canonically valid
551+
// - is not already stored
552+
// - is within radius (if applicable)
553+
let should_store = validation_result.is_canonically_valid()
555554
&& utp_processing
556555
.store
557556
.lock()
558-
.is_key_within_radius_and_unavailable(&content_key)
557+
.should_we_store(&content_key)
559558
.map_or_else(
560559
|err| {
561560
error!("Unable to read store: {err}");
@@ -571,17 +570,12 @@ impl<
571570
{
572571
Ok(dropped_content) => {
573572
let mut content_to_propagate = vec![(content_key.clone(), content.clone())];
574-
if let Some(additional_content_to_propagate) =
575-
validation_result.additional_content_to_propagate
576-
{
577-
content_to_propagate.push(additional_content_to_propagate);
578-
}
579573
if !dropped_content.is_empty() && utp_processing.gossip_dropped {
580574
debug!(
581575
"Dropped {:?} pieces of content after inserting new content, propagating them back into the network.",
582576
dropped_content.len(),
583577
);
584-
content_to_propagate.extend(dropped_content.clone());
578+
content_to_propagate.extend(dropped_content);
585579
}
586580
propagate_put_content_cross_thread::<_, TMetric>(
587581
content_to_propagate,

crates/portalnet/src/overlay/service/offer.rs

Lines changed: 10 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -120,15 +120,11 @@ impl<
120120

121121
for (i, key) in content_keys.iter().enumerate() {
122122
// Accept content if within radius and not already present in the data store.
123-
let accept = self
124-
.store
125-
.lock()
126-
.is_key_within_radius_and_unavailable(key)
127-
.map_err(|err| {
128-
OverlayRequestError::AcceptError(format!(
129-
"Unable to check content availability {err}"
130-
))
131-
})?;
123+
let accept = self.store.lock().should_we_store(key).map_err(|err| {
124+
OverlayRequestError::AcceptError(format!(
125+
"Unable to check content availability {err}"
126+
))
127+
})?;
132128
let accept_code = match accept {
133129
ShouldWeStoreContent::Store => {
134130
// accept all keys that are successfully added to the queue
@@ -558,45 +554,23 @@ impl<
558554
.await;
559555
utp_processing
560556
.metrics
561-
.report_validation(validation_result.is_ok());
557+
.report_validation(validation_result.is_valid());
562558

563-
let validation_result = match validation_result {
564-
Ok(validation_result) => validation_result,
565-
Err(err) => {
566-
// Skip storing & propagating content if it's not valid
567-
warn!(
568-
error = %err,
569-
content.key = %key.to_hex(),
570-
"Error validating accepted content"
571-
);
572-
return None;
573-
}
574-
};
575-
576-
if !validation_result.valid_for_storing {
577-
// Content received via Offer/Accept should be valid for storing.
578-
// If it isn't, don't store it and don't propagate it.
559+
if !validation_result.is_canonically_valid() {
579560
warn!(
580561
content.key = %key.to_hex(),
581-
"Error validating accepted content - not valid for storing"
562+
?validation_result,
563+
"Error validating accepted content",
582564
);
583565
return None;
584566
}
585567

586568
// Collect all content to propagate
587569
let mut content_to_propagate = vec![(key.clone(), content_value.clone())];
588-
if let Some(additional_content_to_propagate) =
589-
validation_result.additional_content_to_propagate
590-
{
591-
content_to_propagate.push(additional_content_to_propagate);
592-
}
593570

594571
// Check if data should be stored, and store if it is within our radius and not
595572
// already stored.
596-
let key_desired = utp_processing
597-
.store
598-
.lock()
599-
.is_key_within_radius_and_unavailable(&key);
573+
let key_desired = utp_processing.store.lock().should_we_store(&key);
600574
match key_desired {
601575
Ok(ShouldWeStoreContent::Store) => {
602576
match utp_processing.store.lock().put(key.clone(), &content_value) {

crates/storage/src/lib.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub trait ContentStore {
3939
fn get(&self, key: &Self::Key) -> Result<Option<RawContentValue>, ContentStoreError>;
4040

4141
/// Puts a piece of content into the store.
42+
///
4243
/// Returns a list of keys that were evicted from the store, which should be gossiped into the
4344
/// network. In the future this might be updated to a separate table that stores a queue
4445
/// of content keys to be gossiped and gossips them in a background task.
@@ -51,10 +52,17 @@ pub trait ContentStore {
5152

5253
/// Returns whether the content denoted by `key` is within the radius of the data store and not
5354
/// already stored within the data store.
54-
fn is_key_within_radius_and_unavailable(
55+
fn should_we_store(&self, key: &Self::Key) -> Result<ShouldWeStoreContent, ContentStoreError>;
56+
57+
/// Performs [ContentStore::should_we_store] for multiple content keys.
58+
///
59+
/// The default implementation calls `self.should_we_store` for each key.
60+
fn should_we_store_batch(
5561
&self,
56-
key: &Self::Key,
57-
) -> Result<ShouldWeStoreContent, ContentStoreError>;
62+
keys: &[Self::Key],
63+
) -> Result<Vec<ShouldWeStoreContent>, ContentStoreError> {
64+
keys.iter().map(|key| self.should_we_store(key)).collect()
65+
}
5866

5967
/// Returns the radius of the data store.
6068
fn radius(&self) -> Distance;
@@ -122,10 +130,7 @@ impl<TMetric: Metric> ContentStore for MemoryContentStore<TMetric> {
122130
Ok(vec![])
123131
}
124132

125-
fn is_key_within_radius_and_unavailable(
126-
&self,
127-
key: &Self::Key,
128-
) -> Result<ShouldWeStoreContent, ContentStoreError> {
133+
fn should_we_store(&self, key: &Self::Key) -> Result<ShouldWeStoreContent, ContentStoreError> {
129134
if key.affected_by_radius() && self.distance_to_key(key) > self.radius {
130135
return Ok(ShouldWeStoreContent::NotWithinRadius);
131136
}
@@ -251,18 +256,14 @@ pub mod test {
251256
// Arbitrary key within radius and unavailable.
252257
let arb_key = IdentityContentKey::new(node_id.raw());
253258
assert_eq!(
254-
store
255-
.is_key_within_radius_and_unavailable(&arb_key)
256-
.unwrap(),
259+
store.should_we_store(&arb_key).unwrap(),
257260
ShouldWeStoreContent::Store
258261
);
259262

260263
// Arbitrary key available.
261264
let _ = store.put(arb_key.clone(), val);
262265
assert_eq!(
263-
store
264-
.is_key_within_radius_and_unavailable(&arb_key)
265-
.unwrap(),
266+
store.should_we_store(&arb_key).unwrap(),
266267
ShouldWeStoreContent::AlreadyStored
267268
);
268269
}

crates/subnetworks/beacon/src/storage.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ impl ContentStore for BeaconStorage {
196196
}
197197

198198
/// The "radius" concept is not applicable for Beacon network
199-
fn is_key_within_radius_and_unavailable(
199+
fn should_we_store(
200200
&self,
201201
key: &BeaconContentKey,
202202
) -> Result<ShouldWeStoreContent, ContentStoreError> {
@@ -836,21 +836,21 @@ mod test {
836836
assert_eq!(result, value.as_ssz_bytes());
837837

838838
// Test should_we_store for the same finalized slot
839-
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
839+
let should_store_content = storage.should_we_store(&key).unwrap();
840840
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);
841841

842842
// Test should_we_store for older finalized slot
843843
let key = BeaconContentKey::LightClientFinalityUpdate(LightClientFinalityUpdateKey {
844844
finalized_slot: finalized_slot - 1,
845845
});
846-
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
846+
let should_store_content = storage.should_we_store(&key).unwrap();
847847
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);
848848

849849
// Test should_we_store for newer finalized slot
850850
let key = BeaconContentKey::LightClientFinalityUpdate(LightClientFinalityUpdateKey {
851851
finalized_slot: finalized_slot + 1,
852852
});
853-
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
853+
let should_store_content = storage.should_we_store(&key).unwrap();
854854
assert_eq!(should_store_content, ShouldWeStoreContent::Store);
855855

856856
// Test getting the latest finality update
@@ -875,21 +875,21 @@ mod test {
875875
assert_eq!(result, value.as_ssz_bytes());
876876

877877
// Test should_we_store for the same signature slot
878-
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
878+
let should_store_content = storage.should_we_store(&key).unwrap();
879879
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);
880880

881881
// Test should_we_store for older signature slot
882882
let key = BeaconContentKey::LightClientOptimisticUpdate(LightClientOptimisticUpdateKey {
883883
signature_slot: signature_slot - 1,
884884
});
885-
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
885+
let should_store_content = storage.should_we_store(&key).unwrap();
886886
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);
887887

888888
// Test should_we_store for newer signature slot
889889
let key = BeaconContentKey::LightClientOptimisticUpdate(LightClientOptimisticUpdateKey {
890890
signature_slot: signature_slot + 1,
891891
});
892-
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
892+
let should_store_content = storage.should_we_store(&key).unwrap();
893893
assert_eq!(should_store_content, ShouldWeStoreContent::Store);
894894

895895
// Test getting unavailable optimistic update
@@ -927,21 +927,21 @@ mod test {
927927
assert_eq!(result, value.encode());
928928

929929
// Test should_we_store for the same epoch
930-
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
930+
let should_store_content = storage.should_we_store(&key).unwrap();
931931
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);
932932

933933
// Test should_we_store for older epoch
934934
let key = BeaconContentKey::HistoricalSummariesWithProof(HistoricalSummariesWithProofKey {
935935
epoch: epoch - 1,
936936
});
937-
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
937+
let should_store_content = storage.should_we_store(&key).unwrap();
938938
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);
939939

940940
// Test should_we_store for newer epoch
941941
let key = BeaconContentKey::HistoricalSummariesWithProof(HistoricalSummariesWithProofKey {
942942
epoch: epoch + 1,
943943
});
944-
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
944+
let should_store_content = storage.should_we_store(&key).unwrap();
945945
assert_eq!(should_store_content, ShouldWeStoreContent::Store);
946946

947947
// Test getting unavailable historical summaries with proof

0 commit comments

Comments
 (0)