Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ tests-e2e: oc-commands ## Run e2e tests using kind cluster
$(OCI_BIN) save -o cli-e2e-img.tar ${IMAGE}
GOOS=$(GOOS) go test -p 1 -timeout 30m -v -mod vendor -tags e2e ./e2e/...

.PHONY: tests-int
tests-int: ## Run e2e integration tests. You need a running cluster connected
GOOS= go test -p 1 -timeout 30m -v -mod vendor ./e2e/integration-tests/...

.PHONY: coverage-report
coverage-report: ## Generate coverage report
@echo "### Generating coverage report"
Expand Down
58 changes: 22 additions & 36 deletions cmd/flow_capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package cmd

import (
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"time"

"github.com/jpillora/sizestr"
Expand All @@ -25,25 +23,12 @@ var flowCmd = &cobra.Command{
}

func runFlowCapture(_ *cobra.Command, _ []string) {
go scanner()

captureType = "Flow"
wg := sync.WaitGroup{}
wg.Add(len(ports))
for i := range ports {
go func(idx int) {
defer wg.Done()
err := runFlowCaptureOnAddr(ports[idx], nodes[idx])
if err != nil {
// Only fatal errors are returned here
log.Fatal(err)
}
}(i)
}
wg.Wait()
go startFlowCollector()
createDisplay()
}

func runFlowCaptureOnAddr(port int, filename string) error {
func startFlowCollector() {
if len(filename) > 0 {
log.Infof("Starting Flow Capture for %s...", filename)
} else {
Expand All @@ -59,47 +44,48 @@ func runFlowCaptureOnAddr(port int, filename string) error {
log.Errorf("Create directory failed: %v", err.Error())
log.Fatal(err)
}
log.Trace("Created flow folder")
log.Debug("Created flow folder")

f, err = os.Create("./output/flow/" + filename + ".txt")
if err != nil {
log.Errorf("Create file %s failed: %v", filename, err.Error())
log.Fatal(err)
}
defer f.Close()
log.Trace("Created flow logs txt file")
log.Debug("Created flow logs txt file")

// Initialize sqlite DB
db := initFlowDB(filename)
log.Trace("Initialized database")
log.Debug("Initialized database")

flowPackets := make(chan *genericmap.Flow, 100)
collector, err := grpc.StartCollector(port, flowPackets)
if err != nil {
return fmt.Errorf("StartCollector failed: %w", err)
log.Errorf("StartCollector failed: %v", err.Error())
return
}
log.Trace("Started collector")
log.Debug("Started collector")
collectorStarted = true

go func() {
<-utils.ExitChannel()
log.Trace("Ending collector")
log.Debug("Ending collector")
close(flowPackets)
collector.Close()
db.Close()
log.Trace("Done")
log.Debug("Done")
}()

log.Trace("Ready ! Waiting for flows...")
log.Debug("Ready ! Waiting for flows...")
go hearbeat()
for fp := range flowPackets {
if !captureStarted {
log.Tracef("Received first %d flows", len(flowPackets))
log.Debugf("Received first %d flows", len(flowPackets))
}

if stopReceived {
log.Trace("Stop received")
return nil
log.Debug("Stop received")
return
}
// parse and display flow async
go parseGenericMapAndAppendFlow(fp.GenericMap.Value)
Expand All @@ -110,24 +96,25 @@ func runFlowCaptureOnAddr(port int, filename string) error {
log.Error("Error while writing to DB:", err.Error())
}
if !captureStarted {
log.Trace("Wrote flows to DB")
log.Debug("Wrote flows to DB")
}

// append new line between each record to read file easilly
bytes, err := f.Write(append(fp.GenericMap.Value, []byte(",\n")...))
if err != nil {
return err
log.Error(err)
return
}
if !captureStarted {
log.Trace("Wrote flows to json")
log.Debug("Wrote flows to json")
}

// terminate capture if max bytes reached
totalBytes += int64(bytes)
if totalBytes > maxBytes {
if exit := onLimitReached(); exit {
log.Infof("Capture reached %s, exiting now...", sizestr.ToString(maxBytes))
return nil
return
}
}

Expand All @@ -137,13 +124,12 @@ func runFlowCaptureOnAddr(port int, filename string) error {
if int(duration) > int(maxTime) {
if exit := onLimitReached(); exit {
log.Infof("Capture reached %s, exiting now...", maxTime)
return nil
return
}
}

captureStarted = true
}
return nil
}

func parseGenericMapAndAppendFlow(bytes []byte) {
Expand All @@ -155,7 +141,7 @@ func parseGenericMapAndAppendFlow(bytes []byte) {
}

if !captureStarted {
log.Tracef("Parsed genericMap %v", genericMap)
log.Debugf("Parsed genericMap %v", genericMap)
}
AppendFlow(genericMap)
}
Loading