diff --git a/cmd/kgo-verifier/main.go b/cmd/kgo-verifier/main.go
index 0ec1290..4fbf950 100644
--- a/cmd/kgo-verifier/main.go
+++ b/cmd/kgo-verifier/main.go
@@ -12,6 +12,7 @@ package main
 import (
     "context"
     "encoding/json"
+    "errors"
    "flag"
     "fmt"
     "net/http"
@@ -103,6 +104,79 @@ func makeWorkerConfig() worker.WorkerConfig {
     return c
 }
 
+type workersGroup struct {
+    workers []worker.Worker
+    ready   bool
+    mu      sync.Mutex
+}
+
+func (w *workersGroup) Add(worker worker.Worker) {
+    w.mu.Lock()
+    defer w.mu.Unlock()
+    w.workers = append(w.workers, worker)
+}
+
+func (w *workersGroup) SetReady() {
+    w.mu.Lock()
+    defer w.mu.Unlock()
+
+    if len(w.workers) == 0 {
+        util.Die("No workers added to the group, cannot set ready")
+    }
+
+    if w.ready {
+        util.Die("Workers group is already marked as ready")
+    }
+
+    w.ready = true
+}
+
+func (w *workersGroup) StatusJson() ([]byte, error) {
+    if !w.ready {
+        return nil, errors.New("workers group is not marked as ready yet")
+    }
+
+    var results []interface{}
+    var locks []*sync.Mutex
+    for _, v := range w.workers {
+        status, lock := v.GetStatus()
+        results = append(results, status)
+        locks = append(locks, lock)
+    }
+
+    for _, lock := range locks {
+        lock.Lock()
+    }
+
+    defer func() {
+        for _, lock := range locks {
+            lock.Unlock()
+        }
+    }()
+
+    serialized, err := json.MarshalIndent(results, "", " ")
+    if err != nil {
+        return nil, fmt.Errorf("failed to serialize worker status: %w", err)
+    }
+
+    return serialized, nil
+}
+
+func (w *workersGroup) ResetStats() error {
+    w.mu.Lock()
+    defer w.mu.Unlock()
+
+    if !w.ready {
+        return errors.New("workers group is not marked as ready yet")
+    }
+
+    for _, worker := range w.workers {
+        worker.ResetStats()
+    }
+
+    return nil
+}
+
 func main() {
     flag.Parse()
 
@@ -155,39 +229,23 @@ func main() {
     nPartitions := int32(len(t.Partitions))
     log.Debugf("Targeting topic %s with %d partitions", *topic, nPartitions)
 
-    var workers []worker.Worker
+    var workers workersGroup
 
     mux := http.NewServeMux()
     mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
-        var results []interface{}
-        var locks []*sync.Mutex
-        for _, v := range workers {
-            status, lock := v.GetStatus()
-            results = append(results, status)
-            locks = append(locks, lock)
-        }
-
-        for _, lock := range locks {
-            lock.Lock()
-        }
-
-        serialized, err := json.MarshalIndent(results, "", " ")
-
-        for _, lock := range locks {
-            lock.Unlock()
+        serialized, err := workers.StatusJson()
+        if err != nil {
+            log.Errorf("Failed to serialize worker status: %v", err)
+            http.Error(w, fmt.Sprintf("Failed to serialize worker status: %v", err), http.StatusInternalServerError)
+            return
         }
-
-        util.Chk(err, "Status serialization error")
-
         w.WriteHeader(http.StatusOK)
         w.Write(serialized)
     })
 
     mux.HandleFunc("/reset", func(w http.ResponseWriter, r *http.Request) {
         log.Info("Remote request /reset")
-        for _, v := range workers {
-            v.ResetStats()
-        }
+        workers.ResetStats()
         w.WriteHeader(http.StatusOK)
     })
 
@@ -259,7 +317,8 @@ func main() {
             pw.EnableTransactions(tconfig)
         }
 
-        workers = append(workers, &pw)
+        workers.Add(&pw)
+        workers.SetReady()
         waitErr := pw.Wait()
         util.Chk(err, "Producer error: %v", waitErr)
         log.Info("Finished producer.")
@@ -268,7 +327,8 @@ func main() {
             makeWorkerConfig(), "sequential", nPartitions, *seqConsumeCount, (*consumeTputMb)*1024*1024,
         ), verifier.NewValidatorStatus(*compacted, *validateLatestValues, *topic, nPartitions))
-        workers = append(workers, &srw)
+        workers.Add(&srw)
+        workers.SetReady()
 
         for loopState.Next() {
             log.Info("Starting sequential read pass")
@@ -288,8 +348,9 @@ func main() {
            )
             worker := verifier.NewRandomReadWorker(workerCfg, verifier.NewValidatorStatus(*compacted, *validateLatestValues, *topic, nPartitions))
             randomWorkers = append(randomWorkers, &worker)
-            workers = append(workers, &worker)
+            workers.Add(&worker)
         }
+        workers.SetReady()
 
         for loopState.Next() {
             for _, w := range randomWorkers {
@@ -316,7 +377,8 @@ func main() {
         verifier.NewGroupReadConfig(
             makeWorkerConfig(), *cgName, nPartitions, *cgReaders, *seqConsumeCount, (*consumeTputMb)*1024*1024),
         verifier.NewValidatorStatus(*compacted, *validateLatestValues, *topic, nPartitions))
-        workers = append(workers, &grw)
+        workers.Add(&grw)
+        workers.SetReady()
 
         for loopState.Next() {
             log.Info("Starting group read pass")
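
Below is a minimal, self-contained sketch (not part of the patch itself) of the pattern the new `workersGroup.StatusJson` uses: snapshot each worker's status pointer together with its lock, take all the locks at once, and release them via `defer` so they are freed even if marshaling fails. The `Worker` interface and `dummyWorker` type here are hypothetical stand-ins for the verifier's `worker.Worker`; the readiness check and `util.Die` handling from the diff are omitted to keep the example focused.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// Worker is a stand-in for the verifier's worker.Worker interface:
// GetStatus returns a status object plus the mutex that guards it.
type Worker interface {
	GetStatus() (interface{}, *sync.Mutex)
}

// dummyWorker is a hypothetical worker used only for this sketch.
type dummyWorker struct {
	mu     sync.Mutex
	Status struct{ Produced int }
}

func (d *dummyWorker) GetStatus() (interface{}, *sync.Mutex) {
	return &d.Status, &d.mu
}

type workersGroup struct {
	mu      sync.Mutex
	workers []Worker
}

func (g *workersGroup) Add(w Worker) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.workers = append(g.workers, w)
}

// StatusJson locks every worker's status mutex, marshals the collected
// statuses, and releases the locks via defer on every return path.
func (g *workersGroup) StatusJson() ([]byte, error) {
	var results []interface{}
	var locks []*sync.Mutex
	for _, w := range g.workers {
		status, lock := w.GetStatus()
		results = append(results, status)
		locks = append(locks, lock)
	}

	for _, l := range locks {
		l.Lock()
	}
	defer func() {
		for _, l := range locks {
			l.Unlock()
		}
	}()

	return json.MarshalIndent(results, "", "  ")
}

func main() {
	var g workersGroup
	g.Add(&dummyWorker{})

	out, err := g.StatusJson()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

The deferred unlock is the point of the refactor: in the old inline handler the locks were released by a second explicit loop before the serialization error was checked, whereas grouping the logic in one method guarantees the locks are dropped on every exit path and lets the HTTP handler return a 500 instead of aborting the process.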