Skip to content

Commit 1b5fb26

Browse files
authored
moved main code to runner package under epp/cmd (#956)
* moved main code to runner package under epp/cmd Signed-off-by: Nir Rozenbaum <[email protected]> * added the ability to configure postResponse plugins in requestcontrol layer Signed-off-by: Nir Rozenbaum <[email protected]> * updated dockerfile Signed-off-by: Nir Rozenbaum <[email protected]> --------- Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent dad0616 commit 1b5fb26

File tree

8 files changed

+420
-329
lines changed

8 files changed

+420
-329
lines changed

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ COPY go.mod go.sum ./
1717
RUN go mod download
1818

1919
# Sources
20-
COPY cmd/epp ./cmd
20+
COPY cmd/epp ./cmd/epp
2121
COPY pkg/epp ./pkg/epp
2222
COPY conformance/testing-epp ./conformance/testing-epp
2323
COPY internal ./internal
2424
COPY api ./api
25-
WORKDIR /src/cmd
25+
WORKDIR /src/cmd/epp
2626
RUN go build -ldflags="-X sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics.CommitSHA=${COMMIT_SHA} -X sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics.BuildRef=${BUILD_REF}" -o /epp
2727

2828
## Multistage deploy

cmd/epp/main.go

Lines changed: 2 additions & 311 deletions
Original file line numberDiff line numberDiff line change
@@ -17,322 +17,13 @@ limitations under the License.
1717
package main
1818

1919
import (
20-
"flag"
21-
"fmt"
2220
"os"
2321

24-
"github.com/go-logr/logr"
25-
"github.com/prometheus/client_golang/prometheus"
26-
uberzap "go.uber.org/zap"
27-
"go.uber.org/zap/zapcore"
28-
"google.golang.org/grpc"
29-
healthPb "google.golang.org/grpc/health/grpc_health_v1"
30-
"k8s.io/apimachinery/pkg/types"
31-
ctrl "sigs.k8s.io/controller-runtime"
32-
"sigs.k8s.io/controller-runtime/pkg/log"
33-
"sigs.k8s.io/controller-runtime/pkg/log/zap"
34-
"sigs.k8s.io/controller-runtime/pkg/manager"
35-
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
36-
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
37-
38-
conformance_epp "sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp"
39-
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
40-
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
41-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
42-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
43-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
44-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
45-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
46-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
47-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
48-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
49-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
50-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
51-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
52-
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
53-
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
54-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
55-
)
56-
57-
var (
58-
grpcPort = flag.Int(
59-
"grpcPort",
60-
runserver.DefaultGrpcPort,
61-
"The gRPC port used for communicating with Envoy proxy")
62-
grpcHealthPort = flag.Int(
63-
"grpcHealthPort",
64-
9003,
65-
"The port used for gRPC liveness and readiness probes")
66-
metricsPort = flag.Int(
67-
"metricsPort", 9090, "The metrics port")
68-
destinationEndpointHintKey = flag.String(
69-
"destinationEndpointHintKey",
70-
runserver.DefaultDestinationEndpointHintKey,
71-
"Header and response metadata key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
72-
destinationEndpointHintMetadataNamespace = flag.String(
73-
"DestinationEndpointHintMetadataNamespace",
74-
runserver.DefaultDestinationEndpointHintMetadataNamespace,
75-
"The key for the outer namespace struct in the metadata field of the extproc response that is used to wrap the"+
76-
"target endpoint. If not set, then an outer namespace struct should not be created.")
77-
poolName = flag.String(
78-
"poolName",
79-
runserver.DefaultPoolName,
80-
"Name of the InferencePool this Endpoint Picker is associated with.")
81-
poolNamespace = flag.String(
82-
"poolNamespace",
83-
runserver.DefaultPoolNamespace,
84-
"Namespace of the InferencePool this Endpoint Picker is associated with.")
85-
refreshMetricsInterval = flag.Duration(
86-
"refreshMetricsInterval",
87-
runserver.DefaultRefreshMetricsInterval,
88-
"interval to refresh metrics")
89-
refreshPrometheusMetricsInterval = flag.Duration(
90-
"refreshPrometheusMetricsInterval",
91-
runserver.DefaultRefreshPrometheusMetricsInterval,
92-
"interval to flush prometheus metrics")
93-
logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity")
94-
secureServing = flag.Bool(
95-
"secureServing", runserver.DefaultSecureServing, "Enables secure serving. Defaults to true.")
96-
certPath = flag.String(
97-
"certPath", "", "The path to the certificate for secure serving. The certificate and private key files "+
98-
"are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, "+
99-
"then a self-signed certificate is used.")
100-
// metric flags
101-
totalQueuedRequestsMetric = flag.String("totalQueuedRequestsMetric",
102-
"vllm:num_requests_waiting",
103-
"Prometheus metric for the number of queued requests.")
104-
kvCacheUsagePercentageMetric = flag.String("kvCacheUsagePercentageMetric",
105-
"vllm:gpu_cache_usage_perc",
106-
"Prometheus metric for the fraction of KV-cache blocks currently in use (from 0 to 1).")
107-
// LoRA metrics
108-
loraInfoMetric = flag.String("loraInfoMetric",
109-
"vllm:lora_requests_info",
110-
"Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
111-
112-
setupLog = ctrl.Log.WithName("setup")
113-
114-
// Environment variables
115-
schedulerV2 = envutil.GetEnvBool("EXPERIMENTAL_USE_SCHEDULER_V2", false, setupLog)
116-
prefixCacheScheduling = envutil.GetEnvBool("ENABLE_PREFIX_CACHE_SCHEDULING", false, setupLog)
117-
reqHeaderBasedSchedulerForTesting = envutil.GetEnvBool("ENABLE_REQ_HEADER_BASED_SCHEDULER_FOR_TESTING", false, setupLog)
22+
"sigs.k8s.io/gateway-api-inference-extension/cmd/epp/runner"
11823
)
11924

120-
func loadPrefixCacheConfig() prefix.Config {
121-
baseLogger := log.Log.WithName("env-config")
122-
123-
return prefix.Config{
124-
HashBlockSize: envutil.GetEnvInt("PREFIX_CACHE_HASH_BLOCK_SIZE", prefix.DefaultHashBlockSize, baseLogger),
125-
MaxPrefixBlocksToMatch: envutil.GetEnvInt("PREFIX_CACHE_MAX_PREFIX_BLOCKS", prefix.DefaultMaxPrefixBlocks, baseLogger),
126-
LRUIndexerCapacity: envutil.GetEnvInt("PREFIX_CACHE_LRU_CAPACITY", prefix.DefaultLRUIndexerCapacity, baseLogger),
127-
}
128-
}
129-
13025
func main() {
131-
if err := run(); err != nil {
26+
if err := runner.NewRunner().Run(); err != nil {
13227
os.Exit(1)
13328
}
13429
}
135-
136-
func run() error {
137-
opts := zap.Options{
138-
Development: true,
139-
}
140-
opts.BindFlags(flag.CommandLine)
141-
flag.Parse()
142-
initLogging(&opts)
143-
144-
// Validate flags
145-
if err := validateFlags(); err != nil {
146-
setupLog.Error(err, "Failed to validate flags")
147-
return err
148-
}
149-
150-
// Print all flag values
151-
flags := make(map[string]any)
152-
flag.VisitAll(func(f *flag.Flag) {
153-
flags[f.Name] = f.Value
154-
})
155-
setupLog.Info("Flags processed", "flags", flags)
156-
157-
// --- Load Configurations from Environment Variables ---
158-
sdConfig := saturationdetector.LoadConfigFromEnv()
159-
160-
// --- Get Kubernetes Config ---
161-
cfg, err := ctrl.GetConfig()
162-
if err != nil {
163-
setupLog.Error(err, "Failed to get Kubernetes rest config")
164-
return err
165-
}
166-
167-
// --- Setup Datastore ---
168-
mapping, err := backendmetrics.NewMetricMapping(
169-
*totalQueuedRequestsMetric,
170-
*kvCacheUsagePercentageMetric,
171-
*loraInfoMetric,
172-
)
173-
if err != nil {
174-
setupLog.Error(err, "Failed to create metric mapping from flags.")
175-
return err
176-
}
177-
verifyMetricMapping(*mapping, setupLog)
178-
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval)
179-
ctx := ctrl.SetupSignalHandler()
180-
datastore := datastore.NewDatastore(ctx, pmf)
181-
182-
// --- Setup Metrics Server ---
183-
customCollectors := []prometheus.Collector{collectors.NewInferencePoolMetricsCollector(datastore)}
184-
metrics.Register(customCollectors...)
185-
metrics.RecordInferenceExtensionInfo()
186-
// Register metrics handler.
187-
// Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server.
188-
// More info:
189-
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/metrics/server
190-
// - https://book.kubebuilder.io/reference/metrics.html
191-
metricsServerOptions := metricsserver.Options{
192-
BindAddress: fmt.Sprintf(":%d", *metricsPort),
193-
FilterProvider: filters.WithAuthenticationAndAuthorization,
194-
}
195-
196-
poolNamespacedName := types.NamespacedName{
197-
Name: *poolName,
198-
Namespace: *poolNamespace,
199-
}
200-
mgr, err := runserver.NewDefaultManager(poolNamespacedName, cfg, metricsServerOptions)
201-
if err != nil {
202-
setupLog.Error(err, "Failed to create controller manager")
203-
return err
204-
}
205-
206-
// --- Initialize Core EPP Components ---
207-
scheduler := scheduling.NewScheduler(datastore)
208-
if schedulerV2 {
209-
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
210-
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
211-
212-
schedulerProfile := framework.NewSchedulerProfile().
213-
WithScorers(framework.NewWeightedScorer(&scorer.QueueScorer{}, queueScorerWeight),
214-
framework.NewWeightedScorer(&scorer.KVCacheScorer{}, kvCacheScorerWeight)).
215-
WithPicker(picker.NewMaxScorePicker())
216-
217-
if prefixCacheScheduling {
218-
prefixScorerWeight := envutil.GetEnvInt("PREFIX_CACHE_SCORE_WEIGHT", prefix.DefaultScorerWeight, setupLog)
219-
if err := schedulerProfile.AddPlugins(framework.NewWeightedScorer(prefix.New(loadPrefixCacheConfig()), prefixScorerWeight)); err != nil {
220-
setupLog.Error(err, "Failed to register scheduler plugins")
221-
return err
222-
}
223-
}
224-
225-
schedulerConfig := scheduling.NewSchedulerConfig(profile.NewSingleProfileHandler(), map[string]*framework.SchedulerProfile{"schedulerv2": schedulerProfile})
226-
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
227-
}
228-
229-
if reqHeaderBasedSchedulerForTesting {
230-
scheduler = conformance_epp.NewReqHeaderBasedScheduler(datastore)
231-
}
232-
233-
saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log)
234-
235-
director := requestcontrol.NewDirector(datastore, scheduler, saturationDetector) // can call "director.WithPostResponsePlugins" to add post response plugins
236-
237-
// --- Setup ExtProc Server Runner ---
238-
serverRunner := &runserver.ExtProcServerRunner{
239-
GrpcPort: *grpcPort,
240-
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
241-
DestinationEndpointHintKey: *destinationEndpointHintKey,
242-
PoolNamespacedName: poolNamespacedName,
243-
Datastore: datastore,
244-
SecureServing: *secureServing,
245-
CertPath: *certPath,
246-
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
247-
Director: director,
248-
SaturationDetector: saturationDetector,
249-
}
250-
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
251-
setupLog.Error(err, "Failed to setup EPP controllers")
252-
return err
253-
}
254-
255-
// --- Add Runnables to Manager ---
256-
// Register health server.
257-
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort); err != nil {
258-
return err
259-
}
260-
261-
// Register ext-proc server.
262-
if err := registerExtProcServer(mgr, serverRunner, ctrl.Log.WithName("ext-proc")); err != nil {
263-
return err
264-
}
265-
266-
// --- Start Manager ---
267-
// This blocks until a signal is received.
268-
setupLog.Info("Controller manager starting")
269-
if err := mgr.Start(ctx); err != nil {
270-
setupLog.Error(err, "Error starting controller manager")
271-
return err
272-
}
273-
setupLog.Info("Controller manager terminated")
274-
return nil
275-
}
276-
277-
func initLogging(opts *zap.Options) {
278-
// Unless -zap-log-level is explicitly set, use -v
279-
useV := true
280-
flag.Visit(func(f *flag.Flag) {
281-
if f.Name == "zap-log-level" {
282-
useV = false
283-
}
284-
})
285-
if useV {
286-
// See https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/log/zap#Options.Level
287-
lvl := -1 * (*logVerbosity)
288-
opts.Level = uberzap.NewAtomicLevelAt(zapcore.Level(int8(lvl)))
289-
}
290-
291-
logger := zap.New(zap.UseFlagOptions(opts), zap.RawZapOpts(uberzap.AddCaller()))
292-
ctrl.SetLogger(logger)
293-
}
294-
295-
// registerExtProcServer adds the ExtProcServerRunner as a Runnable to the manager.
296-
func registerExtProcServer(mgr manager.Manager, runner *runserver.ExtProcServerRunner, logger logr.Logger) error {
297-
if err := mgr.Add(runner.AsRunnable(logger)); err != nil {
298-
setupLog.Error(err, "Failed to register ext-proc gRPC server runnable")
299-
return err
300-
}
301-
setupLog.Info("ExtProc server runner added to manager.")
302-
return nil
303-
}
304-
305-
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
306-
func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int) error {
307-
srv := grpc.NewServer()
308-
healthPb.RegisterHealthServer(srv, &healthServer{
309-
logger: logger,
310-
datastore: ds,
311-
})
312-
if err := mgr.Add(
313-
runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil {
314-
setupLog.Error(err, "Failed to register health server")
315-
return err
316-
}
317-
return nil
318-
}
319-
320-
func validateFlags() error {
321-
if *poolName == "" {
322-
return fmt.Errorf("required %q flag not set", "poolName")
323-
}
324-
325-
return nil
326-
}
327-
328-
func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logger) {
329-
if mapping.TotalQueuedRequests == nil {
330-
logger.Info("Not scraping metric: TotalQueuedRequests")
331-
}
332-
if mapping.KVCacheUtilization == nil {
333-
logger.Info("Not scraping metric: KVCacheUtilization")
334-
}
335-
if mapping.LoraRequestInfo == nil {
336-
logger.Info("Not scraping metric: LoraRequestInfo")
337-
}
338-
}

cmd/epp/health.go renamed to cmd/epp/runner/health.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package main
17+
package runner
1818

1919
import (
2020
"context"

0 commit comments

Comments
 (0)