Skip to content

Commit 9bca691

Browse files
feat(config): watch config file and hot-reload router without restart (#84)
* feat(config): watch config file and hot-reload router without restart Signed-off-by: Jintao Zhang <[email protected]> * test(config): add unit tests for ParseConfigFile and ReplaceGlobalConfig Signed-off-by: Jintao Zhang <[email protected]> --------- Signed-off-by: Jintao Zhang <[email protected]>
1 parent 79ecf3e commit 9bca691

File tree

6 files changed

+230
-23
lines changed

6 files changed

+230
-23
lines changed

src/semantic-router/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ require (
2424
github.com/cespare/xxhash/v2 v2.3.0 // indirect
2525
github.com/cncf/xds/go v0.0.0-20241223141626-cff3c89139a3 // indirect
2626
github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
27+
github.com/fsnotify/fsnotify v1.7.0 // indirect
2728
github.com/go-logr/logr v1.4.2 // indirect
2829
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
2930
github.com/google/go-cmp v0.7.0 // indirect

src/semantic-router/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ github.com/envoyproxy/go-control-plane/envoy v1.32.4 h1:jb83lalDRZSpPWW2Z7Mck/8k
1010
github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw=
1111
github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
1212
github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
13+
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
14+
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
1315
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
1416
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
1517
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=

src/semantic-router/pkg/config/config.go

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package config
33
import (
44
"fmt"
55
"os"
6+
"path/filepath"
67
"sync"
78

89
"gopkg.in/yaml.v3"
@@ -291,30 +292,56 @@ var (
291292
config *RouterConfig
292293
configOnce sync.Once
293294
configErr error
295+
configMu sync.RWMutex
294296
)
295297

296-
// LoadConfig loads the configuration from the specified YAML file
298+
// LoadConfig loads the configuration from the specified YAML file once and caches it globally.
297299
func LoadConfig(configPath string) (*RouterConfig, error) {
298300
configOnce.Do(func() {
299-
data, err := os.ReadFile(configPath)
301+
cfg, err := ParseConfigFile(configPath)
300302
if err != nil {
301-
configErr = fmt.Errorf("failed to read config file: %w", err)
302-
return
303-
}
304-
305-
config = &RouterConfig{}
306-
if err := yaml.Unmarshal(data, config); err != nil {
307-
configErr = fmt.Errorf("failed to parse config file: %w", err)
303+
configErr = err
308304
return
309305
}
306+
configMu.Lock()
307+
config = cfg
308+
configMu.Unlock()
310309
})
311-
312310
if configErr != nil {
313311
return nil, configErr
314312
}
313+
configMu.RLock()
314+
defer configMu.RUnlock()
315315
return config, nil
316316
}
317317

318+
// ParseConfigFile parses the YAML config file without touching the global cache.
319+
func ParseConfigFile(configPath string) (*RouterConfig, error) {
320+
// Resolve symlinks to handle Kubernetes ConfigMap mounts
321+
resolved, _ := filepath.EvalSymlinks(configPath)
322+
if resolved == "" {
323+
resolved = configPath
324+
}
325+
data, err := os.ReadFile(resolved)
326+
if err != nil {
327+
return nil, fmt.Errorf("failed to read config file: %w", err)
328+
}
329+
cfg := &RouterConfig{}
330+
if err := yaml.Unmarshal(data, cfg); err != nil {
331+
return nil, fmt.Errorf("failed to parse config file: %w", err)
332+
}
333+
return cfg, nil
334+
}
335+
336+
// ReplaceGlobalConfig replaces the globally cached config. It is safe for concurrent readers.
337+
func ReplaceGlobalConfig(newCfg *RouterConfig) {
338+
configMu.Lock()
339+
defer configMu.Unlock()
340+
config = newCfg
341+
// Do not reset configOnce to avoid racing re-parses via LoadConfig; callers should use ParseConfigFile for fresher reads.
342+
configErr = nil
343+
}
344+
318345
// GetConfig returns the current configuration
319346
func GetConfig() *RouterConfig {
320347
return config
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package config_test
2+
3+
import (
4+
"os"
5+
"path/filepath"
6+
"runtime"
7+
8+
. "github.com/onsi/ginkgo/v2"
9+
. "github.com/onsi/gomega"
10+
11+
"github.com/vllm-project/semantic-router/semantic-router/pkg/config"
12+
)
13+
14+
var _ = Describe("ParseConfigFile and ReplaceGlobalConfig", func() {
15+
var tempDir string
16+
17+
BeforeEach(func() {
18+
var err error
19+
tempDir, err = os.MkdirTemp("", "config_parse_test")
20+
Expect(err).NotTo(HaveOccurred())
21+
})
22+
23+
AfterEach(func() {
24+
os.RemoveAll(tempDir)
25+
config.ResetConfig()
26+
})
27+
28+
It("should parse configuration via symlink path", func() {
29+
if runtime.GOOS == "windows" {
30+
Skip("symlink test is skipped on Windows")
31+
}
32+
33+
// Create real config target
34+
target := filepath.Join(tempDir, "real-config.yaml")
35+
content := []byte("default_model: test-model\n")
36+
Expect(os.WriteFile(target, content, 0o644)).To(Succeed())
37+
38+
// Create symlink pointing to target
39+
link := filepath.Join(tempDir, "link-config.yaml")
40+
Expect(os.Symlink(target, link)).To(Succeed())
41+
42+
cfg, err := config.ParseConfigFile(link)
43+
Expect(err).NotTo(HaveOccurred())
44+
Expect(cfg).NotTo(BeNil())
45+
Expect(cfg.DefaultModel).To(Equal("test-model"))
46+
})
47+
48+
It("should return error when file does not exist", func() {
49+
_, err := config.ParseConfigFile(filepath.Join(tempDir, "no-such.yaml"))
50+
Expect(err).To(HaveOccurred())
51+
Expect(err.Error()).To(ContainSubstring("failed to read config file"))
52+
})
53+
54+
It("should replace global config and reflect via GetConfig", func() {
55+
// new config instance
56+
newCfg := &config.RouterConfig{DefaultModel: "new-default"}
57+
config.ReplaceGlobalConfig(newCfg)
58+
got := config.GetConfig()
59+
Expect(got).To(Equal(newCfg))
60+
Expect(got.DefaultModel).To(Equal("new-default"))
61+
})
62+
})

src/semantic-router/pkg/extproc/router.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,13 @@ var _ ext_proc.ExternalProcessorServer = (*OpenAIRouter)(nil)
4141

4242
// NewOpenAIRouter creates a new OpenAI API router instance
4343
func NewOpenAIRouter(configPath string) (*OpenAIRouter, error) {
44-
cfg, err := config.LoadConfig(configPath)
44+
// Always parse fresh config for router construction (supports live reload)
45+
cfg, err := config.ParseConfigFile(configPath)
4546
if err != nil {
4647
return nil, fmt.Errorf("failed to load config: %w", err)
4748
}
49+
// Update global config reference for packages that rely on config.GetConfig()
50+
config.ReplaceGlobalConfig(cfg)
4851

4952
initMutex.Lock()
5053
defer initMutex.Unlock()
Lines changed: 124 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,28 @@
11
package extproc
22

33
import (
4+
"context"
45
"fmt"
5-
"log"
66
"net"
77
"os"
88
"os/signal"
9+
"path/filepath"
10+
"sync/atomic"
911
"syscall"
12+
"time"
1013

1114
ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
15+
"github.com/fsnotify/fsnotify"
16+
"github.com/vllm-project/semantic-router/semantic-router/pkg/observability"
1217
"google.golang.org/grpc"
1318
)
1419

1520
// Server represents a gRPC server for the Envoy ExtProc
1621
type Server struct {
17-
router *OpenAIRouter
18-
server *grpc.Server
19-
port int
22+
configPath string
23+
service *RouterService
24+
server *grpc.Server
25+
port int
2026
}
2127

2228
// NewServer creates a new ExtProc gRPC server
@@ -26,9 +32,11 @@ func NewServer(configPath string, port int) (*Server, error) {
2632
return nil, err
2733
}
2834

35+
service := NewRouterService(router)
2936
return &Server{
30-
router: router,
31-
port: port,
37+
configPath: configPath,
38+
service: service,
39+
port: port,
3240
}, nil
3341
}
3442

@@ -40,21 +48,26 @@ func (s *Server) Start() error {
4048
}
4149

4250
s.server = grpc.NewServer()
43-
ext_proc.RegisterExternalProcessorServer(s.server, s.router)
51+
ext_proc.RegisterExternalProcessorServer(s.server, s.service)
4452

45-
log.Printf("Starting LLM Router ExtProc server on port %d...", s.port)
53+
observability.Infof("Starting LLM Router ExtProc server on port %d...", s.port)
4654

4755
// Run the server in a separate goroutine
4856
serverErrCh := make(chan error, 1)
4957
go func() {
5058
if err := s.server.Serve(lis); err != nil && err != grpc.ErrServerStopped {
51-
log.Printf("Server error: %v", err)
59+
observability.Errorf("Server error: %v", err)
5260
serverErrCh <- err
5361
} else {
5462
serverErrCh <- nil
5563
}
5664
}()
5765

66+
// Start config file watcher in background
67+
ctx, cancel := context.WithCancel(context.Background())
68+
defer cancel()
69+
go s.watchConfigAndReload(ctx)
70+
5871
// Wait for interrupt signal to gracefully shut down the server
5972
signalChan := make(chan os.Signal, 1)
6073
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
@@ -63,11 +76,11 @@ func (s *Server) Start() error {
6376
select {
6477
case err := <-serverErrCh:
6578
if err != nil {
66-
log.Printf("Server exited with error: %v", err)
79+
observability.Errorf("Server exited with error: %v", err)
6780
return err
6881
}
6982
case <-signalChan:
70-
log.Println("Received shutdown signal, gracefully stopping server...")
83+
observability.Infof("Received shutdown signal, gracefully stopping server...")
7184
}
7285

7386
s.Stop()
@@ -78,6 +91,105 @@ func (s *Server) Start() error {
7891
func (s *Server) Stop() {
7992
if s.server != nil {
8093
s.server.GracefulStop()
81-
log.Println("Server stopped")
94+
observability.Infof("Server stopped")
95+
}
96+
}
97+
98+
// RouterService is a delegating gRPC service that forwards to the current router implementation.
99+
type RouterService struct {
100+
current atomic.Pointer[OpenAIRouter]
101+
}
102+
103+
func NewRouterService(r *OpenAIRouter) *RouterService {
104+
rs := &RouterService{}
105+
rs.current.Store(r)
106+
return rs
107+
}
108+
109+
// Swap replaces the current router implementation.
110+
func (rs *RouterService) Swap(r *OpenAIRouter) { rs.current.Store(r) }
111+
112+
// Process delegates to the current router.
113+
func (rs *RouterService) Process(stream ext_proc.ExternalProcessor_ProcessServer) error {
114+
r := rs.current.Load()
115+
return r.Process(stream)
116+
}
117+
118+
// watchConfigAndReload watches the config file and reloads router on changes.
119+
func (s *Server) watchConfigAndReload(ctx context.Context) {
120+
watcher, err := fsnotify.NewWatcher()
121+
if err != nil {
122+
observability.LogEvent("config_watcher_error", map[string]interface{}{
123+
"stage": "create_watcher",
124+
"error": err.Error(),
125+
})
126+
return
127+
}
128+
defer watcher.Close()
129+
130+
cfgFile := s.configPath
131+
cfgDir := filepath.Dir(cfgFile)
132+
133+
// Watch both the file and its directory to handle symlink swaps (Kubernetes ConfigMap)
134+
if err := watcher.Add(cfgDir); err != nil {
135+
observability.LogEvent("config_watcher_error", map[string]interface{}{
136+
"stage": "watch_dir",
137+
"dir": cfgDir,
138+
"error": err.Error(),
139+
})
140+
return
141+
}
142+
_ = watcher.Add(cfgFile) // best-effort; may fail if file replaced by symlink later
143+
144+
// Debounce events
145+
var (
146+
pending bool
147+
last time.Time
148+
)
149+
150+
reload := func() {
151+
// Parse and build a new router
152+
newRouter, err := NewOpenAIRouter(cfgFile)
153+
if err != nil {
154+
observability.LogEvent("config_reload_failed", map[string]interface{}{
155+
"file": cfgFile,
156+
"error": err.Error(),
157+
})
158+
return
159+
}
160+
s.service.Swap(newRouter)
161+
observability.LogEvent("config_reloaded", map[string]interface{}{
162+
"file": cfgFile,
163+
})
164+
}
165+
166+
for {
167+
select {
168+
case <-ctx.Done():
169+
return
170+
case ev, ok := <-watcher.Events:
171+
if !ok {
172+
return
173+
}
174+
if ev.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Rename|fsnotify.Remove|fsnotify.Chmod) != 0 {
175+
// If the event pertains to the config file or directory, trigger debounce
176+
if filepath.Base(ev.Name) == filepath.Base(cfgFile) || filepath.Dir(ev.Name) == cfgDir {
177+
if !pending || time.Since(last) > 250*time.Millisecond {
178+
pending = true
179+
last = time.Now()
180+
// Slight delay to let file settle
181+
go func() { time.Sleep(300 * time.Millisecond); reload() }()
182+
}
183+
}
184+
}
185+
case err, ok := <-watcher.Errors:
186+
if !ok {
187+
return
188+
}
189+
observability.LogEvent("config_watcher_error", map[string]interface{}{
190+
"stage": "watch_loop",
191+
"error": err.Error(),
192+
})
193+
}
82194
}
83195
}

0 commit comments

Comments
 (0)