Skip to content

Commit cfdcd19

Browse files
committed
feat: implement decision-based routing with plugin architecture
Signed-off-by: bitliu <[email protected]>
1 parent d69e177 commit cfdcd19

File tree

2 files changed

+63
-5
lines changed

2 files changed

+63
-5
lines changed

src/semantic-router/pkg/config/loader.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ var (
1414
configOnce sync.Once
1515
configErr error
1616
configMu sync.RWMutex
17+
18+
// Config change notification channel
19+
configUpdateCh chan *RouterConfig
20+
configUpdateMu sync.Mutex
1721
)
1822

1923
// Load loads the configuration from the specified YAML file once and caches it globally.
@@ -64,10 +68,20 @@ func Parse(configPath string) (*RouterConfig, error) {
6468
// Replace replaces the globally cached config. It is safe for concurrent readers.
6569
func Replace(newCfg *RouterConfig) {
6670
configMu.Lock()
67-
defer configMu.Unlock()
6871
config = newCfg
69-
// Do not reset configOnce to avoid racing re-parses via LoadConfig; callers should use ParseConfigFile for fresher reads.
7072
configErr = nil
73+
configMu.Unlock()
74+
75+
// Notify listeners of config change
76+
configUpdateMu.Lock()
77+
if configUpdateCh != nil {
78+
select {
79+
case configUpdateCh <- newCfg:
80+
default:
81+
// Channel full or no listener, skip
82+
}
83+
}
84+
configUpdateMu.Unlock()
7185
}
7286

7387
// Get returns the current configuration
@@ -76,3 +90,15 @@ func Get() *RouterConfig {
7690
defer configMu.RUnlock()
7791
return config
7892
}
93+
94+
// WatchConfigUpdates returns a channel that receives config updates
95+
// Only one watcher is supported at a time
96+
func WatchConfigUpdates() <-chan *RouterConfig {
97+
configUpdateMu.Lock()
98+
defer configUpdateMu.Unlock()
99+
100+
if configUpdateCh == nil {
101+
configUpdateCh = make(chan *RouterConfig, 1)
102+
}
103+
return configUpdateCh
104+
}

src/semantic-router/pkg/extproc/server.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,11 @@ func (rs *RouterService) Process(stream ext_proc.ExternalProcessor_ProcessServer
160160
// watchConfigAndReload watches the config file and reloads router on changes.
161161
func (s *Server) watchConfigAndReload(ctx context.Context) {
162162
// Check if we're using Kubernetes config source
163-
// If so, skip file watching as config is managed by the Kubernetes controller
164163
cfg := config.Get()
165164
if cfg != nil && cfg.ConfigSource == config.ConfigSourceKubernetes {
166-
logging.Infof("ConfigSource is kubernetes, skipping file watcher")
167-
<-ctx.Done() // Just wait for context cancellation
165+
logging.Infof("ConfigSource is kubernetes, watching for config updates from controller")
166+
// Watch for config updates from the Kubernetes controller
167+
s.watchKubernetesConfigUpdates(ctx)
168168
return
169169
}
170170

@@ -244,3 +244,35 @@ func (s *Server) watchConfigAndReload(ctx context.Context) {
244244
}
245245
}
246246
}
247+
248+
// watchKubernetesConfigUpdates watches for config updates from the Kubernetes controller
249+
func (s *Server) watchKubernetesConfigUpdates(ctx context.Context) {
250+
updateCh := config.WatchConfigUpdates()
251+
252+
for {
253+
select {
254+
case <-ctx.Done():
255+
return
256+
case newCfg := <-updateCh:
257+
if newCfg == nil {
258+
continue
259+
}
260+
261+
// Build a new router with the updated config
262+
// Note: We pass the configPath but NewOpenAIRouter will use the global config
263+
newRouter, err := NewOpenAIRouter(s.configPath)
264+
if err != nil {
265+
logging.LogEvent("config_reload_failed", map[string]interface{}{
266+
"source": "kubernetes",
267+
"error": err.Error(),
268+
})
269+
continue
270+
}
271+
272+
s.service.Swap(newRouter)
273+
logging.LogEvent("config_reloaded", map[string]interface{}{
274+
"source": "kubernetes",
275+
})
276+
}
277+
}
278+
}

0 commit comments

Comments
 (0)