Skip to content

Commit cb02f63

Browse files
authored
Eliminate VU races, unify execution loop, remove cpu check loop (#1505)
1 parent 70bd176 commit cb02f63

File tree

5 files changed

+39
-104
lines changed

5 files changed

+39
-104
lines changed

.github/workflows/wasp-test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
defaults:
99
run:
1010
working-directory: wasp
11-
runs-on: ubuntu-latest
11+
runs-on: ubuntu22.04-16cores-64GB
1212
steps:
1313
- uses: actions/checkout@v3
1414
- uses: dorny/paths-filter@v3

wasp/stat.go

Lines changed: 0 additions & 48 deletions
This file was deleted.

wasp/wasp.go

Lines changed: 25 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,6 @@ type Stats struct {
217217
CurrentSegment atomic.Int64 `json:"current_schedule_segment"`
218218
SamplesRecorded atomic.Int64 `json:"samples_recorded"`
219219
SamplesSkipped atomic.Int64 `json:"samples_skipped"`
220-
RunStarted atomic.Bool `json:"runStarted"`
221220
RunPaused atomic.Bool `json:"runPaused"`
222221
RunStopped atomic.Bool `json:"runStopped"`
223222
RunFailed atomic.Bool `json:"runFailed"`
@@ -246,6 +245,7 @@ type Generator struct {
246245
Log zerolog.Logger
247246
labels model.LabelSet
248247
rl atomic.Pointer[ratelimit.Limiter]
248+
rpsLoopOnce *sync.Once
249249
scheduleSegments []*Segment
250250
currentSegmentMu *sync.Mutex
251251
currentSegment *Segment
@@ -320,6 +320,7 @@ func NewGenerator(cfg *Config) (*Generator, error) {
320320
dataCancel: dataCancel,
321321
gun: cfg.Gun,
322322
vu: cfg.VU,
323+
rpsLoopOnce: &sync.Once{},
323324
Responses: NewResponses(rch),
324325
ResponsesChan: rch,
325326
labels: ls,
@@ -345,43 +346,26 @@ func NewGenerator(cfg *Config) (*Generator, error) {
345346
return nil, err
346347
}
347348
}
348-
CPUCheckLoop()
349349
return g, nil
350350
}
351351

352-
// runExecuteLoop initiates the generator's execution loop based on the configured load type.
353-
// It manages request pacing for RPS or handles virtual users for load testing scenarios.
354-
func (g *Generator) runExecuteLoop() {
355-
g.currentSegment = g.scheduleSegments[0]
356-
g.stats.LastSegment.Store(int64(len(g.scheduleSegments)))
357-
switch g.Cfg.LoadType {
358-
case RPS:
359-
g.ResponsesWaitGroup.Add(1)
360-
// we run pacedCall controlled by stats.CurrentRPS
361-
go func() {
362-
for {
363-
select {
364-
case <-g.ResponsesCtx.Done():
365-
g.ResponsesWaitGroup.Done()
366-
g.Log.Info().Msg("RPS generator has stopped")
367-
return
368-
default:
369-
g.pacedCall()
370-
}
352+
// runGunLoop runs the generator's Gun loop
353+
// It manages request pacing for RPS after the first segment is loaded.
354+
func (g *Generator) runGunLoop() {
355+
g.ResponsesWaitGroup.Add(1)
356+
// we run pacedCall controlled by stats.CurrentRPS
357+
go func() {
358+
for {
359+
select {
360+
case <-g.ResponsesCtx.Done():
361+
g.ResponsesWaitGroup.Done()
362+
g.Log.Info().Msg("RPS generator has stopped")
363+
return
364+
default:
365+
g.pacedCall()
371366
}
372-
}()
373-
case VU:
374-
g.currentSegmentMu.Lock()
375-
g.stats.CurrentVUs.Store(g.currentSegment.From)
376-
g.currentSegmentMu.Unlock()
377-
// we start all vus once
378-
vus := g.stats.CurrentVUs.Load()
379-
for i := 0; i < int(vus); i++ {
380-
inst := g.vu.Clone(g)
381-
g.runVU(inst)
382-
g.vus = append(g.vus, inst)
383367
}
384-
}
368+
}()
385369
}
386370

387371
// runSetupWithTimeout executes the VirtualUser's setup within the configured timeout.
@@ -478,7 +462,6 @@ func (g *Generator) runVU(vu VirtualUser) {
478462
// It returns true when all segments have been handled, signaling the scheduler to terminate.
479463
func (g *Generator) processSegment() bool {
480464
defer func() {
481-
g.stats.RunStarted.Store(true)
482465
g.Log.Info().
483466
Int64("Segment", g.stats.CurrentSegment.Load()).
484467
Int64("VUs", g.stats.CurrentVUs.Load()).
@@ -490,14 +473,18 @@ func (g *Generator) processSegment() bool {
490473
}
491474
g.currentSegmentMu.Lock()
492475
g.currentSegment = g.scheduleSegments[g.stats.CurrentSegment.Load()]
493-
g.currentSegment.StartTime = time.Now()
494476
g.currentSegmentMu.Unlock()
495477
g.stats.CurrentSegment.Add(1)
478+
g.currentSegment.StartTime = time.Now()
496479
switch g.Cfg.LoadType {
497480
case RPS:
498481
newRateLimit := ratelimit.New(int(g.currentSegment.From), ratelimit.Per(g.Cfg.RateLimitUnitDuration), ratelimit.WithoutSlack)
499482
g.rl.Store(&newRateLimit)
500483
g.stats.CurrentRPS.Store(g.currentSegment.From)
484+
// start Gun loop once, in next segments we control it using g.rl ratelimiter
485+
g.rpsLoopOnce.Do(func() {
486+
g.runGunLoop()
487+
})
501488
case VU:
502489
oldVUs := g.stats.CurrentVUs.Load()
503490
newVUs := g.currentSegment.From
@@ -527,6 +514,8 @@ func (g *Generator) processSegment() bool {
527514
// runScheduleLoop initiates an asynchronous loop that processes scheduling segments and monitors for completion signals.
528515
// It enables the generator to handle load distribution seamlessly in the background.
529516
func (g *Generator) runScheduleLoop() {
517+
g.currentSegment = g.scheduleSegments[0]
518+
g.stats.LastSegment.Store(int64(len(g.scheduleSegments)))
530519
go func() {
531520
for {
532521
select {
@@ -621,11 +610,7 @@ func (g *Generator) collectVUResults() {
621610
// handling timeouts and storing the response.
622611
// It ensures requests adhere to the generator's configuration and execution state.
623612
func (g *Generator) pacedCall() {
624-
if !g.Stats().RunStarted.Load() {
625-
return
626-
}
627-
l := *g.rl.Load()
628-
l.Take()
613+
(*g.rl.Load()).Take()
629614
if g.stats.RunPaused.Load() {
630615
return
631616
}
@@ -668,7 +653,6 @@ func (g *Generator) Run(wait bool) (interface{}, bool) {
668653
g.sendStatsToLoki()
669654
}
670655
g.runScheduleLoop()
671-
g.runExecuteLoop()
672656
g.collectVUResults()
673657
if wait {
674658
return g.Wait()
@@ -696,7 +680,6 @@ func (g *Generator) Stop() (interface{}, bool) {
696680
if g.stats.RunStopped.Load() {
697681
return nil, true
698682
}
699-
g.stats.RunStarted.Store(false)
700683
g.stats.RunStopped.Store(true)
701684
g.stats.RunFailed.Store(true)
702685
g.Log.Warn().Msg("Graceful stop")

wasp/wasp_bench_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func BenchmarkPacedCall(b *testing.B) {
2121
Gun: NewMockGun(&MockGunConfig{}),
2222
})
2323
require.NoError(b, err)
24-
gen.runExecuteLoop()
24+
gen.runGunLoop()
2525
b.ResetTimer()
2626
for i := 0; i < b.N; i++ {
2727
gen.pacedCall()

wasp/wasp_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -449,13 +449,13 @@ func TestSmokeStaticRPSSchedulePrecision(t *testing.T) {
449449
require.NoError(t, err)
450450
_, failed := gen.Run(true)
451451
require.Equal(t, false, failed)
452-
require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(980))
452+
require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(950))
453453
require.LessOrEqual(t, gen.Stats().Success.Load(), int64(1010))
454454
require.Equal(t, gen.Stats().Failed.Load(), int64(0))
455455
require.Equal(t, gen.Stats().CallTimeout.Load(), int64(0))
456456

457457
okData, _, failResponses := convertResponsesData(gen)
458-
require.GreaterOrEqual(t, len(okData), 980)
458+
require.GreaterOrEqual(t, len(okData), 950)
459459
require.LessOrEqual(t, len(okData), 1010)
460460
require.Empty(t, failResponses)
461461
require.Empty(t, gen.Errors())
@@ -475,14 +475,14 @@ func TestSmokeCustomUnitPrecision(t *testing.T) {
475475
_, failed := gen.Run(true)
476476
require.Equal(t, false, failed)
477477
stats := gen.Stats()
478-
require.GreaterOrEqual(t, stats.Success.Load(), int64(4970))
478+
require.GreaterOrEqual(t, stats.Success.Load(), int64(4950))
479479
require.LessOrEqual(t, stats.Success.Load(), int64(5010))
480480
require.Equal(t, stats.Failed.Load(), int64(0))
481481
require.Equal(t, stats.CallTimeout.Load(), int64(0))
482482
require.Equal(t, stats.CurrentTimeUnit, gen.Cfg.RateLimitUnitDuration.Nanoseconds())
483483

484484
okData, _, failResponses := convertResponsesData(gen)
485-
require.GreaterOrEqual(t, len(okData), 4970)
485+
require.GreaterOrEqual(t, len(okData), 4950)
486486
require.LessOrEqual(t, len(okData), 5010)
487487
require.Empty(t, failResponses)
488488
require.Empty(t, gen.Errors())
@@ -501,13 +501,13 @@ func TestSmokeStaticRPSScheduleIsNotBlocking(t *testing.T) {
501501
require.NoError(t, err)
502502
_, failed := gen.Run(true)
503503
require.Equal(t, false, failed)
504-
require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(980))
504+
require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(950))
505505
require.LessOrEqual(t, gen.Stats().Success.Load(), int64(1010))
506506
require.Equal(t, gen.Stats().Failed.Load(), int64(0))
507507
require.Equal(t, gen.Stats().CallTimeout.Load(), int64(0))
508508

509509
okData, _, failResponses := convertResponsesData(gen)
510-
require.GreaterOrEqual(t, len(okData), 980)
510+
require.GreaterOrEqual(t, len(okData), 950)
511511
require.LessOrEqual(t, len(okData), 1010)
512512
require.Empty(t, failResponses)
513513
require.Empty(t, gen.Errors())
@@ -676,10 +676,10 @@ func TestSmokeVUsIncrease(t *testing.T) {
676676

677677
okData, okResponses, failResponses := convertResponsesData(gen)
678678
require.GreaterOrEqual(t, okResponses[0].Duration, 50*time.Millisecond)
679-
require.GreaterOrEqual(t, len(okResponses), 147)
680-
require.GreaterOrEqual(t, len(okData), 147)
679+
require.GreaterOrEqual(t, len(okResponses), 140)
680+
require.GreaterOrEqual(t, len(okData), 140)
681681
require.Equal(t, okResponses[0].Data.(string), "successCallData")
682-
require.Equal(t, okResponses[147].Data.(string), "successCallData")
682+
require.Equal(t, okResponses[140].Data.(string), "successCallData")
683683
require.Empty(t, failResponses)
684684
require.Empty(t, gen.Errors())
685685
}
@@ -706,10 +706,10 @@ func TestSmokeVUsDecrease(t *testing.T) {
706706

707707
okData, okResponses, failResponses := convertResponsesData(gen)
708708
require.GreaterOrEqual(t, okResponses[0].Duration, 50*time.Millisecond)
709-
require.GreaterOrEqual(t, len(okResponses), 147)
710-
require.GreaterOrEqual(t, len(okData), 147)
709+
require.GreaterOrEqual(t, len(okResponses), 140)
710+
require.GreaterOrEqual(t, len(okData), 140)
711711
require.Equal(t, okResponses[0].Data.(string), "successCallData")
712-
require.Equal(t, okResponses[147].Data.(string), "successCallData")
712+
require.Equal(t, okResponses[140].Data.(string), "successCallData")
713713
require.Empty(t, failResponses)
714714
require.Empty(t, gen.Errors())
715715
}

0 commit comments

Comments
 (0)