diff --git a/src/semantic-router/go.mod b/src/semantic-router/go.mod index 2c0726a4..0e6aaf98 100644 --- a/src/semantic-router/go.mod +++ b/src/semantic-router/go.mod @@ -24,6 +24,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20241223141626-cff3c89139a3 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/google/go-cmp v0.7.0 // indirect diff --git a/src/semantic-router/go.sum b/src/semantic-router/go.sum index 03ac457d..77570f5b 100644 --- a/src/semantic-router/go.sum +++ b/src/semantic-router/go.sum @@ -10,6 +10,8 @@ github.com/envoyproxy/go-control-plane/envoy v1.32.4 h1:jb83lalDRZSpPWW2Z7Mck/8k github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw= github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8= github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= diff --git a/src/semantic-router/pkg/config/config.go b/src/semantic-router/pkg/config/config.go index 85ee09f1..b3bc4101 100644 --- a/src/semantic-router/pkg/config/config.go +++ b/src/semantic-router/pkg/config/config.go @@ -3,6 +3,7 @@ package config import ( "fmt" "os" + "path/filepath" "sync" "gopkg.in/yaml.v3" @@ -291,30 +292,56 @@ var ( config *RouterConfig configOnce sync.Once configErr error + configMu sync.RWMutex ) -// 
LoadConfig loads the configuration from the specified YAML file +// LoadConfig loads the configuration from the specified YAML file once and caches it globally. func LoadConfig(configPath string) (*RouterConfig, error) { configOnce.Do(func() { - data, err := os.ReadFile(configPath) + cfg, err := ParseConfigFile(configPath) if err != nil { - configErr = fmt.Errorf("failed to read config file: %w", err) - return - } - - config = &RouterConfig{} - if err := yaml.Unmarshal(data, config); err != nil { - configErr = fmt.Errorf("failed to parse config file: %w", err) + configErr = err return } + configMu.Lock() + config = cfg + configMu.Unlock() }) - if configErr != nil { return nil, configErr } + configMu.RLock() + defer configMu.RUnlock() return config, nil } +// ParseConfigFile parses the YAML config file without touching the global cache. +func ParseConfigFile(configPath string) (*RouterConfig, error) { + // Resolve symlinks to handle Kubernetes ConfigMap mounts + resolved, _ := filepath.EvalSymlinks(configPath) + if resolved == "" { + resolved = configPath + } + data, err := os.ReadFile(resolved) + if err != nil { + return nil, fmt.Errorf("failed to read config file: %w", err) + } + cfg := &RouterConfig{} + if err := yaml.Unmarshal(data, cfg); err != nil { + return nil, fmt.Errorf("failed to parse config file: %w", err) + } + return cfg, nil +} + +// ReplaceGlobalConfig replaces the globally cached config. It is safe for concurrent readers. +func ReplaceGlobalConfig(newCfg *RouterConfig) { + configMu.Lock() + defer configMu.Unlock() + config = newCfg + // Do not reset configOnce to avoid racing re-parses via LoadConfig; callers should use ParseConfigFile for fresher reads. 
+ configErr = nil +} + // GetConfig returns the current configuration func GetConfig() *RouterConfig { return config diff --git a/src/semantic-router/pkg/config/parse_configfile_test.go b/src/semantic-router/pkg/config/parse_configfile_test.go new file mode 100644 index 00000000..d0fe30bd --- /dev/null +++ b/src/semantic-router/pkg/config/parse_configfile_test.go @@ -0,0 +1,62 @@ +package config_test + +import ( + "os" + "path/filepath" + "runtime" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/vllm-project/semantic-router/semantic-router/pkg/config" +) + +var _ = Describe("ParseConfigFile and ReplaceGlobalConfig", func() { + var tempDir string + + BeforeEach(func() { + var err error + tempDir, err = os.MkdirTemp("", "config_parse_test") + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + os.RemoveAll(tempDir) + config.ResetConfig() + }) + + It("should parse configuration via symlink path", func() { + if runtime.GOOS == "windows" { + Skip("symlink test is skipped on Windows") + } + + // Create real config target + target := filepath.Join(tempDir, "real-config.yaml") + content := []byte("default_model: test-model\n") + Expect(os.WriteFile(target, content, 0o644)).To(Succeed()) + + // Create symlink pointing to target + link := filepath.Join(tempDir, "link-config.yaml") + Expect(os.Symlink(target, link)).To(Succeed()) + + cfg, err := config.ParseConfigFile(link) + Expect(err).NotTo(HaveOccurred()) + Expect(cfg).NotTo(BeNil()) + Expect(cfg.DefaultModel).To(Equal("test-model")) + }) + + It("should return error when file does not exist", func() { + _, err := config.ParseConfigFile(filepath.Join(tempDir, "no-such.yaml")) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to read config file")) + }) + + It("should replace global config and reflect via GetConfig", func() { + // new config instance + newCfg := &config.RouterConfig{DefaultModel: "new-default"} + config.ReplaceGlobalConfig(newCfg) + 
got := config.GetConfig() + Expect(got).To(Equal(newCfg)) + Expect(got.DefaultModel).To(Equal("new-default")) + }) +}) diff --git a/src/semantic-router/pkg/extproc/router.go b/src/semantic-router/pkg/extproc/router.go index 773fe1c8..a417351e 100644 --- a/src/semantic-router/pkg/extproc/router.go +++ b/src/semantic-router/pkg/extproc/router.go @@ -41,10 +41,13 @@ var _ ext_proc.ExternalProcessorServer = (*OpenAIRouter)(nil) // NewOpenAIRouter creates a new OpenAI API router instance func NewOpenAIRouter(configPath string) (*OpenAIRouter, error) { - cfg, err := config.LoadConfig(configPath) + // Always parse fresh config for router construction (supports live reload) + cfg, err := config.ParseConfigFile(configPath) if err != nil { return nil, fmt.Errorf("failed to load config: %w", err) } + // Update global config reference for packages that rely on config.GetConfig() + config.ReplaceGlobalConfig(cfg) initMutex.Lock() defer initMutex.Unlock() diff --git a/src/semantic-router/pkg/extproc/server.go b/src/semantic-router/pkg/extproc/server.go index 4502cc0c..1c999976 100644 --- a/src/semantic-router/pkg/extproc/server.go +++ b/src/semantic-router/pkg/extproc/server.go @@ -1,22 +1,28 @@ package extproc import ( + "context" "fmt" - "log" "net" "os" "os/signal" + "path/filepath" + "sync/atomic" "syscall" + "time" ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/fsnotify/fsnotify" + "github.com/vllm-project/semantic-router/semantic-router/pkg/observability" "google.golang.org/grpc" ) // Server represents a gRPC server for the Envoy ExtProc type Server struct { - router *OpenAIRouter - server *grpc.Server - port int + configPath string + service *RouterService + server *grpc.Server + port int } // NewServer creates a new ExtProc gRPC server @@ -26,9 +32,11 @@ func NewServer(configPath string, port int) (*Server, error) { return nil, err } + service := NewRouterService(router) return &Server{ - router: router, - port: port, + 
configPath: configPath, + service: service, + port: port, }, nil } @@ -40,21 +48,26 @@ func (s *Server) Start() error { } s.server = grpc.NewServer() - ext_proc.RegisterExternalProcessorServer(s.server, s.router) + ext_proc.RegisterExternalProcessorServer(s.server, s.service) - log.Printf("Starting LLM Router ExtProc server on port %d...", s.port) + observability.Infof("Starting LLM Router ExtProc server on port %d...", s.port) // Run the server in a separate goroutine serverErrCh := make(chan error, 1) go func() { if err := s.server.Serve(lis); err != nil && err != grpc.ErrServerStopped { - log.Printf("Server error: %v", err) + observability.Errorf("Server error: %v", err) serverErrCh <- err } else { serverErrCh <- nil } }() + // Start config file watcher in background + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go s.watchConfigAndReload(ctx) + // Wait for interrupt signal to gracefully shut down the server signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) @@ -63,11 +76,11 @@ func (s *Server) Start() error { select { case err := <-serverErrCh: if err != nil { - log.Printf("Server exited with error: %v", err) + observability.Errorf("Server exited with error: %v", err) return err } case <-signalChan: - log.Println("Received shutdown signal, gracefully stopping server...") + observability.Infof("Received shutdown signal, gracefully stopping server...") } s.Stop() @@ -78,6 +91,105 @@ func (s *Server) Start() error { func (s *Server) Stop() { if s.server != nil { s.server.GracefulStop() - log.Println("Server stopped") + observability.Infof("Server stopped") + } +} + +// RouterService is a delegating gRPC service that forwards to the current router implementation. 
+type RouterService struct {
+	// current is the router that new streams are delegated to. It is stored
+	// and loaded atomically so Swap can replace it while Process is running.
+	current atomic.Pointer[OpenAIRouter]
+}
+
+// NewRouterService returns a RouterService that initially delegates to r.
+func NewRouterService(r *OpenAIRouter) *RouterService {
+	rs := &RouterService{}
+	rs.current.Store(r)
+	return rs
+}
+
+// Swap replaces the current router implementation. A Process call that has
+// already loaded the previous router keeps using it; later calls see r.
+func (rs *RouterService) Swap(r *OpenAIRouter) { rs.current.Store(r) }
+
+// Process delegates to the current router.
+func (rs *RouterService) Process(stream ext_proc.ExternalProcessor_ProcessServer) error {
+	r := rs.current.Load()
+	return r.Process(stream)
+}
+
+// watchConfigAndReload watches the config file and reloads router on changes.
+//
+// It returns when ctx is cancelled, when the watcher cannot be set up, or when
+// one of the watcher's channels is closed. Bursts of file events are debounced:
+// every relevant event restarts a quiet-period timer, and a single reload runs
+// once no further event has arrived for that period. Reloads execute on this
+// goroutine, so they never overlap and the most recent file contents always
+// end up in the active router.
+func (s *Server) watchConfigAndReload(ctx context.Context) {
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		observability.LogEvent("config_watcher_error", map[string]interface{}{
+			"stage": "create_watcher",
+			"error": err.Error(),
+		})
+		return
+	}
+	defer watcher.Close()
+
+	cfgFile := s.configPath
+	cfgDir := filepath.Dir(cfgFile)
+
+	// Watch both the file and its directory to handle symlink swaps (Kubernetes ConfigMap)
+	if err := watcher.Add(cfgDir); err != nil {
+		observability.LogEvent("config_watcher_error", map[string]interface{}{
+			"stage": "watch_dir",
+			"dir":   cfgDir,
+			"error": err.Error(),
+		})
+		return
+	}
+	_ = watcher.Add(cfgFile) // best-effort; may fail if file replaced by symlink later
+
+	// settleDelay is how long the file must stay quiet before it is re-read.
+	const settleDelay = 300 * time.Millisecond
+	// reloadOps is the set of operations that may indicate new config contents.
+	const reloadOps = fsnotify.Write | fsnotify.Create | fsnotify.Rename | fsnotify.Remove | fsnotify.Chmod
+
+	// fire is nil while no reload is scheduled; receiving from a nil channel
+	// blocks forever, which keeps that select case dormant.
+	var (
+		timer *time.Timer
+		fire  <-chan time.Time
+	)
+	defer func() {
+		if timer != nil {
+			timer.Stop()
+		}
+	}()
+
+	reload := func() {
+		// Parse and build a new router
+		newRouter, err := NewOpenAIRouter(cfgFile)
+		if err != nil {
+			// Keep serving with the previous router when the new config is unusable.
+			observability.LogEvent("config_reload_failed", map[string]interface{}{
+				"file":  cfgFile,
+				"error": err.Error(),
+			})
+			return
+		}
+		s.service.Swap(newRouter)
+		observability.LogEvent("config_reloaded", map[string]interface{}{
+			"file": cfgFile,
+		})
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case ev, ok := <-watcher.Events:
+			if !ok {
+				return
+			}
+			if ev.Op&reloadOps == 0 {
+				continue
+			}
+			// Only react to the config file itself or to entries of its directory.
+			if filepath.Base(ev.Name) != filepath.Base(cfgFile) && filepath.Dir(ev.Name) != cfgDir {
+				continue
+			}
+			// Restart the quiet period. A fresh timer is used so the channel of
+			// the previous one can simply be abandoned without draining it.
+			if timer != nil {
+				timer.Stop()
+			}
+			timer = time.NewTimer(settleDelay)
+			fire = timer.C
+		case <-fire:
+			fire = nil
+			reload()
+		case err, ok := <-watcher.Errors:
+			if !ok {
+				return
+			}
+			observability.LogEvent("config_watcher_error", map[string]interface{}{
+				"stage": "watch_loop",
+				"error": err.Error(),
+			})
+		}
 	}
 }