Skip to content

Commit 1a660d3

Browse files
committed
flake: fix data race for TestApfWatchHandlePanic unit test
Signed-off-by: googs1025 <[email protected]>
1 parent 34349e7 commit 1a660d3

File tree

2 files changed

+29
-21
lines changed

2 files changed

+29
-21
lines changed

staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ var waitingMark = &requestWatermark{
5252
phase: epmetrics.WaitingPhase,
5353
}
5454

55-
var atomicMutatingExecuting, atomicReadOnlyExecuting int32
56-
var atomicMutatingWaiting, atomicReadOnlyWaiting int32
55+
var atomicMutatingExecuting, atomicReadOnlyExecuting atomic.Int32
56+
var atomicMutatingWaiting, atomicReadOnlyWaiting atomic.Int32
5757

5858
// newInitializationSignal is defined for testing purposes.
5959
var newInitializationSignal = utilflowcontrol.NewInitializationSignal
@@ -143,16 +143,16 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
143143
isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
144144
noteExecutingDelta := func(delta int32) {
145145
if isMutatingRequest {
146-
watermark.recordMutating(int(atomic.AddInt32(&atomicMutatingExecuting, delta)))
146+
watermark.recordMutating(int(atomicMutatingExecuting.Add(delta)))
147147
} else {
148-
watermark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyExecuting, delta)))
148+
watermark.recordReadOnly(int(atomicReadOnlyExecuting.Add(delta)))
149149
}
150150
}
151151
noteWaitingDelta := func(delta int32) {
152152
if isMutatingRequest {
153-
waitingMark.recordMutating(int(atomic.AddInt32(&atomicMutatingWaiting, delta)))
153+
waitingMark.recordMutating(int(atomicMutatingWaiting.Add(delta)))
154154
} else {
155-
waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
155+
waitingMark.recordReadOnly(int(atomicReadOnlyWaiting.Add(delta)))
156156
}
157157
}
158158
queueNote := func(inQueue bool) {

staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -128,20 +128,25 @@ func newApfServerWithSingleRequest(t *testing.T, decision mockDecision) *httptes
128128
t.Errorf("execute should not be invoked")
129129
}
130130
// atomicReadOnlyExecuting can be either 0 or 1 as we test one request at a time.
131-
if decision != decisionSkipFilter && atomicReadOnlyExecuting != 1 {
132-
t.Errorf("Wanted %d requests executing, got %d", 1, atomicReadOnlyExecuting)
131+
currentValue := atomicReadOnlyExecuting.Load()
132+
if decision != decisionSkipFilter && currentValue != 1 {
133+
t.Errorf("Wanted %d requests executing, got %d", 1, currentValue)
133134
}
134135
}
136+
135137
postExecuteFunc := func() {}
136138
// atomicReadOnlyWaiting can be either 0 or 1 as we test one request at a time.
137139
postEnqueueFunc := func() {
138-
if atomicReadOnlyWaiting != 1 {
139-
t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting)
140+
currentValue := atomicReadOnlyWaiting.Load()
141+
if currentValue != 1 {
142+
t.Errorf("Wanted %d requests in queue, got %d", 1, currentValue)
140143
}
141144
}
145+
142146
postDequeueFunc := func() {
143-
if atomicReadOnlyWaiting != 0 {
144-
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
147+
currentValue := atomicReadOnlyWaiting.Load()
148+
if currentValue != 0 {
149+
t.Errorf("Wanted %d requests in queue, got %d", 0, currentValue)
145150
}
146151
}
147152
return newApfServerWithHooks(t, decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc)
@@ -185,8 +190,9 @@ func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Int
185190
// TODO: all test(s) using this filter must run
186191
// serially to each other
187192
defer func() {
188-
if atomicReadOnlyExecuting != 0 {
189-
t.Errorf("Wanted %d requests executing, got %d", 0, atomicReadOnlyExecuting)
193+
currentValue := atomicReadOnlyExecuting.Load()
194+
if currentValue != 0 {
195+
t.Errorf("Wanted %d requests executing, got %d", 0, currentValue)
190196
}
191197
}()
192198
apfHandler.ServeHTTP(w, r)
@@ -280,8 +286,9 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
280286
onExecuteFunc := func() {
281287
preStartExecute.Done()
282288
preStartExecute.Wait()
283-
if int(atomicReadOnlyExecuting) != concurrentRequests {
284-
t.Errorf("Wanted %d requests executing, got %d", concurrentRequests, atomicReadOnlyExecuting)
289+
currentValue := atomicReadOnlyExecuting.Load()
290+
if int(currentValue) != concurrentRequests {
291+
t.Errorf("Wanted %d requests executing, got %d", concurrentRequests, currentValue)
285292
}
286293
postStartExecute.Done()
287294
postStartExecute.Wait()
@@ -290,9 +297,9 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
290297
postEnqueueFunc := func() {
291298
preEnqueue.Done()
292299
preEnqueue.Wait()
293-
if int(atomicReadOnlyWaiting) != concurrentRequests {
294-
t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting)
295-
300+
currentValue := atomicReadOnlyWaiting.Load()
301+
if int(currentValue) != concurrentRequests {
302+
t.Errorf("Wanted %d requests in queue, got %d", concurrentRequests, currentValue)
296303
}
297304
postEnqueue.Done()
298305
postEnqueue.Wait()
@@ -301,8 +308,9 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
301308
postDequeueFunc := func() {
302309
preDequeue.Done()
303310
preDequeue.Wait()
304-
if atomicReadOnlyWaiting != 0 {
305-
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
311+
currentValue := atomicReadOnlyWaiting.Load()
312+
if currentValue != 0 {
313+
t.Errorf("Wanted %d requests in queue, got %d", 0, currentValue)
306314
}
307315
postDequeue.Done()
308316
postDequeue.Wait()

0 commit comments

Comments
 (0)