@@ -68,22 +68,14 @@ impl IntoConnectionHandler for Proto {
68
68
/// A pending reply to an inbound identification request.
69
69
enum Pending {
70
70
/// The reply is queued for sending.
71
- Queued ( Reply ) ,
71
+ Queued ( ReplySubstream < NegotiatedSubstream > ) ,
72
72
/// The reply is being sent.
73
73
Sending {
74
74
peer : PeerId ,
75
75
io : Pin < Box < dyn Future < Output = Result < ( ) , UpgradeError > > + Send > > ,
76
76
} ,
77
77
}
78
78
79
- /// A reply to an inbound identification request.
80
- #[ derive( Debug ) ]
81
- pub struct Reply {
82
- pub peer : PeerId ,
83
- pub io : ReplySubstream < NegotiatedSubstream > ,
84
- pub info : Info ,
85
- }
86
-
87
79
/// Protocol handler for sending and receiving identification requests.
88
80
///
89
81
/// Outbound requests are sent periodically. The handler performs expects
@@ -102,6 +94,9 @@ pub struct Handler {
102
94
> ; 4 ] ,
103
95
> ,
104
96
97
+ /// Identify request information.
98
+ info : Option < Info > ,
99
+
105
100
/// Pending replies to send.
106
101
pending_replies : VecDeque < Pending > ,
107
102
@@ -126,7 +121,7 @@ pub enum Event {
126
121
/// We actively pushed our identification information to the remote.
127
122
IdentificationPushed ,
128
123
/// We received a request for identification.
129
- Identify ( ReplySubstream < NegotiatedSubstream > ) ,
124
+ Identify ,
130
125
/// Failed to identify the remote, or to reply to an identification request.
131
126
IdentificationError ( ConnectionHandlerUpgrErr < UpgradeError > ) ,
132
127
}
@@ -137,7 +132,7 @@ pub enum InEvent {
137
132
/// Identifying information of the local node that is pushed to a remote.
138
133
Push ( Info ) ,
139
134
/// Identifying information requested from this node.
140
- Identify ( Reply ) ,
135
+ Identify ( Info ) ,
141
136
}
142
137
143
138
impl Handler {
@@ -147,6 +142,7 @@ impl Handler {
147
142
remote_peer_id,
148
143
inbound_identify_push : Default :: default ( ) ,
149
144
events : SmallVec :: new ( ) ,
145
+ info : None ,
150
146
pending_replies : VecDeque :: new ( ) ,
151
147
trigger_next_identify : Delay :: new ( initial_delay) ,
152
148
keep_alive : KeepAlive :: Yes ,
@@ -164,9 +160,22 @@ impl Handler {
164
160
> ,
165
161
) {
166
162
match output {
167
- EitherOutput :: First ( substream) => self
168
- . events
169
- . push ( ConnectionHandlerEvent :: Custom ( Event :: Identify ( substream) ) ) ,
163
+ EitherOutput :: First ( substream) => {
164
+ // If we already have `Info` we can proceed responding to the Identify request,
165
+ // if not, we request `Info` from the behaviour.
166
+ if self . info . is_none ( ) {
167
+ self . events
168
+ . push ( ConnectionHandlerEvent :: Custom ( Event :: Identify ) ) ;
169
+ }
170
+ if !self . pending_replies . is_empty ( ) {
171
+ warn ! (
172
+ "New inbound identify request from {} while a previous one \
173
+ is still pending. Queueing the new one.",
174
+ self . remote_peer_id,
175
+ ) ;
176
+ }
177
+ self . pending_replies . push_back ( Pending :: Queued ( substream) ) ;
178
+ }
170
179
EitherOutput :: Second ( fut) => {
171
180
if self . inbound_identify_push . replace ( fut) . is_some ( ) {
172
181
warn ! (
@@ -249,15 +258,8 @@ impl ConnectionHandler for Handler {
249
258
) ,
250
259
} ) ;
251
260
}
252
- InEvent :: Identify ( reply) => {
253
- if !self . pending_replies . is_empty ( ) {
254
- warn ! (
255
- "New inbound identify request from {} while a previous one \
256
- is still pending. Queueing the new one.",
257
- reply. peer,
258
- ) ;
259
- }
260
- self . pending_replies . push_back ( Pending :: Queued ( reply) ) ;
261
+ InEvent :: Identify ( info) => {
262
+ self . info = Some ( info) ;
261
263
}
262
264
}
263
265
}
@@ -301,31 +303,38 @@ impl ConnectionHandler for Handler {
301
303
}
302
304
303
305
// Check for pending replies to send.
304
- if let Some ( mut pending) = self . pending_replies . pop_front ( ) {
305
- loop {
306
- match pending {
307
- Pending :: Queued ( Reply { peer, io, info } ) => {
308
- let io = Box :: pin ( io. send ( info) ) ;
309
- pending = Pending :: Sending { peer, io } ;
310
- }
311
- Pending :: Sending { peer, mut io } => {
312
- match Future :: poll ( Pin :: new ( & mut io) , cx) {
313
- Poll :: Pending => {
314
- self . pending_replies
315
- . push_front ( Pending :: Sending { peer, io } ) ;
316
- return Poll :: Pending ;
317
- }
318
- Poll :: Ready ( Ok ( ( ) ) ) => {
319
- return Poll :: Ready ( ConnectionHandlerEvent :: Custom (
320
- Event :: Identification ( peer) ,
321
- ) ) ;
322
- }
323
- Poll :: Ready ( Err ( err) ) => {
324
- return Poll :: Ready ( ConnectionHandlerEvent :: Custom (
325
- Event :: IdentificationError ( ConnectionHandlerUpgrErr :: Upgrade (
326
- libp2p_core:: upgrade:: UpgradeError :: Apply ( err) ,
327
- ) ) ,
328
- ) )
306
+ if let Some ( ref info) = self . info {
307
+ if let Some ( mut pending) = self . pending_replies . pop_front ( ) {
308
+ loop {
309
+ match pending {
310
+ Pending :: Queued ( io) => {
311
+ let io = Box :: pin ( io. send ( info. clone ( ) ) ) ;
312
+ pending = Pending :: Sending {
313
+ peer : self . remote_peer_id ,
314
+ io,
315
+ } ;
316
+ }
317
+ Pending :: Sending { peer, mut io } => {
318
+ match Future :: poll ( Pin :: new ( & mut io) , cx) {
319
+ Poll :: Pending => {
320
+ self . pending_replies
321
+ . push_front ( Pending :: Sending { peer, io } ) ;
322
+ return Poll :: Pending ;
323
+ }
324
+ Poll :: Ready ( Ok ( ( ) ) ) => {
325
+ return Poll :: Ready ( ConnectionHandlerEvent :: Custom (
326
+ Event :: Identification ( peer) ,
327
+ ) ) ;
328
+ }
329
+ Poll :: Ready ( Err ( err) ) => {
330
+ return Poll :: Ready ( ConnectionHandlerEvent :: Custom (
331
+ Event :: IdentificationError (
332
+ ConnectionHandlerUpgrErr :: Upgrade (
333
+ libp2p_core:: upgrade:: UpgradeError :: Apply ( err) ,
334
+ ) ,
335
+ ) ,
336
+ ) )
337
+ }
329
338
}
330
339
}
331
340
}
0 commit comments