Skip to content

Commit ef6b425

Browse files
authored
Merge pull request #3372 from ntnn/test-integration
Add `test/integration`
2 parents b60e1d4 + 6eaf3d4 commit ef6b425

File tree

13 files changed

+341
-17
lines changed

13 files changed

+341
-17
lines changed

.prow.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,33 @@ presubmits:
9595
memory: 4Gi
9696
cpu: 2
9797

98+
- name: pull-kcp-test-integration
99+
decorate: true
100+
# only run integration tests if code changed.
101+
run_if_changed: "(cmd|config|pkg|sdk|test|go.mod|go.sum|Makefile|.prow.yaml)"
102+
clone_uri: "https://github.com/kcp-dev/kcp"
103+
labels:
104+
preset-goproxy: "true"
105+
spec:
106+
containers:
107+
- image: ghcr.io/kcp-dev/infra/build:1.23.7-2
108+
command:
109+
- ./hack/run-with-prow.sh
110+
- ./hack/run-with-prometheus.sh
111+
- make
112+
- test-integration
113+
env:
114+
- name: USE_GOTESTSUM
115+
value: '1'
116+
- name: KUBE_CACHE_MUTATION_DETECTOR
117+
value: '1'
118+
- name: E2E_PARALLELISM
119+
value: '3'
120+
resources:
121+
requests:
122+
memory: 6Gi
123+
cpu: 4
124+
98125
- name: pull-kcp-test-e2e
99126
decorate: true
100127
# only run e2e tests if code changed.

Makefile

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,9 +368,17 @@ endif
368368
test: WHAT ?= ./...
369369
# We will need to move into the sub package, of sdk to run those tests.
370370
test: ## Run tests
371-
$(GO_TEST) -race $(COUNT_ARG) -coverprofile=coverage.txt -covermode=atomic $(TEST_ARGS) $$(go list "$(WHAT)" | grep -v ./test/e2e/)
371+
$(GO_TEST) -race $(COUNT_ARG) -coverprofile=coverage.txt -covermode=atomic $(TEST_ARGS) $$(go list "$(WHAT)" | grep -v -e 'test/e2e' -e 'test/integration')
372372
cd sdk && $(GO_TEST) -race $(COUNT_ARG) -coverprofile=coverage.txt -covermode=atomic $(TEST_ARGS) $(WHAT)
373373

374+
.PHONY: test-integration
375+
ifdef USE_GOTESTSUM
376+
test-integration: $(GOTESTSUM)
377+
endif
378+
test-integration: WHAT ?= ./test/integration...
379+
test-integration: ## Run integration tests
380+
$(GO_TEST) $(COUNT_ARG) $(PARALLELISM_ARG) $(WHAT) $(TEST_ARGS)
381+
374382
.PHONY: verify-k8s-deps
375383
verify-k8s-deps: ## Verify kubernetes deps
376384
hack/validate-k8s.sh

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ require (
2323
github.com/spf13/cobra v1.9.1
2424
github.com/spf13/pflag v1.0.6
2525
github.com/stretchr/testify v1.10.0
26+
go.uber.org/goleak v1.3.0
2627
go.uber.org/multierr v1.11.0
2728
golang.org/x/sys v0.32.0
2829
gopkg.in/square/go-jose.v2 v2.6.0

pkg/network/dialer.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"fmt"
2525
"net"
2626
"net/http"
27+
"sync/atomic"
28+
"unsafe"
2729

2830
utilnet "k8s.io/apimachinery/pkg/util/net"
2931
"k8s.io/klog/v2"
@@ -43,7 +45,14 @@ func DefaultTransportWrapper(rt http.RoundTripper) http.RoundTripper {
4345
klog.FromContext(context.Background()).Error(err, "Cannot set timeout settings on roundtripper")
4446
return rt
4547
}
46-
tr.DialContext = DefaultDialContext()
48+
49+
// This function may be called from different goroutines on the same
50+
// `rt` at the same time, causing a data race.
51+
// To preven this race .DialContext is swapped atomically.
52+
defaultDialContext := DefaultDialContext()
53+
trDialContext := unsafe.Pointer(&tr.DialContext)
54+
atomic.StorePointer(&trDialContext, unsafe.Pointer(&defaultDialContext))
55+
4756
return rt
4857
}
4958

pkg/server/config.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"net/http/httputil"
2626
"net/url"
2727
"os"
28+
"sync"
2829

2930
apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
3031
"k8s.io/apimachinery/pkg/runtime"
@@ -207,7 +208,16 @@ func NewConfig(ctx context.Context, opts kcpserveroptions.CompletedOptions) (*Co
207208

208209
// break connections on the tcp layer. Setting the client timeout would
209210
// also apply to watches, which we don't want.
210-
c.GenericConfig.LoopbackClientConfig.Wrap(network.DefaultTransportWrapper)
211+
// To prevent data races when wrapping the default transport in
212+
// multiple goroutines the wrapping is done in a once.
213+
var wrapDefaultTransportWrapper = sync.Once{}
214+
c.GenericConfig.LoopbackClientConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper {
215+
wrapDefaultTransportWrapper.Do(func() {
216+
rt = network.DefaultTransportWrapper(rt)
217+
})
218+
return rt
219+
})
220+
211221
// Set effective version to the default kube version of the vendored libs.
212222
c.GenericConfig.EffectiveVersion = utilversion.DefaultKubeEffectiveVersion()
213223

pkg/server/options/controllers.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,18 +43,12 @@ type Controllers struct {
4343
SAController kcmoptions.SAControllerOptions
4444
}
4545

46-
var kcmDefaults *kcmoptions.KubeControllerManagerOptions
47-
48-
func init() {
49-
var err error
50-
51-
kcmDefaults, err = kcmoptions.NewKubeControllerManagerOptions()
46+
func NewControllers() *Controllers {
47+
kcmDefaults, err := kcmoptions.NewKubeControllerManagerOptions()
5248
if err != nil {
5349
panic(err)
5450
}
55-
}
5651

57-
func NewControllers() *Controllers {
5852
return &Controllers{
5953
EnableAll: true,
6054

sdk/testing/kcp.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@ var fs embed.FS
3535
func PrivateKcpServer(t TestingT, options ...kcptestingserver.Option) kcptestingserver.RunningServer {
3636
t.Helper()
3737

38-
serverName := "main"
39-
40-
cfg := &kcptestingserver.Config{Name: serverName}
38+
cfg := &kcptestingserver.Config{Name: "main"}
4139
for _, opt := range options {
4240
opt(cfg)
4341
}
4442

43+
serverName := cfg.Name
44+
4545
auditPolicyArg := false
4646
for _, arg := range cfg.Args {
4747
if arg == "--audit-policy-file" {

sdk/testing/server/fixture.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -310,9 +310,6 @@ func (c *kcpServer) Run(t TestingT) error {
310310
}
311311

312312
func (c *kcpServer) Stop() {
313-
c.lock.Lock()
314-
defer c.lock.Unlock()
315-
316313
if c.cancel == nil {
317314
return
318315
}

test/e2e/framework/inprocess.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package framework
1818

1919
import (
2020
"context"
21+
"sync"
2122

2223
"github.com/spf13/pflag"
2324

@@ -32,11 +33,18 @@ import (
3233
)
3334

3435
func init() {
36+
globalOptionsLock := &sync.Mutex{}
37+
3538
kcptestingserver.ContextRunInProcessFunc = func(ctx context.Context, t kcptestingserver.TestingT, cfg kcptestingserver.Config) (<-chan struct{}, error) {
3639
ctx, cancel := context.WithCancel(ctx)
3740
t.Cleanup(cancel)
3841

42+
// During the setups global values (e.g. feature gates) are
43+
// initialized. This can produce data races as well as panics.
44+
globalOptionsLock.Lock()
3945
serverOptions := kcpoptions.NewOptions(cfg.DataDir)
46+
globalOptionsLock.Unlock()
47+
4048
fss := flag.NamedFlagSets{}
4149
serverOptions.AddFlags(&fss)
4250
all := pflag.NewFlagSet("kcp", pflag.ContinueOnError)
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
Copyright 2025 The KCP Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package framework
18+
19+
import (
20+
"testing"
21+
"time"
22+
23+
"go.uber.org/goleak"
24+
)
25+
26+
var (
27+
KnownGoroutineLeaks = []goleak.Option{
28+
// context
29+
// created by: context.(*cancelCtx).propagateCancel
30+
// came up ~12 times, likely a result of some of the following
31+
// leaks
32+
goleak.IgnoreTopFunction("context.(*cancelCtx).propagateCancel.func2"),
33+
34+
// grpc
35+
// created by: google.golang.org/grpc.(*acBalancerWrapper).Connect
36+
goleak.IgnoreTopFunction("google.golang.org/grpc.(*addrConn).resetTransportAndUnlock"),
37+
// created by: go.etcd.io/etcd/client/v3.(*watcher).newWatcherGrpcStream
38+
goleak.IgnoreTopFunction("google.golang.org/grpc.(*pickerWrapper).pick"),
39+
// created by: google.golang.org/grpc/internal/grpcsync.NewCallbackSerializer
40+
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
41+
// created by: google.golang.org/grpc.newClientStreamWithParams
42+
goleak.IgnoreTopFunction("google.golang.org/grpc.newClientStreamWithParams.func4"),
43+
// created by: google.golang.org/grpc/internal/transport.(*serverHandlerTransport).HandleStreams
44+
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*serverHandlerTransport).HandleStreams.func1"),
45+
// created by: google.golang.org/grpc.newClientStreamWithParams
46+
goleak.IgnoreTopFunction("google.golang.org/grpc.newClientStreamWithParams.func4"),
47+
// created by: google.golang.org/grpc/internal/transport.NewHTTP2Client
48+
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"),
49+
// created by: google.golang.org/grpc/internal/transport.NewHTTP2Client
50+
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*http2Client).keepalive"),
51+
// created by: go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc.(*watchServer).Watch
52+
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*recvBufferReader).readMessageHeader"),
53+
// created by: go.etcd.io/etcd/client/v3.(*watchGrpcStream).newWatchClient
54+
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*recvBufferReader).readMessageHeaderClient"),
55+
// created by: google.golang.org/grpc/internal/transport.(*serverHandlerTransport).HandleStreams
56+
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*serverHandlerTransport).HandleStreams.func1"),
57+
// created by: golang.org/x/net/http2.(*serverConn).scheduleHandler
58+
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*serverHandlerTransport).runStream"),
59+
// created by: google.golang.org/grpc/internal/transport.NewHTTP2Client
60+
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
61+
// created by: google.golang.org/grpc/internal/transport.(*serverHandlerTransport).HandleStreams
62+
goleak.IgnoreTopFunction("sync.runtime_notifyListWait"),
63+
64+
// etcd
65+
// created by: go.etcd.io/etcd/client/v3.(*watchGrpcStream).waitCancelSubstreams.func1
66+
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/v3.(*watchGrpcStream).waitCancelSubstreams.func1.1"),
67+
// created by: go.etcd.io/etcd/client/v3.(*watchGrpcStream).newWatchClient
68+
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/v3.(*watchGrpcStream).serveSubstream.func1"),
69+
70+
// kcp / kube
71+
// created by k8s.io/apiserver/pkg/registry/generic/registry.(*Store).startObservingCount
72+
goleak.IgnoreTopFunction("k8s.io/apimachinery/pkg/util/wait.BackoffUntil"),
73+
// created by: github.com/kcp-dev/kcp/pkg/informer.NewGenericDiscoveringDynamicSharedInformerFactory[...]
74+
goleak.IgnoreTopFunction("github.com/kcp-dev/kcp/pkg/informer.NewGenericDiscoveringDynamicSharedInformerFactory[...].func3"),
75+
// created by: k8s.io/apiserver/pkg/storage/storagebackend/factory.newETCD3Check
76+
goleak.IgnoreTopFunction("k8s.io/apiserver/pkg/storage/storagebackend/factory.newETCD3Check.func2"),
77+
78+
// Known from kcp-dev/kcp#3350
79+
// created by: k8s.io/client-go/util/workqueue.newDelayingQueue[...]
80+
goleak.IgnoreTopFunction("k8s.io/client-go/util/workqueue.(*delayingType[...]).waitingLoop"),
81+
// created by: k8s.io/client-go/util/workqueue.newQueue[...]
82+
goleak.IgnoreTopFunction("k8s.io/client-go/util/workqueue.(*Typed[...]).updateUnfinishedWorkLoop"),
83+
84+
// unknown
85+
// created by: net/http.(*Transport).dialConn
86+
goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"),
87+
// created by: net/http.(*Server).Serve
88+
goleak.IgnoreTopFunction("golang.org/x/net/http2.(*serverConn).serve"),
89+
// created by: gopkg.in/natefinch/lumberjack%2ev2.(*Logger).mill.func1
90+
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
91+
// created by: golang.org/x/net/http2.(*serverConn).serve
92+
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
93+
}
94+
95+
// 14s as etcd sets a client request timeout of up to 7 seconds when
96+
// shutting down the server and then starts shutting down everything
97+
// else.
98+
// A shorter timestan d would lead to false positives in the tests.
99+
WaitTime = 14 * time.Second
100+
)
101+
102+
// GoleakWithDefaults verifies that there are no goroutine leaks.
103+
// Goleak tests cannot be run in parallelized tests.
104+
func GoleakWithDefaults(tb testing.TB, in ...goleak.Option) {
105+
tb.Helper()
106+
107+
tb.Logf("waiting %v for goroutines to finish", WaitTime)
108+
time.Sleep(WaitTime)
109+
110+
opts := append(KnownGoroutineLeaks, in...)
111+
goleak.VerifyNone(tb, opts...)
112+
}

0 commit comments

Comments
 (0)