Skip to content

Commit dc8c427

Browse files
authored
Merge pull request kubernetes#126574 from tkashem/apf-data-race
Fix data race in APF tests
2 parents a4ec0c0 + 8fa3e61 commit dc8c427

File tree

1 file changed

+75
-44
lines changed

1 file changed

+75
-44
lines changed

staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go

Lines changed: 75 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package filters
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"net/http"
2324
"net/http/httptest"
@@ -26,6 +27,7 @@ import (
2627
"reflect"
2728
"strings"
2829
"sync"
30+
"sync/atomic"
2931
"testing"
3032
"time"
3133

@@ -128,20 +130,20 @@ func newApfServerWithSingleRequest(t *testing.T, decision mockDecision) *httptes
128130
t.Errorf("execute should not be invoked")
129131
}
130132
// atomicReadOnlyExecuting can be either 0 or 1 as we test one request at a time.
131-
if decision != decisionSkipFilter && atomicReadOnlyExecuting != 1 {
132-
t.Errorf("Wanted %d requests executing, got %d", 1, atomicReadOnlyExecuting)
133+
if want, got := int32(1), atomic.LoadInt32(&atomicReadOnlyExecuting); decision != decisionSkipFilter && want != got {
134+
t.Errorf("Wanted %d requests executing, got %d", want, got)
133135
}
134136
}
135137
postExecuteFunc := func() {}
136138
// atomicReadOnlyWaiting can be either 0 or 1 as we test one request at a time.
137139
postEnqueueFunc := func() {
138-
if atomicReadOnlyWaiting != 1 {
139-
t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting)
140+
if want, got := int32(1), atomic.LoadInt32(&atomicReadOnlyWaiting); want != got {
141+
t.Errorf("Wanted %d requests in queue, got %d", want, got)
140142
}
141143
}
142144
postDequeueFunc := func() {
143-
if atomicReadOnlyWaiting != 0 {
144-
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
145+
if want, got := int32(0), atomic.LoadInt32(&atomicReadOnlyWaiting); want != got {
146+
t.Errorf("Wanted %d requests in queue, got %d", want, got)
145147
}
146148
}
147149
return newApfServerWithHooks(t, decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc)
@@ -177,11 +179,19 @@ func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Int
177179
r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
178180
Groups: []string{user.AllUnauthenticated},
179181
}))
180-
apfHandler.ServeHTTP(w, r)
181-
postExecute()
182-
if atomicReadOnlyExecuting != 0 {
183-
t.Errorf("Wanted %d requests executing, got %d", 0, atomicReadOnlyExecuting)
184-
}
182+
func() {
183+
// the defer ensures that the following assertion is
184+
// executed, even if the APF handler panics
185+
// TODO: all test(s) using this filter must run serially to each other
186+
defer func() {
187+
t.Logf("the APF handler has finished, checking atomicReadOnlyExecuting")
188+
if want, got := int32(0), atomic.LoadInt32(&atomicReadOnlyExecuting); want != got {
189+
t.Errorf("Wanted %d requests executing, got %d", want, got)
190+
}
191+
}()
192+
apfHandler.ServeHTTP(w, r)
193+
postExecute()
194+
}()
185195
}), requestInfoFactory)
186196

187197
return handler
@@ -270,8 +280,8 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
270280
onExecuteFunc := func() {
271281
preStartExecute.Done()
272282
preStartExecute.Wait()
273-
if int(atomicReadOnlyExecuting) != concurrentRequests {
274-
t.Errorf("Wanted %d requests executing, got %d", concurrentRequests, atomicReadOnlyExecuting)
283+
if want, got := int32(concurrentRequests), atomic.LoadInt32(&atomicReadOnlyExecuting); want != got {
284+
t.Errorf("Wanted %d requests executing, got %d", want, got)
275285
}
276286
postStartExecute.Done()
277287
postStartExecute.Wait()
@@ -280,8 +290,8 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
280290
postEnqueueFunc := func() {
281291
preEnqueue.Done()
282292
preEnqueue.Wait()
283-
if int(atomicReadOnlyWaiting) != concurrentRequests {
284-
t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting)
293+
if want, got := int32(concurrentRequests), atomic.LoadInt32(&atomicReadOnlyWaiting); want != got {
294+
t.Errorf("Wanted %d requests in queue, got %d", want, got)
285295

286296
}
287297
postEnqueue.Done()
@@ -291,8 +301,8 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
291301
postDequeueFunc := func() {
292302
preDequeue.Done()
293303
preDequeue.Wait()
294-
if atomicReadOnlyWaiting != 0 {
295-
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
304+
if want, got := int32(0), atomic.LoadInt32(&atomicReadOnlyWaiting); want != got {
305+
t.Errorf("Wanted %d requests in queue, got %d", want, got)
296306
}
297307
postDequeue.Done()
298308
postDequeue.Wait()
@@ -345,19 +355,21 @@ func TestApfCancelWaitRequest(t *testing.T) {
345355
}
346356

347357
type fakeWatchApfFilter struct {
358+
t *testing.T
348359
lock sync.Mutex
349360
inflight int
350361
capacity int
351362

352-
postExecutePanic bool
353-
preExecutePanic bool
363+
postExecutePanic error
364+
preExecutePanic error
354365

355366
utilflowcontrol.WatchTracker
356367
utilflowcontrol.MaxSeatsTracker
357368
}
358369

359-
func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {
370+
func newFakeWatchApfFilter(t *testing.T, capacity int) *fakeWatchApfFilter {
360371
return &fakeWatchApfFilter{
372+
t: t,
361373
capacity: capacity,
362374
WatchTracker: utilflowcontrol.NewWatchTracker(),
363375
MaxSeatsTracker: utilflowcontrol.NewMaxSeatsTracker(),
@@ -385,17 +397,23 @@ func (f *fakeWatchApfFilter) Handle(ctx context.Context,
385397
return
386398
}
387399

388-
if f.preExecutePanic {
389-
panic("pre-exec-panic")
390-
}
391-
execFn()
392-
if f.postExecutePanic {
393-
panic("post-exec-panic")
394-
}
400+
func() {
401+
defer func() {
402+
f.lock.Lock()
403+
defer f.lock.Unlock()
404+
f.inflight--
405+
}()
395406

396-
f.lock.Lock()
397-
defer f.lock.Unlock()
398-
f.inflight--
407+
if f.preExecutePanic != nil {
408+
f.t.Logf("going to panic (pre-exec) as expected with error: %v, fakeWatchApfFilter: %#v", f.preExecutePanic, f)
409+
panic(f.preExecutePanic)
410+
}
411+
execFn()
412+
if f.postExecutePanic != nil {
413+
f.t.Logf("going to panic (post-exec) as expected with error: %v, fakeWatchApfFilter: %#v", f.postExecutePanic, f)
414+
panic(f.postExecutePanic)
415+
}
416+
}()
399417
}
400418

401419
func (f *fakeWatchApfFilter) Run(stopCh <-chan struct{}) error {
@@ -447,7 +465,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
447465
allRunning := sync.WaitGroup{}
448466
allRunning.Add(2 * concurrentRequests)
449467

450-
fakeFilter := newFakeWatchApfFilter(concurrentRequests)
468+
fakeFilter := newFakeWatchApfFilter(t, concurrentRequests)
451469

452470
onExecuteFunc := func() {
453471
firstRunning.Done()
@@ -493,7 +511,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
493511
}
494512

495513
func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) {
496-
fakeFilter := newFakeWatchApfFilter(0)
514+
fakeFilter := newFakeWatchApfFilter(t, 0)
497515

498516
onExecuteFunc := func() {
499517
t.Errorf("Request unexepectedly executing")
@@ -512,7 +530,7 @@ func TestApfWatchPanic(t *testing.T) {
512530
epmetrics.Register()
513531
fcmetrics.Register()
514532

515-
fakeFilter := newFakeWatchApfFilter(1)
533+
fakeFilter := newFakeWatchApfFilter(t, 1)
516534

517535
onExecuteFunc := func() {
518536
panic("test panic")
@@ -539,11 +557,11 @@ func TestApfWatchPanic(t *testing.T) {
539557
func TestApfWatchHandlePanic(t *testing.T) {
540558
epmetrics.Register()
541559
fcmetrics.Register()
542-
preExecutePanicingFilter := newFakeWatchApfFilter(1)
543-
preExecutePanicingFilter.preExecutePanic = true
560+
preExecutePanicingFilter := newFakeWatchApfFilter(t, 1)
561+
preExecutePanicingFilter.preExecutePanic = http.ErrAbortHandler
544562

545-
postExecutePanicingFilter := newFakeWatchApfFilter(1)
546-
postExecutePanicingFilter.postExecutePanic = true
563+
postExecutePanicingFilter := newFakeWatchApfFilter(t, 1)
564+
postExecutePanicingFilter.postExecutePanic = http.ErrAbortHandler
547565

548566
testCases := []struct {
549567
name string
@@ -559,18 +577,31 @@ func TestApfWatchHandlePanic(t *testing.T) {
559577
},
560578
}
561579

562-
onExecuteFunc := func() {
563-
time.Sleep(5 * time.Second)
564-
}
565-
postExecuteFunc := func() {}
566-
567580
for _, test := range testCases {
568581
t.Run(test.name, func(t *testing.T) {
582+
onExecuteFunc := func() {
583+
time.Sleep(5 * time.Second)
584+
585+
// this function should not be executed if
586+
// pre-execute panic is set
587+
if test.filter.preExecutePanic != nil {
588+
t.Errorf("did not expect the execute function to be executed")
589+
}
590+
t.Logf("on-execute function invoked")
591+
}
592+
593+
// we either panic before the execute function, or after,
594+
// so the following function should never be executed.
595+
postExecuteFunc := func() {
596+
t.Errorf("did not expect the post-execute function to be invoked")
597+
}
598+
569599
apfHandler := newApfHandlerWithFilter(t, test.filter, time.Minute/4, onExecuteFunc, postExecuteFunc)
570600
handler := func(w http.ResponseWriter, r *http.Request) {
571601
defer func() {
572-
if err := recover(); err == nil {
573-
t.Errorf("expected panic, got %v", err)
602+
recovered := recover()
603+
if err, ok := recovered.(error); !ok || !errors.Is(err, http.ErrAbortHandler) {
604+
t.Errorf("expected panic with error: %v, but got: %v", http.ErrAbortHandler, err)
574605
}
575606
}()
576607
apfHandler.ServeHTTP(w, r)

0 commit comments

Comments
 (0)