Skip to content

Commit 7b669b2

Browse files
committed
[Mempool] Add health check to peer prioritization logic.
1 parent e1ac46a commit 7b669b2

File tree

4 files changed

+263
-7
lines changed

4 files changed

+263
-7
lines changed

config/src/config/mempool_config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ pub struct MempoolConfig {
5454
pub max_broadcasts_per_peer: usize,
5555
/// Maximum number of inbound network messages to the Mempool application
5656
pub max_network_channel_size: usize,
57+
/// The maximum amount of time a node can be out of sync before being considered unhealthy
58+
pub max_sync_lag_before_unhealthy_secs: usize,
5759
/// The interval to take a snapshot of the mempool to logs, only used when trace logging is enabled
5860
pub mempool_snapshot_interval_secs: u64,
5961
/// The maximum amount of time to wait for an ACK of Mempool submission to an upstream node.
@@ -114,6 +116,7 @@ impl Default for MempoolConfig {
114116
shared_mempool_ack_timeout_ms: 2_000,
115117
shared_mempool_max_concurrent_inbound_syncs: 4,
116118
max_broadcasts_per_peer: 20,
119+
max_sync_lag_before_unhealthy_secs: 300, // 5 minutes
117120
max_network_channel_size: 1024,
118121
mempool_snapshot_interval_secs: 180,
119122
capacity: 2_000_000,

mempool/src/shared_mempool/network.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ impl<NetworkClient: NetworkClientInterface<MempoolSyncMsg>> MempoolNetworkInterf
267267
self.num_committed_txns_received_since_peers_updated
268268
.load(Ordering::Relaxed),
269269
);
270+
270271
// Resetting the counter
271272
self.num_mempool_txns_received_since_peers_updated = 0;
272273
self.num_committed_txns_received_since_peers_updated

mempool/src/shared_mempool/priority.rs

Lines changed: 258 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,28 @@ use std::{
2020
time::Instant,
2121
};
2222

23+
/// The number of microseconds in one second (i.e., one million)
const MICROS_PER_SECOND: u64 = 1_000_000;
25+
2326
/// A simple struct that offers comparisons and ordering for peer prioritization
2427
#[derive(Clone, Debug)]
2528
struct PrioritizedPeersComparator {
29+
// The current mempool configuration
30+
mempool_config: MempoolConfig,
31+
32+
// The random state used to hash peer IDs
2633
random_state: RandomState,
34+
35+
// The time service used to calculate timestamps
36+
time_service: TimeService,
2737
}
2838

2939
impl PrioritizedPeersComparator {
30-
fn new() -> Self {
40+
fn new(mempool_config: MempoolConfig, time_service: TimeService) -> Self {
3141
Self {
42+
mempool_config,
3243
random_state: RandomState::new(),
44+
time_service,
3345
}
3446
}
3547

@@ -68,7 +80,18 @@ impl PrioritizedPeersComparator {
6880
let (peer_network_id_a, monitoring_metadata_a) = peer_a;
6981
let (peer_network_id_b, monitoring_metadata_b) = peer_b;
7082

71-
// First, compare by network ID (i.e., Validator > VFN > Public)
83+
// First, compare the peers by health (e.g., sync lag)
84+
let unhealthy_ordering = compare_peer_health(
85+
&self.mempool_config,
86+
&self.time_service,
87+
monitoring_metadata_a,
88+
monitoring_metadata_b,
89+
);
90+
if !unhealthy_ordering.is_eq() {
91+
return unhealthy_ordering; // Only return if it's not equal
92+
}
93+
94+
// Next, compare by network ID (i.e., Validator > VFN > Public)
7295
let network_ordering = compare_network_id(
7396
&peer_network_id_a.network_id(),
7497
&peer_network_id_b.network_id(),
@@ -153,10 +176,14 @@ impl PrioritizedPeersState {
153176
node_type: NodeType,
154177
time_service: TimeService,
155178
) -> Self {
179+
let prioritized_peers = Arc::new(RwLock::new(Vec::new()));
180+
let peer_comparator =
181+
PrioritizedPeersComparator::new(mempool_config.clone(), time_service.clone());
182+
156183
Self {
157184
mempool_config,
158-
prioritized_peers: Arc::new(RwLock::new(Vec::new())),
159-
peer_comparator: PrioritizedPeersComparator::new(),
185+
prioritized_peers,
186+
peer_comparator,
160187
observed_all_ping_latencies: false,
161188
last_peer_priority_update: None,
162189
time_service,
@@ -440,7 +467,7 @@ impl PrioritizedPeersState {
440467
// Set the last peer priority update time
441468
self.last_peer_priority_update = Some(self.time_service.now());
442469
info!(
443-
"Updated prioritized peers. Peer count: {:?}, Latencies: {:?},\n Prioritized peers: {:?},\n Sender bucket assignment: {:?}",
470+
"Updated prioritized peers. Peer count: {:?}, Latencies: {:?}, Prioritized peers: {:?}, Sender bucket assignment: {:?}",
444471
peers_and_metadata.len(),
445472
peers_and_metadata
446473
.iter()
@@ -529,6 +556,60 @@ fn compare_ping_latency(
529556
}
530557
}
531558

559+
/// Returns true iff the given peer monitoring metadata is healthy. A peer is
560+
/// considered healthy if its latest ledger timestamp is within the max acceptable
561+
/// sync lag. If the monitoring metadata is missing, the peer is considered unhealthy.
562+
fn check_peer_metadata_health(
563+
mempool_config: &MempoolConfig,
564+
time_service: &TimeService,
565+
monitoring_metadata: &Option<&PeerMonitoringMetadata>,
566+
) -> bool {
567+
monitoring_metadata
568+
.and_then(|metadata| {
569+
metadata
570+
.latest_node_info_response
571+
.as_ref()
572+
.map(|node_information_response| {
573+
// Get the peer's ledger timestamp and the current timestamp
574+
let peer_ledger_timestamp_usecs =
575+
node_information_response.ledger_timestamp_usecs;
576+
let current_timestamp_usecs = get_timestamp_now_usecs(time_service);
577+
578+
// Calculate the max sync lag before the peer is considered unhealthy (in microseconds)
579+
let max_sync_lag_secs =
580+
mempool_config.max_sync_lag_before_unhealthy_secs as u64;
581+
let max_sync_lag_usecs = max_sync_lag_secs * MICROS_PER_SECOND;
582+
583+
// Determine if the peer is healthy
584+
current_timestamp_usecs.saturating_sub(peer_ledger_timestamp_usecs)
585+
< max_sync_lag_usecs
586+
})
587+
})
588+
.unwrap_or(false) // If metadata is missing, consider the peer unhealthy
589+
}
590+
591+
/// Compares the health of the given peer monitoring metadata. Healthy
592+
/// peers are prioritized over unhealthy peers, or peers missing metadata.
593+
fn compare_peer_health(
594+
mempool_config: &MempoolConfig,
595+
time_service: &TimeService,
596+
monitoring_metadata_a: &Option<&PeerMonitoringMetadata>,
597+
monitoring_metadata_b: &Option<&PeerMonitoringMetadata>,
598+
) -> Ordering {
599+
// Check the health of the peer monitoring metadata
600+
let is_healthy_a =
601+
check_peer_metadata_health(mempool_config, time_service, monitoring_metadata_a);
602+
let is_healthy_b =
603+
check_peer_metadata_health(mempool_config, time_service, monitoring_metadata_b);
604+
605+
// Compare the health statuses
606+
match (is_healthy_a, is_healthy_b) {
607+
(true, false) => Ordering::Greater, // A is healthy, B is unhealthy
608+
(false, true) => Ordering::Less, // A is unhealthy, B is healthy
609+
_ => Ordering::Equal, // Both are healthy or unhealthy
610+
}
611+
}
612+
532613
/// Compares the validator distance for the given pair of monitoring metadata.
533614
/// The peer with the lowest validator distance is prioritized.
534615
fn compare_validator_distance(
@@ -557,6 +638,11 @@ fn compare_validator_distance(
557638
}
558639
}
559640

641+
/// Returns the current timestamp (in microseconds) since the Unix epoch
642+
fn get_timestamp_now_usecs(time_service: &TimeService) -> u64 {
643+
time_service.now_unix_time().as_micros() as u64
644+
}
645+
560646
#[cfg(test)]
561647
mod test {
562648
use super::*;
@@ -565,7 +651,8 @@ mod test {
565651
network_id::{NetworkId, PeerNetworkId},
566652
};
567653
use aptos_peer_monitoring_service_types::{
568-
response::NetworkInformationResponse, PeerMonitoringMetadata,
654+
response::{NetworkInformationResponse, NodeInformationResponse},
655+
PeerMonitoringMetadata,
569656
};
570657
use aptos_types::PeerId;
571658
use core::cmp::Ordering;
@@ -597,6 +684,136 @@ mod test {
597684
);
598685
}
599686

687+
#[test]
fn test_check_peer_metadata_health() {
    // Use a max sync lag of 10 seconds and a mock time service
    let mempool_config = MempoolConfig {
        max_sync_lag_before_unhealthy_secs: 10,
        ..MempoolConfig::default()
    };
    let time_service = TimeService::mock();

    // Create monitoring metadata with no sync lag (healthy)
    let monitoring_metadata = create_metadata_with_sync_lag(&time_service, 0);
    let metadata = Some(&monitoring_metadata);

    // Before any time elapses, the peer must be healthy
    assert!(check_peer_metadata_health(
        &mempool_config,
        &time_service,
        &metadata
    ));

    // Elapse time in steps: the peer stays healthy after 5 and 9 seconds
    // in total, and becomes unhealthy once the full 10 seconds have passed.
    for (secs_to_advance, expect_healthy) in [(5, true), (4, true), (1, false)] {
        time_service
            .clone()
            .into_mock()
            .advance_secs(secs_to_advance);

        let is_peer_healthy =
            check_peer_metadata_health(&mempool_config, &time_service, &metadata);
        assert_eq!(is_peer_healthy, expect_healthy);
    }
}
730+
731+
#[test]
fn test_compare_peer_health_ordering() {
    // Use a max sync lag of 10 seconds
    let mempool_config = MempoolConfig {
        max_sync_lag_before_unhealthy_secs: 10,
        ..MempoolConfig::default()
    };

    // Create a mock time service and elapse some time, so that
    // timestamps in the past can be represented.
    let time_service = TimeService::mock();
    time_service.clone().into_mock().advance_secs(100);

    // Two healthy peers (lag below 10 seconds) and two unhealthy peers (lag above it)
    let healthy_metadata_1 = create_metadata_with_sync_lag(&time_service, 9);
    let healthy_metadata_2 = create_metadata_with_sync_lag(&time_service, 5);
    let unhealthy_metadata_1 = create_metadata_with_sync_lag(&time_service, 11);
    let unhealthy_metadata_2 = create_metadata_with_sync_lag(&time_service, 20);

    // Each case is: (metadata for A, metadata for B, expected ordering)
    let test_cases = [
        // Healthy metadata against others
        (
            Some(&healthy_metadata_1),
            Some(&healthy_metadata_2),
            Ordering::Equal, // Both healthy
        ),
        (
            Some(&healthy_metadata_1),
            Some(&unhealthy_metadata_1),
            Ordering::Greater, // Prioritize healthy
        ),
        (Some(&healthy_metadata_1), None, Ordering::Greater), // Prioritize healthy
        // Unhealthy metadata against others
        (
            Some(&unhealthy_metadata_1),
            Some(&unhealthy_metadata_2),
            Ordering::Equal, // Both unhealthy
        ),
        (
            Some(&unhealthy_metadata_1),
            Some(&healthy_metadata_1),
            Ordering::Less, // Prioritize healthy
        ),
        (Some(&unhealthy_metadata_1), None, Ordering::Equal), // Unhealthy and missing are equal
        // Missing metadata against others
        (None, None, Ordering::Equal), // Both missing
        (None, Some(&healthy_metadata_1), Ordering::Less), // Prioritize healthy
        (None, Some(&unhealthy_metadata_1), Ordering::Equal), // Missing and unhealthy are equal
    ];

    // Verify every case in order
    for (metadata_1, metadata_2, expected_ordering) in test_cases {
        verify_metadata_health_ordering(
            &mempool_config,
            &time_service,
            &metadata_1,
            &metadata_2,
            expected_ordering,
        )
    }
}
816+
600817
#[test]
601818
fn test_compare_validator_distance() {
602819
// Create monitoring metadata with the same distance
@@ -1276,6 +1493,23 @@ mod test {
12761493
PeerMonitoringMetadata::new(average_ping_latency_secs, None, None, None, None)
12771494
}
12781495

1496+
/// Creates peer monitoring metadata with the given sync lag (in seconds)
1497+
fn create_metadata_with_sync_lag(
1498+
time_service: &TimeService,
1499+
sync_lag_secs: usize,
1500+
) -> PeerMonitoringMetadata {
1501+
let ledger_timestamp_usecs = get_timestamp_in_past_usecs(time_service, sync_lag_secs);
1502+
1503+
let healthy_node_info_response = NodeInformationResponse {
1504+
ledger_timestamp_usecs,
1505+
..Default::default()
1506+
};
1507+
PeerMonitoringMetadata {
1508+
latest_node_info_response: Some(healthy_node_info_response),
1509+
..Default::default()
1510+
}
1511+
}
1512+
12791513
/// Creates a validator peer with a random peer ID
12801514
fn create_validator_peer() -> PeerNetworkId {
12811515
PeerNetworkId::new(NetworkId::Validator, PeerId::random())
@@ -1290,4 +1524,22 @@ mod test {
12901524
fn create_public_peer() -> PeerNetworkId {
12911525
PeerNetworkId::new(NetworkId::Public, PeerId::random())
12921526
}
1527+
1528+
/// Returns a timestamp in the past (in microseconds)
1529+
fn get_timestamp_in_past_usecs(time_service: &TimeService, secs_in_past: usize) -> u64 {
1530+
let now_usecs = get_timestamp_now_usecs(time_service);
1531+
now_usecs - ((secs_in_past as u64) * MICROS_PER_SECOND)
1532+
}
1533+
1534+
/// Verifies that the ordering of peer monitoring metadata is as expected
1535+
fn verify_metadata_health_ordering(
1536+
mempool_config: &MempoolConfig,
1537+
time_service: &TimeService,
1538+
metadata_1: &Option<&PeerMonitoringMetadata>,
1539+
metadata_2: &Option<&PeerMonitoringMetadata>,
1540+
expected_ordering: Ordering,
1541+
) {
1542+
let ordering = compare_peer_health(mempool_config, time_service, metadata_1, metadata_2);
1543+
assert_eq!(ordering, expected_ordering);
1544+
}
12931545
}

peer-monitoring-service/types/src/response.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ pub struct ServerProtocolVersionResponse {
9191
}
9292

9393
/// A response for the node information request
94-
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
94+
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
9595
pub struct NodeInformationResponse {
9696
pub build_information: BTreeMap<String, String>, // The build information of the node
9797
pub highest_synced_epoch: u64, // The highest synced epoch of the node

0 commit comments

Comments
 (0)