Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
build
build/*
output
tmp
cover.out
Expand Down
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ linters:
- stylecheck
- typecheck
- unused
- ginkgolinter
run:
go: "1.22"
linters-settings:
Expand Down
71 changes: 71 additions & 0 deletions e2e/integration-tests/cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package integrationtests

import (
"context"
"io/fs"
"os"
"time"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
)

func isCLIReady(clientset *kubernetes.Clientset, cliNS string) (bool, error) {
var collectorReady, cliDaemonsetReady bool
err := wait.PollUntilContextTimeout(context.Background(), 30*time.Second, 300*time.Second, false, func(context.Context) (done bool, err error) {
if !collectorReady {
collectorPod, err := getNamespacePods(clientset, cliNS, &metav1.ListOptions{FieldSelector: "status.phase=Running", LabelSelector: "run=collector"})
if err != nil {
return false, err
}

if len(collectorPod) > 0 {
collectorReady = true
}
}

daemonset := "netobserv-cli"
cliDaemonset, err := getDaemonSet(clientset, daemonset, cliNS)

if err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}
if cliDaemonset.Status.DesiredNumberScheduled == cliDaemonset.Status.NumberReady {
cliDaemonsetReady = true
}
return collectorReady && cliDaemonsetReady, nil
})
if err != nil {
return false, err
}
return true, nil
}

// get latest flows.json file
func getFlowsJSONFile() (string, error) {
// var files []fs.DirEntry
var files []string
outputDir := "./output/flow/"
dirFS := os.DirFS(outputDir)
files, err := fs.Glob(dirFS, "*.json")
if err != nil {
return "", err
}
// this could be problematic if two tests are running in parallel with --copy=true
var mostRecentFile fs.FileInfo
for _, file := range files {
fileInfo, err := os.Stat(outputDir + file)
if err != nil {
return "", nil
}
if mostRecentFile == nil || fileInfo.ModTime().After(mostRecentFile.ModTime()) {
mostRecentFile = fileInfo
}
}
return outputDir + mostRecentFile.Name(), nil
}
59 changes: 59 additions & 0 deletions e2e/integration-tests/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package integrationtests

import (
"context"
"os"

appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

func getNewClient() (*kubernetes.Clientset, error) {
config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(config)

if err != nil {
return nil, err
}

return clientset, nil
}

func getClusterNodes(clientset *kubernetes.Clientset, options *metav1.ListOptions) ([]string, error) {
nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), *options)
var allNodes []string
if err != nil {
return allNodes, err
}
for i := range nodes.Items {
allNodes = append(allNodes, nodes.Items[i].Name)
}
return allNodes, nil
}

func getDaemonSet(clientset *kubernetes.Clientset, daemonset string, ns string) (*appsv1.DaemonSet, error) {
ds, err := clientset.AppsV1().DaemonSets(ns).Get(context.TODO(), daemonset, metav1.GetOptions{})

if err != nil {
return nil, err
}

return ds, nil
}

func getNamespacePods(clientset *kubernetes.Clientset, namespace string, options *metav1.ListOptions) ([]string, error) {
pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), *options)
var allPods []string
if err != nil {
return allPods, err
}
for i := range pods.Items {
allPods = append(allPods, pods.Items[i].Name)
}
return allPods, nil
}
88 changes: 88 additions & 0 deletions e2e/integration-tests/flows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package integrationtests

type Flowlog struct {
// Source
SrcPort int
SrcK8sType string `json:"SrcK8S_Type"`
SrcK8sName string `json:"SrcK8S_Name"`
SrcK8sHostName string `json:"SrcK8S_HostName"`
SrcK8sOwnerType string `json:"SrcK8S_OwnerType"`
SrcAddr string
SrcMac string
SrcK8sNamespace string `json:"SrcK8S_Namespace"`
// Destination
DstPort int
DstK8sType string `json:"DstK8S_Type"`
DstK8sName string `json:"DstK8S_Name"`
DstK8sHostName string `json:"DstK8S_HostName"`
DstK8sOwnerType string `json:"DstK8S_OwnerType"`
DstAddr string
DstMac string
DstK8sHostIP string `json:"DstK8S_HostIP,omitempty"`
DstK8sNamespace string `json:"DstK8S_Namespace"`
// Protocol
Proto int
IcmpCode int
IcmpType int
Dscp int
// TODO: check if Flags supposed to be []string?
Flags int
// Time
TimeReceived int
TimeFlowEndMs int
TimeFlowStartMs int
// Interface
IfDirection int
IfDirections []int
Interfaces []string
Etype int
// Others
Packets int
Bytes int
Duplicate bool
AgentIP string
Sampling int
HashID string `json:"_HashId,omitempty"`
IsFirst bool `json:"_IsFirst,omitempty"`
RecordType string `json:"_RecordType,omitempty"`
NumFlowLogs int `json:"numFlowLogs,omitempty"`
K8SClusterName string `json:"K8S_ClusterName,omitempty"`
// Zone
SrcK8SZone string `json:"SrcK8S_Zone,omitempty"`
DstK8SZone string `json:"DstK8S_Zone,omitempty"`
// DNS
DNSLatencyMs int `json:"DnsLatencyMs,omitempty"`
DNSFlagsResponseCode string `json:"DnsFlagsResponseCode,omitempty"`
// Packet Drop
PktDropBytes int `json:"PktDropBytes,omitempty"`
PktDropPackets int `json:"PktDropPackets,omitempty"`
PktDropLatestState string `json:"PktDropLatestState,omitempty"`
PktDropLatestDropCause string `json:"PktDropLatestDropCause,omitempty"`
// RTT
TimeFlowRttNs int `json:"TimeFlowRttNs,omitempty"`
// Packet Translation
XlatDstAddr string `json:"XlatDstAddr,omitempty"`
XlatDstK8sName string `json:"XlatDstK8S_Name,omitempty"`
XlatDstK8sNamespace string `json:"XlatDstK8S_Namespace,omitempty"`
XlatDstK8sType string `json:"XlatDstK8S_Type,omitempty"`
XlatDstPort int `json:"XlatDstPort,omitempty"`
XlatSrcAddr string `json:"XlatSrcAddr,omitempty"`
XlatSrcK8sName string `json:"XlatSrcK8S_Name,omitempty"`
XlatSrcK8sNamespace string `json:"XlatSrcK8S_Namespace,omitempty"`
ZoneID int `json:"ZoneId,omitempty"`
// Network Events
NetworkEvents []NetworkEvent `json:"NetworkEvents,omitempty"`
// Secondary Network
SrcK8sNetworkName string `json:"SrcK8S_NetworkName,omitempty"`
DstK8sNetworkName string `json:"DstK8S_NetworkName,omitempty"`
// UDN
Udns []string `json:"Udns,omitempty"`
}
type NetworkEvent struct {
Action string `json:"Action,omitempty"`
Type string `json:"Type,omitempty"`
Name string `json:"Name,omitempty"`
Namespace string `json:"Namespace,omitempty"`
Direction string `json:"Direction,omitempty"`
Feature string `json:"Feature,omitempty"`
}
75 changes: 75 additions & 0 deletions e2e/integration-tests/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package integrationtests

import (
"encoding/json"
"fmt"
"os"
"os/exec"

g "github.com/onsi/ginkgo/v2"
o "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ = g.Describe("NetObserv CLI e2e integration test suite", g.Serial, func() {
cliNS := "netobserv-cli"

g.It("Verify all CLI pods are deployed", g.Label("Sanity"), func() {
cliArgs := []string{"flows", "--copy=false"}
cmd := exec.Command(ocNetObservBinPath, cliArgs...)
err := cmd.Start()
o.Expect(err).NotTo(o.HaveOccurred())
// cleanup()
defer func() {
cliArgs := []string{"cleanup"}
cmd := exec.Command(ocNetObservBinPath, cliArgs...)
err := cmd.Run()
o.Expect(err).NotTo(o.HaveOccurred())
}()
var allPods []string
clientset, err := getNewClient()
o.Expect(err).NotTo(o.HaveOccurred())
nodes, err := getClusterNodes(clientset, &metav1.ListOptions{})
// agent pods + collector pods
totalExpectedPods := len(nodes) + 1
o.Expect(err).NotTo(o.HaveOccurred())
_, err = isCLIReady(clientset, cliNS)
o.Expect(err).NotTo(o.HaveOccurred(), "CLI didn't come ready")
allPods, err = getNamespacePods(clientset, cliNS, &metav1.ListOptions{})
o.Expect(err).NotTo(o.HaveOccurred())
o.Expect(allPods).To(o.HaveLen(totalExpectedPods), fmt.Sprintf("Number of CLI pods are not as expected %d", totalExpectedPods))
})

g.It("Verify regexes filters are applied", func() {
// capture upto 500KB
nsfilter := "openshift-monitoring"
cliArgs := []string{"flows", "--regexes=SrcK8S_Namespace~" + nsfilter, "--copy=true", "--max-bytes=500000"}
cmd := exec.Command(ocNetObservBinPath, cliArgs...)
err := cmd.Run()
o.Expect(err).NotTo(o.HaveOccurred())
// cleanup()
defer func() {
cliArgs := []string{"cleanup"}
cmd := exec.Command(ocNetObservBinPath, cliArgs...)
err := cmd.Run()
o.Expect(err).NotTo(o.HaveOccurred())
}()
o.Expect(cmd.ProcessState.ExitCode()).To(o.BeNumerically("==", 0), "oc-netobserv returned non-zero exit code")
flowsJsonfile, err := getFlowsJSONFile()
o.Expect(err).NotTo(o.HaveOccurred())
flowsFile, err := os.Open(flowsJsonfile)
o.Expect(err).NotTo(o.HaveOccurred())
defer flowsFile.Close()
decoder := json.NewDecoder(flowsFile)
_, err = decoder.Token()
o.Expect(err).NotTo(o.HaveOccurred())
var flow Flowlog
for decoder.More() {
err := decoder.Decode(&flow)
o.Expect(err).NotTo(o.HaveOccurred())
if flow.SrcK8sNamespace != nsfilter {
o.Expect(true).To(o.BeFalse(), fmt.Sprintf("Flow %v does not meet regexes condition SrcK8S_Namespace~%s", flow, nsfilter))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we should decode using a map[string)interface{}) here to avoid keeping the FlowLog definition in the tests

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will also reduce the dependencies in the go.mod file

Copy link
Member Author

@memodi memodi Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we use map[string]interface{} then we'd need to do type assertions when we read those fields, correct? that make code bit untidy especially when we need to read in nested structures. wdyt?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as an alternative, you can declare the type inline with just what you need, such as:

		var flow struct {
			SrcK8sNamespace string `json:"SrcK8S_Namespace"`
		}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, that's neat!

}
})
})
30 changes: 30 additions & 0 deletions e2e/integration-tests/integration_tests_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package integrationtests

import (
"os"
"os/exec"
"strings"
"testing"

g "github.com/onsi/ginkgo/v2"
o "github.com/onsi/gomega"
)

var ocNetObservBinPath string

func TestIntegrationTests(t *testing.T) {
o.RegisterFailHandler(g.Fail)
g.RunSpecs(t, "IntegrationTests Suite")
}

var _ = g.BeforeSuite(func() {
// kubeconfig env var and see if the cluster is reachable.
if kubeconfig := os.Getenv("KUBECONFIG"); kubeconfig == "" {
g.Skip("Set KUBECONFIG env variable")
}

cmd := exec.Command("which", "oc-netobserv")
out, err := cmd.Output()
o.Expect(err).NotTo(o.HaveOccurred())
ocNetObservBinPath = strings.TrimSuffix(string(out), "\n")
})
Loading