Skip to content

Commit 3da9b4e

Browse files
committed
Add tests
1 parent 09e5c76 commit 3da9b4e

File tree

13 files changed

+373
-87
lines changed

13 files changed

+373
-87
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,3 +174,4 @@ require (
174174
)
175175

176176
replace github.com/vmware/go-ipfix => github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101
177+
replace github.com/netobserv/netobserv-ebpf-agent => github.com/jotak/netobserv-agent v0.0.0-20251201132656-d1dee7de8bca

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtL
177177
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
178178
github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101 h1:tpaHjydMAy2MTukKIUAVK4xIFUpL12xuexA0FuTVpuo=
179179
github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101/go.mod h1:GgFbcmEGqMQfA7jDC9UVLKAelNh2sy1jsxyV7Tor3Ig=
180+
github.com/jotak/netobserv-agent v0.0.0-20251201132656-d1dee7de8bca h1:VQMzly3wKRXwWm0V8Bnzj2Us786hKAR1CCfH/24+yd8=
181+
github.com/jotak/netobserv-agent v0.0.0-20251201132656-d1dee7de8bca/go.mod h1:ZQf6WKnhdfdaR5PQ/Fc99l4/doL94OQeTkQknwFiQZo=
180182
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
181183
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
182184
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
@@ -257,8 +259,6 @@ github.com/netobserv/gopipes v0.3.0 h1:IYmPnnAVCdSK7VmHmpFhrVBOEm45qpgbZmJz1sSW+
257259
github.com/netobserv/gopipes v0.3.0/go.mod h1:N7/Gz05EOF0CQQSKWsv3eof22Cj2PB08Pbttw98YFYU=
258260
github.com/netobserv/loki-client-go v0.0.0-20251014110557-40bc8d2e6cf3 h1:rxQipq0xpoiao7ifls/82JCcOVALC4n08ppTLCUFGL4=
259261
github.com/netobserv/loki-client-go v0.0.0-20251014110557-40bc8d2e6cf3/go.mod h1:Zb/jtD3Lnu88Poo+jnhTASzxYnvncmHOoZaT93xQjJ8=
260-
github.com/netobserv/netobserv-ebpf-agent v1.10.0-community.0.20251125162210-4be10c36721e h1:hGJIbcfTbzjpuZ9K7hVwD/6+KijR0TfqJjmXeigQELc=
261-
github.com/netobserv/netobserv-ebpf-agent v1.10.0-community.0.20251125162210-4be10c36721e/go.mod h1:ZQf6WKnhdfdaR5PQ/Fc99l4/doL94OQeTkQknwFiQZo=
262262
github.com/netsampler/goflow2 v1.3.7 h1:XZaTy8kkMnGXpJ9hS3KbO1McyrFTpVNhVFEx9rNhMmc=
263263
github.com/netsampler/goflow2 v1.3.7/go.mod h1:4UZsVGVAs//iMCptUHn3WNScztJeUhZH7kDW2+/vDdQ=
264264
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=

pkg/pipeline/encode/encode_kafka_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func Test_TLSConfigCA(t *testing.T) {
122122

123123
func Test_MutualTLSConfig(t *testing.T) {
124124
test.ResetPromRegistry()
125-
ca, user, userKey, cleanup := test.CreateAllCerts(t)
125+
ca, user, userKey, _, _, cleanup := test.CreateAllCerts(t)
126126
defer cleanup()
127127
pipeline := config.NewIPFIXPipeline("ingest", api.IngestIpfix{})
128128
pipeline.EncodeKafka("encode-kafka", api.EncodeKafka{
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package ingest
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
test2 "github.com/mariomac/guara/pkg/test"
9+
"github.com/netobserv/flowlogs-pipeline/pkg/api"
10+
"github.com/netobserv/flowlogs-pipeline/pkg/config"
11+
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
12+
"github.com/netobserv/flowlogs-pipeline/pkg/test"
13+
flowgrpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow"
14+
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
15+
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
17+
)
18+
19+
type tlsConfig struct {
20+
clientCertPath string
21+
clientKeyPath string
22+
serverCAPathForClient string
23+
serverCertPath string
24+
serverKeyPath string
25+
clientCAPathForServer string
26+
}
27+
28+
func TestGRPC_NoTLS(t *testing.T) {
29+
runGRPCTestForTLS(t, tlsConfig{}, "")
30+
}
31+
32+
func TestGRPC_SimpleTLS(t *testing.T) {
33+
ca, _, _, cert, key, cleanup := test.CreateAllCerts(t)
34+
defer cleanup()
35+
runGRPCTestForTLS(
36+
t,
37+
tlsConfig{
38+
serverCAPathForClient: ca,
39+
serverCertPath: cert,
40+
serverKeyPath: key,
41+
},
42+
"",
43+
)
44+
}
45+
46+
func TestGRPC_MutualTLS(t *testing.T) {
47+
ca, user, userKey, serverCert, serverKey, cleanup := test.CreateAllCerts(t)
48+
defer cleanup()
49+
runGRPCTestForTLS(
50+
t,
51+
tlsConfig{
52+
clientCAPathForServer: ca,
53+
clientCertPath: user,
54+
clientKeyPath: userKey,
55+
serverCAPathForClient: ca,
56+
serverCertPath: serverCert,
57+
serverKeyPath: serverKey,
58+
},
59+
"",
60+
)
61+
}
62+
63+
func TestGRPC_FailExpectingTLS(t *testing.T) {
64+
_, _, _, cert, key, cleanup := test.CreateAllCerts(t)
65+
defer cleanup()
66+
runGRPCTestForTLS(
67+
t,
68+
tlsConfig{
69+
serverCAPathForClient: "", // by not providing server CA, the client is in NOTLS mode
70+
serverCertPath: cert,
71+
serverKeyPath: key,
72+
},
73+
"connection error",
74+
)
75+
}
76+
77+
func TestGRPC_FailExpectingMTLS(t *testing.T) {
78+
ca, _, _, serverCert, serverKey, cleanup := test.CreateAllCerts(t)
79+
defer cleanup()
80+
runGRPCTestForTLS(
81+
t,
82+
tlsConfig{
83+
clientCAPathForServer: ca,
84+
clientCertPath: "", // by not providing client cert, the client is in simple TLS mode
85+
clientKeyPath: "",
86+
serverCAPathForClient: ca,
87+
serverCertPath: serverCert,
88+
serverKeyPath: serverKey,
89+
},
90+
"write tcp", // would prefer a more explicit error, but that's all that we get: "write tcp 127.0.0.1:54154->127.0.0.1:40763: write: broken pipe"
91+
)
92+
}
93+
94+
func runGRPCTestForTLS(t *testing.T, certs tlsConfig, expectErrorContains string) {
95+
port, err := test2.FreeTCPPort()
96+
require.NoError(t, err)
97+
98+
out := make(chan config.GenericMap)
99+
cfg := &config.Ingest{
100+
GRPC: &api.IngestGRPCProto{
101+
Port: port,
102+
CertPath: certs.serverCertPath,
103+
KeyPath: certs.serverKeyPath,
104+
ClientCAPath: certs.clientCAPathForServer,
105+
},
106+
}
107+
ingester, err := NewGRPCProtobuf(operational.NewMetrics(&config.MetricsSettings{}), config.StageParam{Ingest: cfg})
108+
require.NoError(t, err)
109+
110+
go ingester.Ingest(out)
111+
112+
flowSender, err := flowgrpc.ConnectClient("127.0.0.1", port, certs.serverCAPathForClient, certs.clientCertPath, certs.clientKeyPath)
113+
require.NoError(t, err)
114+
defer flowSender.Close()
115+
116+
record := &pbflow.Records{
117+
Entries: []*pbflow.Record{{
118+
EthProtocol: 2048,
119+
Transport: &pbflow.Transport{},
120+
Network: &pbflow.Network{
121+
SrcAddr: &pbflow.IP{
122+
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304},
123+
},
124+
DstAddr: &pbflow.IP{
125+
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
126+
},
127+
},
128+
}},
129+
}
130+
131+
_, err = flowSender.Client().Send(context.Background(), record)
132+
if expectErrorContains != "" {
133+
require.ErrorContains(t, err, expectErrorContains)
134+
return
135+
}
136+
require.NoError(t, err)
137+
138+
var received config.GenericMap
139+
select {
140+
case r := <-out:
141+
received = r
142+
case <-time.After(timeout):
143+
require.Fail(t, "timeout while waiting for Ingester to receive and process data")
144+
}
145+
146+
assert.Equal(t, "1.2.3.4", received["SrcAddr"])
147+
assert.Equal(t, "5.6.7.8", received["DstAddr"])
148+
}

pkg/pipeline/ingest/ingest_kafka_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func Test_TLSConfigCA(t *testing.T) {
242242

243243
func Test_MutualTLSConfig(t *testing.T) {
244244
test.ResetPromRegistry()
245-
ca, user, userKey, cleanup := test.CreateAllCerts(t)
245+
ca, user, userKey, _, _, cleanup := test.CreateAllCerts(t)
246246
defer cleanup()
247247
stage := config.NewKafkaPipeline("ingest-kafka", api.IngestKafka{
248248
Brokers: []string{"any"},

pkg/pipeline/pipeline_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ parameters:
168168
// yield thread to allow pipe services correctly start
169169
time.Sleep(10 * time.Millisecond)
170170

171-
flowSender, err := grpc.ConnectClient("127.0.0.1", port)
171+
flowSender, err := grpc.ConnectClient("127.0.0.1", port, "", "", "")
172172
require.NoError(t, err)
173173
defer flowSender.Close()
174174

pkg/pipeline/write/grpc/client.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,38 @@
11
package grpc
22

33
import (
4-
"flag"
5-
"log"
4+
"crypto/tls"
65

76
pb "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap"
87
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
8+
"github.com/sirupsen/logrus"
99
"google.golang.org/grpc"
10+
"google.golang.org/grpc/credentials"
1011
"google.golang.org/grpc/credentials/insecure"
1112
)
1213

14+
var glog = logrus.WithField("component", "write.GRPCClient")
15+
1316
// ClientConnection wraps a gRPC+protobuf connection
1417
type ClientConnection struct {
1518
client pb.CollectorClient
1619
conn *grpc.ClientConn
1720
}
1821

19-
func ConnectClient(hostIP string, hostPort int) (*ClientConnection, error) {
20-
flag.Parse()
22+
func ConnectClient(hostIP string, hostPort int, tlsConfig *tls.Config) (*ClientConnection, error) {
2123
// Set up a connection to the server.
24+
var creds credentials.TransportCredentials
25+
if tlsConfig != nil {
26+
creds = credentials.NewTLS(tlsConfig)
27+
} else {
28+
glog.Info("Using GRPC - No TLS")
29+
creds = insecure.NewCredentials()
30+
}
2231
socket := utils.GetSocket(hostIP, hostPort)
23-
conn, err := grpc.NewClient(socket, grpc.WithTransportCredentials(insecure.NewCredentials()))
32+
conn, err := grpc.NewClient(socket, grpc.WithTransportCredentials(creds))
2433

2534
if err != nil {
26-
log.Fatalf("did not connect: %v", err)
35+
return nil, err
2736
}
2837

2938
return &ClientConnection{

pkg/pipeline/write/grpc/grpc_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestGRPCCommunication(t *testing.T) {
2323
serverOut := make(chan *genericmap.Flow)
2424
_, err = StartCollector(port, serverOut)
2525
require.NoError(t, err)
26-
cc, err := ConnectClient("127.0.0.1", port)
26+
cc, err := ConnectClient("127.0.0.1", port, nil)
2727
require.NoError(t, err)
2828
client := cc.Client()
2929

@@ -76,7 +76,7 @@ func TestConstructorOptions(t *testing.T) {
7676
return handler(ctx, req)
7777
})))
7878
require.NoError(t, err)
79-
cc, err := ConnectClient("127.0.0.1", port)
79+
cc, err := ConnectClient("127.0.0.1", port, nil)
8080
require.NoError(t, err)
8181
client := cc.Client()
8282

pkg/pipeline/write/write_grpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func NewWriteGRPC(params config.StageParam) (Writer, error) {
4646
return nil, fmt.Errorf("write.grpc param is mandatory: %v", params.Write)
4747
}
4848
logrus.Debugf("NewWriteGRPC ConnectClient %s:%d...", writeGRPC.hostIP, writeGRPC.hostPort)
49-
clientConn, err := grpc.ConnectClient(writeGRPC.hostIP, writeGRPC.hostPort)
49+
clientConn, err := grpc.ConnectClient(writeGRPC.hostIP, writeGRPC.hostPort, nil)
5050
if err != nil {
5151
return nil, err
5252
}

pkg/pipeline/write/write_grpc_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
func Test_WriteGRPC(t *testing.T) {
1414
port, err := test.FreeTCPPort()
1515
require.NoError(t, err)
16-
cc, err := grpc.ConnectClient("127.0.0.1", port)
16+
cc, err := grpc.ConnectClient("127.0.0.1", port, nil)
1717
require.NoError(t, err)
1818
ws := writeGRPC{
1919
hostIP: "127.0.0.1",

0 commit comments

Comments
 (0)