@@ -24,6 +24,8 @@ import (
2424 "k8s.io/apimachinery/pkg/fields"
2525 "k8s.io/apimachinery/pkg/util/wait"
2626
27+ discoveryv1 "k8s.io/api/discovery/v1"
28+ discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
2729 "k8s.io/client-go/kubernetes"
2830 "k8s.io/client-go/rest"
2931 "k8s.io/client-go/tools/cache"
@@ -120,11 +122,12 @@ func (k *K8s) EventsNamespaces(channel chan SyncDataEvent, stop chan struct{}, i
120122 status = DELETED
121123 }
122124 item := & store.Namespace {
123- Name : data .GetName (),
124- Endpoints : make (map [string ]* store.Endpoints ),
125- Services : make (map [string ]* store.Service ),
126- Ingresses : make (map [string ]* store.Ingress ),
127- Secret : make (map [string ]* store.Secret ),
125+ Name : data .GetName (),
126+ Endpoints : make (map [string ]map [string ]* store.Endpoints ),
127+ Services : make (map [string ]* store.Service ),
128+ Ingresses : make (map [string ]* store.Ingress ),
129+ Secret : make (map [string ]* store.Secret ),
130+ HAProxyRuntime : make (map [string ]map [string ]* store.RuntimeBackend ),
128131 CRs : & store.CustomResources {
129132 Global : make (map [string ]* models.Global ),
130133 Defaults : make (map [string ]* models.Defaults ),
@@ -144,11 +147,12 @@ func (k *K8s) EventsNamespaces(channel chan SyncDataEvent, stop chan struct{}, i
144147 }
145148 status := DELETED
146149 item := & store.Namespace {
147- Name : data .GetName (),
148- Endpoints : make (map [string ]* store.Endpoints ),
149- Services : make (map [string ]* store.Service ),
150- Ingresses : make (map [string ]* store.Ingress ),
151- Secret : make (map [string ]* store.Secret ),
150+ Name : data .GetName (),
151+ Endpoints : make (map [string ]map [string ]* store.Endpoints ),
152+ Services : make (map [string ]* store.Service ),
153+ Ingresses : make (map [string ]* store.Ingress ),
154+ Secret : make (map [string ]* store.Secret ),
155+ HAProxyRuntime : make (map [string ]map [string ]* store.RuntimeBackend ),
152156 CRs : & store.CustomResources {
153157 Global : make (map [string ]* models.Global ),
154158 Defaults : make (map [string ]* models.Defaults ),
@@ -191,7 +195,7 @@ func (k *K8s) EventsNamespaces(channel chan SyncDataEvent, stop chan struct{}, i
191195 go informer .Run (stop )
192196}
193197
194- func (k * K8s ) EventsEndpoints (channel chan SyncDataEvent , stop chan struct {}, informer cache.SharedIndexInformer ) {
198+ func (k * K8s ) EventsEndpointSlices (channel chan SyncDataEvent , stop chan struct {}, informer cache.SharedIndexInformer ) {
195199 informer .AddEventHandler (cache.ResourceEventHandlerFuncs {
196200 AddFunc : func (obj interface {}) {
197201 item , err := k .convertToEndpoints (obj , ADDED )
@@ -227,44 +231,130 @@ func (k *K8s) EventsEndpoints(channel chan SyncDataEvent, stop chan struct{}, in
227231}
228232
229233func (k * K8s ) convertToEndpoints (obj interface {}, status store.Status ) (* store.Endpoints , error ) {
230- data , ok := obj .(* corev1.Endpoints )
231- if ! ok {
232- k .Logger .Errorf ("%s: Invalid data from k8s api, %s" , ENDPOINTS , obj )
233- return nil , ErrIgnored
234+ getServiceName := func (labels map [string ]string ) string {
235+ return labels ["kubernetes.io/service-name" ]
234236 }
235- if data .GetNamespace () == "kube-system" {
236- if data .ObjectMeta .Name == "kube-controller-manager" ||
237- data .ObjectMeta .Name == "kube-scheduler" ||
238- data .ObjectMeta .Name == "kubernetes-dashboard" ||
239- data .ObjectMeta .Name == "kube-dns" {
240- return nil , ErrIgnored
237+
238+ shouldIgnoreObject := func (namespace string , labels map [string ]string ) bool {
239+ serviceName := getServiceName (labels )
240+ if namespace == "kube-system" {
241+ if serviceName == "kube-controller-manager" ||
242+ serviceName == "kube-scheduler" ||
243+ serviceName == "kubernetes-dashboard" ||
244+ serviceName == "kube-dns" {
245+ return true
246+ }
241247 }
248+ return false
242249 }
243- if data .ObjectMeta .GetDeletionTimestamp () != nil {
244- // detect endpoints that are in terminating state
245- status = DELETED
246- }
247- item := & store.Endpoints {
248- Namespace : data .GetNamespace (),
249- Service : data .GetName (),
250- Ports : make (map [string ]* store.PortEndpoints ),
251- Status : status ,
252- }
253- for _ , subset := range data .Subsets {
254- for _ , port := range subset .Ports {
255- addresses := make (map [string ]struct {})
256- for _ , address := range subset .Addresses {
257- addresses [address .IP ] = struct {}{}
258- }
259- item .Ports [port .Name ] = & store.PortEndpoints {
260- Port : int64 (port .Port ),
261- AddrCount : len (addresses ),
262- AddrNew : addresses ,
263- HAProxySrvs : make ([]* store.HAProxySrv , 0 , len (addresses )),
250+ switch data := obj .(type ) {
251+ case * discoveryv1beta1.EndpointSlice :
252+ if shouldIgnoreObject (data .GetNamespace (), data .GetLabels ()) {
253+ return nil , ErrIgnored
254+ }
255+ item := & store.Endpoints {
256+ SliceName : data .Name ,
257+ Namespace : data .GetNamespace (),
258+ Service : getServiceName (data .GetLabels ()),
259+ Ports : make (map [string ]* store.PortEndpoints ),
260+ Status : status ,
261+ }
262+ addresses := make (map [string ]struct {})
263+ for _ , endpoints := range data .Endpoints {
264+ for _ , address := range endpoints .Addresses {
265+ addresses [address ] = struct {}{}
266+ }
267+ }
268+ for _ , port := range data .Ports {
269+ item .Ports [* port .Name ] = & store.PortEndpoints {
270+ Port : int64 (* port .Port ),
271+ Addresses : addresses ,
272+ }
273+ }
274+ return item , nil
275+ case * discoveryv1.EndpointSlice :
276+ if shouldIgnoreObject (data .GetNamespace (), data .GetLabels ()) {
277+ return nil , ErrIgnored
278+ }
279+ item := & store.Endpoints {
280+ SliceName : data .Name ,
281+ Namespace : data .GetNamespace (),
282+ Service : getServiceName (data .GetLabels ()),
283+ Ports : make (map [string ]* store.PortEndpoints ),
284+ Status : status ,
285+ }
286+ addresses := make (map [string ]struct {})
287+ for _ , endpoints := range data .Endpoints {
288+ for _ , address := range endpoints .Addresses {
289+ addresses [address ] = struct {}{}
290+ }
291+ }
292+ for _ , port := range data .Ports {
293+ item .Ports [* port .Name ] = & store.PortEndpoints {
294+ Port : int64 (* port .Port ),
295+ Addresses : addresses ,
296+ }
297+ }
298+ return item , nil
299+ case * corev1.Endpoints :
300+ item := & store.Endpoints {
301+ Namespace : data .GetNamespace (),
302+ Service : data .GetName (),
303+ Ports : make (map [string ]* store.PortEndpoints ),
304+ Status : status ,
305+ }
306+ for _ , subset := range data .Subsets {
307+ for _ , port := range subset .Ports {
308+ addresses := make (map [string ]struct {})
309+ for _ , address := range subset .Addresses {
310+ addresses [address .IP ] = struct {}{}
311+ }
312+ item .Ports [port .Name ] = & store.PortEndpoints {
313+ Port : int64 (port .Port ),
314+ Addresses : addresses ,
315+ }
264316 }
265317 }
318+ return item , nil
319+ default :
320+ k .Logger .Errorf ("%s: Invalid data from k8s api, %s" , ENDPOINTS , obj )
321+ return nil , ErrIgnored
266322 }
267- return item , nil
323+ }
324+
325+ func (k * K8s ) EventsEndpoints (channel chan SyncDataEvent , stop chan struct {}, informer cache.SharedIndexInformer ) {
326+ informer .AddEventHandler (cache.ResourceEventHandlerFuncs {
327+ AddFunc : func (obj interface {}) {
328+ item , err := k .convertToEndpoints (obj , ADDED )
329+ if errors .Is (err , ErrIgnored ) {
330+ return
331+ }
332+ k .Logger .Tracef ("%s %s: %s" , ENDPOINTS , item .Status , item .Service )
333+ channel <- SyncDataEvent {SyncType : ENDPOINTS , Namespace : item .Namespace , Data : item }
334+ },
335+ DeleteFunc : func (obj interface {}) {
336+ item , err := k .convertToEndpoints (obj , DELETED )
337+ if errors .Is (err , ErrIgnored ) {
338+ return
339+ }
340+ k .Logger .Tracef ("%s %s: %s" , ENDPOINTS , item .Status , item .Service )
341+ channel <- SyncDataEvent {SyncType : ENDPOINTS , Namespace : item .Namespace , Data : item }
342+ },
343+ UpdateFunc : func (oldObj , newObj interface {}) {
344+ item1 , err := k .convertToEndpoints (oldObj , EMPTY )
345+ if errors .Is (err , ErrIgnored ) {
346+ return
347+ }
348+ item2 , _ := k .convertToEndpoints (newObj , MODIFIED )
349+ if item2 .Equal (item1 ) {
350+ return
351+ }
352+ // fix modified state for ones that are deleted,new,same
353+ k .Logger .Tracef ("%s %s: %s" , ENDPOINTS , item2 .Status , item2 .Service )
354+ channel <- SyncDataEvent {SyncType : ENDPOINTS , Namespace : item2 .Namespace , Data : item2 }
355+ },
356+ })
357+ go informer .Run (stop )
268358}
269359
270360func (k * K8s ) EventsIngressClass (channel chan SyncDataEvent , stop chan struct {}, informer cache.SharedIndexInformer ) {
0 commit comments