Skip to content

Commit 49ffd61

Browse files
authored
Merge pull request kubernetes#127089 from sanposhiho/revert-126574
Revert: Fix data race in APF tests
2 parents 4bc6a11 + d267721 commit 49ffd61

File tree

1 file changed

+44
-75
lines changed

1 file changed

+44
-75
lines changed

staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go

Lines changed: 44 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package filters
1818

1919
import (
2020
"context"
21-
"errors"
2221
"fmt"
2322
"net/http"
2423
"net/http/httptest"
@@ -27,7 +26,6 @@ import (
2726
"reflect"
2827
"strings"
2928
"sync"
30-
"sync/atomic"
3129
"testing"
3230
"time"
3331

@@ -130,20 +128,20 @@ func newApfServerWithSingleRequest(t *testing.T, decision mockDecision) *httptes
130128
t.Errorf("execute should not be invoked")
131129
}
132130
// atomicReadOnlyExecuting can be either 0 or 1 as we test one request at a time.
133-
if want, got := int32(1), atomic.LoadInt32(&atomicReadOnlyExecuting); decision != decisionSkipFilter && want != got {
134-
t.Errorf("Wanted %d requests executing, got %d", want, got)
131+
if decision != decisionSkipFilter && atomicReadOnlyExecuting != 1 {
132+
t.Errorf("Wanted %d requests executing, got %d", 1, atomicReadOnlyExecuting)
135133
}
136134
}
137135
postExecuteFunc := func() {}
138136
// atomicReadOnlyWaiting can be either 0 or 1 as we test one request at a time.
139137
postEnqueueFunc := func() {
140-
if want, got := int32(1), atomic.LoadInt32(&atomicReadOnlyWaiting); want != got {
141-
t.Errorf("Wanted %d requests in queue, got %d", want, got)
138+
if atomicReadOnlyWaiting != 1 {
139+
t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting)
142140
}
143141
}
144142
postDequeueFunc := func() {
145-
if want, got := int32(0), atomic.LoadInt32(&atomicReadOnlyWaiting); want != got {
146-
t.Errorf("Wanted %d requests in queue, got %d", want, got)
143+
if atomicReadOnlyWaiting != 0 {
144+
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
147145
}
148146
}
149147
return newApfServerWithHooks(t, decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc)
@@ -179,19 +177,11 @@ func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Int
179177
r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
180178
Groups: []string{user.AllUnauthenticated},
181179
}))
182-
func() {
183-
// the defer ensures that the following assertion is
184-
// executed, even if the APF handler panics
185-
// TODO: all test(s) using this filter must run serially to each other
186-
defer func() {
187-
t.Logf("the APF handler has finished, checking atomicReadOnlyExecuting")
188-
if want, got := int32(0), atomic.LoadInt32(&atomicReadOnlyExecuting); want != got {
189-
t.Errorf("Wanted %d requests executing, got %d", want, got)
190-
}
191-
}()
192-
apfHandler.ServeHTTP(w, r)
193-
postExecute()
194-
}()
180+
apfHandler.ServeHTTP(w, r)
181+
postExecute()
182+
if atomicReadOnlyExecuting != 0 {
183+
t.Errorf("Wanted %d requests executing, got %d", 0, atomicReadOnlyExecuting)
184+
}
195185
}), requestInfoFactory)
196186

197187
return handler
@@ -280,8 +270,8 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
280270
onExecuteFunc := func() {
281271
preStartExecute.Done()
282272
preStartExecute.Wait()
283-
if want, got := int32(concurrentRequests), atomic.LoadInt32(&atomicReadOnlyExecuting); want != got {
284-
t.Errorf("Wanted %d requests executing, got %d", want, got)
273+
if int(atomicReadOnlyExecuting) != concurrentRequests {
274+
t.Errorf("Wanted %d requests executing, got %d", concurrentRequests, atomicReadOnlyExecuting)
285275
}
286276
postStartExecute.Done()
287277
postStartExecute.Wait()
@@ -290,8 +280,8 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
290280
postEnqueueFunc := func() {
291281
preEnqueue.Done()
292282
preEnqueue.Wait()
293-
if want, got := int32(concurrentRequests), atomic.LoadInt32(&atomicReadOnlyWaiting); want != got {
294-
t.Errorf("Wanted %d requests in queue, got %d", want, got)
283+
if int(atomicReadOnlyWaiting) != concurrentRequests {
284+
t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting)
295285

296286
}
297287
postEnqueue.Done()
@@ -301,8 +291,8 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
301291
postDequeueFunc := func() {
302292
preDequeue.Done()
303293
preDequeue.Wait()
304-
if want, got := int32(0), atomic.LoadInt32(&atomicReadOnlyWaiting); want != got {
305-
t.Errorf("Wanted %d requests in queue, got %d", want, got)
294+
if atomicReadOnlyWaiting != 0 {
295+
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
306296
}
307297
postDequeue.Done()
308298
postDequeue.Wait()
@@ -355,21 +345,19 @@ func TestApfCancelWaitRequest(t *testing.T) {
355345
}
356346

357347
type fakeWatchApfFilter struct {
358-
t *testing.T
359348
lock sync.Mutex
360349
inflight int
361350
capacity int
362351

363-
postExecutePanic error
364-
preExecutePanic error
352+
postExecutePanic bool
353+
preExecutePanic bool
365354

366355
utilflowcontrol.WatchTracker
367356
utilflowcontrol.MaxSeatsTracker
368357
}
369358

370-
func newFakeWatchApfFilter(t *testing.T, capacity int) *fakeWatchApfFilter {
359+
func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {
371360
return &fakeWatchApfFilter{
372-
t: t,
373361
capacity: capacity,
374362
WatchTracker: utilflowcontrol.NewWatchTracker(),
375363
MaxSeatsTracker: utilflowcontrol.NewMaxSeatsTracker(),
@@ -397,23 +385,17 @@ func (f *fakeWatchApfFilter) Handle(ctx context.Context,
397385
return
398386
}
399387

400-
func() {
401-
defer func() {
402-
f.lock.Lock()
403-
defer f.lock.Unlock()
404-
f.inflight--
405-
}()
388+
if f.preExecutePanic {
389+
panic("pre-exec-panic")
390+
}
391+
execFn()
392+
if f.postExecutePanic {
393+
panic("post-exec-panic")
394+
}
406395

407-
if f.preExecutePanic != nil {
408-
f.t.Logf("going to panic (pre-exec) as expected with error: %v, fakeWatchApfFilter: %#v", f.preExecutePanic, f)
409-
panic(f.preExecutePanic)
410-
}
411-
execFn()
412-
if f.postExecutePanic != nil {
413-
f.t.Logf("going to panic (post-exec) as expected with error: %v, fakeWatchApfFilter: %#v", f.postExecutePanic, f)
414-
panic(f.postExecutePanic)
415-
}
416-
}()
396+
f.lock.Lock()
397+
defer f.lock.Unlock()
398+
f.inflight--
417399
}
418400

419401
func (f *fakeWatchApfFilter) Run(stopCh <-chan struct{}) error {
@@ -465,7 +447,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
465447
allRunning := sync.WaitGroup{}
466448
allRunning.Add(2 * concurrentRequests)
467449

468-
fakeFilter := newFakeWatchApfFilter(t, concurrentRequests)
450+
fakeFilter := newFakeWatchApfFilter(concurrentRequests)
469451

470452
onExecuteFunc := func() {
471453
firstRunning.Done()
@@ -511,7 +493,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
511493
}
512494

513495
func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) {
514-
fakeFilter := newFakeWatchApfFilter(t, 0)
496+
fakeFilter := newFakeWatchApfFilter(0)
515497

516498
onExecuteFunc := func() {
517499
t.Errorf("Request unexepectedly executing")
@@ -530,7 +512,7 @@ func TestApfWatchPanic(t *testing.T) {
530512
epmetrics.Register()
531513
fcmetrics.Register()
532514

533-
fakeFilter := newFakeWatchApfFilter(t, 1)
515+
fakeFilter := newFakeWatchApfFilter(1)
534516

535517
onExecuteFunc := func() {
536518
panic("test panic")
@@ -557,11 +539,11 @@ func TestApfWatchPanic(t *testing.T) {
557539
func TestApfWatchHandlePanic(t *testing.T) {
558540
epmetrics.Register()
559541
fcmetrics.Register()
560-
preExecutePanicingFilter := newFakeWatchApfFilter(t, 1)
561-
preExecutePanicingFilter.preExecutePanic = http.ErrAbortHandler
542+
preExecutePanicingFilter := newFakeWatchApfFilter(1)
543+
preExecutePanicingFilter.preExecutePanic = true
562544

563-
postExecutePanicingFilter := newFakeWatchApfFilter(t, 1)
564-
postExecutePanicingFilter.postExecutePanic = http.ErrAbortHandler
545+
postExecutePanicingFilter := newFakeWatchApfFilter(1)
546+
postExecutePanicingFilter.postExecutePanic = true
565547

566548
testCases := []struct {
567549
name string
@@ -577,31 +559,18 @@ func TestApfWatchHandlePanic(t *testing.T) {
577559
},
578560
}
579561

562+
onExecuteFunc := func() {
563+
time.Sleep(5 * time.Second)
564+
}
565+
postExecuteFunc := func() {}
566+
580567
for _, test := range testCases {
581568
t.Run(test.name, func(t *testing.T) {
582-
onExecuteFunc := func() {
583-
time.Sleep(5 * time.Second)
584-
585-
// this function should not be executed if
586-
// pre-execute panic is set
587-
if test.filter.preExecutePanic != nil {
588-
t.Errorf("did not expect the execute function to be executed")
589-
}
590-
t.Logf("on-execute function invoked")
591-
}
592-
593-
// we either panic before the execute function, or after,
594-
// so the following function should never be executed.
595-
postExecuteFunc := func() {
596-
t.Errorf("did not expect the post-execute function to be invoked")
597-
}
598-
599569
apfHandler := newApfHandlerWithFilter(t, test.filter, time.Minute/4, onExecuteFunc, postExecuteFunc)
600570
handler := func(w http.ResponseWriter, r *http.Request) {
601571
defer func() {
602-
recovered := recover()
603-
if err, ok := recovered.(error); !ok || !errors.Is(err, http.ErrAbortHandler) {
604-
t.Errorf("expected panic with error: %v, but got: %v", http.ErrAbortHandler, err)
572+
if err := recover(); err == nil {
573+
t.Errorf("expected panic, got %v", err)
605574
}
606575
}()
607576
apfHandler.ServeHTTP(w, r)

0 commit comments

Comments
 (0)