@@ -105,6 +105,7 @@ impl UtxoFuture {
105
105
pub fn resolve < L : Deref > ( & self , graph : & NetworkGraph < L > , result : Result < TxOut , UtxoLookupError > )
106
106
where L :: Target : Logger {
107
107
let announcement = {
108
+ let mut pending_checks = graph. pending_checks . internal . lock ( ) . unwrap ( ) ;
108
109
let mut async_messages = self . state . lock ( ) . unwrap ( ) ;
109
110
110
111
if async_messages. channel_announce . is_none ( ) {
@@ -114,6 +115,12 @@ impl UtxoFuture {
114
115
async_messages. complete = Some ( result) ;
115
116
return ;
116
117
}
118
+ let announcement_msg = match async_messages. channel_announce . as_ref ( ) . unwrap ( ) {
119
+ ChannelAnnouncement :: Full ( signed_msg) => & signed_msg. contents ,
120
+ ChannelAnnouncement :: Unsigned ( msg) => & msg,
121
+ } ;
122
+
123
+ pending_checks. lookup_completed ( announcement_msg, & Arc :: downgrade ( & self . state ) ) ;
117
124
118
125
async_messages. channel_announce . take ( ) . unwrap ( )
119
126
} ;
@@ -134,13 +141,87 @@ impl UtxoFuture {
134
141
}
135
142
}
136
143
144
+ struct PendingChecksContext {
145
+ channels : HashMap < u64 , Weak < Mutex < UtxoMessages > > > ,
146
+ }
147
+
148
+ impl PendingChecksContext {
149
+ fn lookup_completed ( & mut self ,
150
+ msg : & msgs:: UnsignedChannelAnnouncement , completed_state : & Weak < Mutex < UtxoMessages > >
151
+ ) {
152
+ if let hash_map:: Entry :: Occupied ( e) = self . channels . entry ( msg. short_channel_id ) {
153
+ if Weak :: ptr_eq ( e. get ( ) , & completed_state) {
154
+ e. remove ( ) ;
155
+ }
156
+ }
157
+ }
158
+ }
159
+
137
160
/// A set of messages which are pending UTXO lookups for processing.
138
161
pub ( super ) struct PendingChecks {
162
+ internal : Mutex < PendingChecksContext > ,
139
163
}
140
164
141
165
impl PendingChecks {
142
166
pub ( super ) fn new ( ) -> Self {
143
- PendingChecks { }
167
+ PendingChecks { internal : Mutex :: new ( PendingChecksContext {
168
+ channels : HashMap :: new ( ) ,
169
+ } ) }
170
+ }
171
+
172
+ fn check_replace_previous_entry ( msg : & msgs:: UnsignedChannelAnnouncement ,
173
+ full_msg : Option < & msgs:: ChannelAnnouncement > , replacement : Option < Weak < Mutex < UtxoMessages > > > ,
174
+ pending_channels : & mut HashMap < u64 , Weak < Mutex < UtxoMessages > > >
175
+ ) -> Result < ( ) , msgs:: LightningError > {
176
+ match pending_channels. entry ( msg. short_channel_id ) {
177
+ hash_map:: Entry :: Occupied ( mut e) => {
178
+ // There's already a pending lookup for the given SCID. Check if the messages
179
+ // are the same and, if so, return immediately (don't bother spawning another
180
+ // lookup if we haven't gotten that far yet).
181
+ match Weak :: upgrade ( & e. get ( ) ) {
182
+ Some ( pending_msgs) => {
183
+ let pending_matches = match & pending_msgs. lock ( ) . unwrap ( ) . channel_announce {
184
+ Some ( ChannelAnnouncement :: Full ( pending_msg) ) => Some ( pending_msg) == full_msg,
185
+ Some ( ChannelAnnouncement :: Unsigned ( pending_msg) ) => pending_msg == msg,
186
+ None => {
187
+ // This shouldn't actually be reachable. We set the
188
+ // `channel_announce` field under the same lock as setting the
189
+ // channel map entry. Still, we can just treat it as
190
+ // non-matching and let the new request fly.
191
+ debug_assert ! ( false ) ;
192
+ false
193
+ } ,
194
+ } ;
195
+ if pending_matches {
196
+ return Err ( LightningError {
197
+ err : "Channel announcement is already being checked" . to_owned ( ) ,
198
+ action : ErrorAction :: IgnoreDuplicateGossip ,
199
+ } ) ;
200
+ } else {
201
+ // The earlier lookup is a different message. If we have another
202
+ // request in-flight now replace the original.
203
+ // Note that in the replace case whether to replace is somewhat
204
+ // arbitrary - both results will be handled, we're just updating the
205
+ // value that will be compared to future lookups with the same SCID.
206
+ if let Some ( item) = replacement {
207
+ * e. get_mut ( ) = item;
208
+ }
209
+ }
210
+ } ,
211
+ None => {
212
+ // The earlier lookup already resolved. We can't be sure its the same
213
+ // so just remove/replace it and move on.
214
+ if let Some ( item) = replacement {
215
+ * e. get_mut ( ) = item;
216
+ } else { e. remove ( ) ; }
217
+ } ,
218
+ }
219
+ } ,
220
+ hash_map:: Entry :: Vacant ( v) => {
221
+ if let Some ( item) = replacement { v. insert ( item) ; }
222
+ } ,
223
+ }
224
+ Ok ( ( ) )
144
225
}
145
226
146
227
pub ( super ) fn check_channel_announcement < U : Deref > ( & self ,
@@ -177,6 +258,9 @@ impl PendingChecks {
177
258
}
178
259
} ;
179
260
261
+ Self :: check_replace_previous_entry ( msg, full_msg, None ,
262
+ & mut self . internal . lock ( ) . unwrap ( ) . channels ) ?;
263
+
180
264
match utxo_lookup {
181
265
& None => {
182
266
// Tentatively accept, potentially exposing us to DoS attacks
@@ -186,12 +270,15 @@ impl PendingChecks {
186
270
match utxo_lookup. get_utxo ( & msg. chain_hash , msg. short_channel_id ) {
187
271
UtxoResult :: Sync ( res) => handle_result ( res) ,
188
272
UtxoResult :: Async ( future) => {
273
+ let mut pending_checks = self . internal . lock ( ) . unwrap ( ) ;
189
274
let mut async_messages = future. state . lock ( ) . unwrap ( ) ;
190
275
if let Some ( res) = async_messages. complete . take ( ) {
191
276
// In the unlikely event the future resolved before we managed to get it,
192
277
// handle the result in-line.
193
278
handle_result ( res)
194
279
} else {
280
+ Self :: check_replace_previous_entry ( msg, full_msg,
281
+ Some ( Arc :: downgrade ( & future. state ) ) , & mut pending_checks. channels ) ?;
195
282
async_messages. channel_announce = Some (
196
283
if let Some ( msg) = full_msg { ChannelAnnouncement :: Full ( msg. clone ( ) ) }
197
284
else { ChannelAnnouncement :: Unsigned ( msg. clone ( ) ) } ) ;
0 commit comments