Skip to content

Commit 5dca37d

Browse files
authored
feat: add trace offer for a series of content keys (#1830)
1 parent e47418e commit 5dca37d

File tree

3 files changed

+62
-28
lines changed

3 files changed

+62
-28
lines changed

crates/ethportal-api/src/types/portal_wire.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -473,9 +473,9 @@ impl From<PopulatedOffer> for Offer {
473473
#[derive(Debug, Clone)]
474474
pub struct PopulatedOfferWithResult {
475475
/// The offered content key & value
476-
pub content_item: (RawContentKey, RawContentValue),
476+
pub content_items: Vec<(RawContentKey, RawContentValue)>,
477477
/// The channel to send the result of the offer to
478-
pub result_tx: tokio::sync::mpsc::UnboundedSender<OfferTrace>,
478+
pub result_tx: tokio::sync::mpsc::UnboundedSender<OfferTraceMultipleItems>,
479479
}
480480

481481
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -486,11 +486,22 @@ pub enum OfferTrace {
486486
Failed,
487487
}
488488

489+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
490+
pub enum OfferTraceMultipleItems {
491+
/// Offer was successful sent and received a response
492+
Success(AcceptCodeList),
493+
/// This offer failed, perhaps locally or from a timeout or transfer failure
494+
Failed,
495+
}
496+
489497
impl From<PopulatedOfferWithResult> for Offer {
490-
fn from(val: PopulatedOfferWithResult) -> Self {
491-
Self {
492-
content_keys: vec![val.content_item.0],
493-
}
498+
fn from(value: PopulatedOfferWithResult) -> Self {
499+
let content_keys = value
500+
.content_items
501+
.into_iter()
502+
.map(|(key, _val)| key)
503+
.collect();
504+
Self { content_keys }
494505
}
495506
}
496507

crates/portalnet/src/overlay/protocol.rs

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ use ethportal_api::{
3131
},
3232
portal::PutContentInfo,
3333
portal_wire::{
34-
Content, FindContent, FindNodes, Message, Nodes, OfferTrace, Ping, Pong,
35-
PopulatedOffer, PopulatedOfferWithResult, Request, Response,
34+
Content, FindContent, FindNodes, Message, Nodes, OfferTrace, OfferTraceMultipleItems,
35+
Ping, Pong, PopulatedOffer, PopulatedOfferWithResult, Request, Response,
3636
},
3737
},
3838
utils::bytes::hex_encode,
@@ -609,10 +609,34 @@ impl<
609609
content_key: RawContentKey,
610610
content_value: RawContentValue,
611611
) -> Result<OfferTrace, OverlayRequestError> {
612+
match self
613+
.send_offer_trace_with_multiple_items(enr, vec![(content_key, content_value)])
614+
.await?
615+
{
616+
OfferTraceMultipleItems::Success(accept_code_list) => {
617+
if accept_code_list.len() != 1 {
618+
return Err(OverlayRequestError::Failure(format!(
619+
"Expected exactly one AcceptCode in the response. Got {}",
620+
accept_code_list.len()
621+
)));
622+
}
623+
624+
Ok(OfferTrace::Success(accept_code_list[0]))
625+
}
626+
OfferTraceMultipleItems::Failed => Ok(OfferTrace::Failed),
627+
}
628+
}
629+
630+
/// Send Offer request with trace, without storing the content into db, with multiple items
631+
pub async fn send_offer_trace_with_multiple_items(
632+
&self,
633+
enr: Enr,
634+
content_items: Vec<(RawContentKey, RawContentValue)>,
635+
) -> Result<OfferTraceMultipleItems, OverlayRequestError> {
612636
// Construct the request.
613637
let (result_tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
614638
let request = Request::PopulatedOfferWithResult(PopulatedOfferWithResult {
615-
content_item: (content_key, content_value),
639+
content_items,
616640
result_tx,
617641
});
618642

crates/portalnet/src/overlay/service/offer.rs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ use ethportal_api::{
99
distance::Metric,
1010
enr::Enr,
1111
network_spec::network_spec,
12-
portal_wire::{Accept, Content, FindContent, Offer, OfferTrace, Request, Response},
12+
portal_wire::{
13+
Accept, Content, FindContent, Offer, OfferTraceMultipleItems, Request, Response,
14+
},
1315
protocol_versions::ProtocolVersion,
1416
},
1517
OverlayContentKey, RawContentKey, RawContentValue,
@@ -358,13 +360,9 @@ impl<
358360
// keys
359361
if content_keys.all_declined() {
360362
if let Some(tx) = gossip_result_tx {
361-
if content_keys.len() != 1 {
362-
error!(
363-
"OfferTrace only supports one content key at a time, got {}",
364-
content_keys.len()
365-
);
363+
if let Err(err) = tx.send(OfferTraceMultipleItems::Success(content_keys)) {
364+
warn!(%err, "Unable to send OfferTrace result all keys declined");
366365
}
367-
let _ = tx.send(OfferTrace::Success(content_keys[0]));
368366
}
369367
return Ok(());
370368
}
@@ -392,7 +390,7 @@ impl<
392390
.collect()),
393391
Request::PopulatedOfferWithResult(offer) => Ok(content_keys
394392
.iter()
395-
.zip(vec![offer.content_item])
393+
.zip(offer.content_items)
396394
.filter(|(is_accepted, _item)| **is_accepted == AcceptCode::Accepted)
397395
.map(|(_is_accepted, (_key, val))| val)
398396
.collect()),
@@ -411,7 +409,9 @@ impl<
411409
"Error decoding previously offered content items"
412410
);
413411
if let Some(tx) = gossip_result_tx {
414-
let _ = tx.send(OfferTrace::Failed);
412+
if let Err(err) = tx.send(OfferTraceMultipleItems::Failed) {
413+
warn!(%err, "Unable to send OfferTrace Failed result for decoding offered content items");
414+
}
415415
}
416416
return;
417417
}
@@ -422,7 +422,9 @@ impl<
422422
Err(err) => {
423423
warn!(%err, "Unable to build content payload");
424424
if let Some(tx) = gossip_result_tx {
425-
let _ = tx.send(OfferTrace::Failed);
425+
if let Err(err) = tx.send(OfferTraceMultipleItems::Failed) {
426+
warn!(%err, "Unable to send OfferTrace Failed result failed to build content payload");
427+
}
426428
}
427429
return;
428430
}
@@ -431,16 +433,13 @@ impl<
431433
.connect_outbound_stream(cid, peer, &content_payload)
432434
.await;
433435
if let Some(tx) = gossip_result_tx {
434-
if result {
435-
if content_keys.len() != 1 {
436-
error!(
437-
"OfferTrace only supports one content key at a time, got {}",
438-
content_keys.len()
439-
);
440-
}
441-
let _ = tx.send(OfferTrace::Success(content_keys[0]));
436+
let result = if result {
437+
OfferTraceMultipleItems::Success(content_keys)
442438
} else {
443-
let _ = tx.send(OfferTrace::Failed);
439+
OfferTraceMultipleItems::Failed
440+
};
441+
if let Err(err) = tx.send(result) {
442+
warn!(%err, "Unable to send OfferTrace result");
444443
}
445444
}
446445
// explicitly drop permit in the thread so the permit is included in the thread

0 commit comments

Comments
 (0)