Skip to content

Commit c0e6d13

Browse files
authored
refactor: perform local db content lookup at OverlayService level (#1637)
* refactor: perform local db content lookup at OverlayService level
1 parent 888e8e3 commit c0e6d13

File tree

5 files changed

+213
-285
lines changed

5 files changed

+213
-285
lines changed

crates/portalnet/src/find/query_info.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pub enum QueryType<TContentKey> {
5555
target: TContentKey,
5656

5757
/// A callback channel for the result of the query.
58-
callback: Option<oneshot::Sender<RecursiveFindContentResult>>,
58+
callback: oneshot::Sender<RecursiveFindContentResult>,
5959
},
6060
}
6161

crates/portalnet/src/overlay/service.rs

Lines changed: 93 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -344,23 +344,10 @@ impl<
344344
OverlayCommand::Request(request) => self.process_request(request),
345345
OverlayCommand::Event(event) => self.process_event(event),
346346
OverlayCommand::FindContentQuery { target, callback, config } => {
347-
if let Some(query_id) = self.init_find_content_query(target.clone(), Some(callback), config) {
348-
trace!(
349-
query.id = %query_id,
350-
content.id = %hex_encode_compact(target.content_id()),
351-
content.key = %target,
352-
"FindContent query initialized"
353-
);
354-
}
347+
self.init_find_content_query(target.clone(), callback, config);
355348
}
356349
OverlayCommand::FindNodeQuery { target, callback } => {
357-
if let Some(query_id) = self.init_find_nodes_query(&target, Some(callback)) {
358-
trace!(
359-
query.id = %query_id,
360-
node.id = %hex_encode_compact(target),
361-
"FindNode query initialized"
362-
);
363-
}
350+
self.init_find_nodes_query(&target, Some(callback));
364351
}
365352
OverlayCommand::RequestEventStream(callback) => {
366353
if callback.send(self.event_stream.subscribe()).is_err() {
@@ -752,40 +739,36 @@ impl<
752739
};
753740
match query.into_result() {
754741
FindContentQueryResult::ValidContent(valid_content, cancelled_peers) => {
755-
if let Some(responder) = callback {
756-
let ValidatedContent {
757-
content,
758-
was_utp_transfer,
759-
sending_peer,
760-
} = valid_content;
761-
762-
let trace = if let Some(mut trace) = query_info.trace {
763-
trace.content_validated(sending_peer);
764-
trace.cancelled = cancelled_peers;
765-
Some(trace)
766-
} else {
767-
None
768-
};
742+
let ValidatedContent {
743+
content,
744+
was_utp_transfer,
745+
sending_peer,
746+
} = valid_content;
769747

770-
if responder
771-
.send(Ok((content, was_utp_transfer, trace)))
772-
.is_err()
773-
{
774-
error!(
775-
"Failed to send RecursiveFindContent result to the initiator of the query"
776-
);
777-
}
748+
let trace = if let Some(mut trace) = query_info.trace {
749+
trace.content_validated(sending_peer);
750+
trace.cancelled = cancelled_peers;
751+
Some(trace)
752+
} else {
753+
None
754+
};
755+
756+
if callback
757+
.send(Ok((content, was_utp_transfer, trace)))
758+
.is_err()
759+
{
760+
error!(
761+
"Failed to send RecursiveFindContent result to the initiator of the query"
762+
);
778763
}
779764
}
780765
FindContentQueryResult::NoneFound => {
781-
if let Some(responder) = callback {
782-
let _ = responder.send(Err(OverlayRequestError::ContentNotFound {
783-
message: "Unable to locate content on the network before timeout"
784-
.to_string(),
785-
utp: false,
786-
trace: query_info.trace,
787-
}));
788-
}
766+
let _ = callback.send(Err(OverlayRequestError::ContentNotFound {
767+
message: "Unable to locate content on the network before timeout"
768+
.to_string(),
769+
utp: false,
770+
trace: query_info.trace,
771+
}));
789772
}
790773
}
791774
}
@@ -1017,7 +1000,6 @@ impl<
10171000
request.discv5.id = %request_id,
10181001
"Handling FindContent message",
10191002
);
1020-
10211003
let content_key = match TContentKey::try_from_bytes(&request.content_key) {
10221004
Ok(key) => key,
10231005
Err(_) => {
@@ -2314,7 +2296,7 @@ impl<
23142296
&mut self,
23152297
target: &NodeId,
23162298
callback: Option<oneshot::Sender<Vec<Enr>>>,
2317-
) -> Option<QueryId> {
2299+
) {
23182300
let closest_enrs = self
23192301
.kbuckets
23202302
.closest_to_node_id(*target, self.query_num_results);
@@ -2324,7 +2306,7 @@ impl<
23242306
if let Some(callback) = callback {
23252307
let _ = callback.send(vec![]);
23262308
}
2327-
return None;
2309+
return;
23282310
}
23292311

23302312
let query_config = QueryConfig {
@@ -2352,26 +2334,43 @@ impl<
23522334

23532335
if known_closest_peers.is_empty() {
23542336
warn!("Cannot initialize FindNode query (no known close peers)");
2355-
None
23562337
} else {
23572338
let find_nodes_query =
23582339
FindNodeQuery::with_config(query_config, query_info.key(), known_closest_peers);
2359-
Some(
2360-
self.find_node_query_pool
2361-
.add_query(query_info, find_nodes_query),
2362-
)
2340+
let query_id = self
2341+
.find_node_query_pool
2342+
.add_query(query_info, find_nodes_query);
2343+
trace!(
2344+
query.id = %query_id,
2345+
node.id = %hex_encode_compact(target),
2346+
"FindNode query initialized"
2347+
);
23632348
}
23642349
}
23652350

23662351
/// Starts a `FindContentQuery` for a target content key.
23672352
fn init_find_content_query(
23682353
&mut self,
23692354
target: TContentKey,
2370-
callback: Option<oneshot::Sender<RecursiveFindContentResult>>,
2355+
callback: oneshot::Sender<RecursiveFindContentResult>,
23712356
config: FindContentConfig,
2372-
) -> Option<QueryId> {
2357+
) {
23732358
debug!("Starting query for content key: {}", target);
23742359

2360+
// Lookup content locally before querying the network.
2361+
if let Ok(Some(content)) = self.store.read().get(&target) {
2362+
let local_enr = self.local_enr();
2363+
let mut query_trace = QueryTrace::new(&local_enr, target.content_id().into());
2364+
query_trace.node_responded_with_content(&local_enr);
2365+
query_trace.content_validated(local_enr.into());
2366+
let _ = callback.send(Ok((
2367+
RawContentValue::from(content),
2368+
false,
2369+
Some(query_trace),
2370+
)));
2371+
return;
2372+
}
2373+
23752374
// Represent the target content ID with a node ID.
23762375
let target_node_id = NodeId::new(&target.content_id());
23772376
let target_key = Key::from(target_node_id);
@@ -2389,15 +2388,13 @@ impl<
23892388
if closest_enrs.is_empty() {
23902389
// If there are no connected nodes in the routing table the query cannot proceed.
23912390
warn!("No connected nodes in routing table, find content query cannot proceed.");
2392-
if let Some(callback) = callback {
2393-
let _ = callback.send(Err(OverlayRequestError::ContentNotFound {
2394-
message: "Unable to locate content on the network: no connected nodes in the routing table"
2395-
.to_string(),
2396-
utp: false,
2397-
trace: None,
2398-
}));
2399-
}
2400-
return None;
2391+
let _ = callback.send(Err(OverlayRequestError::ContentNotFound {
2392+
message: "Unable to locate content on the network: no connected nodes in the routing table"
2393+
.to_string(),
2394+
utp: false,
2395+
trace: None,
2396+
}));
2397+
return;
24012398
}
24022399

24032400
// Convert ENRs into k-bucket keys.
@@ -2418,13 +2415,22 @@ impl<
24182415
};
24192416

24202417
let query_info = QueryInfo {
2421-
query_type: QueryType::FindContent { target, callback },
2418+
query_type: QueryType::FindContent {
2419+
target: target.clone(),
2420+
callback,
2421+
},
24222422
untrusted_enrs: SmallVec::from_vec(closest_enrs),
24232423
trace,
24242424
};
24252425

24262426
let query = FindContentQuery::with_config(query_config, target_key, closest_nodes);
2427-
Some(self.find_content_query_pool.add_query(query_info, query))
2427+
let query_id = self.find_content_query_pool.add_query(query_info, query);
2428+
trace!(
2429+
query.id = %query_id,
2430+
content.id = %hex_encode_compact(target.content_id()),
2431+
content.key = %target,
2432+
"FindContent query initialized"
2433+
);
24282434
}
24292435

24302436
/// Returns an ENR if one is known for the given NodeId.
@@ -3563,12 +3569,13 @@ mod tests {
35633569
let target_content = NodeId::random();
35643570
let target_content_key = IdentityContentKey::new(target_content.raw());
35653571

3566-
let query_id = service.init_find_content_query(
3572+
let (tx, _rx) = oneshot::channel();
3573+
service.init_find_content_query(
35673574
target_content_key.clone(),
3568-
None,
3575+
tx,
35693576
FindContentConfig::default(),
35703577
);
3571-
let query_id = query_id.expect("Query ID for new find content query is `None`");
3578+
let query_id = QueryId(0); // default query id for all initial queries
35723579

35733580
let pool = &mut service.find_content_query_pool;
35743581
let (query_info, query) = pool
@@ -3580,7 +3587,7 @@ mod tests {
35803587
&query_info.query_type,
35813588
QueryType::FindContent {
35823589
target: _target_content_key,
3583-
callback: None,
3590+
callback: _callback,
35843591
}
35853592
));
35863593

@@ -3600,13 +3607,14 @@ mod tests {
36003607
let target_content = NodeId::random();
36013608
let target_content_key = IdentityContentKey::new(target_content.raw());
36023609
let (tx, rx) = oneshot::channel();
3603-
let query_id = service.init_find_content_query(
3610+
service.init_find_content_query(
36043611
target_content_key.clone(),
3605-
Some(tx),
3612+
tx,
36063613
FindContentConfig::default(),
36073614
);
36083615

3609-
assert!(query_id.is_none());
3616+
let query = service.find_content_query_pool.iter().next();
3617+
assert!(query.is_none());
36103618
assert!(rx.await.unwrap().is_err());
36113619
}
36123620

@@ -3633,9 +3641,9 @@ mod tests {
36333641
let target_content = NodeId::random();
36343642
let target_content_key = IdentityContentKey::new(target_content.raw());
36353643

3636-
let query_id =
3637-
service.init_find_content_query(target_content_key, None, FindContentConfig::default());
3638-
let query_id = query_id.expect("Query ID for new find content query is `None`");
3644+
let (tx, _rx) = oneshot::channel();
3645+
service.init_find_content_query(target_content_key, tx, FindContentConfig::default());
3646+
let query_id = QueryId(0); // default query id for all initial queries
36393647

36403648
// update query in own span so mut ref is dropped after poll
36413649
{
@@ -3694,9 +3702,9 @@ mod tests {
36943702
let target_content = NodeId::random();
36953703
let target_content_key = IdentityContentKey::new(target_content.raw());
36963704

3697-
let query_id =
3698-
service.init_find_content_query(target_content_key, None, FindContentConfig::default());
3699-
let query_id = query_id.expect("Query ID for new find content query is `None`");
3705+
let (tx, _rx) = oneshot::channel();
3706+
service.init_find_content_query(target_content_key, tx, FindContentConfig::default());
3707+
let query_id = QueryId(0); // default query id for all initial queries
37003708

37013709
// update query in own span so mut ref is dropped after poll
37023710
{
@@ -3757,9 +3765,9 @@ mod tests {
37573765
let target_content = NodeId::random();
37583766
let target_content_key = IdentityContentKey::new(target_content.raw());
37593767

3760-
let query_id =
3761-
service.init_find_content_query(target_content_key, None, FindContentConfig::default());
3762-
let query_id = query_id.expect("Query ID for new find content query is `None`");
3768+
let (tx, _rx) = oneshot::channel();
3769+
service.init_find_content_query(target_content_key, tx, FindContentConfig::default());
3770+
let query_id = QueryId(0); // default query id for all initial queries
37633771

37643772
// update query in own span so mut ref is dropped after poll
37653773
{
@@ -3823,12 +3831,12 @@ mod tests {
38233831
let target_content_key = IdentityContentKey::new(target_content.raw());
38243832

38253833
let (callback_tx, callback_rx) = oneshot::channel();
3826-
let query_id = service.init_find_content_query(
3834+
service.init_find_content_query(
38273835
target_content_key.clone(),
3828-
Some(callback_tx),
3836+
callback_tx,
38293837
FindContentConfig::default(),
38303838
);
3831-
let query_id = query_id.expect("Query ID for new find content query is `None`");
3839+
let query_id = QueryId(0); // default query id for all initial queries
38323840

38333841
let query_event =
38343842
OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::query_event_poll(

0 commit comments

Comments
 (0)