Commit 31f102c

Clean up errors from rebase, add running request metric to datasource, add predictor to new 2 phase configuration parser
1 parent f63bc01 commit 31f102c

File tree: 4 files changed (+37, -47 lines)

cmd/epp/runner/runner.go
Lines changed: 33 additions & 28 deletions

@@ -227,7 +227,20 @@ func (r *Runner) Run(ctx context.Context) error {
 		return err
 	}
 
-	rawConfig, err := r.parseConfigurationPhaseOne(ctx)
+	// ===================================================================
+	// == Latency Predictor Integration
+	// ===================================================================
+	var predictor latencypredictor.PredictorInterface // Use the interface type
+	if *enableLatencyPredictor {
+		setupLog.Info("Latency predictor is enabled. Initializing...")
+		predictor = latencypredictor.New(latencypredictor.ConfigFromEnv(), ctrl.Log.WithName("latency-predictor"))
+	} else {
+		setupLog.Info("Latency predictor is disabled.")
+		predictor = nil // This will be a true nil interface
+	}
+	// ===================================================================
+
+	rawConfig, err := r.parseConfigurationPhaseOne(ctx, predictor)
 	if err != nil {
 		setupLog.Error(err, "Failed to parse configuration")
 		return err
@@ -322,32 +335,6 @@ func (r *Runner) Run(ctx context.Context) error {
 		runtime.SetBlockProfileRate(1)
 	}
 
-	// ===================================================================
-	// == Latency Predictor Integration
-	// ===================================================================
-	var predictor latencypredictor.PredictorInterface // Use the interface type
-	if *enableLatencyPredictor {
-		setupLog.Info("Latency predictor is enabled. Initializing...")
-		predictor = latencypredictor.New(latencypredictor.ConfigFromEnv(), ctrl.Log.WithName("latency-predictor"))
-
-		// For the runnable, you'll need to type assert back to the concrete type
-		concretePredictor := predictor.(*latencypredictor.Predictor)
-		if err := mgr.Add(runnable.NoLeaderElection(&predictorRunnable{predictor: concretePredictor})); err != nil {
-			setupLog.Error(err, "Failed to register latency predictor runnable")
-			return err
-		}
-	} else {
-		setupLog.Info("Latency predictor is disabled.")
-		predictor = nil // This will be a true nil interface
-	}
-	// ===================================================================
-
-	err = r.parsePluginsConfiguration(ctx, predictor, datastore)
-	if err != nil {
-		setupLog.Error(err, "Failed to parse the configuration")
-		return err
-	}
-
 	// --- Initialize Core EPP Components ---
 	if r.schedulerConfig == nil {
 		err := errors.New("scheduler config must be set either by config api or through code")
@@ -420,6 +407,12 @@ func (r *Runner) Run(ctx context.Context) error {
 		return err
 	}
 
+	if *enableLatencyPredictor && predictor != nil {
+		if err := registerLatencyPredictorServer(mgr, predictor); err != nil {
+			return err
+		}
+	}
+
 	// --- Start Manager ---
 	// This blocks until a signal is received.
 	setupLog.Info("Controller manager starting")
@@ -454,7 +447,7 @@ func (r *Runner) registerLatencyPredictorPlugins(predictor latencypredictor.Pred
 	plugins.Register(profile.SLOAwareProfileHandlerType, profile.SLOAwareProfileHandlerFactory)
 }
 
-func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.EndpointPickerConfig, error) {
+func (r *Runner) parseConfigurationPhaseOne(ctx context.Context, predictor latencypredictor.PredictorInterface) (*configapi.EndpointPickerConfig, error) {
 	if *configText == "" && *configFile == "" {
 		return nil, nil // configuring through code, not through file
 	}
@@ -683,6 +676,18 @@ func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.
 	return nil
 }
 
+// registerLatencyPredictorServer adds the Latency Predictor server as a Runnable to the given manager.
+func registerLatencyPredictorServer(mgr manager.Manager, predictor latencypredictor.PredictorInterface) error {
+	// For the runnable, you'll need to type assert back to the concrete type
+	concretePredictor := predictor.(*latencypredictor.Predictor)
+	if err := mgr.Add(runnable.NoLeaderElection(&predictorRunnable{predictor: concretePredictor})); err != nil {
+		setupLog.Error(err, "Failed to register latency predictor runnable")
+		return err
+	}
+	setupLog.Info("Latency predictor runnable added to manager.")
+	return nil
+}
+
 func validateFlags() error {
 	if *poolName == "" {
 		return fmt.Errorf("required %q flag not set", "poolName")
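Note: the `predictorRunnable` wrapped by `runnable.NoLeaderElection` above is not defined in this diff. A minimal sketch of what such a wrapper could look like, assuming the predictor exposes Start/Stop-style lifecycle methods (those method names are assumptions, and package imports from the surrounding file are omitted); controller-runtime only requires that a Runnable's Start blocks until the context is cancelled:

```go
// Hypothetical sketch only; the real predictorRunnable lives elsewhere in the repo.
type predictorRunnable struct {
	predictor *latencypredictor.Predictor
}

// Start satisfies manager.Runnable: it runs the predictor for the lifetime of the manager.
func (p *predictorRunnable) Start(ctx context.Context) error {
	setupLog.Info("Starting latency predictor")
	p.predictor.Start(ctx) // assumed lifecycle method: begins background refresh/training loops
	<-ctx.Done()           // block until the manager shuts down
	setupLog.Info("Stopping latency predictor")
	p.predictor.Stop() // assumed lifecycle method: stops background workers
	return nil
}
```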

pkg/epp/datalayer/metrics/datasource_test.go
Lines changed: 1 addition & 1 deletion

@@ -29,7 +29,7 @@ import (
 
 func TestDatasource(t *testing.T) {
 	source := NewDataSource("https", "/metrics", true, nil)
-	extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric, "", "", "")
+	extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric, "", "", "", "")
 	assert.Nil(t, err, "failed to create extractor")
 
 	name := source.Name()

pkg/epp/datalayer/metrics/extractor_test.go
Lines changed: 3 additions & 2 deletions

@@ -31,6 +31,7 @@ import (
 const (
 	// use hardcoded values - importing causes cycle
 	defaultTotalQueuedRequestsMetric = "vllm:num_requests_waiting"
+	defaultTotalRunningRequestsMetric = "vllm:num_requests_running"
 	defaultKvCacheUsagePercentageMetric = "vllm:gpu_cache_usage_perc"
 	defaultLoraInfoMetric = "vllm:lora_requests_info"
 	defaultCacheInfoMetric = "vllm:cache_config_info"
@@ -39,11 +40,11 @@ const (
 func TestExtractorExtract(t *testing.T) {
 	ctx := context.Background()
 
-	if _, err := NewExtractor("vllm: dummy", "", "", ""); err == nil {
+	if _, err := NewExtractor("vllm: dummy", "", "", "", ""); err == nil {
 		t.Error("expected to fail to create extractor with invalid specification")
 	}
 
-	extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric,
+	extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric, defaultTotalRunningRequestsMetric,
 		defaultKvCacheUsagePercentageMetric, defaultLoraInfoMetric, defaultCacheInfoMetric)
 	if err != nil {
 		t.Fatalf("failed to create extractor: %v", err)
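With the extra parameter, NewExtractor now takes the running-requests metric as its second argument. A sketch of the updated call, using the metric names hard-coded in the test above (the argument order — queued, running, KV-cache usage, LoRA info, cache info — is taken from TestExtractorExtract and should be treated as illustrative):

```go
// Mirrors the test wiring above, with the metric names written out literally.
extractor, err := NewExtractor(
	"vllm:num_requests_waiting",  // total queued requests
	"vllm:num_requests_running",  // total running requests (new in this commit)
	"vllm:gpu_cache_usage_perc",  // KV-cache usage percentage
	"vllm:lora_requests_info",    // LoRA adapter info
	"vllm:cache_config_info",     // cache config info
)
if err != nil {
	// an invalid metric specification is rejected here, as the tests expect
}
```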

pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/requestcontrol_hooks.go
Lines changed: 0 additions & 16 deletions

@@ -114,11 +114,7 @@ func (t *SLOAwareRouter) PreRequest(ctx context.Context, request *schedulingtype
 	}
 
 	targetPod := schedulingResult.ProfileResults[schedulingResult.PrimaryProfileName].TargetPods[0].GetPod()
-<<<<<<< HEAD
 	if !t.checkPredictor(logger, targetPod) {
-=======
-	if !t.CheckPredictor(logger, targetPod) {
->>>>>>> b2a7d45 (Fix streamed request being called one final time after request complete, add predictor check to the beginning of each requestcontrol hook)
 		return
 	}
 
@@ -161,11 +157,7 @@ func (t *SLOAwareRouter) PreRequest(ctx context.Context, request *schedulingtype
 
 func (t *SLOAwareRouter) ResponseReceived(ctx context.Context, request *schedulingtypes.LLMRequest, response *requestcontrol.Response, targetPod *backend.Pod) {
 	logger := log.FromContext(ctx)
-<<<<<<< HEAD
 	if !t.checkPredictor(logger, targetPod) {
-=======
-	if !t.CheckPredictor(logger, targetPod) {
->>>>>>> b2a7d45 (Fix streamed request being called one final time after request complete, add predictor check to the beginning of each requestcontrol hook)
 		return
 	}
 
@@ -177,23 +169,15 @@ func (t *SLOAwareRouter) ResponseReceived(ctx context.Context, request *scheduli
 		return
 	}
 
-<<<<<<< HEAD
 	if err := processHeaderForLatencyPrediction(ctx, t.latencypredictor, sloCtx); err != nil {
-=======
-	if err := ProcessHeaderForLatencyPrediction(ctx, t.latencypredictor, sloCtx); err != nil {
->>>>>>> b2a7d45 (Fix streamed request being called one final time after request complete, add predictor check to the beginning of each requestcontrol hook)
 		logger.V(logutil.DEBUG).Error(err, "ProcessHeader in latencypredictor failed")
 	}
 
 }
 
 func (t *SLOAwareRouter) ResponseStreaming(ctx context.Context, request *schedulingtypes.LLMRequest, response *requestcontrol.Response, pod *backend.Pod) {
 	logger := log.FromContext(ctx)
-<<<<<<< HEAD
 	if !t.checkPredictor(logger, pod) || response.EndOfStream {
-=======
-	if !t.CheckPredictor(logger, pod) || response.EndOfStream {
->>>>>>> b2a7d45 (Fix streamed request being called one final time after request complete, add predictor check to the beginning of each requestcontrol hook)
 		return
 	}
 
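After resolving the rebase conflicts, each hook keeps only the unexported checkPredictor guard; its body is not part of this diff. A rough sketch of what such a guard might check, with field names and log messages as assumptions (imports from the surrounding package omitted):

```go
// Hypothetical guard, not taken from this commit: bail out of a hook early
// when there is no predictor to feed or no pod to attribute the data to.
func (t *SLOAwareRouter) checkPredictor(logger logr.Logger, targetPod *backend.Pod) bool {
	if t.latencypredictor == nil {
		logger.V(logutil.DEBUG).Info("latency predictor not configured, skipping SLO-aware hook")
		return false
	}
	if targetPod == nil {
		logger.V(logutil.DEBUG).Info("no target pod resolved, skipping SLO-aware hook")
		return false
	}
	return true
}
```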
