Skip to content

Commit d3a10e1

Browse files
authored
Merge pull request kubernetes#88094 from aramase/vm-instance-update
add delays between goroutines for vm instance update
2 parents 3d70825 + fdefdff commit d3a10e1

File tree

3 files changed

+96
-2
lines changed

3 files changed

+96
-2
lines changed

staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ package azure
2121
import (
2222
"context"
2323
"sync"
24+
"time"
25+
26+
utilerrors "k8s.io/apimachinery/pkg/util/errors"
2427
)
2528

2629
// lockMap used to lock on entries
@@ -74,3 +77,21 @@ func (lm *lockMap) unlockEntry(entry string) {
7477
func getContextWithCancel() (context.Context, context.CancelFunc) {
7578
return context.WithCancel(context.Background())
7679
}
80+
81+
// aggregateGoroutinesWithDelay aggregates goroutines and runs them
82+
// in parallel with delay before starting each goroutine
83+
func aggregateGoroutinesWithDelay(delay time.Duration, funcs ...func() error) utilerrors.Aggregate {
84+
errChan := make(chan error, len(funcs))
85+
86+
for _, f := range funcs {
87+
go func(f func() error) { errChan <- f() }(f)
88+
time.Sleep(delay)
89+
}
90+
errs := make([]error, 0)
91+
for i := 0; i < cap(errChan); i++ {
92+
if err := <-errChan; err != nil {
93+
errs = append(errs, err)
94+
}
95+
}
96+
return utilerrors.NewAggregate(errs)
97+
}

staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ limitations under the License.
1919
package azure
2020

2121
import (
22+
"fmt"
2223
"testing"
2324
"time"
2425
)
@@ -83,3 +84,67 @@ func ensureNoCallback(t *testing.T, callbackChan <-chan interface{}) bool {
8384
return true
8485
}
8586
}
87+
88+
// running same unit tests as https://github.com/kubernetes/apimachinery/blob/master/pkg/util/errors/errors_test.go#L371
89+
func TestAggregateGoroutinesWithDelay(t *testing.T) {
90+
testCases := []struct {
91+
errs []error
92+
expected map[string]bool
93+
}{
94+
{
95+
[]error{},
96+
nil,
97+
},
98+
{
99+
[]error{nil},
100+
nil,
101+
},
102+
{
103+
[]error{nil, nil},
104+
nil,
105+
},
106+
{
107+
[]error{fmt.Errorf("1")},
108+
map[string]bool{"1": true},
109+
},
110+
{
111+
[]error{fmt.Errorf("1"), nil},
112+
map[string]bool{"1": true},
113+
},
114+
{
115+
[]error{fmt.Errorf("1"), fmt.Errorf("267")},
116+
map[string]bool{"1": true, "267": true},
117+
},
118+
{
119+
[]error{fmt.Errorf("1"), nil, fmt.Errorf("1234")},
120+
map[string]bool{"1": true, "1234": true},
121+
},
122+
{
123+
[]error{nil, fmt.Errorf("1"), nil, fmt.Errorf("1234"), fmt.Errorf("22")},
124+
map[string]bool{"1": true, "1234": true, "22": true},
125+
},
126+
}
127+
for i, testCase := range testCases {
128+
funcs := make([]func() error, len(testCase.errs))
129+
for i := range testCase.errs {
130+
err := testCase.errs[i]
131+
funcs[i] = func() error { return err }
132+
}
133+
agg := aggregateGoroutinesWithDelay(100*time.Millisecond, funcs...)
134+
if agg == nil {
135+
if len(testCase.expected) > 0 {
136+
t.Errorf("%d: expected %v, got nil", i, testCase.expected)
137+
}
138+
continue
139+
}
140+
if len(agg.Errors()) != len(testCase.expected) {
141+
t.Errorf("%d: expected %d errors in aggregate, got %v", i, len(testCase.expected), agg)
142+
continue
143+
}
144+
for _, err := range agg.Errors() {
145+
if !testCase.expected[err.Error()] {
146+
t.Errorf("%d: expected %v, got aggregate containing %v", i, testCase.expected, err)
147+
}
148+
}
149+
}
150+
}

staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"strconv"
2727
"strings"
2828
"sync"
29+
"time"
2930

3031
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
3132
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
@@ -55,6 +56,13 @@ var (
5556
vmssVMProviderIDRE = regexp.MustCompile(`azure:///subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines/(?:\d+)`)
5657
)
5758

59+
const (
60+
// vmssVMInstanceUpdateDelay is used when updating multiple vm instances in parallel
61+
// the optimum value is 3s to prevent any conflicts that result in concurrent vmss vm
62+
// instances update
63+
vmssVMInstanceUpdateDelay = 3 * time.Second
64+
)
65+
5866
// scaleSet implements VMSet interface for Azure scale set.
5967
type scaleSet struct {
6068
*Cloud
@@ -1082,7 +1090,7 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac
10821090
hostUpdates = append(hostUpdates, f)
10831091
}
10841092

1085-
errs := utilerrors.AggregateGoroutines(hostUpdates...)
1093+
errs := aggregateGoroutinesWithDelay(vmssVMInstanceUpdateDelay, hostUpdates...)
10861094
if errs != nil {
10871095
return utilerrors.Flatten(errs)
10881096
}
@@ -1355,7 +1363,7 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID,
13551363
hostUpdates = append(hostUpdates, f)
13561364
}
13571365

1358-
errs := utilerrors.AggregateGoroutines(hostUpdates...)
1366+
errs := aggregateGoroutinesWithDelay(vmssVMInstanceUpdateDelay, hostUpdates...)
13591367
if errs != nil {
13601368
return utilerrors.Flatten(errs)
13611369
}

0 commit comments

Comments
 (0)