Skip to content
This repository was archived by the owner on Jul 30, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 89 additions & 27 deletions cmd/kgo-verifier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"net/http"
Expand Down Expand Up @@ -103,6 +104,79 @@ func makeWorkerConfig() worker.WorkerConfig {
return c
}

type workersGroup struct {
workers []worker.Worker
ready bool
mu sync.Mutex
}

func (w *workersGroup) Add(worker worker.Worker) {
w.mu.Lock()
defer w.mu.Unlock()
w.workers = append(w.workers, worker)
}

func (w *workersGroup) SetReady() {
w.mu.Lock()
defer w.mu.Unlock()

if len(w.workers) == 0 {
util.Die("No workers added to the group, cannot set ready")
}

if w.ready {
util.Die("Workers group is already marked as ready")
}

w.ready = true
}

func (w *workersGroup) StatusJson() ([]byte, error) {
if !w.ready {
return nil, errors.New("workers group is not marked as ready yet")
}

var results []interface{}
var locks []*sync.Mutex
for _, v := range w.workers {
status, lock := v.GetStatus()
results = append(results, status)
locks = append(locks, lock)
}

for _, lock := range locks {
lock.Lock()
}

defer func() {
for _, lock := range locks {
lock.Unlock()
}
}()

serialized, err := json.MarshalIndent(results, "", " ")
if err != nil {
return nil, fmt.Errorf("failed to serialize worker status: %w", err)
}

return serialized, nil
}

func (w *workersGroup) ResetStats() error {
w.mu.Lock()
defer w.mu.Unlock()

if !w.ready {
return errors.New("workers group is not marked as ready yet")
}

for _, worker := range w.workers {
worker.ResetStats()
}

return nil
}

func main() {
flag.Parse()

Expand Down Expand Up @@ -155,39 +229,23 @@ func main() {
nPartitions := int32(len(t.Partitions))
log.Debugf("Targeting topic %s with %d partitions", *topic, nPartitions)

var workers []worker.Worker
var workers workersGroup

mux := http.NewServeMux()
mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
var results []interface{}
var locks []*sync.Mutex
for _, v := range workers {
status, lock := v.GetStatus()
results = append(results, status)
locks = append(locks, lock)
}

for _, lock := range locks {
lock.Lock()
}

serialized, err := json.MarshalIndent(results, "", " ")

for _, lock := range locks {
lock.Unlock()
serialized, err := workers.StatusJson()
if err != nil {
log.Errorf("Failed to serialize worker status: %v", err)
http.Error(w, fmt.Sprintf("Failed to serialize worker status: %v", err), http.StatusInternalServerError)
return
}

util.Chk(err, "Status serialization error")

w.WriteHeader(http.StatusOK)
w.Write(serialized)
})

mux.HandleFunc("/reset", func(w http.ResponseWriter, r *http.Request) {
log.Info("Remote request /reset")
for _, v := range workers {
v.ResetStats()
}
workers.ResetStats()
w.WriteHeader(http.StatusOK)
})

Expand Down Expand Up @@ -259,7 +317,8 @@ func main() {
pw.EnableTransactions(tconfig)
}

workers = append(workers, &pw)
workers.Add(&pw)
workers.SetReady()
waitErr := pw.Wait()
util.Chk(err, "Producer error: %v", waitErr)
log.Info("Finished producer.")
Expand All @@ -268,7 +327,8 @@ func main() {
makeWorkerConfig(), "sequential", nPartitions, *seqConsumeCount,
(*consumeTputMb)*1024*1024,
), verifier.NewValidatorStatus(*compacted, *validateLatestValues, *topic, nPartitions))
workers = append(workers, &srw)
workers.Add(&srw)
workers.SetReady()

for loopState.Next() {
log.Info("Starting sequential read pass")
Expand All @@ -288,8 +348,9 @@ func main() {
)
worker := verifier.NewRandomReadWorker(workerCfg, verifier.NewValidatorStatus(*compacted, *validateLatestValues, *topic, nPartitions))
randomWorkers = append(randomWorkers, &worker)
workers = append(workers, &worker)
workers.Add(&worker)
}
workers.SetReady()

for loopState.Next() {
for _, w := range randomWorkers {
Expand All @@ -316,7 +377,8 @@ func main() {
verifier.NewGroupReadConfig(
makeWorkerConfig(), *cgName, nPartitions, *cgReaders,
*seqConsumeCount, (*consumeTputMb)*1024*1024), verifier.NewValidatorStatus(*compacted, *validateLatestValues, *topic, nPartitions))
workers = append(workers, &grw)
workers.Add(&grw)
workers.SetReady()

for loopState.Next() {
log.Info("Starting group read pass")
Expand Down