44 "encoding/json"
55 "fmt"
66 "os"
7- "regexp"
8- "slices"
9- "sort"
107 "strings"
118 "sync"
129 "time"
@@ -17,9 +14,6 @@ import (
1714 "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc"
1815 "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap"
1916
20- "github.com/eiannone/keyboard"
21- "github.com/fatih/color"
22- "github.com/rodaine/table"
2317 "github.com/spf13/cobra"
2418)
2519
@@ -30,12 +24,6 @@ var flowCmd = &cobra.Command{
3024 Run : runFlowCapture ,
3125}
3226
33- var (
34- flowsToShow = 35
35- regexes = []string {}
36- lastFlows = []config.GenericMap {}
37- )
38-
3927func runFlowCapture (_ * cobra.Command , _ []string ) {
4028 go func () {
4129 if ! scanner () {
@@ -109,6 +97,7 @@ func runFlowCaptureOnAddr(port int, filename string) error {
10997 }()
11098
11199 log .Trace ("Ready ! Waiting for flows..." )
100+ go hearbeat ()
112101 for fp := range flowPackets {
113102 if ! captureStarted {
114103 log .Tracef ("Received first %d flows" , len (flowPackets ))
@@ -119,7 +108,7 @@ func runFlowCaptureOnAddr(port int, filename string) error {
119108 return nil
120109 }
121110 // parse and display flow async
122- go parseGenericMapAndDisplay (fp .GenericMap .Value )
111+ go parseGenericMapAndAppendFlow (fp .GenericMap .Value )
123112
124113 // Write flows to sqlite DB
125114 err = queryFlowDB (fp .GenericMap .Value , db )
@@ -163,7 +152,7 @@ func runFlowCaptureOnAddr(port int, filename string) error {
163152 return nil
164153}
165154
166- func parseGenericMapAndDisplay (bytes []byte ) {
155+ func parseGenericMapAndAppendFlow (bytes []byte ) {
167156 genericMap := config.GenericMap {}
168157 err := json .Unmarshal (bytes , & genericMap )
169158 if err != nil {
@@ -174,222 +163,5 @@ func parseGenericMapAndDisplay(bytes []byte) {
174163 if ! captureStarted {
175164 log .Tracef ("Parsed genericMap %v" , genericMap )
176165 }
177- manageFlowsDisplay (genericMap )
178- }
179-
180- func manageFlowsDisplay (genericMap config.GenericMap ) {
181- // lock since we are updating lastFlows concurrently
182- mutex .Lock ()
183-
184- lastFlows = append (lastFlows , genericMap )
185- sort .Slice (lastFlows , func (i , j int ) bool {
186- if captureType == "Flow" {
187- return toFloat64 (lastFlows [i ], "TimeFlowEndMs" ) < toFloat64 (lastFlows [j ], "TimeFlowEndMs" )
188- }
189- return toFloat64 (lastFlows [i ], "Time" ) < toFloat64 (lastFlows [j ], "Time" )
190- })
191- if len (regexes ) > 0 {
192- // regexes may change during the render so we make a copy first
193- rCopy := make ([]string , len (regexes ))
194- copy (rCopy , regexes )
195- filtered := []config.GenericMap {}
196- for _ , flow := range lastFlows {
197- match := true
198- for i := range rCopy {
199- ok , _ := regexp .MatchString (rCopy [i ], fmt .Sprintf ("%v" , flow ))
200- match = match && ok
201- if ! match {
202- break
203- }
204- }
205- if match {
206- filtered = append (filtered , flow )
207- }
208- }
209- lastFlows = filtered
210- }
211- if len (lastFlows ) > flowsToShow {
212- lastFlows = lastFlows [len (lastFlows )- flowsToShow :]
213- }
214- updateTable ()
215-
216- // unlock
217- mutex .Unlock ()
218- }
219-
220- func updateTable () {
221- // don't refresh terminal too often to avoid blinking
222- now := currentTime ()
223- if ! captureEnded && int (now .Sub (lastRefresh )) > int (maxRefreshRate ) {
224- lastRefresh = now
225- resetTerminal ()
226-
227- duration := now .Sub (startupTime )
228- if outputBuffer == nil {
229- fmt .Printf ("Running network-observability-cli as %s Capture\n " , captureType )
230- fmt .Printf ("Log level: %s " , logLevel )
231- fmt .Printf ("Duration: %s " , duration .Round (time .Second ))
232- fmt .Printf ("Capture size: %s\n " , sizestr .ToString (totalBytes ))
233- if len (strings .TrimSpace (options )) > 0 {
234- fmt .Printf ("Options: %s\n " , options )
235- }
236- if strings .Contains (options , "background=true" ) {
237- fmt .Printf ("Showing last: %d\n " , flowsToShow )
238- fmt .Printf ("Display: %s\n " , display .getCurrentItem ().name )
239- fmt .Printf ("Enrichment: %s\n " , enrichment .getCurrentItem ().name )
240- } else {
241- fmt .Printf ("Showing last: %d Use Up / Down keyboard arrows to increase / decrease limit\n " , flowsToShow )
242- fmt .Printf ("Display: %s Use Left / Right keyboard arrows to cycle views\n " , display .getCurrentItem ().name )
243- fmt .Printf ("Enrichment: %s Use Page Up / Page Down keyboard keys to cycle enrichment scopes\n " , enrichment .getCurrentItem ().name )
244- }
245- }
246-
247- if display .getCurrentItem ().name == rawDisplay {
248- fmt .Print ("Raw flow logs:\n " )
249- for _ , flow := range lastFlows {
250- fmt .Printf ("%v\n " , flow )
251- }
252- fmt .Printf ("%s\n " , strings .Repeat ("-" , 500 ))
253- } else {
254- // recreate table from scratch
255- headerFmt := color .New (color .BgHiBlue , color .Bold ).SprintfFunc ()
256- columnFmt := color .New (color .FgHiYellow ).SprintfFunc ()
257-
258- // main field, always show the end time
259- colIDs := []string {
260- "EndTime" ,
261- }
262-
263- // enrichment fields
264- if enrichment .getCurrentItem ().name != noOptions {
265- colIDs = append (colIDs , enrichment .getCurrentItem ().ids ... )
266- } else {
267- // TODO: add a new flag in the config to identify these as default non enriched fields
268- colIDs = append (colIDs ,
269- "SrcAddr" ,
270- "SrcPort" ,
271- "DstAddr" ,
272- "DstPort" ,
273- )
274- }
275-
276- // append interfaces and their directions between enrichment and features
277- // this is useful for pkt drop, udns etc
278- colIDs = append (colIDs ,
279- "Interfaces" ,
280- "IfDirections" ,
281- )
282-
283- // standard / feature fields
284- if display .getCurrentItem ().name != standardDisplay {
285- for _ , col := range cfg .Columns {
286- if col .Field != "" && slices .Contains (display .getCurrentItem ().ids , col .Feature ) {
287- colIDs = append (colIDs , col .ID )
288- }
289- }
290- } else {
291- // TODO: add a new flag in the config to identify these as default feature fields
292- colIDs = append (colIDs ,
293- "FlowDirection" ,
294- "Proto" ,
295- "Dscp" ,
296- "Bytes" ,
297- "Packets" ,
298- )
299- }
300-
301- colInterfaces := make ([]interface {}, len (colIDs ))
302- for i , id := range colIDs {
303- colInterfaces [i ] = ToTableColName (id )
304- }
305- tbl := table .New (colInterfaces ... )
306- if outputBuffer != nil {
307- tbl .WithWriter (outputBuffer )
308- }
309- tbl .WithHeaderFormatter (headerFmt ).WithFirstColumnFormatter (columnFmt )
310-
311- // append most recent rows
312- for _ , flow := range lastFlows {
313- tbl .AddRow (ToTableRow (flow , colIDs )... )
314- }
315-
316- // inserting empty row ensure minimum column sizes
317- emptyRow := []interface {}{}
318- for _ , id := range colIDs {
319- emptyRow = append (emptyRow , strings .Repeat ("-" , ToTableColWidth (id )))
320- }
321- tbl .AddRow (emptyRow ... )
322-
323- // print table
324- tbl .Print ()
325- }
326-
327- if len (keyboardError ) > 0 {
328- fmt .Println (keyboardError )
329- } else if outputBuffer == nil {
330- if len (regexes ) > 0 {
331- fmt .Printf ("Live table filter: %s Press enter to match multiple regexes at once\n " , regexes )
332- } else {
333- fmt .Printf ("Type anything to filter incoming flows in view\n " )
334- }
335- }
336- }
337- }
338-
339- // scanner returns true in case of normal exit (end of program execution) or false in case of error
340- func scanner () bool {
341- if err := keyboard .Open (); err != nil {
342- keyboardError = fmt .Sprintf ("Keyboard not supported %v" , err )
343- return false
344- }
345- defer func () {
346- _ = keyboard .Close ()
347- }()
348-
349- for {
350- char , key , err := keyboard .GetKey ()
351- if err != nil {
352- panic (err )
353- }
354- switch {
355- case key == keyboard .KeyCtrlC , stopReceived :
356- log .Info ("Ctrl-C pressed, exiting program." )
357- // exit program
358- return true
359- case key == keyboard .KeyArrowUp :
360- flowsToShow ++
361- case key == keyboard .KeyArrowDown :
362- if flowsToShow > 10 {
363- flowsToShow --
364- }
365- case key == keyboard .KeyArrowRight :
366- display .next ()
367- case key == keyboard .KeyArrowLeft :
368- display .prev ()
369- case key == keyboard .KeyPgup :
370- enrichment .next ()
371- case key == keyboard .KeyPgdn :
372- enrichment .prev ()
373- case key == keyboard .KeyBackspace || key == keyboard .KeyBackspace2 :
374- if len (regexes ) > 0 {
375- lastIndex := len (regexes ) - 1
376- if len (regexes [lastIndex ]) > 0 {
377- regexes [lastIndex ] = regexes [lastIndex ][:len (regexes [lastIndex ])- 1 ]
378- } else {
379- regexes = regexes [:lastIndex ]
380- }
381- }
382- case key == keyboard .KeyEnter :
383- regexes = append (regexes , "" )
384- default :
385- if len (regexes ) == 0 {
386- regexes = []string {string (char )}
387- } else {
388- lastIndex := len (regexes ) - 1
389- regexes [lastIndex ] += string (char )
390- }
391- }
392- lastRefresh = startupTime
393- updateTable ()
394- }
166+ AppendFlow (genericMap )
395167}
0 commit comments