diff --git a/.golangci.yml b/.golangci.yml index c607321f..c2a7bc15 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -5,7 +5,7 @@ linters: - cyclop - errname - exhaustive - - exportloopref + - copyloopvar - gocritic - gofmt - gosimple @@ -16,15 +16,17 @@ linters: - stylecheck - typecheck - unused +run: + go: "1.22" linters-settings: - stylecheck: - go: "1.22" gocritic: enabled-checks: - hugeParam - rangeExprCopy - rangeValCopy - indexAlloc - - deprecatedComment + settings: + ifElseChain: + minThreshold: 3 cyclop: - max-complexity: 150 # TODO: reduce that to 20 \ No newline at end of file + max-complexity: 150 # TODO: reduce that to 20 diff --git a/Makefile b/Makefile index 88141f7c..71bd1e89 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,7 @@ ifneq ($(CLEAN_BUILD),) LDFLAGS ?= -X 'main.buildVersion=${VERSION}-${BUILD_SHA}' -X 'main.buildDate=${BUILD_DATE}' endif -GOLANGCI_LINT_VERSION = v1.54.2 +GOLANGCI_LINT_VERSION = v1.61.0 YQ_VERSION = v4.43.1 # build a single arch target provided as argument diff --git a/cmd/display_loop.go b/cmd/display_loop.go new file mode 100644 index 00000000..d2c8c6b4 --- /dev/null +++ b/cmd/display_loop.go @@ -0,0 +1,56 @@ +package cmd + +import "slices" + +type displayLoop struct { + all []displayLoopItem + current int +} + +type displayLoopItem struct { + name string + columns []string + group []string +} + +func (dl *displayLoop) prev() { + dl.current += len(dl.all) - 1 + dl.current %= len(dl.all) +} + +func (dl *displayLoop) next() { + dl.current++ + dl.current %= len(dl.all) +} + +func (dl *displayLoop) getCurrentItems() []displayLoopItem { + current := dl.all[dl.current] + if current.group == nil { + return []displayLoopItem{current} + } + var items []displayLoopItem + for _, item := range dl.all { + if slices.Contains(current.group, item.name) { + items = append(items, item) + } + } + return items +} + +func (dl *displayLoop) getNames() []string { + var names []string + items := dl.getCurrentItems() + for _, item := range items { + names = append(names, item.name) + } + return names +} + +func (dl *displayLoop) getCols() []string { + var cols []string + items := dl.getCurrentItems() + for _, item := range items { + cols = append(cols, item.columns...) + } + return cols +} diff --git a/cmd/display_loop_test.go b/cmd/display_loop_test.go new file mode 100644 index 00000000..74952eaf --- /dev/null +++ b/cmd/display_loop_test.go @@ -0,0 +1,38 @@ +package cmd + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDisplayLoop(t *testing.T) { + enrichment.current = 0 + assert.Equal(t, []string{"None"}, enrichment.getNames()) + assert.Equal(t, []string{"SrcAddr", "SrcPort", "DstAddr", "DstPort"}, enrichment.getCols()) + + enrichment.prev() + assert.Equal(t, []string{"Zone", "Host", "Owner", "Resource"}, enrichment.getNames()) + assert.Equal(t, []string{ + "SrcZone", + "DstZone", + "SrcHostName", + "SrcHostName", + "SrcOwnerName", + "SrcOwnerType", + "DstOwnerName", + "DstOwnerType", + "SrcName", + "SrcType", + "DstName", + "DstType", + }, enrichment.getCols()) + + enrichment.next() + assert.Equal(t, []string{"None"}, enrichment.getNames()) + assert.Equal(t, []string{"SrcAddr", "SrcPort", "DstAddr", "DstPort"}, enrichment.getCols()) + + enrichment.next() + assert.Equal(t, []string{"Zone"}, enrichment.getNames()) + assert.Equal(t, []string{"SrcZone", "DstZone"}, enrichment.getCols()) +} diff --git a/cmd/flow_capture.go b/cmd/flow_capture.go index a30ee1b1..49b08098 100644 --- a/cmd/flow_capture.go +++ b/cmd/flow_capture.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "regexp" - "slices" "sort" "strings" "sync" @@ -35,24 +34,43 @@ var ( regexes = []string{} lastFlows = []config.GenericMap{} - rawDisplay = "Raw" - standardDisplay = "Standard" - pktDropDisplay = "PktDrop" - dnsDisplay = "DNS" - rttDisplay = "RTT" - networkEventsDisplay = "NetworkEvents" - display = []string{pktDropDisplay, dnsDisplay, rttDisplay, networkEventsDisplay} - - noEnrichment = "None" - zoneEnrichment = "Zone" - hostEnrichment = "Host" - ownerEnrichment = "Owner" - resourceEnrichment = "Resource" - enrichement = []string{resourceEnrichment} + features = displayLoop{ + current: featureDefaultIndex, + all: []displayLoopItem{ + {name: "Raw"}, + {name: "Standard", columns: []string{"Dir", "Interfaces", "Proto", "Dscp", "Bytes", "Packets"}}, + {name: "PktDrop", columns: []string{"DropBytes", "DropPackets", "DropState", "DropCause"}}, + {name: "DNS", columns: []string{"DnsId", "DnsLatency", "DnsRCode", "DnsErrno"}}, + {name: "RTT", columns: []string{"RTT"}}, + {name: "NetworkEvents", columns: []string{"NetworkEvents"}}, + {group: []string{"PktDrop", "DNS", "RTT", "NetworkEvents"}}, + }, + } + + enrichment = displayLoop{ + current: enrichmentDefaultIndex, + all: []displayLoopItem{ + {name: "None", columns: []string{"SrcAddr", "SrcPort", "DstAddr", "DstPort"}}, + {name: "Zone", columns: []string{"SrcZone", "DstZone"}}, + {name: "Host", columns: []string{"SrcHostName", "SrcHostName"}}, + {name: "Owner", columns: []string{"SrcOwnerName", "SrcOwnerType", "DstOwnerName", "DstOwnerType"}}, + {name: "Resource", columns: []string{"SrcName", "SrcType", "DstName", "DstType"}}, + {group: []string{"Zone", "Host", "Owner", "Resource"}}, + }, + } +) + +const ( + featureDefaultIndex = 6 // All feats. + enrichmentDefaultIndex = 4 // Resources ) func runFlowCapture(_ *cobra.Command, _ []string) { - go scanner() + go func() { + scanner() + // scanner returns on exit request + os.Exit(0) + }() captureType = "Flow" wg := sync.WaitGroup{} @@ -60,20 +78,24 @@ func runFlowCapture(_ *cobra.Command, _ []string) { for i := range ports { go func(idx int) { defer wg.Done() - runFlowCaptureOnAddr(ports[idx], nodes[idx]) + err := runFlowCaptureOnAddr(ports[idx], nodes[idx]) + if err != nil { + // Only fatal errors are returned here + log.Fatal(err) + } }(i) } wg.Wait() } -func runFlowCaptureOnAddr(port int, filename string) { +func runFlowCaptureOnAddr(port int, filename string) error { if len(filename) > 0 { log.Infof("Starting Flow Capture for %s...", filename) } else { log.Infof("Starting Flow Capture...") - filename = strings.Replace( + filename = strings.ReplaceAll( currentTime().UTC().Format(time.RFC3339), - ":", "", -1) // get rid of offensive colons + ":", "") // get rid of offensive colons } var f *os.File @@ -99,8 +121,7 @@ func runFlowCaptureOnAddr(port int, filename string) { flowPackets := make(chan *genericmap.Flow, 100) collector, err := grpc.StartCollector(port, flowPackets) if err != nil { - log.Error("StartCollector failed:", err.Error()) - log.Fatal(err) + return fmt.Errorf("StartCollector failed: %w", err) } log.Trace("Started collector") @@ -121,7 +142,7 @@ func runFlowCaptureOnAddr(port int, filename string) { if stopReceived { log.Trace("Stop received") - return + return nil } // parse and display flow async go parseGenericMapAndDisplay(fp.GenericMap.Value) @@ -138,17 +159,17 @@ func runFlowCaptureOnAddr(port int, filename string) { // append new line between each record to read file easilly bytes, err := f.Write(append(fp.GenericMap.Value, []byte(",\n")...)) if err != nil { - log.Fatal(err) + return err } if !captureStarted { log.Trace("Wrote flows to json") } // terminate capture if max bytes reached - totalBytes = totalBytes + int64(bytes) + totalBytes += int64(bytes) if totalBytes > maxBytes { log.Infof("Capture reached %s, exiting now...", sizestr.ToString(maxBytes)) - return + return nil } // terminate capture if max time reached @@ -156,11 +177,12 @@ func runFlowCaptureOnAddr(port int, filename string) { duration := now.Sub(startupTime) if int(duration) > int(maxTime) { log.Infof("Capture reached %s, exiting now...", maxTime) - return + return nil } captureStarted = true } + return nil } func parseGenericMapAndDisplay(bytes []byte) { @@ -191,7 +213,7 @@ func manageFlowsDisplay(genericMap config.GenericMap) { if len(regexes) > 0 { // regexes may change during the render so we make a copy first rCopy := make([]string, len(regexes)) - copy(rCopy[:], regexes) + copy(rCopy, regexes) filtered := []config.GenericMap{} for _, flow := range lastFlows { match := true @@ -255,11 +277,11 @@ func updateTable() { fmt.Printf("Filters: %s\n", filter) } fmt.Printf("Showing last: %d Use Up / Down keyboard arrows to increase / decrease limit\n", flowsToShow) - fmt.Printf("Display: %s Use Left / Right keyboard arrows to cycle views\n", strings.Join(display, ",")) - fmt.Printf("Enrichment: %s Use Page Up / Page Down keyboard keys to cycle enrichment scopes\n", strings.Join(enrichement, ",")) + fmt.Printf("Display: %s Use Left / Right keyboard arrows to cycle views\n", strings.Join(features.getNames(), ",")) + fmt.Printf("Enrichment: %s Use Page Up / Page Down keyboard keys to cycle enrichment scopes\n", strings.Join(enrichment.getNames(), ",")) } - if slices.Contains(display, rawDisplay) { + if features.current == 0 { // Raw fmt.Print("Raw flow logs:\n") for _, flow := range lastFlows { fmt.Printf("%v\n", flow) @@ -276,87 +298,8 @@ func updateTable() { } // enrichment fields - if !slices.Contains(enrichement, noEnrichment) { - if slices.Contains(enrichement, zoneEnrichment) { - cols = append(cols, - "SrcZone", - "DstZone", - ) - } - - if slices.Contains(enrichement, hostEnrichment) { - cols = append(cols, - "SrcHostName", - "DstHostName", - ) - } - - if slices.Contains(enrichement, ownerEnrichment) { - cols = append(cols, - "SrcOwnerName", - "SrcOwnerType", - "DstOwnerName", - "DstOwnerType", - ) - } - - if slices.Contains(enrichement, resourceEnrichment) { - cols = append(cols, - "SrcName", - "SrcType", - "DstName", - "DstType", - ) - } - } else { - cols = append(cols, - "SrcAddr", - "SrcPort", - "DstAddr", - "DstPort", - ) - } - - // standard / feature fields - if !slices.Contains(display, standardDisplay) { - if slices.Contains(display, pktDropDisplay) { - cols = append(cols, - "DropBytes", - "DropPackets", - "DropState", - "DropCause", - ) - } - - if slices.Contains(display, dnsDisplay) { - cols = append(cols, - "DnsId", - "DnsLatency", - "DnsRCode", - "DnsErrno", - ) - } - - if slices.Contains(display, rttDisplay) { - cols = append(cols, - "RTT", - ) - } - if slices.Contains(display, networkEventsDisplay) { - cols = append(cols, - "NetworkEvents", - ) - } - } else { - cols = append(cols, - "Dir", - "Interfaces", - "Proto", - "Dscp", - "Bytes", - "Packets", - ) - } + cols = append(cols, enrichment.getCols()...) + cols = append(cols, features.getCols()...) colInterfaces := make([]interface{}, len(cols)) for i, c := range cols { @@ -411,82 +354,26 @@ func scanner() { if err != nil { panic(err) } - if key == keyboard.KeyCtrlC || stopReceived { + switch { + case key == keyboard.KeyCtrlC, stopReceived: log.Info("Ctrl-C pressed, exiting program.") - // exit program - os.Exit(0) - } else if key == keyboard.KeyArrowUp { - flowsToShow = flowsToShow + 1 - } else if key == keyboard.KeyArrowDown { + return + case key == keyboard.KeyArrowUp: + flowsToShow++ + case key == keyboard.KeyArrowDown: if flowsToShow > 10 { - flowsToShow = flowsToShow - 1 - } - } else if key == keyboard.KeyArrowRight { - if slices.Contains(display, pktDropDisplay) && slices.Contains(display, dnsDisplay) && - slices.Contains(display, rttDisplay) && slices.Contains(display, networkEventsDisplay) { - display = []string{rawDisplay} - } else if slices.Contains(display, rawDisplay) { - display = []string{standardDisplay} - } else if slices.Contains(display, standardDisplay) { - display = []string{pktDropDisplay} - } else if slices.Contains(display, pktDropDisplay) { - display = []string{dnsDisplay} - } else if slices.Contains(display, dnsDisplay) { - display = []string{networkEventsDisplay} - } else if slices.Contains(display, networkEventsDisplay) { - display = []string{rttDisplay} - } else if slices.Contains(display, rttDisplay) { - display = []string{rawDisplay} - } else { - display = []string{pktDropDisplay, dnsDisplay, rttDisplay, networkEventsDisplay} - } - } else if key == keyboard.KeyArrowLeft { - if slices.Contains(display, pktDropDisplay) && slices.Contains(display, dnsDisplay) && slices.Contains(display, rttDisplay) && - slices.Contains(display, networkEventsDisplay) { - display = []string{rttDisplay} - } else if slices.Contains(display, rttDisplay) { - display = []string{dnsDisplay} - } else if slices.Contains(display, dnsDisplay) { - display = []string{pktDropDisplay} - } else if slices.Contains(display, pktDropDisplay) { - display = []string{networkEventsDisplay} - } else if slices.Contains(display, networkEventsDisplay) { - display = []string{standardDisplay} - } else if slices.Contains(display, standardDisplay) { - display = []string{rawDisplay} - } else { - display = []string{pktDropDisplay, dnsDisplay, rttDisplay, networkEventsDisplay} + flowsToShow-- } - } else if key == keyboard.KeyPgup { - if slices.Contains(enrichement, zoneEnrichment) && slices.Contains(enrichement, hostEnrichment) && slices.Contains(enrichement, ownerEnrichment) { - enrichement = []string{noEnrichment} - } else if slices.Contains(enrichement, noEnrichment) { - enrichement = []string{resourceEnrichment} - } else if slices.Contains(enrichement, resourceEnrichment) { - enrichement = []string{ownerEnrichment} - } else if slices.Contains(enrichement, ownerEnrichment) { - enrichement = []string{hostEnrichment} - } else if slices.Contains(enrichement, hostEnrichment) { - enrichement = []string{zoneEnrichment} - } else { - enrichement = []string{zoneEnrichment, hostEnrichment, ownerEnrichment, resourceEnrichment} - } - } else if key == keyboard.KeyPgdn { - if slices.Contains(enrichement, zoneEnrichment) && slices.Contains(enrichement, hostEnrichment) && slices.Contains(enrichement, ownerEnrichment) { - enrichement = []string{zoneEnrichment} - } else if slices.Contains(enrichement, zoneEnrichment) { - enrichement = []string{hostEnrichment} - } else if slices.Contains(enrichement, hostEnrichment) { - enrichement = []string{ownerEnrichment} - } else if slices.Contains(enrichement, ownerEnrichment) { - enrichement = []string{resourceEnrichment} - } else if slices.Contains(enrichement, resourceEnrichment) { - enrichement = []string{noEnrichment} - } else { - enrichement = []string{zoneEnrichment, hostEnrichment, ownerEnrichment, resourceEnrichment} - } - } else if key == keyboard.KeyBackspace || key == keyboard.KeyBackspace2 { + case key == keyboard.KeyArrowRight: + features.next() + case key == keyboard.KeyArrowLeft: + features.prev() + case key == keyboard.KeyPgup: + enrichment.prev() + case key == keyboard.KeyPgdn: + enrichment.next() + case key == keyboard.KeyBackspace, key == keyboard.KeyBackspace2: if len(regexes) > 0 { lastIndex := len(regexes) - 1 if len(regexes[lastIndex]) > 0 { @@ -495,14 +382,14 @@ func scanner() { regexes = regexes[:lastIndex] } } - } else if key == keyboard.KeyEnter { + case key == keyboard.KeyEnter: regexes = append(regexes, "") - } else { + default: if len(regexes) == 0 { regexes = []string{string(char)} } else { lastIndex := len(regexes) - 1 - regexes[lastIndex] = regexes[lastIndex] + string(char) + regexes[lastIndex] += string(char) } } lastRefresh = startupTime diff --git a/cmd/flow_capture_test.go b/cmd/flow_capture_test.go index 4a2e64a4..33f2ce9a 100644 --- a/cmd/flow_capture_test.go +++ b/cmd/flow_capture_test.go @@ -53,8 +53,8 @@ func TestFlowTableMultipleFlows(t *testing.T) { setOutputBuffer(&buf) // set display to standard without enrichment - display = []string{standardDisplay} - enrichement = []string{noEnrichment} + features.current = 1 + enrichment.current = 0 // set time and bytes per flow flowTime := 1704063600000 @@ -68,8 +68,8 @@ func TestFlowTableMultipleFlows(t *testing.T) { buf.Reset() // update time and bytes for next flow - flowTime = flowTime + 1000 - bytes = bytes + 1000 + flowTime += 1000 + bytes += 1000 // add flow to table parseGenericMapAndDisplay([]byte(fmt.Sprintf(`{ @@ -108,11 +108,7 @@ func TestFlowTableAdvancedDisplay(t *testing.T) { setOutputBuffer(&buf) // getRows function cleanup everything and redraw table with sample flow - getRows := func(d []string, e []string) []string { - // prepare display options - display = d - enrichement = e - + getRows := func() []string { // clear filters and previous flows regexes = []string{} lastFlows = []config.GenericMap{} @@ -126,17 +122,13 @@ func TestFlowTableAdvancedDisplay(t *testing.T) { return strings.Split(buf.String(), "\n") } - // set display without enrichment - rows := getRows([]string{pktDropDisplay, dnsDisplay, rttDisplay, networkEventsDisplay}, []string{noEnrichment}) - assert.Equal(t, 4, len(rows)) - assert.Equal(t, `Time SrcAddr SrcPort DstAddr DstPort DropBytes DropPackets DropState DropCause DnsId DnsLatency DnsRCode DnsErrno RTT NetworkEvents `, rows[0]) - assert.Equal(t, `17:25:28.703000 10.128.0.29 1234 10.129.0.26 5678 32B 1 TCP_INVALID_STATE SKB_DROP_REASON_TCP_INVALID_SEQUENCE 31319 1ms NoError 0 10µs hello `, rows[1]) - assert.Equal(t, `---------------- ---------------------------------------- ------ ---------------------------------------- ------ ------------ ------------ -------------------- ---------------------------------------- ------ ------ ------ ------ ------ ---------------- `, rows[2]) - assert.Empty(t, rows[3]) + // Reset display loops + enrichment.current = 0 + features.current = 0 // set display to standard - rows = getRows([]string{standardDisplay}, []string{noEnrichment}) - + features.next() + rows := getRows() assert.Equal(t, 4, len(rows)) assert.Equal(t, `Time SrcAddr SrcPort DstAddr DstPort Dir Interfaces Proto Dscp Bytes Packets `, rows[0]) assert.Equal(t, `17:25:28.703000 10.128.0.29 1234 10.129.0.26 5678 Ingress f18b970c2ce8fdd TCP Standard 456B 5 `, rows[1]) @@ -144,8 +136,8 @@ func TestFlowTableAdvancedDisplay(t *testing.T) { assert.Empty(t, rows[3]) // set display to pktDrop - rows = getRows([]string{pktDropDisplay}, []string{noEnrichment}) - + features.next() + rows = getRows() assert.Equal(t, 4, len(rows)) assert.Equal(t, `Time SrcAddr SrcPort DstAddr DstPort DropBytes DropPackets DropState DropCause `, rows[0]) assert.Equal(t, `17:25:28.703000 10.128.0.29 1234 10.129.0.26 5678 32B 1 TCP_INVALID_STATE SKB_DROP_REASON_TCP_INVALID_SEQUENCE `, rows[1]) @@ -153,8 +145,8 @@ func TestFlowTableAdvancedDisplay(t *testing.T) { assert.Empty(t, rows[3]) // set display to DNS - rows = getRows([]string{dnsDisplay}, []string{noEnrichment}) - + features.next() + rows = getRows() assert.Equal(t, 4, len(rows)) assert.Equal(t, `Time SrcAddr SrcPort DstAddr DstPort DnsId DnsLatency DnsRCode DnsErrno `, rows[0]) assert.Equal(t, `17:25:28.703000 10.128.0.29 1234 10.129.0.26 5678 31319 1ms NoError 0 `, rows[1]) @@ -162,8 +154,8 @@ func TestFlowTableAdvancedDisplay(t *testing.T) { assert.Empty(t, rows[3]) // set display to RTT - rows = getRows([]string{rttDisplay}, []string{noEnrichment}) - + features.next() + rows = getRows() assert.Equal(t, 4, len(rows)) assert.Equal(t, `Time SrcAddr SrcPort DstAddr DstPort RTT `, rows[0]) assert.Equal(t, `17:25:28.703000 10.128.0.29 1234 10.129.0.26 5678 10µs `, rows[1]) @@ -171,10 +163,20 @@ func TestFlowTableAdvancedDisplay(t *testing.T) { assert.Empty(t, rows[3]) // set display to NetworkEvents - rows = getRows([]string{networkEventsDisplay}, []string{noEnrichment}) + features.next() + rows = getRows() assert.Equal(t, 4, len(rows)) assert.Equal(t, `Time SrcAddr SrcPort DstAddr DstPort NetworkEvents `, rows[0]) assert.Equal(t, `17:25:28.703000 10.128.0.29 1234 10.129.0.26 5678 hello `, rows[1]) assert.Equal(t, `---------------- ---------------------------------------- ------ ---------------------------------------- ------ ---------------- `, rows[2]) assert.Empty(t, rows[3]) + + // set display to all + features.next() + rows = getRows() + assert.Equal(t, 4, len(rows)) + assert.Equal(t, `Time SrcAddr SrcPort DstAddr DstPort DropBytes DropPackets DropState DropCause DnsId DnsLatency DnsRCode DnsErrno RTT NetworkEvents `, rows[0]) + assert.Equal(t, `17:25:28.703000 10.128.0.29 1234 10.129.0.26 5678 32B 1 TCP_INVALID_STATE SKB_DROP_REASON_TCP_INVALID_SEQUENCE 31319 1ms NoError 0 10µs hello `, rows[1]) + assert.Equal(t, `---------------- ---------------------------------------- ------ ---------------------------------------- ------ ------------ ------------ -------------------- ---------------------------------------- ------ ------ ------ ------ ------ ---------------- `, rows[2]) + assert.Empty(t, rows[3]) } diff --git a/cmd/flow_db.go b/cmd/flow_db.go index 3e28fe9b..b5760a81 100644 --- a/cmd/flow_db.go +++ b/cmd/flow_db.go @@ -96,16 +96,17 @@ func insertFlowToDB(db *sql.DB, buf []byte) error { } // Insert message into database var flowSQL string - if flow["PktDropPackets"] != 0 && flow["DnsId"] != 0 { + switch { + case flow["PktDropPackets"] != 0 && flow["DnsId"] != 0: flowSQL = `INSERT INTO flow(DnsErrno, Dscp, DstAddr, DstPort, Interface, Proto, SrcAddr, SrcPort, Bytes, Packets, PktDropLatestDropCause, PktDropBytes, PktDropPackets, DnsId, DnsFlagsResponseCode, DnsLatencyMs, TimeFlowRttNs) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` - } else if flow["PktDropPackets"] != 0 { + case flow["PktDropPackets"] != 0: flowSQL = `INSERT INTO flow(DnsErrno, Dscp, DstAddr, DstPort, Interface, Proto, SrcAddr, SrcPort, Bytes, Packets, PktDropLatestDropCause, PktDropBytes, PktDropPackets, TimeFlowRttNs) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` - } else if flow["DnsId"] != 0 { + case flow["DnsId"] != 0: flowSQL = `INSERT INTO flow(DnsErrno, Dscp, DstAddr, DstPort, Interface, Proto, SrcAddr, SrcPort, Bytes, Packets, DnsId, DnsFlagsResponseCode, DnsLatencyMs, TimeFlowRttNs) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` - } else { + default: flowSQL = `INSERT INTO flow(DnsErrno, Dscp, DstAddr, DstPort, Interface, Proto, SrcAddr, SrcPort, Bytes, Packets, TimeFlowRttNs) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` } @@ -116,26 +117,27 @@ func insertFlowToDB(db *sql.DB, buf []byte) error { return fmt.Errorf("error preparing SQL: %v", err.Error()) } - if flow["PktDropLatestDropCause"] != 0 && flow["DnsId"] != 0 { + switch { + case flow["PktDropLatestDropCause"] != 0 && flow["DnsId"] != 0: _, err = statement.Exec( flow["DNSErrno"], flow["Dscp"], flow["DstAddr"], flow["DstPort"], flow["Interface"], flow["Proto"], flow["SrcAddr"], flow["SrcPort"], flow["Bytes"], flow["Packets"], flow["PktDropLatestDropCause"], flow["PktDropBytes"], flow["PktDropPackets"], flow["DnsId"], flow["DnsFlagsResponseCode"], flow["DnsLatencyMs"], flow["TimeFlowRttNs"]) - } else if flow["PktDropLatestDropCause"] != 0 { + case flow["PktDropLatestDropCause"] != 0: _, err = statement.Exec( flow["DNSErrno"], flow["Dscp"], flow["DstAddr"], flow["DstPort"], flow["Interface"], flow["Proto"], flow["SrcAddr"], flow["SrcPort"], flow["Bytes"], flow["Packets"], flow["PktDropLatestDropCause"], flow["PktDropBytes"], flow["PktDropPackets"], flow["TimeFlowRttNs"]) - } else if flow["DnsId"] != 0 { + case flow["DnsId"] != 0: _, err = statement.Exec( flow["DNSErrno"], flow["Dscp"], flow["DstAddr"], flow["DstPort"], flow["Interface"], flow["Proto"], flow["SrcAddr"], flow["SrcPort"], flow["Bytes"], flow["Packets"], flow["DnsId"], flow["DnsFlagsResponseCode"], flow["DnsLatencyMs"], flow["TimeFlowRttNs"]) - } else { + default: _, err = statement.Exec( flow["DNSErrno"], flow["Dscp"], flow["DstAddr"], flow["DstPort"], flow["Interface"], flow["Proto"], flow["SrcAddr"], flow["SrcPort"], flow["Bytes"], flow["Packets"], diff --git a/cmd/packet_capture.go b/cmd/packet_capture.go index 706fd3e5..0310096f 100644 --- a/cmd/packet_capture.go +++ b/cmd/packet_capture.go @@ -36,20 +36,24 @@ func runPacketCapture(_ *cobra.Command, _ []string) { for i := range ports { go func(idx int) { defer wg.Done() - runPacketCaptureOnAddr(ports[idx], nodes[idx]) + err := runPacketCaptureOnAddr(ports[idx], nodes[idx]) + if err != nil { + // Only fatal error are returned + log.Fatal(err) + } }(i) } wg.Wait() } -func runPacketCaptureOnAddr(port int, filename string) { +func runPacketCaptureOnAddr(port int, filename string) error { if len(filename) > 0 { log.Infof("Starting Packet Capture for %s...", filename) } else { log.Infof("Starting Packet Capture...") - filename = strings.Replace( + filename = strings.ReplaceAll( currentTime().UTC().Format(time.RFC3339), - ":", "", -1) // get rid of offensive colons + ":", "") // get rid of offensive colons } var f *os.File @@ -82,8 +86,7 @@ func runPacketCaptureOnAddr(port int, filename string) { flowPackets := make(chan *genericmap.Flow, 100) collector, err := grpc.StartCollector(port, flowPackets) if err != nil { - log.Error("StartCollector failed:", err.Error()) - log.Fatal(err) + return fmt.Errorf("StartCollector failed: %w", err) } log.Trace("Started collector") @@ -103,14 +106,14 @@ func runPacketCaptureOnAddr(port int, filename string) { if stopReceived { log.Trace("Stop received") - return + return nil } genericMap := config.GenericMap{} err := json.Unmarshal(fp.GenericMap.Value, &genericMap) if err != nil { log.Error("Error while parsing json", err) - return + return nil } if !captureStarted { log.Tracef("Parsed genericMap %v", genericMap) @@ -134,7 +137,7 @@ func runPacketCaptureOnAddr(port int, filename string) { b, err := base64.StdEncoding.DecodeString(data.(string)) if err != nil { log.Error("Error while decoding data", err) - return + return nil } // write enriched data as interface @@ -143,7 +146,7 @@ func runPacketCaptureOnAddr(port int, filename string) { // then append packet to file using totalPackets as unique id err = pw.WriteEnhancedPacketBlock(totalPackets, ts, b, types.EnhancedPacketOptions{}) if err != nil { - log.Fatal(err) + return err } } else { if !captureStarted { @@ -155,23 +158,24 @@ func runPacketCaptureOnAddr(port int, filename string) { } // terminate capture if max bytes reached - totalBytes = totalBytes + int64(len(fp.GenericMap.Value)) + totalBytes += int64(len(fp.GenericMap.Value)) if totalBytes > maxBytes { log.Infof("Capture reached %s, exiting now...", sizestr.ToString(maxBytes)) - return + return nil } - totalPackets = totalPackets + 1 + totalPackets++ // terminate capture if max time reached now := currentTime() duration := now.Sub(startupTime) if int(duration) > int(maxTime) { log.Infof("Capture reached %s, exiting now...", maxTime) - return + return nil } captureStarted = true } + return nil } func writeEnrichedData(pw *pcapng.FileWriter, genericMap *config.GenericMap) { diff --git a/cmd/root.go b/cmd/root.go index c0990de0..0d6f3f10 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -28,9 +28,7 @@ var ( maxTime time.Duration maxBytes int64 - currentTime = func() time.Time { - return time.Now() - } + currentTime = time.Now startupTime = currentTime() lastRefresh = startupTime totalBytes = int64(0) @@ -49,7 +47,7 @@ var ( Use: "network-observability-cli", Short: "network-observability-cli is an interactive Flow and Packet visualizer", Long: `An interactive Flow / PCAP collector and visualization tool`, - Run: func(cmd *cobra.Command, args []string) { + Run: func(_ *cobra.Command, _ []string) { }, } diff --git a/cmd/root_test.go b/cmd/root_test.go index 7164c8fe..a2a067b3 100644 --- a/cmd/root_test.go +++ b/cmd/root_test.go @@ -83,6 +83,8 @@ func setup() { // clear filters and previous flows regexes = []string{} lastFlows = []config.GenericMap{} + features.current = featureDefaultIndex + enrichment.current = enrichmentDefaultIndex } func resetTime() { diff --git a/e2e/cluster/kind.go b/e2e/cluster/kind.go index 56d732ca..227722ec 100644 --- a/e2e/cluster/kind.go +++ b/e2e/cluster/kind.go @@ -85,14 +85,14 @@ func (k *Kind) GetLogsDir() string { // export logs into the e2e-logs folder of the base directory. func (k *Kind) exportLogs() env.Func { - return func(ctx context.Context, config *envconf.Config) (context.Context, error) { + return func(ctx context.Context, _ *envconf.Config) (context.Context, error) { logsDir := k.GetLogsDir() klog.WithField("directory", logsDir).Info("exporting cluster logs") exe := gexe.New() out := exe.Run("kind export logs " + logsDir + " --name " + k.clusterName) klog.WithField("out", out).Info("exported cluster logs") - //move output files to cluster logs folder + // move output files to cluster logs folder err := os.Rename(path.Join(k.baseDir, "e2e", "tmp"), path.Join(logsDir, "output")) if err != nil { klog.Error(err) @@ -112,7 +112,7 @@ func (k *Kind) GetAgentLogs() string { // delete netobserv-cli namespace func (k *Kind) deleteNamespace() env.Func { - return func(ctx context.Context, config *envconf.Config) (context.Context, error) { + return func(ctx context.Context, _ *envconf.Config) (context.Context, error) { exe := gexe.New() out := exe.Run("kubectl delete namespace netobserv-cli") klog.WithField("out", out).Info("deleted namespace")