Skip to content

Commit 86465d0

Browse files
A development step PR for pluggable BBR framework. Does not change behavior.
1 parent 8c60d23 commit 86465d0

File tree

10 files changed

+203
-20
lines changed

10 files changed

+203
-20
lines changed

cmd/bbr/runner/runner.go

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,15 @@ import (
4141
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4242
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/datastore"
4343
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/metrics"
44+
bbr "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins"
45+
routing "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/routing"
4446
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/server"
4547
"sigs.k8s.io/gateway-api-inference-extension/pkg/common/util/logging"
4648
"sigs.k8s.io/gateway-api-inference-extension/version"
4749
)
4850

4951
var (
52+
// Flags
5053
grpcPort = flag.Int("grpc-port", 9004, "The gRPC port used for communicating with Envoy proxy")
5154
grpcHealthPort = flag.Int("grpc-health-port", 9005, "The port used for gRPC liveness and readiness probes")
5255
metricsPort = flag.Int("metrics-port", 9090, "The metrics port")
@@ -55,7 +58,12 @@ var (
5558
secureServing = flag.Bool("secure-serving", true, "Enables secure serving.")
5659
logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity")
5760

61+
// Logging
5862
setupLog = ctrl.Log.WithName("setup")
63+
64+
// Contains the BBR plugins specs specified via repeated flags:
65+
// --plugin <type>:<name>[:<json>]
66+
pluginSpecs bbr.BBRPluginSpecs
5967
)
6068

6169
func NewRunner() *Runner {
@@ -67,6 +75,10 @@ func NewRunner() *Runner {
6775
// Runner is used to run bbr with its plugins
6876
type Runner struct {
6977
bbrExecutableName string
78+
79+
// The slice of BBR plugin instances executed by the request handler,
80+
// in the same order the plugin flags are provided.
81+
bbrPluginInstances []bbr.BBRPlugin
7082
}
7183

7284
// WithExecutableName sets the name of the executable containing the runner.
@@ -80,6 +92,8 @@ func (r *Runner) Run(ctx context.Context) error {
8092
setupLog.Info(r.bbrExecutableName+" build", "commit-sha", version.CommitSHA, "build-ref", version.BuildRef)
8193
opts := zap.Options{Development: true}
8294
opts.BindFlags(flag.CommandLine)
95+
96+
flag.Var(&pluginSpecs, "plugin", `Repeatable. --plugin <type>:<name>[:<json>]`)
8397
flag.Parse()
8498
initLogging(&opts)
8599

@@ -140,12 +154,46 @@ func (r *Runner) Run(ctx context.Context) error {
140154
return err
141155
}
142156

157+
// Register factories for all known in-tree BBR plugins
158+
r.registerInTreePlugins()
159+
160+
// Construct BBR plugin instances for the in tree plugins that are (1) registered and (2) requested via the --plugin flags
161+
if len(pluginSpecs) == 0 {
162+
setupLog.Info("No BBR plugins are specified. Running BBR with the default behavior.")
163+
164+
// Append a default BBRPlugin to the slice of the BBRPlugin instances using regular registered factory mechanism.
165+
factory := bbr.Registry[routing.DefaultPluginType]
166+
defaultPlugin, err := factory("", nil)
167+
if err != nil {
168+
setupLog.Error(err, "Failed to create default plugin")
169+
return err
170+
}
171+
r.withPlugin(defaultPlugin)
172+
} else {
173+
setupLog.Info("BBR plugins are specified. Running BBR with the specified plugins.")
174+
175+
for _, s := range pluginSpecs {
176+
factory, ok := bbr.Registry[s.Type]
177+
if !ok {
178+
fmt.Fprintf(os.Stderr, "unknown plugin type %q (no factory registered)\n", s.Type)
179+
os.Exit(2)
180+
}
181+
instance, err := factory(s.Name, s.JSON)
182+
if err != nil {
183+
fmt.Fprintf(os.Stderr, "invalid %s#%s: %v\n", s.Type, s.Name, err)
184+
os.Exit(2)
185+
}
186+
r.withPlugin(instance)
187+
}
188+
}
189+
143190
// Setup ExtProc Server Runner
144191
serverRunner := &runserver.ExtProcServerRunner{
145-
GrpcPort: *grpcPort,
146-
Datastore: ds,
147-
SecureServing: *secureServing,
148-
Streaming: *streaming,
192+
GrpcPort: *grpcPort,
193+
Datastore: ds,
194+
SecureServing: *secureServing,
195+
Streaming: *streaming,
196+
PluginInstances: r.bbrPluginInstances,
149197
}
150198
if err := serverRunner.SetupWithManager(mgr); err != nil {
151199
setupLog.Error(err, "Failed to setup BBR controllers")
@@ -173,6 +221,15 @@ func (r *Runner) Run(ctx context.Context) error {
173221
return nil
174222
}
175223

224+
// registerInTreePlugins registers the factory functions of all known BBR plugins
225+
func (r *Runner) registerInTreePlugins() {
226+
bbr.Register(routing.DefaultPluginType, routing.DefaultPluginFactory)
227+
}
228+
229+
func (r *Runner) withPlugin(p bbr.BBRPlugin) {
230+
r.bbrPluginInstances = append(r.bbrPluginInstances, p)
231+
}
232+
176233
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
177234
func registerHealthServer(mgr manager.Manager, port int) error {
178235
srv := grpc.NewServer()

config/charts/body-based-routing/templates/bbr.yaml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ spec:
1313
labels:
1414
app: {{ .Values.bbr.name }}
1515
spec:
16-
serviceAccountName: {{ .Values.bbr.name}}-{{ .Release.Name }}
16+
serviceAccountName: {{ .Values.bbr.name }}-{{ .Release.Name }}
1717
containers:
1818
- name: bbr
1919
image: {{ .Values.bbr.image.hub }}/{{ .Values.bbr.image.name }}:{{ .Values.bbr.image.tag }}
@@ -24,6 +24,14 @@ spec:
2424
{{- range $key, $value := .Values.bbr.flags }}
2525
- --{{ $key }}
2626
- "{{ $value }}"
27+
{{- end }}
28+
{{- range .Values.bbr.plugins }}
29+
- --plugin
30+
{{- if .json }}
31+
- {{ printf "%s:%s:%s" .type .name (toJson .json) | quote }}
32+
{{- else }}
33+
- {{ printf "%s:%s" .type .name | quote }}
34+
{{- end }}
2735
{{- end }}
2836
{{- if not .Values.bbr.multiNamespace }}
2937
env:

config/charts/body-based-routing/values.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ bbr:
1414
# Log verbosity
1515
v: 3
1616

17+
# Plugin configuration (transitional - currently runs no-op default plugin)
18+
plugins:
19+
- type: default-bbr
20+
name: default-bbr
21+
json: {"v":0, "no-op":true}
22+
1723
provider:
1824
name: none
1925

pkg/bbr/handlers/request.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package handlers
1919
import (
2020
"context"
2121
"encoding/json"
22+
"fmt"
2223

2324
basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2425
eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
@@ -43,12 +44,31 @@ func (s *Server) HandleRequestBody(ctx context.Context, requestBodyBytes []byte)
4344
logger := log.FromContext(ctx)
4445
var ret []*eppb.ProcessingResponse
4546

47+
// Executing BBR plugins in the order they were registered.
48+
// This change is a transitional development step to test the BBR plugins.
49+
// At the moment, the loop runs a no-op plugin.
50+
// Once the Default BBR plugin is fully implemented and integrated, the loop will run actual pluggable logic.
51+
for _, plugin := range s.pluginInstances {
52+
var headers map[string][]string
53+
var err error
54+
logger.Info("Executing plugin", "plugin", plugin.TypedName())
55+
56+
requestBodyBytes, headers, err = plugin.Execute(requestBodyBytes)
57+
if err != nil {
58+
logger.Error(err, "Plugin execution failed", "plugin", plugin.TypedName())
59+
return nil, fmt.Errorf("plugin %s failed: %w", plugin.TypedName(), err)
60+
}
61+
_ = headers // TODO: Handle headers returned by plugins
62+
}
63+
4664
var requestBody RequestBody
4765
if err := json.Unmarshal(requestBodyBytes, &requestBody); err != nil {
4866
metrics.RecordModelNotParsedCounter()
4967
return nil, err
5068
}
5169

70+
logger.Info("Parsed model name", "model", requestBody.Model)
71+
5272
if requestBody.Model == "" {
5373
metrics.RecordModelNotInBodyCounter()
5474
logger.V(logutil.DEFAULT).Info("Request body does not contain model parameter")
@@ -73,6 +93,8 @@ func (s *Server) HandleRequestBody(ctx context.Context, requestBodyBytes []byte)
7393
metrics.RecordSuccessCounter()
7494
baseModel := s.ds.GetBaseModel(requestBody.Model)
7595

96+
logger.Info("Base model from datastore", "baseModel", baseModel)
97+
7698
if s.streaming {
7799
ret = append(ret, &eppb.ProcessingResponse{
78100
Response: &eppb.ProcessingResponse_RequestHeaders{

pkg/bbr/handlers/request_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ func TestHandleRequestBody(t *testing.T) {
267267

268268
for _, test := range tests {
269269
t.Run(test.name, func(t *testing.T) {
270-
server := NewServer(test.streaming, &fakeDatastore{})
270+
server := NewServer(test.streaming, &fakeDatastore{}, fakeBBRPlugins{})
271271
bodyBytes, _ := json.Marshal(test.body)
272272
resp, err := server.HandleRequestBody(ctx, bodyBytes)
273273
if err != nil {

pkg/bbr/handlers/server.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,24 +29,28 @@ import (
2929

3030
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/util/logging"
3131
requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
32+
33+
bbr "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins"
3234
)
3335

3436
type Datastore interface {
3537
GetBaseModel(modelName string) string
3638
}
3739

38-
func NewServer(streaming bool, ds Datastore) *Server {
40+
func NewServer(streaming bool, ds Datastore, bbrPluginInstances []bbr.BBRPlugin) *Server {
3941
return &Server{
40-
streaming: streaming,
41-
ds: ds,
42+
streaming: streaming,
43+
ds: ds,
44+
pluginInstances: bbrPluginInstances,
4245
}
4346
}
4447

4548
// Server implements the Envoy external processing server.
4649
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto
4750
type Server struct {
48-
streaming bool
49-
ds Datastore
51+
streaming bool
52+
ds Datastore
53+
pluginInstances []bbr.BBRPlugin
5054
}
5155

5256
func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {

pkg/bbr/handlers/server_test.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ import (
2727
"sigs.k8s.io/controller-runtime/pkg/log"
2828

2929
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/util/logging"
30+
31+
bbr "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins"
32+
plugins "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
3033
)
3134

3235
func TestProcessRequestBody(t *testing.T) {
@@ -139,7 +142,8 @@ func TestProcessRequestBody(t *testing.T) {
139142

140143
for _, tc := range cases {
141144
t.Run(tc.desc, func(t *testing.T) {
142-
srv := NewServer(tc.streaming, &fakeDatastore{})
145+
fakePlugins := fakeBBRPlugins{}
146+
srv := NewServer(tc.streaming, &fakeDatastore{}, fakePlugins)
143147
streamedBody := &streamedBody{}
144148
for i, body := range tc.bodys {
145149
got, err := srv.processRequestBody(context.Background(), body, streamedBody, log.FromContext(ctx))
@@ -162,3 +166,18 @@ type fakeDatastore struct{}
162166
func (ds *fakeDatastore) GetBaseModel(modelName string) string {
163167
return ""
164168
}
169+
170+
type fakeBBRPlugin struct{}
171+
172+
type fakeBBRPlugins []bbr.BBRPlugin
173+
174+
func (p *fakeBBRPlugin) Execute(requestBodyBytes []byte) (mutatedBodyBytes []byte, headers map[string][]string, err error) {
175+
return []byte{}, map[string][]string{}, nil
176+
}
177+
178+
func (p *fakeBBRPlugin) TypedName() plugins.TypedName {
179+
return plugins.TypedName{
180+
Type: "fake",
181+
Name: "test-plugin",
182+
}
183+
}

pkg/bbr/plugins/plugins.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ import (
2323
type BBRPlugin interface {
2424
plugin.Plugin
2525

26-
// Execute runs the plugin logic on the request body and a map of headers.
27-
// A plugin's imnplementation logic CAN mutate the body of the message.
26+
// Execute runs the plugin's logic on the request body.
27+
// A plugin's implementation logic CAN mutate the body of the message.
2828
// A plugin's implementation MUST return a map of headers.
2929
// If no headers are set by the implementation, the return headers map is nil.
30-
Execute(requestBodyBytes []byte, requestHeaders map[string][]string) (mutatedBodyBytes []byte, headers map[string][]string, err error)
30+
// In the future, a headers map can be added to the plugin interface to allow different plugins on a chain to share information on the headers.
31+
Execute(requestBodyBytes []byte) (mutatedBodyBytes []byte, headers map[string][]string, err error)
3132
}

pkg/bbr/routing/default.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
Copyright 2026 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package routing
18+
19+
import (
20+
"encoding/json"
21+
22+
bbr "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins"
23+
epp "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
24+
)
25+
26+
const (
27+
DefaultPluginType = "default-bbr"
28+
)
29+
30+
// compile-time type validation
31+
var _ bbr.BBRPlugin = &DefaultPlugin{}
32+
33+
type DefaultPlugin struct {
34+
typedName epp.TypedName
35+
}
36+
37+
// DefaultPluginFactory defines the factory function for DefaultPlugin.
38+
// The name and rawParameters are ignored as the plugin uses the default configuration.
39+
func DefaultPluginFactory(_ string, _ json.RawMessage) (bbr.BBRPlugin, error) {
40+
return NewDefaultPlugin(), nil
41+
}
42+
43+
// / NewDefaultPlugin returns a concrete *DefaultPlugin.
44+
func NewDefaultPlugin() *DefaultPlugin {
45+
return &DefaultPlugin{
46+
typedName: epp.TypedName{Type: DefaultPluginType, Name: DefaultPluginType},
47+
}
48+
}
49+
50+
func (p *DefaultPlugin) Execute(requestBodyBytes []byte) ([]byte, map[string][]string, error) {
51+
// No-op BBR plugin to be replaced by the actual logic currently running in the
52+
return requestBodyBytes, nil, nil
53+
}
54+
55+
// TypedName returns the type and name tuple of this plugin instance.
56+
func (p *DefaultPlugin) TypedName() epp.TypedName {
57+
return p.typedName
58+
}
59+
60+
// WithName sets the name of the default BBR plugin
61+
func (p *DefaultPlugin) WithName(name string) *DefaultPlugin {
62+
p.typedName.Name = name
63+
return p
64+
}

pkg/bbr/server/runserver.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,16 @@ import (
3333
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/controller"
3434
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/datastore"
3535
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/handlers"
36+
bbr "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins"
3637
)
3738

3839
// ExtProcServerRunner provides methods to manage an external process server.
3940
type ExtProcServerRunner struct {
40-
GrpcPort int
41-
Datastore datastore.Datastore
42-
SecureServing bool
43-
Streaming bool
41+
GrpcPort int
42+
Datastore datastore.Datastore
43+
SecureServing bool
44+
Streaming bool
45+
PluginInstances []bbr.BBRPlugin
4446
}
4547

4648
func NewDefaultExtProcServerRunner(port int, streaming bool) *ExtProcServerRunner {
@@ -81,7 +83,7 @@ func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable {
8183
srv = grpc.NewServer()
8284
}
8385

84-
extProcPb.RegisterExternalProcessorServer(srv, handlers.NewServer(r.Streaming, r.Datastore))
86+
extProcPb.RegisterExternalProcessorServer(srv, handlers.NewServer(r.Streaming, r.Datastore, r.PluginInstances))
8587

8688
// Forward to the gRPC runnable.
8789
return runnable.GRPCServer("ext-proc", srv, r.GrpcPort).Start(ctx)

0 commit comments

Comments
 (0)