Skip to content

Commit a8437d3

Browse files
Use modelID instead of model tag (#98)
* Add modelID resolver in model manager
* Use modelID as key to record req/resp instead of model tag
* Use modelID as key for runners & runnerConfigs map
* Potential fix for code scanning alert no. 40: Log entries created from user input
  Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
* Potential fix for code scanning alert no. 42: Log entries created from user input
  Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
* Potential fix for code scanning alert no. 43: Log entries created from user input
  Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
* already sanitized
* Rename model to modelID to make it clear
* Uses runnerInfo as a value type for runner map, to store runtime info of the runner. Currently the slot and the model reference are stored.
* Include the ID of the model and the actual reference in the logs
---------
Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
1 parent 566ec6f commit a8437d3

File tree

4 files changed

+143
-97
lines changed

4 files changed

+143
-97
lines changed

pkg/inference/models/manager.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,27 @@ func (m *Manager) handleGetModel(w http.ResponseWriter, r *http.Request) {
256256
}
257257
}
258258

259+
// ResolveModelID resolves a model reference to a model ID. CR and LF characters are stripped from the reference before lookup; if resolution fails, it returns that stripped reference rather than an ID.
260+
func (m *Manager) ResolveModelID(modelRef string) string {
261+
// Sanitize modelRef to prevent log forgery
262+
sanitizedModelRef := strings.ReplaceAll(modelRef, "\n", "")
263+
sanitizedModelRef = strings.ReplaceAll(sanitizedModelRef, "\r", "")
264+
265+
model, err := m.GetModel(sanitizedModelRef)
266+
if err != nil {
267+
m.log.Warnf("Failed to resolve model ref %s to ID: %v", sanitizedModelRef, err)
268+
return sanitizedModelRef
269+
}
270+
271+
modelID, err := model.ID()
272+
if err != nil {
273+
m.log.Warnf("Failed to get model ID for ref %s: %v", sanitizedModelRef, err)
274+
return sanitizedModelRef
275+
}
276+
277+
return modelID
278+
}
279+
259280
func getLocalModel(m *Manager, name string) (*Model, error) {
260281
if m.distributionClient == nil {
261282
return nil, errors.New("model distribution service unavailable")

pkg/inference/scheduling/loader.go

Lines changed: 64 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,20 @@ var (
4242
type runnerKey struct {
4343
// backend is the backend associated with the runner.
4444
backend string
45-
// model is the model associated with the runner.
46-
model string
45+
// modelID is the ID (digest) of the model associated with the runner.
46+
modelID string
4747
// mode is the operation mode associated with the runner.
4848
mode inference.BackendMode
4949
}
5050

51+
// runnerInfo holds information about a runner including its slot and the original model reference used to load it.
52+
type runnerInfo struct {
53+
// slot is the slot index where the runner is stored.
54+
slot int
55+
// modelRef is the original model reference (tag) used to load the runner.
56+
modelRef string
57+
}
58+
5159
// loader manages the loading and unloading of backend runners. It regulates
5260
// active backends in a manner that avoids exhausting system resources. Loaders
5361
// assume that all of their backends have been installed, so no load requests
@@ -80,7 +88,7 @@ type loader struct {
8088
// polling. Each signaling channel should be buffered (with size 1).
8189
waiters map[chan<- struct{}]bool
8290
// runners maps runner keys to their runner info (slot index and model reference).
83-
runners map[runnerKey]int
91+
runners map[runnerKey]runnerInfo
8492
// slots maps slot indices to associated runners. A slot is considered free
8593
// if the runner value in it is nil.
8694
slots []*runner
@@ -151,7 +159,7 @@ func newLoader(
151159
guard: make(chan struct{}, 1),
152160
availableMemory: totalMemory,
153161
waiters: make(map[chan<- struct{}]bool),
154-
runners: make(map[runnerKey]int, nSlots),
162+
runners: make(map[runnerKey]runnerInfo, nSlots),
155163
slots: make([]*runner, nSlots),
156164
references: make([]uint, nSlots),
157165
allocations: make([]uint64, nSlots),
@@ -196,24 +204,24 @@ func (l *loader) broadcast() {
196204
// lock. It returns the number of remaining runners.
197205
func (l *loader) evict(idleOnly bool) int {
198206
now := time.Now()
199-
for r, slot := range l.runners {
200-
unused := l.references[slot] == 0
201-
idle := unused && now.Sub(l.timestamps[slot]) > l.runnerIdleTimeout
207+
for r, runnerInfo := range l.runners {
208+
unused := l.references[runnerInfo.slot] == 0
209+
idle := unused && now.Sub(l.timestamps[runnerInfo.slot]) > l.runnerIdleTimeout
202210
defunct := false
203211
select {
204-
case <-l.slots[slot].done:
212+
case <-l.slots[runnerInfo.slot].done:
205213
defunct = true
206214
default:
207215
}
208216
if unused && (!idleOnly || idle || defunct) {
209-
l.log.Infof("Evicting %s backend runner with model %s in %s mode",
210-
r.backend, r.model, r.mode,
217+
l.log.Infof("Evicting %s backend runner with model %s (%s) in %s mode",
218+
r.backend, r.modelID, runnerInfo.modelRef, r.mode,
211219
)
212-
l.slots[slot].terminate()
213-
l.slots[slot] = nil
214-
l.availableMemory += l.allocations[slot]
215-
l.allocations[slot] = 0
216-
l.timestamps[slot] = time.Time{}
220+
l.slots[runnerInfo.slot].terminate()
221+
l.slots[runnerInfo.slot] = nil
222+
l.availableMemory += l.allocations[runnerInfo.slot]
223+
l.allocations[runnerInfo.slot] = 0
224+
l.timestamps[runnerInfo.slot] = time.Time{}
217225
delete(l.runners, r)
218226
}
219227
}
@@ -224,17 +232,17 @@ func (l *loader) evict(idleOnly bool) int {
224232
// It returns the number of remaining runners.
225233
func (l *loader) evictRunner(backend, model string, mode inference.BackendMode) int {
226234
allBackends := backend == ""
227-
for r, slot := range l.runners {
228-
unused := l.references[slot] == 0
229-
if unused && (allBackends || r.backend == backend) && r.model == model && r.mode == mode {
230-
l.log.Infof("Evicting %s backend runner with model %s in %s mode",
231-
r.backend, r.model, r.mode,
235+
for r, runnerInfo := range l.runners {
236+
unused := l.references[runnerInfo.slot] == 0
237+
if unused && (allBackends || r.backend == backend) && r.modelID == model && r.mode == mode {
238+
l.log.Infof("Evicting %s backend runner with model %s (%s) in %s mode",
239+
r.backend, r.modelID, runnerInfo.modelRef, r.mode,
232240
)
233-
l.slots[slot].terminate()
234-
l.slots[slot] = nil
235-
l.availableMemory += l.allocations[slot]
236-
l.allocations[slot] = 0
237-
l.timestamps[slot] = time.Time{}
241+
l.slots[runnerInfo.slot].terminate()
242+
l.slots[runnerInfo.slot] = nil
243+
l.availableMemory += l.allocations[runnerInfo.slot]
244+
l.allocations[runnerInfo.slot] = 0
245+
l.timestamps[runnerInfo.slot] = time.Time{}
238246
delete(l.runners, r)
239247
}
240248
}
@@ -254,11 +262,12 @@ func (l *loader) Unload(ctx context.Context, unload UnloadRequest) int {
254262
return l.evict(false)
255263
} else {
256264
for _, model := range unload.Models {
265+
modelID := l.modelManager.ResolveModelID(model)
257266
delete(l.runnerConfigs, runnerKey{unload.Backend, model, inference.BackendModeCompletion})
258267
// Evict both completion and embedding models. We should consider
259268
// accepting a mode parameter in unload requests.
260-
l.evictRunner(unload.Backend, model, inference.BackendModeCompletion)
261-
l.evictRunner(unload.Backend, model, inference.BackendModeEmbedding)
269+
l.evictRunner(unload.Backend, modelID, inference.BackendModeCompletion)
270+
l.evictRunner(unload.Backend, modelID, inference.BackendModeEmbedding)
262271
}
263272
return len(l.runners)
264273
}
@@ -282,15 +291,15 @@ func stopAndDrainTimer(timer *time.Timer) {
282291
func (l *loader) idleCheckDuration() time.Duration {
283292
// Compute the oldest usage time for any idle runner.
284293
var oldest time.Time
285-
for _, slot := range l.runners {
294+
for _, runnerInfo := range l.runners {
286295
select {
287-
case <-l.slots[slot].done:
296+
case <-l.slots[runnerInfo.slot].done:
288297
// Check immediately if a runner is defunct
289298
return 0
290299
default:
291300
}
292-
if l.references[slot] == 0 {
293-
timestamp := l.timestamps[slot]
301+
if l.references[runnerInfo.slot] == 0 {
302+
timestamp := l.timestamps[runnerInfo.slot]
294303
if oldest.IsZero() || timestamp.Before(oldest) {
295304
oldest = timestamp
296305
}
@@ -378,10 +387,10 @@ func (l *loader) run(ctx context.Context) {
378387
}
379388
}
380389

381-
// load allocates a runner using the specified backend and model. If allocated,
390+
// load allocates a runner using the specified backend and modelID. If allocated,
382391
// it should be released by the caller using the release mechanism (once the
383392
// runner is no longer needed).
384-
func (l *loader) load(ctx context.Context, backendName, model string, mode inference.BackendMode) (*runner, error) {
393+
func (l *loader) load(ctx context.Context, backendName, modelID, modelRef string, mode inference.BackendMode) (*runner, error) {
385394
// Grab the backend.
386395
backend, ok := l.backends[backendName]
387396
if !ok {
@@ -426,20 +435,20 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer
426435
}
427436

428437
// See if we can satisfy the request with an existing runner.
429-
existing, ok := l.runners[runnerKey{backendName, model, mode}]
438+
existing, ok := l.runners[runnerKey{backendName, modelID, mode}]
430439
if ok {
431440
select {
432-
case <-l.slots[existing].done:
433-
l.log.Warnf("%s runner for %s is defunct. Waiting for it to be evicted.", backendName, model)
434-
if l.references[existing] == 0 {
435-
l.evictRunner(backendName, model, mode)
441+
case <-l.slots[existing.slot].done:
442+
l.log.Warnf("%s runner for %s is defunct. Waiting for it to be evicted.", backendName, existing.modelRef)
443+
if l.references[existing.slot] == 0 {
444+
l.evictRunner(backendName, modelID, mode)
436445
} else {
437446
goto WaitForChange
438447
}
439448
default:
440-
l.references[existing] += 1
441-
l.timestamps[existing] = time.Time{}
442-
return l.slots[existing], nil
449+
l.references[existing.slot] += 1
450+
l.timestamps[existing.slot] = time.Time{}
451+
return l.slots[existing.slot], nil
443452
}
444453
}
445454

@@ -462,15 +471,15 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer
462471
// If we've identified a slot, then we're ready to start a runner.
463472
if slot >= 0 {
464473
var runnerConfig *inference.BackendConfiguration
465-
if rc, ok := l.runnerConfigs[runnerKey{backendName, model, mode}]; ok {
474+
if rc, ok := l.runnerConfigs[runnerKey{backendName, modelID, mode}]; ok {
466475
runnerConfig = &rc
467476
}
468477
// Create the runner.
469-
l.log.Infof("Loading %s backend runner with model %s in %s mode", backendName, model, mode)
470-
runner, err := run(l.log, backend, model, mode, slot, runnerConfig, l.openAIRecorder)
478+
l.log.Infof("Loading %s backend runner with model %s in %s mode", backendName, modelID, mode)
479+
runner, err := run(l.log, backend, modelID, mode, slot, runnerConfig, l.openAIRecorder)
471480
if err != nil {
472481
l.log.Warnf("Unable to start %s backend runner with model %s in %s mode: %v",
473-
backendName, model, mode, err,
482+
backendName, modelID, mode, err,
474483
)
475484
return nil, fmt.Errorf("unable to start runner: %w", err)
476485
}
@@ -484,14 +493,14 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer
484493
if err := runner.wait(ctx); err != nil {
485494
runner.terminate()
486495
l.log.Warnf("Initialization for %s backend runner with model %s in %s mode failed: %v",
487-
backendName, model, mode, err,
496+
backendName, modelID, mode, err,
488497
)
489498
return nil, fmt.Errorf("error waiting for runner to be ready: %w", err)
490499
}
491500

492501
// Perform registration and return the runner.
493502
l.availableMemory -= memory
494-
l.runners[runnerKey{backendName, model, mode}] = slot
503+
l.runners[runnerKey{backendName, modelID, mode}] = runnerInfo{slot, modelRef}
495504
l.slots[slot] = runner
496505
l.references[slot] = 1
497506
l.allocations[slot] = memory
@@ -523,17 +532,17 @@ func (l *loader) release(runner *runner) {
523532
slot := l.runners[runnerKey{runner.backend.Name(), runner.model, runner.mode}]
524533

525534
// Decrement the runner's reference count.
526-
l.references[slot] -= 1
535+
l.references[slot.slot] -= 1
527536

528537
// If the runner's reference count is now zero, then check if it is still
529538
// active, and record now as its idle start time and signal the idle
530539
// checker.
531-
if l.references[slot] == 0 {
540+
if l.references[slot.slot] == 0 {
532541
select {
533542
case <-runner.done:
534543
l.evictRunner(runner.backend.Name(), runner.model, runner.mode)
535544
default:
536-
l.timestamps[slot] = time.Now()
545+
l.timestamps[slot.slot] = time.Now()
537546
select {
538547
case l.idleCheck <- struct{}{}:
539548
default:
@@ -545,22 +554,22 @@ func (l *loader) release(runner *runner) {
545554
l.broadcast()
546555
}
547556

548-
func (l *loader) setRunnerConfig(ctx context.Context, backendName, model string, mode inference.BackendMode, runnerConfig inference.BackendConfiguration) error {
557+
func (l *loader) setRunnerConfig(ctx context.Context, backendName, modelID string, mode inference.BackendMode, runnerConfig inference.BackendConfiguration) error {
549558
l.lock(ctx)
550559
defer l.unlock()
551560

552-
runnerId := runnerKey{backendName, model, mode}
561+
runnerId := runnerKey{backendName, modelID, mode}
553562

554563
// If the configuration hasn't changed, then just return.
555564
if existingConfig, ok := l.runnerConfigs[runnerId]; ok && reflect.DeepEqual(runnerConfig, existingConfig) {
556-
l.log.Infof("Configuration for %s runner for model %s unchanged", backendName, model)
565+
l.log.Infof("Configuration for %s runner for modelID %s unchanged", backendName, modelID)
557566
return nil
558567
}
559568

560569
// If there's an active runner whose configuration we want to override, then
561570
// try evicting it (because it may not be in use).
562571
if _, ok := l.runners[runnerId]; ok {
563-
l.evictRunner(backendName, model, mode)
572+
l.evictRunner(backendName, modelID, mode)
564573
}
565574

566575
// If there's still an active runner, then we can't (or at least
@@ -569,7 +578,7 @@ func (l *loader) setRunnerConfig(ctx context.Context, backendName, model string,
569578
return errRunnerAlreadyActive
570579
}
571580

572-
l.log.Infof("Configuring %s runner for %s", backendName, model)
581+
l.log.Infof("Configuring %s runner for %s", backendName, modelID)
573582
l.runnerConfigs[runnerId] = runnerConfig
574583
return nil
575584
}

pkg/inference/scheduling/scheduler.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func NewScheduler(
5656
allowedOrigins []string,
5757
tracker *metrics.Tracker,
5858
) *Scheduler {
59-
openAIRecorder := metrics.NewOpenAIRecorder(log.WithField("component", "openai-recorder"))
59+
openAIRecorder := metrics.NewOpenAIRecorder(log.WithField("component", "openai-recorder"), modelManager)
6060

6161
// Create the scheduler.
6262
s := &Scheduler{
@@ -238,8 +238,10 @@ func (s *Scheduler) handleOpenAIInference(w http.ResponseWriter, r *http.Request
238238
s.tracker.TrackModel(model, r.UserAgent())
239239
}
240240

241+
modelID := s.modelManager.ResolveModelID(request.Model)
242+
241243
// Request a runner to execute the request and defer its release.
242-
runner, err := s.loader.load(r.Context(), backend.Name(), request.Model, backendMode)
244+
runner, err := s.loader.load(r.Context(), backend.Name(), modelID, request.Model, backendMode)
243245
if err != nil {
244246
http.Error(w, fmt.Errorf("unable to load runner: %w", err).Error(), http.StatusInternalServerError)
245247
return
@@ -295,17 +297,17 @@ func (s *Scheduler) getLoaderStatus(ctx context.Context) []BackendStatus {
295297

296298
result := make([]BackendStatus, 0, len(s.loader.runners))
297299

298-
for key, slot := range s.loader.runners {
299-
if s.loader.slots[slot] != nil {
300+
for key, runnerInfo := range s.loader.runners {
301+
if s.loader.slots[runnerInfo.slot] != nil {
300302
status := BackendStatus{
301303
BackendName: key.backend,
302-
ModelName: key.model,
304+
ModelName: runnerInfo.modelRef,
303305
Mode: key.mode.String(),
304306
LastUsed: time.Time{},
305307
}
306308

307-
if s.loader.references[slot] == 0 {
308-
status.LastUsed = s.loader.timestamps[slot]
309+
if s.loader.references[runnerInfo.slot] == 0 {
310+
status.LastUsed = s.loader.timestamps[runnerInfo.slot]
309311
}
310312

311313
result = append(result, status)
@@ -414,9 +416,9 @@ func (s *Scheduler) Configure(w http.ResponseWriter, r *http.Request) {
414416
// Configure is called by compose for each model.
415417
s.tracker.TrackModel(model, r.UserAgent())
416418
}
417-
418-
if err := s.loader.setRunnerConfig(r.Context(), backend.Name(), configureRequest.Model, inference.BackendModeCompletion, runnerConfig); err != nil {
419-
s.log.Warnf("Failed to configure %s runner for %s: %s", backend.Name(), configureRequest.Model, err)
419+
modelID := s.modelManager.ResolveModelID(configureRequest.Model)
420+
if err := s.loader.setRunnerConfig(r.Context(), backend.Name(), modelID, inference.BackendModeCompletion, runnerConfig); err != nil {
421+
s.log.Warnf("Failed to configure %s runner for %s (%s): %s", backend.Name(), configureRequest.Model, modelID, err)
420422
if errors.Is(err, errRunnerAlreadyActive) {
421423
http.Error(w, err.Error(), http.StatusConflict)
422424
} else {
@@ -442,14 +444,14 @@ func (s *Scheduler) GetAllActiveRunners() []metrics.ActiveRunner {
442444
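// Find the runner slot for this backend/model combination.
// NOTE(review): runners is keyed by model ID, but getLoaderStatus reports ModelName as the model reference — confirm this lookup still matches.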
443445
key := runnerKey{
444446
backend: backend.BackendName,
445-
model: backend.ModelName,
447+
modelID: backend.ModelName,
446448
mode: parseBackendMode(backend.Mode),
447449
}
448450

449-
if slot, exists := s.loader.runners[key]; exists {
450-
socket, err := RunnerSocketPath(slot)
451+
if runnerInfo, exists := s.loader.runners[key]; exists {
452+
socket, err := RunnerSocketPath(runnerInfo.slot)
451453
if err != nil {
452-
s.log.Warnf("Failed to get socket path for runner %s/%s: %v", backend.BackendName, backend.ModelName, err)
454+
s.log.Warnf("Failed to get socket path for runner %s/%s (%s): %v", backend.BackendName, backend.ModelName, key.modelID, err)
453455
continue
454456
}
455457

@@ -480,13 +482,13 @@ func (s *Scheduler) GetLlamaCppSocket() (string, error) {
480482
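// Find the runner slot for this backend/model combination.
// NOTE(review): runners is keyed by model ID, but getLoaderStatus reports ModelName as the model reference — confirm this lookup still matches.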
481483
key := runnerKey{
482484
backend: backend.BackendName,
483-
model: backend.ModelName,
485+
modelID: backend.ModelName,
484486
mode: parseBackendMode(backend.Mode),
485487
}
486488

487-
if slot, exists := s.loader.runners[key]; exists {
489+
if runnerInfo, exists := s.loader.runners[key]; exists {
488490
// Use the RunnerSocketPath function to get the socket path
489-
return RunnerSocketPath(slot)
491+
return RunnerSocketPath(runnerInfo.slot)
490492
}
491493
}
492494
}

0 commit comments

Comments
 (0)