Skip to content

Commit 8b5ccac

Browse files
authored
Error from RPC send_response when request doesn't exist on the active inbound requests (#7663)
Lighthouse is currently loggign a lot errors in the `RPC` behaviour whenever a response is received for a request_id that no longer exists in active_inbound_requests. This is likely due to a data race or timing issue (e.g., the peer disconnecting before the response is handled). This PR addresses that by removing the error logging from the RPC layer. Instead, RPC::send_response now simply returns an Err, shifting the responsibility to the main service. The main service can then determine whether the peer is still connected and only log an error if the peer remains connected. Thanks @ackintosh for helping debug!
1 parent 8e55684 commit 8b5ccac

File tree

3 files changed

+81
-66
lines changed

3 files changed

+81
-66
lines changed

beacon_node/lighthouse_network/src/rpc/mod.rs

Lines changed: 55 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::marker::PhantomData;
1717
use std::sync::Arc;
1818
use std::task::{Context, Poll};
1919
use std::time::Duration;
20-
use tracing::{debug, error, instrument, trace};
20+
use tracing::{debug, instrument, trace};
2121
use types::{EthSpec, ForkContext};
2222

2323
pub(crate) use handler::{HandlerErr, HandlerEvent};
@@ -98,6 +98,13 @@ pub struct InboundRequestId {
9898
substream_id: SubstreamId,
9999
}
100100

101+
// An Active inbound request received via Rpc.
102+
struct ActiveInboundRequest<E: EthSpec> {
103+
pub peer_id: PeerId,
104+
pub request_type: RequestType<E>,
105+
pub peer_disconnected: bool,
106+
}
107+
101108
impl InboundRequestId {
102109
/// Creates an _unchecked_ [`InboundRequestId`].
103110
///
@@ -150,7 +157,7 @@ pub struct RPC<Id: ReqId, E: EthSpec> {
150157
/// Rate limiter for our own requests.
151158
outbound_request_limiter: SelfRateLimiter<Id, E>,
152159
/// Active inbound requests that are awaiting a response.
153-
active_inbound_requests: HashMap<InboundRequestId, (PeerId, RequestType<E>)>,
160+
active_inbound_requests: HashMap<InboundRequestId, ActiveInboundRequest<E>>,
154161
/// Queue of events to be processed.
155162
events: Vec<BehaviourAction<Id, E>>,
156163
fork_context: Arc<ForkContext>,
@@ -199,8 +206,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
199206
}
200207

201208
/// Sends an RPC response.
202-
///
203-
/// The peer must be connected for this to succeed.
209+
/// Returns an `Err` if the request does exist in the active inbound requests list.
204210
#[instrument(parent = None,
205211
level = "trace",
206212
fields(service = "libp2p_rpc"),
@@ -209,26 +215,41 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
209215
)]
210216
pub fn send_response(
211217
&mut self,
212-
peer_id: PeerId,
213218
request_id: InboundRequestId,
214219
response: RpcResponse<E>,
215-
) {
216-
let Some((_peer_id, request_type)) = self.active_inbound_requests.remove(&request_id)
220+
) -> Result<(), RpcResponse<E>> {
221+
let Some(ActiveInboundRequest {
222+
peer_id,
223+
request_type,
224+
peer_disconnected,
225+
}) = self.active_inbound_requests.remove(&request_id)
217226
else {
218-
error!(%peer_id, ?request_id, %response, "Request not found in active_inbound_requests. Response not sent");
219-
return;
227+
return Err(response);
220228
};
221229

222230
// Add the request back to active requests if the response is `Success` and requires stream
223231
// termination.
224232
if request_type.protocol().terminator().is_some()
225233
&& matches!(response, RpcResponse::Success(_))
226234
{
227-
self.active_inbound_requests
228-
.insert(request_id, (peer_id, request_type.clone()));
235+
self.active_inbound_requests.insert(
236+
request_id,
237+
ActiveInboundRequest {
238+
peer_id,
239+
request_type: request_type.clone(),
240+
peer_disconnected,
241+
},
242+
);
243+
}
244+
245+
if peer_disconnected {
246+
trace!(%peer_id, ?request_id, %response,
247+
"Discarding response, peer is no longer connected");
248+
return Ok(());
229249
}
230250

231251
self.send_response_inner(peer_id, request_type.protocol(), request_id, response);
252+
Ok(())
232253
}
233254

234255
fn send_response_inner(
@@ -425,9 +446,10 @@ where
425446
self.events.push(error_msg);
426447
}
427448

428-
self.active_inbound_requests.retain(
429-
|_inbound_request_id, (request_peer_id, _request_type)| *request_peer_id != peer_id,
430-
);
449+
self.active_inbound_requests
450+
.values_mut()
451+
.filter(|request| request.peer_id == peer_id)
452+
.for_each(|request| request.peer_disconnected = true);
431453

432454
if let Some(limiter) = self.response_limiter.as_mut() {
433455
limiter.peer_disconnected(peer_id);
@@ -468,9 +490,17 @@ where
468490
.active_inbound_requests
469491
.iter()
470492
.filter(
471-
|(_inbound_request_id, (request_peer_id, active_request_type))| {
493+
|(
494+
_inbound_request_id,
495+
ActiveInboundRequest {
496+
peer_id: request_peer_id,
497+
request_type: active_request_type,
498+
peer_disconnected,
499+
},
500+
)| {
472501
*request_peer_id == peer_id
473502
&& active_request_type.protocol() == request_type.protocol()
503+
&& !peer_disconnected
474504
},
475505
)
476506
.count()
@@ -494,19 +524,25 @@ where
494524
}
495525

496526
// Requests that are below the limit on the number of simultaneous requests are added to the active inbound requests.
497-
self.active_inbound_requests
498-
.insert(request_id, (peer_id, request_type.clone()));
527+
self.active_inbound_requests.insert(
528+
request_id,
529+
ActiveInboundRequest {
530+
peer_id,
531+
request_type: request_type.clone(),
532+
peer_disconnected: false,
533+
},
534+
);
499535

500536
// If we received a Ping, we queue a Pong response.
501537
if let RequestType::Ping(_) = request_type {
502538
trace!(connection_id = %connection_id, %peer_id, "Received Ping, queueing Pong");
503539
self.send_response(
504-
peer_id,
505540
request_id,
506541
RpcResponse::Success(RpcSuccessResponse::Pong(Ping {
507542
data: self.seq_number,
508543
})),
509-
);
544+
)
545+
.expect("Request to exist");
510546
}
511547

512548
self.events.push(ToSwarm::GenerateEvent(RPCMessage {

beacon_node/lighthouse_network/src/service/mod.rs

Lines changed: 20 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY
1111
use crate::rpc::methods::MetadataRequest;
1212
use crate::rpc::{
1313
GoodbyeReason, HandlerErr, InboundRequestId, NetworkParams, Protocol, RPCError, RPCMessage,
14-
RPCReceived, RequestType, ResponseTermination, RpcErrorResponse, RpcResponse,
15-
RpcSuccessResponse, RPC,
14+
RPCReceived, RequestType, ResponseTermination, RpcResponse, RpcSuccessResponse, RPC,
1615
};
1716
use crate::types::{
1817
all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
@@ -39,7 +38,7 @@ use std::path::PathBuf;
3938
use std::pin::Pin;
4039
use std::sync::Arc;
4140
use std::time::Duration;
42-
use tracing::{debug, info, instrument, trace, warn};
41+
use tracing::{debug, error, info, instrument, trace, warn};
4342
use types::{
4443
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId,
4544
};
@@ -1146,35 +1145,22 @@ impl<E: EthSpec> Network<E> {
11461145
name = "libp2p",
11471146
skip_all
11481147
)]
1149-
pub fn send_response(
1148+
pub fn send_response<T: Into<RpcResponse<E>>>(
11501149
&mut self,
11511150
peer_id: PeerId,
11521151
inbound_request_id: InboundRequestId,
1153-
response: Response<E>,
1152+
response: T,
11541153
) {
1155-
self.eth2_rpc_mut()
1156-
.send_response(peer_id, inbound_request_id, response.into())
1157-
}
1158-
1159-
/// Inform the peer that their request produced an error.
1160-
#[instrument(parent = None,
1161-
level = "trace",
1162-
fields(service = "libp2p"),
1163-
name = "libp2p",
1164-
skip_all
1165-
)]
1166-
pub fn send_error_response(
1167-
&mut self,
1168-
peer_id: PeerId,
1169-
inbound_request_id: InboundRequestId,
1170-
error: RpcErrorResponse,
1171-
reason: String,
1172-
) {
1173-
self.eth2_rpc_mut().send_response(
1174-
peer_id,
1175-
inbound_request_id,
1176-
RpcResponse::Error(error, reason.into()),
1177-
)
1154+
if let Err(response) = self
1155+
.eth2_rpc_mut()
1156+
.send_response(inbound_request_id, response.into())
1157+
{
1158+
if self.network_globals.peers.read().is_connected(&peer_id) {
1159+
error!(%peer_id, ?inbound_request_id, %response,
1160+
"Request not found in RPC active requests"
1161+
);
1162+
}
1163+
}
11781164
}
11791165

11801166
/* Peer management functions */
@@ -1460,19 +1446,6 @@ impl<E: EthSpec> Network<E> {
14601446
name = "libp2p",
14611447
skip_all
14621448
)]
1463-
fn send_meta_data_response(
1464-
&mut self,
1465-
_req: MetadataRequest<E>,
1466-
inbound_request_id: InboundRequestId,
1467-
peer_id: PeerId,
1468-
) {
1469-
let metadata = self.network_globals.local_metadata.read().clone();
1470-
// The encoder is responsible for sending the negotiated version of the metadata
1471-
let event = RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata)));
1472-
self.eth2_rpc_mut()
1473-
.send_response(peer_id, inbound_request_id, event);
1474-
}
1475-
14761449
// RPC Propagation methods
14771450
/// Queues the response to be sent upwards as long at it was requested outside the Behaviour.
14781451
#[must_use = "return the response"]
@@ -1760,9 +1733,13 @@ impl<E: EthSpec> Network<E> {
17601733
self.peer_manager_mut().ping_request(&peer_id, ping.data);
17611734
None
17621735
}
1763-
RequestType::MetaData(req) => {
1736+
RequestType::MetaData(_req) => {
17641737
// send the requested meta-data
1765-
self.send_meta_data_response(req, inbound_request_id, peer_id);
1738+
let metadata = self.network_globals.local_metadata.read().clone();
1739+
// The encoder is responsible for sending the negotiated version of the metadata
1740+
let response =
1741+
RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata)));
1742+
self.send_response(peer_id, inbound_request_id, response);
17661743
None
17671744
}
17681745
RequestType::Goodbye(reason) => {

beacon_node/network/src/service.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use futures::channel::mpsc::Sender;
1111
use futures::future::OptionFuture;
1212
use futures::prelude::*;
1313

14+
use lighthouse_network::rpc::methods::RpcResponse;
1415
use lighthouse_network::rpc::InboundRequestId;
1516
use lighthouse_network::rpc::RequestType;
1617
use lighthouse_network::service::Network;
@@ -627,10 +628,11 @@ impl<T: BeaconChainTypes> NetworkService<T> {
627628
error,
628629
inbound_request_id,
629630
reason,
630-
} => {
631-
self.libp2p
632-
.send_error_response(peer_id, inbound_request_id, error, reason);
633-
}
631+
} => self.libp2p.send_response(
632+
peer_id,
633+
inbound_request_id,
634+
RpcResponse::Error(error, reason.into()),
635+
),
634636
NetworkMessage::ValidationResult {
635637
propagation_source,
636638
message_id,

0 commit comments

Comments
 (0)