diff --git a/go.mod b/go.mod index cf02e616..b2062d0e 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/prometheus/client_golang v1.22.0 github.com/redis/go-redis/v9 v9.7.3 github.com/stretchr/testify v1.10.0 + github.com/valyala/fasthttp v1.62.0 go.uber.org/zap v1.27.0 google.golang.org/grpc v1.72.0 k8s.io/apimachinery v0.33.1 @@ -26,6 +27,7 @@ require ( require ( cel.dev/expr v0.20.0 // indirect + github.com/andybalholm/brotli v1.1.1 // indirect github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -56,6 +58,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect @@ -69,6 +72,7 @@ require ( github.com/spf13/cobra v1.9.1 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/x448/float16 v0.8.4 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect diff --git a/go.sum b/go.sum index 1a775aba..07430cc1 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ cel.dev/expr v0.20.0 h1:OunBvVCfvpWlt4dN7zg3FM6TDkzOePe1+foGJ9AXeeI= cel.dev/expr v0.20.0/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw= +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= github.com/antlr4-go/antlr/v4 v4.13.0/go.mod 
h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA= @@ -148,8 +150,14 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.62.0 h1:8dKRBX/y2rCzyc6903Zu1+3qN0H/d2MsxPPmVNamiH0= +github.com/valyala/fasthttp v1.62.0/go.mod h1:FCINgr4GKdKqV8Q0xv8b+UxPV+H/O5nNFo3D+r54Htg= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= diff --git a/pkg/config/config.go b/pkg/config/config.go index 0d8798b1..55437a5c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -3,6 +3,8 @@ package config import ( + "encoding/json" + "github.com/go-logr/logr" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env" ) @@ -52,15 +54,49 @@ const ( prefixScorerBlockSizeEnvKey = "PREFIX_SCORER_BLOCK_SIZE" prefixScorerBlockSizeDefault = 256 + + externalPrefix = "EXTERNAL_" + httpPrefix = "HTTP_" + + preSchedulers = 
"PRE_SCHEDULERS" + filters = "FILTERS" + scorers = "SCORERS" + postSchedulers = "POST_SCHEDULERS" + + // EXTERNAL_HTTP_PRE_SCHEDULERS + // EXTERNAL_PREFILL_HTTP_PRE_SCHEDULERS + // EXTERNAL_HTTP_FILTERS + // EXTERNAL_PREFILL_HTTP_FILTERS + // EXTERNAL_HTTP_SCORERS + // EXTERNAL_PREFILL_HTTP_SCORERS + // EXTERNAL_HTTP_POST_SCHEDULERS + // EXTERNAL_PREFILL_HTTP_POST_SCHEDULERS ) +// ExternalPluginInfo configuration of an external plugin +type ExternalPluginInfo struct { + Name string `json:"name"` + URL string `json:"url"` + Weight int `json:"weight"` +} + +// ExternalPlugins contains all types of external plugins configuration +type ExternalPlugins struct { + PreSchedulers []ExternalPluginInfo + Filters []ExternalPluginInfo + Scorers []ExternalPluginInfo + PostSchedulers []ExternalPluginInfo +} + // Config contains scheduler configuration, currently configuration is loaded from environment variables type Config struct { - DecodeSchedulerPlugins map[string]int - PrefillSchedulerPlugins map[string]int - PDEnabled bool - PDThreshold int - PrefixBlockSize int + DecodeSchedulerPlugins map[string]int + PrefillSchedulerPlugins map[string]int + DecodeSchedulerExternalPlugins ExternalPlugins + PrefillSchedulerExternalPlugins ExternalPlugins + PDEnabled bool + PDThreshold int + PrefixBlockSize int } // LoadConfig loads configuration from environment variables and returns a new instance of Config @@ -72,12 +108,29 @@ func LoadConfig(logger logr.Logger) *Config { GIEKVCacheUtilizationScorerName, GIEQueueScorerName, GIEPrefixScorerName, } + // load external plugins for decode and prefill schedulers + prefillSchedulerExternalPlugins := ExternalPlugins{ + PreSchedulers: loadExternalPluginsInfo(logger, httpPrefix, "", preSchedulers), + Filters: loadExternalPluginsInfo(logger, httpPrefix, "", filters), + Scorers: loadExternalPluginsInfo(logger, httpPrefix, "", scorers), + PostSchedulers: loadExternalPluginsInfo(logger, httpPrefix, "", postSchedulers), + } + + 
decodeSchedulerExternalPlugins := ExternalPlugins{ + PreSchedulers: loadExternalPluginsInfo(logger, httpPrefix, prefillPrefix, preSchedulers), + Filters: loadExternalPluginsInfo(logger, httpPrefix, prefillPrefix, filters), + Scorers: loadExternalPluginsInfo(logger, httpPrefix, prefillPrefix, scorers), + PostSchedulers: loadExternalPluginsInfo(logger, httpPrefix, prefillPrefix, postSchedulers), + } + return &Config{ - DecodeSchedulerPlugins: loadPluginInfo(logger, false, pluginNames), - PrefillSchedulerPlugins: loadPluginInfo(logger, true, pluginNames), - PDEnabled: env.GetEnvString(pdEnabledEnvKey, "false", logger) == "true", - PDThreshold: env.GetEnvInt(pdPromptLenThresholdEnvKey, pdPromptLenThresholdDefault, logger), - PrefixBlockSize: env.GetEnvInt(prefixScorerBlockSizeEnvKey, prefixScorerBlockSizeDefault, logger), + DecodeSchedulerPlugins: loadPluginInfo(logger, false, pluginNames), + PrefillSchedulerPlugins: loadPluginInfo(logger, true, pluginNames), + DecodeSchedulerExternalPlugins: prefillSchedulerExternalPlugins, + PrefillSchedulerExternalPlugins: decodeSchedulerExternalPlugins, + PDEnabled: env.GetEnvString(pdEnabledEnvKey, "false", logger) == "true", + PDThreshold: env.GetEnvInt(pdPromptLenThresholdEnvKey, pdPromptLenThresholdDefault, logger), + PrefixBlockSize: env.GetEnvInt(prefixScorerBlockSizeEnvKey, prefixScorerBlockSizeDefault, logger), } } @@ -107,3 +160,26 @@ func loadPluginInfo(logger logr.Logger, prefill bool, pluginNames []string) map[ return result } + +// loadExternalPluginsInfo loads configuration of external plugins for the given scheduler type and the given plugins type +// +//nolint:unparam // future: protocol will support more values (grpc, wasm, etc.) 
+func loadExternalPluginsInfo(logger logr.Logger, protocol string, schedulerType string, pluginType string) []ExternalPluginInfo { + var plugins []ExternalPluginInfo + + envVarName := externalPrefix + protocol + schedulerType + pluginType + envVarRawValue := env.GetEnvString(envVarName, "", logger) + + if envVarRawValue == "" { + logger.Info("Environment variable is not defined", "var", envVarName) + return plugins + } + + if err := json.Unmarshal([]byte(envVarRawValue), &plugins); err != nil { + logger.Info("Error in environment variable unmarshaling", "error", err, "variable", envVarName, "value", envVarRawValue) + return plugins + } + + logger.Info("External plugin loaded", "type", pluginType, "plugins", plugins) + return plugins +} diff --git a/pkg/scheduling/pd/scheduler.go b/pkg/scheduling/pd/scheduler.go index b303d663..572aa697 100644 --- a/pkg/scheduling/pd/scheduler.go +++ b/pkg/scheduling/pd/scheduler.go @@ -22,6 +22,7 @@ import ( logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" "github.com/llm-d/llm-d-inference-scheduler/pkg/config" + externalhttp "github.com/llm-d/llm-d-inference-scheduler/pkg/scheduling/plugins/external/http" "github.com/llm-d/llm-d-inference-scheduler/pkg/scheduling/plugins/filter" "github.com/llm-d/llm-d-inference-scheduler/pkg/scheduling/plugins/scorer" ) @@ -70,13 +71,13 @@ func NewScheduler(ctx context.Context, schedulerConfig *config.Config, ds Datast scheduler.prefill = scheduling.NewSchedulerWithConfig( ds, - scheduler.generateSchedulerConfig(ctx, schedulerConfig.PrefillSchedulerPlugins, + scheduler.generateSchedulerConfig(ctx, schedulerConfig.PrefillSchedulerPlugins, schedulerConfig.PrefillSchedulerExternalPlugins, &filter.PrefillFilter{}), ) scheduler.decode = scheduling.NewSchedulerWithConfig( ds, - scheduler.generateSchedulerConfig(ctx, schedulerConfig.DecodeSchedulerPlugins, + scheduler.generateSchedulerConfig(ctx, schedulerConfig.DecodeSchedulerPlugins, 
schedulerConfig.DecodeSchedulerExternalPlugins, &filter.DecodeFilter{}), ) @@ -212,7 +213,57 @@ func (s *Scheduler) pluginsFromConfig(ctx context.Context, pluginsConfig map[str return plugins } -func (s *Scheduler) generateSchedulerConfig(ctx context.Context, pluginsConfig map[string]int, extraFilters ...plugins.Filter) *scheduling.SchedulerConfig { +func externalFiltersFromConfig(ctx context.Context, info []config.ExternalPluginInfo) []plugins.Filter { + logger := log.FromContext(ctx) + filters := make([]plugins.Filter, 0) + + for _, extPluginInfo := range info { + filters = append(filters, externalhttp.NewFilter(ctx, extPluginInfo.Name, extPluginInfo.URL)) + } + + logger.Info(fmt.Sprintf("Created %d external filters", len(filters))) + return filters +} + +func externalPreSchedulesFromConfig(ctx context.Context, info []config.ExternalPluginInfo) []plugins.PreSchedule { + logger := log.FromContext(ctx) + preSchedules := []plugins.PreSchedule{} + + for _, extPluginInfo := range info { + preSchedules = append(preSchedules, externalhttp.NewPreSchedule(ctx, extPluginInfo.Name, extPluginInfo.URL)) + } + + logger.Info(fmt.Sprintf("Created %d external pre-schedules", len(preSchedules))) + return preSchedules +} + +func externalPostSchedulesFromConfig(ctx context.Context, info []config.ExternalPluginInfo) []plugins.PostSchedule { + logger := log.FromContext(ctx) + postSchedules := []plugins.PostSchedule{} + + for _, extPluginInfo := range info { + postSchedules = append(postSchedules, externalhttp.NewPostSchedule(ctx, extPluginInfo.Name, extPluginInfo.URL)) + } + + logger.Info(fmt.Sprintf("Created %d external post-schedules", len(postSchedules))) + return postSchedules +} + +func externalScorersFromConfig(ctx context.Context, info []config.ExternalPluginInfo) []*giescorer.WeightedScorer { + logger := log.FromContext(ctx) + scorers := []*giescorer.WeightedScorer{} + + for _, extPluginInfo := range info { + scorers = append(scorers, 
giescorer.NewWeightedScorer(externalhttp.NewScorer(ctx, extPluginInfo.Name, extPluginInfo.URL), extPluginInfo.Weight)) + } + + logger.Info(fmt.Sprintf("Created %d external scorers", len(scorers))) + return scorers +} + +func (s *Scheduler) generateSchedulerConfig(ctx context.Context, pluginsConfig map[string]int, + externalPlugins config.ExternalPlugins, extraFilters ...plugins.Filter) *scheduling.SchedulerConfig { + thePlugins := s.pluginsFromConfig(ctx, pluginsConfig) preSchedulePlugins := []plugins.PreSchedule{} filters := []plugins.Filter{} @@ -240,6 +291,13 @@ func (s *Scheduler) generateSchedulerConfig(ctx context.Context, pluginsConfig m } } + // add external plugins + preSchedulePlugins = append(preSchedulePlugins, externalPreSchedulesFromConfig(ctx, externalPlugins.PreSchedulers)...) + filters = append(filters, externalFiltersFromConfig(ctx, externalPlugins.Filters)...) + scorers = append(scorers, externalScorersFromConfig(ctx, externalPlugins.Scorers)...) + postSchedulePlugins = append(postSchedulePlugins, externalPostSchedulesFromConfig(ctx, externalPlugins.PostSchedulers)...) + // postResponsePlugins = append(postResponsePlugins, postResponse) + return scheduling.NewSchedulerConfig(). WithPreSchedulePlugins(preSchedulePlugins...). WithFilters(filters...). 
diff --git a/pkg/scheduling/plugins/external/http/filter.go b/pkg/scheduling/plugins/external/http/filter.go new file mode 100644 index 00000000..7cd7e022 --- /dev/null +++ b/pkg/scheduling/plugins/external/http/filter.go @@ -0,0 +1,92 @@ +// Package http contains all types of external http plugins +package http + +import ( + "context" + "encoding/json" + "fmt" + "path" + + "github.com/valyala/fasthttp" + + "sigs.k8s.io/controller-runtime/pkg/log" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" +) + +// Filter implementation of the external http filter +type Filter struct { + Plugin +} + +var _ plugins.Filter = &Filter{} // validate interface conformance + +// NewFilter creates a new instance of external http filter based on the given parameters +func NewFilter(_ context.Context, name string, url string) plugins.Filter { + return &Filter{Plugin{name: name, url: url}} +} + +// Filter filters the given list of pods +func (f *Filter) Filter(schedContext *types.SchedulingContext, pods []types.Pod) []types.Pod { + logger := log.FromContext(schedContext).WithName(f.Name()) + + // Create filter http request payload based on the given data + filterPayload := newFilterPayload(schedContext, pods) + payload, err := json.Marshal(filterPayload) + + if err != nil { + logger.Error(err, "Failed to marshal scheduling context, filter will be skipped") + return pods + } + + // Create a new fasthttp request and response + req := fasthttp.AcquireRequest() + defer fasthttp.ReleaseRequest(req) + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(resp) + + req.SetRequestURI(path.Join(f.url, "filter")) + req.Header.SetMethod(fasthttp.MethodPost) + req.Header.SetContentType("application/json") + req.SetBody(payload) + + // Execute the request + client := &fasthttp.Client{} + if err := client.Do(req, resp); err != nil { + logger.Error(err, "request failed") + return pods + } 
+ + // Optionally check status code + if resp.StatusCode() != fasthttp.StatusOK { + logger.Error(nil, fmt.Sprintf("bad response status: %d, body: %s", resp.StatusCode(), resp.Body())) + return pods + } + + var filteredPodNames []namespacedName + + // filter plugin response is an array of pod full names + if err := json.Unmarshal(resp.Body(), &filteredPodNames); err != nil { + logger.Error(err, "external filter's response body unmarshal failed", "name", f.Name(), "resp body", resp.Body()) + return pods + } + + // filter list of given pods based on the returned list of pods + podsNamesSet := map[string]bool{} + + for _, nn := range filteredPodNames { + podsNamesSet[namespacedNameToString(nn.Name, nn.Namespace)] = true + } + + filteredPods := make([]types.Pod, 0) + + for _, p := range pods { + nn := p.GetPod().NamespacedName + if _, exists := podsNamesSet[namespacedNameToString(nn.Name, nn.Namespace)]; exists { + filteredPods = append(filteredPods, p) + } + } + + return filteredPods +} diff --git a/pkg/scheduling/plugins/external/http/plugin.go b/pkg/scheduling/plugins/external/http/plugin.go new file mode 100644 index 00000000..2ba54910 --- /dev/null +++ b/pkg/scheduling/plugins/external/http/plugin.go @@ -0,0 +1,12 @@ +package http + +// Plugin common information for external http plugin +type Plugin struct { + name string + url string +} + +// Name returns the plugin's name +func (s *Plugin) Name() string { + return s.name +} diff --git a/pkg/scheduling/plugins/external/http/postschedule.go b/pkg/scheduling/plugins/external/http/postschedule.go new file mode 100644 index 00000000..20152dd8 --- /dev/null +++ b/pkg/scheduling/plugins/external/http/postschedule.go @@ -0,0 +1,31 @@ +package http + +import ( + "context" + "fmt" + + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" +) + +// PostSchedule implementation of the external 
http post-scheduler +type PostSchedule struct { + Plugin +} + +var _ plugins.PostSchedule = &PostSchedule{} // validate interface conformance + +// NewPostSchedule creates a new instance of external http post schedule based on the given parameters +func NewPostSchedule(_ context.Context, name string, url string) plugins.PostSchedule { + return &PostSchedule{Plugin{name: name, url: url}} +} + +// PostSchedule is called after scheduler picked the target pod +func (p *PostSchedule) PostSchedule(ctx *types.SchedulingContext, res *types.Result) { + logger := log.FromContext(ctx).WithName(p.Name()) + logger.Info(">>>> external post schedule called - TODO implement it!", "name", p.Name()) + + // TODO - send http request to the url + fmt.Println(res) +} diff --git a/pkg/scheduling/plugins/external/http/preschedule.go b/pkg/scheduling/plugins/external/http/preschedule.go new file mode 100644 index 00000000..62912de5 --- /dev/null +++ b/pkg/scheduling/plugins/external/http/preschedule.go @@ -0,0 +1,29 @@ +package http + +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" +) + +// PreSchedule implementation of the external http pre scheduler +type PreSchedule struct { + Plugin +} + +var _ plugins.PreSchedule = &PreSchedule{} // validate interface conformance + +// NewPreSchedule creates a new instance of external http pre schedule based on the given parameters +func NewPreSchedule(_ context.Context, name string, url string) plugins.PreSchedule { + return &PreSchedule{Plugin{name: name, url: url}} +} + +// PreSchedule is called when the scheduler receives a new request. 
It can be used for various initialization work +func (p *PreSchedule) PreSchedule(ctx *types.SchedulingContext) { + logger := log.FromContext(ctx).WithName(p.Name()) + logger.Info(">>>> external pre schedule called - TODO implement it!", "name", p.Name()) + + // TODO - send http request to the url +} diff --git a/pkg/scheduling/plugins/external/http/sample/Dockerfile b/pkg/scheduling/plugins/external/http/sample/Dockerfile new file mode 100644 index 00000000..e7aaefc6 --- /dev/null +++ b/pkg/scheduling/plugins/external/http/sample/Dockerfile @@ -0,0 +1,39 @@ +# Use official Python runtime as base image +FROM python:3.11-slim + +# Set environment variables +ENV PYTHONUNBUFFERED=1 +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PORT=1080 +ENV HOST=0.0.0.0 + +# Create app directory and non-root user for security +RUN groupadd -r appuser && useradd -r -g appuser appuser +WORKDIR /app + +# Copy requirements first for better Docker layer caching +COPY requirements.txt* ./ + +# Install Python dependencies (if requirements.txt exists) +RUN if [ -f requirements.txt ]; then pip install --no-cache-dir -r requirements.txt; fi + +# Copy application code +COPY server.py ./ +# COPY *.py ./ + +# Create logs directory and set permissions +RUN mkdir -p /app/logs && \ + chown -R appuser:appuser /app + +# Switch to non-root user +USER appuser + +# Expose the port +EXPOSE 1080 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:1080')" || exit 1 + +# Default command +CMD ["python", "server.py", "1080", "0.0.0.0"] \ No newline at end of file diff --git a/pkg/scheduling/plugins/external/http/sample/Makefile b/pkg/scheduling/plugins/external/http/sample/Makefile new file mode 100644 index 00000000..73d38270 --- /dev/null +++ b/pkg/scheduling/plugins/external/http/sample/Makefile @@ -0,0 +1,29 @@ +# Makefile for the sample external plugins project + +CONTAINER_RUNTIME ?= docker + 
+SHELL := /usr/bin/env bash + +# Defaults +PROJECT_NAME ?= sample-external-plugins +REGISTRY ?= ghcr.io/llm-d +IMAGE_TAG_BASE ?= $(REGISTRY)/$(PROJECT_NAME) +SAMPLE_TAG ?= dev +IMG = $(IMAGE_TAG_BASE):$(SAMPLE_TAG) +CONTAINER_TOOL := $(shell { command -v docker >/dev/null 2>&1 && echo docker; } || { command -v podman >/dev/null 2>&1 && echo podman; } || echo "") + +# go source files +SRC = $(shell find . -type f -name '*.go') + +##@ Container Build/Push + +.PHONY: image-build +image-build: check-container-tool ## Build Docker image ## Build Docker image using $(CONTAINER_TOOL) + @printf "\033[33;1m==== Building Docker image $(IMG) ====\033[0m\n" + $(CONTAINER_TOOL) build --build-arg TARGETOS=$(TARGETOS) --build-arg TARGETARCH=$(TARGETARCH) -t $(IMG) . + +.PHONY: check-container-tool +check-container-tool: + @command -v $(CONTAINER_TOOL) >/dev/null 2>&1 || { \ + echo "❌ $(CONTAINER_TOOL) is not installed."; \ + echo "🔧 Try: sudo apt install $(CONTAINER_TOOL) OR brew install $(CONTAINER_TOOL)"; exit 1; } diff --git a/pkg/scheduling/plugins/external/http/sample/README.md b/pkg/scheduling/plugins/external/http/sample/README.md new file mode 100644 index 00000000..80621901 --- /dev/null +++ b/pkg/scheduling/plugins/external/http/sample/README.md @@ -0,0 +1,182 @@ +# Sample external plugins HTTP server + +A Python HTTP web server that provides plugins endpoints for llm-d-inference-scheduler. + +## Overview + +This server currently implements one endpoint: +- **Filter Endpoint**: Filters available pods based on scheduling context and constraints + +## Features + +- HTTP server listening on configurable port (default: 1080) +- JSON request/response handling +- Comprehensive request validation +- Logging with structured output +- Error handling and HTTP status codes + +## File Structure + +``` +. 
+├── server.py # Main server implementation +├── README.md # This documentation +├── requirements.txt # Python dependencies (if any) +``` + +## API Endpoints + +### POST /external/sample/filter + +Filters the candidate pods using the sample logic: pods whose name ends with '2' are removed and the remaining pods are returned. + +**Request Structure:** +```json +{ + "sched_context": { + "request": { + "target_model": "string", + "request_id": "string", + "critical": boolean, + "prompt": "string", + "headers": {} + }, + "pods": [ + { + "pod": { + "namespaced_name": { + "name": "string", + "namespace": "string" + }, + "address": "string", + "labels": {} + }, + "metrics": { + "active_models": {}, + "waiting_models": {}, + "max_active_models": integer, + "running_queue_size": integer, + "waiting_queue_size": integer, + "kv_cache_usage_percent": float, + "kv_cache_max_token_capacity": integer, + "update_time": "string" + } + } + ] + }, + "pods": [/* same structure as sched_context.pods */] +} +``` + +**Response:** +```json +[ + { + "name": "string", + "namespace": "string" + } +] +``` + +## Installation & Setup + +### Prerequisites + +- Python 3.7 or higher +- No external dependencies required (uses only Python standard library) + +### Quick Start + +1. **Run the server** + ```bash + python server.py + # or + python3 server.py + ``` + +2. 
**Run with custom port/host** + ```bash + python server.py 8080 # Custom port + python server.py 8080 0.0.0.0 # Custom port and host + ``` + +### Sample Request Body +```json +{ + "sched_context": { + "request": { + "target_model": "llama-7b", + "request_id": "req-12345", + "critical": false, + "prompt": "Hello, world!", + "headers": { + "user-agent": "test-client" + } + }, + "pods": [] + }, + "pods": [ + { + "pod": { + "namespaced_name": { + "name": "model-server-1", + "namespace": "ml-serving" + }, + "address": "10.0.1.100", + "labels": { + "model": "llama-7b", + "gpu": "true" + } + }, + "metrics": { + "active_models": {"llama-7b": 2}, + "waiting_models": {}, + "max_active_models": 4, + "running_queue_size": 2, + "waiting_queue_size": 1, + "kv_cache_usage_percent": 65.5, + "kv_cache_max_token_capacity": 8192 + }, + "update_time": "2025-06-09T10:30:00Z" + } + ] +} +``` + +## Configuration + +### Command Line Arguments +```bash +python server.py [PORT] [HOST] + +# Examples: +python server.py 8080 # Port 8080, localhost +python server.py 8080 0.0.0.0 # Port 8080, all interfaces +``` + +### Docker +```dockerfile +FROM python:3.9-slim +WORKDIR /app +COPY server.py . 
+EXPOSE 1080 +CMD ["python", "server.py", "1080", "0.0.0.0"] +``` + +### Endpoint Picker configuration + +To add this filter plugin to the EPP, the following environment variable should be defined: + +```yaml +env: +- name: EXTERNAL_HTTP_PREFILL_FILTERS + value: '[{"name":"load", "url":"http://sample-plugins-server-service:80/external/sample"}]' +``` + +### Install on cluster +```bash +make image-build +kind load docker-image ghcr.io/llm-d/sample-external-plugins:dev --name +k apply -f ./deployment.yaml +k port-forward svc/sample-plugins-server-service 8800:80 +``` diff --git a/pkg/scheduling/plugins/external/http/sample/deployment.yaml b/pkg/scheduling/plugins/external/http/sample/deployment.yaml new file mode 100644 index 00000000..607798a0 --- /dev/null +++ b/pkg/scheduling/plugins/external/http/sample/deployment.yaml @@ -0,0 +1,35 @@ +apiVersion: v1 +kind: Service +metadata: + name: sample-plugins-server-service +spec: + selector: + app: sample-plugins-server # This should match your pod's labels + ports: + - port: 80 # Port the service exposes + targetPort: 1080 # Port your Python server listens on + protocol: TCP + type: ClusterIP +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: sample-plugins-server +spec: + replicas: 1 + selector: + matchLabels: + app: sample-plugins-server + template: + metadata: + labels: + app: sample-plugins-server # Must match the Service selector + spec: + containers: + - name: sample-plugins-server + image: ghcr.io/llm-d/sample-external-plugins:dev + # imagePullPolicy: Always + imagePullPolicy: Never + ports: + - containerPort: 1080 # Port your Python server listens on +--- \ No newline at end of file diff --git a/pkg/scheduling/plugins/external/http/sample/server.py b/pkg/scheduling/plugins/external/http/sample/server.py new file mode 100755 index 00000000..b78c6a4f --- /dev/null +++ b/pkg/scheduling/plugins/external/http/sample/server.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +""" +Sample external plugins HTTP 
import json
import logging
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse
from typing import Dict, List, Any, Optional
import sys
from datetime import datetime


# configure logger
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# The only endpoint served by this sample
FILTER_ENDPOINT = '/external/sample/filter'

# Sample filter logic: pods whose name ends with this suffix are removed from the candidates
FILTERED_POD_NAME_SUFFIX = '2'


class RequestHandler(BaseHTTPRequestHandler):
    """Request handler of the sample external plugins server; only POST to the filter endpoint is served."""

    def do_POST(self):
        """Handle POST requests.

        Replies 404 for an unknown path, 400 for a missing, undecodable or structurally invalid
        body, 500 on any unexpected error and 200 with the filter result otherwise.
        """
        try:
            path = urlparse(self.path).path

            # supported endpoints
            if path != FILTER_ENDPOINT:
                self.send_error(404, "Endpoint not found")
                return

            # read request body
            try:
                content_length = int(self.headers.get('Content-Length', 0))
            except ValueError:
                self.send_error(400, "Invalid Content-Length header")
                return

            # a negative length would make rfile.read() wait for the end of the stream
            if content_length <= 0:
                self.send_error(400, "Empty request body")
                return

            body = self.rfile.read(content_length)
            try:
                request_data = json.loads(body.decode('utf-8'))
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                self.send_error(400, f"Invalid JSON: {str(e)}")
                return

            if not self._validate_filter_request(request_data):
                self.send_error(400, "Invalid filter request structure")
                return

            # Send successful response
            self._send_json_response(200, self._handle_filter(request_data))

        except Exception as e:
            logger.error(f"Error processing request: {str(e)}")
            self.send_error(500, f"Internal server error: {str(e)}")

    def do_GET(self):
        """Handle GET requests - return method not allowed"""
        self.send_error(405, "Method not allowed. Only POST is supported.")

    def _validate_filter_request(self, data: Dict[str, Any]) -> bool:
        """Validate the filter request structure.

        Checks the required top-level, sched_context and sched_context.request fields, that
        'pods' is a list and that every entry carries pod.namespaced_name with a string name
        and namespace (the fields _handle_filter reads).

        Returns True when the request can be processed, False otherwise (the reason is logged).
        """
        logger.info("validating filter request")

        try:
            if not self._validate_required_fields(data, ['sched_context', 'pods'], 'filter request payload'):
                return False

            # Validate sched_context structure
            if not self._validate_required_fields(data['sched_context'], ['request', 'pods'], 'sched_context'):
                return False

            # Validate request structure
            if not self._validate_required_fields(data['sched_context']['request'], ['target_model', 'request_id', 'critical', 'prompt', 'headers'], 'sched_context>request'):
                return False

            # Validate pods array
            if not isinstance(data['pods'], list):
                logger.error("pods must be an array")
                return False

            # Validate the part of each pod that the filter logic depends on
            for index, pod_data in enumerate(data['pods']):
                pod = pod_data.get('pod') if isinstance(pod_data, dict) else None
                namespaced_name = pod.get('namespaced_name') if isinstance(pod, dict) else None
                if not isinstance(namespaced_name, dict) or not all(
                        isinstance(namespaced_name.get(key), str) for key in ('name', 'namespace')):
                    logger.error(f"pods[{index}] must contain pod.namespaced_name with string name and namespace")
                    return False

            return True

        except Exception as e:
            logger.error(f"Validation error: {str(e)}")
            return False

    def _validate_required_fields(self, data: Dict[str, Any],
                                  required_fields: List[str],
                                  parent_name: str) -> bool:
        """Return True when data is a JSON object that contains every one of required_fields.

        parent_name is only used to make the log messages readable.
        """
        logger.info(f"validate fields for {parent_name}")
        if not isinstance(data, dict):
            logger.error(f"{parent_name} must be an object")
            return False
        for field in required_fields:
            if field not in data:
                logger.error(f"Missing required field in {parent_name}: {field}")
                return False
        logger.info(f"{parent_name} is valid")
        return True

    def _handle_filter(self, request_data: Dict[str, Any]) -> List[Dict[str, str]]:
        """
        Handle filter endpoint - filters pods based on the sample logic (filter out all pods with name ending with '2')

        Expects a request that passed _validate_filter_request.
        Returns the name and namespace of every pod that was kept, in the order of the request.
        """
        logger.info("Processing filter request")

        candidate_pods = request_data['pods']
        filtered_pods = []

        for pod_data in candidate_pods:
            namespaced_name = pod_data['pod']['namespaced_name']
            pod_name = namespaced_name['name']

            if pod_name.endswith(FILTERED_POD_NAME_SUFFIX):
                logger.info(f">>><<< pod {pod_name} WAS filtered")
                continue

            logger.info(f">>><<< pod {pod_name} not filtered")
            filtered_pods.append({
                'name': pod_name,
                'namespace': namespaced_name['namespace']
            })

        logger.info(f"Filtered {len(candidate_pods)} pods down to {len(filtered_pods)}")
        return filtered_pods

    def _send_json_response(self, status_code: int, data: Any):
        """Send data as a JSON response with the given status code"""
        # Content-Length counts bytes, so it is taken from the encoded payload
        response_body = json.dumps(data, indent=2).encode('utf-8')

        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(response_body)))
        self.end_headers()
        self.wfile.write(response_body)

    def log_message(self, format_str, *args):
        """Route the http.server access log through the module logger"""
        logger.info(f"{self.address_string()} - {format_str % args}")


def run_server(port: int = 1080, host: str = 'localhost'):
    """Run the HTTP server on host:port until it is interrupted"""
    server_address = (host, port)
    httpd = HTTPServer(server_address, RequestHandler)

    logger.info(f"Starting Sample external plugins HTTP server on {host}:{port}")

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    finally:
        # release the listening socket on every exit path, not only on Ctrl-C
        httpd.server_close()


if __name__ == '__main__':
    # Parse command line arguments: [PORT] [HOST]
    port = 1080
    host = 'localhost'

    if len(sys.argv) > 1:
        try:
            port = int(sys.argv[1])
        except ValueError:
            print(f"Invalid port number: {sys.argv[1]}")
            sys.exit(1)

    if len(sys.argv) > 2:
        host = sys.argv[2]

    run_server(port, host)
NewScorer creates a new instance of external http scorer based on the given parameters +func NewScorer(_ context.Context, name string, url string) plugins.Scorer { + return &Scorer{Plugin{name: name, url: url}} +} + +// Score scores the given pods in range of 0-1 +func (s *Scorer) Score(ctx *types.SchedulingContext, pods []types.Pod) map[types.Pod]float64 { + logger := log.FromContext(ctx).WithName(s.Name()) + logger.Info(">>> external scorer call - TODO implement it!", "name", s.Name()) + + result := map[types.Pod]float64{} + + // TODO - send http request to the url + for _, pod := range pods { + result[pod] = 1.0 + } + + return result +} diff --git a/pkg/scheduling/plugins/external/http/types.go b/pkg/scheduling/plugins/external/http/types.go new file mode 100644 index 00000000..9128b613 --- /dev/null +++ b/pkg/scheduling/plugins/external/http/types.go @@ -0,0 +1,99 @@ +package http + +import ( + "maps" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" +) + +// This file contains types that duplicate upstream types but include json marshaling annotations. +// This duplication allows sending these data structures to external http servers without requiring changes upstream. 
+ +// schedulingContext is types.SchedulingContext +type schedulingContext struct { + Request request `json:"request"` + Pods []pod `json:"pods"` +} + +// request is types.LLMRequest +type request struct { + TargetModel string `json:"target_model"` + RequestID string `json:"request_id"` + Critical bool `json:"critical"` + Prompt string `json:"prompt"` + Headers map[string]string `json:"headers"` +} + +// pod is types.Pod +type pod struct { + Pod *podInfo `json:"pod,omitempty"` + Metrics *podMetrics `json:"metrics,omitempty"` +} + +// podInfo is backend.Pod (sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend) +type podInfo struct { + NamespacedName namespacedName `json:"namespaced_name"` + Address string `json:"address"` + Labels map[string]string `json:"labels"` +} + +// namespacedName is k8stypes.NamespacedName (k8s.io/apimachinery/pkg/types) +type namespacedName struct { + Name string `json:"name"` + Namespace string `json:"namespace"` +} + +// podMetrics is backendmetrics.MetricsState ("sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics") +type podMetrics struct { + ActiveModels map[string]int `json:"active_models"` + WaitingModels map[string]int `json:"waiting_models"` + MaxActiveModels int `json:"max_active_models"` + RunningQueueSize int `json:"running_queue_size"` + WaitingQueueSize int `json:"waiting_queue_size"` + KVCacheUsagePercent float64 `json:"kv_cache_usage_percent"` + KVCacheMaxTokenCapacity int `json:"kv_cache_max_token_capacity"` + UpdateTime string `json:"update_time"` +} + +// newSchedulingContext creates a new scheduling context according to the given parameter +func newSchedulingContext(ctx *types.SchedulingContext) *schedulingContext { + headers := map[string]string{} + + if ctx.Req.Headers != nil { + headers = maps.Clone(ctx.Req.Headers) + } + + return &schedulingContext{ + Pods: podsToExtPods(ctx.PodsSnapshot), + Request: request{ + TargetModel: ctx.Req.TargetModel, + RequestID: ctx.Req.RequestId, + Critical: 
ctx.Req.Critical, + Prompt: ctx.Req.Prompt, + Headers: headers, + }, + } +} + +// Filter plugin + +// filterPayload is the input parameter to a filter http call +type filterPayload struct { + SchedContext *schedulingContext `json:"sched_context"` + Pods []pod `json:"pods"` +} + +// newFilterPayload creates a filterPayload object based on the given parameters +// returns the pointer to the created object +func newFilterPayload(ctx *types.SchedulingContext, pods []types.Pod) *filterPayload { + externalPods := make([]pod, len(pods)) + + for i, internalPod := range pods { + externalPods[i] = pod{ + Pod: podInfoFromBackend(internalPod.GetPod()), + Metrics: metricsFromBackend(internalPod.GetMetrics()), + } + } + + return &filterPayload{SchedContext: newSchedulingContext(ctx), Pods: externalPods} +} diff --git a/pkg/scheduling/plugins/external/http/utils.go b/pkg/scheduling/plugins/external/http/utils.go new file mode 100644 index 00000000..a1d9efd7 --- /dev/null +++ b/pkg/scheduling/plugins/external/http/utils.go @@ -0,0 +1,59 @@ +package http + +import ( + "maps" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" + backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" +) + +// podsToExtPods converts upstream pod object to pod object used for external plugins http calls +func podsToExtPods(pods []types.Pod) []pod { + extPods := make([]pod, len(pods)) + + for i, p := range pods { + extPods[i] = pod{ + Pod: podInfoFromBackend(p.GetPod()), + Metrics: metricsFromBackend(p.GetMetrics()), + } + } + + return extPods +} + +// podInfoFromBackend converts upstream backend.Pod to podInfo object used for external plugins http calls +func podInfoFromBackend(bkPod *backend.Pod) *podInfo { + if bkPod == nil { + return nil + } + + return &podInfo{ + NamespacedName: namespacedName{Name: bkPod.NamespacedName.Name, Namespace: bkPod.NamespacedName.Namespace}, + Address: 
bkPod.Address, + Labels: maps.Clone(bkPod.Labels), + } +} + +// metricsFromBackend converts upstream backendmetrics.MetricsState to podMetrics object used for external plugins http calls +func metricsFromBackend(bkMetrics *backendmetrics.MetricsState) *podMetrics { + if bkMetrics == nil { + return nil + } + + return &podMetrics{ + ActiveModels: maps.Clone(bkMetrics.ActiveModels), + WaitingModels: maps.Clone(bkMetrics.WaitingModels), + MaxActiveModels: bkMetrics.MaxActiveModels, + RunningQueueSize: bkMetrics.RunningQueueSize, + WaitingQueueSize: bkMetrics.WaitingQueueSize, + KVCacheUsagePercent: bkMetrics.KVCacheUsagePercent, + KVCacheMaxTokenCapacity: bkMetrics.KvCacheMaxTokenCapacity, + UpdateTime: bkMetrics.UpdateTime.Format("2006-01-02 15:04:05"), + } +} + +// namespacedNameToString creates full pod name based on namespace and name +func namespacedNameToString(name string, namespace string) string { + return namespace + "/" + name +}