Skip to content

Commit 2af26dc

Browse files
committed
Add UpdateVMs() for VMSS client to allow update multiple VMSSVMs by
sequential sync requests and concurent async requests.
1 parent 665c664 commit 2af26dc

File tree

7 files changed

+215
-0
lines changed

7 files changed

+215
-0
lines changed

staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,10 @@ func (fVMC *fakeVirtualMachineScaleSetVMsClient) Update(ctx context.Context, res
530530
return nil
531531
}
532532

533+
func (fVMC *fakeVirtualMachineScaleSetVMsClient) UpdateVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string) *retry.Error {
534+
return nil
535+
}
536+
533537
type fakeVirtualMachineScaleSetsClient struct {
534538
mutex *sync.Mutex
535539
FakeStore map[string]map[string]compute.VirtualMachineScaleSet

staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/azure_armclient.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"io/ioutil"
2727
"net/http"
2828
"strings"
29+
"sync"
2930
"time"
3031
"unicode"
3132

@@ -335,6 +336,86 @@ func (c *Client) PutResource(ctx context.Context, resourceID string, parameters
335336
return c.PutResourceWithDecorators(ctx, resourceID, parameters, putDecorators)
336337
}
337338

339+
// PutResources puts a list of resources from resources map[resourceID]parameters.
340+
// Those resources sync requests are sequential while async requests are concurent. It 's especially
341+
// useful when the ARM API doesn't support concurrent requests.
342+
func (c *Client) PutResources(ctx context.Context, resources map[string]interface{}) map[string]*PutResourcesResponse {
343+
if len(resources) == 0 {
344+
return nil
345+
}
346+
347+
// Sequential sync requests.
348+
futures := make(map[string]*azure.Future)
349+
responses := make(map[string]*PutResourcesResponse)
350+
for resourceID, parameters := range resources {
351+
decorators := []autorest.PrepareDecorator{
352+
autorest.WithPathParameters("{resourceID}", map[string]interface{}{"resourceID": resourceID}),
353+
autorest.WithJSON(parameters),
354+
}
355+
request, err := c.PreparePutRequest(ctx, decorators...)
356+
if err != nil {
357+
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "put.prepare", resourceID, err)
358+
responses[resourceID] = &PutResourcesResponse{
359+
Error: retry.NewError(false, err),
360+
}
361+
continue
362+
}
363+
364+
future, resp, clientErr := c.SendAsync(ctx, request)
365+
defer c.CloseResponse(ctx, resp)
366+
if clientErr != nil {
367+
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "put.send", resourceID, clientErr.Error())
368+
responses[resourceID] = &PutResourcesResponse{
369+
Error: clientErr,
370+
}
371+
continue
372+
}
373+
374+
futures[resourceID] = future
375+
}
376+
377+
// Concurrent async requests.
378+
wg := sync.WaitGroup{}
379+
var responseLock sync.Mutex
380+
for resourceID, future := range futures {
381+
wg.Add(1)
382+
go func(resourceID string, future *azure.Future) {
383+
defer wg.Done()
384+
response, err := c.WaitForAsyncOperationResult(ctx, future, "armclient.PutResource")
385+
if err != nil {
386+
if response != nil {
387+
klog.V(5).Infof("Received error in WaitForAsyncOperationResult: '%s', response code %d", err.Error(), response.StatusCode)
388+
} else {
389+
klog.V(5).Infof("Received error in WaitForAsyncOperationResult: '%s', no response", err.Error())
390+
}
391+
392+
retriableErr := retry.GetError(response, err)
393+
if !retriableErr.Retriable &&
394+
strings.Contains(strings.ToUpper(err.Error()), strings.ToUpper("InternalServerError")) {
395+
klog.V(5).Infof("Received InternalServerError in WaitForAsyncOperationResult: '%s', setting error retriable", err.Error())
396+
retriableErr.Retriable = true
397+
}
398+
399+
responseLock.Lock()
400+
responses[resourceID] = &PutResourcesResponse{
401+
Error: retriableErr,
402+
}
403+
responseLock.Unlock()
404+
return
405+
}
406+
407+
responseLock.Lock()
408+
responses[resourceID] = &PutResourcesResponse{
409+
Response: response,
410+
}
411+
responseLock.Unlock()
412+
}(resourceID, future)
413+
}
414+
415+
wg.Wait()
416+
return responses
417+
}
418+
338419
// PutResourceWithDecorators puts a resource by resource ID
339420
func (c *Client) PutResourceWithDecorators(ctx context.Context, resourceID string, parameters interface{}, decorators []autorest.PrepareDecorator) (*http.Response, *retry.Error) {
340421
request, err := c.PreparePutRequest(ctx, decorators...)

staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/interface.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ import (
2727
"k8s.io/legacy-cloud-providers/azure/retry"
2828
)
2929

30+
// PutResourcesResponse defines the response for PutResources.
31+
type PutResourcesResponse struct {
32+
Response *http.Response
33+
Error *retry.Error
34+
}
35+
3036
// Interface is the client interface for ARM.
3137
// Don't forget to run the following command to generate the mock client:
3238
// mockgen -source=$GOPATH/src/k8s.io/kubernetes/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/interface.go -package=mockarmclient Interface > $GOPATH/src/k8s.io/kubernetes/staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/mockarmclient/interface.go
@@ -61,6 +67,11 @@ type Interface interface {
6167
// PutResource puts a resource by resource ID
6268
PutResource(ctx context.Context, resourceID string, parameters interface{}) (*http.Response, *retry.Error)
6369

70+
// PutResources puts a list of resources from resources map[resourceID]parameters.
71+
// Those resources sync requests are sequential while async requests are concurent. It 's especially
72+
// useful when the ARM API doesn't support concurrent requests.
73+
PutResources(ctx context.Context, resources map[string]interface{}) map[string]*PutResourcesResponse
74+
6475
// PutResourceWithDecorators puts a resource with decorators by resource ID
6576
PutResourceWithDecorators(ctx context.Context, resourceID string, parameters interface{}, decorators []autorest.PrepareDecorator) (*http.Response, *retry.Error)
6677

staging/src/k8s.io/legacy-cloud-providers/azure/clients/armclient/mockarmclient/interface.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
autorest "github.com/Azure/go-autorest/autorest"
2727
azure "github.com/Azure/go-autorest/autorest/azure"
2828
gomock "github.com/golang/mock/gomock"
29+
armclient "k8s.io/legacy-cloud-providers/azure/clients/armclient"
2930
retry "k8s.io/legacy-cloud-providers/azure/retry"
3031
)
3132

@@ -227,6 +228,20 @@ func (mr *MockInterfaceMockRecorder) PutResource(ctx, resourceID, parameters int
227228
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutResource", reflect.TypeOf((*MockInterface)(nil).PutResource), ctx, resourceID, parameters)
228229
}
229230

231+
// PutResources mocks base method
232+
func (m *MockInterface) PutResources(ctx context.Context, resources map[string]interface{}) map[string]*armclient.PutResourcesResponse {
233+
m.ctrl.T.Helper()
234+
ret := m.ctrl.Call(m, "PutResources", ctx, resources)
235+
ret0, _ := ret[0].(map[string]*armclient.PutResourcesResponse)
236+
return ret0
237+
}
238+
239+
// PutResources indicates an expected call of PutResources
240+
func (mr *MockInterfaceMockRecorder) PutResources(ctx, resources interface{}) *gomock.Call {
241+
mr.mock.ctrl.T.Helper()
242+
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutResources", reflect.TypeOf((*MockInterface)(nil).PutResources), ctx, resources)
243+
}
244+
230245
// PutResourceWithDecorators mocks base method
231246
func (m *MockInterface) PutResourceWithDecorators(ctx context.Context, resourceID string, parameters interface{}, decorators []autorest.PrepareDecorator) (*http.Response, *retry.Error) {
232247
m.ctrl.T.Helper()

staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/azure_vmssvmclient.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/Azure/go-autorest/autorest/azure"
3030
"github.com/Azure/go-autorest/autorest/to"
3131

32+
utilerrors "k8s.io/apimachinery/pkg/util/errors"
3233
"k8s.io/client-go/util/flowcontrol"
3334
"k8s.io/klog"
3435
azclients "k8s.io/legacy-cloud-providers/azure/clients"
@@ -367,3 +368,89 @@ func (page VirtualMachineScaleSetVMListResultPage) Values() []compute.VirtualMac
367368
}
368369
return *page.vmssvlr.Value
369370
}
371+
372+
// UpdateVMs updates a list of VirtualMachineScaleSetVM from map[instanceID]compute.VirtualMachineScaleSetVM.
373+
func (c *Client) UpdateVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string) *retry.Error {
374+
mc := metrics.NewMetricContext("vmssvm", "update_vms", resourceGroupName, c.subscriptionID, source)
375+
376+
// Report errors if the client is rate limited.
377+
if !c.rateLimiterWriter.TryAccept() {
378+
mc.RateLimitedCount()
379+
return retry.GetRateLimitError(true, "VMSSVMUpdateVMs")
380+
}
381+
382+
// Report errors if the client is throttled.
383+
if c.RetryAfterWriter.After(time.Now()) {
384+
mc.ThrottledCount()
385+
rerr := retry.GetThrottlingError("VMSSVMUpdateVMs", "client throttled", c.RetryAfterWriter)
386+
return rerr
387+
}
388+
389+
rerr := c.updateVMSSVMs(ctx, resourceGroupName, VMScaleSetName, instances)
390+
mc.Observe(rerr.Error())
391+
if rerr != nil {
392+
if rerr.IsThrottled() {
393+
// Update RetryAfterReader so that no more requests would be sent until RetryAfter expires.
394+
c.RetryAfterWriter = rerr.RetryAfter
395+
}
396+
397+
return rerr
398+
}
399+
400+
return nil
401+
}
402+
403+
// updateVMSSVMs updates a list of VirtualMachineScaleSetVM from map[instanceID]compute.VirtualMachineScaleSetVM.
404+
func (c *Client) updateVMSSVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM) *retry.Error {
405+
resources := make(map[string]interface{})
406+
for instanceID, parameter := range instances {
407+
resourceID := armclient.GetChildResourceID(
408+
c.subscriptionID,
409+
resourceGroupName,
410+
"Microsoft.Compute/virtualMachineScaleSets",
411+
VMScaleSetName,
412+
"virtualMachines",
413+
instanceID,
414+
)
415+
resources[resourceID] = parameter
416+
}
417+
418+
responses := c.armClient.PutResources(ctx, resources)
419+
errors := make([]*retry.Error, 0)
420+
for resourceID, resp := range responses {
421+
if resp == nil {
422+
continue
423+
}
424+
425+
defer c.armClient.CloseResponse(ctx, resp.Response)
426+
if resp.Error != nil {
427+
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "vmssvm.put.request", resourceID, resp.Error.Error())
428+
errors = append(errors, resp.Error)
429+
continue
430+
}
431+
432+
if resp.Response != nil && resp.Response.StatusCode != http.StatusNoContent {
433+
_, rerr := c.updateResponder(resp.Response)
434+
if rerr != nil {
435+
klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "vmssvm.put.respond", resourceID, rerr.Error())
436+
errors = append(errors, rerr)
437+
}
438+
}
439+
}
440+
441+
// Aggregate errors.
442+
if len(errors) > 0 {
443+
rerr := &retry.Error{}
444+
errs := make([]error, 0)
445+
for _, err := range errors {
446+
if err.IsThrottled() && err.RetryAfter.After(err.RetryAfter) {
447+
rerr.RetryAfter = err.RetryAfter
448+
}
449+
errs = append(errs, err.Error())
450+
}
451+
rerr.RawError = utilerrors.Flatten(utilerrors.NewAggregate(errs))
452+
return rerr
453+
}
454+
455+
return nil
456+
}

staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/interface.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,7 @@ type Interface interface {
4242

4343
// Update updates a VirtualMachineScaleSetVM.
4444
Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) *retry.Error
45+
46+
// UpdateVMs updates a list of VirtualMachineScaleSetVM from map[instanceID]compute.VirtualMachineScaleSetVM.
47+
UpdateVMs(ctx context.Context, resourceGroupName string, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string) *retry.Error
4548
}

staging/src/k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/mockvmssvmclient/interface.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,17 @@ func (mr *MockInterfaceMockRecorder) Update(ctx, resourceGroupName, VMScaleSetNa
9393
mr.mock.ctrl.T.Helper()
9494
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockInterface)(nil).Update), ctx, resourceGroupName, VMScaleSetName, instanceID, parameters, source)
9595
}
96+
97+
// UpdateVMs mocks base method
98+
func (m *MockInterface) UpdateVMs(ctx context.Context, resourceGroupName, VMScaleSetName string, instances map[string]compute.VirtualMachineScaleSetVM, source string) *retry.Error {
99+
m.ctrl.T.Helper()
100+
ret := m.ctrl.Call(m, "UpdateVMs", ctx, resourceGroupName, VMScaleSetName, instances, source)
101+
ret0, _ := ret[0].(*retry.Error)
102+
return ret0
103+
}
104+
105+
// UpdateVMs indicates an expected call of UpdateVMs
106+
func (mr *MockInterfaceMockRecorder) UpdateVMs(ctx, resourceGroupName, VMScaleSetName, instances, source interface{}) *gomock.Call {
107+
mr.mock.ctrl.T.Helper()
108+
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateVMs", reflect.TypeOf((*MockInterface)(nil).UpdateVMs), ctx, resourceGroupName, VMScaleSetName, instances, source)
109+
}

0 commit comments

Comments
 (0)