Skip to content

Commit e754b5f

Browse files
author
Piotr Stankiewicz
committed
Reload defunct runners
In case a runner becomes defunct, e.g. as a result of a backend crash it would be neat to be able to reload it. So, if the loader finds runner, have it check if the runner is still alive, and create a new one if the runner is defunct. Signed-off-by: Piotr Stankiewicz <piotr.stankiewicz@docker.com>
1 parent 1e92504 commit e754b5f

File tree

1 file changed

+33
-12
lines changed

1 file changed

+33
-12
lines changed

pkg/inference/scheduling/loader.go

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -154,15 +154,22 @@ func (l *loader) broadcast() {
154154
}
155155

156156
// evict evicts all unused runners from the loader. If idleOnly is true, then
157-
// only those unused runners which are considered "idle" (based on usage
158-
// timestamp) are evicted. The caller must hold the loader lock. It returns the
159-
// number of remaining runners.
157+
// only those unused, but functioning, runners which are considered "idle" (based
158+
// on usage timestamp) are evicted. Defunct (e.g. crashed) runners will be evicted
159+
// regardless of whether they are considered "idle". The caller must hold the loader
160+
// lock. It returns the number of remaining runners.
160161
func (l *loader) evict(idleOnly bool) int {
161162
now := time.Now()
162163
for r, slot := range l.runners {
163164
unused := l.references[slot] == 0
164165
idle := unused && now.Sub(l.timestamps[slot]) > runnerIdleTimeout
165-
if unused && (!idleOnly || idle) {
166+
defunct := false
167+
select {
168+
case <-l.slots[slot].done:
169+
defunct = true
170+
default:
171+
}
172+
if unused && (!idleOnly || idle || defunct) {
166173
l.log.Infof("Evicting %s backend runner with model %s in %s mode",
167174
r.backend, r.model, r.mode,
168175
)
@@ -364,6 +371,8 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer
364371

365372
// Loop until we can satisfy the request or an error occurs.
366373
for {
374+
slot := -1
375+
367376
// If loads are disabled, then there's nothing we can do.
368377
if !l.loadsEnabled {
369378
return nil, errLoadsDisabled
@@ -372,9 +381,15 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer
372381
// See if we can satisfy the request with an existing runner.
373382
existing, ok := l.runners[runnerKey{backendName, model, mode}]
374383
if ok {
375-
l.references[existing] += 1
376-
l.timestamps[existing] = time.Time{}
377-
return l.slots[existing], nil
384+
select {
385+
case <-l.slots[existing].done:
386+
l.log.Warnf("%s runner for %s is defunct. Waiting for it to be evicted.", backendName, model)
387+
goto WaitForChange
388+
default:
389+
l.references[existing] += 1
390+
l.timestamps[existing] = time.Time{}
391+
return l.slots[existing], nil
392+
}
378393
}
379394

380395
// If there's not sufficient memory or all slots are full, then try
@@ -384,7 +399,6 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer
384399
}
385400

386401
// If there's sufficient memory and a free slot, then find the slot.
387-
slot := -1
388402
if memory <= l.availableMemory && len(l.runners) < len(l.slots) {
389403
for s, runner := range l.slots {
390404
if runner == nil {
@@ -432,6 +446,7 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer
432446
// Wait for something to change. Note that we always re-lock with
433447
// context.Background() because we need to ensure we hold the lock by
434448
// the time we return.
449+
WaitForChange:
435450
l.unlock()
436451
select {
437452
case <-ctx.Done():
@@ -455,13 +470,19 @@ func (l *loader) release(runner *runner) {
455470
// Decrement the runner's reference count.
456471
l.references[slot] -= 1
457472

458-
// If the runner's reference count is now zero, then record now as its idle
459-
// start time and signal the idle checker.
473+
// If the runner's reference count is now zero, then check if it is still
474+
// active, and record now as its idle start time and signal the idle
475+
// checker.
460476
if l.references[slot] == 0 {
461-
l.timestamps[slot] = time.Now()
462477
select {
463-
case l.idleCheck <- struct{}{}:
478+
case <-runner.done:
479+
l.evictRunner(runner.backend.Name(), runner.model)
464480
default:
481+
l.timestamps[slot] = time.Now()
482+
select {
483+
case l.idleCheck <- struct{}{}:
484+
default:
485+
}
465486
}
466487
}
467488

0 commit comments

Comments
 (0)