Skip to content

Commit d2330a5

Browse files
authored
NETOBSERV-2503: allow TLS/mTLS in grpc exporter (#843)
* NETOBSERV-2503: allow TLS/mTLS in grpc exporter Add options to configure GRPC exporter with TLS/mTLS * Add tests * Fix linter * Add a test where the client certificate is invalid
1 parent d1f6be7 commit d2330a5

File tree

7 files changed

+472
-19
lines changed

7 files changed

+472
-19
lines changed

pkg/agent/agent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func buildGRPCExporter(cfg *config.Agent, m *metrics.Metrics) (node.TerminalFunc
263263
return nil, fmt.Errorf("missing target host or port: %s:%d",
264264
cfg.TargetHost, cfg.TargetPort)
265265
}
266-
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, m)
266+
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.TargetTLSCACertPath, cfg.TargetTLSUserCertPath, cfg.TargetTLSUserKeyPath, cfg.GRPCMessageMaxFlows, m)
267267
if err != nil {
268268
return nil, err
269269
}

pkg/config/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,12 @@ type Agent struct {
101101
TargetHost string `env:"TARGET_HOST"`
102102
// Port is the port the flow or packet collector, when the EXPORT variable is set to "grpc"
103103
TargetPort int `env:"TARGET_PORT"`
104+
// CA certificate path of the target, when TLS is used. Empty by default (no TLS).
105+
TargetTLSCACertPath string `env:"TARGET_TLS_CA_CERT_PATH"`
106+
// User certificate path, when mTLS is used. Empty by default (no mTLS).
107+
TargetTLSUserCertPath string `env:"TARGET_TLS_USER_CERT_PATH"`
108+
// User certificate key path, when mTLS is used. Empty by default (no mTLS).
109+
TargetTLSUserKeyPath string `env:"TARGET_TLS_USER_KEY_PATH"`
104110
// GRPCMessageMaxFlows specifies the limit, in number of flows, of each GRPC message. Messages
105111
// larger than that number will be split and submitted sequentially.
106112
GRPCMessageMaxFlows int `env:"GRPC_MESSAGE_MAX_FLOWS" envDefault:"10000"`

pkg/exporter/grpc_proto.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ type GRPCProto struct {
3232
batchCounter prometheus.Counter
3333
}
3434

35-
func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metrics.Metrics) (*GRPCProto, error) {
36-
clientConn, err := grpc.ConnectClient(hostIP, hostPort)
35+
func StartGRPCProto(hostIP string, hostPort int, caPath, userCertPath, userKeyPath string, maxFlowsPerMessage int, m *metrics.Metrics) (*GRPCProto, error) {
36+
clientConn, err := grpc.ConnectClient(hostIP, hostPort, caPath, userCertPath, userKeyPath)
3737
if err != nil {
3838
return nil, err
3939
}

pkg/exporter/grpc_proto_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestIPv4GRPCProto_ExportFlows_AgentIP(t *testing.T) {
2929
defer coll.Close()
3030

3131
// Start GRPCProto exporter stage
32-
exporter, err := StartGRPCProto("127.0.0.1", port, 1000, metrics.NoOp())
32+
exporter, err := StartGRPCProto("127.0.0.1", port, "", "", "", 1000, metrics.NoOp())
3333
require.NoError(t, err)
3434

3535
// Send some flows to the input of the exporter stage
@@ -71,7 +71,7 @@ func TestIPv6GRPCProto_ExportFlows_AgentIP(t *testing.T) {
7171
defer coll.Close()
7272

7373
// Start GRPCProto exporter stage
74-
exporter, err := StartGRPCProto("::1", port, 1000, metrics.NoOp())
74+
exporter, err := StartGRPCProto("::1", port, "", "", "", 1000, metrics.NoOp())
7575
require.NoError(t, err)
7676

7777
// Send some flows to the input of the exporter stage
@@ -114,7 +114,7 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {
114114

115115
const msgMaxLen = 10000
116116
// Start GRPCProto exporter stage
117-
exporter, err := StartGRPCProto("127.0.0.1", port, msgMaxLen, metrics.NoOp())
117+
exporter, err := StartGRPCProto("127.0.0.1", port, "", "", "", msgMaxLen, metrics.NoOp())
118118
require.NoError(t, err)
119119

120120
// Send a message much longer than the limit length

pkg/grpc/flow/client.go

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,60 @@
22
package flowgrpc
33

44
import (
5+
"crypto/tls"
6+
"crypto/x509"
7+
"fmt"
8+
"os"
9+
510
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
611
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
12+
"github.com/sirupsen/logrus"
713
"google.golang.org/grpc"
14+
"google.golang.org/grpc/credentials"
815
"google.golang.org/grpc/credentials/insecure"
916
)
1017

18+
var clog = logrus.WithField("component", "grpc.Client")
19+
1120
// ClientConnection wraps a gRPC+protobuf connection
1221
type ClientConnection struct {
1322
client pbflow.CollectorClient
1423
conn *grpc.ClientConn
1524
}
1625

17-
func ConnectClient(hostIP string, hostPort int) (*ClientConnection, error) {
18-
// TODO: allow configuring some options (keepalive, backoff...)
26+
func ConnectClient(hostIP string, hostPort int, caPath, userCertPath, userKeyPath string) (*ClientConnection, error) {
27+
// TODO: allow configuring more options (keepalive, backoff...)
28+
var opts []grpc.DialOption
29+
if caPath == "" {
30+
clog.Info("Starting GRPC client - no TLS")
31+
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
32+
} else {
33+
// Configure TLS (server CA)
34+
caCert, err := os.ReadFile(caPath)
35+
if err != nil {
36+
return nil, fmt.Errorf("cannot load CA certificate: %w", err)
37+
}
38+
pool := x509.NewCertPool()
39+
pool.AppendCertsFromPEM(caCert)
40+
tlsConfig := &tls.Config{
41+
RootCAs: pool,
42+
}
43+
if userCertPath != "" && userKeyPath != "" {
44+
clog.Info("Starting GRPC client with mTLS")
45+
// Configure mTLS (client certificates)
46+
cert, err := tls.LoadX509KeyPair(userCertPath, userKeyPath)
47+
if err != nil {
48+
return nil, fmt.Errorf("cannot load client certificate: %w", err)
49+
}
50+
tlsConfig.Certificates = []tls.Certificate{cert}
51+
} else {
52+
clog.Info("Starting GRPC client with TLS")
53+
}
54+
55+
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
56+
}
1957
socket := utils.GetSocket(hostIP, hostPort)
20-
conn, err := grpc.NewClient(socket,
21-
grpc.WithTransportCredentials(insecure.NewCredentials()))
58+
conn, err := grpc.NewClient(socket, opts...)
2259
if err != nil {
2360
return nil, err
2461
}

pkg/grpc/flow/grpc_test.go

Lines changed: 185 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,32 @@ package flowgrpc
22

33
import (
44
"context"
5+
"crypto/tls"
6+
"crypto/x509"
7+
"fmt"
8+
"os"
59
"testing"
610
"time"
711

8-
"github.com/mariomac/guara/pkg/test"
12+
test2 "github.com/mariomac/guara/pkg/test"
913
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
14+
"github.com/netobserv/netobserv-ebpf-agent/pkg/test"
1015
"github.com/stretchr/testify/assert"
1116
"github.com/stretchr/testify/require"
1217
"google.golang.org/grpc"
18+
"google.golang.org/grpc/credentials"
1319
"google.golang.org/protobuf/types/known/timestamppb"
1420
)
1521

1622
const timeout = 5 * time.Second
1723

1824
func TestGRPCCommunication(t *testing.T) {
19-
port, err := test.FreeTCPPort()
25+
port, err := test2.FreeTCPPort()
2026
require.NoError(t, err)
2127
serverOut := make(chan *pbflow.Records)
2228
_, err = StartCollector(port, serverOut)
2329
require.NoError(t, err)
24-
cc, err := ConnectClient("127.0.0.1", port)
30+
cc, err := ConnectClient("127.0.0.1", port, "", "", "")
2531
require.NoError(t, err)
2632
client := cc.Client()
2733

@@ -93,7 +99,7 @@ func TestGRPCCommunication(t *testing.T) {
9399
}
94100

95101
func TestConstructorOptions(t *testing.T) {
96-
port, err := test.FreeTCPPort()
102+
port, err := test2.FreeTCPPort()
97103
require.NoError(t, err)
98104
intercepted := make(chan struct{})
99105
// Override the default GRPC collector to verify that StartCollector is applying the
@@ -109,7 +115,7 @@ func TestConstructorOptions(t *testing.T) {
109115
return handler(ctx, req)
110116
})))
111117
require.NoError(t, err)
112-
cc, err := ConnectClient("127.0.0.1", port)
118+
cc, err := ConnectClient("127.0.0.1", port, "", "", "")
113119
require.NoError(t, err)
114120
client := cc.Client()
115121

@@ -127,13 +133,13 @@ func TestConstructorOptions(t *testing.T) {
127133
}
128134

129135
func BenchmarkIPv4GRPCCommunication(b *testing.B) {
130-
port, err := test.FreeTCPPort()
136+
port, err := test2.FreeTCPPort()
131137
require.NoError(b, err)
132138
serverOut := make(chan *pbflow.Records, 1000)
133139
collector, err := StartCollector(port, serverOut)
134140
require.NoError(b, err)
135141
defer collector.Close()
136-
cc, err := ConnectClient("127.0.0.1", port)
142+
cc, err := ConnectClient("127.0.0.1", port, "", "", "")
137143
require.NoError(b, err)
138144
defer cc.Close()
139145
client := cc.Client()
@@ -188,13 +194,13 @@ func BenchmarkIPv4GRPCCommunication(b *testing.B) {
188194
}
189195

190196
func BenchmarkIPv6GRPCCommunication(b *testing.B) {
191-
port, err := test.FreeTCPPort()
197+
port, err := test2.FreeTCPPort()
192198
require.NoError(b, err)
193199
serverOut := make(chan *pbflow.Records, 1000)
194200
collector, err := StartCollector(port, serverOut)
195201
require.NoError(b, err)
196202
defer collector.Close()
197-
cc, err := ConnectClient("::1", port)
203+
cc, err := ConnectClient("::1", port, "", "", "")
198204
require.NoError(b, err)
199205
defer cc.Close()
200206
client := cc.Client()
@@ -249,3 +255,173 @@ func BenchmarkIPv6GRPCCommunication(b *testing.B) {
249255
<-serverOut
250256
}
251257
}
258+
259+
// Note: there's more tests focused on TLS in FLP, that also cover agent's functions
260+
func TestGRPCCommunication_TLS(t *testing.T) {
261+
_, _, _, ca, cert, key, cleanup := test.CreateAllCerts(t)
262+
defer cleanup()
263+
opts, err := buildTLSServerOptions(cert, key, "")
264+
require.NoError(t, err)
265+
266+
port, err := test2.FreeTCPPort()
267+
require.NoError(t, err)
268+
serverOut := make(chan *pbflow.Records)
269+
_, err = StartCollector(port, serverOut, WithGRPCServerOptions(opts...))
270+
require.NoError(t, err)
271+
cc, err := ConnectClient("127.0.0.1", port, ca, "", "")
272+
require.NoError(t, err)
273+
client := cc.Client()
274+
275+
go func() {
276+
_, err = client.Send(context.Background(),
277+
&pbflow.Records{Entries: []*pbflow.Record{{
278+
EthProtocol: 123, Network: &pbflow.Network{
279+
SrcAddr: &pbflow.IP{
280+
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x11223344},
281+
},
282+
DstAddr: &pbflow.IP{
283+
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x55667788},
284+
},
285+
}}},
286+
})
287+
require.NoError(t, err)
288+
}()
289+
290+
var rs *pbflow.Records
291+
select {
292+
case rs = <-serverOut:
293+
case <-time.After(timeout):
294+
require.Fail(t, "timeout waiting for flows")
295+
}
296+
assert.Len(t, rs.Entries, 1)
297+
r := rs.Entries[0]
298+
assert.EqualValues(t, 123, r.EthProtocol)
299+
assert.EqualValues(t, 0x11223344, r.GetNetwork().GetSrcAddr().GetIpv4())
300+
assert.EqualValues(t, 0x55667788, r.GetNetwork().GetDstAddr().GetIpv4())
301+
302+
select {
303+
case rs = <-serverOut:
304+
assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
305+
default:
306+
// ok!
307+
}
308+
}
309+
310+
// Note: there's more tests focused on TLS in FLP, that also cover agent's functions
311+
func TestGRPCCommunication_MutualTLS(t *testing.T) {
312+
clCA, clCert, clKey, ca, cert, key, cleanup := test.CreateAllCerts(t)
313+
defer cleanup()
314+
opts, err := buildTLSServerOptions(cert, key, clCA)
315+
require.NoError(t, err)
316+
317+
port, err := test2.FreeTCPPort()
318+
require.NoError(t, err)
319+
serverOut := make(chan *pbflow.Records)
320+
_, err = StartCollector(port, serverOut, WithGRPCServerOptions(opts...))
321+
require.NoError(t, err)
322+
cc, err := ConnectClient("127.0.0.1", port, ca, clCert, clKey)
323+
require.NoError(t, err)
324+
client := cc.Client()
325+
326+
go func() {
327+
_, err = client.Send(context.Background(),
328+
&pbflow.Records{Entries: []*pbflow.Record{{
329+
EthProtocol: 123, Network: &pbflow.Network{
330+
SrcAddr: &pbflow.IP{
331+
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x11223344},
332+
},
333+
DstAddr: &pbflow.IP{
334+
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x55667788},
335+
},
336+
}}},
337+
})
338+
require.NoError(t, err)
339+
}()
340+
341+
var rs *pbflow.Records
342+
select {
343+
case rs = <-serverOut:
344+
case <-time.After(timeout):
345+
require.Fail(t, "timeout waiting for flows")
346+
}
347+
assert.Len(t, rs.Entries, 1)
348+
r := rs.Entries[0]
349+
assert.EqualValues(t, 123, r.EthProtocol)
350+
assert.EqualValues(t, 0x11223344, r.GetNetwork().GetSrcAddr().GetIpv4())
351+
assert.EqualValues(t, 0x55667788, r.GetNetwork().GetDstAddr().GetIpv4())
352+
353+
select {
354+
case rs = <-serverOut:
355+
assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
356+
default:
357+
// ok!
358+
}
359+
}
360+
361+
func TestGRPCCommunication_MutualTLS_InvalidCert(t *testing.T) {
362+
_, clCert, clKey, ca, cert, key, cleanup := test.CreateAllCerts(t)
363+
defer cleanup()
364+
365+
// Here we pass the server CA, which was NOT used to generate the client cert, which means that the client cert should be rejected upon connecting
366+
opts, err := buildTLSServerOptions(cert, key, ca)
367+
require.NoError(t, err)
368+
369+
port, err := test2.FreeTCPPort()
370+
require.NoError(t, err)
371+
serverOut := make(chan *pbflow.Records)
372+
_, err = StartCollector(port, serverOut, WithGRPCServerOptions(opts...))
373+
require.NoError(t, err)
374+
cc, err := ConnectClient("127.0.0.1", port, ca, clCert, clKey)
375+
require.NoError(t, err)
376+
client := cc.Client()
377+
378+
go func() {
379+
_, err = client.Send(context.Background(),
380+
&pbflow.Records{Entries: []*pbflow.Record{{
381+
EthProtocol: 123, Network: &pbflow.Network{
382+
SrcAddr: &pbflow.IP{
383+
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x11223344},
384+
},
385+
DstAddr: &pbflow.IP{
386+
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x55667788},
387+
},
388+
}}},
389+
})
390+
require.ErrorContains(t, err, "tls: unknown certificate authority")
391+
}()
392+
393+
select {
394+
case rs := <-serverOut:
395+
assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
396+
default:
397+
// ok!
398+
}
399+
}
400+
401+
func buildTLSServerOptions(certPath, keyPath, clientCAPath string) ([]grpc.ServerOption, error) {
402+
var opts []grpc.ServerOption
403+
if certPath != "" && keyPath != "" {
404+
// TLS
405+
cert, err := tls.LoadX509KeyPair(certPath, keyPath)
406+
if err != nil {
407+
return nil, fmt.Errorf("cannot load configured certificate: %w", err)
408+
}
409+
tlsCfg := &tls.Config{
410+
Certificates: []tls.Certificate{cert},
411+
ClientAuth: tls.NoClientCert,
412+
}
413+
if clientCAPath != "" {
414+
// mTLS
415+
caCert, err := os.ReadFile(clientCAPath)
416+
if err != nil {
417+
return nil, fmt.Errorf("cannot load configured client CA certificate: %w", err)
418+
}
419+
pool := x509.NewCertPool()
420+
pool.AppendCertsFromPEM(caCert)
421+
tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert
422+
tlsCfg.ClientCAs = pool
423+
}
424+
opts = append(opts, grpc.Creds(credentials.NewTLS(tlsCfg)))
425+
}
426+
return opts, nil
427+
}

0 commit comments

Comments
 (0)