11package extproc
22
33import (
4+ "context"
45 "fmt"
5- "log"
66 "net"
77 "os"
88 "os/signal"
9+ "path/filepath"
10+ "sync/atomic"
911 "syscall"
12+ "time"
1013
1114 ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
15+ "github.com/fsnotify/fsnotify"
16+ "github.com/vllm-project/semantic-router/semantic-router/pkg/observability"
1217 "google.golang.org/grpc"
1318)
1419
1520// Server represents a gRPC server for the Envoy ExtProc
1621type Server struct {
17- router * OpenAIRouter
18- server * grpc.Server
19- port int
22+ configPath string
23+ service * RouterService
24+ server * grpc.Server
25+ port int
2026}
2127
2228// NewServer creates a new ExtProc gRPC server
@@ -26,9 +32,11 @@ func NewServer(configPath string, port int) (*Server, error) {
2632 return nil , err
2733 }
2834
35+ service := NewRouterService (router )
2936 return & Server {
30- router : router ,
31- port : port ,
37+ configPath : configPath ,
38+ service : service ,
39+ port : port ,
3240 }, nil
3341}
3442
@@ -40,21 +48,26 @@ func (s *Server) Start() error {
4048 }
4149
4250 s .server = grpc .NewServer ()
43- ext_proc .RegisterExternalProcessorServer (s .server , s .router )
51+ ext_proc .RegisterExternalProcessorServer (s .server , s .service )
4452
45- log . Printf ("Starting LLM Router ExtProc server on port %d..." , s .port )
53+ observability . Infof ("Starting LLM Router ExtProc server on port %d..." , s .port )
4654
4755 // Run the server in a separate goroutine
4856 serverErrCh := make (chan error , 1 )
4957 go func () {
5058 if err := s .server .Serve (lis ); err != nil && err != grpc .ErrServerStopped {
51- log . Printf ("Server error: %v" , err )
59+ observability . Errorf ("Server error: %v" , err )
5260 serverErrCh <- err
5361 } else {
5462 serverErrCh <- nil
5563 }
5664 }()
5765
66+ // Start config file watcher in background
67+ ctx , cancel := context .WithCancel (context .Background ())
68+ defer cancel ()
69+ go s .watchConfigAndReload (ctx )
70+
5871 // Wait for interrupt signal to gracefully shut down the server
5972 signalChan := make (chan os.Signal , 1 )
6073 signal .Notify (signalChan , syscall .SIGINT , syscall .SIGTERM )
@@ -63,11 +76,11 @@ func (s *Server) Start() error {
6376 select {
6477 case err := <- serverErrCh :
6578 if err != nil {
66- log . Printf ("Server exited with error: %v" , err )
79+ observability . Errorf ("Server exited with error: %v" , err )
6780 return err
6881 }
6982 case <- signalChan :
70- log . Println ("Received shutdown signal, gracefully stopping server..." )
83+ observability . Infof ("Received shutdown signal, gracefully stopping server..." )
7184 }
7285
7386 s .Stop ()
@@ -78,6 +91,105 @@ func (s *Server) Start() error {
7891func (s * Server ) Stop () {
7992 if s .server != nil {
8093 s .server .GracefulStop ()
81- log .Println ("Server stopped" )
94+ observability .Infof ("Server stopped" )
95+ }
96+ }
97+
98+ // RouterService is a delegating gRPC service that forwards to the current router implementation.
99+ type RouterService struct {
100+ current atomic.Pointer [OpenAIRouter ]
101+ }
102+
103+ func NewRouterService (r * OpenAIRouter ) * RouterService {
104+ rs := & RouterService {}
105+ rs .current .Store (r )
106+ return rs
107+ }
108+
109+ // Swap replaces the current router implementation.
110+ func (rs * RouterService ) Swap (r * OpenAIRouter ) { rs .current .Store (r ) }
111+
112+ // Process delegates to the current router.
113+ func (rs * RouterService ) Process (stream ext_proc.ExternalProcessor_ProcessServer ) error {
114+ r := rs .current .Load ()
115+ return r .Process (stream )
116+ }
117+
118+ // watchConfigAndReload watches the config file and reloads router on changes.
119+ func (s * Server ) watchConfigAndReload (ctx context.Context ) {
120+ watcher , err := fsnotify .NewWatcher ()
121+ if err != nil {
122+ observability .LogEvent ("config_watcher_error" , map [string ]interface {}{
123+ "stage" : "create_watcher" ,
124+ "error" : err .Error (),
125+ })
126+ return
127+ }
128+ defer watcher .Close ()
129+
130+ cfgFile := s .configPath
131+ cfgDir := filepath .Dir (cfgFile )
132+
133+ // Watch both the file and its directory to handle symlink swaps (Kubernetes ConfigMap)
134+ if err := watcher .Add (cfgDir ); err != nil {
135+ observability .LogEvent ("config_watcher_error" , map [string ]interface {}{
136+ "stage" : "watch_dir" ,
137+ "dir" : cfgDir ,
138+ "error" : err .Error (),
139+ })
140+ return
141+ }
142+ _ = watcher .Add (cfgFile ) // best-effort; may fail if file replaced by symlink later
143+
144+ // Debounce events
145+ var (
146+ pending bool
147+ last time.Time
148+ )
149+
150+ reload := func () {
151+ // Parse and build a new router
152+ newRouter , err := NewOpenAIRouter (cfgFile )
153+ if err != nil {
154+ observability .LogEvent ("config_reload_failed" , map [string ]interface {}{
155+ "file" : cfgFile ,
156+ "error" : err .Error (),
157+ })
158+ return
159+ }
160+ s .service .Swap (newRouter )
161+ observability .LogEvent ("config_reloaded" , map [string ]interface {}{
162+ "file" : cfgFile ,
163+ })
164+ }
165+
166+ for {
167+ select {
168+ case <- ctx .Done ():
169+ return
170+ case ev , ok := <- watcher .Events :
171+ if ! ok {
172+ return
173+ }
174+ if ev .Op & (fsnotify .Write | fsnotify .Create | fsnotify .Rename | fsnotify .Remove | fsnotify .Chmod ) != 0 {
175+ // If the event pertains to the config file or directory, trigger debounce
176+ if filepath .Base (ev .Name ) == filepath .Base (cfgFile ) || filepath .Dir (ev .Name ) == cfgDir {
177+ if ! pending || time .Since (last ) > 250 * time .Millisecond {
178+ pending = true
179+ last = time .Now ()
180+ // Slight delay to let file settle
181+ go func () { time .Sleep (300 * time .Millisecond ); reload () }()
182+ }
183+ }
184+ }
185+ case err , ok := <- watcher .Errors :
186+ if ! ok {
187+ return
188+ }
189+ observability .LogEvent ("config_watcher_error" , map [string ]interface {}{
190+ "stage" : "watch_loop" ,
191+ "error" : err .Error (),
192+ })
193+ }
82194 }
83195}
0 commit comments