Skip to content

Commit 6178324

Browse files
authored
refactor GlobalConfiguration controller (#6327)
1 parent 8b07f97 commit 6178324

File tree

3 files changed

+171
-157
lines changed

3 files changed

+171
-157
lines changed

internal/k8s/controller.go

Lines changed: 0 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -565,20 +565,6 @@ func (nsi *namespacedInformer) addPolicyHandler(handlers cache.ResourceEventHand
565565
nsi.cacheSyncs = append(nsi.cacheSyncs, informer.HasSynced)
566566
}
567567

568-
func (lbc *LoadBalancerController) addGlobalConfigurationHandler(handlers cache.ResourceEventHandlerFuncs, namespace string, name string) {
569-
lbc.globalConfigurationLister, lbc.globalConfigurationController = cache.NewInformer(
570-
cache.NewListWatchFromClient(
571-
lbc.confClient.K8sV1().RESTClient(),
572-
"globalconfigurations",
573-
namespace,
574-
fields.Set{"metadata.name": name}.AsSelector()),
575-
&conf_v1.GlobalConfiguration{},
576-
lbc.resync,
577-
handlers,
578-
)
579-
lbc.cacheSyncs = append(lbc.cacheSyncs, lbc.globalConfigurationController.HasSynced)
580-
}
581-
582568
func (nsi *namespacedInformer) addTransportServerHandler(handlers cache.ResourceEventHandlerFuncs) {
583569
informer := nsi.confSharedInformerFactory.K8s().V1().TransportServers().Informer()
584570
informer.AddEventHandler(handlers)
@@ -1362,55 +1348,6 @@ func (lbc *LoadBalancerController) syncTransportServer(task task) {
13621348
lbc.processProblems(problems)
13631349
}
13641350

1365-
func (lbc *LoadBalancerController) syncGlobalConfiguration(task task) {
1366-
key := task.Key
1367-
obj, gcExists, err := lbc.globalConfigurationLister.GetByKey(key)
1368-
if err != nil {
1369-
lbc.syncQueue.Requeue(task, err)
1370-
return
1371-
}
1372-
1373-
var changes []ResourceChange
1374-
var problems []ConfigurationProblem
1375-
var validationErr error
1376-
1377-
if !gcExists {
1378-
glog.V(2).Infof("Deleting GlobalConfiguration: %v\n", key)
1379-
1380-
changes, problems = lbc.configuration.DeleteGlobalConfiguration()
1381-
} else {
1382-
glog.V(2).Infof("Adding or Updating GlobalConfiguration: %v\n", key)
1383-
1384-
gc := obj.(*conf_v1.GlobalConfiguration)
1385-
changes, problems, validationErr = lbc.configuration.AddOrUpdateGlobalConfiguration(gc)
1386-
}
1387-
1388-
updateErr := lbc.processChangesFromGlobalConfiguration(changes)
1389-
1390-
if gcExists {
1391-
eventTitle := "Updated"
1392-
eventType := api_v1.EventTypeNormal
1393-
eventMessage := fmt.Sprintf("GlobalConfiguration %s was added or updated", key)
1394-
1395-
if validationErr != nil {
1396-
eventTitle = "AddedOrUpdatedWithError"
1397-
eventType = api_v1.EventTypeWarning
1398-
eventMessage = fmt.Sprintf("GlobalConfiguration %s is updated with errors: %v", key, validationErr)
1399-
}
1400-
1401-
if updateErr != nil {
1402-
eventTitle += "WithError"
1403-
eventType = api_v1.EventTypeWarning
1404-
eventMessage = fmt.Sprintf("%s; with reload error: %v", eventMessage, updateErr)
1405-
}
1406-
1407-
gc := obj.(*conf_v1.GlobalConfiguration)
1408-
lbc.recorder.Eventf(gc, eventType, eventTitle, eventMessage)
1409-
}
1410-
1411-
lbc.processProblems(problems)
1412-
}
1413-
14141351
func (lbc *LoadBalancerController) syncVirtualServer(task task) {
14151352
key := task.Key
14161353
var obj interface{}
@@ -1581,66 +1518,6 @@ func (lbc *LoadBalancerController) processChanges(changes []ResourceChange) {
15811518
}
15821519
}
15831520

1584-
// processChangesFromGlobalConfiguration processes changes that come from updates to the GlobalConfiguration resource.
1585-
// Such changes need to be processed at once to prevent any inconsistencies in the generated NGINX config.
1586-
func (lbc *LoadBalancerController) processChangesFromGlobalConfiguration(changes []ResourceChange) error {
1587-
var updatedTSExes []*configs.TransportServerEx
1588-
var updatedVSExes []*configs.VirtualServerEx
1589-
var deletedTSKeys []string
1590-
var deletedVSKeys []string
1591-
1592-
var updatedResources []Resource
1593-
1594-
for _, c := range changes {
1595-
switch impl := c.Resource.(type) {
1596-
case *VirtualServerConfiguration:
1597-
if c.Op == AddOrUpdate {
1598-
vsEx := lbc.createVirtualServerEx(impl.VirtualServer, impl.VirtualServerRoutes)
1599-
1600-
updatedVSExes = append(updatedVSExes, vsEx)
1601-
updatedResources = append(updatedResources, impl)
1602-
} else if c.Op == Delete {
1603-
key := getResourceKey(&impl.VirtualServer.ObjectMeta)
1604-
1605-
deletedVSKeys = append(deletedVSKeys, key)
1606-
}
1607-
case *TransportServerConfiguration:
1608-
if c.Op == AddOrUpdate {
1609-
tsEx := lbc.createTransportServerEx(impl.TransportServer, impl.ListenerPort)
1610-
1611-
updatedTSExes = append(updatedTSExes, tsEx)
1612-
updatedResources = append(updatedResources, impl)
1613-
} else if c.Op == Delete {
1614-
key := getResourceKey(&impl.TransportServer.ObjectMeta)
1615-
1616-
deletedTSKeys = append(deletedTSKeys, key)
1617-
}
1618-
}
1619-
}
1620-
1621-
var updateErr error
1622-
1623-
if len(updatedTSExes) > 0 || len(deletedTSKeys) > 0 {
1624-
tsUpdateErrs := lbc.configurator.UpdateTransportServers(updatedTSExes, deletedTSKeys)
1625-
1626-
if len(tsUpdateErrs) > 0 {
1627-
updateErr = fmt.Errorf("errors received from updating TransportServers after GlobalConfiguration change: %v", tsUpdateErrs)
1628-
}
1629-
}
1630-
1631-
if len(updatedVSExes) > 0 || len(deletedVSKeys) > 0 {
1632-
vsUpdateErrs := lbc.configurator.UpdateVirtualServers(updatedVSExes, deletedVSKeys)
1633-
1634-
if len(vsUpdateErrs) > 0 {
1635-
updateErr = fmt.Errorf("errors received from updating VirtualSrvers after GlobalConfiguration change: %v", vsUpdateErrs)
1636-
}
1637-
}
1638-
1639-
lbc.updateResourcesStatusAndEvents(updatedResources, configs.Warnings{}, updateErr)
1640-
1641-
return updateErr
1642-
}
1643-
16441521
func (lbc *LoadBalancerController) updateTransportServerStatusAndEventsOnDelete(tsConfig *TransportServerConfiguration, changeError string, deleteErr error) {
16451522
eventType := api_v1.EventTypeWarning
16461523
eventTitle := "Rejected"
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package k8s
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
7+
"github.com/golang/glog"
8+
"github.com/nginxinc/kubernetes-ingress/internal/configs"
9+
conf_v1 "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1"
10+
api_v1 "k8s.io/api/core/v1"
11+
"k8s.io/apimachinery/pkg/fields"
12+
"k8s.io/client-go/tools/cache"
13+
)
14+
15+
func createGlobalConfigurationHandlers(lbc *LoadBalancerController) cache.ResourceEventHandlerFuncs {
16+
return cache.ResourceEventHandlerFuncs{
17+
AddFunc: func(obj interface{}) {
18+
gc := obj.(*conf_v1.GlobalConfiguration)
19+
glog.V(3).Infof("Adding GlobalConfiguration: %v", gc.Name)
20+
lbc.AddSyncQueue(gc)
21+
},
22+
DeleteFunc: func(obj interface{}) {
23+
gc, isGc := obj.(*conf_v1.GlobalConfiguration)
24+
if !isGc {
25+
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
26+
if !ok {
27+
glog.V(3).Infof("Error received unexpected object: %v", obj)
28+
return
29+
}
30+
gc, ok = deletedState.Obj.(*conf_v1.GlobalConfiguration)
31+
if !ok {
32+
glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-GlobalConfiguration object: %v", deletedState.Obj)
33+
return
34+
}
35+
}
36+
glog.V(3).Infof("Removing GlobalConfiguration: %v", gc.Name)
37+
lbc.AddSyncQueue(gc)
38+
},
39+
UpdateFunc: func(old, cur interface{}) {
40+
curGc := cur.(*conf_v1.GlobalConfiguration)
41+
if !reflect.DeepEqual(old, cur) {
42+
glog.V(3).Infof("GlobalConfiguration %v changed, syncing", curGc.Name)
43+
lbc.AddSyncQueue(curGc)
44+
}
45+
},
46+
}
47+
}
48+
49+
func (lbc *LoadBalancerController) addGlobalConfigurationHandler(handlers cache.ResourceEventHandlerFuncs, namespace string, name string) {
50+
options := cache.InformerOptions{
51+
ListerWatcher: cache.NewListWatchFromClient(
52+
lbc.confClient.K8sV1().RESTClient(),
53+
"globalconfigurations",
54+
namespace,
55+
fields.Set{"metadata.name": name}.AsSelector()),
56+
ObjectType: &conf_v1.GlobalConfiguration{},
57+
ResyncPeriod: lbc.resync,
58+
Handler: handlers,
59+
}
60+
lbc.globalConfigurationLister, lbc.globalConfigurationController = cache.NewInformerWithOptions(options)
61+
lbc.cacheSyncs = append(lbc.cacheSyncs, lbc.globalConfigurationController.HasSynced)
62+
}
63+
64+
func (lbc *LoadBalancerController) syncGlobalConfiguration(task task) {
65+
key := task.Key
66+
obj, gcExists, err := lbc.globalConfigurationLister.GetByKey(key)
67+
if err != nil {
68+
lbc.syncQueue.Requeue(task, err)
69+
return
70+
}
71+
72+
var changes []ResourceChange
73+
var problems []ConfigurationProblem
74+
var validationErr error
75+
76+
if !gcExists {
77+
glog.V(2).Infof("Deleting GlobalConfiguration: %v\n", key)
78+
79+
changes, problems = lbc.configuration.DeleteGlobalConfiguration()
80+
} else {
81+
glog.V(2).Infof("Adding or Updating GlobalConfiguration: %v\n", key)
82+
83+
gc := obj.(*conf_v1.GlobalConfiguration)
84+
changes, problems, validationErr = lbc.configuration.AddOrUpdateGlobalConfiguration(gc)
85+
}
86+
87+
updateErr := lbc.processChangesFromGlobalConfiguration(changes)
88+
89+
if gcExists {
90+
eventTitle := "Updated"
91+
eventType := api_v1.EventTypeNormal
92+
eventMessage := fmt.Sprintf("GlobalConfiguration %s was added or updated", key)
93+
94+
if validationErr != nil {
95+
eventTitle = "AddedOrUpdatedWithError"
96+
eventType = api_v1.EventTypeWarning
97+
eventMessage = fmt.Sprintf("GlobalConfiguration %s is updated with errors: %v", key, validationErr)
98+
}
99+
100+
if updateErr != nil {
101+
eventTitle += "WithError"
102+
eventType = api_v1.EventTypeWarning
103+
eventMessage = fmt.Sprintf("%s; with reload error: %v", eventMessage, updateErr)
104+
}
105+
106+
gc := obj.(*conf_v1.GlobalConfiguration)
107+
lbc.recorder.Eventf(gc, eventType, eventTitle, eventMessage)
108+
}
109+
110+
lbc.processProblems(problems)
111+
}
112+
113+
// processChangesFromGlobalConfiguration processes changes that come from updates to the GlobalConfiguration resource.
114+
// Such changes need to be processed at once to prevent any inconsistencies in the generated NGINX config.
115+
func (lbc *LoadBalancerController) processChangesFromGlobalConfiguration(changes []ResourceChange) error {
116+
var updatedTSExes []*configs.TransportServerEx
117+
var updatedVSExes []*configs.VirtualServerEx
118+
var deletedTSKeys []string
119+
var deletedVSKeys []string
120+
121+
var updatedResources []Resource
122+
123+
for _, c := range changes {
124+
switch impl := c.Resource.(type) {
125+
case *VirtualServerConfiguration:
126+
if c.Op == AddOrUpdate {
127+
vsEx := lbc.createVirtualServerEx(impl.VirtualServer, impl.VirtualServerRoutes)
128+
129+
updatedVSExes = append(updatedVSExes, vsEx)
130+
updatedResources = append(updatedResources, impl)
131+
} else if c.Op == Delete {
132+
key := getResourceKey(&impl.VirtualServer.ObjectMeta)
133+
134+
deletedVSKeys = append(deletedVSKeys, key)
135+
}
136+
case *TransportServerConfiguration:
137+
if c.Op == AddOrUpdate {
138+
tsEx := lbc.createTransportServerEx(impl.TransportServer, impl.ListenerPort)
139+
140+
updatedTSExes = append(updatedTSExes, tsEx)
141+
updatedResources = append(updatedResources, impl)
142+
} else if c.Op == Delete {
143+
key := getResourceKey(&impl.TransportServer.ObjectMeta)
144+
145+
deletedTSKeys = append(deletedTSKeys, key)
146+
}
147+
}
148+
}
149+
150+
var updateErr error
151+
152+
if len(updatedTSExes) > 0 || len(deletedTSKeys) > 0 {
153+
tsUpdateErrs := lbc.configurator.UpdateTransportServers(updatedTSExes, deletedTSKeys)
154+
155+
if len(tsUpdateErrs) > 0 {
156+
updateErr = fmt.Errorf("errors received from updating TransportServers after GlobalConfiguration change: %v", tsUpdateErrs)
157+
}
158+
}
159+
160+
if len(updatedVSExes) > 0 || len(deletedVSKeys) > 0 {
161+
vsUpdateErrs := lbc.configurator.UpdateVirtualServers(updatedVSExes, deletedVSKeys)
162+
163+
if len(vsUpdateErrs) > 0 {
164+
updateErr = fmt.Errorf("errors received from updating VirtualSrvers after GlobalConfiguration change: %v", vsUpdateErrs)
165+
}
166+
}
167+
168+
lbc.updateResourcesStatusAndEvents(updatedResources, configs.Warnings{}, updateErr)
169+
170+
return updateErr
171+
}

internal/k8s/handlers.go

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -326,40 +326,6 @@ func createVirtualServerRouteHandlers(lbc *LoadBalancerController) cache.Resourc
326326
}
327327
}
328328

329-
func createGlobalConfigurationHandlers(lbc *LoadBalancerController) cache.ResourceEventHandlerFuncs {
330-
return cache.ResourceEventHandlerFuncs{
331-
AddFunc: func(obj interface{}) {
332-
gc := obj.(*conf_v1.GlobalConfiguration)
333-
glog.V(3).Infof("Adding GlobalConfiguration: %v", gc.Name)
334-
lbc.AddSyncQueue(gc)
335-
},
336-
DeleteFunc: func(obj interface{}) {
337-
gc, isGc := obj.(*conf_v1.GlobalConfiguration)
338-
if !isGc {
339-
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
340-
if !ok {
341-
glog.V(3).Infof("Error received unexpected object: %v", obj)
342-
return
343-
}
344-
gc, ok = deletedState.Obj.(*conf_v1.GlobalConfiguration)
345-
if !ok {
346-
glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-GlobalConfiguration object: %v", deletedState.Obj)
347-
return
348-
}
349-
}
350-
glog.V(3).Infof("Removing GlobalConfiguration: %v", gc.Name)
351-
lbc.AddSyncQueue(gc)
352-
},
353-
UpdateFunc: func(old, cur interface{}) {
354-
curGc := cur.(*conf_v1.GlobalConfiguration)
355-
if !reflect.DeepEqual(old, cur) {
356-
glog.V(3).Infof("GlobalConfiguration %v changed, syncing", curGc.Name)
357-
lbc.AddSyncQueue(curGc)
358-
}
359-
},
360-
}
361-
}
362-
363329
func createTransportServerHandlers(lbc *LoadBalancerController) cache.ResourceEventHandlerFuncs {
364330
return cache.ResourceEventHandlerFuncs{
365331
AddFunc: func(obj interface{}) {

0 commit comments

Comments
 (0)