@@ -2,10 +2,8 @@ package cmd
22
33import (
44 "encoding/json"
5- "fmt"
65 "os"
76 "strings"
8- "sync"
97 "time"
108
119 "github.com/jpillora/sizestr"
@@ -25,25 +23,12 @@ var flowCmd = &cobra.Command{
2523}
2624
2725func runFlowCapture (_ * cobra.Command , _ []string ) {
28- go scanner ()
29-
3026 captureType = "Flow"
31- wg := sync.WaitGroup {}
32- wg .Add (len (ports ))
33- for i := range ports {
34- go func (idx int ) {
35- defer wg .Done ()
36- err := runFlowCaptureOnAddr (ports [idx ], nodes [idx ])
37- if err != nil {
38- // Only fatal errors are returned here
39- log .Fatal (err )
40- }
41- }(i )
42- }
43- wg .Wait ()
27+ go startFlowCollector ()
28+ createDisplay ()
4429}
4530
46- func runFlowCaptureOnAddr ( port int , filename string ) error {
31+ func startFlowCollector () {
4732 if len (filename ) > 0 {
4833 log .Infof ("Starting Flow Capture for %s..." , filename )
4934 } else {
@@ -59,47 +44,48 @@ func runFlowCaptureOnAddr(port int, filename string) error {
5944 log .Errorf ("Create directory failed: %v" , err .Error ())
6045 log .Fatal (err )
6146 }
62- log .Trace ("Created flow folder" )
47+ log .Debug ("Created flow folder" )
6348
6449 f , err = os .Create ("./output/flow/" + filename + ".txt" )
6550 if err != nil {
6651 log .Errorf ("Create file %s failed: %v" , filename , err .Error ())
6752 log .Fatal (err )
6853 }
6954 defer f .Close ()
70- log .Trace ("Created flow logs txt file" )
55+ log .Debug ("Created flow logs txt file" )
7156
7257 // Initialize sqlite DB
7358 db := initFlowDB (filename )
74- log .Trace ("Initialized database" )
59+ log .Debug ("Initialized database" )
7560
7661 flowPackets := make (chan * genericmap.Flow , 100 )
7762 collector , err := grpc .StartCollector (port , flowPackets )
7863 if err != nil {
79- return fmt .Errorf ("StartCollector failed: %w" , err )
64+ log .Errorf ("StartCollector failed: %v" , err .Error ())
65+ return
8066 }
81- log .Trace ("Started collector" )
67+ log .Debug ("Started collector" )
8268 collectorStarted = true
8369
8470 go func () {
8571 <- utils .ExitChannel ()
86- log .Trace ("Ending collector" )
72+ log .Debug ("Ending collector" )
8773 close (flowPackets )
8874 collector .Close ()
8975 db .Close ()
90- log .Trace ("Done" )
76+ log .Debug ("Done" )
9177 }()
9278
93- log .Trace ("Ready ! Waiting for flows..." )
79+ log .Debug ("Ready ! Waiting for flows..." )
9480 go hearbeat ()
9581 for fp := range flowPackets {
9682 if ! captureStarted {
97- log .Tracef ("Received first %d flows" , len (flowPackets ))
83+ log .Debugf ("Received first %d flows" , len (flowPackets ))
9884 }
9985
10086 if stopReceived {
101- log .Trace ("Stop received" )
102- return nil
87+ log .Debug ("Stop received" )
88+ return
10389 }
10490 // parse and display flow async
10591 go parseGenericMapAndAppendFlow (fp .GenericMap .Value )
@@ -110,24 +96,25 @@ func runFlowCaptureOnAddr(port int, filename string) error {
11096 log .Error ("Error while writing to DB:" , err .Error ())
11197 }
11298 if ! captureStarted {
113- log .Trace ("Wrote flows to DB" )
99+ log .Debug ("Wrote flows to DB" )
114100 }
115101
116102 // append new line between each record to read file easilly
117103 bytes , err := f .Write (append (fp .GenericMap .Value , []byte (",\n " )... ))
118104 if err != nil {
119- return err
105+ log .Error (err )
106+ return
120107 }
121108 if ! captureStarted {
122- log .Trace ("Wrote flows to json" )
109+ log .Debug ("Wrote flows to json" )
123110 }
124111
125112 // terminate capture if max bytes reached
126113 totalBytes += int64 (bytes )
127114 if totalBytes > maxBytes {
128115 if exit := onLimitReached (); exit {
129116 log .Infof ("Capture reached %s, exiting now..." , sizestr .ToString (maxBytes ))
130- return nil
117+ return
131118 }
132119 }
133120
@@ -137,13 +124,12 @@ func runFlowCaptureOnAddr(port int, filename string) error {
137124 if int (duration ) > int (maxTime ) {
138125 if exit := onLimitReached (); exit {
139126 log .Infof ("Capture reached %s, exiting now..." , maxTime )
140- return nil
127+ return
141128 }
142129 }
143130
144131 captureStarted = true
145132 }
146- return nil
147133}
148134
149135func parseGenericMapAndAppendFlow (bytes []byte ) {
@@ -155,7 +141,7 @@ func parseGenericMapAndAppendFlow(bytes []byte) {
155141 }
156142
157143 if ! captureStarted {
158- log .Tracef ("Parsed genericMap %v" , genericMap )
144+ log .Debugf ("Parsed genericMap %v" , genericMap )
159145 }
160146 AppendFlow (genericMap )
161147}
0 commit comments