@@ -16,6 +16,8 @@ use libp2p::swarm::handler::{
1616 FullyNegotiatedOutbound ,
1717} ;
1818use libp2p:: swarm:: { Stream , StreamProtocol , SubstreamProtocol } ;
19+ use prost:: Message ;
20+ use tracing:: warn;
1921
2022use crate :: protocol:: { PropellerCodec , PropellerProtocol } ;
2123use crate :: PropellerUnit ;
@@ -37,34 +39,65 @@ pub enum HandlerIn {
3739}
3840
3941/// Protocol Handler that manages substreams with a peer.
42+ ///
43+ /// We use separate unidirectional substreams: outbound for sending and inbound for receiving.
44+ // TODO(AndrewL): Add this to the specs.
4045pub struct Handler {
4146 /// Upgrade configuration for the propeller protocol.
4247 listen_protocol : PropellerProtocol ,
4348 /// The single long-lived inbound substream.
4449 inbound_substream : Option < InboundSubstreamState > ,
50+ /// The single long-lived outbound substream.
51+ outbound_substream : OutboundSubstreamState ,
4552 /// Queue of messages to send.
4653 send_queue : VecDeque < ProtoUnit > ,
47- /// Queue of received messages to emit.
48- receive_queue : VecDeque < PropellerUnit > ,
54+ /// Queue of events to emit to the behaviour (received units, errors, etc.).
55+ ///
56+ /// Events are pushed here from various sources (inbound messages, connection errors) and
57+ /// drained as the highest priority in `poll_inner`.
58+ events_to_emit : VecDeque < HandlerOut > ,
59+ /// Maximum wire message size for batching.
60+ max_wire_message_size : usize ,
4961}
5062
51- /// State of the inbound substream, opened either by us or by the remote.
63+ /// State of the inbound substream, opened by the remote peer .
5264enum InboundSubstreamState {
5365 /// Waiting for a message from the remote. The idle state for an inbound substream.
5466 WaitingInput ( Framed < Stream , PropellerCodec > ) ,
5567 /// The substream is being closed.
5668 Closing ( Framed < Stream , PropellerCodec > ) ,
5769}
5870
71+ /// State of the outbound substream, opened by us.
72+ enum OutboundSubstreamState {
73+ /// No substream exists and no request is pending.
74+ Idle ,
75+ /// A substream request has been sent but not yet negotiated.
76+ Pending ,
77+ /// The substream is active and ready to send messages.
78+ Active {
79+ substream : Framed < Stream , PropellerCodec > ,
80+ /// True if we've sent data and need to flush the stream.
81+ ///
82+ /// In this implementation we use flush to ensure that the data was actually sent. Without
83+ /// flushing periodically we cannot be certain what was sent and what failed when a failure
84+ /// occurs.
85+ // TODO(AndrewL): Discuss not flushing
86+ should_flush : bool ,
87+ } ,
88+ }
89+
5990impl Handler {
6091 /// Builds a new [`Handler`].
6192 pub fn new ( stream_protocol : StreamProtocol , max_wire_message_size : usize ) -> Self {
6293 let protocol = PropellerProtocol :: new ( stream_protocol, max_wire_message_size) ;
6394 Handler {
6495 listen_protocol : protocol,
6596 inbound_substream : None ,
97+ outbound_substream : OutboundSubstreamState :: Idle ,
6698 send_queue : VecDeque :: new ( ) ,
67- receive_queue : VecDeque :: new ( ) ,
99+ events_to_emit : VecDeque :: new ( ) ,
100+ max_wire_message_size,
68101 }
69102 }
70103
@@ -143,7 +176,7 @@ impl Handler {
143176 for proto_unit in batch. batch {
144177 match PropellerUnit :: try_from ( proto_unit) {
145178 Ok ( unit) => {
146- self . receive_queue . push_back ( unit) ;
179+ self . events_to_emit . push_back ( HandlerOut :: Unit ( unit) ) ;
147180 }
148181 Err ( e) => {
149182 // TODO(AndrewL): Either remove this warning or make it once every N ms.
@@ -153,6 +186,33 @@ impl Handler {
153186 }
154187 }
155188
189+ /// Create a batch of messages from the send queue that fits within max_wire_message_size.
190+ fn create_message_batch (
191+ send_queue : & mut VecDeque < ProtoUnit > ,
192+ max_wire_message_size : usize ,
193+ ) -> ProtoBatch {
194+ if send_queue. is_empty ( ) {
195+ return ProtoBatch { batch : Vec :: new ( ) } ;
196+ }
197+
198+ let mut batch = ProtoBatch { batch : vec ! [ send_queue. pop_front( ) . unwrap( ) ] } ;
199+ if batch. encoded_len ( ) > max_wire_message_size {
200+ warn ! ( "Propeller unit size exceeds max wire message size, sending will fail" ) ;
201+ }
202+
203+ while let Some ( msg) = send_queue. front ( ) {
204+ batch. batch . push ( msg. clone ( ) ) ;
205+ if batch. encoded_len ( ) <= max_wire_message_size {
206+ send_queue. pop_front ( ) ;
207+ } else {
208+ batch. batch . pop ( ) ;
209+ break ;
210+ }
211+ }
212+
213+ batch
214+ }
215+
156216 fn on_fully_negotiated_inbound ( & mut self , substream : Framed < Stream , PropellerCodec > ) {
157217 if self . inbound_substream . is_some ( ) {
158218 // TODO(AndrewL): Either remove this warning or make it once every N ms.
@@ -165,25 +225,128 @@ impl Handler {
165225
166226 fn on_fully_negotiated_outbound (
167227 & mut self ,
168- _fully_negotiated_outbound : FullyNegotiatedOutbound <
228+ fully_negotiated_outbound : FullyNegotiatedOutbound <
169229 <Handler as ConnectionHandler >:: OutboundProtocol ,
170230 > ,
171231 ) {
172- // TODO(AndrewL): Implement outbound substream handling
173- todo ! ( "Outbound substream handling not yet implemented" )
232+ if let OutboundSubstreamState :: Active { should_flush, .. } = & self . outbound_substream {
233+ if * should_flush {
234+ tracing:: warn!(
235+ "New outbound substream while existing substream has pending data, data may \
236+ be lost"
237+ ) ;
238+ }
239+ }
240+
241+ let substream = fully_negotiated_outbound. protocol ;
242+ self . outbound_substream = OutboundSubstreamState :: Active { substream, should_flush : false } ;
174243 }
175244
176245 fn poll_send (
177246 & mut self ,
178- _cx : & mut Context < ' _ > ,
247+ cx : & mut Context < ' _ > ,
179248 ) -> Poll <
180249 ConnectionHandlerEvent <
181250 <Handler as ConnectionHandler >:: OutboundProtocol ,
182251 ( ) ,
183252 <Handler as ConnectionHandler >:: ToBehaviour ,
184253 > ,
185254 > {
186- // TODO(AndrewL): Implement outbound message sending
255+ // Only request an outbound substream when there are messages to send.
256+ // Without this guard, a DialUpgradeError (e.g. from an unsupported peer) resets state to
257+ // Idle, and the next poll would immediately request another substream — even with an empty
258+ // queue — causing infinite negotiation churn.
259+ match & self . outbound_substream {
260+ OutboundSubstreamState :: Idle => {
261+ if self . send_queue . is_empty ( ) {
262+ return Poll :: Pending ;
263+ }
264+ self . outbound_substream = OutboundSubstreamState :: Pending ;
265+ return Poll :: Ready ( ConnectionHandlerEvent :: OutboundSubstreamRequest {
266+ protocol : SubstreamProtocol :: new ( self . listen_protocol . clone ( ) , ( ) ) ,
267+ } ) ;
268+ }
269+ OutboundSubstreamState :: Pending => {
270+ return Poll :: Pending ;
271+ }
272+ _ => { }
273+ }
274+
275+ loop {
276+ let OutboundSubstreamState :: Active { mut substream, mut should_flush } =
277+ std:: mem:: replace ( & mut self . outbound_substream , OutboundSubstreamState :: Idle )
278+ else {
279+ unreachable ! ( "outbound_substream is Active at the start of this loop" ) ;
280+ } ;
281+
282+ if self . send_queue . is_empty ( ) {
283+ // Queue is empty, maybe we just need to flush the stream
284+ if should_flush {
285+ match Sink :: poll_flush ( Pin :: new ( & mut substream) , cx) {
286+ Poll :: Ready ( Ok ( ( ) ) ) => {
287+ should_flush = false ;
288+ self . outbound_substream =
289+ OutboundSubstreamState :: Active { substream, should_flush } ;
290+ continue ;
291+ }
292+ Poll :: Ready ( Err ( e) ) => {
293+ tracing:: error!( "Failed to flush outbound stream: {e}" ) ;
294+ return Poll :: Ready ( ConnectionHandlerEvent :: NotifyBehaviour (
295+ HandlerOut :: SendError ( e. to_string ( ) ) ,
296+ ) ) ;
297+ }
298+ Poll :: Pending => {
299+ self . outbound_substream =
300+ OutboundSubstreamState :: Active { substream, should_flush } ;
301+ break ;
302+ }
303+ }
304+ } else {
305+ self . outbound_substream =
306+ OutboundSubstreamState :: Active { substream, should_flush } ;
307+ break ;
308+ }
309+ } else {
310+ match Sink :: poll_ready ( Pin :: new ( & mut substream) , cx) {
311+ Poll :: Ready ( Ok ( ( ) ) ) => {
312+ let message = Self :: create_message_batch (
313+ & mut self . send_queue ,
314+ self . max_wire_message_size ,
315+ ) ;
316+ match Sink :: start_send ( Pin :: new ( & mut substream) , message) {
317+ Ok ( ( ) ) => {
318+ // Try sending more messages if there are any
319+ should_flush = true ;
320+ self . outbound_substream =
321+ OutboundSubstreamState :: Active { substream, should_flush } ;
322+ continue ;
323+ }
324+ Err ( e) => {
325+ // TODO(AndrewL): Units were lost, consider a re-try mechanism.
326+ tracing:: error!( "Failed to send message on outbound stream: {e}" ) ;
327+ return Poll :: Ready ( ConnectionHandlerEvent :: NotifyBehaviour (
328+ HandlerOut :: SendError ( e. to_string ( ) ) ,
329+ ) ) ;
330+ }
331+ }
332+ }
333+ Poll :: Ready ( Err ( e) ) => {
334+ // TODO(AndrewL): Units were lost, consider a re-try mechanism.
335+ tracing:: error!( "Failed to send message on outbound stream: {e}" ) ;
336+ return Poll :: Ready ( ConnectionHandlerEvent :: NotifyBehaviour (
337+ HandlerOut :: SendError ( e. to_string ( ) ) ,
338+ ) ) ;
339+ }
340+ Poll :: Pending => {
341+ // Not ready to send more messages yet
342+ self . outbound_substream =
343+ OutboundSubstreamState :: Active { substream, should_flush } ;
344+ break ;
345+ }
346+ }
347+ }
348+ }
349+
187350 Poll :: Pending
188351 }
189352
@@ -197,9 +360,9 @@ impl Handler {
197360 <Handler as ConnectionHandler >:: ToBehaviour ,
198361 > ,
199362 > {
200- // First, emit any queued received messages
201- if let Some ( message ) = self . receive_queue . pop_front ( ) {
202- return Poll :: Ready ( ConnectionHandlerEvent :: NotifyBehaviour ( HandlerOut :: Unit ( message ) ) ) ;
363+ // Drain any queued events first ( received units, errors from DialUpgradeError, etc.)
364+ if let Some ( event ) = self . events_to_emit . pop_front ( ) {
365+ return Poll :: Ready ( ConnectionHandlerEvent :: NotifyBehaviour ( event ) ) ;
203366 }
204367
205368 // Process outbound stream
@@ -210,9 +373,9 @@ impl Handler {
210373 // Poll inbound substream to receive messages
211374 self . poll_inbound_substream ( cx) ;
212375
213- // Check receive queue again after polling inbound substream
214- if let Some ( message ) = self . receive_queue . pop_front ( ) {
215- return Poll :: Ready ( ConnectionHandlerEvent :: NotifyBehaviour ( HandlerOut :: Unit ( message ) ) ) ;
376+ // Check the queue again — poll_inbound_substream may have enqueued new events
377+ if let Some ( event ) = self . events_to_emit . pop_front ( ) {
378+ return Poll :: Ready ( ConnectionHandlerEvent :: NotifyBehaviour ( event ) ) ;
216379 }
217380
218381 Poll :: Pending
@@ -259,9 +422,25 @@ impl ConnectionHandler for Handler {
259422 ConnectionEvent :: FullyNegotiatedOutbound ( fully_negotiated_outbound) => {
260423 self . on_fully_negotiated_outbound ( fully_negotiated_outbound)
261424 }
262- _ => {
263- // TODO(AndrewL): Handle DialUpgradeError variants
425+ ConnectionEvent :: DialUpgradeError ( _) => {
426+ if !matches ! ( self . outbound_substream, OutboundSubstreamState :: Pending ) {
427+ tracing:: error!(
428+ "Dial upgrade error but no pending substream found. (File a bug report if \
429+ you see this)"
430+ ) ;
431+ }
432+
433+ // TODO(AndrewL): Handle DialUpgradeError properly. Current issues:
434+ // 1. Silent delivery failure: send_queue is not drained, so messages accumulate
435+ // with no failure signal to the behaviour.
436+ // 2. Infinite renegotiation loop: resetting to Idle while send_queue is non-empty
437+ // causes poll_send to immediately request another OutboundSubstreamRequest,
438+ // looping forever against unsupported peers.
439+ // Fix: drain send_queue, push a HandlerOut::SendError onto events_to_emit with
440+ // the dropped count, and reset to Idle.
441+ self . outbound_substream = OutboundSubstreamState :: Idle ;
264442 }
443+ _ => { }
265444 }
266445 }
267446}
0 commit comments