Skip to content

Commit fb3ae59

Browse files
author
Mario Macias
authored
Added ICMP single-packet test (#42)
* Added ICMP single-packet test * Fix timestamp comparison to avoid getting twice the same record
1 parent bbc573b commit fb3ae59

File tree

4 files changed

+144
-8
lines changed

4 files changed

+144
-8
lines changed

e2e/basic/common.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,6 @@ func (bt *FlowCaptureTester) lokiQuery(t *testing.T, logQL string) tester.LokiQu
243243
require.NotNil(t, query)
244244
require.NotEmpty(t, query.Data.Result)
245245
}, test.Interval(time.Second))
246-
require.NotEmpty(t, query.Data.Result)
247246
result := query.Data.Result[0]
248247
return result
249248
}

e2e/basic/flow_test.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,21 @@
33
package basic
44

55
import (
6+
"context"
67
"path"
78
"testing"
89
"time"
910

11+
"github.com/mariomac/guara/pkg/test"
1012
"github.com/netobserv/netobserv-ebpf-agent/e2e/cluster"
13+
"github.com/netobserv/netobserv-ebpf-agent/e2e/cluster/tester"
1114
"github.com/sirupsen/logrus"
15+
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
17+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18+
"k8s.io/client-go/kubernetes"
19+
"sigs.k8s.io/e2e-framework/pkg/envconf"
20+
"sigs.k8s.io/e2e-framework/pkg/features"
1221
)
1322

1423
const (
@@ -42,3 +51,120 @@ func TestBasicFlowCapture(t *testing.T) {
4251
}
4352
bt.DoTest(t)
4453
}
54+
55+
// TestSinglePacketFlows uses a known packet size and number to check that,
56+
// (1) packets are aggregated only once,
57+
// (2) once packets are evicted, no more flows are aggregated on top of them.
58+
func TestSinglePacketFlows(t *testing.T) {
59+
var pingerIP, serverPodIP string
60+
var latestFlowMS time.Time
61+
testCluster.TestEnv().Test(t, features.New("single-packet flow capture").Setup(
62+
func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
63+
kclient, err := kubernetes.NewForConfig(cfg.Client().RESTConfig())
64+
require.NoError(t, err)
65+
// extract pinger Pod information from kubernetes
66+
test.Eventually(t, testTimeout, func(t require.TestingT) {
67+
client, err := kclient.CoreV1().Pods(namespace).
68+
Get(ctx, "pinger", metav1.GetOptions{})
69+
require.NoError(t, err)
70+
require.NotEmpty(t, client.Status.PodIP)
71+
pingerIP = client.Status.PodIP
72+
}, test.Interval(time.Second))
73+
// extract server (ping destination) pod information from kubernetes
74+
test.Eventually(t, testTimeout, func(t require.TestingT) {
75+
server, err := kclient.CoreV1().Pods(namespace).
76+
List(ctx, metav1.ListOptions{LabelSelector: "app=server"})
77+
require.NoError(t, err)
78+
require.Len(t, server.Items, 1)
79+
require.NotEmpty(t, server.Items)
80+
require.NotEmpty(t, server.Items[0].Status.PodIP)
81+
serverPodIP = server.Items[0].Status.PodIP
82+
}, test.Interval(time.Second))
83+
return ctx
84+
},
85+
).Assess("correctness of single, small ICMP packet from pinger to server",
86+
func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
87+
pods, err := tester.NewPods(cfg)
88+
require.NoError(t, err)
89+
90+
logrus.WithField("destinationIP", serverPodIP).Info("Sending ICMP packet")
91+
stdOut, stdErr, err := pods.Execute(ctx, namespace, "pinger",
92+
"ping", "-c", "1", serverPodIP)
93+
require.NoError(t, err)
94+
logrus.WithFields(logrus.Fields{"stdOut": stdOut, "stdErr": stdErr}).Info("ping sent")
95+
96+
sent, recv := getPingFlows(t, time.Now().Add(-time.Minute))
97+
98+
assert.Equal(t, pingerIP, sent["SrcAddr"])
99+
assert.Equal(t, serverPodIP, sent["DstAddr"])
100+
assert.EqualValues(t, 98, sent["Bytes"]) // default ping data size + IP+ICMP headers
101+
assert.EqualValues(t, 1, sent["Packets"])
102+
assert.Equal(t, pingerIP, recv["DstAddr"])
103+
assert.Equal(t, serverPodIP, recv["SrcAddr"])
104+
assert.EqualValues(t, 98, recv["Bytes"]) // default ping data size + IP+ICMP headers
105+
assert.EqualValues(t, 1, recv["Packets"])
106+
107+
latestFlowMS = asTime(recv["TimeFlowEndMs"])
108+
109+
return ctx
110+
},
111+
).Assess("correctness of another ICMP packet contained in another flow",
112+
func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
113+
pods, err := tester.NewPods(cfg)
114+
require.NoError(t, err)
115+
116+
logrus.WithField("destinationIP", serverPodIP).Info("Sending ICMP packet")
117+
stdOut, stdErr, err := pods.Execute(ctx, namespace, "pinger",
118+
"ping", "-s", "100", "-c", "1", serverPodIP)
119+
require.NoError(t, err)
120+
logrus.WithFields(logrus.Fields{"stdOut": stdOut, "stdErr": stdErr}).Info("ping sent")
121+
122+
// We filter by time to avoid getting twice the same flows
123+
sent, recv := getPingFlows(t, latestFlowMS)
124+
125+
assert.Equal(t, pingerIP, sent["SrcAddr"])
126+
assert.Equal(t, serverPodIP, sent["DstAddr"])
127+
assert.EqualValues(t, 142, sent["Bytes"]) // 100-byte data size + IP+ICMP headers
128+
assert.EqualValues(t, 1, sent["Packets"])
129+
assert.Equal(t, pingerIP, recv["DstAddr"])
130+
assert.Equal(t, serverPodIP, recv["SrcAddr"])
131+
assert.EqualValues(t, 142, recv["Bytes"]) // 100-byte data size + IP+ICMP headers
132+
assert.EqualValues(t, 1, recv["Packets"])
133+
return ctx
134+
},
135+
).Feature())
136+
}
137+
138+
func getPingFlows(t *testing.T, newerThan time.Time) (sent, recv map[string]interface{}) {
139+
logrus.Info("Verifying that the request/return ICMP packets have been captured individually")
140+
var query *tester.LokiQueryResponse
141+
var err error
142+
test.Eventually(t, testTimeout, func(t require.TestingT) {
143+
query, err = testCluster.Loki().
144+
Query(1, `{SrcK8S_OwnerName="pinger",DstK8S_OwnerName="server"}|="\"Proto\":1,"`) // Proto 1 == ICMP
145+
require.NoError(t, err)
146+
require.NotNil(t, query)
147+
require.Len(t, query.Data.Result, 1)
148+
if len(query.Data.Result) > 0 {
149+
sent, err = query.Data.Result[0].Values[0].FlowData()
150+
require.NoError(t, err)
151+
require.Less(t, newerThan.UnixMilli(),
152+
asTime(sent["TimeFlowStartMs"]).UnixMilli())
153+
}
154+
}, test.Interval(time.Second))
155+
156+
test.Eventually(t, testTimeout, func(t require.TestingT) {
157+
query, err = testCluster.Loki().
158+
Query(1, `{DstK8S_OwnerName="pinger",SrcK8S_OwnerName="server"}|="\"Proto\":1,"`) // Proto 1 == ICMP
159+
require.NoError(t, err)
160+
require.NotNil(t, query)
161+
require.Len(t, query.Data.Result, 1)
162+
if len(query.Data.Result) > 0 {
163+
recv, err = query.Data.Result[0].Values[0].FlowData()
164+
require.NoError(t, err)
165+
require.Less(t, newerThan.UnixMilli(),
166+
asTime(sent["TimeFlowStartMs"]).UnixMilli())
167+
}
168+
}, test.Interval(time.Second))
169+
return sent, recv
170+
}

e2e/basic/manifests/pods.yml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,14 @@ spec:
4242
spec:
4343
containers:
4444
- name: nginx
45-
image: nginx:latest
45+
image: nginx:latest
46+
---
47+
# Used for single-packet test
48+
apiVersion: v1
49+
kind: Pod
50+
metadata:
51+
name: pinger
52+
spec:
53+
containers:
54+
- name: pinger
55+
image: ibmcom/ping

e2e/cluster/tester/loki.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@ import (
1111
)
1212

1313
const (
14-
pathReady = "/ready"
15-
pathQuery = "/loki/api/v1/query"
16-
queryArgLimit = "limit"
17-
queryArgQuery = "query"
14+
pathReady = "/ready"
15+
pathQueryRange = "/loki/api/v1/query_range"
16+
queryArgLimit = "limit"
17+
queryArgQuery = "query"
18+
queryStep = "step=30m"
1819
)
1920

2021
var llog = logrus.WithField("component", "loki.Tester")
@@ -52,8 +53,8 @@ func (l *Loki) Ready() error {
5253

5354
// Query executes an arbitrary logQL query, given a limit in the results
5455
func (l *Loki) Query(limit int, logQL string) (*LokiQueryResponse, error) {
55-
status, body, err := l.get(fmt.Sprintf("%s?%s=%d&%s=%s",
56-
pathQuery, queryArgLimit, limit,
56+
status, body, err := l.get(fmt.Sprintf("%s?%s=%d&%s&%s=%s",
57+
pathQueryRange, queryArgLimit, limit, queryStep,
5758
queryArgQuery, url.QueryEscape(logQL)))
5859
if err != nil {
5960
return nil, fmt.Errorf("loki request error: %w", err)

0 commit comments

Comments
 (0)