Skip to content

Commit 7b59026

Browse files
Clean up errors from rebase, add running-request metric to datasource, and add predictor to the new two-phase configuration parser
1 parent 729c53b commit 7b59026

File tree

4 files changed

+37
-47
lines changed

4 files changed

+37
-47
lines changed

cmd/epp/runner/runner.go

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,20 @@ func (r *Runner) Run(ctx context.Context) error {
234234
return err
235235
}
236236

237-
rawConfig, err := r.parseConfigurationPhaseOne(ctx)
237+
// ===================================================================
238+
// == Latency Predictor Integration
239+
// ===================================================================
240+
var predictor latencypredictor.PredictorInterface // Use the interface type
241+
if *enableLatencyPredictor {
242+
setupLog.Info("Latency predictor is enabled. Initializing...")
243+
predictor = latencypredictor.New(latencypredictor.ConfigFromEnv(), ctrl.Log.WithName("latency-predictor"))
244+
} else {
245+
setupLog.Info("Latency predictor is disabled.")
246+
predictor = nil // This will be a true nil interface
247+
}
248+
// ===================================================================
249+
250+
rawConfig, err := r.parseConfigurationPhaseOne(ctx, predictor)
238251
if err != nil {
239252
setupLog.Error(err, "Failed to parse configuration")
240253
return err
@@ -315,32 +328,6 @@ func (r *Runner) Run(ctx context.Context) error {
315328
runtime.SetBlockProfileRate(1)
316329
}
317330

318-
// ===================================================================
319-
// == Latency Predictor Integration
320-
// ===================================================================
321-
var predictor latencypredictor.PredictorInterface // Use the interface type
322-
if *enableLatencyPredictor {
323-
setupLog.Info("Latency predictor is enabled. Initializing...")
324-
predictor = latencypredictor.New(latencypredictor.ConfigFromEnv(), ctrl.Log.WithName("latency-predictor"))
325-
326-
// For the runnable, you'll need to type assert back to the concrete type
327-
concretePredictor := predictor.(*latencypredictor.Predictor)
328-
if err := mgr.Add(runnable.NoLeaderElection(&predictorRunnable{predictor: concretePredictor})); err != nil {
329-
setupLog.Error(err, "Failed to register latency predictor runnable")
330-
return err
331-
}
332-
} else {
333-
setupLog.Info("Latency predictor is disabled.")
334-
predictor = nil // This will be a true nil interface
335-
}
336-
// ===================================================================
337-
338-
err = r.parsePluginsConfiguration(ctx, predictor, datastore)
339-
if err != nil {
340-
setupLog.Error(err, "Failed to parse the configuration")
341-
return err
342-
}
343-
344331
// --- Initialize Core EPP Components ---
345332
if r.schedulerConfig == nil {
346333
err := errors.New("scheduler config must be set either by config api or through code")
@@ -417,6 +404,12 @@ func (r *Runner) Run(ctx context.Context) error {
417404
return err
418405
}
419406

407+
if *enableLatencyPredictor && predictor != nil {
408+
if err := registerLatencyPredictorServer(mgr, predictor); err != nil {
409+
return err
410+
}
411+
}
412+
420413
// --- Start Manager ---
421414
// This blocks until a signal is received.
422415
setupLog.Info("Controller manager starting")
@@ -473,7 +466,7 @@ func (r *Runner) registerLatencyPredictorPlugins(predictor latencypredictor.Pred
473466
plugins.Register(profile.SLOAwareProfileHandlerType, profile.SLOAwareProfileHandlerFactory)
474467
}
475468

476-
func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.EndpointPickerConfig, error) {
469+
func (r *Runner) parseConfigurationPhaseOne(ctx context.Context, predictor latencypredictor.PredictorInterface) (*configapi.EndpointPickerConfig, error) {
477470
if *configText == "" && *configFile == "" {
478471
return nil, nil // configuring through code, not through file
479472
}
@@ -702,6 +695,18 @@ func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.
702695
return nil
703696
}
704697

698+
// registerLatencyPredictorServer adds the Latency Predictor server as a Runnable to the given manager.
699+
func registerLatencyPredictorServer(mgr manager.Manager, predictor latencypredictor.PredictorInterface) error {
700+
// For the runnable, you'll need to type assert back to the concrete type
701+
concretePredictor := predictor.(*latencypredictor.Predictor)
702+
if err := mgr.Add(runnable.NoLeaderElection(&predictorRunnable{predictor: concretePredictor})); err != nil {
703+
setupLog.Error(err, "Failed to register latency predictor runnable")
704+
return err
705+
}
706+
setupLog.Info("Latency predictor runnable added to manager.")
707+
return nil
708+
}
709+
705710
func validateFlags() error {
706711
if (*poolName != "" && *endpointSelector != "") || (*poolName == "" && *endpointSelector == "") {
707712
return errors.New("either pool-name or endpoint-selector must be set")

pkg/epp/datalayer/metrics/datasource_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929

3030
func TestDatasource(t *testing.T) {
3131
source := NewDataSource("https", "/metrics", true, nil)
32-
extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric, "", "", "")
32+
extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric, "", "", "", "")
3333
assert.Nil(t, err, "failed to create extractor")
3434

3535
name := source.Name()

pkg/epp/datalayer/metrics/extractor_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
const (
3232
// use hardcoded values - importing causes cycle
3333
defaultTotalQueuedRequestsMetric = "vllm:num_requests_waiting"
34+
defaultTotalRunningRequestsMetric = "vllm:num_requests_running"
3435
defaultKvCacheUsagePercentageMetric = "vllm:gpu_cache_usage_perc"
3536
defaultLoraInfoMetric = "vllm:lora_requests_info"
3637
defaultCacheInfoMetric = "vllm:cache_config_info"
@@ -39,11 +40,11 @@ const (
3940
func TestExtractorExtract(t *testing.T) {
4041
ctx := context.Background()
4142

42-
if _, err := NewExtractor("vllm: dummy", "", "", ""); err == nil {
43+
if _, err := NewExtractor("vllm: dummy", "", "", "", ""); err == nil {
4344
t.Error("expected to fail to create extractor with invalid specification")
4445
}
4546

46-
extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric,
47+
extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric, defaultTotalRunningRequestsMetric,
4748
defaultKvCacheUsagePercentageMetric, defaultLoraInfoMetric, defaultCacheInfoMetric)
4849
if err != nil {
4950
t.Fatalf("failed to create extractor: %v", err)

pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/requestcontrol_hooks.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,7 @@ func (t *SLOAwareRouter) PreRequest(ctx context.Context, request *schedulingtype
114114
}
115115

116116
targetPod := schedulingResult.ProfileResults[schedulingResult.PrimaryProfileName].TargetPods[0].GetPod()
117-
<<<<<<< HEAD
118117
if !t.checkPredictor(logger, targetPod) {
119-
=======
120-
if !t.CheckPredictor(logger, targetPod) {
121-
>>>>>>> b2a7d45 (Fix streamed request being called one final time after request complete, add predictor check to the beginning of each requestcontrol hook)
122118
return
123119
}
124120

@@ -161,11 +157,7 @@ func (t *SLOAwareRouter) PreRequest(ctx context.Context, request *schedulingtype
161157

162158
func (t *SLOAwareRouter) ResponseReceived(ctx context.Context, request *schedulingtypes.LLMRequest, response *requestcontrol.Response, targetPod *backend.Pod) {
163159
logger := log.FromContext(ctx)
164-
<<<<<<< HEAD
165160
if !t.checkPredictor(logger, targetPod) {
166-
=======
167-
if !t.CheckPredictor(logger, targetPod) {
168-
>>>>>>> b2a7d45 (Fix streamed request being called one final time after request complete, add predictor check to the beginning of each requestcontrol hook)
169161
return
170162
}
171163

@@ -177,23 +169,15 @@ func (t *SLOAwareRouter) ResponseReceived(ctx context.Context, request *scheduli
177169
return
178170
}
179171

180-
<<<<<<< HEAD
181172
if err := processHeaderForLatencyPrediction(ctx, t.latencypredictor, sloCtx); err != nil {
182-
=======
183-
if err := ProcessHeaderForLatencyPrediction(ctx, t.latencypredictor, sloCtx); err != nil {
184-
>>>>>>> b2a7d45 (Fix streamed request being called one final time after request complete, add predictor check to the beginning of each requestcontrol hook)
185173
logger.V(logutil.DEBUG).Error(err, "ProcessHeader in latencypredictor failed")
186174
}
187175

188176
}
189177

190178
func (t *SLOAwareRouter) ResponseStreaming(ctx context.Context, request *schedulingtypes.LLMRequest, response *requestcontrol.Response, pod *backend.Pod) {
191179
logger := log.FromContext(ctx)
192-
<<<<<<< HEAD
193180
if !t.checkPredictor(logger, pod) || response.EndOfStream {
194-
=======
195-
if !t.CheckPredictor(logger, pod) || response.EndOfStream {
196-
>>>>>>> b2a7d45 (Fix streamed request being called one final time after request complete, add predictor check to the beginning of each requestcontrol hook)
197181
return
198182
}
199183

0 commit comments

Comments
 (0)