Skip to content

Commit 5f6d9b6

Browse files
authored
Merge pull request kubernetes#77210 from feiskyer/nsg-race
Add etag for NSG updates so as to fix nsg race condition
2 parents 7a8e11c + 30f1bf2 commit 5f6d9b6

File tree

6 files changed

+110
-14
lines changed

6 files changed

+110
-14
lines changed

staging/src/k8s.io/legacy-cloud-providers/azure/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ go_test(
100100
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
101101
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
102102
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
103+
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
103104
"//staging/src/k8s.io/cloud-provider:go_default_library",
104105
"//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library",
105106
"//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library",

staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-03-01/compute"
2424
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2017-09-01/network"
25+
"github.com/Azure/go-autorest/autorest/to"
2526

2627
"k8s.io/api/core/v1"
2728
"k8s.io/apimachinery/pkg/runtime"
@@ -146,7 +147,7 @@ func (az *Cloud) CreateOrUpdateSecurityGroup(service *v1.Service, sg network.Sec
146147
ctx, cancel := getContextWithCancel()
147148
defer cancel()
148149

149-
resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg)
150+
resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg, to.String(sg.Etag))
150151
klog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name)
151152
if err == nil {
152153
if isSuccessHTTPResponse(resp) {
@@ -156,6 +157,11 @@ func (az *Cloud) CreateOrUpdateSecurityGroup(service *v1.Service, sg network.Sec
156157
return fmt.Errorf("HTTP response %q", resp.Status)
157158
}
158159
}
160+
161+
// Invalidate the cache because ETAG precondition mismatch.
162+
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
163+
az.nsgCache.Delete(*sg.Name)
164+
}
159165
return err
160166
}
161167

@@ -168,14 +174,20 @@ func (az *Cloud) CreateOrUpdateSGWithRetry(service *v1.Service, sg network.Secur
168174
ctx, cancel := getContextWithCancel()
169175
defer cancel()
170176

171-
resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg)
177+
resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg, to.String(sg.Etag))
172178
klog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name)
173-
done, err := az.processHTTPRetryResponse(service, "CreateOrUpdateSecurityGroup", resp, err)
179+
done, retryError := az.processHTTPRetryResponse(service, "CreateOrUpdateSecurityGroup", resp, err)
174180
if done && err == nil {
175181
// Invalidate the cache right after updating
176182
az.nsgCache.Delete(*sg.Name)
177183
}
178-
return done, err
184+
185+
// Invalidate the cache and abort backoff because ETAG precondition mismatch.
186+
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
187+
az.nsgCache.Delete(*sg.Name)
188+
return true, err
189+
}
190+
return done, retryError
179191
})
180192
}
181193

@@ -538,17 +550,22 @@ func isSuccessHTTPResponse(resp *http.Response) bool {
538550
}
539551

540552
func shouldRetryHTTPRequest(resp *http.Response, err error) bool {
541-
if err != nil {
542-
return true
543-
}
544-
545553
if resp != nil {
546-
// HTTP 4xx or 5xx suggests we should retry
554+
// HTTP 412 (StatusPreconditionFailed) means etag mismatch, hence we shouldn't retry.
555+
if resp.StatusCode == http.StatusPreconditionFailed {
556+
return false
557+
}
558+
559+
// HTTP 4xx (except 412) or 5xx suggests we should retry.
547560
if 399 < resp.StatusCode && resp.StatusCode < 600 {
548561
return true
549562
}
550563
}
551564

565+
if err != nil {
566+
return true
567+
}
568+
552569
return false
553570
}
554571

staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,12 @@ func (t *timedCache) Delete(key string) error {
122122
key: key,
123123
})
124124
}
125+
126+
// Set sets the data cache for the key.
127+
// It is only used for testing.
128+
func (t *timedCache) Set(key string, data interface{}) {
129+
t.store.Add(&cacheEntry{
130+
key: key,
131+
data: data,
132+
})
133+
}

staging/src/k8s.io/legacy-cloud-providers/azure/azure_client.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ type SubnetsClient interface {
8181

8282
// SecurityGroupsClient defines needed functions for azure network.SecurityGroupsClient
8383
type SecurityGroupsClient interface {
84-
CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup) (resp *http.Response, err error)
84+
CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (resp *http.Response, err error)
8585
Delete(ctx context.Context, resourceGroupName string, networkSecurityGroupName string) (resp *http.Response, err error)
8686
Get(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, expand string) (result network.SecurityGroup, err error)
8787
List(ctx context.Context, resourceGroupName string) (result []network.SecurityGroup, err error)
@@ -714,7 +714,7 @@ func newAzSecurityGroupsClient(config *azClientConfig) *azSecurityGroupsClient {
714714
}
715715
}
716716

717-
func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup) (resp *http.Response, err error) {
717+
func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (resp *http.Response, err error) {
718718
/* Write rate limiting */
719719
if !az.rateLimiterWriter.TryAccept() {
720720
err = createRateLimitErr(true, "NSGCreateOrUpdate")
@@ -727,7 +727,13 @@ func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGr
727727
}()
728728

729729
mc := newMetricContext("security_groups", "create_or_update", resourceGroupName, az.client.SubscriptionID)
730-
future, err := az.client.CreateOrUpdate(ctx, resourceGroupName, networkSecurityGroupName, parameters)
730+
req, err := az.createOrUpdatePreparer(ctx, resourceGroupName, networkSecurityGroupName, parameters, etag)
731+
if err != nil {
732+
mc.Observe(err)
733+
return nil, err
734+
}
735+
736+
future, err := az.client.CreateOrUpdateSender(req)
731737
if err != nil {
732738
mc.Observe(err)
733739
return future.Response(), err
@@ -738,6 +744,34 @@ func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGr
738744
return future.Response(), err
739745
}
740746

747+
// createOrUpdatePreparer prepares the CreateOrUpdate request.
748+
func (az *azSecurityGroupsClient) createOrUpdatePreparer(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (*http.Request, error) {
749+
pathParameters := map[string]interface{}{
750+
"networkSecurityGroupName": autorest.Encode("path", networkSecurityGroupName),
751+
"resourceGroupName": autorest.Encode("path", resourceGroupName),
752+
"subscriptionId": autorest.Encode("path", az.client.SubscriptionID),
753+
}
754+
755+
const APIVersion = "2017-09-01"
756+
queryParameters := map[string]interface{}{
757+
"api-version": APIVersion,
758+
}
759+
760+
preparerDecorators := []autorest.PrepareDecorator{
761+
autorest.AsContentType("application/json; charset=utf-8"),
762+
autorest.AsPut(),
763+
autorest.WithBaseURL(az.client.BaseURI),
764+
autorest.WithPathParameters("/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkSecurityGroups/{networkSecurityGroupName}", pathParameters),
765+
autorest.WithJSON(parameters),
766+
autorest.WithQueryParameters(queryParameters),
767+
}
768+
if etag != "" {
769+
preparerDecorators = append(preparerDecorators, autorest.WithHeader("If-Match", autorest.String(etag)))
770+
}
771+
preparer := autorest.CreatePreparer(preparerDecorators...)
772+
return preparer.Prepare((&http.Request{}).WithContext(ctx))
773+
}
774+
741775
func (az *azSecurityGroupsClient) Delete(ctx context.Context, resourceGroupName string, networkSecurityGroupName string) (resp *http.Response, err error) {
742776
/* Write rate limiting */
743777
if !az.rateLimiterWriter.TryAccept() {

staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ import (
3636
"github.com/Azure/go-autorest/autorest/to"
3737
)
3838

39+
var (
40+
errPreconditionFailedEtagMismatch = fmt.Errorf("PreconditionFailedEtagMismatch")
41+
)
42+
3943
type fakeAzureLBClient struct {
4044
mutex *sync.Mutex
4145
FakeStore map[string]map[string]network.LoadBalancer
@@ -417,13 +421,21 @@ func newFakeAzureNSGClient() *fakeAzureNSGClient {
417421
return fNSG
418422
}
419423

420-
func (fNSG *fakeAzureNSGClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup) (resp *http.Response, err error) {
424+
func (fNSG *fakeAzureNSGClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (resp *http.Response, err error) {
421425
fNSG.mutex.Lock()
422426
defer fNSG.mutex.Unlock()
423427

424428
if _, ok := fNSG.FakeStore[resourceGroupName]; !ok {
425429
fNSG.FakeStore[resourceGroupName] = make(map[string]network.SecurityGroup)
426430
}
431+
432+
if nsg, ok := fNSG.FakeStore[resourceGroupName][networkSecurityGroupName]; ok {
433+
if etag != "" && to.String(nsg.Etag) != "" && etag != to.String(nsg.Etag) {
434+
return &http.Response{
435+
StatusCode: http.StatusPreconditionFailed,
436+
}, errPreconditionFailedEtagMismatch
437+
}
438+
}
427439
fNSG.FakeStore[resourceGroupName][networkSecurityGroupName] = parameters
428440

429441
return nil, nil

staging/src/k8s.io/legacy-cloud-providers/azure/azure_test.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3131
"k8s.io/apimachinery/pkg/types"
3232
"k8s.io/apimachinery/pkg/util/sets"
33+
"k8s.io/client-go/tools/record"
3334
servicehelpers "k8s.io/cloud-provider/service/helpers"
3435
"k8s.io/legacy-cloud-providers/azure/auth"
3536

@@ -855,6 +856,25 @@ func TestReconcileSecurityWithSourceRanges(t *testing.T) {
855856
validateSecurityGroup(t, sg, svc)
856857
}
857858

859+
func TestReconcileSecurityGroupEtagMismatch(t *testing.T) {
860+
az := getTestCloud()
861+
862+
sg := getTestSecurityGroup(az)
863+
cachedSG := *sg
864+
cachedSG.Etag = to.StringPtr("1111111-0000-0000-0000-000000000000")
865+
az.nsgCache.Set(to.String(sg.Name), &cachedSG)
866+
867+
svc1 := getTestService("servicea", v1.ProtocolTCP, 80)
868+
clusterResources := getClusterResources(az, 1, 1)
869+
lb, _ := az.reconcileLoadBalancer(testClusterName, &svc1, clusterResources.nodes, true)
870+
lbStatus, _ := az.getServiceLoadBalancerStatus(&svc1, lb)
871+
872+
newSG, err := az.reconcileSecurityGroup(testClusterName, &svc1, &lbStatus.Ingress[0].IP, true /* wantLb */)
873+
assert.Nil(t, newSG)
874+
assert.NotNil(t, err)
875+
assert.Equal(t, err, errPreconditionFailedEtagMismatch)
876+
}
877+
858878
func TestReconcilePublicIPWithNewService(t *testing.T) {
859879
az := getTestCloud()
860880
svc := getTestService("servicea", v1.ProtocolTCP, 80, 443)
@@ -958,6 +978,7 @@ func getTestCloud() (az *Cloud) {
958978
nodeResourceGroups: map[string]string{},
959979
unmanagedNodes: sets.NewString(),
960980
routeCIDRs: map[string]string{},
981+
eventRecorder: &record.FakeRecorder{},
961982
}
962983
az.DisksClient = newFakeDisksClient()
963984
az.InterfacesClient = newFakeAzureInterfacesClient()
@@ -1186,6 +1207,7 @@ func getTestSecurityGroup(az *Cloud, services ...v1.Service) *network.SecurityGr
11861207

11871208
sg := network.SecurityGroup{
11881209
Name: &az.SecurityGroupName,
1210+
Etag: to.StringPtr("0000000-0000-0000-0000-000000000000"),
11891211
SecurityGroupPropertiesFormat: &network.SecurityGroupPropertiesFormat{
11901212
SecurityRules: &rules,
11911213
},
@@ -1197,7 +1219,8 @@ func getTestSecurityGroup(az *Cloud, services ...v1.Service) *network.SecurityGr
11971219
ctx,
11981220
az.ResourceGroup,
11991221
az.SecurityGroupName,
1200-
sg)
1222+
sg,
1223+
"")
12011224

12021225
return &sg
12031226
}

0 commit comments

Comments
 (0)