@@ -30,6 +30,7 @@ import (
3030 "os"
3131 "runtime"
3232 "runtime/debug"
33+ "slices"
3334 "strconv"
3435 "strings"
3536 "sync"
@@ -38,6 +39,7 @@ import (
3839
3940 "github.com/gofiber/contrib/websocket"
4041 "github.com/gofiber/fiber/v2"
42+ "github.com/google/uuid"
4143 "github.com/minio/hperf/shared"
4244 "github.com/shirou/gopsutil/cpu"
4345 "github.com/shirou/gopsutil/mem"
@@ -69,24 +71,31 @@ type test struct {
6971
7072 Readers []* netPerfReader
7173 errors []shared.TError
74+ errMap map [string ]struct {}
7275 errIndex atomic.Int32
7376 DPS []shared.DP
7477 M sync.Mutex
7578
7679 DataFile * os.File
7780 DataFileIndex int
81+ cons map [string ]* websocket.Conn
7882}
7983
80- func (t * test ) AddError (err error ) {
84+ func (t * test ) AddError (err error , id string ) {
85+ t .M .Lock ()
86+ defer t .M .Unlock ()
8187 if err == nil {
8288 return
8389 }
90+ _ , ok := t .errMap [id ]
91+ if ok {
92+ return
93+ }
8494 if t .Config .Debug {
8595 fmt .Println ("ERR:" , err )
8696 }
87- t .M .Lock ()
88- defer t .M .Unlock ()
8997 t .errors = append (t .errors , shared.TError {Error : err .Error (), Created : time .Now ()})
98+ t .errMap [id ] = struct {}{}
9099}
91100
92101func RunServer (ctx context.Context , address string , storagePath string ) (err error ) {
@@ -359,6 +368,8 @@ func newTest(c *shared.Config) (t *test, err error) {
359368 defer testLock .Unlock ()
360369
361370 t = new (test )
371+ t .errMap = make (map [string ]struct {})
372+ t .cons = make (map [string ]* websocket.Conn )
362373 t .Started = time .Now ()
363374 t .Config = * c
364375 t .DPS = make ([]shared.DP , 0 )
@@ -477,7 +488,7 @@ func createAndRunTest(con *websocket.Conn, signal shared.WebsocketSignal) {
477488 go startPerformanceReader (test , test .Readers [i ])
478489 }
479490
480- var paginator DataPointPaginator
491+ listenToLiveTests ( con , signal )
481492 for {
482493 if test .ctx .Err () != nil {
483494 return
@@ -490,34 +501,24 @@ func createAndRunTest(con *websocket.Conn, signal shared.WebsocketSignal) {
490501 if signal .Config .Debug {
491502 fmt .Println ("Duration: " , signal .Config .TestID , time .Since (start ).Seconds ())
492503 }
504+
493505 generateDataPoints (test )
494- if con != nil {
495- _ , paginator = sendDataResponseToWebsocket (con , test , paginator )
496- }
506+ _ = sendAndSaveData (test )
497507 }
498508}
499509
500510func listenToLiveTests (con * websocket.Conn , s shared.WebsocketSignal ) {
501- var paginator DataPointPaginator
502- var err error
503- paginator .After = time .Now ()
504- for {
505- time .Sleep (1 * time .Second )
506- for i := range tests {
507- if time .Since (tests [i ].Started ).Seconds () > float64 (tests [i ].Config .Duration ) {
508- continue
509- }
510- if s .Config .TestID != "" && tests [i ].ID != s .Config .TestID {
511- continue
512- }
513- if s .Config .Debug {
514- fmt .Println ("Listen:" , tests [i ].ID , "DPS:" , len (tests [i ].DPS ), "ERR:" , len (tests [i ].errors ))
515- }
516- err , paginator = sendDataResponseToWebsocket (con , tests [i ], paginator )
517- if err != nil {
518- return
519- }
511+ uid := uuid .NewString ()
512+
513+ for i := range tests {
514+ if s .Config .TestID != "" && tests [i ].ID != s .Config .TestID {
515+ continue
520516 }
517+ if s .Config .Debug {
518+ fmt .Println ("Listen:" , tests [i ].ID , "DPS:" , len (tests [i ].DPS ), "ERR:" , len (tests [i ].errors ))
519+ }
520+
521+ tests [i ].cons [uid ] = con
521522 }
522523}
523524
@@ -527,45 +528,67 @@ type DataPointPaginator struct {
527528 After time.Time
528529}
529530
530- func sendDataResponseToWebsocket (con * websocket.Conn , t * test , lastPaginator DataPointPaginator ) (err error , Paginator DataPointPaginator ) {
531+ func sendAndSaveData (t * test ) (err error ) {
532+ defer func () {
533+ r := recover ()
534+ if r != nil {
535+ log .Println (r , string (debug .Stack ()))
536+ }
537+ }()
538+
531539 wss := new (shared.WebsocketSignal )
532540 wss .SType = shared .Stats
533541 dataResponse := new (shared.DataReponseToClient )
534542
543+ if t .DataFile == nil && t .Config .Save {
544+ newTestFile (t )
545+ }
546+
535547 for i := range t .DPS {
536- if i <= lastPaginator .DPIndex {
537- continue
538- }
539- if ! lastPaginator .After .IsZero () {
540- if t .DPS [i ].Created .Before (lastPaginator .After ) {
541- continue
548+ dataResponse .DPS = append (dataResponse .DPS , t .DPS [i ])
549+ if t .Config .Save {
550+ fileb , err := json .Marshal (t .DPS [i ])
551+ if err != nil {
552+ t .AddError (err , "datapoint-marshaling" )
542553 }
554+ t .DataFile .Write (append (fileb , []byte {10 }... ))
543555 }
544- dataResponse .DPS = append (dataResponse .DPS , t .DPS [i ])
545- Paginator .DPIndex = i
556+ t .DPS = slices .Delete (t .DPS , i , i + 1 )
546557 }
547558
548559 for i := range t .errors {
549- if i <= lastPaginator .ErrIndex {
550- continue
551- }
552- if ! lastPaginator .After .IsZero () {
553- if t .errors [i ].Created .Before (lastPaginator .After ) {
554- continue
560+ dataResponse .Errors = append (dataResponse .Errors , t .errors [i ])
561+ if t .Config .Save {
562+ fileb , err := json .Marshal (t .errors [i ])
563+ if err != nil {
564+ t .AddError (err , "error-marshaling" )
555565 }
566+ t .DataFile .Write (append (fileb , []byte {10 }... ))
556567 }
557- dataResponse .Errors = append (dataResponse .Errors , t .errors [i ])
558- Paginator .ErrIndex = i
568+ t .M .Lock ()
569+ t .errors = slices .Delete (t .errors , i , i + 1 )
570+ t .M .Unlock ()
559571 }
560572
573+ t .M .Lock ()
574+ t .errMap = make (map [string ]struct {})
575+ t .M .Unlock ()
576+
561577 wss .DataPoint = dataResponse
562- err = con .WriteJSON (wss )
563- if err != nil {
564- if t .Config .Debug {
565- fmt .Println ("Unable to send data point:" , err )
578+ for i := range t .cons {
579+ if t .cons [i ] == nil {
580+ continue
581+ }
582+
583+ err = t .cons [i ].WriteJSON (wss )
584+ if err != nil {
585+ if t .Config .Debug {
586+ fmt .Println ("Unable to send data point:" , err )
587+ }
588+ t .cons [i ].Close ()
589+ delete (t .cons , i )
590+ continue
566591 }
567- con .Close ()
568- con = nil
569592 }
570593 return
571594}
@@ -609,18 +632,6 @@ func generateDataPoints(t *test) {
609632 r .m .Unlock ()
610633
611634 t .DPS = append (t .DPS , d )
612-
613- if t .Config .Save {
614- fileb , err := json .Marshal (d )
615- if err != nil {
616- t .AddError (err )
617- }
618- if t .DataFile == nil {
619- newTestFile (t )
620- }
621- t .DataFile .Write (append (fileb , []byte {10 }... ))
622- }
623-
624635 }
625636 return
626637}
@@ -727,7 +738,7 @@ func sendRequestToHost(t *test, r *netPerfReader, cid int) {
727738 route = "/http"
728739 body = AR
729740 default :
730- t .AddError (fmt .Errorf ("Unknown test type: %d" , t .Config .TestType ))
741+ t .AddError (fmt .Errorf ("Unknown test type: %d" , t .Config .TestType ), "unknown-signal" )
731742 }
732743
733744 req , err = http .NewRequestWithContext (
@@ -748,12 +759,12 @@ func sendRequestToHost(t *test, r *netPerfReader, cid int) {
748759 if errors .Is (err , context .Canceled ) {
749760 return
750761 }
751- t .AddError (err )
762+ t .AddError (err , "network-error" )
752763 return
753764 }
754765
755766 if resp .StatusCode != http .StatusOK {
756- t .AddError (fmt .Errorf ("Status code was %d, expected 200 from host %s" , resp .StatusCode , r .addr ))
767+ t .AddError (fmt .Errorf ("Status code was %d, expected 200 from host %s" , resp .StatusCode , r .addr ), "invalid-status-code" )
757768 return
758769 }
759770
0 commit comments