Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/semantic-router/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cncf/xds/go v0.0.0-20241223141626-cff3c89139a3 // indirect
github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/google/go-cmp v0.7.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions src/semantic-router/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/envoyproxy/go-control-plane/envoy v1.32.4 h1:jb83lalDRZSpPWW2Z7Mck/8k
github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw=
github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
Expand Down
47 changes: 37 additions & 10 deletions src/semantic-router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"fmt"
"os"
"path/filepath"
"sync"

"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -291,30 +292,56 @@ var (
config *RouterConfig
configOnce sync.Once
configErr error
configMu sync.RWMutex
)

// LoadConfig loads the configuration from the specified YAML file
// LoadConfig loads the configuration from the specified YAML file once and caches it globally.
func LoadConfig(configPath string) (*RouterConfig, error) {
configOnce.Do(func() {
data, err := os.ReadFile(configPath)
cfg, err := ParseConfigFile(configPath)
if err != nil {
configErr = fmt.Errorf("failed to read config file: %w", err)
return
}

config = &RouterConfig{}
if err := yaml.Unmarshal(data, config); err != nil {
configErr = fmt.Errorf("failed to parse config file: %w", err)
configErr = err
return
}
configMu.Lock()
config = cfg
configMu.Unlock()
})

if configErr != nil {
return nil, configErr
}
configMu.RLock()
defer configMu.RUnlock()
return config, nil
}

// ParseConfigFile parses the YAML config file without touching the global cache.
func ParseConfigFile(configPath string) (*RouterConfig, error) {
// Resolve symlinks to handle Kubernetes ConfigMap mounts
resolved, _ := filepath.EvalSymlinks(configPath)
if resolved == "" {
resolved = configPath
}
data, err := os.ReadFile(resolved)
if err != nil {
return nil, fmt.Errorf("failed to read config file: %w", err)
}
cfg := &RouterConfig{}
if err := yaml.Unmarshal(data, cfg); err != nil {
return nil, fmt.Errorf("failed to parse config file: %w", err)
}
return cfg, nil
}

// ReplaceGlobalConfig replaces the globally cached config. It is safe for concurrent readers.
func ReplaceGlobalConfig(newCfg *RouterConfig) {
configMu.Lock()
defer configMu.Unlock()
config = newCfg
// Do not reset configOnce to avoid racing re-parses via LoadConfig; callers should use ParseConfigFile for fresher reads.
configErr = nil
}

// GetConfig returns the current configuration
func GetConfig() *RouterConfig {
return config
Expand Down
62 changes: 62 additions & 0 deletions src/semantic-router/pkg/config/parse_configfile_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package config_test

import (
"os"
"path/filepath"
"runtime"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/vllm-project/semantic-router/semantic-router/pkg/config"
)

var _ = Describe("ParseConfigFile and ReplaceGlobalConfig", func() {
var tempDir string

BeforeEach(func() {
var err error
tempDir, err = os.MkdirTemp("", "config_parse_test")
Expect(err).NotTo(HaveOccurred())
})

AfterEach(func() {
os.RemoveAll(tempDir)
config.ResetConfig()
})

It("should parse configuration via symlink path", func() {
if runtime.GOOS == "windows" {
Skip("symlink test is skipped on Windows")
}

// Create real config target
target := filepath.Join(tempDir, "real-config.yaml")
content := []byte("default_model: test-model\n")
Expect(os.WriteFile(target, content, 0o644)).To(Succeed())

// Create symlink pointing to target
link := filepath.Join(tempDir, "link-config.yaml")
Expect(os.Symlink(target, link)).To(Succeed())

cfg, err := config.ParseConfigFile(link)
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())
Expect(cfg.DefaultModel).To(Equal("test-model"))
})

It("should return error when file does not exist", func() {
_, err := config.ParseConfigFile(filepath.Join(tempDir, "no-such.yaml"))
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("failed to read config file"))
})

It("should replace global config and reflect via GetConfig", func() {
// new config instance
newCfg := &config.RouterConfig{DefaultModel: "new-default"}
config.ReplaceGlobalConfig(newCfg)
got := config.GetConfig()
Expect(got).To(Equal(newCfg))
Expect(got.DefaultModel).To(Equal("new-default"))
})
})
5 changes: 4 additions & 1 deletion src/semantic-router/pkg/extproc/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ var _ ext_proc.ExternalProcessorServer = (*OpenAIRouter)(nil)

// NewOpenAIRouter creates a new OpenAI API router instance
func NewOpenAIRouter(configPath string) (*OpenAIRouter, error) {
cfg, err := config.LoadConfig(configPath)
// Always parse fresh config for router construction (supports live reload)
cfg, err := config.ParseConfigFile(configPath)
if err != nil {
return nil, fmt.Errorf("failed to load config: %w", err)
}
// Update global config reference for packages that rely on config.GetConfig()
config.ReplaceGlobalConfig(cfg)

initMutex.Lock()
defer initMutex.Unlock()
Expand Down
136 changes: 124 additions & 12 deletions src/semantic-router/pkg/extproc/server.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
package extproc

import (
"context"
"fmt"
"log"
"net"
"os"
"os/signal"
"path/filepath"
"sync/atomic"
"syscall"
"time"

ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"github.com/fsnotify/fsnotify"
"github.com/vllm-project/semantic-router/semantic-router/pkg/observability"
"google.golang.org/grpc"
)

// Server represents a gRPC server for the Envoy ExtProc
type Server struct {
router *OpenAIRouter
server *grpc.Server
port int
configPath string
service *RouterService
server *grpc.Server
port int
}

// NewServer creates a new ExtProc gRPC server
Expand All @@ -26,9 +32,11 @@ func NewServer(configPath string, port int) (*Server, error) {
return nil, err
}

service := NewRouterService(router)
return &Server{
router: router,
port: port,
configPath: configPath,
service: service,
port: port,
}, nil
}

Expand All @@ -40,21 +48,26 @@ func (s *Server) Start() error {
}

s.server = grpc.NewServer()
ext_proc.RegisterExternalProcessorServer(s.server, s.router)
ext_proc.RegisterExternalProcessorServer(s.server, s.service)

log.Printf("Starting LLM Router ExtProc server on port %d...", s.port)
observability.Infof("Starting LLM Router ExtProc server on port %d...", s.port)

// Run the server in a separate goroutine
serverErrCh := make(chan error, 1)
go func() {
if err := s.server.Serve(lis); err != nil && err != grpc.ErrServerStopped {
log.Printf("Server error: %v", err)
observability.Errorf("Server error: %v", err)
serverErrCh <- err
} else {
serverErrCh <- nil
}
}()

// Start config file watcher in background
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go s.watchConfigAndReload(ctx)

// Wait for interrupt signal to gracefully shut down the server
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -63,11 +76,11 @@ func (s *Server) Start() error {
select {
case err := <-serverErrCh:
if err != nil {
log.Printf("Server exited with error: %v", err)
observability.Errorf("Server exited with error: %v", err)
return err
}
case <-signalChan:
log.Println("Received shutdown signal, gracefully stopping server...")
observability.Infof("Received shutdown signal, gracefully stopping server...")
}

s.Stop()
Expand All @@ -78,6 +91,105 @@ func (s *Server) Start() error {
func (s *Server) Stop() {
if s.server != nil {
s.server.GracefulStop()
log.Println("Server stopped")
observability.Infof("Server stopped")
}
}

// RouterService is a delegating gRPC service that forwards to the current router implementation.
type RouterService struct {
current atomic.Pointer[OpenAIRouter]
}

func NewRouterService(r *OpenAIRouter) *RouterService {
rs := &RouterService{}
rs.current.Store(r)
return rs
}

// Swap replaces the current router implementation.
func (rs *RouterService) Swap(r *OpenAIRouter) { rs.current.Store(r) }

// Process delegates to the current router.
func (rs *RouterService) Process(stream ext_proc.ExternalProcessor_ProcessServer) error {
r := rs.current.Load()
return r.Process(stream)
}

// watchConfigAndReload watches the config file and reloads router on changes.
func (s *Server) watchConfigAndReload(ctx context.Context) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
observability.LogEvent("config_watcher_error", map[string]interface{}{
"stage": "create_watcher",
"error": err.Error(),
})
return
}
defer watcher.Close()

cfgFile := s.configPath
cfgDir := filepath.Dir(cfgFile)

// Watch both the file and its directory to handle symlink swaps (Kubernetes ConfigMap)
if err := watcher.Add(cfgDir); err != nil {
observability.LogEvent("config_watcher_error", map[string]interface{}{
"stage": "watch_dir",
"dir": cfgDir,
"error": err.Error(),
})
return
}
_ = watcher.Add(cfgFile) // best-effort; may fail if file replaced by symlink later

// Debounce events
var (
pending bool
last time.Time
)

reload := func() {
// Parse and build a new router
newRouter, err := NewOpenAIRouter(cfgFile)
if err != nil {
observability.LogEvent("config_reload_failed", map[string]interface{}{
"file": cfgFile,
"error": err.Error(),
})
return
}
s.service.Swap(newRouter)
observability.LogEvent("config_reloaded", map[string]interface{}{
"file": cfgFile,
})
}

for {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a followup pr to test config file update under different scenarios?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#92 for tracking

select {
case <-ctx.Done():
return
case ev, ok := <-watcher.Events:
if !ok {
return
}
if ev.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Rename|fsnotify.Remove|fsnotify.Chmod) != 0 {
// If the event pertains to the config file or directory, trigger debounce
if filepath.Base(ev.Name) == filepath.Base(cfgFile) || filepath.Dir(ev.Name) == cfgDir {
if !pending || time.Since(last) > 250*time.Millisecond {
pending = true
last = time.Now()
// Slight delay to let file settle
go func() { time.Sleep(300 * time.Millisecond); reload() }()
}
}
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
observability.LogEvent("config_watcher_error", map[string]interface{}{
"stage": "watch_loop",
"error": err.Error(),
})
}
}
}
Loading