11use futures:: { prelude:: * , ready} ;
2+ use indexmap:: { map:: Entry , IndexMap } ;
23use linkerd_proxy_core:: resolve:: { Resolve , Update } ;
34use pin_project:: pin_project;
45use std:: {
5- collections:: { HashSet , VecDeque } ,
6+ collections:: VecDeque ,
67 future:: Future ,
78 net:: SocketAddr ,
89 pin:: Pin ,
@@ -31,8 +32,14 @@ pub struct DiscoverFuture<F, E> {
3132pub struct Discover < R : TryStream , E > {
3233 #[ pin]
3334 resolution : R ,
34- active : HashSet < SocketAddr > ,
35+
36+ /// Changes that have been received but not yet emitted.
3537 pending : VecDeque < Change < SocketAddr , E > > ,
38+
39+ /// The current state of resolved endpoints that have been observed. This is
40+ /// an `IndexMap` so that the order of observed addresses is preserved
41+ /// (mostly for tests).
42+ active : IndexMap < SocketAddr , E > ,
3643}
3744
3845// === impl FromResolve ===
7784{
7885 type Output = Result < Discover < F :: Ok , E > , F :: Error > ;
7986
87+ #[ inline]
8088 fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
8189 let resolution = ready ! ( self . project( ) . future. try_poll( cx) ) ?;
8290 Poll :: Ready ( Ok ( Discover :: new ( resolution) ) )
@@ -89,7 +97,7 @@ impl<R: TryStream, E> Discover<R, E> {
8997 pub fn new ( resolution : R ) -> Self {
9098 Self {
9199 resolution,
92- active : HashSet :: default ( ) ,
100+ active : IndexMap :: default ( ) ,
93101 pending : VecDeque :: new ( ) ,
94102 }
95103 }
@@ -98,7 +106,7 @@ impl<R: TryStream, E> Discover<R, E> {
98106impl < R , E > Stream for Discover < R , E >
99107where
100108 R : TryStream < Ok = Update < E > > ,
101- E : Clone + std:: fmt:: Debug ,
109+ E : Clone + Eq + std:: fmt:: Debug ,
102110{
103111 type Item = Result < Change < SocketAddr , E > , R :: Error > ;
104112
@@ -111,50 +119,69 @@ where
111119 }
112120
113121 trace ! ( "poll" ) ;
114- match ready ! ( this. resolution. try_poll_next( cx) ) {
115- Some ( update) => match update? {
116- Update :: Reset ( endpoints) => {
117- let active = endpoints. iter ( ) . map ( |( a, _) | * a) . collect :: < HashSet < _ > > ( ) ;
118- trace ! ( new = ?active, old = ?this. active, "Reset" ) ;
119- for addr in this. active . iter ( ) {
120- // If the old addr is not in the new set, remove it.
121- if !active. contains ( addr) {
122- trace ! ( %addr, "Scheduling removal" ) ;
123- this. pending . push_back ( Change :: Remove ( * addr) ) ;
124- } else {
125- trace ! ( %addr, "Unchanged" ) ;
126- }
127- }
128- for ( addr, endpoint) in endpoints. into_iter ( ) {
129- if !this. active . contains ( & addr) {
130- trace ! ( %addr, "Scheduling addition" ) ;
131- this. pending
132- . push_back ( Change :: Insert ( addr, endpoint. clone ( ) ) ) ;
133- }
122+ let update = match ready ! ( this. resolution. try_poll_next( cx) ) {
123+ Some ( update) => update?,
124+ None => return Poll :: Ready ( None ) ,
125+ } ;
126+
127+ match update {
128+ Update :: Reset ( endpoints) => {
129+ let new_active = endpoints. into_iter ( ) . collect :: < IndexMap < _ , _ > > ( ) ;
130+ trace ! ( new = ?new_active, old = ?this. active, "Reset" ) ;
131+
132+ for addr in this. active . keys ( ) {
133+ // If the old addr is not in the new set, remove it.
134+ if !new_active. contains_key ( addr) {
135+ trace ! ( %addr, "Scheduling removal" ) ;
136+ this. pending . push_back ( Change :: Remove ( * addr) ) ;
137+ } else {
138+ trace ! ( %addr, "Unchanged" ) ;
134139 }
135- * this. active = active;
136140 }
137- Update :: Add ( endpoints) => {
138- for ( addr, endpoint) in endpoints. into_iter ( ) {
141+
142+ for ( addr, endpoint) in new_active. iter ( ) {
143+ if this. active . get ( addr) != Some ( endpoint) {
139144 trace ! ( %addr, "Scheduling addition" ) ;
140- this. active . insert ( addr ) ;
141- this . pending . push_back ( Change :: Insert ( addr, endpoint) ) ;
145+ this. pending
146+ . push_back ( Change :: Insert ( * addr, endpoint. clone ( ) ) ) ;
142147 }
143148 }
144- Update :: Remove ( addrs) => {
145- for addr in addrs. into_iter ( ) {
146- if this. active . remove ( & addr) {
147- trace ! ( %addr, "Scheduling removal" ) ;
148- this. pending . push_back ( Change :: Remove ( addr) ) ;
149+
150+ * this. active = new_active;
151+ }
152+
153+ Update :: Add ( endpoints) => {
154+ for ( addr, endpoint) in endpoints. into_iter ( ) {
155+ trace ! ( %addr, "Scheduling addition" ) ;
156+ match this. active . entry ( addr) {
157+ Entry :: Vacant ( entry) => {
158+ entry. insert ( endpoint. clone ( ) ) ;
159+ this. pending . push_back ( Change :: Insert ( addr, endpoint) ) ;
160+ }
161+ Entry :: Occupied ( mut entry) => {
162+ if entry. get ( ) != & endpoint {
163+ entry. insert ( endpoint. clone ( ) ) ;
164+ this. pending . push_back ( Change :: Insert ( addr, endpoint) ) ;
165+ }
149166 }
150167 }
151168 }
152- Update :: DoesNotExist => {
153- trace ! ( "Scheduling removals" ) ;
154- this. pending . extend ( this. active . drain ( ) . map ( Change :: Remove ) ) ;
169+ }
170+
171+ Update :: Remove ( addrs) => {
172+ for addr in addrs. into_iter ( ) {
173+ if this. active . remove ( & addr) . is_some ( ) {
174+ trace ! ( %addr, "Scheduling removal" ) ;
175+ this. pending . push_back ( Change :: Remove ( addr) ) ;
176+ }
155177 }
156- } ,
157- None => return Poll :: Ready ( None ) ,
178+ }
179+
180+ Update :: DoesNotExist => {
181+ trace ! ( "Clearing all active endpoints" ) ;
182+ this. pending
183+ . extend ( this. active . drain ( ..) . map ( |( sa, _) | Change :: Remove ( sa) ) ) ;
184+ }
158185 }
159186 }
160187 }
@@ -163,50 +190,108 @@ where
163190#[ cfg( test) ]
164191mod tests {
165192 use super :: Discover ;
166- use async_stream:: stream;
167193 use futures:: prelude:: * ;
168194 use linkerd_error:: Infallible ;
169195 use linkerd_proxy_core:: resolve:: Update ;
170196 use std:: net:: SocketAddr ;
197+ use tokio_stream:: wrappers:: ReceiverStream ;
171198 use tower:: discover:: Change ;
172199
173200 const PORT : u16 = 8080 ;
174201 fn addr ( n : u8 ) -> SocketAddr {
175202 SocketAddr :: from ( ( [ 10 , 1 , 1 , n] , PORT ) )
176203 }
177204
178- #[ tokio:: test]
205+ #[ tokio:: test( flavor = "current_thread" ) ]
179206 async fn reset ( ) {
180- tokio:: pin! {
181- let stream = stream! {
182- yield Ok :: <_, Infallible >( Update :: Add ( ( 1 ..=2 ) . map( |n| ( addr( n) , n) ) . collect( ) ) ) ;
183- yield Ok ( Update :: Reset ( ( 2 ..=4 ) . map( |n| ( addr( n) , n) ) . collect( ) ) ) ;
184- } ;
185- }
186- let mut disco = Discover :: new ( stream) ;
187-
188- for i in 1 ..=2 {
189- match disco. next ( ) . await . unwrap ( ) . unwrap ( ) {
190- Change :: Remove ( _) => panic ! ( "Unexpectd Remove" ) ,
191- Change :: Insert ( a, n) => {
192- assert_eq ! ( n, i) ;
193- assert_eq ! ( addr( i) , a) ;
194- }
195- }
196- }
197- match disco. next ( ) . await . unwrap ( ) . unwrap ( ) {
198- Change :: Remove ( a) => assert_eq ! ( a, addr( 1 ) ) ,
199- change => panic ! ( "Unexpected change: {:?}" , change) ,
207+ let ( tx, rx) = tokio:: sync:: mpsc:: channel ( 1 ) ;
208+ let mut disco = Discover :: new ( ReceiverStream :: new ( rx) ) ;
209+
210+ // Use reset to set a new state with 3 addresses.
211+ tx. try_send ( Ok :: < _ , Infallible > ( Update :: Reset (
212+ ( 1 ..=3 ) . map ( |n| ( addr ( n) , n) ) . collect ( ) ,
213+ ) ) )
214+ . expect ( "must send" ) ;
215+ for i in 1 ..=3 {
216+ assert ! ( matches!(
217+ disco. try_next( ) . await ,
218+ Ok ( Some ( Change :: Insert ( sa, n) ) ) if sa == addr( i) && n == i
219+ ) ) ;
200220 }
221+
222+ // Reset to a new state with 3 addresses, one of which is unchanged, one of which is
223+ // changed, and one of which is added.
224+ tx. try_send ( Ok ( Update :: Reset ( vec ! [
225+ // Restore the original `2` value. We shouldn't see an update for it.
226+ ( addr( 2 ) , 2 ) ,
227+ // Set a new value for `3`.
228+ ( addr( 3 ) , 4 ) ,
229+ // Add a new address, too.
230+ ( addr( 4 ) , 5 ) ,
231+ ] ) ) )
232+ . expect ( "must send" ) ;
233+ // The first address is removed now.
234+ assert ! ( matches!(
235+ disco. try_next( ) . await ,
236+ Ok ( Some ( Change :: Remove ( a) ) ) if a == addr( 1 )
237+ ) ) ;
238+ // Then process the changed and new addresses.
201239 for i in 3 ..=4 {
202- match disco. next ( ) . await . unwrap ( ) . unwrap ( ) {
203- Change :: Remove ( _) => panic ! ( "Unexpectd Remove" ) ,
204- Change :: Insert ( a, n) => {
205- assert_eq ! ( n, i) ;
206- assert_eq ! ( addr( i) , a) ;
207- }
208- }
240+ assert ! ( matches!(
241+ disco. try_next( ) . await ,
242+ Ok ( Some ( Change :: Insert ( sa, n) ) ) if sa == addr( i) && n == i + 1
243+ ) ) ;
209244 }
245+
246+ // No more updates.
247+ drop ( tx) ;
210248 assert ! ( disco. next( ) . await . is_none( ) ) ;
211249 }
250+
251+ #[ tokio:: test( flavor = "current_thread" ) ]
252+ async fn deduplicate_redundant ( ) {
253+ let ( tx, rx) = tokio:: sync:: mpsc:: channel ( 1 ) ;
254+ let mut disco = Discover :: new ( ReceiverStream :: new ( rx) ) ;
255+
256+ // The initial update is observed.
257+ tx. try_send ( Ok :: < _ , Infallible > ( Update :: Add ( vec ! [ ( addr( 1 ) , "a" ) ] ) ) )
258+ . expect ( "must send" ) ;
259+ assert ! ( matches!(
260+ disco. try_next( ) . await ,
261+ Ok ( Some ( Change :: Insert ( sa, "a" ) ) ) if sa == addr( 1 )
262+ ) ) ;
263+
264+ // A redundant update is not.
265+ tx. try_send ( Ok ( Update :: Add ( vec ! [ ( addr( 1 ) , "a" ) ] ) ) )
266+ . expect ( "must send" ) ;
267+ assert ! ( disco. try_next( ) . now_or_never( ) . is_none( ) ) ;
268+
269+ // A new value for an existing address is observed.
270+ tx. try_send ( Ok ( Update :: Add ( vec ! [ ( addr( 1 ) , "b" ) ] ) ) )
271+ . expect ( "must send" ) ;
272+ assert ! ( matches!(
273+ disco. try_next( ) . await ,
274+ Ok ( Some ( Change :: Insert ( sa, "b" ) ) ) if sa == addr( 1 )
275+ ) ) ;
276+
277+ // Remove the address.
278+ tx. try_send ( Ok ( Update :: Remove ( vec ! [ addr( 1 ) ] ) ) )
279+ . expect ( "must send" ) ;
280+ assert ! ( matches!(
281+ disco. try_next( ) . await ,
282+ Ok ( Some ( Change :: Remove ( sa) ) ) if sa == addr( 1 )
283+ ) ) ;
284+
285+ // Re-adding the address is observed.
286+ tx. try_send ( Ok ( Update :: Add ( vec ! [ ( addr( 1 ) , "b" ) ] ) ) )
287+ . expect ( "must send" ) ;
288+ assert ! ( matches!(
289+ disco. try_next( ) . await ,
290+ Ok ( Some ( Change :: Insert ( sa, "b" ) ) ) if sa == addr( 1 )
291+ ) ) ;
292+
293+ // No more updates.
294+ drop ( tx) ;
295+ assert ! ( disco. next( ) . await . is_none( ) , ) ;
296+ }
212297}
0 commit comments