Skip to content

Commit 7d695a4

Browse files
apollo_propeller: implement outbound substream with message batching
1 parent ed6cb21 commit 7d695a4

File tree

3 files changed

+176
-8
lines changed

3 files changed

+176
-8
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/apollo_propeller/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ apollo_protobuf.workspace = true
1414
asynchronous-codec.workspace = true
1515
futures.workspace = true
1616
libp2p.workspace = true
17+
prost.workspace = true
1718
reed-solomon-simd.workspace = true
1819
sha2.workspace = true
1920
thiserror.workspace = true

crates/apollo_propeller/src/handler.rs

Lines changed: 174 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use libp2p::swarm::handler::{
1616
FullyNegotiatedOutbound,
1717
};
1818
use libp2p::swarm::{Stream, StreamProtocol, SubstreamProtocol};
19+
use prost::Message;
20+
use tracing::warn;
1921

2022
use crate::protocol::{PropellerCodec, PropellerProtocol};
2123
use crate::PropellerUnit;
@@ -37,34 +39,62 @@ pub enum HandlerIn {
3739
}
3840

3941
/// Protocol Handler that manages substreams with a peer.
42+
///
43+
/// We use separate unidirectional substreams: outbound for sending and inbound for receiving.
44+
// TODO(AndrewL): Add this to the specs.
4045
pub struct Handler {
4146
/// Upgrade configuration for the propeller protocol.
4247
listen_protocol: PropellerProtocol,
4348
/// The single long-lived inbound substream.
4449
inbound_substream: Option<InboundSubstreamState>,
50+
/// The single long-lived outbound substream.
51+
outbound_substream: OutboundSubstreamState,
4552
/// Queue of messages to send.
4653
send_queue: VecDeque<ProtoUnit>,
4754
/// Queue of received messages to emit.
4855
receive_queue: VecDeque<PropellerUnit>,
56+
/// Maximum wire message size for batching.
57+
max_wire_message_size: usize,
4958
}
5059

51-
/// State of the inbound substream, opened either by us or by the remote.
60+
/// State of the inbound substream, opened by the remote peer.
5261
enum InboundSubstreamState {
5362
/// Waiting for a message from the remote. The idle state for an inbound substream.
5463
WaitingInput(Framed<Stream, PropellerCodec>),
5564
/// The substream is being closed.
5665
Closing(Framed<Stream, PropellerCodec>),
5766
}
5867

68+
/// State of the outbound substream, opened by us.
69+
enum OutboundSubstreamState {
70+
/// No substream exists and no request is pending.
71+
Idle,
72+
/// A substream request has been sent but not yet negotiated.
73+
Pending,
74+
/// The substream is active and ready to send messages.
75+
Active {
76+
substream: Framed<Stream, PropellerCodec>,
77+
/// True if we've sent data and need to flush the stream.
78+
///
79+
/// In this implementation we use flush to ensure that the data was actually sent. Without
80+
/// flushing periodically we cannot be certain what was sent and what failed when a failure
81+
/// occurs.
82+
// TODO(AndrewL): Discuss not flushing
83+
should_flush: bool,
84+
},
85+
}
86+
5987
impl Handler {
6088
/// Builds a new [`Handler`].
6189
pub fn new(stream_protocol: StreamProtocol, max_wire_message_size: usize) -> Self {
6290
let protocol = PropellerProtocol::new(stream_protocol, max_wire_message_size);
6391
Handler {
6492
listen_protocol: protocol,
6593
inbound_substream: None,
94+
outbound_substream: OutboundSubstreamState::Idle,
6695
send_queue: VecDeque::new(),
6796
receive_queue: VecDeque::new(),
97+
max_wire_message_size,
6898
}
6999
}
70100

@@ -153,6 +183,33 @@ impl Handler {
153183
}
154184
}
155185

186+
/// Create a batch of messages from the send queue that fits within max_wire_message_size.
187+
fn create_message_batch(
188+
send_queue: &mut VecDeque<ProtoUnit>,
189+
max_wire_message_size: usize,
190+
) -> ProtoBatch {
191+
if send_queue.is_empty() {
192+
return ProtoBatch { batch: Vec::new() };
193+
}
194+
195+
let mut batch = ProtoBatch { batch: vec![send_queue.pop_front().unwrap()] };
196+
if batch.encoded_len() >= max_wire_message_size {
197+
warn!("Propeller unit size exceeds max wire message size, sending will fail");
198+
}
199+
200+
while let Some(msg) = send_queue.front() {
201+
batch.batch.push(msg.clone());
202+
if batch.encoded_len() < max_wire_message_size {
203+
send_queue.pop_front();
204+
} else {
205+
batch.batch.pop();
206+
break;
207+
}
208+
}
209+
210+
batch
211+
}
212+
156213
fn on_fully_negotiated_inbound(&mut self, substream: Framed<Stream, PropellerCodec>) {
157214
if self.inbound_substream.is_some() {
158215
// TODO(AndrewL): Either remove this warning or make it once every N ms.
@@ -165,25 +222,125 @@ impl Handler {
165222

166223
fn on_fully_negotiated_outbound(
167224
&mut self,
168-
_fully_negotiated_outbound: FullyNegotiatedOutbound<
225+
fully_negotiated_outbound: FullyNegotiatedOutbound<
169226
<Handler as ConnectionHandler>::OutboundProtocol,
170227
>,
171228
) {
172-
// TODO(AndrewL): Implement outbound substream handling
173-
todo!("Outbound substream handling not yet implemented")
229+
if let OutboundSubstreamState::Active { should_flush, .. } = &self.outbound_substream {
230+
if *should_flush {
231+
tracing::warn!(
232+
"New outbound substream while existing substream has pending data, data may \
233+
be lost"
234+
);
235+
}
236+
}
237+
238+
let substream = fully_negotiated_outbound.protocol;
239+
self.outbound_substream = OutboundSubstreamState::Active { substream, should_flush: false };
174240
}
175241

176242
fn poll_send(
177243
&mut self,
178-
_cx: &mut Context<'_>,
244+
cx: &mut Context<'_>,
179245
) -> Poll<
180246
ConnectionHandlerEvent<
181247
<Handler as ConnectionHandler>::OutboundProtocol,
182248
(),
183249
<Handler as ConnectionHandler>::ToBehaviour,
184250
>,
185251
> {
186-
// TODO(AndrewL): Implement outbound message sending
252+
// No need to optimize requesting a sub-stream only when there are units to send since the
253+
// streams are long-lived (We only open a sub-stream twice per connection assuming no errors
254+
// occur).
255+
// If we don't have an outbound substream and haven't requested one, request one
256+
match &self.outbound_substream {
257+
OutboundSubstreamState::Idle => {
258+
self.outbound_substream = OutboundSubstreamState::Pending;
259+
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
260+
protocol: SubstreamProtocol::new(self.listen_protocol.clone(), ()),
261+
});
262+
}
263+
OutboundSubstreamState::Pending => {
264+
return Poll::Pending;
265+
}
266+
_ => {}
267+
}
268+
269+
loop {
270+
let OutboundSubstreamState::Active { mut substream, mut should_flush } =
271+
std::mem::replace(&mut self.outbound_substream, OutboundSubstreamState::Idle)
272+
else {
273+
unreachable!("outbound_substream is Active at the start of this loop");
274+
};
275+
276+
if self.send_queue.is_empty() {
277+
// Queue is empty, maybe we just need to flush the stream
278+
if should_flush {
279+
match Sink::poll_flush(Pin::new(&mut substream), cx) {
280+
Poll::Ready(Ok(())) => {
281+
should_flush = false;
282+
self.outbound_substream =
283+
OutboundSubstreamState::Active { substream, should_flush };
284+
continue;
285+
}
286+
Poll::Ready(Err(e)) => {
287+
tracing::error!("Failed to flush outbound stream: {e}");
288+
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
289+
HandlerOut::SendError(e.to_string()),
290+
));
291+
}
292+
Poll::Pending => {
293+
self.outbound_substream =
294+
OutboundSubstreamState::Active { substream, should_flush };
295+
break;
296+
}
297+
}
298+
} else {
299+
self.outbound_substream =
300+
OutboundSubstreamState::Active { substream, should_flush };
301+
break;
302+
}
303+
} else {
304+
match Sink::poll_ready(Pin::new(&mut substream), cx) {
305+
Poll::Ready(Ok(())) => {
306+
let message = Self::create_message_batch(
307+
&mut self.send_queue,
308+
self.max_wire_message_size,
309+
);
310+
match Sink::start_send(Pin::new(&mut substream), message) {
311+
Ok(()) => {
312+
// Try sending more messages if there are any
313+
should_flush = true;
314+
self.outbound_substream =
315+
OutboundSubstreamState::Active { substream, should_flush };
316+
continue;
317+
}
318+
Err(e) => {
319+
// TODO(AndrewL): Units were lost, consider a re-try mechanism.
320+
tracing::error!("Failed to send message on outbound stream: {e}");
321+
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
322+
HandlerOut::SendError(e.to_string()),
323+
));
324+
}
325+
}
326+
}
327+
Poll::Ready(Err(e)) => {
328+
// TODO(AndrewL): Units were lost, consider a re-try mechanism.
329+
tracing::error!("Failed to send message on outbound stream: {e}");
330+
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
331+
HandlerOut::SendError(e.to_string()),
332+
));
333+
}
334+
Poll::Pending => {
335+
// Not ready to send more messages yet
336+
self.outbound_substream =
337+
OutboundSubstreamState::Active { substream, should_flush };
338+
break;
339+
}
340+
}
341+
}
342+
}
343+
187344
Poll::Pending
188345
}
189346

@@ -259,9 +416,18 @@ impl ConnectionHandler for Handler {
259416
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
260417
self.on_fully_negotiated_outbound(fully_negotiated_outbound)
261418
}
262-
_ => {
263-
// TODO(AndrewL): Handle DialUpgradeError variants
419+
ConnectionEvent::DialUpgradeError(_) => {
420+
// Reset to Idle so we can request a new substream
421+
if matches!(self.outbound_substream, OutboundSubstreamState::Pending) {
422+
self.outbound_substream = OutboundSubstreamState::Idle;
423+
} else {
424+
tracing::error!(
425+
"Dial upgrade error but no pending substream found. (File a bug report if \
426+
you see this)"
427+
);
428+
}
264429
}
430+
_ => {}
265431
}
266432
}
267433
}

0 commit comments

Comments
 (0)