Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 39 additions & 15 deletions pkg/inference/scheduling/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,22 @@
}

// evict evicts all unused runners from the loader. If idleOnly is true, then
// only those unused runners which are considered "idle" (based on usage
// timestamp) are evicted. The caller must hold the loader lock. It returns the
// number of remaining runners.
// only those unused, but functioning, runners which are considered "idle" (based
// on usage timestamp) are evicted. Defunct (e.g. crashed) runners will be evicted
// regardless of whether they are considered "idle". The caller must hold the loader
// lock. It returns the number of remaining runners.
func (l *loader) evict(idleOnly bool) int {
now := time.Now()
for r, slot := range l.runners {
unused := l.references[slot] == 0
idle := unused && now.Sub(l.timestamps[slot]) > runnerIdleTimeout
if unused && (!idleOnly || idle) {
defunct := false
select {
case <-l.slots[slot].done:
defunct = true
default:
}
if unused && (!idleOnly || idle || defunct) {
l.log.Infof("Evicting %s backend runner with model %s in %s mode",
r.backend, r.model, r.mode,
)
Expand All @@ -179,11 +186,11 @@

// evictRunner evicts a specific runner. The caller must hold the loader lock.
// It returns the number of remaining runners.
func (l *loader) evictRunner(backend, model string) int {
func (l *loader) evictRunner(backend, model string, mode inference.BackendMode) int {
allBackends := backend == ""
for r, slot := range l.runners {
unused := l.references[slot] == 0
if unused && (allBackends || r.backend == backend) && r.model == model {
if unused && (allBackends || r.backend == backend) && r.model == model && r.mode == mode {
l.log.Infof("Evicting %s backend runner with model %s in %s mode",
r.backend, r.model, r.mode,
)
Expand All @@ -210,7 +217,10 @@
return l.evict(false)
} else {
for _, model := range unload.Models {
l.evictRunner(unload.Backend, model)
// Evict both, completion and embedding models. We should consider
// accepting a mode parameter in unload requests.
l.evictRunner(unload.Backend, model, inference.BackendModeCompletion)
l.evictRunner(unload.Backend, model, inference.BackendModeEmbedding)
}
return len(l.runners)
}
Expand Down Expand Up @@ -364,6 +374,8 @@

// Loop until we can satisfy the request or an error occurs.
for {
slot := -1

// If loads are disabled, then there's nothing we can do.
if !l.loadsEnabled {
return nil, errLoadsDisabled
Expand All @@ -372,9 +384,15 @@
// See if we can satisfy the request with an existing runner.
existing, ok := l.runners[runnerKey{backendName, model, mode}]
if ok {
l.references[existing] += 1
l.timestamps[existing] = time.Time{}
return l.slots[existing], nil
select {
case <-l.slots[existing].done:
l.log.Warnf("%s runner for %s is defunct. Waiting for it to be evicted.", backendName, model)

Check failure

Code scanning / CodeQL

Log entries created from user input High

This log entry depends on a
user-provided value
.

Copilot Autofix

AI 7 months ago

To fix the issue, the model variable should be sanitized before being used in the log entry. Since the logs appear to be plain text, we can remove potentially harmful characters such as newlines (\n, \r) using strings.ReplaceAll. This ensures that the log entry cannot be manipulated by malicious input. The sanitization should be applied directly before the log statement on line 389 in loader.go.

Suggested changeset 1
pkg/inference/scheduling/loader.go

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/pkg/inference/scheduling/loader.go b/pkg/inference/scheduling/loader.go
--- a/pkg/inference/scheduling/loader.go
+++ b/pkg/inference/scheduling/loader.go
@@ -12,2 +12,3 @@
 	"github.com/docker/model-runner/pkg/logging"
+	"strings"
 )
@@ -388,3 +389,5 @@
 			case <-l.slots[existing].done:
-				l.log.Warnf("%s runner for %s is defunct. Waiting for it to be evicted.", backendName, model)
+				sanitizedModel := strings.ReplaceAll(model, "\n", "")
+				sanitizedModel = strings.ReplaceAll(sanitizedModel, "\r", "")
+				l.log.Warnf("%s runner for %s is defunct. Waiting for it to be evicted.", backendName, sanitizedModel)
 				goto WaitForChange
EOF
@@ -12,2 +12,3 @@
"github.com/docker/model-runner/pkg/logging"
"strings"
)
@@ -388,3 +389,5 @@
case <-l.slots[existing].done:
l.log.Warnf("%s runner for %s is defunct. Waiting for it to be evicted.", backendName, model)
sanitizedModel := strings.ReplaceAll(model, "\n", "")
sanitizedModel = strings.ReplaceAll(sanitizedModel, "\r", "")
l.log.Warnf("%s runner for %s is defunct. Waiting for it to be evicted.", backendName, sanitizedModel)
goto WaitForChange
Copilot is powered by AI and may make mistakes. Always verify output.
goto WaitForChange
default:
l.references[existing] += 1
l.timestamps[existing] = time.Time{}
return l.slots[existing], nil
}
}

// If there's not sufficient memory or all slots are full, then try
Expand All @@ -384,7 +402,6 @@
}

// If there's sufficient memory and a free slot, then find the slot.
slot := -1
if memory <= l.availableMemory && len(l.runners) < len(l.slots) {
for s, runner := range l.slots {
if runner == nil {
Expand Down Expand Up @@ -432,6 +449,7 @@
// Wait for something to change. Note that we always re-lock with
// context.Background() because we need to ensure we hold the lock by
// the time we return.
WaitForChange:
l.unlock()
select {
case <-ctx.Done():
Expand All @@ -455,13 +473,19 @@
// Decrement the runner's reference count.
l.references[slot] -= 1

// If the runner's reference count is now zero, then record now as its idle
// start time and signal the idle checker.
// If the runner's reference count is now zero, then check if it is still
// active, and record now as its idle start time and signal the idle
// checker.
if l.references[slot] == 0 {
l.timestamps[slot] = time.Now()
select {
case l.idleCheck <- struct{}{}:
case <-runner.done:
l.evictRunner(runner.backend.Name(), runner.model, runner.mode)
default:
l.timestamps[slot] = time.Now()
select {
case l.idleCheck <- struct{}{}:
default:
}
}
}

Expand Down
16 changes: 16 additions & 0 deletions pkg/inference/scheduling/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,22 @@ func run(
proxyLog: proxyLog,
}

proxy.ErrorHandler = func(w http.ResponseWriter, req *http.Request, err error) {
// If the error is EOF, the underlying runner likely bailed, and closed its socket
// unexpectedly. Wait for the runner process to complete, but time out in case
// the runner process only killed its comms and is stuck.
if errors.Is(err, io.EOF) {
w.WriteHeader(http.StatusInternalServerError)
select {
case <-r.done:
return
case <-time.After(30 * time.Second):
}
} else {
w.WriteHeader(http.StatusBadGateway)
}
}

// Start the backend run loop.
go func() {
if err := backend.Run(runCtx, socket, model, mode); err != nil {
Expand Down