Skip to content

Commit 58a7e6c

Browse files
authored
refactor bbr main as a prep for pluggability (#1867)
Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent 943e676 commit 58a7e6c

File tree

3 files changed

+162
-134
lines changed

3 files changed

+162
-134
lines changed

cmd/bbr/main.go

Lines changed: 2 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -17,141 +17,15 @@ limitations under the License.
1717
package main
1818

1919
import (
20-
"flag"
21-
"fmt"
2220
"os"
2321

24-
"github.com/go-logr/logr"
25-
uberzap "go.uber.org/zap"
26-
"go.uber.org/zap/zapcore"
27-
"google.golang.org/grpc"
28-
healthPb "google.golang.org/grpc/health/grpc_health_v1"
2922
ctrl "sigs.k8s.io/controller-runtime"
30-
"sigs.k8s.io/controller-runtime/pkg/log/zap"
31-
"sigs.k8s.io/controller-runtime/pkg/manager"
32-
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
33-
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
34-
35-
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
36-
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/metrics"
37-
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/server"
38-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
39-
)
40-
41-
var (
42-
grpcPort = flag.Int(
43-
"grpc-port",
44-
9004,
45-
"The gRPC port used for communicating with Envoy proxy")
46-
grpcHealthPort = flag.Int(
47-
"grpc-health-port",
48-
9005,
49-
"The port used for gRPC liveness and readiness probes")
50-
metricsPort = flag.Int(
51-
"metrics-port", 9090, "The metrics port")
52-
streaming = flag.Bool(
53-
"streaming", false, "Enables streaming support for Envoy full-duplex streaming mode")
54-
logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity")
55-
56-
setupLog = ctrl.Log.WithName("setup")
23+
"sigs.k8s.io/gateway-api-inference-extension/cmd/bbr/runner"
5724
)
5825

5926
func main() {
60-
if err := run(); err != nil {
27+
if err := runner.NewRunner().Run(ctrl.SetupSignalHandler()); err != nil {
6128
os.Exit(1)
6229
}
63-
}
64-
65-
func run() error {
66-
opts := zap.Options{Development: true}
67-
opts.BindFlags(flag.CommandLine)
68-
flag.Parse()
69-
initLogging(&opts)
70-
71-
// Print all flag values
72-
flags := make(map[string]any)
73-
flag.VisitAll(func(f *flag.Flag) {
74-
flags[f.Name] = f.Value
75-
})
76-
setupLog.Info("Flags processed", "flags", flags)
77-
78-
// Init runtime.
79-
cfg, err := ctrl.GetConfig()
80-
if err != nil {
81-
setupLog.Error(err, "Failed to get rest config")
82-
return err
83-
}
84-
85-
metrics.Register()
86-
87-
// Register metrics handler.
88-
// Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server.
89-
// More info:
90-
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/metrics/server
91-
// - https://book.kubebuilder.io/reference/metrics.html
92-
metricsServerOptions := metricsserver.Options{
93-
BindAddress: fmt.Sprintf(":%d", *metricsPort),
94-
FilterProvider: filters.WithAuthenticationAndAuthorization,
95-
}
96-
mgr, err := ctrl.NewManager(cfg, ctrl.Options{Metrics: metricsServerOptions})
97-
if err != nil {
98-
setupLog.Error(err, "Failed to create manager", "config", cfg)
99-
return err
100-
}
101-
102-
ctx := ctrl.SetupSignalHandler()
103-
104-
// Setup runner.
105-
serverRunner := runserver.NewDefaultExtProcServerRunner(*grpcPort, *streaming)
106-
107-
// Register health server.
108-
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), *grpcHealthPort); err != nil {
109-
return err
110-
}
111-
112-
// Register ext-proc server.
113-
if err := mgr.Add(serverRunner.AsRunnable(ctrl.Log.WithName("ext-proc"))); err != nil {
114-
setupLog.Error(err, "Failed to register ext-proc gRPC server")
115-
return err
116-
}
117-
118-
// Start the manager. This blocks until a signal is received.
119-
setupLog.Info("Manager starting")
120-
if err := mgr.Start(ctx); err != nil {
121-
setupLog.Error(err, "Error starting manager")
122-
return err
123-
}
124-
setupLog.Info("Manager terminated")
125-
return nil
126-
}
127-
128-
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
129-
func registerHealthServer(mgr manager.Manager, logger logr.Logger, port int) error {
130-
srv := grpc.NewServer()
131-
healthPb.RegisterHealthServer(srv, &healthServer{
132-
logger: logger,
133-
})
134-
if err := mgr.Add(
135-
runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil {
136-
setupLog.Error(err, "Failed to register health server")
137-
return err
138-
}
139-
return nil
140-
}
141-
142-
func initLogging(opts *zap.Options) {
143-
useV := true
144-
flag.Visit(func(f *flag.Flag) {
145-
if f.Name == "zap-log-level" {
146-
useV = false
147-
}
148-
})
149-
if useV {
150-
// See https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/log/zap#Options.Level
151-
lvl := -1 * (*logVerbosity)
152-
opts.Level = uberzap.NewAtomicLevelAt(zapcore.Level(int8(lvl)))
153-
}
15430

155-
logger := zap.New(zap.UseFlagOptions(opts), zap.RawZapOpts(uberzap.AddCaller()))
156-
ctrl.SetLogger(logger)
15731
}

cmd/bbr/health.go renamed to cmd/bbr/runner/health.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,21 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package main
17+
package runner
1818

1919
import (
2020
"context"
2121

2222
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
23-
"github.com/go-logr/logr"
2423
"google.golang.org/grpc/codes"
2524
healthPb "google.golang.org/grpc/health/grpc_health_v1"
2625
"google.golang.org/grpc/status"
2726

27+
"sigs.k8s.io/controller-runtime/pkg/log"
2828
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
2929
)
3030

31-
type healthServer struct {
32-
logger logr.Logger
33-
}
31+
type healthServer struct{}
3432

3533
func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) {
3634
// TODO: we're accepting ANY service name for now as a temporary hack in alignment with
@@ -40,7 +38,7 @@ func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckReques
4038
// return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVICE_UNKNOWN}, nil
4139
// }
4240

43-
s.logger.V(logutil.VERBOSE).Info("gRPC health check serving", "service", in.Service)
41+
log.FromContext(ctx).V(logutil.DEBUG).Info("gRPC health check serving", "service", in.Service)
4442
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
4543
}
4644

cmd/bbr/runner/runner.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package runner
18+
19+
import (
20+
"context"
21+
"flag"
22+
"fmt"
23+
24+
uberzap "go.uber.org/zap"
25+
"go.uber.org/zap/zapcore"
26+
"google.golang.org/grpc"
27+
healthPb "google.golang.org/grpc/health/grpc_health_v1"
28+
ctrl "sigs.k8s.io/controller-runtime"
29+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
30+
"sigs.k8s.io/controller-runtime/pkg/manager"
31+
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
32+
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
33+
34+
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
35+
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/metrics"
36+
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/server"
37+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
38+
)
39+
40+
var (
41+
grpcPort = flag.Int("grpc-port", 9004, "The gRPC port used for communicating with Envoy proxy")
42+
grpcHealthPort = flag.Int("grpc-health-port", 9005, "The port used for gRPC liveness and readiness probes")
43+
metricsPort = flag.Int("metrics-port", 9090, "The metrics port")
44+
streaming = flag.Bool("streaming", false, "Enables streaming support for Envoy full-duplex streaming mode")
45+
logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity")
46+
47+
setupLog = ctrl.Log.WithName("setup")
48+
)
49+
50+
func NewRunner() *Runner {
51+
return &Runner{
52+
bbrExecutableName: "BBR",
53+
}
54+
}
55+
56+
// Runner is used to run bbr with its plugins
57+
type Runner struct {
58+
bbrExecutableName string
59+
}
60+
61+
// WithExecutableName sets the name of the executable containing the runner.
62+
// The name is used in the version log upon startup and is otherwise opaque.
63+
func (r *Runner) WithExecutableName(exeName string) *Runner {
64+
r.bbrExecutableName = exeName
65+
return r
66+
}
67+
68+
func (r *Runner) Run(ctx context.Context) error {
69+
opts := zap.Options{Development: true}
70+
opts.BindFlags(flag.CommandLine)
71+
flag.Parse()
72+
initLogging(&opts)
73+
74+
// Print all flag values
75+
flags := make(map[string]any)
76+
flag.VisitAll(func(f *flag.Flag) {
77+
flags[f.Name] = f.Value
78+
})
79+
setupLog.Info("Flags processed", "flags", flags)
80+
81+
// Init runtime.
82+
cfg, err := ctrl.GetConfig()
83+
if err != nil {
84+
setupLog.Error(err, "Failed to get rest config")
85+
return err
86+
}
87+
88+
metrics.Register()
89+
90+
// Register metrics handler.
91+
// Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server.
92+
// More info:
93+
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/metrics/server
94+
// - https://book.kubebuilder.io/reference/metrics.html
95+
metricsServerOptions := metricsserver.Options{
96+
BindAddress: fmt.Sprintf(":%d", *metricsPort),
97+
FilterProvider: filters.WithAuthenticationAndAuthorization,
98+
}
99+
mgr, err := ctrl.NewManager(cfg, ctrl.Options{Metrics: metricsServerOptions})
100+
if err != nil {
101+
setupLog.Error(err, "Failed to create manager", "config", cfg)
102+
return err
103+
}
104+
105+
// Setup runner.
106+
serverRunner := runserver.NewDefaultExtProcServerRunner(*grpcPort, *streaming)
107+
108+
// Register health server.
109+
if err := registerHealthServer(mgr, *grpcHealthPort); err != nil {
110+
return err
111+
}
112+
113+
// Register ext-proc server.
114+
if err := mgr.Add(serverRunner.AsRunnable(ctrl.Log.WithName("ext-proc"))); err != nil {
115+
setupLog.Error(err, "Failed to register ext-proc gRPC server")
116+
return err
117+
}
118+
119+
// Start the manager. This blocks until a signal is received.
120+
setupLog.Info("Manager starting")
121+
if err := mgr.Start(ctx); err != nil {
122+
setupLog.Error(err, "Error starting manager")
123+
return err
124+
}
125+
setupLog.Info("Manager terminated")
126+
return nil
127+
}
128+
129+
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
130+
func registerHealthServer(mgr manager.Manager, port int) error {
131+
srv := grpc.NewServer()
132+
healthPb.RegisterHealthServer(srv, &healthServer{})
133+
if err := mgr.Add(
134+
runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil {
135+
setupLog.Error(err, "Failed to register health server")
136+
return err
137+
}
138+
return nil
139+
}
140+
141+
func initLogging(opts *zap.Options) {
142+
useV := true
143+
flag.Visit(func(f *flag.Flag) {
144+
if f.Name == "zap-log-level" {
145+
useV = false
146+
}
147+
})
148+
if useV {
149+
// See https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/log/zap#Options.Level
150+
lvl := -1 * (*logVerbosity)
151+
opts.Level = uberzap.NewAtomicLevelAt(zapcore.Level(int8(lvl)))
152+
}
153+
154+
logger := zap.New(zap.UseFlagOptions(opts), zap.RawZapOpts(uberzap.AddCaller()))
155+
ctrl.SetLogger(logger)
156+
}

0 commit comments

Comments
 (0)