Skip to content

Commit 75a990e

Browse files
authored
NETOBSERV-2503: allow TLS/mTLS in grpc ingestor (#1146)
* NETOBSERV-2503: allow TLS/mTLS in grpc ingestor * Add tests * fix linter
1 parent 977e834 commit 75a990e

File tree

15 files changed

+423
-96
lines changed

15 files changed

+423
-96
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,3 +175,4 @@ require (
175175
)
176176

177177
replace github.com/vmware/go-ipfix => github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101
178+
replace github.com/netobserv/netobserv-ebpf-agent => github.com/jotak/netobserv-agent v0.0.0-20251201132656-d1dee7de8bca

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtL
177177
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
178178
github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101 h1:tpaHjydMAy2MTukKIUAVK4xIFUpL12xuexA0FuTVpuo=
179179
github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101/go.mod h1:GgFbcmEGqMQfA7jDC9UVLKAelNh2sy1jsxyV7Tor3Ig=
180+
github.com/jotak/netobserv-agent v0.0.0-20251201132656-d1dee7de8bca h1:VQMzly3wKRXwWm0V8Bnzj2Us786hKAR1CCfH/24+yd8=
181+
github.com/jotak/netobserv-agent v0.0.0-20251201132656-d1dee7de8bca/go.mod h1:ZQf6WKnhdfdaR5PQ/Fc99l4/doL94OQeTkQknwFiQZo=
180182
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
181183
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
182184
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
@@ -257,8 +259,6 @@ github.com/netobserv/gopipes v0.3.0 h1:IYmPnnAVCdSK7VmHmpFhrVBOEm45qpgbZmJz1sSW+
257259
github.com/netobserv/gopipes v0.3.0/go.mod h1:N7/Gz05EOF0CQQSKWsv3eof22Cj2PB08Pbttw98YFYU=
258260
github.com/netobserv/loki-client-go v0.0.0-20251014110557-40bc8d2e6cf3 h1:rxQipq0xpoiao7ifls/82JCcOVALC4n08ppTLCUFGL4=
259261
github.com/netobserv/loki-client-go v0.0.0-20251014110557-40bc8d2e6cf3/go.mod h1:Zb/jtD3Lnu88Poo+jnhTASzxYnvncmHOoZaT93xQjJ8=
260-
github.com/netobserv/netobserv-ebpf-agent v1.10.0-community.0.20251125162210-4be10c36721e h1:hGJIbcfTbzjpuZ9K7hVwD/6+KijR0TfqJjmXeigQELc=
261-
github.com/netobserv/netobserv-ebpf-agent v1.10.0-community.0.20251125162210-4be10c36721e/go.mod h1:ZQf6WKnhdfdaR5PQ/Fc99l4/doL94OQeTkQknwFiQZo=
262262
github.com/netsampler/goflow2 v1.3.7 h1:XZaTy8kkMnGXpJ9hS3KbO1McyrFTpVNhVFEx9rNhMmc=
263263
github.com/netsampler/goflow2 v1.3.7/go.mod h1:4UZsVGVAs//iMCptUHn3WNScztJeUhZH7kDW2+/vDdQ=
264264
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=

pkg/api/ingest_grpc.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package api
22

33
type IngestGRPCProto struct {
4-
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on"`
5-
BufferLen int `yaml:"bufferLength,omitempty" json:"bufferLength,omitempty" doc:"the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)"`
4+
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on"`
5+
BufferLen int `yaml:"bufferLength,omitempty" json:"bufferLength,omitempty" doc:"the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)"`
6+
CertPath string `yaml:"certPath,omitempty" json:"certPath,omitempty" doc:"path of the TLS certificate, if any"`
7+
KeyPath string `yaml:"keyPath,omitempty" json:"keyPath,omitempty" doc:"path of the TLS certificate key, if any"`
8+
ClientCAPath string `yaml:"clientCAPath,omitempty" json:"clientCAPath,omitempty" doc:"path of the client TLS CA, if any, for mutual TLS"`
69
}

pkg/pipeline/encode/encode_kafka_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func Test_TLSConfigCA(t *testing.T) {
122122

123123
func Test_MutualTLSConfig(t *testing.T) {
124124
test.ResetPromRegistry()
125-
ca, user, userKey, cleanup := test.CreateAllCerts(t)
125+
ca, user, userKey, _, _, cleanup := test.CreateAllCerts(t)
126126
defer cleanup()
127127
pipeline := config.NewIPFIXPipeline("ingest", api.IngestIpfix{})
128128
pipeline.EncodeKafka("encode-kafka", api.EncodeKafka{

pkg/pipeline/ingest/ingest_grpc.go

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ package ingest
22

33
import (
44
"context"
5+
"crypto/tls"
6+
"crypto/x509"
7+
"errors"
58
"fmt"
9+
"os"
610

711
"github.com/netobserv/flowlogs-pipeline/pkg/api"
812
"github.com/netobserv/flowlogs-pipeline/pkg/config"
@@ -15,6 +19,7 @@ import (
1519

1620
"github.com/sirupsen/logrus"
1721
grpc2 "google.golang.org/grpc"
22+
"google.golang.org/grpc/credentials"
1823
"google.golang.org/grpc/status"
1924
"google.golang.org/protobuf/proto"
2025
)
@@ -33,14 +38,14 @@ type GRPCProtobuf struct {
3338
}
3439

3540
func NewGRPCProtobuf(opMetrics *operational.Metrics, params config.StageParam) (*GRPCProtobuf, error) {
36-
netObserv := api.IngestGRPCProto{}
41+
cfg := api.IngestGRPCProto{}
3742
if params.Ingest != nil && params.Ingest.GRPC != nil {
38-
netObserv = *params.Ingest.GRPC
43+
cfg = *params.Ingest.GRPC
3944
}
40-
if netObserv.Port == 0 {
41-
return nil, fmt.Errorf("ingest port not specified")
45+
if cfg.Port == 0 {
46+
return nil, errors.New("ingest port not specified")
4247
}
43-
bufLen := netObserv.BufferLen
48+
bufLen := cfg.BufferLen
4449
if bufLen == 0 {
4550
bufLen = defaultBufferLen
4651
}
@@ -54,8 +59,40 @@ func NewGRPCProtobuf(opMetrics *operational.Metrics, params config.StageParam) (
5459
withBatchSizeBytes(),
5560
withStageDuration(),
5661
)
57-
collector, err := grpc.StartCollector(netObserv.Port, flowPackets,
58-
grpc.WithGRPCServerOptions(grpc2.UnaryInterceptor(instrumentGRPC(metrics))))
62+
var opts []grpc2.ServerOption
63+
// GRPC metrics
64+
opts = append(opts, grpc2.UnaryInterceptor(instrumentGRPC(metrics)))
65+
66+
if cfg.CertPath != "" && cfg.KeyPath != "" {
67+
// TLS
68+
cert, err := tls.LoadX509KeyPair(cfg.CertPath, cfg.KeyPath)
69+
if err != nil {
70+
return nil, fmt.Errorf("cannot load configured certificate: %w", err)
71+
}
72+
tlsCfg := &tls.Config{
73+
Certificates: []tls.Certificate{cert},
74+
ClientAuth: tls.NoClientCert,
75+
}
76+
if cfg.ClientCAPath != "" {
77+
// mTLS
78+
caCert, err := os.ReadFile(cfg.ClientCAPath)
79+
if err != nil {
80+
return nil, fmt.Errorf("cannot load configured client CA certificate: %w", err)
81+
}
82+
pool := x509.NewCertPool()
83+
pool.AppendCertsFromPEM(caCert)
84+
tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert
85+
tlsCfg.ClientCAs = pool
86+
glog.Info("Starting GRPC server with mTLS")
87+
} else {
88+
glog.Info("Starting GRPC server with TLS")
89+
}
90+
opts = append(opts, grpc2.Creds(credentials.NewTLS(tlsCfg)))
91+
} else {
92+
glog.Info("Starting GRPC server - no TLS")
93+
}
94+
95+
collector, err := grpc.StartCollector(cfg.Port, flowPackets, grpc.WithGRPCServerOptions(opts...))
5996
if err != nil {
6097
return nil, err
6198
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package ingest
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
test2 "github.com/mariomac/guara/pkg/test"
9+
"github.com/netobserv/flowlogs-pipeline/pkg/api"
10+
"github.com/netobserv/flowlogs-pipeline/pkg/config"
11+
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
12+
"github.com/netobserv/flowlogs-pipeline/pkg/test"
13+
flowgrpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow"
14+
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
15+
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
17+
)
18+
19+
type tlsConfig struct {
20+
clientCertPath string
21+
clientKeyPath string
22+
serverCAPathForClient string
23+
serverCertPath string
24+
serverKeyPath string
25+
clientCAPathForServer string
26+
}
27+
28+
func TestGRPC_NoTLS(t *testing.T) {
29+
runGRPCTestForTLS(t, tlsConfig{}, "")
30+
}
31+
32+
func TestGRPC_SimpleTLS(t *testing.T) {
33+
ca, _, _, cert, key, cleanup := test.CreateAllCerts(t)
34+
defer cleanup()
35+
runGRPCTestForTLS(
36+
t,
37+
tlsConfig{
38+
serverCAPathForClient: ca,
39+
serverCertPath: cert,
40+
serverKeyPath: key,
41+
},
42+
"",
43+
)
44+
}
45+
46+
func TestGRPC_MutualTLS(t *testing.T) {
47+
ca, user, userKey, serverCert, serverKey, cleanup := test.CreateAllCerts(t)
48+
defer cleanup()
49+
runGRPCTestForTLS(
50+
t,
51+
tlsConfig{
52+
clientCAPathForServer: ca,
53+
clientCertPath: user,
54+
clientKeyPath: userKey,
55+
serverCAPathForClient: ca,
56+
serverCertPath: serverCert,
57+
serverKeyPath: serverKey,
58+
},
59+
"",
60+
)
61+
}
62+
63+
func TestGRPC_FailExpectingTLS(t *testing.T) {
64+
_, _, _, cert, key, cleanup := test.CreateAllCerts(t)
65+
defer cleanup()
66+
runGRPCTestForTLS(
67+
t,
68+
tlsConfig{
69+
serverCAPathForClient: "", // by not providing server CA, the client is in NOTLS mode
70+
serverCertPath: cert,
71+
serverKeyPath: key,
72+
},
73+
"connection error",
74+
)
75+
}
76+
77+
func TestGRPC_FailExpectingMTLS(t *testing.T) {
78+
ca, _, _, serverCert, serverKey, cleanup := test.CreateAllCerts(t)
79+
defer cleanup()
80+
runGRPCTestForTLS(
81+
t,
82+
tlsConfig{
83+
clientCAPathForServer: ca,
84+
clientCertPath: "", // by not providing client cert, the client is in simple TLS mode
85+
clientKeyPath: "",
86+
serverCAPathForClient: ca,
87+
serverCertPath: serverCert,
88+
serverKeyPath: serverKey,
89+
},
90+
"write tcp", // would prefer a more explicit error, but that's all that we get: "write tcp 127.0.0.1:54154->127.0.0.1:40763: write: broken pipe"
91+
)
92+
}
93+
94+
// nolint:gocritic // hugeParam; don't care in tests
95+
func runGRPCTestForTLS(t *testing.T, certs tlsConfig, expectErrorContains string) {
96+
port, err := test2.FreeTCPPort()
97+
require.NoError(t, err)
98+
99+
out := make(chan config.GenericMap)
100+
cfg := &config.Ingest{
101+
GRPC: &api.IngestGRPCProto{
102+
Port: port,
103+
CertPath: certs.serverCertPath,
104+
KeyPath: certs.serverKeyPath,
105+
ClientCAPath: certs.clientCAPathForServer,
106+
},
107+
}
108+
ingester, err := NewGRPCProtobuf(operational.NewMetrics(&config.MetricsSettings{}), config.StageParam{Ingest: cfg})
109+
require.NoError(t, err)
110+
111+
go ingester.Ingest(out)
112+
113+
flowSender, err := flowgrpc.ConnectClient("127.0.0.1", port, certs.serverCAPathForClient, certs.clientCertPath, certs.clientKeyPath)
114+
require.NoError(t, err)
115+
defer flowSender.Close()
116+
117+
record := &pbflow.Records{
118+
Entries: []*pbflow.Record{{
119+
EthProtocol: 2048,
120+
Transport: &pbflow.Transport{},
121+
Network: &pbflow.Network{
122+
SrcAddr: &pbflow.IP{
123+
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304},
124+
},
125+
DstAddr: &pbflow.IP{
126+
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
127+
},
128+
},
129+
}},
130+
}
131+
132+
_, err = flowSender.Client().Send(context.Background(), record)
133+
if expectErrorContains != "" {
134+
require.ErrorContains(t, err, expectErrorContains)
135+
return
136+
}
137+
require.NoError(t, err)
138+
139+
var received config.GenericMap
140+
select {
141+
case r := <-out:
142+
received = r
143+
case <-time.After(timeout):
144+
require.Fail(t, "timeout while waiting for Ingester to receive and process data")
145+
}
146+
147+
assert.Equal(t, "1.2.3.4", received["SrcAddr"])
148+
assert.Equal(t, "5.6.7.8", received["DstAddr"])
149+
}

pkg/pipeline/ingest/ingest_kafka_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func Test_TLSConfigCA(t *testing.T) {
242242

243243
func Test_MutualTLSConfig(t *testing.T) {
244244
test.ResetPromRegistry()
245-
ca, user, userKey, cleanup := test.CreateAllCerts(t)
245+
ca, user, userKey, _, _, cleanup := test.CreateAllCerts(t)
246246
defer cleanup()
247247
stage := config.NewKafkaPipeline("ingest-kafka", api.IngestKafka{
248248
Brokers: []string{"any"},

pkg/pipeline/pipeline_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ parameters:
168168
// yield thread to allow pipe services correctly start
169169
time.Sleep(10 * time.Millisecond)
170170

171-
flowSender, err := grpc.ConnectClient("127.0.0.1", port)
171+
flowSender, err := grpc.ConnectClient("127.0.0.1", port, "", "", "")
172172
require.NoError(t, err)
173173
defer flowSender.Close()
174174

pkg/pipeline/write/grpc/client.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,38 @@
11
package grpc
22

33
import (
4-
"flag"
5-
"log"
4+
"crypto/tls"
65

76
pb "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap"
87
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
8+
"github.com/sirupsen/logrus"
99
"google.golang.org/grpc"
10+
"google.golang.org/grpc/credentials"
1011
"google.golang.org/grpc/credentials/insecure"
1112
)
1213

14+
var glog = logrus.WithField("component", "write.GRPCClient")
15+
1316
// ClientConnection wraps a gRPC+protobuf connection
1417
type ClientConnection struct {
1518
client pb.CollectorClient
1619
conn *grpc.ClientConn
1720
}
1821

19-
func ConnectClient(hostIP string, hostPort int) (*ClientConnection, error) {
20-
flag.Parse()
22+
func ConnectClient(hostIP string, hostPort int, tlsConfig *tls.Config) (*ClientConnection, error) {
2123
// Set up a connection to the server.
24+
var creds credentials.TransportCredentials
25+
if tlsConfig != nil {
26+
creds = credentials.NewTLS(tlsConfig)
27+
} else {
28+
glog.Info("Using GRPC - No TLS")
29+
creds = insecure.NewCredentials()
30+
}
2231
socket := utils.GetSocket(hostIP, hostPort)
23-
conn, err := grpc.NewClient(socket, grpc.WithTransportCredentials(insecure.NewCredentials()))
32+
conn, err := grpc.NewClient(socket, grpc.WithTransportCredentials(creds))
2433

2534
if err != nil {
26-
log.Fatalf("did not connect: %v", err)
35+
return nil, err
2736
}
2837

2938
return &ClientConnection{

pkg/pipeline/write/grpc/grpc_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestGRPCCommunication(t *testing.T) {
2323
serverOut := make(chan *genericmap.Flow)
2424
_, err = StartCollector(port, serverOut)
2525
require.NoError(t, err)
26-
cc, err := ConnectClient("127.0.0.1", port)
26+
cc, err := ConnectClient("127.0.0.1", port, nil)
2727
require.NoError(t, err)
2828
client := cc.Client()
2929

@@ -76,7 +76,7 @@ func TestConstructorOptions(t *testing.T) {
7676
return handler(ctx, req)
7777
})))
7878
require.NoError(t, err)
79-
cc, err := ConnectClient("127.0.0.1", port)
79+
cc, err := ConnectClient("127.0.0.1", port, nil)
8080
require.NoError(t, err)
8181
client := cc.Client()
8282

0 commit comments

Comments
 (0)