Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func buildGRPCExporter(cfg *config.Agent, m *metrics.Metrics) (node.TerminalFunc
return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort)
}
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, m)
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.TargetTLSCACertPath, cfg.TargetTLSUserCertPath, cfg.TargetTLSUserKeyPath, cfg.GRPCMessageMaxFlows, m)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ type Agent struct {
TargetHost string `env:"TARGET_HOST"`
// Port is the port the flow or packet collector, when the EXPORT variable is set to "grpc"
TargetPort int `env:"TARGET_PORT"`
// CA certificate path of the target, when TLS is used. Empty by default (no TLS).
TargetTLSCACertPath string `env:"TARGET_TLS_CA_CERT_PATH"`
// User certificate path, when mTLS is used. Empty by default (no mTLS).
TargetTLSUserCertPath string `env:"TARGET_TLS_USER_CERT_PATH"`
// User certificate key path, when mTLS is used. Empty by default (no mTLS).
TargetTLSUserKeyPath string `env:"TARGET_TLS_USER_KEY_PATH"`
// GRPCMessageMaxFlows specifies the limit, in number of flows, of each GRPC message. Messages
// larger than that number will be split and submitted sequentially.
GRPCMessageMaxFlows int `env:"GRPC_MESSAGE_MAX_FLOWS" envDefault:"10000"`
Expand Down
4 changes: 2 additions & 2 deletions pkg/exporter/grpc_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type GRPCProto struct {
batchCounter prometheus.Counter
}

func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metrics.Metrics) (*GRPCProto, error) {
clientConn, err := grpc.ConnectClient(hostIP, hostPort)
func StartGRPCProto(hostIP string, hostPort int, caPath, userCertPath, userKeyPath string, maxFlowsPerMessage int, m *metrics.Metrics) (*GRPCProto, error) {
clientConn, err := grpc.ConnectClient(hostIP, hostPort, caPath, userCertPath, userKeyPath)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/exporter/grpc_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestIPv4GRPCProto_ExportFlows_AgentIP(t *testing.T) {
defer coll.Close()

// Start GRPCProto exporter stage
exporter, err := StartGRPCProto("127.0.0.1", port, 1000, metrics.NoOp())
exporter, err := StartGRPCProto("127.0.0.1", port, "", "", "", 1000, metrics.NoOp())
require.NoError(t, err)

// Send some flows to the input of the exporter stage
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestIPv6GRPCProto_ExportFlows_AgentIP(t *testing.T) {
defer coll.Close()

// Start GRPCProto exporter stage
exporter, err := StartGRPCProto("::1", port, 1000, metrics.NoOp())
exporter, err := StartGRPCProto("::1", port, "", "", "", 1000, metrics.NoOp())
require.NoError(t, err)

// Send some flows to the input of the exporter stage
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {

const msgMaxLen = 10000
// Start GRPCProto exporter stage
exporter, err := StartGRPCProto("127.0.0.1", port, msgMaxLen, metrics.NoOp())
exporter, err := StartGRPCProto("127.0.0.1", port, "", "", "", msgMaxLen, metrics.NoOp())
require.NoError(t, err)

// Send a message much longer than the limit length
Expand Down
45 changes: 41 additions & 4 deletions pkg/grpc/flow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,60 @@
package flowgrpc

import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"

"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

var clog = logrus.WithField("component", "grpc.Client")

// ClientConnection wraps a gRPC+protobuf connection
type ClientConnection struct {
client pbflow.CollectorClient
conn *grpc.ClientConn
}

func ConnectClient(hostIP string, hostPort int) (*ClientConnection, error) {
// TODO: allow configuring some options (keepalive, backoff...)
func ConnectClient(hostIP string, hostPort int, caPath, userCertPath, userKeyPath string) (*ClientConnection, error) {
// TODO: allow configuring more options (keepalive, backoff...)
var opts []grpc.DialOption
if caPath == "" {
clog.Info("Starting GRPC client - no TLS")
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
// Configure TLS (server CA)
caCert, err := os.ReadFile(caPath)
if err != nil {
return nil, fmt.Errorf("cannot load CA certificate: %w", err)
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
RootCAs: pool,
}
if userCertPath != "" && userKeyPath != "" {
clog.Info("Starting GRPC client with mTLS")
// Configure mTLS (client certificates)
cert, err := tls.LoadX509KeyPair(userCertPath, userKeyPath)
if err != nil {
return nil, fmt.Errorf("cannot load client certificate: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{cert}
} else {
clog.Info("Starting GRPC client with TLS")
}

opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
}
socket := utils.GetSocket(hostIP, hostPort)
conn, err := grpc.NewClient(socket,
grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(socket, opts...)
if err != nil {
return nil, err
}
Expand Down
154 changes: 145 additions & 9 deletions pkg/grpc/flow/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,32 @@ package flowgrpc

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"testing"
"time"

"github.com/mariomac/guara/pkg/test"
test2 "github.com/mariomac/guara/pkg/test"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/types/known/timestamppb"
)

const timeout = 5 * time.Second

func TestGRPCCommunication(t *testing.T) {
port, err := test.FreeTCPPort()
port, err := test2.FreeTCPPort()
require.NoError(t, err)
serverOut := make(chan *pbflow.Records)
_, err = StartCollector(port, serverOut)
require.NoError(t, err)
cc, err := ConnectClient("127.0.0.1", port)
cc, err := ConnectClient("127.0.0.1", port, "", "", "")
require.NoError(t, err)
client := cc.Client()

Expand Down Expand Up @@ -93,7 +99,7 @@ func TestGRPCCommunication(t *testing.T) {
}

func TestConstructorOptions(t *testing.T) {
port, err := test.FreeTCPPort()
port, err := test2.FreeTCPPort()
require.NoError(t, err)
intercepted := make(chan struct{})
// Override the default GRPC collector to verify that StartCollector is applying the
Expand All @@ -109,7 +115,7 @@ func TestConstructorOptions(t *testing.T) {
return handler(ctx, req)
})))
require.NoError(t, err)
cc, err := ConnectClient("127.0.0.1", port)
cc, err := ConnectClient("127.0.0.1", port, "", "", "")
require.NoError(t, err)
client := cc.Client()

Expand All @@ -127,13 +133,13 @@ func TestConstructorOptions(t *testing.T) {
}

func BenchmarkIPv4GRPCCommunication(b *testing.B) {
port, err := test.FreeTCPPort()
port, err := test2.FreeTCPPort()
require.NoError(b, err)
serverOut := make(chan *pbflow.Records, 1000)
collector, err := StartCollector(port, serverOut)
require.NoError(b, err)
defer collector.Close()
cc, err := ConnectClient("127.0.0.1", port)
cc, err := ConnectClient("127.0.0.1", port, "", "", "")
require.NoError(b, err)
defer cc.Close()
client := cc.Client()
Expand Down Expand Up @@ -188,13 +194,13 @@ func BenchmarkIPv4GRPCCommunication(b *testing.B) {
}

func BenchmarkIPv6GRPCCommunication(b *testing.B) {
port, err := test.FreeTCPPort()
port, err := test2.FreeTCPPort()
require.NoError(b, err)
serverOut := make(chan *pbflow.Records, 1000)
collector, err := StartCollector(port, serverOut)
require.NoError(b, err)
defer collector.Close()
cc, err := ConnectClient("::1", port)
cc, err := ConnectClient("::1", port, "", "", "")
require.NoError(b, err)
defer cc.Close()
client := cc.Client()
Expand Down Expand Up @@ -249,3 +255,133 @@ func BenchmarkIPv6GRPCCommunication(b *testing.B) {
<-serverOut
}
}

// Note: there's more tests focused on TLS in FLP, that also cover agent's functions
func TestGRPCCommunication_TLS(t *testing.T) {
ca, _, _, cert, key, cleanup := test.CreateAllCerts(t)
defer cleanup()
opts, err := buildTLSServerOptions(cert, key, "")
require.NoError(t, err)

port, err := test2.FreeTCPPort()
require.NoError(t, err)
serverOut := make(chan *pbflow.Records)
_, err = StartCollector(port, serverOut, WithGRPCServerOptions(opts...))
require.NoError(t, err)
cc, err := ConnectClient("127.0.0.1", port, ca, "", "")
require.NoError(t, err)
client := cc.Client()

go func() {
_, err = client.Send(context.Background(),
&pbflow.Records{Entries: []*pbflow.Record{{
EthProtocol: 123, Network: &pbflow.Network{
SrcAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x11223344},
},
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x55667788},
},
}}},
})
require.NoError(t, err)
}()

var rs *pbflow.Records
select {
case rs = <-serverOut:
case <-time.After(timeout):
require.Fail(t, "timeout waiting for flows")
}
assert.Len(t, rs.Entries, 1)
r := rs.Entries[0]
assert.EqualValues(t, 123, r.EthProtocol)
assert.EqualValues(t, 0x11223344, r.GetNetwork().GetSrcAddr().GetIpv4())
assert.EqualValues(t, 0x55667788, r.GetNetwork().GetDstAddr().GetIpv4())

select {
case rs = <-serverOut:
assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
default:
// ok!
}
}

// Note: there's more tests focused on TLS in FLP, that also cover agent's functions
func TestGRPCCommunication_MutualTLS(t *testing.T) {
ca, clCert, clKey, cert, key, cleanup := test.CreateAllCerts(t)
defer cleanup()
opts, err := buildTLSServerOptions(cert, key, ca)
require.NoError(t, err)

port, err := test2.FreeTCPPort()
require.NoError(t, err)
serverOut := make(chan *pbflow.Records)
_, err = StartCollector(port, serverOut, WithGRPCServerOptions(opts...))
require.NoError(t, err)
cc, err := ConnectClient("127.0.0.1", port, ca, clCert, clKey)
require.NoError(t, err)
client := cc.Client()

go func() {
_, err = client.Send(context.Background(),
&pbflow.Records{Entries: []*pbflow.Record{{
EthProtocol: 123, Network: &pbflow.Network{
SrcAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x11223344},
},
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x55667788},
},
}}},
})
require.NoError(t, err)
}()

var rs *pbflow.Records
select {
case rs = <-serverOut:
case <-time.After(timeout):
require.Fail(t, "timeout waiting for flows")
}
assert.Len(t, rs.Entries, 1)
r := rs.Entries[0]
assert.EqualValues(t, 123, r.EthProtocol)
assert.EqualValues(t, 0x11223344, r.GetNetwork().GetSrcAddr().GetIpv4())
assert.EqualValues(t, 0x55667788, r.GetNetwork().GetDstAddr().GetIpv4())

select {
case rs = <-serverOut:
assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
default:
// ok!
}
}

func buildTLSServerOptions(certPath, keyPath, clientCAPath string) ([]grpc.ServerOption, error) {
var opts []grpc.ServerOption
if certPath != "" && keyPath != "" {
// TLS
cert, err := tls.LoadX509KeyPair(certPath, keyPath)
if err != nil {
return nil, fmt.Errorf("cannot load configured certificate: %w", err)
}
tlsCfg := &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.NoClientCert,
}
if clientCAPath != "" {
// mTLS
caCert, err := os.ReadFile(clientCAPath)
if err != nil {
return nil, fmt.Errorf("cannot load configured client CA certificate: %w", err)
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caCert)
tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert
tlsCfg.ClientCAs = pool
}
opts = append(opts, grpc.Creds(credentials.NewTLS(tlsCfg)))
}
return opts, nil
}
Loading