Skip to content

Commit c1ea619

Browse files
feat(aigw): replace custom admin monitor with one in func-e (#1341)
**Description** This replaces our custom Envoy admin monitor code now that func-e provides the same functionality. **Related Issues/PRs (if applicable)** The functionality (and more) was upstreamed in tetratelabs/func-e#483. --------- Signed-off-by: Adrian Cole <[email protected]>
1 parent 12c8c4b commit c1ea619

File tree

10 files changed

+91
-803
lines changed

10 files changed

+91
-803
lines changed

.github/workflows/build_and_test.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,8 @@ jobs:
363363
FUNC_E_HOME: /tmp/envoy-gateway # hard-coded directory in EG
364364
- name: Install Goose
365365
env:
366-
GOOSE_VERSION: v1.8.0
366+
GOOSE_VERSION: v1.10.0
367+
OS: Linux
367368
run: |
368369
curl -fsSL https://github.com/block/goose/releases/download/stable/download_cli.sh | CONFIGURE=false bash
369370
- env:

cmd/aigw/healthcheck.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"log/slog"
1212
"time"
1313

14-
"github.com/envoyproxy/ai-gateway/internal/aigw"
14+
"github.com/tetratelabs/func-e/experimental/admin"
1515
)
1616

1717
// healthcheck looks up the Envoy subprocess, gets its admin port,
@@ -27,13 +27,12 @@ func healthcheck(ctx context.Context, _, stderr io.Writer) error {
2727
}
2828

2929
func doHealthcheck(ctx context.Context, aigwPid int, logger *slog.Logger) error {
30-
envoyAdmin, err := aigw.NewEnvoyAdminClient(ctx, aigwPid, 0)
31-
if err != nil {
30+
if adminClient, err := admin.NewAdminClient(ctx, aigwPid); err != nil {
3231
logger.Error("Failed to find Envoy admin server", "error", err)
3332
return err
34-
} else if err = envoyAdmin.IsReady(ctx); err != nil {
35-
logger.Error("Envoy admin server is not ready", "adminPort", envoyAdmin.Port(), "error", err)
33+
} else if err = adminClient.IsReady(ctx); err != nil {
34+
logger.Error("Envoy admin server is not ready", "adminPort", adminClient.Port(), "error", err)
3635
return err
3736
}
38-
return err
37+
return nil
3938
}

cmd/aigw/healthcheck_test.go

Lines changed: 25 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,16 @@ package main
88
import (
99
"bytes"
1010
"context"
11-
"fmt"
11+
"io"
1212
"log/slog"
13-
"net/http"
14-
"net/http/httptest"
15-
"net/url"
1613
"os"
17-
"os/exec"
18-
"path/filepath"
19-
"strconv"
2014
"testing"
2115
"time"
2216

2317
"github.com/stretchr/testify/require"
18+
func_e "github.com/tetratelabs/func-e"
19+
"github.com/tetratelabs/func-e/api"
20+
"github.com/tetratelabs/func-e/experimental/admin"
2421
)
2522

2623
func Test_healthcheck(t *testing.T) {
@@ -38,38 +35,32 @@ func Test_healthcheck(t *testing.T) {
3835
})
3936

4037
t.Run("returns nil when ready", func(t *testing.T) {
41-
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
42-
require.Equal(t, "/ready", r.URL.Path)
43-
w.WriteHeader(http.StatusOK)
44-
_, _ = w.Write([]byte("live"))
45-
}))
46-
defer server.Close()
47-
48-
u, err := url.Parse(server.URL)
49-
require.NoError(t, err)
50-
port, err := strconv.Atoi(u.Port())
51-
require.NoError(t, err)
52-
53-
adminFile := filepath.Join(t.TempDir(), "admin-address.txt")
54-
require.NoError(t, os.WriteFile(adminFile, []byte(fmt.Sprintf("127.0.0.1:%d", port)), 0o600))
55-
5638
ctx, cancel := context.WithCancel(t.Context())
5739
defer cancel()
5840

59-
cmdStr := fmt.Sprintf("sleep 30 && echo -- --admin-address-path %s", adminFile)
60-
cmd := exec.CommandContext(ctx, "sh", "-c", cmdStr)
61-
require.NoError(t, cmd.Start())
62-
defer func() {
63-
_ = cmd.Process.Kill()
64-
_, _ = cmd.Process.Wait()
65-
}()
41+
var healthCheckErr error
42+
var log bytes.Buffer
6643

67-
time.Sleep(100 * time.Millisecond)
44+
// Even though AdminClient.IsReady exists, we don't have it injected in
45+
// Docker. This intentionally ignores the parameter.
46+
startupHook := func(ctx context.Context, _ admin.AdminClient, _ string) error {
47+
logger := slog.New(slog.NewTextHandler(&log, nil))
48+
healthCheckErr = doHealthcheck(ctx, pid, logger)
49+
// Cancel immediately to stop Envoy and complete test quickly
50+
cancel()
51+
return nil
52+
}
6853

69-
var buf bytes.Buffer
70-
logger := slog.New(slog.NewTextHandler(&buf, nil))
71-
err = doHealthcheck(t.Context(), pid, logger)
54+
// Run with minimal Envoy config
55+
err := func_e.Run(ctx, []string{
56+
"--config-yaml",
57+
"admin: {address: {socket_address: {address: '127.0.0.1', port_value: 0}}}",
58+
}, api.Out(io.Discard), api.EnvoyOut(io.Discard), api.EnvoyErr(io.Discard), admin.WithStartupHook(startupHook))
59+
60+
// Expect nil error since Run returns nil on context cancellation (documented behavior)
7261
require.NoError(t, err)
73-
require.Empty(t, buf)
62+
63+
require.NoError(t, healthCheckErr)
64+
require.Empty(t, log)
7465
})
7566
}

cmd/aigw/run.go

Lines changed: 37 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ import (
1919
"strings"
2020
"time"
2121

22-
egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
2322
"github.com/envoyproxy/gateway/cmd/envoy-gateway/root"
2423
egextension "github.com/envoyproxy/gateway/proto/extension"
2524
"github.com/go-logr/logr"
2625
"github.com/tetratelabs/func-e/api"
26+
"github.com/tetratelabs/func-e/experimental/admin"
2727
"github.com/tetratelabs/func-e/experimental/middleware"
2828
"google.golang.org/grpc"
2929
"google.golang.org/grpc/health/grpc_health_v1"
@@ -35,7 +35,6 @@ import (
3535
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
3636
"sigs.k8s.io/yaml"
3737

38-
"github.com/envoyproxy/ai-gateway/internal/aigw"
3938
"github.com/envoyproxy/ai-gateway/internal/controller"
4039
"github.com/envoyproxy/ai-gateway/internal/extensionserver"
4140
"github.com/envoyproxy/ai-gateway/internal/filterapi"
@@ -92,9 +91,6 @@ type runOpts struct {
9291
// 1. ${os.TempDir}/envoy-gateway-config.yaml: This contains the configuration for the Envoy Gateway agent to run, derived from envoyGatewayConfig.
9392
// 2. ${os.TempDir}/envoy-ai-gateway-resources: This will contain the EG resource generated by the translation and deployed by EG.
9493
func run(ctx context.Context, c cmdRun, o runOpts, stdout, stderr io.Writer) error {
95-
redirectEnvoyStdio := newEnvoyRunMiddleware(stdout, stderr)
96-
ctx = middleware.WithRunMiddleware(ctx, redirectEnvoyStdio)
97-
9894
start := time.Now()
9995
var debugLogger *slog.Logger
10096
if c.Debug {
@@ -161,7 +157,7 @@ func run(ctx context.Context, c cmdRun, o runOpts, stdout, stderr io.Writer) err
161157
if err != nil {
162158
return err
163159
}
164-
fakeClient, extProxDone, envoyAdminPort, listenerPort, err := runCtx.writeEnvoyResourcesAndRunExtProc(ctx, aiGatewayResourcesYaml)
160+
fakeClient, extProxDone, listenerPort, err := runCtx.writeEnvoyResourcesAndRunExtProc(ctx, aiGatewayResourcesYaml)
165161
if err != nil {
166162
return fmt.Errorf("failed to write envoy resources and run extproc: %w", err)
167163
}
@@ -170,6 +166,10 @@ func run(ctx context.Context, c cmdRun, o runOpts, stdout, stderr io.Writer) err
170166
return fmt.Errorf("failed to write file %s: %w", resourceYamlPath, err)
171167
}
172168

169+
// Set up middleware with startup hook now that we know listenerPort
170+
redirectEnvoyStdio := newEnvoyRunMiddleware(start, listenerPort, stdout, stderr)
171+
ctx = middleware.WithRunMiddleware(ctx, redirectEnvoyStdio)
172+
173173
lis, err := net.Listen("tcp", "localhost:1061")
174174
if err != nil {
175175
return fmt.Errorf("failed to listen: %w", err)
@@ -214,65 +214,40 @@ func run(ctx context.Context, c cmdRun, o runOpts, stdout, stderr io.Writer) err
214214
}
215215
server.SetArgs([]string{"server", "--config-path", egConfigPath})
216216

217-
// Start a monitoring goroutine to poll Envoy's readiness. This starts
218-
// before the server to ensure we don't miss the readiness window.
219-
go func() {
220-
envoyAdmin, err := aigw.NewEnvoyAdminClient(ctx, os.Getpid(), envoyAdminPort)
221-
if err != nil {
222-
debugLogger.Error("Failed to find Envoy admin server", "error", err)
223-
serverCancel() // Likely a crashed envoy process
224-
return
225-
}
226-
debugLogger.Info("Found Envoy admin server", "adminPort", envoyAdmin.Port())
227-
if err = pollEnvoyReady(ctx, debugLogger, envoyAdmin, 100*time.Millisecond); err != nil {
228-
return
229-
}
230-
c.AdminPort = envoyAdminPort // write back for testing
231-
// Print a status message without any timestamp formatting
232-
startDuration := time.Since(start).Round(100 * time.Millisecond)
233-
_, _ = fmt.Fprintf(stderr, "Envoy AI Gateway listening on http://localhost:%d (admin http://localhost:%d) after %v\n",
234-
listenerPort, envoyAdmin.Port(), startDuration)
235-
}()
236-
237217
// Start the gateway server. This will block until the server is stopped.
218+
// The startup hook (configured via middleware) will print the status message when Envoy is ready.
238219
if err := server.ExecuteContext(serverCtx); err != nil {
239220
return fmt.Errorf("failed to execute server: %w", err)
240221
}
241222
return extProcErr
242223
}
243224

244-
// newEnvoyRunMiddleware sets options for running Envoy and returns a context to
245-
// propagate them to func-e/
246-
func newEnvoyRunMiddleware(stdout, stderr io.Writer) func(next api.RunFunc) api.RunFunc {
225+
// newEnvoyRunMiddleware sets options for running Envoy and returns a middleware
226+
// that configures Envoy I/O and sets up a startup hook to print the ready message.
227+
func newEnvoyRunMiddleware(start time.Time, listenerPort int, stdout, stderr io.Writer) func(next api.RunFunc) api.RunFunc {
228+
// Define startup hook that will be called when Envoy admin is ready
229+
startupHook := func(_ context.Context, adminClient admin.AdminClient, _ string) error {
230+
// Print a status message without any timestamp formatting
231+
startDuration := time.Since(start).Round(100 * time.Millisecond)
232+
_, _ = fmt.Fprintf(stderr, "Envoy AI Gateway listening on http://localhost:%d (admin http://localhost:%d) after %v\n",
233+
listenerPort, adminClient.Port(), startDuration)
234+
return nil
235+
}
236+
247237
// aigw is primarily an Envoy controller, so ensure its output is visible
248-
overrides := []api.RunOption{api.EnvoyOut(stdout), api.EnvoyErr(stderr)}
238+
overrides := []api.RunOption{
239+
api.EnvoyOut(stdout),
240+
api.EnvoyErr(stderr),
241+
admin.WithStartupHook(startupHook),
242+
}
243+
249244
return func(next api.RunFunc) api.RunFunc {
250245
return func(ctx context.Context, args []string, options ...api.RunOption) error {
251246
return next(ctx, args, append(options, overrides...)...)
252247
}
253248
}
254249
}
255250

256-
// pollEnvoyReady polls Envoy's readiness until it is ready or the context is done.
257-
func pollEnvoyReady(ctx context.Context, l *slog.Logger, envoyAdmin aigw.EnvoyAdminClient, interval time.Duration) error {
258-
t := time.NewTicker(interval)
259-
defer t.Stop()
260-
261-
for {
262-
select {
263-
case <-ctx.Done():
264-
return ctx.Err()
265-
case <-t.C:
266-
if err := envoyAdmin.IsReady(ctx); err == nil {
267-
l.Info("Envoy is ready!")
268-
return nil
269-
} else {
270-
l.Info("Waiting for Envoy to be ready...", "err", err)
271-
}
272-
}
273-
}
274-
}
275-
276251
// recreateDir removes the directory at the given path and creates a new one.
277252
func recreateDir(path string) error {
278253
err := os.RemoveAll(path)
@@ -288,33 +263,33 @@ func recreateDir(path string) error {
288263

289264
// writeEnvoyResourcesAndRunExtProc reads all resources from the given string, writes them to the output file, and runs
290265
// external processes for EnvoyExtensionPolicy resources.
291-
func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Context, original string) (client.Client, <-chan error, int, int, error) {
292-
aigwRoutes, mcpRoutes, aigwBackends, backendSecurityPolicies, backendTLSPolicies, gateways, secrets, envoyProxies, err := collectObjects(original, runCtx.envoyGatewayResourcesOut, runCtx.stderrLogger)
266+
func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Context, original string) (client.Client, <-chan error, int, error) {
267+
aigwRoutes, mcpRoutes, aigwBackends, backendSecurityPolicies, backendTLSPolicies, gateways, secrets, _, err := collectObjects(original, runCtx.envoyGatewayResourcesOut, runCtx.stderrLogger)
293268
if err != nil {
294-
return nil, nil, 0, 0, fmt.Errorf("error collecting: %w", err)
269+
return nil, nil, 0, fmt.Errorf("error collecting: %w", err)
295270
}
296271
if len(gateways) > 1 {
297-
return nil, nil, 0, 0, fmt.Errorf("multiple gateways are not supported: %s", gateways[0].Name)
272+
return nil, nil, 0, fmt.Errorf("multiple gateways are not supported: %s", gateways[0].Name)
298273
}
299274
for _, bsp := range backendSecurityPolicies {
300275
spec := bsp.Spec
301276
if spec.AWSCredentials != nil && spec.AWSCredentials.OIDCExchangeToken != nil {
302277
// TODO: We can make it work by generalizing the rotation logic.
303-
return nil, nil, 0, 0, fmt.Errorf("OIDC exchange token is not supported: %s", bsp.Name)
278+
return nil, nil, 0, fmt.Errorf("OIDC exchange token is not supported: %s", bsp.Name)
304279
}
305280
}
306281

307282
// Do the substitution for the secrets.
308283
for _, s := range secrets {
309284
if err = runCtx.rewriteSecretWithAnnotatedLocation(s); err != nil {
310-
return nil, nil, 0, 0, fmt.Errorf("failed to rewrite secret %s: %w", s.Name, err)
285+
return nil, nil, 0, fmt.Errorf("failed to rewrite secret %s: %w", s.Name, err)
311286
}
312287
}
313288

314289
var secretList *corev1.SecretList
315290
fakeClient, _fakeClientSet, httpRoutes, eps, httpRouteFilters, backends, secretList, backendTrafficPolicies, securityPolicies, err := translateCustomResourceObjects(ctx, aigwRoutes, mcpRoutes, aigwBackends, backendSecurityPolicies, backendTLSPolicies, gateways, secrets, runCtx.stderrLogger)
316291
if err != nil {
317-
return nil, nil, 0, 0, fmt.Errorf("error translating: %w", err)
292+
return nil, nil, 0, fmt.Errorf("error translating: %w", err)
318293
}
319294
runCtx.fakeClientSet = _fakeClientSet
320295

@@ -338,7 +313,7 @@ func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Contex
338313
}
339314
gw := gateways[0]
340315
if len(gw.Spec.Listeners) == 0 {
341-
return nil, nil, 0, 0, fmt.Errorf("gateway %s has no listeners configured", gw.Name)
316+
return nil, nil, 0, fmt.Errorf("gateway %s has no listeners configured", gw.Name)
342317
}
343318
runCtx.mustClearSetOwnerReferencesAndStatusAndWriteObj(&gw.TypeMeta, gw)
344319
for _, ep := range eps.Items {
@@ -349,20 +324,20 @@ func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Contex
349324
Secrets("").Get(ctx,
350325
controller.FilterConfigSecretPerGatewayName(gw.Name, gw.Namespace), metav1.GetOptions{})
351326
if err != nil {
352-
return nil, nil, 0, 0, fmt.Errorf("failed to get filter config secret: %w", err)
327+
return nil, nil, 0, fmt.Errorf("failed to get filter config secret: %w", err)
353328
}
354329

355330
rawConfig, ok := filterConfigSecret.StringData[controller.FilterConfigKeyInSecret]
356331
if !ok {
357-
return nil, nil, 0, 0, fmt.Errorf("failed to get filter config from secret: %w", err)
332+
return nil, nil, 0, fmt.Errorf("failed to get filter config from secret: %w", err)
358333
}
359334
var fc filterapi.Config
360335
if err = yaml.Unmarshal([]byte(rawConfig), &fc); err != nil {
361-
return nil, nil, 0, 0, fmt.Errorf("failed to unmarshal filter config: %w", err)
336+
return nil, nil, 0, fmt.Errorf("failed to unmarshal filter config: %w", err)
362337
}
363338
runCtx.stderrLogger.Info("Running external process", "config", fc)
364339
done := runCtx.mustStartExtProc(ctx, &fc)
365-
return fakeClient, done, runCtx.tryFindEnvoyAdminPort(gw, envoyProxies), runCtx.tryFindEnvoyListenerPort(gw), nil
340+
return fakeClient, done, runCtx.tryFindEnvoyListenerPort(gw), nil
366341
}
367342

368343
// mustStartExtProc starts the external process with the given working directory, port, and filter configuration.
@@ -486,54 +461,6 @@ func (runCtx *runCmdContext) tryFindEnvoyListenerPort(gw *gwapiv1.Gateway) int {
486461
return int(gw.Spec.Listeners[0].Port)
487462
}
488463

489-
// tryFindEnvoyAdminPort tries to find the port where the Envoy Admin interface is listening to.
490-
// By default, Envoy Gateway assigns a random port to the Envoy Admin interface, and we may not be able to find it. This method
491-
// attempts to find an EnvoyProxy instance attached to the standalone Gateway, and reads the bootstrap config to check if there
492-
// is a custom port configured for the admin interface.
493-
// If there is no EnvoyProxy or the admin port is not configured, this returns 0.
494-
func (runCtx *runCmdContext) tryFindEnvoyAdminPort(gw *gwapiv1.Gateway, proxies []*egv1a1.EnvoyProxy) int {
495-
if gw.Spec.Infrastructure == nil ||
496-
gw.Spec.Infrastructure.ParametersRef == nil ||
497-
gw.Spec.Infrastructure.ParametersRef.Kind != "EnvoyProxy" ||
498-
gw.Spec.Infrastructure.ParametersRef.Name == "" {
499-
return 0
500-
}
501-
502-
var bootstrap *egv1a1.ProxyBootstrap
503-
for _, p := range proxies {
504-
if p.Name == gw.Spec.Infrastructure.ParametersRef.Name {
505-
bootstrap = p.Spec.Bootstrap
506-
break
507-
}
508-
}
509-
510-
if bootstrap == nil || bootstrap.Value == nil {
511-
return 0
512-
}
513-
514-
type adminSettings struct {
515-
Admin struct {
516-
Address struct {
517-
SocketAddress struct {
518-
Address string `json:"address"`
519-
PortValue int `json:"port_value"`
520-
} `json:"socket_address,omitempty"`
521-
} `json:"address"`
522-
} `json:"admin"`
523-
}
524-
525-
var admin adminSettings
526-
if err := yaml.Unmarshal([]byte(*bootstrap.Value), &admin); err != nil {
527-
runCtx.stderrLogger.Error("Failed to read EnvoyProxy bootstrap settings", "error", err)
528-
return 0
529-
}
530-
531-
if admin.Admin.Address.SocketAddress.Address == "" {
532-
return 0
533-
}
534-
return admin.Admin.Address.SocketAddress.PortValue
535-
}
536-
537464
func maybeResolveHome(p string) string {
538465
if strings.HasPrefix(p, "~/") {
539466
home, err := os.UserHomeDir()

0 commit comments

Comments
 (0)