diff --git a/pkg/inference/scheduling/loader.go b/pkg/inference/scheduling/loader.go index 70089f952..59a2e8dea 100644 --- a/pkg/inference/scheduling/loader.go +++ b/pkg/inference/scheduling/loader.go @@ -154,15 +154,22 @@ func (l *loader) broadcast() { } // evict evicts all unused runners from the loader. If idleOnly is true, then -// only those unused runners which are considered "idle" (based on usage -// timestamp) are evicted. The caller must hold the loader lock. It returns the -// number of remaining runners. +// only those unused, but functioning, runners which are considered "idle" (based +// on usage timestamp) are evicted. Defunct (e.g. crashed) runners will be evicted +// regardless of whether they are considered "idle". The caller must hold the loader +// lock. It returns the number of remaining runners. func (l *loader) evict(idleOnly bool) int { now := time.Now() for r, slot := range l.runners { unused := l.references[slot] == 0 idle := unused && now.Sub(l.timestamps[slot]) > runnerIdleTimeout - if unused && (!idleOnly || idle) { + defunct := false + select { + case <-l.slots[slot].done: + defunct = true + default: + } + if unused && (!idleOnly || idle || defunct) { l.log.Infof("Evicting %s backend runner with model %s in %s mode", r.backend, r.model, r.mode, ) @@ -179,11 +186,11 @@ func (l *loader) evict(idleOnly bool) int { // evictRunner evicts a specific runner. The caller must hold the loader lock. // It returns the number of remaining runners. 
-func (l *loader) evictRunner(backend, model string) int { +func (l *loader) evictRunner(backend, model string, mode inference.BackendMode) int { allBackends := backend == "" for r, slot := range l.runners { unused := l.references[slot] == 0 - if unused && (allBackends || r.backend == backend) && r.model == model { + if unused && (allBackends || r.backend == backend) && r.model == model && r.mode == mode { l.log.Infof("Evicting %s backend runner with model %s in %s mode", r.backend, r.model, r.mode, ) @@ -210,7 +217,10 @@ func (l *loader) Unload(ctx context.Context, unload UnloadRequest) int { return l.evict(false) } else { for _, model := range unload.Models { - l.evictRunner(unload.Backend, model) + // Evict both the completion and the embedding runner for the model. We + // should consider accepting a mode parameter in unload requests. + l.evictRunner(unload.Backend, model, inference.BackendModeCompletion) + l.evictRunner(unload.Backend, model, inference.BackendModeEmbedding) } return len(l.runners) } @@ -364,6 +374,8 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer // Loop until we can satisfy the request or an error occurs. for { + slot := -1 + // If loads are disabled, then there's nothing we can do. if !l.loadsEnabled { return nil, errLoadsDisabled @@ -372,9 +384,15 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer // See if we can satisfy the request with an existing runner. existing, ok := l.runners[runnerKey{backendName, model, mode}] if ok { - l.references[existing] += 1 - l.timestamps[existing] = time.Time{} - return l.slots[existing], nil + select { + case <-l.slots[existing].done: + l.log.Warnf("%s runner for %s is defunct. 
Waiting for it to be evicted.", backendName, model) + goto WaitForChange + default: + l.references[existing] += 1 + l.timestamps[existing] = time.Time{} + return l.slots[existing], nil + } } // If there's not sufficient memory or all slots are full, then try @@ -384,7 +402,6 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer } // If there's sufficient memory and a free slot, then find the slot. - slot := -1 if memory <= l.availableMemory && len(l.runners) < len(l.slots) { for s, runner := range l.slots { if runner == nil { @@ -432,6 +449,7 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer // Wait for something to change. Note that we always re-lock with // context.Background() because we need to ensure we hold the lock by // the time we return. + WaitForChange: l.unlock() select { case <-ctx.Done(): @@ -455,13 +473,19 @@ func (l *loader) release(runner *runner) { // Decrement the runner's reference count. l.references[slot] -= 1 - // If the runner's reference count is now zero, then record now as its idle - // start time and signal the idle checker. + // If the runner's reference count is now zero, then check whether it is + // still running: evict it immediately if it is defunct, otherwise record + // now as its idle start time and signal the idle checker. 
if l.references[slot] == 0 { - l.timestamps[slot] = time.Now() select { - case l.idleCheck <- struct{}{}: + case <-runner.done: + l.evictRunner(runner.backend.Name(), runner.model, runner.mode) default: + l.timestamps[slot] = time.Now() + select { + case l.idleCheck <- struct{}{}: + default: + } } } diff --git a/pkg/inference/scheduling/runner.go b/pkg/inference/scheduling/runner.go index a62870e39..ea0dd1495 100644 --- a/pkg/inference/scheduling/runner.go +++ b/pkg/inference/scheduling/runner.go @@ -134,6 +134,22 @@ func run( proxyLog: proxyLog, } + proxy.ErrorHandler = func(w http.ResponseWriter, req *http.Request, err error) { + // If the error is EOF, the underlying runner likely bailed, and closed its socket + // unexpectedly. Wait for the runner process to complete, but time out in case + // the runner process only killed its comms and is stuck. + if errors.Is(err, io.EOF) { + w.WriteHeader(http.StatusInternalServerError) + select { + case <-r.done: + return + case <-time.After(30 * time.Second): + } + } else { + w.WriteHeader(http.StatusBadGateway) + } + } + // Start the backend run loop. go func() { if err := backend.Run(runCtx, socket, model, mode); err != nil {