Skip to content

Commit d7f90d2

Browse files
kfswain authored and nirrozenbaum committed
adding waitgroup to allow synchronous testing (#1534)
1 parent 859f32d commit d7f90d2

File tree

2 files changed

+11
-0
lines changed

2 files changed

+11
-0
lines changed

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"encoding/binary"
2222
"encoding/json"
2323
"fmt"
24+
"sync"
2425

2526
"github.com/cespare/xxhash/v2"
2627
k8stypes "k8s.io/apimachinery/pkg/types"
@@ -78,6 +79,7 @@ type Plugin struct {
7879
config Config
7980
pluginState *plugins.PluginState
8081
indexer Indexer
82+
wg sync.WaitGroup
8183
}
8284

8385
// podSet holds the set of pod servers that may have a specific prefix hash.
@@ -218,8 +220,11 @@ func (p *Plugin) PreRequest(ctx context.Context, request *types.LLMRequest, sche
218220
// This function is just adding data, it does not need to block other operations.
219221
// TODO: look into making this entire function async; none of this needs to be done in-band.
220222
// The PR that introduces this change is meant as a cherrypick, so it was minimally invasive.
223+
// WaitGroup is added to the Plugin struct to allow waiting in tests.
224+
p.wg.Add(1)
221225
go func() {
222226
p.indexer.Add(state.PrefixHashes, ServerID(targetPod.NamespacedName))
227+
p.wg.Done()
223228
}()
224229

225230
total := len(state.PrefixHashes)

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func TestPrefixPlugin(t *testing.T) {
7171
},
7272
}
7373
plugin.PreRequest(context.Background(), req1, schedulingResult, 0)
74+
plugin.wg.Wait()
7475

7576
// Second request doesn't share any prefix with first one. It should be added to the cache but
7677
// the pod score should be 0.
@@ -98,6 +99,7 @@ func TestPrefixPlugin(t *testing.T) {
9899
},
99100
}
100101
plugin.PreRequest(context.Background(), req2, schedulingResult, 0)
102+
plugin.wg.Wait()
101103

102104
// Third request shares partial prefix with first one.
103105
req3 := &types.LLMRequest{
@@ -123,6 +125,7 @@ func TestPrefixPlugin(t *testing.T) {
123125
},
124126
}
125127
plugin.PreRequest(context.Background(), req3, schedulingResult, 0)
128+
plugin.wg.Wait()
126129

127130
// 4th request is same as req3 except the model is different, still no match.
128131
req4 := &types.LLMRequest{
@@ -148,6 +151,7 @@ func TestPrefixPlugin(t *testing.T) {
148151
},
149152
}
150153
plugin.PreRequest(context.Background(), req4, schedulingResult, 0)
154+
plugin.wg.Wait()
151155

152156
// 5th request shares partial prefix with 3rd one.
153157
req5 := &types.LLMRequest{
@@ -173,6 +177,7 @@ func TestPrefixPlugin(t *testing.T) {
173177
},
174178
}
175179
plugin.PreRequest(context.Background(), req5, schedulingResult, 0)
180+
plugin.wg.Wait()
176181
}
177182

178183
// TestPrefixPluginStress is a stress test for the prefix scoring plugin, using prompts of increasing length.
@@ -220,6 +225,7 @@ func BenchmarkPrefixPluginStress(b *testing.B) {
220225
},
221226
}
222227
plugin.PreRequest(context.Background(), req, schedulingResult, 0)
228+
plugin.wg.Wait()
223229

224230
// Second cycle: validate internal state
225231
state, err := plugins.ReadPluginStateKey[*SchedulingContextState](plugin.pluginState, req.RequestId, plugins.StateKey(plugin.TypedName().String()))

0 commit comments

Comments
 (0)