Skip to content

Commit da0b747

Browse files
committed
fix background and e2e
1 parent 0ccd7fe commit da0b747

File tree

4 files changed

+73
-56
lines changed

4 files changed

+73
-56
lines changed

cmd/flow_capture.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,47 +44,47 @@ func startFlowCollector() {
4444
log.Errorf("Create directory failed: %v", err.Error())
4545
log.Fatal(err)
4646
}
47-
log.Trace("Created flow folder")
47+
log.Debug("Created flow folder")
4848

4949
f, err = os.Create("./output/flow/" + filename + ".txt")
5050
if err != nil {
5151
log.Errorf("Create file %s failed: %v", filename, err.Error())
5252
log.Fatal(err)
5353
}
5454
defer f.Close()
55-
log.Trace("Created flow logs txt file")
55+
log.Debug("Created flow logs txt file")
5656

5757
// Initialize sqlite DB
5858
db := initFlowDB(filename)
59-
log.Trace("Initialized database")
59+
log.Debug("Initialized database")
6060

6161
flowPackets := make(chan *genericmap.Flow, 100)
6262
collector, err := grpc.StartCollector(port, flowPackets)
6363
if err != nil {
6464
log.Errorf("StartCollector failed: %v", err.Error())
6565
return
6666
}
67-
log.Trace("Started collector")
67+
log.Debug("Started collector")
6868
collectorStarted = true
6969

7070
go func() {
7171
<-utils.ExitChannel()
72-
log.Trace("Ending collector")
72+
log.Debug("Ending collector")
7373
close(flowPackets)
7474
collector.Close()
7575
db.Close()
76-
log.Trace("Done")
76+
log.Debug("Done")
7777
}()
7878

79-
log.Trace("Ready ! Waiting for flows...")
79+
log.Debug("Ready ! Waiting for flows...")
8080
go hearbeat()
8181
for fp := range flowPackets {
8282
if !captureStarted {
83-
log.Tracef("Received first %d flows", len(flowPackets))
83+
log.Debugf("Received first %d flows", len(flowPackets))
8484
}
8585

8686
if stopReceived {
87-
log.Trace("Stop received")
87+
log.Debug("Stop received")
8888
return
8989
}
9090
// parse and display flow async
@@ -96,7 +96,7 @@ func startFlowCollector() {
9696
log.Error("Error while writing to DB:", err.Error())
9797
}
9898
if !captureStarted {
99-
log.Trace("Wrote flows to DB")
99+
log.Debug("Wrote flows to DB")
100100
}
101101

102102
// append new line between each record to read file easilly
@@ -106,7 +106,7 @@ func startFlowCollector() {
106106
return
107107
}
108108
if !captureStarted {
109-
log.Trace("Wrote flows to json")
109+
log.Debug("Wrote flows to json")
110110
}
111111

112112
// terminate capture if max bytes reached
@@ -141,7 +141,7 @@ func parseGenericMapAndAppendFlow(bytes []byte) {
141141
}
142142

143143
if !captureStarted {
144-
log.Tracef("Parsed genericMap %v", genericMap)
144+
log.Debugf("Parsed genericMap %v", genericMap)
145145
}
146146
AppendFlow(genericMap)
147147
}

cmd/flow_display.go

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ package cmd
22

33
import (
44
"fmt"
5+
"os"
6+
"os/signal"
57
"regexp"
68
"slices"
79
"sort"
810
"strings"
11+
"syscall"
912
"time"
1013

1114
"github.com/jpillora/sizestr"
@@ -46,6 +49,8 @@ var (
4649
cols: []string{},
4750
flows: []config.GenericMap{},
4851
}
52+
53+
uiError error
4954
)
5055

5156
func createDisplay() {
@@ -66,10 +71,14 @@ func createDisplay() {
6671
SetRoot(getMain(), true).
6772
EnableMouse(true)
6873

69-
go hearbeat()
70-
71-
if err := app.Run(); err != nil {
72-
panic(err)
74+
uiError = app.Run()
75+
if uiError == nil {
76+
go hearbeat()
77+
} else {
78+
fmt.Printf("Can't display advanced UI: %v", uiError)
79+
done := make(chan os.Signal, 1)
80+
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
81+
<-done
7382
}
7483
}
7584

@@ -87,12 +96,8 @@ func getTop() tview.Primitive {
8796
// info row
8897
fpsText := tview.NewTextView().SetText(getFPSText())
8998
infoRow := tview.NewFlex().SetDirection(tview.FlexColumn).
90-
AddItem(tview.NewTextView().SetText(
91-
fmt.Sprintf("Running network-observability-cli as %s Capture", captureType),
92-
), 0, 1, false).
93-
AddItem(tview.NewTextView().SetText(
94-
fmt.Sprintf("Log level: %s", logLevel),
95-
), 0, 1, false).
99+
AddItem(tview.NewTextView().SetText(getCaptureTypeText()), 0, 1, false).
100+
AddItem(tview.NewTextView().SetText(getLogLevelText()), 0, 1, false).
96101
AddItem(durationText.SetText(getDurationText()), 0, 1, false).
97102
AddItem(sizeText.SetText(getSizeText()), 0, 1, false).
98103
AddItem(fpsText, 0, 1, false).
@@ -245,6 +250,14 @@ func getBottom() tview.Primitive {
245250
return flexView
246251
}
247252

253+
func getCaptureTypeText() string {
254+
return fmt.Sprintf("%s Capture", captureType)
255+
}
256+
257+
func getLogLevelText() string {
258+
return fmt.Sprintf("Log level: %s", logLevel)
259+
}
260+
248261
func getEnrichmentText() string {
249262
if display.getCurrentItem().name == rawDisplay {
250263
return "Enrichment: n/a\n"
@@ -281,35 +294,42 @@ func getRegexesText() string {
281294
}
282295

283296
func AppendFlow(genericMap config.GenericMap) {
284-
// lock since we are updating lastFlows concurrently
285-
mutex.Lock()
297+
if uiError != nil {
298+
// simply print flow into logs
299+
log.Printf("%v\n", genericMap)
300+
} else {
301+
// lock since we are updating lastFlows concurrently
302+
mutex.Lock()
303+
304+
// add new flow to the array
305+
lastFlows = append(lastFlows, genericMap)
286306

287-
// add new flow to the array
288-
lastFlows = append(lastFlows, genericMap)
307+
// sort flows according to time
308+
sort.Slice(lastFlows, func(i, j int) bool {
309+
if captureType == "Flow" {
310+
return toFloat64(lastFlows[i], "TimeFlowEndMs") < toFloat64(lastFlows[j], "TimeFlowEndMs")
311+
}
312+
return toFloat64(lastFlows[i], "Time") < toFloat64(lastFlows[j], "Time")
313+
})
289314

290-
// sort flows according to time
291-
sort.Slice(lastFlows, func(i, j int) bool {
292-
if captureType == "Flow" {
293-
return toFloat64(lastFlows[i], "TimeFlowEndMs") < toFloat64(lastFlows[j], "TimeFlowEndMs")
315+
// limit flows kept in memory
316+
if len(lastFlows) > keepCount {
317+
lastFlows = lastFlows[len(lastFlows)-keepCount:]
294318
}
295-
return toFloat64(lastFlows[i], "Time") < toFloat64(lastFlows[j], "Time")
296-
})
297319

298-
// limit flows kept in memory
299-
if len(lastFlows) > keepCount {
300-
lastFlows = lastFlows[len(lastFlows)-keepCount:]
320+
mutex.Unlock()
301321
}
302-
303-
mutex.Unlock()
304322
}
305323

306324
func hearbeat() {
307325
for {
308326
if captureEnded {
309327
return
310328
}
329+
311330
updateStatusTexts()
312331
updateTable()
332+
313333
time.Sleep(time.Second / time.Duration(framesPerSecond))
314334
}
315335
}

cmd/packet_capture.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,14 @@ func startPacketCollector() {
4848
log.Errorf("Create directory failed: %v", err.Error())
4949
log.Fatal(err)
5050
}
51-
log.Trace("Created pcap folder")
51+
log.Debug("Created pcap folder")
5252

5353
pw, err := pcapng.NewFileWriter("./output/pcap/" + filename + ".pcapng")
5454
if err != nil {
5555
log.Errorf("Create file %s failed: %v", filename, err.Error())
5656
log.Fatal(err)
5757
}
58-
log.Trace("Created pcapng file")
58+
log.Debug("Created pcapng file")
5959

6060
// write pcap file header
6161
so := types.SectionHeaderOptions{
@@ -67,34 +67,34 @@ func startPacketCollector() {
6767
log.Fatal(err)
6868
}
6969
defer f.Close()
70-
log.Trace("Wrote pcap section header")
70+
log.Debug("Wrote pcap section header")
7171

7272
flowPackets := make(chan *genericmap.Flow, 100)
7373
collector, err := grpc.StartCollector(port, flowPackets)
7474
if err != nil {
7575
log.Errorf("StartCollector failed: %v", err.Error())
7676
return
7777
}
78-
log.Trace("Started collector")
78+
log.Debug("Started collector")
7979
collectorStarted = true
8080

8181
go func() {
8282
<-utils.ExitChannel()
83-
log.Trace("Ending collector")
83+
log.Debug("Ending collector")
8484
close(flowPackets)
8585
collector.Close()
86-
log.Trace("Done")
86+
log.Debug("Done")
8787
}()
8888

89-
log.Trace("Ready ! Waiting for packets...")
89+
log.Debug("Ready ! Waiting for packets...")
9090
go hearbeat()
9191
for fp := range flowPackets {
9292
if !captureStarted {
93-
log.Tracef("Received first %d packets", len(flowPackets))
93+
log.Debugf("Received first %d packets", len(flowPackets))
9494
}
9595

9696
if stopReceived {
97-
log.Trace("Stop received")
97+
log.Debug("Stop received")
9898
return
9999
}
100100

@@ -105,15 +105,15 @@ func startPacketCollector() {
105105
return
106106
}
107107
if !captureStarted {
108-
log.Tracef("Parsed genericMap %v", genericMap)
108+
log.Debugf("Parsed genericMap %v", genericMap)
109109
}
110110

111111
data, ok := genericMap["Data"]
112112
if ok {
113113
// clear generic map data
114114
delete(genericMap, "Data")
115115
if !captureStarted {
116-
log.Trace("Deleted data")
116+
log.Debug("Deleted data")
117117
}
118118

119119
// display as flow async
@@ -140,7 +140,7 @@ func startPacketCollector() {
140140
}
141141
} else {
142142
if !captureStarted {
143-
log.Trace("Data is missing")
143+
log.Debug("Data is missing")
144144
}
145145

146146
// display as flow async

e2e/capture_test.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,8 @@ func TestFlowCapture(t *testing.T) {
6868
assert.Contains(t, str, "pod/collector created")
6969
assert.Contains(t, str, "pod/collector condition met")
7070
// check that CLI is running
71-
assert.Contains(t, str, "Running network-observability-cli as Flow Capture")
72-
assert.Contains(t, str, "Time")
73-
assert.Contains(t, str, "Src Name")
74-
assert.Contains(t, str, "Src Namespace")
75-
assert.Contains(t, str, "Dst Name")
76-
assert.Contains(t, str, "Dst Namespace")
71+
assert.Contains(t, str, "Starting Flow Capture...")
72+
assert.Contains(t, str, "Started collector")
7773
// check that script terminated
7874
assert.Contains(t, str, "command terminated")
7975
return ctx
@@ -146,7 +142,8 @@ func TestPacketCapture(t *testing.T) {
146142
assert.Contains(t, str, "pod/collector created")
147143
assert.Contains(t, str, "pod/collector condition met")
148144
// check that CLI is running
149-
assert.Contains(t, str, "Running network-observability-cli as Packet Capture")
145+
assert.Contains(t, str, "Starting Packet Capture...")
146+
assert.Contains(t, str, "Started collector")
150147
// check that script terminated
151148
assert.Contains(t, str, "command terminated")
152149
return ctx

0 commit comments

Comments
 (0)