Skip to content

Commit 8ddd20d

Browse files
committed
refactor: remove async from send_warning method
1 parent 9c10396 commit 8ddd20d

File tree

5 files changed

+76
-116
lines changed

5 files changed

+76
-116
lines changed

src/chain/chain.rs

Lines changed: 26 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,7 @@ impl<H: HeaderStore> Chain<H> {
137137
warning: format!(
138138
"Unexpected error fetching a header from the header store at height {height}"
139139
),
140-
})
141-
.await;
140+
});
142141
}
143142
header_opt.map_err(HeaderPersistenceError::Database)
144143
}
@@ -249,11 +248,9 @@ impl<H: HeaderStore> Chain<H> {
249248
.write(self.header_chain.headers())
250249
.await
251250
{
252-
self.dialog
253-
.send_warning(Warning::FailedPersistance {
254-
warning: format!("Could not save headers to disk: {e}"),
255-
})
256-
.await;
251+
self.dialog.send_warning(Warning::FailedPersistance {
252+
warning: format!("Could not save headers to disk: {e}"),
253+
});
257254
}
258255
}
259256

@@ -266,11 +263,9 @@ impl<H: HeaderStore> Chain<H> {
266263
.write_over(self.header_chain.headers(), height)
267264
.await
268265
{
269-
self.dialog
270-
.send_warning(Warning::FailedPersistance {
271-
warning: format!("Could not save headers to disk: {e}"),
272-
})
273-
.await;
266+
self.dialog.send_warning(Warning::FailedPersistance {
267+
warning: format!("Could not save headers to disk: {e}"),
268+
});
274269
}
275270
}
276271

@@ -285,15 +280,15 @@ impl<H: HeaderStore> Chain<H> {
285280
.map_err(HeaderPersistenceError::Database)?;
286281
if let Some(first) = loaded_headers.values().next() {
287282
if first.prev_blockhash.ne(&self.tip()) {
288-
self.dialog.send_warning(Warning::UnlinkableAnchor).await;
283+
self.dialog.send_warning(Warning::UnlinkableAnchor);
289284
// The header chain did not align, so just start from the anchor
290285
return Err(HeaderPersistenceError::CannotLocateHistory);
291286
} else if loaded_headers
292287
.iter()
293288
.zip(loaded_headers.iter().skip(1))
294289
.any(|(first, second)| first.1.block_hash().ne(&second.1.prev_blockhash))
295290
{
296-
self.dialog.send_warning(Warning::CorruptedHeaders).await;
291+
self.dialog.send_warning(Warning::CorruptedHeaders);
297292
return Err(HeaderPersistenceError::HeadersDoNotLink);
298293
}
299294
loaded_headers.iter().for_each(|header| {
@@ -418,8 +413,7 @@ impl<H: HeaderStore> Chain<H> {
418413
self.dialog
419414
.send_warning(
420415
Warning::UnexpectedSyncError { warning: "Unmatched checkpoint sent by a peer. Restarting header sync with new peers.".into() }
421-
)
422-
.await;
416+
);
423417
return Err(HeaderSyncError::InvalidCheckpoint);
424418
}
425419
}
@@ -432,7 +426,7 @@ impl<H: HeaderStore> Chain<H> {
432426
// we only accept it if there is more work provided. otherwise, we disconnect the peer sending
433427
// us this fork
434428
async fn evaluate_fork(&mut self, header_batch: &HeadersBatch) -> Result<(), HeaderSyncError> {
435-
self.dialog.send_warning(Warning::EvaluatingFork).await;
429+
self.dialog.send_warning(Warning::EvaluatingFork);
436430
// We only care about the headers these two chains do not have in common
437431
let uncommon: Vec<Header> = header_batch
438432
.inner()
@@ -472,11 +466,9 @@ impl<H: HeaderStore> Chain<H> {
472466
self.flush_over_height(stem).await;
473467
Ok(())
474468
} else {
475-
self.dialog
476-
.send_warning(Warning::UnexpectedSyncError {
477-
warning: "Peer sent us a fork with less work than the current chain".into(),
478-
})
479-
.await;
469+
self.dialog.send_warning(Warning::UnexpectedSyncError {
470+
warning: "Peer sent us a fork with less work than the current chain".into(),
471+
});
480472
Err(HeaderSyncError::LessWorkFork)
481473
}
482474
} else {
@@ -515,8 +507,7 @@ impl<H: HeaderStore> Chain<H> {
515507
warning:
516508
"The remote peer miscalculated the difficulty adjustment when syncing a batch of headers"
517509
.into(),
518-
})
519-
.await;
510+
});
520511
return Err(HeaderSyncError::MiscalculatedDifficulty);
521512
}
522513
}
@@ -558,8 +549,7 @@ impl<H: HeaderStore> Chain<H> {
558549
warning:
559550
"The remote peer miscalculated the difficulty adjustment when syncing a batch of headers"
560551
.into(),
561-
})
562-
.await;
552+
});
563553
return Err(HeaderSyncError::MiscalculatedDifficulty);
564554
}
565555
}
@@ -577,13 +567,10 @@ impl<H: HeaderStore> Chain<H> {
577567
}
578568
},
579569
None => {
580-
self.dialog
581-
.send_warning(Warning::UnexpectedSyncError {
582-
warning:
583-
"Unable to audit the difficulty adjustment due to an index overflow"
584-
.into(),
585-
})
586-
.await;
570+
self.dialog.send_warning(Warning::UnexpectedSyncError {
571+
warning: "Unable to audit the difficulty adjustment due to an index overflow"
572+
.into(),
573+
});
587574
}
588575
}
589576
Ok(())
@@ -853,7 +840,7 @@ impl<H: HeaderStore> Chain<H> {
853840
Some(sender) => {
854841
let send_result = sender.send(Ok(IndexedBlock::new(height, block)));
855842
if send_result.is_err() {
856-
self.dialog.send_warning(Warning::ChannelDropped).await
843+
self.dialog.send_warning(Warning::ChannelDropped)
857844
};
858845
}
859846
None => {
@@ -885,7 +872,7 @@ impl<H: HeaderStore> Chain<H> {
885872
if height_opt.is_none() {
886873
let err_reponse = request.oneshot.send(Err(FetchBlockError::UnknownHash));
887874
if err_reponse.is_err() {
888-
self.dialog.send_warning(Warning::ChannelDropped).await;
875+
self.dialog.send_warning(Warning::ChannelDropped);
889876
}
890877
} else {
891878
self.dialog
@@ -902,12 +889,10 @@ impl<H: HeaderStore> Chain<H> {
902889
let mut db = self.db.lock().await;
903890
let range_opt = db.load(range).await;
904891
if range_opt.is_err() {
905-
self.dialog
906-
.send_warning(Warning::FailedPersistance {
907-
warning: "Unexpected error fetching a range of headers from the header store"
908-
.to_string(),
909-
})
910-
.await;
892+
self.dialog.send_warning(Warning::FailedPersistance {
893+
warning: "Unexpected error fetching a range of headers from the header store"
894+
.to_string(),
895+
});
911896
}
912897
range_opt.map_err(HeaderPersistenceError::Database)
913898
}

src/core/dialog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl Dialog {
4848
let _ = self.log_tx.send(Log::Dialog(message)).await;
4949
}
5050

51-
pub(crate) async fn send_warning(&self, warning: Warning) {
51+
pub(crate) fn send_warning(&self, warning: Warning) {
5252
let _ = self.warn_tx.send(warning);
5353
}
5454

src/core/node.rs

Lines changed: 27 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
261261
}
262262
PeerMessage::Reject(payload) => {
263263
self.dialog
264-
.send_warning(Warning::TransactionRejected(payload)).await;
264+
.send_warning(Warning::TransactionRejected(payload));
265265
}
266266
PeerMessage::FeeFilter(feerate) => {
267267
let mut peer_map = self.peer_map.lock().await;
@@ -312,23 +312,23 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
312312
let header_opt = chain.fetch_header(request.height).await.map_err(|e| FetchHeaderError::DatabaseOptFailed { error: e.to_string() }).and_then(|opt| opt.ok_or(FetchHeaderError::UnknownHeight));
313313
let send_result = request.oneshot.send(header_opt);
314314
if send_result.is_err() {
315-
self.dialog.send_warning(Warning::ChannelDropped).await
315+
self.dialog.send_warning(Warning::ChannelDropped)
316316
};
317317
},
318318
ClientMessage::GetHeaderBatch(request) => {
319319
let chain = self.chain.lock().await;
320320
let range_opt = chain.fetch_header_range(request.range).await.map_err(|e| FetchHeaderError::DatabaseOptFailed { error: e.to_string() });
321321
let send_result = request.oneshot.send(range_opt);
322322
if send_result.is_err() {
323-
self.dialog.send_warning(Warning::ChannelDropped).await
323+
self.dialog.send_warning(Warning::ChannelDropped);
324324
};
325325
},
326326
ClientMessage::GetBroadcastMinFeeRate(request) => {
327327
let peer_map = self.peer_map.lock().await;
328328
let fee_rate = peer_map.broadcast_min();
329329
let send_result = request.send(fee_rate);
330330
if send_result.is_err() {
331-
self.dialog.send_warning(Warning::ChannelDropped).await
331+
self.dialog.send_warning(Warning::ChannelDropped);
332332
};
333333
}
334334
ClientMessage::NoOp => (),
@@ -363,12 +363,10 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
363363
peer_map.clean().await;
364364
// Find more peers when lower than the desired threshold.
365365
if peer_map.live() < self.next_required_peers().await {
366-
self.dialog
367-
.send_warning(Warning::NotEnoughConnections)
368-
.await;
366+
self.dialog.send_warning(Warning::NotEnoughConnections);
369367
let address = peer_map.next_peer().await?;
370368
if peer_map.dispatch(address).await.is_err() {
371-
self.dialog.send_warning(Warning::CouldNotConnect).await;
369+
self.dialog.send_warning(Warning::CouldNotConnect);
372370
}
373371
}
374372
Ok(())
@@ -419,8 +417,7 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
419417
self.dialog.send_info(Log::TxSent(txid)).await;
420418
} else {
421419
self.dialog
422-
.send_warning(Warning::TransactionRejected(RejectPayload::from_txid(txid)))
423-
.await;
420+
.send_warning(Warning::TransactionRejected(RejectPayload::from_txid(txid)));
424421
}
425422
}
426423
}
@@ -474,7 +471,7 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
474471
}
475472
NodeState::TransactionsSynced => {
476473
if last_block.stale() {
477-
self.dialog.send_warning(Warning::PotentialStaleTip).await;
474+
self.dialog.send_warning(Warning::PotentialStaleTip);
478475
self.dialog
479476
.send_dialog("Disconnecting from remote nodes to find new connections")
480477
.await;
@@ -531,7 +528,7 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
531528
if !version_message.services.has(ServiceFlags::COMPACT_FILTERS)
532529
|| !version_message.services.has(ServiceFlags::NETWORK)
533530
{
534-
self.dialog.send_warning(Warning::NoCompactFilters).await;
531+
self.dialog.send_warning(Warning::NoCompactFilters);
535532
return Ok(MainThreadMessage::Disconnect);
536533
}
537534
}
@@ -599,19 +596,15 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
599596
return self.next_stateful_message(chain.deref_mut()).await;
600597
}
601598
HeaderSyncError::LessWorkFork => {
602-
self.dialog
603-
.send_warning(Warning::UnexpectedSyncError {
604-
warning: "A peer sent us a fork with less work.".into(),
605-
})
606-
.await;
599+
self.dialog.send_warning(Warning::UnexpectedSyncError {
600+
warning: "A peer sent us a fork with less work.".into(),
601+
});
607602
return Some(MainThreadMessage::Disconnect);
608603
}
609604
_ => {
610-
self.dialog
611-
.send_warning(Warning::UnexpectedSyncError {
612-
warning: format!("Unexpected header syncing error: {}", e),
613-
})
614-
.await;
605+
self.dialog.send_warning(Warning::UnexpectedSyncError {
606+
warning: format!("Unexpected header syncing error: {}", e),
607+
});
615608
let mut lock = self.peer_map.lock().await;
616609
lock.ban(peer_id).await;
617610
return Some(MainThreadMessage::Disconnect);
@@ -634,24 +627,16 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
634627
AppendAttempt::Extended => self.next_stateful_message(chain.deref_mut()).await,
635628
AppendAttempt::Conflict(_) => {
636629
// TODO: Request the filter and block from the peer
637-
self.dialog
638-
.send_warning(Warning::UnexpectedSyncError {
639-
warning: "Found a conflict while peers are sending filter headers"
640-
.into(),
641-
})
642-
.await;
630+
self.dialog.send_warning(Warning::UnexpectedSyncError {
631+
warning: "Found a conflict while peers are sending filter headers".into(),
632+
});
643633
Some(MainThreadMessage::Disconnect)
644634
}
645635
},
646636
Err(e) => {
647-
self.dialog
648-
.send_warning(Warning::UnexpectedSyncError {
649-
warning: format!(
650-
"Compact filter header syncing encountered an error: {}",
651-
e
652-
),
653-
})
654-
.await;
637+
self.dialog.send_warning(Warning::UnexpectedSyncError {
638+
warning: format!("Compact filter header syncing encountered an error: {}", e),
639+
});
655640
let mut lock = self.peer_map.lock().await;
656641
lock.ban(peer_id).await;
657642
Some(MainThreadMessage::Disconnect)
@@ -665,11 +650,9 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
665650
match chain.sync_filter(filter).await {
666651
Ok(potential_message) => potential_message.map(MainThreadMessage::GetFilters),
667652
Err(e) => {
668-
self.dialog
669-
.send_warning(Warning::UnexpectedSyncError {
670-
warning: format!("Compact filter syncing encountered an error: {}", e),
671-
})
672-
.await;
653+
self.dialog.send_warning(Warning::UnexpectedSyncError {
654+
warning: format!("Compact filter syncing encountered an error: {}", e),
655+
});
673656
match e {
674657
CFilterSyncError::Filter(_) => Some(MainThreadMessage::Disconnect),
675658
_ => {
@@ -686,11 +669,9 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
686669
async fn handle_block(&self, peer_id: u32, block: Block) -> Option<MainThreadMessage> {
687670
let mut chain = self.chain.lock().await;
688671
if let Err(e) = chain.check_send_block(block).await {
689-
self.dialog
690-
.send_warning(Warning::UnexpectedSyncError {
691-
warning: format!("Unexpected block scanning error: {}", e),
692-
})
693-
.await;
672+
self.dialog.send_warning(Warning::UnexpectedSyncError {
673+
warning: format!("Unexpected block scanning error: {}", e),
674+
});
694675
let mut lock = self.peer_map.lock().await;
695676
lock.ban(peer_id).await;
696677
return Some(MainThreadMessage::Disconnect);

0 commit comments

Comments
 (0)