Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,4 @@ require (
)

replace github.com/vmware/go-ipfix => github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101
replace github.com/netobserv/netobserv-ebpf-agent => github.com/jotak/netobserv-agent v0.0.0-20251201132656-d1dee7de8bca
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtL
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101 h1:tpaHjydMAy2MTukKIUAVK4xIFUpL12xuexA0FuTVpuo=
github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101/go.mod h1:GgFbcmEGqMQfA7jDC9UVLKAelNh2sy1jsxyV7Tor3Ig=
github.com/jotak/netobserv-agent v0.0.0-20251201132656-d1dee7de8bca h1:VQMzly3wKRXwWm0V8Bnzj2Us786hKAR1CCfH/24+yd8=
github.com/jotak/netobserv-agent v0.0.0-20251201132656-d1dee7de8bca/go.mod h1:ZQf6WKnhdfdaR5PQ/Fc99l4/doL94OQeTkQknwFiQZo=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
Expand Down Expand Up @@ -257,8 +259,6 @@ github.com/netobserv/gopipes v0.3.0 h1:IYmPnnAVCdSK7VmHmpFhrVBOEm45qpgbZmJz1sSW+
github.com/netobserv/gopipes v0.3.0/go.mod h1:N7/Gz05EOF0CQQSKWsv3eof22Cj2PB08Pbttw98YFYU=
github.com/netobserv/loki-client-go v0.0.0-20251014110557-40bc8d2e6cf3 h1:rxQipq0xpoiao7ifls/82JCcOVALC4n08ppTLCUFGL4=
github.com/netobserv/loki-client-go v0.0.0-20251014110557-40bc8d2e6cf3/go.mod h1:Zb/jtD3Lnu88Poo+jnhTASzxYnvncmHOoZaT93xQjJ8=
github.com/netobserv/netobserv-ebpf-agent v1.10.0-community.0.20251125162210-4be10c36721e h1:hGJIbcfTbzjpuZ9K7hVwD/6+KijR0TfqJjmXeigQELc=
github.com/netobserv/netobserv-ebpf-agent v1.10.0-community.0.20251125162210-4be10c36721e/go.mod h1:ZQf6WKnhdfdaR5PQ/Fc99l4/doL94OQeTkQknwFiQZo=
github.com/netsampler/goflow2 v1.3.7 h1:XZaTy8kkMnGXpJ9hS3KbO1McyrFTpVNhVFEx9rNhMmc=
github.com/netsampler/goflow2 v1.3.7/go.mod h1:4UZsVGVAs//iMCptUHn3WNScztJeUhZH7kDW2+/vDdQ=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
Expand Down
7 changes: 5 additions & 2 deletions pkg/api/ingest_grpc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package api

type IngestGRPCProto struct {
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on"`
BufferLen int `yaml:"bufferLength,omitempty" json:"bufferLength,omitempty" doc:"the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)"`
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on"`
BufferLen int `yaml:"bufferLength,omitempty" json:"bufferLength,omitempty" doc:"the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)"`
CertPath string `yaml:"certPath,omitempty" json:"certPath,omitempty" doc:"path of the TLS certificate, if any"`
KeyPath string `yaml:"keyPath,omitempty" json:"keyPath,omitempty" doc:"path of the TLS certificate key, if any"`
ClientCAPath string `yaml:"clientCAPath,omitempty" json:"clientCAPath,omitempty" doc:"path of the client TLS CA, if any, for mutual TLS"`
}
2 changes: 1 addition & 1 deletion pkg/pipeline/encode/encode_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func Test_TLSConfigCA(t *testing.T) {

func Test_MutualTLSConfig(t *testing.T) {
test.ResetPromRegistry()
ca, user, userKey, cleanup := test.CreateAllCerts(t)
ca, user, userKey, _, _, cleanup := test.CreateAllCerts(t)
defer cleanup()
pipeline := config.NewIPFIXPipeline("ingest", api.IngestIpfix{})
pipeline.EncodeKafka("encode-kafka", api.EncodeKafka{
Expand Down
51 changes: 44 additions & 7 deletions pkg/pipeline/ingest/ingest_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package ingest

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"os"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
Expand All @@ -15,6 +19,7 @@ import (

"github.com/sirupsen/logrus"
grpc2 "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
Expand All @@ -33,14 +38,14 @@ type GRPCProtobuf struct {
}

func NewGRPCProtobuf(opMetrics *operational.Metrics, params config.StageParam) (*GRPCProtobuf, error) {
netObserv := api.IngestGRPCProto{}
cfg := api.IngestGRPCProto{}
if params.Ingest != nil && params.Ingest.GRPC != nil {
netObserv = *params.Ingest.GRPC
cfg = *params.Ingest.GRPC
}
if netObserv.Port == 0 {
return nil, fmt.Errorf("ingest port not specified")
if cfg.Port == 0 {
return nil, errors.New("ingest port not specified")
}
bufLen := netObserv.BufferLen
bufLen := cfg.BufferLen
if bufLen == 0 {
bufLen = defaultBufferLen
}
Expand All @@ -54,8 +59,40 @@ func NewGRPCProtobuf(opMetrics *operational.Metrics, params config.StageParam) (
withBatchSizeBytes(),
withStageDuration(),
)
collector, err := grpc.StartCollector(netObserv.Port, flowPackets,
grpc.WithGRPCServerOptions(grpc2.UnaryInterceptor(instrumentGRPC(metrics))))
var opts []grpc2.ServerOption
// GRPC metrics
opts = append(opts, grpc2.UnaryInterceptor(instrumentGRPC(metrics)))

if cfg.CertPath != "" && cfg.KeyPath != "" {
// TLS
cert, err := tls.LoadX509KeyPair(cfg.CertPath, cfg.KeyPath)
if err != nil {
return nil, fmt.Errorf("cannot load configured certificate: %w", err)
}
tlsCfg := &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.NoClientCert,
}
if cfg.ClientCAPath != "" {
// mTLS
caCert, err := os.ReadFile(cfg.ClientCAPath)
if err != nil {
return nil, fmt.Errorf("cannot load configured client CA certificate: %w", err)
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caCert)
tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert
tlsCfg.ClientCAs = pool
glog.Info("Starting GRPC server with mTLS")
} else {
glog.Info("Starting GRPC server with TLS")
}
opts = append(opts, grpc2.Creds(credentials.NewTLS(tlsCfg)))
} else {
glog.Info("Starting GRPC server - no TLS")
}

collector, err := grpc.StartCollector(cfg.Port, flowPackets, grpc.WithGRPCServerOptions(opts...))
if err != nil {
return nil, err
}
Expand Down
149 changes: 149 additions & 0 deletions pkg/pipeline/ingest/ingest_grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package ingest

import (
"context"
"testing"
"time"

test2 "github.com/mariomac/guara/pkg/test"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
flowgrpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type tlsConfig struct {
clientCertPath string
clientKeyPath string
serverCAPathForClient string
serverCertPath string
serverKeyPath string
clientCAPathForServer string
}

func TestGRPC_NoTLS(t *testing.T) {
runGRPCTestForTLS(t, tlsConfig{}, "")
}

func TestGRPC_SimpleTLS(t *testing.T) {
ca, _, _, cert, key, cleanup := test.CreateAllCerts(t)
defer cleanup()
runGRPCTestForTLS(
t,
tlsConfig{
serverCAPathForClient: ca,
serverCertPath: cert,
serverKeyPath: key,
},
"",
)
}

func TestGRPC_MutualTLS(t *testing.T) {
ca, user, userKey, serverCert, serverKey, cleanup := test.CreateAllCerts(t)
defer cleanup()
runGRPCTestForTLS(
t,
tlsConfig{
clientCAPathForServer: ca,
clientCertPath: user,
clientKeyPath: userKey,
serverCAPathForClient: ca,
serverCertPath: serverCert,
serverKeyPath: serverKey,
},
"",
)
}

func TestGRPC_FailExpectingTLS(t *testing.T) {
_, _, _, cert, key, cleanup := test.CreateAllCerts(t)
defer cleanup()
runGRPCTestForTLS(
t,
tlsConfig{
serverCAPathForClient: "", // by not providing server CA, the client is in NOTLS mode
serverCertPath: cert,
serverKeyPath: key,
},
"connection error",
)
}

func TestGRPC_FailExpectingMTLS(t *testing.T) {
ca, _, _, serverCert, serverKey, cleanup := test.CreateAllCerts(t)
defer cleanup()
runGRPCTestForTLS(
t,
tlsConfig{
clientCAPathForServer: ca,
clientCertPath: "", // by not providing client cert, the client is in simple TLS mode
clientKeyPath: "",
serverCAPathForClient: ca,
serverCertPath: serverCert,
serverKeyPath: serverKey,
},
"write tcp", // would prefer a more explicit error, but that's all that we get: "write tcp 127.0.0.1:54154->127.0.0.1:40763: write: broken pipe"
)
}

// nolint:gocritic // hugeParam; don't care in tests
func runGRPCTestForTLS(t *testing.T, certs tlsConfig, expectErrorContains string) {
port, err := test2.FreeTCPPort()
require.NoError(t, err)

out := make(chan config.GenericMap)
cfg := &config.Ingest{
GRPC: &api.IngestGRPCProto{
Port: port,
CertPath: certs.serverCertPath,
KeyPath: certs.serverKeyPath,
ClientCAPath: certs.clientCAPathForServer,
},
}
ingester, err := NewGRPCProtobuf(operational.NewMetrics(&config.MetricsSettings{}), config.StageParam{Ingest: cfg})
require.NoError(t, err)

go ingester.Ingest(out)

flowSender, err := flowgrpc.ConnectClient("127.0.0.1", port, certs.serverCAPathForClient, certs.clientCertPath, certs.clientKeyPath)
require.NoError(t, err)
defer flowSender.Close()

record := &pbflow.Records{
Entries: []*pbflow.Record{{
EthProtocol: 2048,
Transport: &pbflow.Transport{},
Network: &pbflow.Network{
SrcAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304},
},
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
},
},
}},
}

_, err = flowSender.Client().Send(context.Background(), record)
if expectErrorContains != "" {
require.ErrorContains(t, err, expectErrorContains)
return
}
require.NoError(t, err)

var received config.GenericMap
select {
case r := <-out:
received = r
case <-time.After(timeout):
require.Fail(t, "timeout while waiting for Ingester to receive and process data")
}

assert.Equal(t, "1.2.3.4", received["SrcAddr"])
assert.Equal(t, "5.6.7.8", received["DstAddr"])
}
2 changes: 1 addition & 1 deletion pkg/pipeline/ingest/ingest_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func Test_TLSConfigCA(t *testing.T) {

func Test_MutualTLSConfig(t *testing.T) {
test.ResetPromRegistry()
ca, user, userKey, cleanup := test.CreateAllCerts(t)
ca, user, userKey, _, _, cleanup := test.CreateAllCerts(t)
defer cleanup()
stage := config.NewKafkaPipeline("ingest-kafka", api.IngestKafka{
Brokers: []string{"any"},
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ parameters:
// yield thread to allow pipe services correctly start
time.Sleep(10 * time.Millisecond)

flowSender, err := grpc.ConnectClient("127.0.0.1", port)
flowSender, err := grpc.ConnectClient("127.0.0.1", port, "", "", "")
require.NoError(t, err)
defer flowSender.Close()

Expand Down
21 changes: 15 additions & 6 deletions pkg/pipeline/write/grpc/client.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,38 @@
package grpc

import (
"flag"
"log"
"crypto/tls"

pb "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

var glog = logrus.WithField("component", "write.GRPCClient")

// ClientConnection wraps a gRPC+protobuf connection
type ClientConnection struct {
client pb.CollectorClient
conn *grpc.ClientConn
}

func ConnectClient(hostIP string, hostPort int) (*ClientConnection, error) {
flag.Parse()
func ConnectClient(hostIP string, hostPort int, tlsConfig *tls.Config) (*ClientConnection, error) {
// Set up a connection to the server.
var creds credentials.TransportCredentials
if tlsConfig != nil {
creds = credentials.NewTLS(tlsConfig)
} else {
glog.Info("Using GRPC - No TLS")
creds = insecure.NewCredentials()
}
socket := utils.GetSocket(hostIP, hostPort)
conn, err := grpc.NewClient(socket, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(socket, grpc.WithTransportCredentials(creds))

if err != nil {
log.Fatalf("did not connect: %v", err)
return nil, err
}

return &ClientConnection{
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/write/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestGRPCCommunication(t *testing.T) {
serverOut := make(chan *genericmap.Flow)
_, err = StartCollector(port, serverOut)
require.NoError(t, err)
cc, err := ConnectClient("127.0.0.1", port)
cc, err := ConnectClient("127.0.0.1", port, nil)
require.NoError(t, err)
client := cc.Client()

Expand Down Expand Up @@ -76,7 +76,7 @@ func TestConstructorOptions(t *testing.T) {
return handler(ctx, req)
})))
require.NoError(t, err)
cc, err := ConnectClient("127.0.0.1", port)
cc, err := ConnectClient("127.0.0.1", port, nil)
require.NoError(t, err)
client := cc.Client()

Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/write/write_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewWriteGRPC(params config.StageParam) (Writer, error) {
return nil, fmt.Errorf("write.grpc param is mandatory: %v", params.Write)
}
logrus.Debugf("NewWriteGRPC ConnectClient %s:%d...", writeGRPC.hostIP, writeGRPC.hostPort)
clientConn, err := grpc.ConnectClient(writeGRPC.hostIP, writeGRPC.hostPort)
clientConn, err := grpc.ConnectClient(writeGRPC.hostIP, writeGRPC.hostPort, nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/write/write_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func Test_WriteGRPC(t *testing.T) {
port, err := test.FreeTCPPort()
require.NoError(t, err)
cc, err := grpc.ConnectClient("127.0.0.1", port)
cc, err := grpc.ConnectClient("127.0.0.1", port, nil)
require.NoError(t, err)
ws := writeGRPC{
hostIP: "127.0.0.1",
Expand Down
Loading