@@ -77,6 +77,25 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
77
77
dbsync
78
78
}
79
79
80
+ /// Coalesce a list of peers such that each one has a unique IP:port
81
+ fn coalesce_peers_by_ipaddr ( peers : Vec < NeighborAddress > ) -> Vec < NeighborAddress > {
82
+ // coalesce peers on the same host:port
83
+ let mut same_host_port = HashSet :: new ( ) ;
84
+ let unique_ip_peers: Vec < _ > = peers
85
+ . into_iter ( )
86
+ . filter_map ( |naddr| {
87
+ if same_host_port. contains ( & naddr. addrbytes . to_socketaddr ( naddr. port ) ) {
88
+ None
89
+ } else {
90
+ same_host_port. insert ( naddr. addrbytes . to_socketaddr ( naddr. port ) ) ;
91
+ Some ( naddr)
92
+ }
93
+ } )
94
+ . collect ( ) ;
95
+
96
+ unique_ip_peers
97
+ }
98
+
80
99
/// Calculate the new set of replicas to contact.
81
100
/// This is the same as the set that was connected on the last sync, plus any
82
101
/// config hints and discovered nodes from the DB.
@@ -103,7 +122,10 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
103
122
peers. extend ( extra_peers) ;
104
123
}
105
124
106
- for peer in peers {
125
+ peers. shuffle ( & mut thread_rng ( ) ) ;
126
+
127
+ let unique_ip_peers = Self :: coalesce_peers_by_ipaddr ( peers) ;
128
+ for peer in unique_ip_peers {
107
129
if connected_replicas. len ( ) >= config. max_neighbors {
108
130
break ;
109
131
}
@@ -575,7 +597,9 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
575
597
. into_iter ( )
576
598
. map ( |neighbor| NeighborAddress :: from_neighbor ( & neighbor) )
577
599
. collect ( ) ;
578
- self . replicas = replicas;
600
+
601
+ let unique_ip_peers = Self :: coalesce_peers_by_ipaddr ( replicas) ;
602
+ self . replicas = unique_ip_peers. into_iter ( ) . collect ( ) ;
579
603
}
580
604
debug ! (
581
605
"{:?}: connect_begin: establish StackerDB sessions to {} neighbors" ,
0 commit comments