Skip to content

Commit bac9351

Browse files
author
Chao Xu
committed
refactor egress dialer construction code and add unit test
1 parent fbb1fb8 commit bac9351

File tree

3 files changed

+298
-114
lines changed

3 files changed

+298
-114
lines changed

staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go

Lines changed: 177 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"time"
3131

3232
"google.golang.org/grpc"
33+
3334
utilnet "k8s.io/apimachinery/pkg/util/net"
3435
"k8s.io/apiserver/pkg/apis/apiserver"
3536
egressmetrics "k8s.io/apiserver/pkg/server/egressselector/metrics"
@@ -127,16 +128,122 @@ func tunnelHTTPConnect(proxyConn net.Conn, proxyAddress, addr string) (net.Conn,
127128
return proxyConn, nil
128129
}
129130

130-
func createConnectTCPDialer(tcpTransport *apiserver.TCPTransport) (utilnet.DialFunc, error) {
131-
clientCert := tcpTransport.TLSConfig.ClientCert
132-
clientKey := tcpTransport.TLSConfig.ClientKey
133-
caCert := tcpTransport.TLSConfig.CABundle
134-
proxyURL, err := url.Parse(tcpTransport.URL)
131+
type proxier interface {
132+
// proxy returns a connection to addr.
133+
proxy(addr string) (net.Conn, error)
134+
}
135+
136+
var _ proxier = &httpConnectProxier{}
137+
138+
type httpConnectProxier struct {
139+
conn net.Conn
140+
proxyAddress string
141+
}
142+
143+
func (t *httpConnectProxier) proxy(addr string) (net.Conn, error) {
144+
return tunnelHTTPConnect(t.conn, t.proxyAddress, addr)
145+
}
146+
147+
var _ proxier = &grpcProxier{}
148+
149+
type grpcProxier struct {
150+
tunnel client.Tunnel
151+
}
152+
153+
func (g *grpcProxier) proxy(addr string) (net.Conn, error) {
154+
return g.tunnel.Dial("tcp", addr)
155+
}
156+
157+
type proxyServerConnector interface {
158+
// connect establishes connection to the proxy server, and returns a
159+
// proxier based on the connection.
160+
connect() (proxier, error)
161+
}
162+
163+
type tcpHTTPConnectConnector struct {
164+
proxyAddress string
165+
tlsConfig *tls.Config
166+
}
167+
168+
func (t *tcpHTTPConnectConnector) connect() (proxier, error) {
169+
conn, err := tls.Dial("tcp", t.proxyAddress, t.tlsConfig)
135170
if err != nil {
136-
return nil, fmt.Errorf("invalid proxy server url %q: %v", tcpTransport.URL, err)
171+
return nil, err
137172
}
138-
proxyAddress := proxyURL.Host
173+
return &httpConnectProxier{conn: conn, proxyAddress: t.proxyAddress}, nil
174+
}
175+
176+
type udsHTTPConnectConnector struct {
177+
udsName string
178+
}
139179

180+
func (u *udsHTTPConnectConnector) connect() (proxier, error) {
181+
conn, err := net.Dial("unix", u.udsName)
182+
if err != nil {
183+
return nil, err
184+
}
185+
return &httpConnectProxier{conn: conn, proxyAddress: u.udsName}, nil
186+
}
187+
188+
type udsGRPCConnector struct {
189+
udsName string
190+
}
191+
192+
func (u *udsGRPCConnector) connect() (proxier, error) {
193+
udsName := u.udsName
194+
dialOption := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
195+
c, err := net.Dial("unix", udsName)
196+
if err != nil {
197+
klog.Errorf("failed to create connection to uds name %s, error: %v", udsName, err)
198+
}
199+
return c, err
200+
})
201+
202+
tunnel, err := client.CreateGrpcTunnel(udsName, dialOption, grpc.WithInsecure())
203+
if err != nil {
204+
return nil, err
205+
}
206+
return &grpcProxier{tunnel: tunnel}, nil
207+
}
208+
209+
type dialerCreator struct {
210+
connector proxyServerConnector
211+
direct bool
212+
options metricsOptions
213+
}
214+
215+
type metricsOptions struct {
216+
transport string
217+
protocol string
218+
}
219+
220+
func (d *dialerCreator) createDialer() utilnet.DialFunc {
221+
if d.direct {
222+
return directDialer
223+
}
224+
return func(ctx context.Context, network, addr string) (net.Conn, error) {
225+
trace := utiltrace.New(fmt.Sprintf("Proxy via HTTP Connect over %s", d.options.transport), utiltrace.Field{Key: "address", Value: addr})
226+
defer trace.LogIfLong(500 * time.Millisecond)
227+
start := egressmetrics.Metrics.Clock().Now()
228+
proxier, err := d.connector.connect()
229+
if err != nil {
230+
egressmetrics.Metrics.ObserveDialFailure(d.options.protocol, d.options.transport, egressmetrics.StageConnect)
231+
return nil, err
232+
}
233+
conn, err := proxier.proxy(addr)
234+
if err != nil {
235+
egressmetrics.Metrics.ObserveDialFailure(d.options.protocol, d.options.transport, egressmetrics.StageProxy)
236+
return nil, err
237+
}
238+
egressmetrics.Metrics.ObserveDialLatency(egressmetrics.Metrics.Clock().Now().Sub(start), d.options.protocol, d.options.transport)
239+
return conn, nil
240+
}
241+
}
242+
243+
func getTLSConfig(t *apiserver.TLSConfig) (*tls.Config, error) {
244+
clientCert := t.ClientCert
245+
clientKey := t.ClientKey
246+
caCert := t.CABundle
140247
clientCerts, err := tls.LoadX509KeyPair(clientCert, clientKey)
141248
if err != nil {
142249
return nil, fmt.Errorf("failed to read key pair %s & %s, got %v", clientCert, clientKey, err)
@@ -155,81 +262,75 @@ func createConnectTCPDialer(tcpTransport *apiserver.TCPTransport) (utilnet.DialF
155262
// Use host's root CA set instead of providing our own
156263
certPool = nil
157264
}
158-
contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
159-
trace := utiltrace.New("Proxy via HTTP Connect over TCP", utiltrace.Field{Key: "address", Value: addr})
160-
defer trace.LogIfLong(500 * time.Millisecond)
161-
klog.V(4).Infof("Sending request to %q.", addr)
162-
start := time.Now()
163-
proxyConn, err := tls.Dial("tcp", proxyAddress,
164-
&tls.Config{
165-
Certificates: []tls.Certificate{clientCerts},
166-
RootCAs: certPool,
167-
},
168-
)
169-
if err != nil {
170-
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportTCP, egressmetrics.StageDial)
171-
return nil, fmt.Errorf("dialing proxy %q failed: %v", proxyAddress, err)
172-
}
173-
ret, err := tunnelHTTPConnect(proxyConn, proxyAddress, addr)
174-
if err != nil {
175-
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportTCP, egressmetrics.StageProxy)
176-
return nil, err
177-
}
178-
egressmetrics.Metrics.ObserveDialLatency(time.Since(start), egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportTCP)
179-
return ret, nil
180-
}
181-
return contextDialer, nil
265+
return &tls.Config{
266+
Certificates: []tls.Certificate{clientCerts},
267+
RootCAs: certPool,
268+
}, nil
182269
}
183270

184-
func createConnectUDSDialer(udsConfig *apiserver.UDSTransport) (utilnet.DialFunc, error) {
185-
contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
186-
trace := utiltrace.New("Proxy via HTTP Connect over UDS", utiltrace.Field{Key: "address", Value: addr})
187-
defer trace.LogIfLong(500 * time.Millisecond)
188-
start := time.Now()
189-
proxyConn, err := net.Dial("unix", udsConfig.UDSName)
190-
if err != nil {
191-
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportUDS, egressmetrics.StageDial)
192-
return nil, fmt.Errorf("dialing proxy %q failed: %v", udsConfig.UDSName, err)
193-
}
194-
ret, err := tunnelHTTPConnect(proxyConn, udsConfig.UDSName, addr)
195-
if err != nil {
196-
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportUDS, egressmetrics.StageProxy)
197-
return nil, err
198-
}
199-
egressmetrics.Metrics.ObserveDialLatency(time.Since(start), egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportUDS)
200-
return ret, nil
271+
func getProxyAddress(urlString string) (string, error) {
272+
proxyURL, err := url.Parse(urlString)
273+
if err != nil {
274+
return "", fmt.Errorf("invalid proxy server url %q: %v", urlString, err)
201275
}
202-
return contextDialer, nil
276+
return proxyURL.Host, nil
203277
}
204278

205-
func createGRPCUDSDialer(udsName string) (utilnet.DialFunc, error) {
206-
contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
207-
trace := utiltrace.New("Proxy via GRPC over UDS", utiltrace.Field{Key: "address", Value: addr})
208-
defer trace.LogIfLong(500 * time.Millisecond)
209-
start := time.Now()
210-
dialOption := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
211-
c, err := net.Dial("unix", udsName)
279+
func connectionToDialerCreator(c apiserver.Connection) (*dialerCreator, error) {
280+
switch c.ProxyProtocol {
281+
282+
case apiserver.ProtocolHTTPConnect:
283+
if c.Transport.UDS != nil {
284+
return &dialerCreator{
285+
connector: &udsHTTPConnectConnector{
286+
udsName: c.Transport.UDS.UDSName,
287+
},
288+
options: metricsOptions{
289+
transport: egressmetrics.TransportUDS,
290+
protocol: egressmetrics.ProtocolHTTPConnect,
291+
},
292+
}, nil
293+
} else if c.Transport.TCP != nil {
294+
tlsConfig, err := getTLSConfig(c.Transport.TCP.TLSConfig)
212295
if err != nil {
213-
klog.Errorf("failed to create connection to uds name %s, error: %v", udsName, err)
296+
return nil, err
214297
}
215-
return c, err
216-
})
217-
218-
tunnel, err := client.CreateGrpcTunnel(udsName, dialOption, grpc.WithInsecure())
219-
if err != nil {
220-
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolGRPC, egressmetrics.TransportUDS, egressmetrics.StageDial)
221-
return nil, err
298+
proxyAddress, err := getProxyAddress(c.Transport.TCP.URL)
299+
if err != nil {
300+
return nil, err
301+
}
302+
return &dialerCreator{
303+
connector: &tcpHTTPConnectConnector{
304+
tlsConfig: tlsConfig,
305+
proxyAddress: proxyAddress,
306+
},
307+
options: metricsOptions{
308+
transport: egressmetrics.TransportTCP,
309+
protocol: egressmetrics.ProtocolHTTPConnect,
310+
},
311+
}, nil
312+
} else {
313+
return nil, fmt.Errorf("Either a TCP or UDS transport must be specified")
222314
}
223-
224-
proxyConn, err := tunnel.Dial("tcp", addr)
225-
if err != nil {
226-
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolGRPC, egressmetrics.TransportUDS, egressmetrics.StageProxy)
227-
return nil, err
315+
case apiserver.ProtocolGRPC:
316+
if c.Transport.UDS != nil {
317+
return &dialerCreator{
318+
connector: &udsGRPCConnector{
319+
udsName: c.Transport.UDS.UDSName,
320+
},
321+
options: metricsOptions{
322+
transport: egressmetrics.TransportUDS,
323+
protocol: egressmetrics.ProtocolGRPC,
324+
},
325+
}, nil
228326
}
229-
egressmetrics.Metrics.ObserveDialLatency(time.Since(start), egressmetrics.ProtocolGRPC, egressmetrics.TransportUDS)
230-
return proxyConn, nil
327+
return nil, fmt.Errorf("UDS transport must be specified for GRPC")
328+
case apiserver.ProtocolDirect:
329+
return &dialerCreator{direct: true}, nil
330+
default:
331+
return nil, fmt.Errorf("unrecognized service connection protocol %q", c.ProxyProtocol)
231332
}
232-
return contextDialer, nil
333+
233334
}
234335

235336
// NewEgressSelector configures lookup mechanism for Lookup.
@@ -247,40 +348,11 @@ func NewEgressSelector(config *apiserver.EgressSelectorConfiguration) (*EgressSe
247348
if err != nil {
248349
return nil, err
249350
}
250-
switch service.Connection.ProxyProtocol {
251-
252-
case apiserver.ProtocolHTTPConnect:
253-
if service.Connection.Transport.UDS != nil {
254-
contextDialer, err := createConnectUDSDialer(service.Connection.Transport.UDS)
255-
if err != nil {
256-
return nil, fmt.Errorf("failed to create HTTPConnect uds dialer: %v", err)
257-
}
258-
cs.egressToDialer[name] = contextDialer
259-
} else if service.Connection.Transport.TCP != nil {
260-
contextDialer, err := createConnectTCPDialer(service.Connection.Transport.TCP)
261-
if err != nil {
262-
return nil, fmt.Errorf("failed to create HTTPConnect dialer: %v", err)
263-
}
264-
cs.egressToDialer[name] = contextDialer
265-
} else {
266-
return nil, fmt.Errorf("Either a TCP or UDS transport must be specified")
267-
}
268-
case apiserver.ProtocolGRPC:
269-
if service.Connection.Transport.UDS != nil {
270-
grpcContextDialer, err := createGRPCUDSDialer(service.Connection.Transport.UDS.UDSName)
271-
if err != nil {
272-
return nil, fmt.Errorf("failed to create grpc dialer: %v", err)
273-
}
274-
cs.egressToDialer[name] = grpcContextDialer
275-
276-
} else {
277-
return nil, fmt.Errorf("UDS transport must be specified for GRPC")
278-
}
279-
case apiserver.ProtocolDirect:
280-
cs.egressToDialer[name] = directDialer
281-
default:
282-
return nil, fmt.Errorf("unrecognized service connection protocol %q", service.Connection.ProxyProtocol)
351+
dialerCreator, err := connectionToDialerCreator(service.Connection)
352+
if err != nil {
353+
return nil, fmt.Errorf("failed to create dialer for egressSelection %q: %v", name, err)
283354
}
355+
cs.egressToDialer[name] = dialerCreator.createDialer()
284356
}
285357
return cs, nil
286358
}

0 commit comments

Comments
 (0)