Skip to content

Commit 09e5c76

Browse files
committed
NETOBSERV-2503: allow TLS/mTLS in grpc ingestor
1 parent 7262038 commit 09e5c76

File tree

2 files changed

+49
-9
lines changed

2 files changed

+49
-9
lines changed

pkg/api/ingest_grpc.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package api
22

33
type IngestGRPCProto struct {
4-
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on"`
5-
BufferLen int `yaml:"bufferLength,omitempty" json:"bufferLength,omitempty" doc:"the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)"`
4+
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on"`
5+
BufferLen int `yaml:"bufferLength,omitempty" json:"bufferLength,omitempty" doc:"the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)"`
6+
CertPath string `yaml:"certPath,omitempty" json:"certPath,omitempty" doc:"path of the TLS certificate, if any"`
7+
KeyPath string `yaml:"keyPath,omitempty" json:"keyPath,omitempty" doc:"path of the TLS certificate key, if any"`
8+
ClientCAPath string `yaml:"clientCAPath,omitempty" json:"clientCAPath,omitempty" doc:"path of the client TLS CA, if any, for mutual TLS"`
69
}

pkg/pipeline/ingest/ingest_grpc.go

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ package ingest
22

33
import (
44
"context"
5+
"crypto/tls"
6+
"crypto/x509"
7+
"errors"
58
"fmt"
9+
"os"
610

711
"github.com/netobserv/flowlogs-pipeline/pkg/api"
812
"github.com/netobserv/flowlogs-pipeline/pkg/config"
@@ -15,6 +19,7 @@ import (
1519

1620
"github.com/sirupsen/logrus"
1721
grpc2 "google.golang.org/grpc"
22+
"google.golang.org/grpc/credentials"
1823
"google.golang.org/grpc/status"
1924
"google.golang.org/protobuf/proto"
2025
)
@@ -33,14 +38,14 @@ type GRPCProtobuf struct {
3338
}
3439

3540
func NewGRPCProtobuf(opMetrics *operational.Metrics, params config.StageParam) (*GRPCProtobuf, error) {
36-
netObserv := api.IngestGRPCProto{}
41+
cfg := api.IngestGRPCProto{}
3742
if params.Ingest != nil && params.Ingest.GRPC != nil {
38-
netObserv = *params.Ingest.GRPC
43+
cfg = *params.Ingest.GRPC
3944
}
40-
if netObserv.Port == 0 {
41-
return nil, fmt.Errorf("ingest port not specified")
45+
if cfg.Port == 0 {
46+
return nil, errors.New("ingest port not specified")
4247
}
43-
bufLen := netObserv.BufferLen
48+
bufLen := cfg.BufferLen
4449
if bufLen == 0 {
4550
bufLen = defaultBufferLen
4651
}
@@ -54,8 +59,40 @@ func NewGRPCProtobuf(opMetrics *operational.Metrics, params config.StageParam) (
5459
withBatchSizeBytes(),
5560
withStageDuration(),
5661
)
57-
collector, err := grpc.StartCollector(netObserv.Port, flowPackets,
58-
grpc.WithGRPCServerOptions(grpc2.UnaryInterceptor(instrumentGRPC(metrics))))
62+
var opts []grpc2.ServerOption
63+
// GRPC metrics
64+
opts = append(opts, grpc2.UnaryInterceptor(instrumentGRPC(metrics)))
65+
66+
if cfg.CertPath != "" && cfg.KeyPath != "" {
67+
// TLS
68+
cert, err := tls.LoadX509KeyPair(cfg.CertPath, cfg.KeyPath)
69+
if err != nil {
70+
return nil, fmt.Errorf("cannot load configured certificate: %w", err)
71+
}
72+
tlsCfg := &tls.Config{
73+
Certificates: []tls.Certificate{cert},
74+
ClientAuth: tls.NoClientCert,
75+
}
76+
if cfg.ClientCAPath != "" {
77+
// mTLS
78+
caCert, err := os.ReadFile(cfg.ClientCAPath)
79+
if err != nil {
80+
return nil, fmt.Errorf("cannot load configured client CA certificate: %w", err)
81+
}
82+
pool := x509.NewCertPool()
83+
pool.AppendCertsFromPEM(caCert)
84+
tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert
85+
tlsCfg.ClientCAs = pool
86+
glog.Info("Starting GRPC server with mTLS")
87+
} else {
88+
glog.Info("Starting GRPC server with TLS")
89+
}
90+
opts = append(opts, grpc2.Creds(credentials.NewTLS(tlsCfg)))
91+
} else {
92+
glog.Info("Starting GRPC server - no TLS")
93+
}
94+
95+
collector, err := grpc.StartCollector(cfg.Port, flowPackets, grpc.WithGRPCServerOptions(opts...))
5996
if err != nil {
6097
return nil, err
6198
}

0 commit comments

Comments
 (0)