Skip to content

Commit 1836f95

Browse files
authored
Merge pull request kubernetes#88549 from caesarxuchao/egressSelector-metrics
Add metrics for egress dials
2 parents 7c7ce47 + 1e78fc0 commit 1836f95

File tree

6 files changed

+439
-85
lines changed

6 files changed

+439
-85
lines changed

staging/src/k8s.io/apiserver/pkg/server/egressselector/BUILD

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ go_library(
1616
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library",
1717
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver/install:go_default_library",
1818
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver/v1beta1:go_default_library",
19+
"//staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics:go_default_library",
1920
"//vendor/google.golang.org/grpc:go_default_library",
2021
"//vendor/k8s.io/klog:go_default_library",
2122
"//vendor/k8s.io/utils/path:go_default_library",
23+
"//vendor/k8s.io/utils/trace:go_default_library",
2224
"//vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client:go_default_library",
2325
"//vendor/sigs.k8s.io/yaml:go_default_library",
2426
],
@@ -33,7 +35,10 @@ filegroup(
3335

3436
filegroup(
3537
name = "all-srcs",
36-
srcs = [":package-srcs"],
38+
srcs = [
39+
":package-srcs",
40+
"//staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics:all-srcs",
41+
],
3742
tags = ["automanaged"],
3843
visibility = ["//visibility:public"],
3944
)
@@ -47,7 +52,11 @@ go_test(
4752
embed = [":go_default_library"],
4853
deps = [
4954
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
55+
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
5056
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
5157
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library",
58+
"//staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics:go_default_library",
59+
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
60+
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
5261
],
5362
)

staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go

Lines changed: 185 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,21 @@ import (
2222
"crypto/tls"
2323
"crypto/x509"
2424
"fmt"
25-
"google.golang.org/grpc"
2625
"io/ioutil"
27-
utilnet "k8s.io/apimachinery/pkg/util/net"
28-
"k8s.io/apiserver/pkg/apis/apiserver"
29-
"k8s.io/klog"
3026
"net"
3127
"net/http"
3228
"net/url"
33-
client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
3429
"strings"
30+
"time"
31+
32+
"google.golang.org/grpc"
33+
34+
utilnet "k8s.io/apimachinery/pkg/util/net"
35+
"k8s.io/apiserver/pkg/apis/apiserver"
36+
egressmetrics "k8s.io/apiserver/pkg/server/egressselector/metrics"
37+
"k8s.io/klog"
38+
utiltrace "k8s.io/utils/trace"
39+
client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
3540
)
3641

3742
var directDialer utilnet.DialFunc = http.DefaultTransport.(*http.Transport).DialContext
@@ -123,16 +128,122 @@ func tunnelHTTPConnect(proxyConn net.Conn, proxyAddress, addr string) (net.Conn,
123128
return proxyConn, nil
124129
}
125130

126-
func createConnectTCPDialer(tcpTransport *apiserver.TCPTransport) (utilnet.DialFunc, error) {
127-
clientCert := tcpTransport.TLSConfig.ClientCert
128-
clientKey := tcpTransport.TLSConfig.ClientKey
129-
caCert := tcpTransport.TLSConfig.CABundle
130-
proxyURL, err := url.Parse(tcpTransport.URL)
131+
type proxier interface {
132+
// proxy returns a connection to addr.
133+
proxy(addr string) (net.Conn, error)
134+
}
135+
136+
var _ proxier = &httpConnectProxier{}
137+
138+
type httpConnectProxier struct {
139+
conn net.Conn
140+
proxyAddress string
141+
}
142+
143+
func (t *httpConnectProxier) proxy(addr string) (net.Conn, error) {
144+
return tunnelHTTPConnect(t.conn, t.proxyAddress, addr)
145+
}
146+
147+
var _ proxier = &grpcProxier{}
148+
149+
type grpcProxier struct {
150+
tunnel client.Tunnel
151+
}
152+
153+
func (g *grpcProxier) proxy(addr string) (net.Conn, error) {
154+
return g.tunnel.Dial("tcp", addr)
155+
}
156+
157+
type proxyServerConnector interface {
158+
// connect establishes connection to the proxy server, and returns a
159+
// proxier based on the connection.
160+
connect() (proxier, error)
161+
}
162+
163+
type tcpHTTPConnectConnector struct {
164+
proxyAddress string
165+
tlsConfig *tls.Config
166+
}
167+
168+
func (t *tcpHTTPConnectConnector) connect() (proxier, error) {
169+
conn, err := tls.Dial("tcp", t.proxyAddress, t.tlsConfig)
131170
if err != nil {
132-
return nil, fmt.Errorf("invalid proxy server url %q: %v", tcpTransport.URL, err)
171+
return nil, err
133172
}
134-
proxyAddress := proxyURL.Host
173+
return &httpConnectProxier{conn: conn, proxyAddress: t.proxyAddress}, nil
174+
}
135175

176+
type udsHTTPConnectConnector struct {
177+
udsName string
178+
}
179+
180+
func (u *udsHTTPConnectConnector) connect() (proxier, error) {
181+
conn, err := net.Dial("unix", u.udsName)
182+
if err != nil {
183+
return nil, err
184+
}
185+
return &httpConnectProxier{conn: conn, proxyAddress: u.udsName}, nil
186+
}
187+
188+
type udsGRPCConnector struct {
189+
udsName string
190+
}
191+
192+
func (u *udsGRPCConnector) connect() (proxier, error) {
193+
udsName := u.udsName
194+
dialOption := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
195+
c, err := net.Dial("unix", udsName)
196+
if err != nil {
197+
klog.Errorf("failed to create connection to uds name %s, error: %v", udsName, err)
198+
}
199+
return c, err
200+
})
201+
202+
tunnel, err := client.CreateGrpcTunnel(udsName, dialOption, grpc.WithInsecure())
203+
if err != nil {
204+
return nil, err
205+
}
206+
return &grpcProxier{tunnel: tunnel}, nil
207+
}
208+
209+
type dialerCreator struct {
210+
connector proxyServerConnector
211+
direct bool
212+
options metricsOptions
213+
}
214+
215+
type metricsOptions struct {
216+
transport string
217+
protocol string
218+
}
219+
220+
func (d *dialerCreator) createDialer() utilnet.DialFunc {
221+
if d.direct {
222+
return directDialer
223+
}
224+
return func(ctx context.Context, network, addr string) (net.Conn, error) {
225+
trace := utiltrace.New(fmt.Sprintf("Proxy via HTTP Connect over %s", d.options.transport), utiltrace.Field{Key: "address", Value: addr})
226+
defer trace.LogIfLong(500 * time.Millisecond)
227+
start := egressmetrics.Metrics.Clock().Now()
228+
proxier, err := d.connector.connect()
229+
if err != nil {
230+
egressmetrics.Metrics.ObserveDialFailure(d.options.protocol, d.options.transport, egressmetrics.StageConnect)
231+
return nil, err
232+
}
233+
conn, err := proxier.proxy(addr)
234+
if err != nil {
235+
egressmetrics.Metrics.ObserveDialFailure(d.options.protocol, d.options.transport, egressmetrics.StageProxy)
236+
return nil, err
237+
}
238+
egressmetrics.Metrics.ObserveDialLatency(egressmetrics.Metrics.Clock().Now().Sub(start), d.options.protocol, d.options.transport)
239+
return conn, nil
240+
}
241+
}
242+
243+
func getTLSConfig(t *apiserver.TLSConfig) (*tls.Config, error) {
244+
clientCert := t.ClientCert
245+
clientKey := t.ClientKey
246+
caCert := t.CABundle
136247
clientCerts, err := tls.LoadX509KeyPair(clientCert, clientKey)
137248
if err != nil {
138249
return nil, fmt.Errorf("failed to read key pair %s & %s, got %v", clientCert, clientKey, err)
@@ -151,56 +262,75 @@ func createConnectTCPDialer(tcpTransport *apiserver.TCPTransport) (utilnet.DialF
151262
// Use host's root CA set instead of providing our own
152263
certPool = nil
153264
}
154-
contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
155-
klog.V(4).Infof("Sending request to %q.", addr)
156-
proxyConn, err := tls.Dial("tcp", proxyAddress,
157-
&tls.Config{
158-
Certificates: []tls.Certificate{clientCerts},
159-
RootCAs: certPool,
160-
},
161-
)
162-
if err != nil {
163-
return nil, fmt.Errorf("dialing proxy %q failed: %v", proxyAddress, err)
164-
}
165-
return tunnelHTTPConnect(proxyConn, proxyAddress, addr)
166-
}
167-
return contextDialer, nil
265+
return &tls.Config{
266+
Certificates: []tls.Certificate{clientCerts},
267+
RootCAs: certPool,
268+
}, nil
168269
}
169270

170-
func createConnectUDSDialer(udsConfig *apiserver.UDSTransport) (utilnet.DialFunc, error) {
171-
contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
172-
proxyConn, err := net.Dial("unix", udsConfig.UDSName)
173-
if err != nil {
174-
return nil, fmt.Errorf("dialing proxy %q failed: %v", udsConfig.UDSName, err)
175-
}
176-
return tunnelHTTPConnect(proxyConn, udsConfig.UDSName, addr)
271+
func getProxyAddress(urlString string) (string, error) {
272+
proxyURL, err := url.Parse(urlString)
273+
if err != nil {
274+
return "", fmt.Errorf("invalid proxy server url %q: %v", urlString, err)
177275
}
178-
return contextDialer, nil
276+
return proxyURL.Host, nil
179277
}
180278

181-
func createGRPCUDSDialer(udsName string) (utilnet.DialFunc, error) {
182-
contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
279+
func connectionToDialerCreator(c apiserver.Connection) (*dialerCreator, error) {
280+
switch c.ProxyProtocol {
183281

184-
dialOption := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
185-
c, err := net.Dial("unix", udsName)
282+
case apiserver.ProtocolHTTPConnect:
283+
if c.Transport.UDS != nil {
284+
return &dialerCreator{
285+
connector: &udsHTTPConnectConnector{
286+
udsName: c.Transport.UDS.UDSName,
287+
},
288+
options: metricsOptions{
289+
transport: egressmetrics.TransportUDS,
290+
protocol: egressmetrics.ProtocolHTTPConnect,
291+
},
292+
}, nil
293+
} else if c.Transport.TCP != nil {
294+
tlsConfig, err := getTLSConfig(c.Transport.TCP.TLSConfig)
186295
if err != nil {
187-
klog.Errorf("failed to create connection to uds name %s, error: %v", udsName, err)
296+
return nil, err
188297
}
189-
return c, err
190-
})
191-
192-
tunnel, err := client.CreateGrpcTunnel(udsName, dialOption, grpc.WithInsecure())
193-
if err != nil {
194-
return nil, err
298+
proxyAddress, err := getProxyAddress(c.Transport.TCP.URL)
299+
if err != nil {
300+
return nil, err
301+
}
302+
return &dialerCreator{
303+
connector: &tcpHTTPConnectConnector{
304+
tlsConfig: tlsConfig,
305+
proxyAddress: proxyAddress,
306+
},
307+
options: metricsOptions{
308+
transport: egressmetrics.TransportTCP,
309+
protocol: egressmetrics.ProtocolHTTPConnect,
310+
},
311+
}, nil
312+
} else {
313+
return nil, fmt.Errorf("Either a TCP or UDS transport must be specified")
195314
}
196-
197-
proxyConn, err := tunnel.Dial("tcp", addr)
198-
if err != nil {
199-
return nil, err
315+
case apiserver.ProtocolGRPC:
316+
if c.Transport.UDS != nil {
317+
return &dialerCreator{
318+
connector: &udsGRPCConnector{
319+
udsName: c.Transport.UDS.UDSName,
320+
},
321+
options: metricsOptions{
322+
transport: egressmetrics.TransportUDS,
323+
protocol: egressmetrics.ProtocolGRPC,
324+
},
325+
}, nil
200326
}
201-
return proxyConn, nil
327+
return nil, fmt.Errorf("UDS transport must be specified for GRPC")
328+
case apiserver.ProtocolDirect:
329+
return &dialerCreator{direct: true}, nil
330+
default:
331+
return nil, fmt.Errorf("unrecognized service connection protocol %q", c.ProxyProtocol)
202332
}
203-
return contextDialer, nil
333+
204334
}
205335

206336
// NewEgressSelector configures lookup mechanism for Lookup.
@@ -218,40 +348,11 @@ func NewEgressSelector(config *apiserver.EgressSelectorConfiguration) (*EgressSe
218348
if err != nil {
219349
return nil, err
220350
}
221-
switch service.Connection.ProxyProtocol {
222-
223-
case apiserver.ProtocolHTTPConnect:
224-
if service.Connection.Transport.UDS != nil {
225-
contextDialer, err := createConnectUDSDialer(service.Connection.Transport.UDS)
226-
if err != nil {
227-
return nil, fmt.Errorf("failed to create HTTPConnect uds dialer: %v", err)
228-
}
229-
cs.egressToDialer[name] = contextDialer
230-
} else if service.Connection.Transport.TCP != nil {
231-
contextDialer, err := createConnectTCPDialer(service.Connection.Transport.TCP)
232-
if err != nil {
233-
return nil, fmt.Errorf("failed to create HTTPConnect dialer: %v", err)
234-
}
235-
cs.egressToDialer[name] = contextDialer
236-
} else {
237-
return nil, fmt.Errorf("Either a TCP or UDS transport must be specified")
238-
}
239-
case apiserver.ProtocolGRPC:
240-
if service.Connection.Transport.UDS != nil {
241-
grpcContextDialer, err := createGRPCUDSDialer(service.Connection.Transport.UDS.UDSName)
242-
if err != nil {
243-
return nil, fmt.Errorf("failed to create grpc dialer: %v", err)
244-
}
245-
cs.egressToDialer[name] = grpcContextDialer
246-
247-
} else {
248-
return nil, fmt.Errorf("UDS transport must be specified for GRPC")
249-
}
250-
case apiserver.ProtocolDirect:
251-
cs.egressToDialer[name] = directDialer
252-
default:
253-
return nil, fmt.Errorf("unrecognized service connection protocol %q", service.Connection.ProxyProtocol)
351+
dialerCreator, err := connectionToDialerCreator(service.Connection)
352+
if err != nil {
353+
return nil, fmt.Errorf("failed to create dialer for egressSelection %q: %v", name, err)
254354
}
355+
cs.egressToDialer[name] = dialerCreator.createDialer()
255356
}
256357
return cs, nil
257358
}

0 commit comments

Comments
 (0)