Skip to content

Commit e277809

Browse files
authored
Merge pull request #72 from zvlb/main
Update logit with nodeID for listeners
2 parents 01afbe4 + 0340bea commit e277809

File tree

2 files changed

+130
-81
lines changed

2 files changed

+130
-81
lines changed

controllers/listener_controller.go

Lines changed: 129 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package controllers
1919
import (
2020
"context"
2121
"fmt"
22+
"slices"
2223
"sort"
2324

2425
"github.com/go-logr/logr"
@@ -75,6 +76,7 @@ func (r *ListenerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
7576
instance := &v1alpha1.Listener{}
7677
err := r.Get(ctx, req.NamespacedName, instance)
7778
if err != nil {
79+
// if listener not found, delete him from cache
7880
if api_errors.IsNotFound(err) {
7981
r.log.V(1).Info("Listener instance not found. Delete object from xDS cache")
8082
nodeIDs, err := r.Cache.GetNodeIDsForResource(resourcev3.ListenerType, getResourceName(req.Namespace, req.Name))
@@ -95,6 +97,12 @@ func (r *ListenerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
9597
return ctrl.Result{}, errors.New(errors.EmptySpecMessage)
9698
}
9799

100+
// Get listener NodeIDs
101+
nodeIDs := k8s.NodeIDs(instance)
102+
if len(nodeIDs) == 0 {
103+
return ctrl.Result{}, errors.New(errors.NodeIDsEmpty)
104+
}
105+
98106
// Get Envoy Listener from listener instance spec
99107
listener := &listenerv3.Listener{}
100108
if err := r.Unmarshaler.Unmarshal(instance.Spec.Raw, listener); err != nil {
@@ -113,114 +121,134 @@ func (r *ListenerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
113121

114122
listener.Name = getResourceName(req.Namespace, req.Name)
115123

116-
var chains []*listenerv3.FilterChain
117-
var routeConfigs []*routev3.RouteConfiguration
118-
var errs []error
119-
activeDomains := make(map[string]struct{})
124+
// Create HashMap for fast searching of certificates
120125
index, err := k8s.IndexCertificateSecrets(ctx, r.Client, instance.Namespace)
121126
if err != nil {
122127
return ctrl.Result{}, errors.Wrap(err, "cannot generate TLS certificates index from Kubernetes secrets")
123128
}
124129

130+
// TODO: Why we sort it?
125131
sort.Slice(virtualServices.Items, func(i, j int) bool {
126132
return virtualServices.Items[i].CreationTimestamp.Before(&virtualServices.Items[j].CreationTimestamp)
127133
})
128134

129-
L1:
130-
for _, vs := range virtualServices.Items {
131-
tlsFactory := tls.NewTlsFactory(ctx, vs.Spec.TlsConfig, r.Client, r.DiscoveryClient, r.Config.GetDefaultIssuer(), instance.Namespace, index)
132-
vsFactory := virtualservice.NewVirtualServiceFactory(r.Client, r.Unmarshaler, &vs, instance, *tlsFactory)
133-
134-
virtSvc, err := vsFactory.Create(ctx, getResourceName(vs.Namespace, vs.Name))
135-
if err != nil {
136-
if errors.NeedStatusUpdate(err) {
137-
if err := vs.SetError(ctx, r.Client, errors.Wrap(err, "cannot get Virtual Service struct").Error()); err != nil {
135+
// Save listener FilterChains
136+
listenerFilterChains := listener.FilterChains
137+
138+
// Collect all VirtualServices for listener in each NodeID
139+
for _, nodeID := range nodeIDs {
140+
// errs for collect all errors with processing VirtualServices
141+
var errs []error
142+
143+
// activeDomains for collect all domains with active VirtualServices. It's needed for check dublicates
144+
activeDomains := make(map[string]struct{})
145+
146+
// routeConfigs for collect Routes for listener
147+
var routeConfigs []*routev3.RouteConfiguration
148+
149+
// chains for collect Filter Chains for listener
150+
var chains []*listenerv3.FilterChain
151+
152+
for _, vs := range virtualServices.Items {
153+
// If Virtual Service has nodeID or Virtual Service don't have any nondeID (or all NodeID)
154+
if slices.Contains(k8s.NodeIDs(&vs), nodeID) || k8s.NodeIDs(&vs) == nil {
155+
// Create Factory for TLS
156+
tlsFactory := tls.NewTlsFactory(
157+
ctx,
158+
vs.Spec.TlsConfig,
159+
r.Client,
160+
r.DiscoveryClient,
161+
r.Config.GetDefaultIssuer(),
162+
instance.Namespace,
163+
index,
164+
)
165+
166+
// Create Factory for VirtualService
167+
vsFactory := virtualservice.NewVirtualServiceFactory(
168+
r.Client,
169+
r.Unmarshaler,
170+
&vs,
171+
instance,
172+
*tlsFactory,
173+
)
174+
175+
// Create VirtualService
176+
virtSvc, err := vsFactory.Create(ctx, getResourceName(vs.Namespace, vs.Name))
177+
if err != nil {
178+
if errors.NeedStatusUpdate(err) {
179+
if err := vs.SetError(ctx, r.Client, errors.Wrap(err, "cannot get Virtual Service struct").Error()); err != nil {
180+
errs = append(errs, err)
181+
}
182+
continue
183+
}
138184
errs = append(errs, err)
185+
continue
139186
}
140-
continue
141-
}
142-
errs = append(errs, err)
143-
continue
144-
}
145187

146-
for _, domain := range virtSvc.VirtualHost.Domains {
147-
_, ok := activeDomains[domain]
148-
if ok {
149-
r.log.Error(nil, "domain already in use", "name:", domain)
150-
if err := vs.SetError(ctx, r.Client, fmt.Sprintf("duplicate domain: %s", domain)); err != nil {
151-
errs = append(errs, err)
188+
// Check domain dublicates
189+
for _, domain := range virtSvc.VirtualHost.Domains {
190+
_, ok := activeDomains[domain]
191+
if ok {
192+
r.log.Error(nil, "domain already in use", "name:", domain)
193+
if err := vs.SetError(ctx, r.Client, fmt.Sprintf("duplicate domain: %s", domain)); err != nil {
194+
errs = append(errs, err)
195+
}
196+
continue
197+
}
198+
activeDomains[domain] = struct{}{}
199+
}
200+
201+
// Set status about domains woth errors
202+
if virtSvc.Tls != nil {
203+
if len(virtSvc.Tls.ErrorDomains) > 0 {
204+
if err := vs.SetDomainsStatus(ctx, r.Client, virtSvc.Tls.ErrorDomains); err != nil {
205+
errs = append(errs, err)
206+
}
207+
}
152208
}
153-
continue L1
154-
}
155-
activeDomains[domain] = struct{}{}
156-
}
157209

158-
if virtSvc.Tls != nil {
159-
if len(virtSvc.Tls.ErrorDomains) > 0 {
160-
if err := vs.SetDomainsStatus(ctx, r.Client, virtSvc.Tls.ErrorDomains); err != nil {
210+
// Collect routes
211+
routeConfigs = append(routeConfigs, virtSvc.RouteConfig)
212+
213+
// Get and collect Filter Chains
214+
filterChains, err := virtualservice.FilterChains(&virtSvc)
215+
if err != nil {
216+
if errors.NeedStatusUpdate(err) {
217+
if err := vs.SetError(ctx, r.Client, errors.Wrap(err, "failed to get filterchain").Error()); err != nil {
218+
errs = append(errs, err)
219+
}
220+
continue
221+
}
161222
errs = append(errs, err)
223+
continue
162224
}
163-
}
164-
}
165225

166-
// If VirtualService nodeIDs is not empty and listener does not contains all of them - skip. TODO: Add to status
167-
if !k8s.NodeIDsContains(virtSvc.NodeIDs, k8s.NodeIDs(instance)) {
168-
r.log.Info("NodeID mismatch", "VirtualService", vs.Name)
169-
if err := vs.SetError(ctx, r.Client, "VirtualService nodeIDs is not empty and listener does not contains all of them"); err != nil {
170-
errs = append(errs, err)
171-
}
172-
continue
173-
}
226+
chains = append(chains, filterChains...)
174227

175-
routeConfigs = append(routeConfigs, virtSvc.RouteConfig)
228+
if err := vs.SetValid(ctx, r.Client); err != nil {
229+
errs = append(errs, err)
230+
}
176231

177-
ch, err := virtualservice.FilterChains(&virtSvc)
178-
if err != nil {
179-
if errors.NeedStatusUpdate(err) {
180-
if err := vs.SetError(ctx, r.Client, errors.Wrap(err, "failed to get filterchain").Error()); err != nil {
232+
if err := vs.SetLastAppliedHash(ctx, r.Client); err != nil {
181233
errs = append(errs, err)
182234
}
183235
}
184-
continue
185-
}
186-
187-
chains = append(chains, ch...)
188-
189-
if err := vs.SetValid(ctx, r.Client); err != nil {
190-
errs = append(errs, err)
191236
}
192237

193-
if err := vs.SetLastAppliedHash(ctx, r.Client); err != nil {
194-
errs = append(errs, err)
195-
}
196-
}
197-
198-
if len(errs) != 0 {
199-
r.log.Error(nil, "FilterChain build errors")
200-
for _, e := range errs {
201-
r.log.Error(e, "")
202-
}
203-
return ctrl.Result{}, errors.New("failed to generate FilterChains or RouteConfigs")
204-
}
205-
206-
listener.FilterChains = append(listener.FilterChains, chains...)
207-
208-
// Add routeConfigs to xds cache
209-
for _, rtConfig := range routeConfigs {
210-
for _, nodeID := range k8s.NodeIDs(instance) {
211-
r.log.V(1).Info("Adding route", "name:", rtConfig.Name)
212-
if err := r.Cache.Update(nodeID, rtConfig); err != nil {
213-
return ctrl.Result{}, errors.Wrap(err, errors.CannotUpdateCacheMessage)
238+
// Check errors
239+
if len(errs) != 0 {
240+
r.log.Error(nil, "FilterChain build errors")
241+
for _, e := range errs {
242+
r.log.Error(e, "")
214243
}
244+
return ctrl.Result{}, errors.New("failed to generate FilterChains or RouteConfigs")
215245
}
216-
}
217246

218-
if err := listener.ValidateAll(); err != nil {
219-
return reconcile.Result{}, errors.WrapUKS(err, errors.CannotValidateCacheResourceMessage)
220-
}
247+
// Add builded FilterChains to Listener
248+
listener.FilterChains = listenerFilterChains
249+
listener.FilterChains = append(listener.FilterChains, chains...)
221250

222-
// Add listener to xds cache
223-
for _, nodeID := range k8s.NodeIDs(instance) {
251+
// Clear Listener, if don'r have FilterChains
224252
if len(listener.FilterChains) == 0 {
225253
r.log.WithValues("NodeID", nodeID).Info("Listener FilterChain is empty, deleting")
226254
if err := r.Cache.Delete(nodeID, resourcev3.ListenerType, getResourceName(req.Namespace, req.Name)); err != nil {
@@ -229,9 +257,30 @@ L1:
229257
return ctrl.Result{}, nil
230258
}
231259

260+
// Validate Listener
261+
if err := listener.ValidateAll(); err != nil {
262+
return reconcile.Result{}, errors.WrapUKS(err, errors.CannotValidateCacheResourceMessage)
263+
}
264+
265+
// Update listener in xDS cache
266+
r.log.V(1).WithValues("NodeID", nodeID).Info("Update listener", "name:", listener.Name)
232267
if err := r.Cache.Update(nodeID, listener); err != nil {
233268
return ctrl.Result{}, errors.Wrap(err, errors.CannotUpdateCacheMessage)
234269
}
270+
271+
// Update routes in xDS cache
272+
for _, rtConfig := range routeConfigs {
273+
// Validate RouteConfig
274+
if err := rtConfig.ValidateAll(); err != nil {
275+
return reconcile.Result{}, errors.WrapUKS(err, errors.CannotValidateCacheResourceMessage)
276+
}
277+
278+
r.log.V(1).WithValues("NodeID", nodeID).Info("Update route", "name:", rtConfig.Name)
279+
if err := r.Cache.Update(nodeID, rtConfig); err != nil {
280+
return ctrl.Result{}, errors.Wrap(err, errors.CannotUpdateCacheMessage)
281+
}
282+
}
283+
235284
}
236285

237286
r.log.Info("Listener reconcilation finished")

pkg/errors/messages.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ var (
2929
ZeroParamMessage = `need choose one 1 param for configure TLS. \
3030
You can choose one of 'secretRef', 'certManager', 'autoDiscovery'.\
3131
If you don't want use TLS for connection - don't install tlsConfig`
32-
NodeIDsEmpty = "NodeID not set"
32+
NodeIDsEmpty = "Object don't have any NodeID"
3333
SecretNotTLSTypeMessage = "kuberentes Secret is not a type TLS"
3434
ControlLabelNotExistMessage = "kuberentes Secret doesn't have control label"
3535
ControlLabelWrongMessage = "kubernetes Secret have label, but value not true"

0 commit comments

Comments
 (0)