18
18
import { experimental , logVerbosity , StatusObject } from "@grpc/grpc-js" ;
19
19
import { isIPv4 , isIPv6 } from "net" ;
20
20
import { Locality__Output } from "../generated/envoy/config/core/v3/Locality" ;
21
+ import { SocketAddress__Output } from "../generated/envoy/config/core/v3/SocketAddress" ;
21
22
import { ClusterLoadAssignment__Output } from "../generated/envoy/config/endpoint/v3/ClusterLoadAssignment" ;
22
23
import { Any__Output } from "../generated/google/protobuf/Any" ;
23
24
import { BaseXdsStreamState , HandleResponseResult , RejectedResourceEntry , ResourcePair , Watcher , XdsStreamState } from "./xds-stream-state" ;
@@ -32,6 +33,10 @@ function localitiesEqual(a: Locality__Output, b: Locality__Output) {
32
33
return a . region === b . region && a . sub_zone === b . sub_zone && a . zone === b . zone ;
33
34
}
34
35
36
+ function addressesEqual ( a : SocketAddress__Output , b : SocketAddress__Output ) {
37
+ return a . address === b . address && a . port_value === b . port_value ;
38
+ }
39
+
35
40
export class EdsState extends BaseXdsStreamState < ClusterLoadAssignment__Output > implements XdsStreamState < ClusterLoadAssignment__Output > {
36
41
protected getResourceName ( resource : ClusterLoadAssignment__Output ) : string {
37
42
return resource . cluster_name ;
@@ -50,6 +55,7 @@ export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output>
50
55
*/
51
56
public validateResponse ( message : ClusterLoadAssignment__Output ) {
52
57
const seenLocalities : { locality : Locality__Output , priority : number } [ ] = [ ] ;
58
+ const seenAddresses : SocketAddress__Output [ ] = [ ] ;
53
59
const priorityTotalWeights : Map < number , number > = new Map ( ) ;
54
60
for ( const endpoint of message . endpoints ) {
55
61
if ( ! endpoint . locality ) {
@@ -72,6 +78,12 @@ export class EdsState extends BaseXdsStreamState<ClusterLoadAssignment__Output>
72
78
if ( ! ( isIPv4 ( socketAddress . address ) || isIPv6 ( socketAddress . address ) ) ) {
73
79
return false ;
74
80
}
81
+ for ( const address of seenAddresses ) {
82
+ if ( addressesEqual ( socketAddress , address ) ) {
83
+ return false ;
84
+ }
85
+ }
86
+ seenAddresses . push ( socketAddress ) ;
75
87
}
76
88
priorityTotalWeights . set ( endpoint . priority , ( priorityTotalWeights . get ( endpoint . priority ) ?? 0 ) + ( endpoint . load_balancing_weight ?. value ?? 0 ) ) ;
77
89
}
0 commit comments