Skip to content

Commit ef1090c

Browse files
Added cumulative latency dashboard and ability to configure workload iterations
1 parent 13671d0 commit ef1090c

File tree

10 files changed

+539
-175
lines changed

10 files changed

+539
-175
lines changed

README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ Environment Variables (Overrides):
244244
PLGM_COLLECTIONS_PATH Path to collection JSON
245245
PLGM_QUERIES_PATH Path to query JSON
246246
PLGM_DURATION Test duration (e.g. 60s, 5m)
247+
PLGM_ITERATIONS Number of times to repeat the workload
248+
PLGM_INTERVAL_DELAY Time to pause between iterations (e.g. 5s, 1m)
247249
PLGM_CONCURRENCY Number of active workers
248250
PLGM_DOCUMENTS_COUNT Initial seed document count
249251
PLGM_DROP_COLLECTIONS Drop collections on start (true/false)
@@ -642,6 +644,8 @@ You can override any setting in `config.yaml` using environment variables. This
642644
| **Workload Control** | | | |
643645
| `concurrency` | `PLGM_CONCURRENCY` | Number of active worker goroutines | `50` |
644646
| `duration` | `PLGM_DURATION` | Test duration (Go duration string) | `5m`, `60s` |
647+
| `iterations` | `PLGM_ITERATIONS` | Number of times to repeat the workload | `1` |
648+
| `interval_delay` | `PLGM_INTERVAL_DELAY` | Time to pause between iterations (e.g. 5s, 1m) | `0s` |
645649
| `default_workload` | `PLGM_DEFAULT_WORKLOAD` | Use built-in "Flights" workload (`true`/`false`) | `false` |
646650
| `collections_path` | `PLGM_COLLECTIONS_PATH` | Path to custom collection JSON files (supports directories for multi-collection load) | `./schemas` |
647651
| `queries_path` | `PLGM_QUERIES_PATH` | Path to custom query JSON files or directory. | `./queries` |
@@ -908,6 +912,18 @@ Control how plgm reacts to network lag or database pressure.
908912
* *Tip:* For stress testing, you might want to set `retry_attempts: 0` to see raw failure rates immediately.
909913
* *Default:* `2` attempts with `5ms` backoff.
910914
915+
916+
#### Workload Iterations & Scheduling
917+
Control how plgm repeats a given workload and schedules the time between runs.
918+
919+
* **`iterations`**: The number of times to run the defined workload duration back-to-back.
920+
* *Tip:* This is perfect for warming up the database cache during the first iteration and capturing true performance metrics on subsequent iterations, without having to manually restart the tool.
921+
* *Default:* `1`
922+
* **`interval_delay`**: The amount of time to pause the workload between each iteration (e.g., `10s`, `1m`).
923+
* *Tip:* Adding a delay allows database background processes (like log flushing, checkpointing, or compaction) to catch up, simulating real-world batch-processing patterns.
924+
* *Default:* `0s`
925+
926+
911927
### 3. Custom Connection Parameters (`custom_params`)
912928
913929
In the `config.yaml`, the `custom_params` section allows you to pass arbitrary options directly to the MongoDB driver's connection string. These parameters are appended as URI query options and are critical for tuning network throughput, security, and routing. All standard MongoDB connection parameters are supported.

cmd/plgm/main.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"net/url"
1111
"os"
1212
"strings"
13+
"time"
1314

1415
"github.com/Percona-Lab/percona-load-generator-mongodb/internal/benchmark"
1516
"github.com/Percona-Lab/percona-load-generator-mongodb/internal/config"
@@ -66,6 +67,8 @@ func main() {
6667
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PLGM_COLLECTIONS_PATH", "Path to collection JSON")
6768
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PLGM_QUERIES_PATH", "Path to query JSON")
6869
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PLGM_DURATION", "Test duration (e.g. 60s, 5m)")
70+
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PLGM_ITERATIONS", "Number of times to repeat the workload")
71+
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PLGM_INTERVAL_DELAY", "Time to pause between iterations (e.g. 5s, 1m)")
6972
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PLGM_CONCURRENCY", "Number of active workers")
7073
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PLGM_DOCUMENTS_COUNT", "Initial seed document count")
7174
fmt.Fprintf(os.Stderr, " %-35s %s\n", "PLGM_DROP_COLLECTIONS", "Drop collections on start (true/false)")
@@ -235,8 +238,16 @@ func main() {
235238
}
236239
defer benchConn.Disconnect(ctx)
237240

238-
if err := benchmark.RunRawInjector(ctx, benchConn.Database, appCfg); err != nil {
239-
log.Fatal(err)
241+
intervalDuration, _ := time.ParseDuration(appCfg.IntervalDelay)
242+
for i := 1; i <= appCfg.Iterations; i++ {
243+
log.Printf("Starting Raw Injector iteration %d of %d", i, appCfg.Iterations)
244+
if err := benchmark.RunRawInjector(ctx, benchConn.Database, appCfg); err != nil {
245+
log.Fatal(err)
246+
}
247+
if i < appCfg.Iterations && intervalDuration > 0 {
248+
log.Printf("Waiting %s before next iteration...", appCfg.IntervalDelay)
249+
time.Sleep(intervalDuration)
250+
}
240251
}
241252
return
242253
}
@@ -295,7 +306,15 @@ func main() {
295306
}
296307
}
297308

298-
if err := mongo.RunWorkload(ctx, conn.Database, collectionsCfg.Collections, queriesCfg.Queries, appCfg); err != nil {
299-
log.Fatal(err)
309+
intervalDuration, _ := time.ParseDuration(appCfg.IntervalDelay)
310+
for i := 1; i <= appCfg.Iterations; i++ {
311+
log.Printf("Starting Standard Workload iteration %d of %d", i, appCfg.Iterations)
312+
if err := mongo.RunWorkload(ctx, conn.Database, collectionsCfg.Collections, queriesCfg.Queries, appCfg); err != nil {
313+
log.Fatal(err)
314+
}
315+
if i < appCfg.Iterations && intervalDuration > 0 {
316+
log.Printf("Waiting %s before next iteration...", appCfg.IntervalDelay)
317+
time.Sleep(intervalDuration)
318+
}
300319
}
301320
}

config.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ concurrency: 4
9292
# Set to "0" to run the queries exactly once and exit.
9393
duration: "10s"
9494

95+
# Number of times to repeat the workload execution.
96+
iterations: 1
97+
98+
# Time to pause between iterations (e.g. "5s", "1m").
99+
interval_delay: "0s"
100+
95101
# Enable transactional workload
96102
use_transactions: false
97103

images/dashboard_bottom.png

10.6 KB
Loading

images/dashboard_top.png

-9.8 KB
Loading

images/workload.png

8.14 KB
Loading

internal/config/config.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ type AppConfig struct {
1818
SkipSeed bool `yaml:"skip_seed"`
1919
DocumentsCount int `yaml:"documents_count"`
2020
Concurrency int `yaml:"concurrency"`
21+
Iterations int `yaml:"iterations"`
22+
IntervalDelay string `yaml:"interval_delay"`
2123

2224
Duration string `yaml:"duration"`
2325
FindPercent int `yaml:"find_percent"`
@@ -132,6 +134,8 @@ func applyUIDefaults(cfg *AppConfig) {
132134
cfg.DefaultWorkload = true
133135
cfg.SkipSeed = true
134136
cfg.DocumentsCount = 100000
137+
cfg.Iterations = 1
138+
cfg.IntervalDelay = "0s"
135139

136140
// --- MIX TAB ---
137141
cfg.FindPercent = 50
@@ -336,6 +340,15 @@ func applyEnvOverrides(cfg *AppConfig) map[string]bool {
336340
cfg.Duration = envDuration
337341
}
338342

343+
if envIterations := os.Getenv("PLGM_ITERATIONS"); envIterations != "" {
344+
if n, err := strconv.Atoi(envIterations); err == nil && n > 0 {
345+
cfg.Iterations = n
346+
}
347+
}
348+
if envInterval := os.Getenv("PLGM_INTERVAL_DELAY"); envInterval != "" {
349+
cfg.IntervalDelay = envInterval
350+
}
351+
339352
// Percentages
340353
if p := os.Getenv("PLGM_FIND_PERCENT"); p != "" {
341354
if n, err := strconv.Atoi(p); err == nil && n >= 0 {

internal/stats/collector.go

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,41 @@ func (h *LatencyHistogram) GetPercentile(p float64) float64 {
101101
return float64(MaxLatencyBin)
102102
}
103103

104+
func (h *LatencyHistogram) GetStats() map[string]float64 {
105+
h.mu.Lock()
106+
defer h.mu.Unlock()
107+
108+
if h.Count == 0 {
109+
return map[string]float64{"avg": 0, "min": 0, "max": 0, "p95": 0, "p99": 0}
110+
}
111+
112+
min := h.Min
113+
if min == math.MaxFloat64 {
114+
min = 0 // Sanity check to prevent rendering giant float limits
115+
}
116+
117+
// Inline percentile calculation to avoid deadlocking
118+
getPerc := func(p float64) float64 {
119+
targetCount := int64(math.Ceil((p / 100.0) * float64(h.Count)))
120+
var currentCount int64 = 0
121+
for i, count := range h.Buckets {
122+
currentCount += count
123+
if currentCount >= targetCount {
124+
return float64(i)
125+
}
126+
}
127+
return float64(MaxLatencyBin)
128+
}
129+
130+
return map[string]float64{
131+
"avg": h.Sum / float64(h.Count),
132+
"min": min,
133+
"max": h.Max,
134+
"p95": getPerc(95.0),
135+
"p99": getPerc(99.0),
136+
}
137+
}
138+
104139
type Collector struct {
105140
FindOps uint64
106141
InsertOps uint64
@@ -117,6 +152,9 @@ type Collector struct {
117152
DeleteHist *LatencyHistogram
118153
AggHist *LatencyHistogram
119154
TransHist *LatencyHistogram
155+
TotalHist *LatencyHistogram
156+
157+
CurrentIteration int
120158

121159
startTime time.Time
122160
prevFind uint64
@@ -137,12 +175,14 @@ func NewCollector() *Collector {
137175
DeleteHist: &LatencyHistogram{Min: math.MaxFloat64},
138176
AggHist: &LatencyHistogram{Min: math.MaxFloat64},
139177
TransHist: &LatencyHistogram{Min: math.MaxFloat64},
178+
TotalHist: &LatencyHistogram{Min: math.MaxFloat64},
140179
startTime: time.Now(),
141180
}
142181
}
143182

144183
func (c *Collector) Track(opType string, duration time.Duration) {
145184
ms := float64(duration.Nanoseconds()) / 1e6
185+
c.TotalHist.Record(ms)
146186
switch opType {
147187
case "find":
148188
atomic.AddUint64(&c.FindOps, 1)
@@ -170,6 +210,7 @@ func (c *Collector) Track(opType string, duration time.Duration) {
170210

171211
func (c *Collector) Add(opType string, count int64, duration time.Duration) {
172212
ms := float64(duration.Nanoseconds()) / 1e6
213+
c.TotalHist.RecordBatch(ms, count)
173214
switch opType {
174215
case "find":
175216
atomic.AddUint64(&c.FindOps, uint64(count))
@@ -236,7 +277,7 @@ func (c *Collector) Monitor(done <-chan struct{}, refreshRateSec int, concurrenc
236277
csvWriter.Write([]string{
237278
"Timestamp", "ElapsedSec",
238279
"Select_OpsSec", "Insert_OpsSec", "Upsert_OpsSec",
239-
"Update_OpsSec", "Delete_OpsSec", "Agg_OpsSec", "Trans_OpsSec",
280+
"Update_OpsSec", "Delete_OpsSec", "Agg_OpsSec", "Trans_OpsSec", "Iteration",
240281
})
241282
csvWriter.Flush()
242283
}
@@ -289,6 +330,11 @@ func (c *Collector) Monitor(done <-chan struct{}, refreshRateSec int, concurrenc
289330
rateAgg := float64(currentAgg-lastAgg) / float64(refreshRateSec)
290331
rateTrans := float64(currentTrans-lastTrans) / float64(refreshRateSec)
291332

333+
iter := c.CurrentIteration
334+
if iter < 1 {
335+
iter = 1
336+
}
337+
292338
csvWriter.Write([]string{
293339
time.Now().Format(time.RFC3339),
294340
fmt.Sprintf("%.0f", elapsed),
@@ -299,8 +345,9 @@ func (c *Collector) Monitor(done <-chan struct{}, refreshRateSec int, concurrenc
299345
fmt.Sprintf("%.2f", rateDelete),
300346
fmt.Sprintf("%.2f", rateAgg),
301347
fmt.Sprintf("%.2f", rateTrans),
348+
strconv.Itoa(iter),
302349
})
303-
csvWriter.Flush() // Flush immediately so data is saved even if crashed
350+
csvWriter.Flush()
304351

305352
// Update 'last' trackers for the next tick
306353
lastFind = currentFind

0 commit comments

Comments
 (0)