Skip to content

Commit 27fc3d4

Browse files
apollo_propeller: implement outbound substream with message batching
1 parent f0cd693 commit 27fc3d4

File tree

3 files changed

+199
-18
lines changed

3 files changed

+199
-18
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/apollo_propeller/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ apollo_protobuf.workspace = true
1414
asynchronous-codec.workspace = true
1515
futures.workspace = true
1616
libp2p.workspace = true
17+
prost.workspace = true
1718
reed-solomon-simd.workspace = true
1819
sha2.workspace = true
1920
thiserror.workspace = true

crates/apollo_propeller/src/handler.rs

Lines changed: 197 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use libp2p::swarm::handler::{
1616
FullyNegotiatedOutbound,
1717
};
1818
use libp2p::swarm::{Stream, StreamProtocol, SubstreamProtocol};
19+
use prost::Message;
20+
use tracing::warn;
1921

2022
use crate::protocol::{PropellerCodec, PropellerProtocol};
2123
use crate::PropellerUnit;
@@ -37,34 +39,65 @@ pub enum HandlerIn {
3739
}
3840

3941
/// Protocol Handler that manages substreams with a peer.
///
/// We use separate unidirectional substreams: outbound for sending and inbound for receiving.
// TODO(AndrewL): Add this to the specs.
pub struct Handler {
    /// Upgrade configuration for the propeller protocol.
    listen_protocol: PropellerProtocol,
    /// The single long-lived inbound substream.
    ///
    /// `None` until the remote opens one (installed by `on_fully_negotiated_inbound`).
    inbound_substream: Option<InboundSubstreamState>,
    /// The single long-lived outbound substream.
    ///
    /// Starts `Idle`; a substream is only requested once there is something to send.
    outbound_substream: OutboundSubstreamState,
    /// Queue of messages to send.
    send_queue: VecDeque<ProtoUnit>,
    /// Queue of events to emit to the behaviour (received units, errors, etc.).
    ///
    /// Events are pushed here from various sources (inbound messages, connection errors) and
    /// drained as the highest priority in `poll_inner`.
    events_to_emit: VecDeque<HandlerOut>,
    /// Maximum wire message size for batching.
    max_wire_message_size: usize,
}
5062

51-
/// State of the inbound substream, opened either by us or by the remote.
63+
/// State of the inbound substream, opened by the remote peer.
enum InboundSubstreamState {
    /// Waiting for a message from the remote. The idle state for an inbound substream.
    WaitingInput(Framed<Stream, PropellerCodec>),
    /// The substream is being closed.
    Closing(Framed<Stream, PropellerCodec>),
}
5870

71+
/// State of the outbound substream, opened by us.
enum OutboundSubstreamState {
    /// No substream exists and no request is pending.
    Idle,
    /// A substream request has been sent but not yet negotiated.
    Pending,
    /// The substream is active and ready to send messages.
    Active {
        substream: Framed<Stream, PropellerCodec>,
        /// True if we've sent data and need to flush the stream.
        ///
        /// In this implementation we use flush to ensure that the data was actually sent. Without
        /// flushing periodically we cannot be certain what was sent and what failed when a failure
        /// occurs.
        // TODO(AndrewL): Discuss not flushing
        should_flush: bool,
    },
}
89+
5990
impl Handler {
6091
/// Builds a new [`Handler`].
6192
pub fn new(stream_protocol: StreamProtocol, max_wire_message_size: usize) -> Self {
6293
let protocol = PropellerProtocol::new(stream_protocol, max_wire_message_size);
6394
Handler {
6495
listen_protocol: protocol,
6596
inbound_substream: None,
97+
outbound_substream: OutboundSubstreamState::Idle,
6698
send_queue: VecDeque::new(),
67-
receive_queue: VecDeque::new(),
99+
events_to_emit: VecDeque::new(),
100+
max_wire_message_size,
68101
}
69102
}
70103

@@ -143,7 +176,7 @@ impl Handler {
143176
for proto_unit in batch.batch {
144177
match PropellerUnit::try_from(proto_unit) {
145178
Ok(unit) => {
146-
self.receive_queue.push_back(unit);
179+
self.events_to_emit.push_back(HandlerOut::Unit(unit));
147180
}
148181
Err(e) => {
149182
// TODO(AndrewL): Either remove this warning or make it once every N ms.
@@ -153,6 +186,33 @@ impl Handler {
153186
}
154187
}
155188

189+
/// Create a batch of messages from the send queue that fits within max_wire_message_size.
190+
fn create_message_batch(
191+
send_queue: &mut VecDeque<ProtoUnit>,
192+
max_wire_message_size: usize,
193+
) -> ProtoBatch {
194+
if send_queue.is_empty() {
195+
return ProtoBatch { batch: Vec::new() };
196+
}
197+
198+
let mut batch = ProtoBatch { batch: vec![send_queue.pop_front().unwrap()] };
199+
if batch.encoded_len() > max_wire_message_size {
200+
warn!("Propeller unit size exceeds max wire message size, sending will fail");
201+
}
202+
203+
while let Some(msg) = send_queue.front() {
204+
batch.batch.push(msg.clone());
205+
if batch.encoded_len() <= max_wire_message_size {
206+
send_queue.pop_front();
207+
} else {
208+
batch.batch.pop();
209+
break;
210+
}
211+
}
212+
213+
batch
214+
}
215+
156216
fn on_fully_negotiated_inbound(&mut self, substream: Framed<Stream, PropellerCodec>) {
157217
if self.inbound_substream.is_some() {
158218
// TODO(AndrewL): Either remove this warning or make it once every N ms.
@@ -165,25 +225,128 @@ impl Handler {
165225

166226
fn on_fully_negotiated_outbound(
167227
&mut self,
168-
_fully_negotiated_outbound: FullyNegotiatedOutbound<
228+
fully_negotiated_outbound: FullyNegotiatedOutbound<
169229
<Handler as ConnectionHandler>::OutboundProtocol,
170230
>,
171231
) {
172-
// TODO(AndrewL): Implement outbound substream handling
173-
todo!("Outbound substream handling not yet implemented")
232+
if let OutboundSubstreamState::Active { should_flush, .. } = &self.outbound_substream {
233+
if *should_flush {
234+
tracing::warn!(
235+
"New outbound substream while existing substream has pending data, data may \
236+
be lost"
237+
);
238+
}
239+
}
240+
241+
let substream = fully_negotiated_outbound.protocol;
242+
self.outbound_substream = OutboundSubstreamState::Active { substream, should_flush: false };
174243
}
175244

176245
/// Drives the outbound side: requests a substream when needed, batches and sends queued
/// units, and flushes once the queue drains.
///
/// Only ever returns `Poll::Ready` for an `OutboundSubstreamRequest` or a
/// `HandlerOut::SendError` notification; all successful progress eventually falls through
/// to `Poll::Pending` once the sink can accept no more work.
fn poll_send(
    &mut self,
    cx: &mut Context<'_>,
) -> Poll<
    ConnectionHandlerEvent<
        <Handler as ConnectionHandler>::OutboundProtocol,
        (),
        <Handler as ConnectionHandler>::ToBehaviour,
    >,
> {
    // Only request an outbound substream when there are messages to send.
    // Without this guard, a DialUpgradeError (e.g. from an unsupported peer) resets state to
    // Idle, and the next poll would immediately request another substream — even with an empty
    // queue — causing infinite negotiation churn.
    match &self.outbound_substream {
        OutboundSubstreamState::Idle => {
            if self.send_queue.is_empty() {
                return Poll::Pending;
            }
            self.outbound_substream = OutboundSubstreamState::Pending;
            return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
                protocol: SubstreamProtocol::new(self.listen_protocol.clone(), ()),
            });
        }
        OutboundSubstreamState::Pending => {
            return Poll::Pending;
        }
        _ => {}
    }

    loop {
        // Move the substream out so it can be polled by value; the state is restored on every
        // `continue`/`break` path. NOTE: on the error-return paths below the state is left
        // `Idle`, which drops the failed substream — the next poll with a non-empty queue
        // will renegotiate a fresh one.
        let OutboundSubstreamState::Active { mut substream, mut should_flush } =
            std::mem::replace(&mut self.outbound_substream, OutboundSubstreamState::Idle)
        else {
            unreachable!("outbound_substream is Active at the start of this loop");
        };

        if self.send_queue.is_empty() {
            // Queue is empty, maybe we just need to flush the stream
            if should_flush {
                match Sink::poll_flush(Pin::new(&mut substream), cx) {
                    Poll::Ready(Ok(())) => {
                        // Flushed: loop once more; with an empty queue and no pending flush
                        // the next iteration parks the substream and breaks.
                        should_flush = false;
                        self.outbound_substream =
                            OutboundSubstreamState::Active { substream, should_flush };
                        continue;
                    }
                    Poll::Ready(Err(e)) => {
                        tracing::error!("Failed to flush outbound stream: {e}");
                        return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
                            HandlerOut::SendError(e.to_string()),
                        ));
                    }
                    Poll::Pending => {
                        self.outbound_substream =
                            OutboundSubstreamState::Active { substream, should_flush };
                        break;
                    }
                }
            } else {
                // Nothing queued and nothing to flush: idle until more work arrives.
                self.outbound_substream =
                    OutboundSubstreamState::Active { substream, should_flush };
                break;
            }
        } else {
            match Sink::poll_ready(Pin::new(&mut substream), cx) {
                Poll::Ready(Ok(())) => {
                    let message = Self::create_message_batch(
                        &mut self.send_queue,
                        self.max_wire_message_size,
                    );
                    match Sink::start_send(Pin::new(&mut substream), message) {
                        Ok(()) => {
                            // Try sending more messages if there are any
                            should_flush = true;
                            self.outbound_substream =
                                OutboundSubstreamState::Active { substream, should_flush };
                            continue;
                        }
                        Err(e) => {
                            // TODO(AndrewL): Units were lost, consider a re-try mechanism.
                            tracing::error!("Failed to send message on outbound stream: {e}");
                            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
                                HandlerOut::SendError(e.to_string()),
                            ));
                        }
                    }
                }
                Poll::Ready(Err(e)) => {
                    // TODO(AndrewL): Units were lost, consider a re-try mechanism.
                    tracing::error!("Failed to send message on outbound stream: {e}");
                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
                        HandlerOut::SendError(e.to_string()),
                    ));
                }
                Poll::Pending => {
                    // Not ready to send more messages yet
                    self.outbound_substream =
                        OutboundSubstreamState::Active { substream, should_flush };
                    break;
                }
            }
        }
    }

    Poll::Pending
}
189352

@@ -197,9 +360,9 @@ impl Handler {
197360
<Handler as ConnectionHandler>::ToBehaviour,
198361
>,
199362
> {
200-
// First, emit any queued received messages
201-
if let Some(message) = self.receive_queue.pop_front() {
202-
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(HandlerOut::Unit(message)));
363+
// Drain any queued events first (received units, errors from DialUpgradeError, etc.)
364+
if let Some(event) = self.events_to_emit.pop_front() {
365+
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
203366
}
204367

205368
// Process outbound stream
@@ -210,9 +373,9 @@ impl Handler {
210373
// Poll inbound substream to receive messages
211374
self.poll_inbound_substream(cx);
212375

213-
// Check receive queue again after polling inbound substream
214-
if let Some(message) = self.receive_queue.pop_front() {
215-
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(HandlerOut::Unit(message)));
376+
// Check the queue again — poll_inbound_substream may have enqueued new events
377+
if let Some(event) = self.events_to_emit.pop_front() {
378+
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
216379
}
217380

218381
Poll::Pending
@@ -259,9 +422,25 @@ impl ConnectionHandler for Handler {
259422
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
260423
self.on_fully_negotiated_outbound(fully_negotiated_outbound)
261424
}
262-
_ => {
263-
// TODO(AndrewL): Handle DialUpgradeError variants
425+
ConnectionEvent::DialUpgradeError(_) => {
426+
if !matches!(self.outbound_substream, OutboundSubstreamState::Pending) {
427+
tracing::error!(
428+
"Dial upgrade error but no pending substream found. (File a bug report if \
429+
you see this)"
430+
);
431+
}
432+
433+
// TODO(AndrewL): Handle DialUpgradeError properly. Current issues:
434+
// 1. Silent delivery failure: send_queue is not drained, so messages accumulate
435+
// with no failure signal to the behaviour.
436+
// 2. Infinite renegotiation loop: resetting to Idle while send_queue is non-empty
437+
// causes poll_send to immediately request another OutboundSubstreamRequest,
438+
// looping forever against unsupported peers.
439+
// Fix: drain send_queue, push a HandlerOut::SendError onto events_to_emit with
440+
// the dropped count, and reset to Idle.
441+
self.outbound_substream = OutboundSubstreamState::Idle;
264442
}
443+
_ => {}
265444
}
266445
}
267446
}

0 commit comments

Comments
 (0)