Skip to content

Commit 8f9c83c

Browse files
committed
fix(proxy): fix data race in DialContext
1 parent 605c987 commit 8f9c83c

File tree

2 files changed

+39
-47
lines changed

2 files changed

+39
-47
lines changed

Makefile

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -943,7 +943,7 @@ test-infrastructure: $(SETUP_ENVTEST) ## Run unit and integration tests with rac
943943
# Note: Fuzz tests are not executed with race detector because they would just time out.
944944
# To achieve that, all files with fuzz tests have the "!race" build tag, to still run fuzz tests
945945
# we have an additional `go test` run that focuses on "TestFuzzyConversion".
946-
cd test/infrastructure; KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test ./... $(TEST_ARGS)
946+
cd test/infrastructure; KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test -race $(TEST_ARGS) ./...
947947
$(MAKE) test-infrastructure-conversions TEST_ARGS="$(TEST_ARGS)"
948948

949949
.PHONY: test-infrastructure-conversions
@@ -956,10 +956,11 @@ test-infrastructure-verbose: ## Run unit and integration tests with race detecto
956956

957957
.PHONY: test-infrastructure-junit
958958
test-infrastructure-junit: $(SETUP_ENVTEST) $(GOTESTSUM) ## Run unit and integration tests with race detector and generate a junit report for docker infrastructure provider
959-
cd test/infrastructure; set +o errexit; (KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test -json ./... $(TEST_ARGS); echo $$? > $(ARTIFACTS)/junit.infra_docker.exitcode) | tee $(ARTIFACTS)/junit.infra_docker.stdout
959+
@mkdir -p "$(ARTIFACTS)"
960+
cd test/infrastructure; set +o errexit; (KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test -race $(TEST_ARGS) -json ./...; echo $$? > $(ARTIFACTS)/junit.infra_docker.exitcode) | tee $(ARTIFACTS)/junit.infra_docker.stdout
960961
$(GOTESTSUM) --junitfile $(ARTIFACTS)/junit.infra_docker.xml --raw-command cat $(ARTIFACTS)/junit.infra_docker.stdout
961962
exit $$(cat $(ARTIFACTS)/junit.infra_docker.exitcode)
962-
cd test/infrastructure; set +o errexit; (KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test -run "^TestFuzzyConversion$$" -json ./... $(TEST_ARGS); echo $$? > $(ARTIFACTS)/junit-fuzz.infra_docker.exitcode) | tee $(ARTIFACTS)/junit-fuzz.infra_docker.stdout
963+
cd test/infrastructure; set +o errexit; (KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test -run "^TestFuzzyConversion$$" $(TEST_ARGS) -json ./...; echo $$? > $(ARTIFACTS)/junit-fuzz.infra_docker.exitcode) | tee $(ARTIFACTS)/junit-fuzz.infra_docker.stdout
963964
$(GOTESTSUM) --junitfile $(ARTIFACTS)/junit-fuzz.infra_docker.xml --raw-command cat $(ARTIFACTS)/junit-fuzz.infra_docker.stdout
964965
exit $$(cat $(ARTIFACTS)/junit-fuzz.infra_docker.exitcode)
965966

test/infrastructure/inmemory/pkg/server/proxy/dial.go

Lines changed: 35 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -33,85 +33,80 @@ import (
3333

3434
const defaultTimeout = 10 * time.Second
3535

36-
// Dialer creates connections using Kubernetes API Server port-forwarding.
36+
// Dialer opens connections via Kubernetes API server port-forward.
37+
//
38+
// Safe for concurrent use: each dial builds its own SPDY transport and http.Client
39+
// to avoid races.
3740
type Dialer struct {
38-
proxy Proxy
39-
clientset *kubernetes.Clientset
40-
proxyTransport http.RoundTripper
41-
upgrader spdy.Upgrader
42-
timeout time.Duration
41+
proxy Proxy
42+
clientset *kubernetes.Clientset
43+
timeout time.Duration
4344
}
4445

45-
// NewDialer creates a new dialer for a given API server scope.
46+
// NewDialer returns a Dialer for proxy p.
47+
// Options may set timeouts or other fields. The timeout also updates p.KubeConfig.
4648
func NewDialer(p Proxy, options ...func(*Dialer) error) (*Dialer, error) {
4749
if p.Port == 0 {
4850
return nil, errors.New("port required")
4951
}
5052

51-
dialer := &Dialer{
52-
proxy: p,
53-
}
53+
d := &Dialer{proxy: p}
5454

5555
for _, option := range options {
56-
err := option(dialer)
57-
if err != nil {
56+
if err := option(d); err != nil {
5857
return nil, err
5958
}
6059
}
6160

62-
if dialer.timeout == 0 {
63-
dialer.timeout = defaultTimeout
64-
}
65-
p.KubeConfig.Timeout = dialer.timeout
66-
clientset, err := kubernetes.NewForConfig(p.KubeConfig)
67-
if err != nil {
68-
return nil, err
61+
if d.timeout == 0 {
62+
d.timeout = defaultTimeout
6963
}
70-
proxyTransport, upgrader, err := spdy.RoundTripperFor(p.KubeConfig)
64+
p.KubeConfig.Timeout = d.timeout
65+
66+
cs, err := kubernetes.NewForConfig(p.KubeConfig)
7167
if err != nil {
7268
return nil, err
7369
}
74-
dialer.proxyTransport = proxyTransport
75-
dialer.upgrader = upgrader
76-
dialer.clientset = clientset
77-
return dialer, nil
70+
d.clientset = cs
71+
return d, nil
7872
}
7973

80-
// DialContextWithAddr is a GO grpc compliant dialer construct.
74+
// DialContextWithAddr matches gRPC's dialer signature and calls DialContext.
8175
func (d *Dialer) DialContextWithAddr(ctx context.Context, addr string) (net.Conn, error) {
8276
return d.DialContext(ctx, scheme, addr)
8377
}
8478

85-
// DialContext creates proxied port-forwarded connections.
86-
// ctx is currently unused, but fulfils the type signature used by GRPC.
79+
// DialContext opens a port-forwarded connection to addr.
80+
//
81+
// Builds a fresh SPDY transport each time to avoid shared-state races.
82+
// ctx is kept for gRPC compatibility but ignored here.
8783
func (d *Dialer) DialContext(_ context.Context, _ string, addr string) (net.Conn, error) {
84+
proxyTransport, upgrader, err := spdy.RoundTripperFor(d.proxy.KubeConfig)
85+
if err != nil {
86+
return nil, err
87+
}
88+
httpClient := &http.Client{Transport: proxyTransport}
89+
8890
req := d.clientset.CoreV1().RESTClient().
8991
Post().
9092
Resource(d.proxy.Kind).
9193
Namespace(d.proxy.Namespace).
9294
Name(addr).
9395
SubResource("portforward")
9496

95-
dialer := spdy.NewDialer(d.upgrader, &http.Client{Transport: d.proxyTransport}, "POST", req.URL())
97+
dialer := spdy.NewDialer(upgrader, httpClient, "POST", req.URL())
9698

97-
// Create a new connection from the dialer.
98-
//
99-
// Warning: Any early return should close this connection, otherwise we're going to leak them.
10099
connection, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name)
101100
if err != nil {
102101
return nil, errors.Wrap(err, "error upgrading connection")
103102
}
104103

105-
// Create the headers.
104+
// Port must match proxy.Port; request ID is always "0".
106105
headers := http.Header{}
107-
108-
// Set the header port number to match the proxy one.
109106
headers.Set(corev1.PortHeader, fmt.Sprintf("%d", d.proxy.Port))
110-
111-
// We only create a single stream over the connection
112107
headers.Set(corev1.PortForwardRequestIDHeader, "0")
113108

114-
// Create the error stream.
109+
// Error stream: required by protocol, closed immediately.
115110
headers.Set(corev1.StreamType, corev1.StreamTypeError)
116111
errorStream, err := connection.CreateStream(headers)
117112
if err != nil {
@@ -120,18 +115,14 @@ func (d *Dialer) DialContext(_ context.Context, _ string, addr string) (net.Conn
120115
connection.Close(),
121116
})
122117
}
123-
// Close the error stream right away, we're not writing to it.
124118
if err := errorStream.Close(); err != nil {
125119
return nil, kerrors.NewAggregate([]error{
126120
err,
127121
connection.Close(),
128122
})
129123
}
130124

131-
// Create the data stream.
132-
//
133-
// NOTE: Given that we're reusing the headers,
134-
// we need to overwrite the stream type before creating it.
125+
// Data stream: switch type and create the I/O stream.
135126
headers.Set(corev1.StreamType, corev1.StreamTypeData)
136127
dataStream, err := connection.CreateStream(headers)
137128
if err != nil {
@@ -141,17 +132,17 @@ func (d *Dialer) DialContext(_ context.Context, _ string, addr string) (net.Conn
141132
})
142133
}
143134

144-
// Create the net.Conn and return.
145135
return NewConn(connection, dataStream), nil
146136
}
147137

148-
// DialTimeout sets the timeout.
138+
// DialTimeout sets the per-dial timeout (also applied to KubeConfig).
149139
func DialTimeout(duration time.Duration) func(*Dialer) error {
150140
return func(d *Dialer) error {
151141
return d.setTimeout(duration)
152142
}
153143
}
154144

145+
// setTimeout updates timeout; used by DialTimeout.
155146
func (d *Dialer) setTimeout(duration time.Duration) error {
156147
d.timeout = duration
157148
return nil

0 commit comments

Comments
 (0)