Skip to content

Commit 454bec5

Browse files
committed
bugfixes and small improvements to bench and orchestrate
1 parent 86fe2d1 commit 454bec5

File tree

11 files changed

+103
-114
lines changed

11 files changed

+103
-114
lines changed

cmd/bench/http.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ package main
22

33
import (
44
"context"
5-
"fmt"
65
"io"
76
mrand "math/rand"
87
"net"
8+
"strconv"
99
"strings"
1010
"time"
1111
)
@@ -55,15 +55,15 @@ func buildHTTPAnnounce(hash, peer, host string, port int, left int64, numwant in
5555
b.WriteString("&peer_id=")
5656
b.WriteString(peer)
5757
b.WriteString("&port=")
58-
b.WriteString(fmt.Sprintf("%d", port))
58+
b.WriteString(strconv.Itoa(port))
5959
b.WriteString("&uploaded=0&downloaded=0&left=")
60-
b.WriteString(fmt.Sprintf("%d", left))
60+
b.WriteString(strconv.FormatInt(left, 10))
6161
if compact {
6262
b.WriteString("&compact=1")
6363
}
6464
if numwant >= 0 {
6565
b.WriteString("&numwant=")
66-
b.WriteString(fmt.Sprintf("%d", numwant))
66+
b.WriteString(strconv.Itoa(numwant))
6767
}
6868
b.WriteString(" HTTP/1.1\r\nHost: ")
6969
b.WriteString(host)
@@ -95,7 +95,7 @@ type httpBenchWorker struct {
9595

9696
func (w *httpBenchWorker) run(ctx context.Context, cfg config, ds *dataset, limiter *rateLimiter) *workerMetrics {
9797
metrics := newWorkerMetrics()
98-
rng := mrand.New(mrand.NewSource(cfg.rngSeed + int64(w.id*3571)))
98+
rng := mrand.New(mrand.NewSource(cfg.rngSeed + int64(w.id*rngOffsetHTTP)))
9999
peerIdx := w.id % len(ds.peers)
100100
peer := ds.encodedPeers[peerIdx]
101101

cmd/bench/main.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"flag"
77
"fmt"
8+
"io"
89
"os"
910
"runtime"
1011
"time"
@@ -32,6 +33,11 @@ const (
3233
defaultSeedFraction = 0.2 // 20% seeders
3334

3435
defaultPort = 6881
36+
37+
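// RNG seed constants giving each worker type a reproducible but distinct sequence: the UDP and HTTP values are multiplied by the worker id, the seed-phase value is added to the base seed as-is.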
38+
rngOffsetUDP = 7919
39+
rngOffsetHTTP = 3571
40+
rngOffsetSeed = 11
3541
)
3642

3743
// Hardcoded numwant distribution: 20 (90%), 30 (8%), 50 (2%)
@@ -193,18 +199,18 @@ func main() {
193199
Warnings: warnings,
194200
}
195201

196-
enc := json.NewEncoder(os.Stdout)
197-
enc.SetIndent("", " ")
202+
var w io.Writer = os.Stdout
198203
if cfg.outPath != "" {
199204
f, err := os.Create(cfg.outPath)
200205
if err != nil {
201206
fmt.Fprintf(os.Stderr, "failed to open output file: %v\n", err)
202207
os.Exit(1)
203208
}
204209
defer f.Close()
205-
enc = json.NewEncoder(f)
206-
enc.SetIndent("", " ")
210+
w = f
207211
}
212+
enc := json.NewEncoder(w)
213+
enc.SetIndent("", " ")
208214
if err := enc.Encode(out); err != nil {
209215
fmt.Fprintf(os.Stderr, "failed to encode results: %v\n", err)
210216
os.Exit(1)

cmd/bench/metrics.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import (
66
"time"
77
)
88

9-
const maxInt64 = int64(^uint64(0) >> 1)
10-
119
// Latency histogram.
1210

1311
type latencyHistogram struct {
@@ -31,7 +29,7 @@ func newLatencyHistogram(start time.Duration, buckets int) *latencyHistogram {
3129
return &latencyHistogram{
3230
bounds: bounds,
3331
counts: make([]int64, buckets+1),
34-
min: maxInt64,
32+
min: math.MaxInt64,
3533
}
3634
}
3735

@@ -74,7 +72,7 @@ func (h *latencyHistogram) merge(other *latencyHistogram) {
7472

7573
func (h *latencyHistogram) quantile(q float64) time.Duration {
7674
if q <= 0 {
77-
if h.min == maxInt64 {
75+
if h.min == math.MaxInt64 {
7876
return 0
7977
}
8078
return time.Duration(h.min)
@@ -112,7 +110,7 @@ func (h *latencyHistogram) mean() time.Duration {
112110
}
113111

114112
func formatDuration(d time.Duration) string {
115-
if int64(d) == maxInt64 {
113+
if int64(d) == math.MaxInt64 {
116114
return "0s"
117115
}
118116
return d.String()

cmd/bench/seed.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ func seedPhase(ctx context.Context, cfg config, ds *dataset) error {
1111
return nil
1212
}
1313

14-
rng := mrand.New(mrand.NewSource(cfg.rngSeed + 11))
14+
rng := mrand.New(mrand.NewSource(cfg.rngSeed + rngOffsetSeed))
1515

1616
seedHashes := ds.torrents
1717
if cfg.seed > 0 && cfg.seed < len(seedHashes) {

cmd/bench/udp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ type udpBenchWorker struct {
162162

163163
func (w *udpBenchWorker) run(ctx context.Context, cfg config, ds *dataset, limiter *rateLimiter) *workerMetrics {
164164
metrics := newWorkerMetrics()
165-
rng := mrand.New(mrand.NewSource(cfg.rngSeed + int64(w.id*7919)))
165+
rng := mrand.New(mrand.NewSource(cfg.rngSeed + int64(w.id*rngOffsetUDP)))
166166
peerIdx := w.id % len(ds.peers)
167167
peerID := ds.peers[peerIdx]
168168

cmd/bench/util.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
// Random number generation.
1515

1616
func cryptoSeed() int64 {
17-
n, err := rand.Int(rand.Reader, big.NewInt(maxInt64))
17+
n, err := rand.Int(rand.Reader, big.NewInt(math.MaxInt64))
1818
if err != nil {
1919
return time.Now().UnixNano()
2020
}
@@ -48,10 +48,10 @@ func chooseWeighted(rng *mrand.Rand, choices []weightedChoice, fallback int) int
4848
// Statistical distributions.
4949

5050
func clampInt(val, minVal, maxVal int) int {
51-
if minVal > 0 && val < minVal {
51+
if val < minVal {
5252
return minVal
5353
}
54-
if maxVal > 0 && val > maxVal {
54+
if val > maxVal {
5555
return maxVal
5656
}
5757
return val

cmd/orchestrate/config.go

Lines changed: 36 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ type RunResult struct {
392392
RunID string `json:"run_id"`
393393
Mode string `json:"mode"`
394394
Routines int `json:"routines"`
395-
OtherRoutes int `json:"other_routines"`
395+
OtherRoutines int `json:"other_routines"`
396396
ResultFile string `json:"result_file"`
397397
Status string `json:"status"`
398398
Error string `json:"error,omitempty"`
@@ -478,6 +478,26 @@ func (rc *ResultCollector) GetResults() []RunResult {
478478
return results
479479
}
480480

481+
// benchResultFile represents the structure of a bench result JSON file.
482+
type benchResultFile struct {
483+
Metrics map[string]benchMetric `json:"metrics"`
484+
}
485+
486+
type benchMetric struct {
487+
RatePerSec float64 `json:"rate_per_sec"`
488+
Success float64 `json:"success"`
489+
Errors float64 `json:"errors"`
490+
Timeouts float64 `json:"timeouts"`
491+
Latency benchLatency `json:"latency"`
492+
}
493+
494+
type benchLatency struct {
495+
P50 string `json:"p50"`
496+
P90 string `json:"p90"`
497+
P95 string `json:"p95"`
498+
P99 string `json:"p99"`
499+
}
500+
481501
// LoadResultFile loads and parses a bench result file.
482502
func LoadResultFile(path string) (*ResultSummary, map[string]interface{}, error) {
483503
data, err := os.ReadFile(path)
@@ -490,40 +510,22 @@ func LoadResultFile(path string) (*ResultSummary, map[string]interface{}, error)
490510
return nil, nil, err
491511
}
492512

493-
summary := &ResultSummary{}
513+
var result benchResultFile
514+
if err := json.Unmarshal(data, &result); err != nil {
515+
return nil, nil, err
516+
}
494517

495-
if metrics, ok := raw["metrics"].(map[string]interface{}); ok {
496-
for _, v := range metrics {
497-
if m, ok := v.(map[string]interface{}); ok {
498-
if rate, ok := m["rate_per_sec"].(float64); ok {
499-
summary.Throughput = rate
500-
}
501-
if success, ok := m["success"].(float64); ok {
502-
summary.Success = int64(success)
503-
}
504-
if errors, ok := m["errors"].(float64); ok {
505-
summary.Errors = int64(errors)
506-
}
507-
if timeouts, ok := m["timeouts"].(float64); ok {
508-
summary.Timeouts = int64(timeouts)
509-
}
510-
if lat, ok := m["latency"].(map[string]interface{}); ok {
511-
if p50, ok := lat["p50"].(string); ok {
512-
summary.P50 = p50
513-
}
514-
if p90, ok := lat["p90"].(string); ok {
515-
summary.P90 = p90
516-
}
517-
if p95, ok := lat["p95"].(string); ok {
518-
summary.P95 = p95
519-
}
520-
if p99, ok := lat["p99"].(string); ok {
521-
summary.P99 = p99
522-
}
523-
}
524-
break
525-
}
526-
}
518+
summary := &ResultSummary{}
519+
for _, m := range result.Metrics {
520+
summary.Throughput = m.RatePerSec
521+
summary.Success = int64(m.Success)
522+
summary.Errors = int64(m.Errors)
523+
summary.Timeouts = int64(m.Timeouts)
524+
summary.P50 = m.Latency.P50
525+
summary.P90 = m.Latency.P90
526+
summary.P95 = m.Latency.P95
527+
summary.P99 = m.Latency.P99
528+
break // Only need the first metric
527529
}
528530

529531
return summary, raw, nil

cmd/orchestrate/main.go

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -131,36 +131,7 @@ func runLocal(args []string) error {
131131
executor := NewLocalExecutor(tracker, benchBin, *target, *outputDir)
132132
collector := NewResultCollector(*outputDir, scenario.Name, *mode)
133133

134-
slog.Info(fmt.Sprintf("\nRunning %d benchmark(s)...\n", len(runs)))
135-
136-
for i, run := range runs {
137-
slog.Info(fmt.Sprintf("[%d/%d] %s (routines=%d)", i+1, len(runs), run.Name, run.Routines))
138-
139-
if *pause && i > 0 {
140-
fmt.Fprint(os.Stderr, "Press Enter to continue...")
141-
if err := WaitForInput(); err != nil {
142-
return fmt.Errorf("wait for input: %w", err)
143-
}
144-
}
145-
146-
select {
147-
case <-ctx.Done():
148-
return ctx.Err()
149-
default:
150-
}
151-
152-
result := executor.Run(ctx, run)
153-
collector.AddResult(result)
154-
155-
if result.Summary != nil {
156-
slog.Info(fmt.Sprintf(" -> %.0f req/s | p50: %s | p90: %s | p95: %s | p99: %s",
157-
result.Summary.Throughput, result.Summary.P50, result.Summary.P90, result.Summary.P95, result.Summary.P99))
158-
} else if result.Error != "" {
159-
slog.Error(fmt.Sprintf(" -> %s", result.Error))
160-
}
161-
}
162-
163-
return FinalizeResults(collector)
134+
return runBenchmarkLoop(ctx, runs, executor, collector, *pause)
164135
}
165136

166137
func runClient(args []string) error {
@@ -258,12 +229,19 @@ func runClient(args []string) error {
258229
executor := NewClientExecutor(conn, benchBin, *target, *statsURL, *outputDir)
259230
collector := NewResultCollector(*outputDir, scenario.Name, *mode)
260231

261-
slog.Info(fmt.Sprintf("Connected. Running %d benchmark(s)...\n", len(runs)))
232+
slog.Info(fmt.Sprintf("Connected. Running %d benchmark(s)...", len(runs)))
233+
234+
return runBenchmarkLoop(ctx, runs, executor, collector, *pause)
235+
}
236+
237+
// runBenchmarkLoop executes the benchmark runs and collects results.
238+
func runBenchmarkLoop(ctx context.Context, runs []BenchmarkRun, executor *Executor, collector *ResultCollector, pause bool) error {
239+
slog.Info(fmt.Sprintf("Running %d benchmark(s)...\n", len(runs)))
262240

263241
for i, run := range runs {
264242
slog.Info(fmt.Sprintf("[%d/%d] %s (routines=%d)", i+1, len(runs), run.Name, run.Routines))
265243

266-
if *pause && i > 0 {
244+
if pause && i > 0 {
267245
fmt.Fprint(os.Stderr, "Press Enter to continue...")
268246
if err := WaitForInput(); err != nil {
269247
return fmt.Errorf("wait for input: %w", err)

cmd/orchestrate/run.go

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (e *Executor) Run(ctx context.Context, run BenchmarkRun) RunResult {
5555
RunID: run.Name,
5656
Mode: run.Mode,
5757
Routines: run.Routines,
58-
OtherRoutes: run.OtherRoutines,
58+
OtherRoutines: run.OtherRoutines,
5959
}
6060

6161
udpRoutines, httpRoutines := DetermineRoutines(run)
@@ -128,23 +128,10 @@ func (e *Executor) startTracker(ctx context.Context, udpRoutines, httpRoutines i
128128

129129
// startTrackerLocal handles local tracker startup.
130130
func (e *Executor) startTrackerLocal(ctx context.Context, udpRoutines, httpRoutines int, run BenchmarkRun) error {
131-
if e.tracker.IsAlive() {
132-
slog.Info(" Stopping existing tracker...")
133-
if err := e.tracker.Stop(ctx); err != nil {
134-
slog.Warn("stop tracker", "error", err)
135-
}
136-
time.Sleep(time.Second)
137-
}
138-
139-
slog.Info(" Clearing cache...")
140-
if err := e.tracker.ClearCache(); err != nil {
141-
slog.Warn("failed to clear cache", "error", err)
142-
}
143-
144131
slog.Info(fmt.Sprintf(" Starting tracker (%s_routines=%d, other=%d)...",
145132
run.Mode, run.Routines, run.OtherRoutines))
146133

147-
return e.tracker.Start(ctx, udpRoutines, httpRoutines)
134+
return e.tracker.Restart(ctx, udpRoutines, httpRoutines)
148135
}
149136

150137
// startTrackerRemote handles remote tracker startup via protocol.
@@ -169,7 +156,9 @@ func (e *Executor) startTrackerRemote(udpRoutines, httpRoutines int) error {
169156
return nil
170157
case MsgTypeError:
171158
var errMsg ErrorMessage
172-
json.Unmarshal(data, &errMsg)
159+
if err := json.Unmarshal(data, &errMsg); err != nil {
160+
return fmt.Errorf("server error (failed to parse: %v)", err)
161+
}
173162
return fmt.Errorf("server error: %s", errMsg.ErrorMsg)
174163
default:
175164
return fmt.Errorf("unexpected response: %s", msgType)
@@ -207,7 +196,9 @@ func (e *Executor) notifyDone(udpRoutines, httpRoutines int, resultFile string,
207196
return nil
208197
case MsgTypeError:
209198
var errMsg ErrorMessage
210-
json.Unmarshal(data, &errMsg)
199+
if err := json.Unmarshal(data, &errMsg); err != nil {
200+
return fmt.Errorf("server error on stop (failed to parse: %v)", err)
201+
}
211202
return fmt.Errorf("server error on stop: %s", errMsg.ErrorMsg)
212203
default:
213204
return fmt.Errorf("unexpected response: %s", msgType)

0 commit comments

Comments
 (0)