@@ -35,22 +35,27 @@ import (
3535 "github.com/envoyproxy/ai-gateway/internal/extensionserver"
3636)
3737
38- // This is the default configuration for the AI Gateway when <path> parameter is not given.
39- //
40- //go:embed ai-gateway-default-resources.yaml
41- var aiGatewayDefaultResources string
38+ var (
39+ // This is the default configuration for the AI Gateway when <path> parameter is not given.
40+ //
41+ //go:embed ai-gateway-default-resources.yaml
42+ aiGatewayDefaultResources string
4243
43- // This is the template for the Envoy Gateway configuration where PLACEHOLDER_TMPDIR will be replaced with the temporary
44- // directory where the resources are written to.
45- //
46- //go:embed envoy-gateway-config.yaml
47- var envoyGatewayConfigTemplate string
44+ // This is the template for the Envoy Gateway configuration where PLACEHOLDER_TMPDIR will be replaced with the temporary
45+ // directory where the resources are written to.
46+ //
47+ //go:embed envoy-gateway-config.yaml
48+ envoyGatewayConfigTemplate string
49+ )
4850
4951const (
5052 substitutionEnvAnnotationPrefix = "substitution.aigw.run/env/"
5153 substitutionFileAnnotationPrefix = "substitution.aigw.run/file/"
5254)
5355
56+ // errExtProcRun is returned when the external processor fails to run.
57+ var errExtProcRun = fmt .Errorf ("external processor run error" )
58+
5459type runCmdContext struct {
5560 // isDebug true if the original `agw run` command is run with debug mode. Using this to
5661 // set the log level of the external process currently. TODO: maybe simply expose the external process log level
@@ -69,12 +74,18 @@ type runCmdContext struct {
6974 fakeClientSet * fake.Clientset
7075}
7176
77+ // runOpts are the options for the run command.
78+ type runOpts struct {
79+ // udsPath is the path to the UDS socket used by the AI Gateway extproc.
80+ udsPath string
81+ }
82+
7283// run starts the AI Gateway locally for a given configuration.
7384//
7485// This will create a temporary directory and a file:
7586// 1. ${os.TempDir}/envoy-gateway-config.yaml: This contains the configuration for the Envoy Gateway agent to run, derived from envoyGatewayConfig.
7687// 2. ${os.TempDir}/envoy-ai-gateway-resources: This will contain the EG resource generated by the translation and deployed by EG.
77- func run (ctx context.Context , c cmdRun , stdout , stderr io.Writer ) error {
88+ func run (ctx context.Context , c cmdRun , o runOpts , stdout , stderr io.Writer ) error {
7889 if ! c .Debug {
7990 stderr = io .Discard
8091 }
@@ -123,8 +134,11 @@ func run(ctx context.Context, c cmdRun, stdout, stderr io.Writer) error {
123134 // Write the Envoy Gateway resources into a file under resourcesTmpdir.
124135 resourceYamlPath := filepath .Join (resourcesTmpdir , "config.yaml" )
125136 stderrLogger .Info ("Creating Envoy Gateway resource file" , "path" , resourceYamlPath )
126- udsPath := filepath .Join (tmpdir , "uds.sock" )
127- _ = os .Remove (udsPath )
137+ udsPath := o .udsPath
138+ if udsPath == "" {
139+ udsPath = filepath .Join (tmpdir , "uds.sock" )
140+ _ = os .Remove (udsPath )
141+ }
128142
129143 // Do the translation of the given AI Gateway resources Yaml into Envoy Gateway resources and write them to the file.
130144 resourcesBuf := & bytes.Buffer {}
@@ -133,7 +147,7 @@ func run(ctx context.Context, c cmdRun, stdout, stderr io.Writer) error {
133147 if err != nil {
134148 return err
135149 }
136- fakeClient , err := runCtx .writeEnvoyResourcesAndRunExtProc (ctx , aiGatewayResourcesYaml )
150+ fakeClient , extProxDone , err := runCtx .writeEnvoyResourcesAndRunExtProc (ctx , aiGatewayResourcesYaml )
137151 if err != nil {
138152 return fmt .Errorf ("failed to write envoy resources and run extproc: %w" , err )
139153 }
@@ -150,9 +164,17 @@ func run(ctx context.Context, c cmdRun, stdout, stderr io.Writer) error {
150164 extSrv := extensionserver .New (fakeClient , ctrl .Log , udsPath , true )
151165 egextension .RegisterEnvoyGatewayExtensionServer (s , extSrv )
152166 grpc_health_v1 .RegisterHealthServer (s , extSrv )
167+
168+ serverCtx , serverCancel := context .WithCancel (ctx )
169+
170+ var extProcErr error
153171 go func () {
154- <- ctx .Done ()
172+ select {
173+ case <- ctx .Done ():
174+ case extProcErr = <- extProxDone :
175+ }
155176 s .GracefulStop ()
177+ serverCancel ()
156178 }()
157179 go func () {
158180 if err := s .Serve (lis ); err != nil {
@@ -177,10 +199,11 @@ func run(ctx context.Context, c cmdRun, stdout, stderr io.Writer) error {
177199 server .SetErr (io .Discard )
178200 }
179201 server .SetArgs ([]string {"server" , "--config-path" , egConfigPath })
180- if err := server .ExecuteContext (ctx ); err != nil {
202+ if err := server .ExecuteContext (serverCtx ); err != nil {
181203 return fmt .Errorf ("failed to execute server: %w" , err )
182204 }
183- return nil
205+
206+ return extProcErr
184207}
185208
186209// readConfig returns config from the given path, substituting ENV variables
@@ -212,32 +235,32 @@ func recreateDir(path string) error {
212235
213236// writeEnvoyResourcesAndRunExtProc reads all resources from the given string, writes them to the output file, and runs
214237// external processes for EnvoyExtensionPolicy resources.
215- func (runCtx * runCmdContext ) writeEnvoyResourcesAndRunExtProc (ctx context.Context , original string ) (client.Client , error ) {
238+ func (runCtx * runCmdContext ) writeEnvoyResourcesAndRunExtProc (ctx context.Context , original string ) (client.Client , <- chan error , error ) {
216239 aigwRoutes , aigwBackends , backendSecurityPolicies , gateways , secrets , err := collectObjects (original , runCtx .envoyGatewayResourcesOut , runCtx .stderrLogger )
217240 if err != nil {
218- return nil , fmt .Errorf ("error collecting: %w" , err )
241+ return nil , nil , fmt .Errorf ("error collecting: %w" , err )
219242 }
220243 if len (gateways ) > 1 {
221- return nil , fmt .Errorf ("multiple gateways are not supported: %s" , gateways [0 ].Name )
244+ return nil , nil , fmt .Errorf ("multiple gateways are not supported: %s" , gateways [0 ].Name )
222245 }
223246 for _ , bsp := range backendSecurityPolicies {
224247 spec := bsp .Spec
225248 if spec .AWSCredentials != nil && spec .AWSCredentials .OIDCExchangeToken != nil {
226249 // TODO: We can make it work by generalizing the rotation logic.
227- return nil , fmt .Errorf ("OIDC exchange token is not supported: %s" , bsp .Name )
250+ return nil , nil , fmt .Errorf ("OIDC exchange token is not supported: %s" , bsp .Name )
228251 }
229252 }
230253
231254 // Do the substitution for the secrets.
232255 for _ , s := range secrets {
233256 if err = runCtx .rewriteSecretWithAnnotatedLocation (s ); err != nil {
234- return nil , fmt .Errorf ("failed to rewrite secret %s: %w" , s .Name , err )
257+ return nil , nil , fmt .Errorf ("failed to rewrite secret %s: %w" , s .Name , err )
235258 }
236259 }
237260
238261 fakeClient , _fakeClientSet , httpRoutes , eps , httpRouteFilter , backends , _ , err := translateCustomResourceObjects (ctx , aigwRoutes , aigwBackends , backendSecurityPolicies , gateways , secrets , runCtx .stderrLogger )
239262 if err != nil {
240- return nil , fmt .Errorf ("error translating: %w" , err )
263+ return nil , nil , fmt .Errorf ("error translating: %w" , err )
241264 }
242265 runCtx .fakeClientSet = _fakeClientSet
243266
@@ -260,27 +283,27 @@ func (runCtx *runCmdContext) writeEnvoyResourcesAndRunExtProc(ctx context.Contex
260283 Secrets ("" ).Get (ctx ,
261284 controller .FilterConfigSecretPerGatewayName (gw .Name , gw .Namespace ), metav1.GetOptions {})
262285 if err != nil {
263- return nil , fmt .Errorf ("failed to get filter config secret: %w" , err )
286+ return nil , nil , fmt .Errorf ("failed to get filter config secret: %w" , err )
264287 }
265288
266289 rawConfig , ok := filterConfigSecret .StringData [controller .FilterConfigKeyInSecret ]
267290 if ! ok {
268- return nil , fmt .Errorf ("failed to get filter config from secret: %w" , err )
291+ return nil , nil , fmt .Errorf ("failed to get filter config from secret: %w" , err )
269292 }
270293 var fc filterapi.Config
271294 if err = yaml .Unmarshal ([]byte (rawConfig ), & fc ); err != nil {
272- return nil , fmt .Errorf ("failed to unmarshal filter config: %w" , err )
295+ return nil , nil , fmt .Errorf ("failed to unmarshal filter config: %w" , err )
273296 }
274297 runCtx .stderrLogger .Info ("Running external process" , "config" , fc )
275- runCtx .mustStartExtProc (ctx , & fc )
276- return fakeClient , nil
298+ done := runCtx .mustStartExtProc (ctx , & fc )
299+ return fakeClient , done , nil
277300}
278301
279302// mustStartExtProc starts the external process with the given working directory, port, and filter configuration.
280303func (runCtx * runCmdContext ) mustStartExtProc (
281304 ctx context.Context ,
282305 filterCfg * filterapi.Config ,
283- ) {
306+ ) <- chan error {
284307 marshaled , err := yaml .Marshal (filterCfg )
285308 if err != nil {
286309 panic (fmt .Sprintf ("BUG: failed to marshal filter config: %v" , err ))
@@ -300,11 +323,16 @@ func (runCtx *runCmdContext) mustStartExtProc(
300323 } else {
301324 args = append (args , "--logLevel" , "warn" )
302325 }
326+
327+ done := make (chan error )
303328 go func () {
304329 if err := mainlib .Main (ctx , args , os .Stderr ); err != nil {
305330 runCtx .stderrLogger .Error ("Failed to run external processor" , "error" , err )
331+ done <- fmt .Errorf ("%w: %w" , errExtProcRun , err )
306332 }
333+ close (done )
307334 }()
335+ return done
308336}
309337
310338// mustClearSetOwnerReferencesAndStatusAndWriteObj clears the owner references and status of the given object, marshals it
0 commit comments