Skip to content

Commit f75d8e9

Browse files
authored
Merge pull request kubernetes#124624 from atiratree/fix-flake
[test] statefulset controller: fix requests tracker concurrency
2 parents 71d4e4b + 8ca077a commit f75d8e9

File tree

1 file changed

+84
-74
lines changed

1 file changed

+84
-74
lines changed

pkg/controller/statefulset/stateful_set_control_test.go

Lines changed: 84 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"math"
2324
"math/rand"
2425
"reflect"
2526
"runtime"
@@ -2423,51 +2424,53 @@ type requestTracker struct {
24232424
err error
24242425
after int
24252426

2426-
parallelLock sync.Mutex
2427-
parallel int
2428-
maxParallel int
2429-
2430-
delay time.Duration
2427+
// this block should be updated consistently
2428+
parallelLock sync.Mutex
2429+
shouldTrackParallelRequests bool
2430+
parallelRequests int
2431+
maxParallelRequests int
2432+
parallelRequestDelay time.Duration
24312433
}
24322434

2433-
func (rt *requestTracker) errorReady() bool {
2434-
rt.Lock()
2435-
defer rt.Unlock()
2436-
return rt.err != nil && rt.requests >= rt.after
2437-
}
2438-
2439-
func (rt *requestTracker) inc() {
2440-
rt.parallelLock.Lock()
2441-
rt.parallel++
2442-
if rt.maxParallel < rt.parallel {
2443-
rt.maxParallel = rt.parallel
2435+
func (rt *requestTracker) trackParallelRequests() {
2436+
if !rt.shouldTrackParallelRequests {
2437+
// do not track parallel requests unless specifically enabled
2438+
return
24442439
}
2445-
rt.parallelLock.Unlock()
2446-
2447-
rt.Lock()
2448-
defer rt.Unlock()
2449-
rt.requests++
2450-
if rt.delay != 0 {
2451-
time.Sleep(rt.delay)
2440+
if rt.parallelLock.TryLock() {
2441+
// lock acquired: we are the only or the first concurrent request
2442+
// initialize the next set of parallel requests
2443+
rt.parallelRequests = 1
2444+
} else {
2445+
// lock is held by other requests
2446+
// now wait for the lock to increase the parallelRequests
2447+
rt.parallelLock.Lock()
2448+
rt.parallelRequests++
2449+
}
2450+
defer rt.parallelLock.Unlock()
2451+
// update the local maximum of parallel collisions
2452+
if rt.maxParallelRequests < rt.parallelRequests {
2453+
rt.maxParallelRequests = rt.parallelRequests
2454+
}
2455+
// increase the chance of collisions
2456+
if rt.parallelRequestDelay > 0 {
2457+
time.Sleep(rt.parallelRequestDelay)
24522458
}
24532459
}
24542460

2455-
func (rt *requestTracker) reset() {
2456-
rt.parallelLock.Lock()
2457-
rt.parallel = 0
2458-
rt.parallelLock.Unlock()
2459-
2460-
rt.Lock()
2461-
defer rt.Unlock()
2462-
rt.err = nil
2463-
rt.after = 0
2464-
rt.delay = 0
2465-
}
2466-
2467-
func (rt *requestTracker) getErr() error {
2461+
func (rt *requestTracker) incWithOptionalError() error {
24682462
rt.Lock()
24692463
defer rt.Unlock()
2470-
return rt.err
2464+
rt.requests++
2465+
if rt.err != nil && rt.requests >= rt.after {
2466+
// reset and pass the error
2467+
defer func() {
2468+
rt.err = nil
2469+
rt.after = 0
2470+
}()
2471+
return rt.err
2472+
}
2473+
return nil
24712474
}
24722475

24732476
func newRequestTracker(requests int, err error, after int) requestTracker {
@@ -2512,10 +2515,9 @@ func newFakeObjectManager(informerFactory informers.SharedInformerFactory) *fake
25122515
}
25132516

25142517
func (om *fakeObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error {
2515-
defer om.createPodTracker.inc()
2516-
if om.createPodTracker.errorReady() {
2517-
defer om.createPodTracker.reset()
2518-
return om.createPodTracker.getErr()
2518+
defer om.createPodTracker.trackParallelRequests()
2519+
if err := om.createPodTracker.incWithOptionalError(); err != nil {
2520+
return err
25192521
}
25202522
pod.SetUID(types.UID(pod.Name + "-uid"))
25212523
return om.podsIndexer.Update(pod)
@@ -2526,19 +2528,17 @@ func (om *fakeObjectManager) GetPod(namespace, podName string) (*v1.Pod, error)
25262528
}
25272529

25282530
func (om *fakeObjectManager) UpdatePod(pod *v1.Pod) error {
2529-
defer om.updatePodTracker.inc()
2530-
if om.updatePodTracker.errorReady() {
2531-
defer om.updatePodTracker.reset()
2532-
return om.updatePodTracker.getErr()
2531+
defer om.updatePodTracker.trackParallelRequests()
2532+
if err := om.updatePodTracker.incWithOptionalError(); err != nil {
2533+
return err
25332534
}
25342535
return om.podsIndexer.Update(pod)
25352536
}
25362537

25372538
func (om *fakeObjectManager) DeletePod(pod *v1.Pod) error {
2538-
defer om.deletePodTracker.inc()
2539-
if om.deletePodTracker.errorReady() {
2540-
defer om.deletePodTracker.reset()
2541-
return om.deletePodTracker.getErr()
2539+
defer om.deletePodTracker.trackParallelRequests()
2540+
if err := om.deletePodTracker.incWithOptionalError(); err != nil {
2541+
return err
25422542
}
25432543
if key, err := controller.KeyFunc(pod); err != nil {
25442544
return err
@@ -2733,10 +2733,9 @@ func newFakeStatefulSetStatusUpdater(setInformer appsinformers.StatefulSetInform
27332733
}
27342734

27352735
func (ssu *fakeStatefulSetStatusUpdater) UpdateStatefulSetStatus(ctx context.Context, set *apps.StatefulSet, status *apps.StatefulSetStatus) error {
2736-
defer ssu.updateStatusTracker.inc()
2737-
if ssu.updateStatusTracker.errorReady() {
2738-
defer ssu.updateStatusTracker.reset()
2739-
return ssu.updateStatusTracker.err
2736+
defer ssu.updateStatusTracker.trackParallelRequests()
2737+
if err := ssu.updateStatusTracker.incWithOptionalError(); err != nil {
2738+
return err
27402739
}
27412740
set.Status = *status
27422741
ssu.setsIndexer.Update(set)
@@ -2942,50 +2941,61 @@ func fakeResourceVersion(object interface{}) {
29422941
obj.SetResourceVersion(strconv.FormatInt(intValue+1, 10))
29432942
}
29442943
}
2945-
29462944
func TestParallelScale(t *testing.T) {
29472945
for _, tc := range []struct {
2948-
desc string
2949-
replicas int32
2950-
desiredReplicas int32
2946+
desc string
2947+
replicas int32
2948+
desiredReplicas int32
2949+
expectedMinParallelRequests int
29512950
}{
29522951
{
2953-
desc: "scale up from 3 to 30",
2954-
replicas: 3,
2955-
desiredReplicas: 30,
2952+
desc: "scale up from 3 to 30",
2953+
replicas: 3,
2954+
desiredReplicas: 30,
2955+
expectedMinParallelRequests: 2,
29562956
},
29572957
{
2958-
desc: "scale down from 10 to 1",
2959-
replicas: 10,
2960-
desiredReplicas: 1,
2958+
desc: "scale down from 10 to 1",
2959+
replicas: 10,
2960+
desiredReplicas: 1,
2961+
expectedMinParallelRequests: 2,
29612962
},
29622963

29632964
{
2964-
desc: "scale down to 0",
2965-
replicas: 501,
2966-
desiredReplicas: 0,
2965+
desc: "scale down to 0",
2966+
replicas: 501,
2967+
desiredReplicas: 0,
2968+
expectedMinParallelRequests: 10,
29672969
},
29682970
{
2969-
desc: "scale up from 0",
2970-
replicas: 0,
2971-
desiredReplicas: 1000,
2971+
desc: "scale up from 0",
2972+
replicas: 0,
2973+
desiredReplicas: 1000,
2974+
expectedMinParallelRequests: 20,
29722975
},
29732976
} {
29742977
t.Run(tc.desc, func(t *testing.T) {
29752978
set := burst(newStatefulSet(0))
29762979
set.Spec.VolumeClaimTemplates[0].ObjectMeta.Labels = map[string]string{"test": "test"}
2977-
parallelScale(t, set, tc.replicas, tc.desiredReplicas, assertBurstInvariants)
2980+
parallelScale(t, set, tc.replicas, tc.desiredReplicas, tc.expectedMinParallelRequests, assertBurstInvariants)
29782981
})
29792982
}
29802983

29812984
}
29822985

2983-
func parallelScale(t *testing.T, set *apps.StatefulSet, replicas, desiredReplicas int32, invariants invariantFunc) {
2986+
func parallelScale(t *testing.T, set *apps.StatefulSet, replicas, desiredReplicas int32, expectedMinParallelRequests int, invariants invariantFunc) {
29842987
var err error
29852988
diff := desiredReplicas - replicas
2989+
2990+
// maxParallelRequests: MaxBatchSize of the controller is 500, We divide the diff by 4 to allow maximum of the half of the last batch.
2991+
if maxParallelRequests := min(500, math.Abs(float64(diff))/4); expectedMinParallelRequests < 2 || float64(expectedMinParallelRequests) > maxParallelRequests {
2992+
t.Fatalf("expectedMinParallelRequests should be between 2 and %v. Batch size of the controller is exponentially increasing until 500. "+
2993+
"Got expectedMinParallelRequests %v, ", maxParallelRequests, expectedMinParallelRequests)
2994+
}
29862995
client := fake.NewSimpleClientset(set)
29872996
om, _, ssc := setupController(client)
2988-
om.createPodTracker.delay = time.Millisecond
2997+
om.createPodTracker.shouldTrackParallelRequests = true
2998+
om.createPodTracker.parallelRequestDelay = time.Millisecond
29892999

29903000
*set.Spec.Replicas = replicas
29913001
if err := parallelScaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
@@ -3017,8 +3027,8 @@ func parallelScale(t *testing.T, set *apps.StatefulSet, replicas, desiredReplica
30173027
t.Errorf("Failed to scale statefulset to %v replicas, got %v replicas", desiredReplicas, set.Status.Replicas)
30183028
}
30193029

3020-
if (diff < -1 || diff > 1) && om.createPodTracker.maxParallel <= 1 {
3021-
t.Errorf("want max parallel requests > 1, got %v", om.createPodTracker.maxParallel)
3030+
if om.createPodTracker.maxParallelRequests < expectedMinParallelRequests {
3031+
t.Errorf("want max parallelRequests requests >= %v, got %v", expectedMinParallelRequests, om.createPodTracker.maxParallelRequests)
30223032
}
30233033
}
30243034

0 commit comments

Comments
 (0)