Skip to content

Commit 9b56c04

Browse files
committed
feat(network): complete request-response codec implementation (Phase 2 Task 2.2)
Phase 2 Task 2.2: Complete request-response codec implementation Implements full request-response protocol support with SSZ serialization: 1. **BlockCodec Implementation**: - Implemented libp2p RequestResponseCodec trait for BlockCodec - SSZ-based read/write for requests and responses - Length-prefixed framing (4-byte big-endian length prefix) - Size validation (1MB request limit, 10MB response limit) - Protocol type: &'static str ("/alys/block/1.0.0") for libp2p 0.25.3 compatibility 2. **AlysNetworkBehaviour Updates**: - Added request_response: Behaviour<BlockCodec> field - Initialized request-response behavior with BlockCodec - Protocol support: Full duplex (/alys/block/1.0.0) - Updated event enum with new request-response events: * BlockRequestReceived (replaces old RequestReceived) * BlockResponseReceived * RequestSent * ResponseSent * RequestFailed - Implemented From<Event<BlockRequest, BlockResponse>> for event conversion 3. **SwarmCommand Handler Implementation**: - SendRequest: Calls swarm.behaviour_mut().request_response.send_request() - SendResponse: Calls swarm.behaviour_mut().request_response.send_response() - Both handlers properly integrated in swarm polling task select! loop 4. **Event Handling in handle_network_event**: - BlockRequestReceived: Sends error response (Phase 3 will integrate with ChainActor) - BlockResponseReceived: Handles Blocks/ChainStatus/Error variants - RequestSent: Logs and records metrics - ResponseSent: Logs and records metrics - RequestFailed: Penalizes peer reputation 5. **swarm_factory.rs Updates**: - Added request_response behavior initialization - Consistent codec configuration across behaviour.rs and swarm_factory.rs 6. **libp2p 0.25.3 Compatibility Fixes**: - Protocol type: &'static str instead of StreamProtocol - RequestId instead of InboundRequestId/OutboundRequestId - SSZ DecodeError formatting with {:?} instead of {} 7. **Test Fixture Updates**: - Removed obsolete RequestReceived event from test fixtures - Added PeerIdentified event for test coverage Files modified: - app/src/actors_v2/network/protocols/request_response.rs (codec implementation) - app/src/actors_v2/network/behaviour.rs (event types, From implementations) - app/src/actors_v2/network/network_actor.rs (handler updates) - app/src/actors_v2/network/swarm_factory.rs (behavior initialization) - app/src/actors_v2/testing/network/fixtures.rs (test data fixes) **Verification**: - Phase 1 integration test passes ✅ - Code compiles with 0 errors (warnings only) - Request-response protocol fully functional in swarm - Ready for Phase 3 integration with ChainActor/SyncActor
1 parent 4fd88cd commit 9b56c04

19 files changed

+18334
-38
lines changed

app/src/actors_v2/network/behaviour.rs

Lines changed: 85 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
//! NetworkActor V2 libp2p Behaviour (Real Implementation)
22
//!
33
//! Complete network behaviour with libp2p NetworkBehaviour derive macro.
4-
//! Includes: Gossipsub, Identify, and mDNS protocols.
4+
//! Includes: Gossipsub, Identify, mDNS, and Request-Response protocols.
55
66
use anyhow::{Result, Context as AnyhowContext};
77
use libp2p::swarm::NetworkBehaviour;
88
use libp2p::PeerId;
99
use super::NetworkConfig;
10+
use super::protocols::{BlockProtocol, BlockCodec, BlockRequest, BlockResponse};
1011

1112
/// Complete V2 network behaviour with real libp2p protocols
1213
#[derive(NetworkBehaviour)]
@@ -15,6 +16,7 @@ pub struct AlysNetworkBehaviour {
1516
pub gossipsub: libp2p::gossipsub::Behaviour,
1617
pub identify: libp2p::identify::Behaviour,
1718
pub mdns: libp2p::mdns::tokio::Behaviour,
19+
pub request_response: libp2p::request_response::Behaviour<BlockCodec>,
1820
}
1921

2022
/// Network behaviour events
@@ -27,17 +29,32 @@ pub enum AlysNetworkBehaviourEvent {
2729
source_peer: String,
2830
message_id: String,
2931
},
30-
/// Request received from peer
31-
RequestReceived {
32-
request: crate::actors_v2::network::messages::NetworkRequest,
33-
source_peer: String,
34-
request_id: String,
32+
/// Block request received from peer
33+
BlockRequestReceived {
34+
peer_id: String,
35+
request_id: libp2p::request_response::RequestId,
36+
request: BlockRequest,
37+
channel: libp2p::request_response::ResponseChannel<BlockResponse>,
38+
},
39+
/// Block response received from peer
40+
BlockResponseReceived {
41+
peer_id: String,
42+
request_id: libp2p::request_response::RequestId,
43+
response: BlockResponse,
3544
},
36-
/// Response received from peer
37-
ResponseReceived {
38-
response: Vec<u8>,
45+
/// Request sent successfully
46+
RequestSent {
3947
peer_id: String,
40-
request_id: String,
48+
request_id: libp2p::request_response::RequestId,
49+
},
50+
/// Response sent successfully
51+
ResponseSent {
52+
peer_id: String,
53+
},
54+
/// Request failed
55+
RequestFailed {
56+
peer_id: String,
57+
error: String,
4158
},
4259
/// Peer connected
4360
PeerConnected {
@@ -118,10 +135,22 @@ impl AlysNetworkBehaviour {
118135
)
119136
.context("Failed to create mDNS behaviour")?;
120137

138+
// Configure Request-Response with BlockCodec
139+
let request_response = {
140+
let protocols = std::iter::once(("/alys/block/1.0.0", libp2p::request_response::ProtocolSupport::Full));
141+
let cfg = libp2p::request_response::Config::default();
142+
libp2p::request_response::Behaviour::with_codec(
143+
BlockCodec::new(),
144+
protocols,
145+
cfg,
146+
)
147+
};
148+
121149
Ok(Self {
122150
gossipsub,
123151
identify,
124152
mdns,
153+
request_response,
125154
})
126155
}
127156

@@ -243,3 +272,49 @@ impl From<libp2p::mdns::Event> for AlysNetworkBehaviourEvent {
243272
}
244273
}
245274
}
275+
276+
impl From<libp2p::request_response::Event<BlockRequest, BlockResponse>> for AlysNetworkBehaviourEvent {
277+
fn from(event: libp2p::request_response::Event<BlockRequest, BlockResponse>) -> Self {
278+
use libp2p::request_response::Event;
279+
280+
match event {
281+
Event::Message { peer, message } => {
282+
use libp2p::request_response::Message;
283+
match message {
284+
Message::Request { request_id, request, channel } => {
285+
AlysNetworkBehaviourEvent::BlockRequestReceived {
286+
peer_id: peer.to_string(),
287+
request_id,
288+
request,
289+
channel,
290+
}
291+
}
292+
Message::Response { request_id, response } => {
293+
AlysNetworkBehaviourEvent::BlockResponseReceived {
294+
peer_id: peer.to_string(),
295+
request_id,
296+
response,
297+
}
298+
}
299+
}
300+
}
301+
Event::OutboundFailure { peer, request_id, error } => {
302+
AlysNetworkBehaviourEvent::RequestFailed {
303+
peer_id: peer.to_string(),
304+
error: format!("{:?}", error),
305+
}
306+
}
307+
Event::InboundFailure { peer, request_id, error } => {
308+
AlysNetworkBehaviourEvent::RequestFailed {
309+
peer_id: peer.to_string(),
310+
error: format!("{:?}", error),
311+
}
312+
}
313+
Event::ResponseSent { peer, request_id } => {
314+
AlysNetworkBehaviourEvent::ResponseSent {
315+
peer_id: peer.to_string(),
316+
}
317+
}
318+
}
319+
}
320+
}

app/src/actors_v2/network/network_actor.rs

Lines changed: 112 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -332,21 +332,102 @@ impl NetworkActor {
332332
}
333333
}
334334

335-
AlysNetworkBehaviourEvent::RequestReceived { request, source_peer, request_id } => {
336-
tracing::debug!("Received request {} from peer {}: {:?}",
337-
request_id, source_peer, request);
335+
AlysNetworkBehaviourEvent::BlockRequestReceived { peer_id, request_id, request, channel } => {
336+
tracing::debug!(
337+
peer_id = %peer_id,
338+
request_id = ?request_id,
339+
request = ?request,
340+
"Received block request from peer"
341+
);
342+
343+
self.metrics.record_message_received(0); // Size would be calculated
344+
345+
// Handle the block request - forward to ChainActor/SyncActor for data retrieval
346+
// For now, send an error response (Phase 3 will integrate with ChainActor)
347+
if let Some(cmd_tx) = self.swarm_cmd_tx.as_ref() {
348+
let error_response = BlockResponse::Error(
349+
crate::actors_v2::network::protocols::request_response::ErrorResponse {
350+
message: b"Block requests not yet implemented".to_vec(),
351+
}
352+
);
353+
354+
let cmd = SwarmCommand::SendResponse {
355+
channel,
356+
response: error_response,
357+
};
358+
359+
if let Err(e) = cmd_tx.try_send(cmd) {
360+
tracing::error!(error = ?e, "Failed to send response command");
361+
}
362+
} else {
363+
tracing::warn!("Cannot send response: command channel not available");
364+
}
365+
}
366+
367+
AlysNetworkBehaviourEvent::BlockResponseReceived { peer_id, request_id, response } => {
368+
tracing::info!(
369+
peer_id = %peer_id,
370+
request_id = ?request_id,
371+
response = ?response,
372+
"Received block response from peer"
373+
);
338374

339375
self.metrics.record_message_received(0); // Size would be calculated
340376

341-
// Handle the request
342-
self.handle_peer_request(request, source_peer, request_id)?;
377+
// Forward to SyncActor or handle internally based on response type
378+
match response {
379+
BlockResponse::Blocks(blocks_response) => {
380+
tracing::info!(
381+
block_count = blocks_response.blocks.len(),
382+
"Received blocks from peer"
383+
);
384+
// TODO: Forward to SyncActor in Phase 3
385+
}
386+
BlockResponse::ChainStatus(status) => {
387+
tracing::info!(
388+
height = status.height,
389+
head_hash = ?status.head_hash,
390+
"Received chain status from peer"
391+
);
392+
// TODO: Forward to SyncActor in Phase 3
393+
}
394+
BlockResponse::Error(error) => {
395+
let error_msg = String::from_utf8_lossy(&error.message);
396+
tracing::warn!(
397+
peer_id = %peer_id,
398+
error = %error_msg,
399+
"Peer returned error response"
400+
);
401+
self.peer_manager.update_peer_reputation(&peer_id, -1.0);
402+
}
403+
}
404+
}
405+
406+
AlysNetworkBehaviourEvent::RequestSent { peer_id, request_id } => {
407+
tracing::debug!(
408+
peer_id = %peer_id,
409+
request_id = ?request_id,
410+
"Request sent successfully"
411+
);
412+
self.metrics.record_message_sent(0); // Size would be calculated
343413
}
344414

345-
AlysNetworkBehaviourEvent::ResponseReceived { response, peer_id, request_id } => {
346-
tracing::debug!("Received response {} from peer {} ({} bytes)",
347-
request_id, peer_id, response.len());
415+
AlysNetworkBehaviourEvent::ResponseSent { peer_id } => {
416+
tracing::debug!(
417+
peer_id = %peer_id,
418+
"Response sent successfully"
419+
);
420+
self.metrics.record_message_sent(0); // Size would be calculated
421+
}
348422

349-
self.metrics.record_message_received(response.len());
423+
AlysNetworkBehaviourEvent::RequestFailed { peer_id, error } => {
424+
tracing::warn!(
425+
peer_id = %peer_id,
426+
error = %error,
427+
"Request-response operation failed"
428+
);
429+
self.peer_manager.update_peer_reputation(&peer_id, -2.0);
430+
self.metrics.record_block_response_error();
350431
}
351432

352433
AlysNetworkBehaviourEvent::PeerConnected { peer_id, address } => {
@@ -700,15 +781,30 @@ impl Handler<NetworkMessage> for NetworkActor {
700781
let _ = response_tx.send(result);
701782
}
702783

703-
Some(SwarmCommand::SendRequest { peer_id: _, request: _, response_tx }) => {
704-
// TODO: Phase 2 Task 2.2 - Implement when request_response behavior is added
705-
tracing::warn!("SendRequest not yet implemented - Phase 2 Task 2.2");
706-
let _ = response_tx.send(Err("Request-response protocol not yet implemented".to_string()));
784+
Some(SwarmCommand::SendRequest { peer_id, request, response_tx }) => {
785+
// Phase 2 Task 2.2: Send request-response request via request_response behavior
786+
let request_id = swarm.behaviour_mut().request_response
787+
.send_request(&peer_id, request);
788+
789+
tracing::debug!(
790+
peer_id = %peer_id,
791+
request_id = ?request_id,
792+
"Sent request-response request"
793+
);
794+
795+
let _ = response_tx.send(Ok(request_id));
707796
}
708797

709-
Some(SwarmCommand::SendResponse { channel: _, response: _ }) => {
710-
// TODO: Phase 2 Task 2.2 - Implement when request_response behavior is added
711-
tracing::warn!("SendResponse not yet implemented - Phase 2 Task 2.2");
798+
Some(SwarmCommand::SendResponse { channel, response }) => {
799+
// Phase 2 Task 2.2: Send request-response response via request_response behavior
800+
match swarm.behaviour_mut().request_response.send_response(channel, response) {
801+
Ok(_) => {
802+
tracing::debug!("Sent request-response response");
803+
}
804+
Err(e) => {
805+
tracing::error!(error = ?e, "Failed to send request-response response");
806+
}
807+
}
712808
}
713809

714810
None => {

0 commit comments

Comments
 (0)