Skip to content

Commit 3ec1b51

Browse files
authored
Merge pull request #4179 from ProvableHQ/fix/mainnet-syncing-when-synced
[Fix] Do not attempt to sync when only one block behind
2 parents 97dbfb1 + 5b84d80 commit 3ec1b51

File tree

6 files changed

+96
-43
lines changed

6 files changed

+96
-43
lines changed

.circleci/config.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -788,7 +788,7 @@ workflows:
788788
filters:
789789
branches:
790790
only:
791-
- release-mainnet-4.5.4
791+
- fix/mainnet-syncing-when-synced
792792
- canary
793793
- testnet
794794
- mainnet
@@ -798,7 +798,7 @@ workflows:
798798
filters:
799799
branches:
800800
only:
801-
- release-mainnet-4.5.4
801+
- fix/mainnet-syncing-when-synced
802802
- canary
803803
- testnet
804804
- mainnet
@@ -810,7 +810,7 @@ workflows:
810810
filters:
811811
branches:
812812
only:
813-
- release-mainnet-4.5.4
813+
- fix/mainnet-syncing-when-synced
814814
- canary
815815
- testnet
816816
- mainnet

node/bft/src/gateway.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use snarkos_node_network::{
5151
get_repo_commit_hash,
5252
log_repo_sha_comparison,
5353
};
54-
use snarkos_node_sync::{InsertBlockResponseError, MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
54+
use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
5555
use snarkos_node_tcp::{
5656
Config,
5757
Connection,
@@ -68,6 +68,7 @@ use snarkvm::{
6868
narwhal::{BatchHeader, Data},
6969
},
7070
prelude::{Address, Field},
71+
utilities::flatten_error,
7172
};
7273

7374
use colored::Colorize;
@@ -651,15 +652,27 @@ impl<N: Network> Gateway<N> {
651652
// Send the blocks to the sync module.
652653
match sync_sender.insert_block_response(peer_ip, blocks.0, latest_consensus_version).await {
653654
Ok(_) => Ok(true),
654-
Err(err @ InsertBlockResponseError::EmptyBlockResponse)
655-
| Err(err @ InsertBlockResponseError::NoConsensusVersion)
656-
| Err(err @ InsertBlockResponseError::ConsensusVersionMismatch { .. }) => {
657-
error!("Peer '{peer_ip}' sent an invalid block response - {err}");
658-
self.ip_ban_peer(peer_ip, Some(&err.to_string()));
659-
Err(err.into())
655+
Err(err) if err.is_benign() => {
656+
let err: anyhow::Error = err.into();
657+
let err = err.context(format!("Ignoring block response from peer '{peer_ip}'"));
658+
debug!("{}", flatten_error(err));
659+
Ok(true)
660+
}
661+
Err(err) if err.is_invalid_consensus_version() => {
662+
let err: anyhow::Error = err.into();
663+
let err = err.context(format!("Peer sent an invalid block response '{peer_ip}'"));
664+
665+
let msg = flatten_error(&err);
666+
error!("{msg}");
667+
self.ip_ban_peer(peer_ip, Some(&msg));
668+
Err(err)
660669
}
661670
Err(err) => {
662-
warn!("Unable to process block response from '{peer_ip}' - {err}");
671+
let err: anyhow::Error = err.into();
672+
let err = err.context(format!("Peer '{peer_ip}' sent an invalid block response"));
673+
warn!("{}", flatten_error(err));
674+
675+
// TODO(kaimast): This needs more testing to ensure disconnect is the correct action.
663676
Ok(true)
664677
}
665678
}

node/bft/src/helpers/channels.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ pub struct SyncSender<N: Network> {
263263
SocketAddr,
264264
Vec<Block<N>>,
265265
Option<ConsensusVersion>,
266-
oneshot::Sender<Result<(), InsertBlockResponseError>>,
266+
oneshot::Sender<Result<(), InsertBlockResponseError<N>>>,
267267
)>,
268268
pub tx_block_sync_remove_peer: mpsc::Sender<SocketAddr>,
269269
pub tx_block_sync_update_peer_locators: mpsc::Sender<(SocketAddr, BlockLocators<N>, oneshot::Sender<Result<()>>)>,
@@ -291,7 +291,7 @@ impl<N: Network> SyncSender<N> {
291291
peer_ip: SocketAddr,
292292
blocks: Vec<Block<N>>,
293293
latest_consensus_version: Option<ConsensusVersion>,
294-
) -> Result<(), InsertBlockResponseError> {
294+
) -> Result<(), InsertBlockResponseError<N>> {
295295
// Initialize a callback sender and receiver.
296296
let (callback_sender, callback_receiver) = oneshot::channel();
297297
// Send the request to advance with sync blocks.
@@ -320,7 +320,7 @@ pub struct SyncReceiver<N: Network> {
320320
SocketAddr,
321321
Vec<Block<N>>,
322322
Option<ConsensusVersion>,
323-
oneshot::Sender<Result<(), InsertBlockResponseError>>,
323+
oneshot::Sender<Result<(), InsertBlockResponseError<N>>>,
324324
)>,
325325
pub rx_block_sync_remove_peer: mpsc::Receiver<SocketAddr>,
326326
pub rx_block_sync_update_peer_locators: mpsc::Receiver<(SocketAddr, BlockLocators<N>, oneshot::Sender<Result<()>>)>,

node/bft/src/sync/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -347,9 +347,8 @@ impl<N: Network> Sync<N> {
347347
}
348348
}
349349

350-
// Do not attempt to sync if there are no blocks to sync.
351-
// This prevents redundant log messages and performing unnecessary computation.
352-
if !self.block_sync.can_block_sync() {
350+
// Do not attempt to sync if there are no blocks to sync, or we are too close to the tip.
351+
if self.is_synced() {
353352
return;
354353
}
355354

@@ -386,7 +385,7 @@ impl<N: Network> Sync<N> {
386385
peer_ip: SocketAddr,
387386
blocks: Vec<Block<N>>,
388387
latest_consensus_version: Option<ConsensusVersion>,
389-
) -> Result<(), InsertBlockResponseError> {
388+
) -> Result<(), InsertBlockResponseError<N>> {
390389
self.block_sync.insert_block_responses(peer_ip, blocks, latest_consensus_version)
391390

392391
// No need to advance block sync here, as the new response will

node/src/client/router.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use snarkos_node_router::{
3030
UnconfirmedTransaction,
3131
},
3232
};
33-
use snarkos_node_sync::InsertBlockResponseError;
3433
use snarkos_node_tcp::{Connection, ConnectionSide, Tcp};
3534
use snarkvm::{
3635
console::network::{ConsensusVersion, Network},
@@ -220,21 +219,32 @@ impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Client<N, C> {
220219
latest_consensus_version: Option<ConsensusVersion>,
221220
) -> bool {
222221
// We do not need to explicitly sync here because insert_block_response, will wake up the sync task.
223-
if let Err(err) = self.sync.insert_block_responses(peer_ip, blocks, latest_consensus_version) {
224-
warn!("Failed to insert block response from '{peer_ip}' - {err}");
225-
226-
// If the error indicates the peer missed an upgrade and forked, ban it.
227-
if matches!(
228-
err,
229-
InsertBlockResponseError::ConsensusVersionMismatch { .. }
230-
| InsertBlockResponseError::NoConsensusVersion
231-
) {
222+
match self.sync.insert_block_responses(peer_ip, blocks, latest_consensus_version) {
223+
Ok(_) => true,
224+
Err(err) if err.is_benign() => {
225+
let err: anyhow::Error = err.into();
226+
debug!("{}", flatten_error(err.context(format!("Ignoring block response from peer '{peer_ip}'"))));
227+
true
228+
}
229+
Err(err) if err.is_invalid_consensus_version() => {
230+
// If the error indicates the peer missed an upgrade and forked, ban it.
231+
let err: anyhow::Error = err.into();
232+
let err = err.context(format!("Peer sent an invalid block response '{peer_ip}'"));
233+
234+
let msg = flatten_error(&err);
235+
error!("{msg}");
232236
self.router().ip_ban_peer(peer_ip, Some(&err.to_string()));
237+
238+
false
233239
}
240+
Err(err) => {
241+
let err: anyhow::Error = err.into();
242+
let err = err.context(format!("Failed to insert block response from '{peer_ip}'"));
243+
warn!("{}", flatten_error(err));
234244

235-
false
236-
} else {
237-
true
245+
// TODO(kaimast): This needs more testing to ensure disconnect is the correct action.
246+
true
247+
}
238248
}
239249
}
240250

node/sync/src/block_sync.rs

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ pub struct BlockRequestsSummary {
118118
}
119119

120120
#[derive(thiserror::Error, Debug)]
121-
pub enum InsertBlockResponseError {
121+
pub enum InsertBlockResponseError<N: Network> {
122122
#[error("Empty block response")]
123123
EmptyBlockResponse,
124124
#[error("The peer did not send a consensus version")]
@@ -127,10 +127,36 @@ pub enum InsertBlockResponseError {
127127
"The peer's consensus version for height {last_height} does not match ours: expected {expected_version}, got {peer_version}"
128128
)]
129129
ConsensusVersionMismatch { peer_version: ConsensusVersion, expected_version: ConsensusVersion, last_height: u32 },
130+
#[error("Block Sync already advanced to block {height}")]
131+
BlockSyncAlreadyAdvanced { height: u32 },
132+
#[error("No such request for height {height}")]
133+
NoSuchRequest { height: u32 },
134+
#[error("Invalid block hash for height {height} from '{peer_ip}'")]
135+
InvalidBlockHash { height: u32, peer_ip: SocketAddr },
136+
#[error(
137+
"The previous block hash in candidate block {height} from '{peer_ip}' is incorrect: expected {expected}, but got {actual}"
138+
)]
139+
InvalidPreviousBlockHash { height: u32, peer_ip: SocketAddr, expected: N::BlockHash, actual: N::BlockHash },
140+
#[error("Candidate block {height} from '{peer_ip}' is malformed")]
141+
MalformedBlock { height: u32, peer_ip: SocketAddr },
142+
#[error("The sync pool did not request block {height} from '{peer_ip}'")]
143+
WrongSyncPeer { height: u32, peer_ip: SocketAddr },
130144
#[error("{}", flatten_error(.0))]
131145
Other(#[from] anyhow::Error),
132146
}
133147

148+
impl<N: Network> InsertBlockResponseError<N> {
149+
/// Returns `true` if the error does not indicate malicious or faulty behavior.
150+
pub fn is_benign(&self) -> bool {
151+
matches!(self, Self::NoSuchRequest { .. } | Self::BlockSyncAlreadyAdvanced { .. })
152+
}
153+
154+
// Returns true if the error is about an invalid consensus version.
155+
pub fn is_invalid_consensus_version(&self) -> bool {
156+
matches!(self, Self::ConsensusVersionMismatch { .. } | Self::NoConsensusVersion)
157+
}
158+
}
159+
134160
impl<N: Network> OutstandingRequest<N> {
135161
/// Get a reference to the IPs of peers that have not responded to the request (yet).
136162
fn sync_ips(&self) -> &IndexSet<SocketAddr> {
@@ -473,7 +499,7 @@ impl<N: Network> BlockSync<N> {
473499
peer_ip: SocketAddr,
474500
blocks: Vec<Block<N>>,
475501
latest_consensus_version: Option<ConsensusVersion>,
476-
) -> Result<(), InsertBlockResponseError> {
502+
) -> Result<(), InsertBlockResponseError<N>> {
477503
// Attempt to insert the block responses, and break if we encounter an error.
478504
let result = 'outer: {
479505
let Some(last_height) = blocks.as_slice().last().map(|b| b.height()) else {
@@ -500,9 +526,7 @@ impl<N: Network> BlockSync<N> {
500526

501527
// Insert the candidate blocks into the sync pool.
502528
for block in blocks {
503-
if let Err(error) = self.insert_block_response(peer_ip, block) {
504-
break 'outer Err(error.into());
505-
}
529+
self.insert_block_response(peer_ip, block)?;
506530
}
507531

508532
Ok(())
@@ -916,35 +940,42 @@ impl<N: Network> BlockSync<N> {
916940

917941
/// Inserts the given block response, after checking that the request exists and the response is well-formed.
918942
/// On success, this function removes the peer IP from the request sync peers and inserts the response.
919-
fn insert_block_response(&self, peer_ip: SocketAddr, block: Block<N>) -> Result<()> {
943+
fn insert_block_response(&self, peer_ip: SocketAddr, block: Block<N>) -> Result<(), InsertBlockResponseError<N>> {
920944
// Retrieve the block height.
921945
let height = block.height();
922946
let mut requests = self.requests.write();
923947

924948
if self.ledger.contains_block_height(height) {
925-
bail!("The sync request was removed because we already advanced");
949+
return Err(InsertBlockResponseError::BlockSyncAlreadyAdvanced { height });
926950
}
927951

928-
let Some(entry) = requests.get_mut(&height) else { bail!("The sync pool did not request block {height}") };
952+
let Some(entry) = requests.get_mut(&height) else {
953+
return Err(InsertBlockResponseError::NoSuchRequest { height });
954+
};
929955

930956
// Retrieve the request entry for the candidate block.
931957
let (expected_hash, expected_previous_hash, sync_ips) = &entry.request;
932958

933959
// Ensure the candidate block hash matches the expected hash.
934960
if let Some(expected_hash) = expected_hash {
935961
if block.hash() != *expected_hash {
936-
bail!("The block hash for candidate block {height} from '{peer_ip}' is incorrect")
962+
return Err(InsertBlockResponseError::InvalidBlockHash { height, peer_ip });
937963
}
938964
}
939965
// Ensure the previous block hash matches if it exists.
940966
if let Some(expected_previous_hash) = expected_previous_hash {
941967
if block.previous_hash() != *expected_previous_hash {
942-
bail!("The previous block hash in candidate block {height} from '{peer_ip}' is incorrect")
968+
return Err(InsertBlockResponseError::InvalidPreviousBlockHash {
969+
height,
970+
peer_ip,
971+
expected: *expected_previous_hash,
972+
actual: block.previous_hash(),
973+
});
943974
}
944975
}
945976
// Ensure the sync pool requested this block from the given peer.
946977
if !sync_ips.contains(&peer_ip) {
947-
bail!("The sync pool did not request block {height} from '{peer_ip}'")
978+
return Err(InsertBlockResponseError::WrongSyncPeer { height, peer_ip });
948979
}
949980

950981
// Remove the peer IP from the request entry.
@@ -953,7 +984,7 @@ impl<N: Network> BlockSync<N> {
953984
if let Some(existing_block) = &entry.response {
954985
// If the candidate block was already present, ensure it is the same block.
955986
if block != *existing_block {
956-
bail!("Candidate block {height} from '{peer_ip}' is malformed");
987+
return Err(InsertBlockResponseError::MalformedBlock { height, peer_ip });
957988
}
958989
} else {
959990
entry.response = Some(block.clone());

0 commit comments

Comments
 (0)