Skip to content

Commit 478789a

Browse files
authored
Stop modifying ingresses in the Store
In the Ingress Controller we often get objects from our Stores then make modifications. Since these objects are passed by reference or contain references, they are modified in the Store also. Normally this is OK since we don't update the items in the API. However, now that we update status of Ingresses, we need unmodified versions of the ingress objects. * DeepCopy when getting ingress objects from the store * Get fresh ingress from the store before setting status
1 parent d30fedf commit 478789a

File tree

5 files changed

+186
-34
lines changed

5 files changed

+186
-34
lines changed

install/rbac/rbac.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ rules:
5050
verbs:
5151
- list
5252
- watch
53+
- get
5354
- apiGroups:
5455
- "extensions"
5556
resources:

nginx-controller/controller/controller.go

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -120,31 +120,6 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc
120120

121121
glog.V(3).Infof("Nginx Ingress Controller has class: %v", input.IngressClass)
122122

123-
lbc.statusUpdater = &StatusUpdater{
124-
client: input.KubeClient,
125-
namespace: input.ControllerNamespace,
126-
externalServiceName: input.ExternalServiceName,
127-
}
128-
129-
if input.ReportIngressStatus && input.LeaderElectionEnabled {
130-
leaderCallbacks := leaderelection.LeaderCallbacks{
131-
OnStartedLeading: func(stop <-chan struct{}) {
132-
glog.V(3).Info("started leading, updating ingress status")
133-
ingresses, mergeableIngresses := lbc.getManagedIngresses()
134-
err := lbc.statusUpdater.UpdateManagedAndMergeableIngresses(ingresses, mergeableIngresses)
135-
if err != nil {
136-
glog.V(3).Infof("error updating status when starting leading: %v", err)
137-
}
138-
},
139-
}
140-
141-
var err error
142-
lbc.leaderElector, err = NewLeaderElector(input.KubeClient, leaderCallbacks, input.ControllerNamespace)
143-
if err != nil {
144-
glog.V(3).Infof("Error starting LeaderElection: %v", err)
145-
}
146-
}
147-
148123
ingHandlers := cache.ResourceEventHandlerFuncs{
149124
AddFunc: func(obj interface{}) {
150125
addIng := obj.(*extensions.Ingress)
@@ -221,6 +196,34 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc
221196
cache.NewListWatchFromClient(lbc.client.Extensions().RESTClient(), "ingresses", input.Namespace, fields.Everything()),
222197
&extensions.Ingress{}, input.ResyncPeriod, ingHandlers)
223198

199+
// statusUpdater requires ingLister to be instantiated, above.
200+
lbc.statusUpdater = &StatusUpdater{
201+
client: input.KubeClient,
202+
namespace: input.ControllerNamespace,
203+
externalServiceName: input.ExternalServiceName,
204+
ingLister: &lbc.ingLister,
205+
keyFunc: keyFunc,
206+
}
207+
208+
if input.ReportIngressStatus && input.LeaderElectionEnabled {
209+
leaderCallbacks := leaderelection.LeaderCallbacks{
210+
OnStartedLeading: func(stop <-chan struct{}) {
211+
glog.V(3).Info("started leading, updating ingress status")
212+
ingresses, mergeableIngresses := lbc.getManagedIngresses()
213+
err := lbc.statusUpdater.UpdateManagedAndMergeableIngresses(ingresses, mergeableIngresses)
214+
if err != nil {
215+
glog.V(3).Infof("error updating status when starting leading: %v", err)
216+
}
217+
},
218+
}
219+
220+
var err error
221+
lbc.leaderElector, err = NewLeaderElector(input.KubeClient, leaderCallbacks, input.ControllerNamespace)
222+
if err != nil {
223+
glog.V(3).Infof("Error starting LeaderElection: %v", err)
224+
}
225+
}
226+
224227
svcHandlers := cache.ResourceEventHandlerFuncs{
225228
AddFunc: func(obj interface{}) {
226229
addSvc := obj.(*api_v1.Service)
@@ -629,7 +632,7 @@ func (lbc *LoadBalancerController) sync(task Task) {
629632

630633
func (lbc *LoadBalancerController) syncIng(task Task) {
631634
key := task.Key
632-
obj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
635+
ing, ingExists, err := lbc.ingLister.GetByKeySafe(key)
633636
if err != nil {
634637
lbc.syncQueue.requeue(task, err)
635638
return
@@ -645,8 +648,6 @@ func (lbc *LoadBalancerController) syncIng(task Task) {
645648
} else {
646649
glog.V(2).Infof("Adding or Updating Ingress: %v\n", key)
647650

648-
ing := obj.(*extensions.Ingress)
649-
650651
if isMaster(ing) {
651652
mergeableIngExs, err := lbc.createMergableIngresses(ing)
652653
if err != nil {

nginx-controller/controller/status.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ type StatusUpdater struct {
2424
externalStatusAddress string
2525
externalServiceAddresses []string
2626
status []api_v1.LoadBalancerIngress
27+
keyFunc func(obj interface{}) (string, error)
28+
ingLister *StoreToIngressLister
2729
}
2830

2931
// UpdateManagedAndMergeableIngresses handles the full return format of LoadBalancerController.getManagedIngresses
@@ -67,12 +69,27 @@ func (su *StatusUpdater) updateIngressWithStatus(ing v1beta1.Ingress, status []a
6769
if reflect.DeepEqual(ing.Status.LoadBalancer.Ingress, status) {
6870
return nil
6971
}
70-
// Objects from the LoadBalancerController.ingLister.Store are retrieved by reference
71-
// and are not safe to modify.
72-
ingCopy := ing.DeepCopy()
72+
73+
// Get a pristine Ingress from the Store. Required because annotations can be modified
74+
// for mergable Ingress objects and the update status API call will update annotations, not just status.
75+
key, err := su.keyFunc(&ing)
76+
if err != nil {
77+
glog.V(3).Infof("error getting key for ing: %v", err)
78+
return err
79+
}
80+
ingCopy, exists, err := su.ingLister.GetByKeySafe(key)
81+
if err != nil {
82+
glog.V(3).Infof("error getting ing from Store by key: %v", err)
83+
return err
84+
}
85+
if !exists {
86+
glog.V(3).Infof("ing doesn't exist in Store")
87+
return nil
88+
}
89+
7390
ingCopy.Status.LoadBalancer.Ingress = status
7491
clientIngress := su.client.ExtensionsV1beta1().Ingresses(ingCopy.Namespace)
75-
_, err := clientIngress.UpdateStatus(ingCopy)
92+
_, err = clientIngress.UpdateStatus(ingCopy)
7693
if err != nil {
7794
glog.V(3).Infof("error setting ingress status: %v", err)
7895
err = su.retryStatusUpdate(clientIngress, ingCopy)
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package controller
2+
3+
import (
4+
"testing"
5+
6+
"k8s.io/api/core/v1"
7+
extensions "k8s.io/api/extensions/v1beta1"
8+
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/apimachinery/pkg/fields"
10+
"k8s.io/client-go/kubernetes/fake"
11+
"k8s.io/client-go/tools/cache"
12+
)
13+
14+
func TestStatusUpdate(t *testing.T) {
15+
ing := extensions.Ingress{
16+
ObjectMeta: meta_v1.ObjectMeta{
17+
Name: "ing-1",
18+
Namespace: "namespace",
19+
},
20+
Status: extensions.IngressStatus{
21+
LoadBalancer: v1.LoadBalancerStatus{
22+
Ingress: []v1.LoadBalancerIngress{
23+
{
24+
IP: "1.2.3.4",
25+
},
26+
},
27+
},
28+
},
29+
}
30+
fakeClient := fake.NewSimpleClientset(
31+
&extensions.IngressList{Items: []extensions.Ingress{
32+
ing,
33+
}},
34+
)
35+
ingLister := StoreToIngressLister{}
36+
ingLister.Store, _ = cache.NewInformer(
37+
cache.NewListWatchFromClient(fakeClient.Extensions().RESTClient(), "ingresses", "nginx-ingress", fields.Everything()),
38+
&extensions.Ingress{}, 2, nil)
39+
40+
ingLister.Store.Add(&ing)
41+
42+
su := StatusUpdater{
43+
client: fakeClient,
44+
namespace: "namespace",
45+
externalServiceName: "service-name",
46+
externalStatusAddress: "123.123.123.123",
47+
ingLister: &ingLister,
48+
keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc,
49+
}
50+
err := su.ClearIngressStatus(ing)
51+
if err != nil {
52+
t.Errorf("error clearing ing status: %v", err)
53+
}
54+
ings, _ := fakeClient.ExtensionsV1beta1().Ingresses("namespace").List(meta_v1.ListOptions{})
55+
ingf := ings.Items[0]
56+
if !checkStatus("", ingf) {
57+
t.Errorf("expected: %v actual: %v", "", ingf.Status.LoadBalancer.Ingress[0])
58+
}
59+
60+
su.SaveStatusFromExternalStatus("1.1.1.1")
61+
err = su.UpdateIngressStatus(ing)
62+
if err != nil {
63+
t.Errorf("error updating ing status: %v", err)
64+
}
65+
ring, _ := fakeClient.ExtensionsV1beta1().Ingresses(ing.Namespace).Get(ing.Name, meta_v1.GetOptions{})
66+
if !checkStatus("1.1.1.1", *ring) {
67+
t.Errorf("expected: %v actual: %v", "", ring.Status.LoadBalancer.Ingress)
68+
}
69+
70+
svc := v1.Service{
71+
ObjectMeta: meta_v1.ObjectMeta{
72+
Namespace: "namespace",
73+
Name: "service-name",
74+
},
75+
Status: v1.ServiceStatus{
76+
LoadBalancer: v1.LoadBalancerStatus{
77+
Ingress: []v1.LoadBalancerIngress{v1.LoadBalancerIngress{
78+
IP: "2.2.2.2",
79+
}},
80+
},
81+
},
82+
}
83+
su.SaveStatusFromExternalService(&svc)
84+
err = su.UpdateIngressStatus(ing)
85+
if err != nil {
86+
t.Errorf("error updating ing status: %v", err)
87+
}
88+
ring, _ = fakeClient.ExtensionsV1beta1().Ingresses(ing.Namespace).Get(ing.Name, meta_v1.GetOptions{})
89+
if !checkStatus("1.1.1.1", *ring) {
90+
t.Errorf("expected: %v actual: %v", "1.1.1.1", ring.Status.LoadBalancer.Ingress)
91+
}
92+
93+
su.SaveStatusFromExternalStatus("")
94+
err = su.UpdateIngressStatus(ing)
95+
if err != nil {
96+
t.Errorf("error updating ing status: %v", err)
97+
}
98+
ring, _ = fakeClient.ExtensionsV1beta1().Ingresses(ing.Namespace).Get(ing.Name, meta_v1.GetOptions{})
99+
if !checkStatus("2.2.2.2", *ring) {
100+
t.Errorf("expected: %v actual: %v", "2.2.2.2", ring.Status.LoadBalancer.Ingress)
101+
}
102+
103+
su.ClearStatusFromExternalService()
104+
err = su.UpdateIngressStatus(ing)
105+
if err != nil {
106+
t.Errorf("error updating ing status: %v", err)
107+
}
108+
ring, _ = fakeClient.ExtensionsV1beta1().Ingresses(ing.Namespace).Get(ing.Name, meta_v1.GetOptions{})
109+
if !checkStatus("", *ring) {
110+
t.Errorf("expected: %v actual: %v", "", ring.Status.LoadBalancer.Ingress)
111+
}
112+
}
113+
114+
func checkStatus(expected string, actual extensions.Ingress) bool {
115+
if len(actual.Status.LoadBalancer.Ingress) == 0 {
116+
if expected == "" {
117+
return true
118+
}
119+
return false
120+
}
121+
return expected == actual.Status.LoadBalancer.Ingress[0].IP
122+
}

nginx-controller/controller/utils.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,21 @@ type StoreToIngressLister struct {
163163
cache.Store
164164
}
165165

166+
// GetByKeySafe calls Store.GetByKeySafe and returns a copy of the ingress so it is
167+
// safe to modify.
168+
func (s *StoreToIngressLister) GetByKeySafe(key string) (ing *extensions.Ingress, exists bool, err error) {
169+
item, exists, err := s.Store.GetByKey(key)
170+
if !exists || err != nil {
171+
return nil, exists, err
172+
}
173+
ing = item.(*extensions.Ingress).DeepCopy()
174+
return
175+
}
176+
166177
// List lists all Ingress' in the store.
167178
func (s *StoreToIngressLister) List() (ing extensions.IngressList, err error) {
168179
for _, m := range s.Store.List() {
169-
ing.Items = append(ing.Items, *(m.(*extensions.Ingress)))
180+
ing.Items = append(ing.Items, *(m.(*extensions.Ingress)).DeepCopy())
170181
}
171182
return ing, nil
172183
}
@@ -175,7 +186,7 @@ func (s *StoreToIngressLister) List() (ing extensions.IngressList, err error) {
175186
// Note that this ignores services without the right nodePorts.
176187
func (s *StoreToIngressLister) GetServiceIngress(svc *api_v1.Service) (ings []extensions.Ingress, err error) {
177188
for _, m := range s.Store.List() {
178-
ing := *m.(*extensions.Ingress)
189+
ing := *m.(*extensions.Ingress).DeepCopy()
179190
if ing.Namespace != svc.Namespace {
180191
continue
181192
}

0 commit comments

Comments
 (0)