Skip to content

Commit 4f8e1c9

Browse files
authored
ai/worker: Watch containers continuously even when not in use (#3416)
* ai/worker: Auto-restart containers (only 3x if not warm) * ai/worker: Remove manual restart of containers * ai/worker: Run watchContainer only once per container We should keep watching containers even when they are returned * ai/worker: Use context.Background instead of nil * ai/worker: Fix tests * ai/worker: Increase tests for new logic * doc: Add a little documentation on ai-worker container lifecycle * ai/worker: Remove AutoRemove flag Cannot have it together with a restart policy * Revert "ai/worker: Remove manual restart of containers" This reverts commit b346d37. * ai/docker: Add a little comment on borrowctx usage
1 parent 3765073 commit 4f8e1c9

File tree

6 files changed

+101
-24
lines changed

6 files changed

+101
-24
lines changed

ai/worker/container.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"log/slog"
7+
"sync"
78
"time"
89

910
"github.com/deepmap/oapi-codegen/v2/pkg/securityprovider"
@@ -21,6 +22,9 @@ type RunnerContainer struct {
2122
Name string
2223
Client *ClientWithResponses
2324
Hardware *HardwareInformation
25+
26+
BorrowCtx context.Context
27+
sync.RWMutex
2428
}
2529

2630
type RunnerEndpoint struct {

ai/worker/doc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ Listens for inference requests from the Livepeer AI subnet and routes them to th
1313
1414
- Docker Manager (./docker.go):
1515
16-
Manages AI runner containers.
16+
Manages AI runner containers. For a state diagram showing the lifecycle of a container, see the /doc/worker.md file.
1717
1818
[AI runner containers]: https://github.com/livepeer/ai-runner
1919
*/

ai/worker/docker.go

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -153,14 +153,11 @@ func (m *DockerManager) Warm(ctx context.Context, pipeline string, modelID strin
153153
m.mu.Lock()
154154
defer m.mu.Unlock()
155155

156-
rc, err := m.createContainer(ctx, pipeline, modelID, true, optimizationFlags)
156+
_, err := m.createContainer(ctx, pipeline, modelID, true, optimizationFlags)
157157
if err != nil {
158158
return err
159159
}
160160

161-
// Watch with a background context since we're not borrowing the container.
162-
go m.watchContainer(rc, context.Background())
163-
164161
return nil
165162
}
166163

@@ -200,19 +197,25 @@ func (m *DockerManager) Borrow(ctx context.Context, pipeline, modelID string) (*
200197
}
201198
}
202199

203-
// Remove container so it is unavailable until Return() is called
200+
// Remove container and set the BorrowCtx so it is unavailable until returnContainer() is called by watchContainer()
204201
delete(m.containers, rc.Name)
205-
// watch container to return when request completed
206-
go m.watchContainer(rc, ctx)
202+
rc.Lock()
203+
rc.BorrowCtx = ctx
204+
rc.Unlock()
207205

208206
return rc, nil
209207
}
210208

211209
// returnContainer returns a container to the pool so it can be reused. It is called automatically by watchContainer
212-
// when the context used to borrow the container is done.
210+
// when the BorrowCtx of the container is done or the container is IDLE.
213211
func (m *DockerManager) returnContainer(rc *RunnerContainer) {
214212
m.mu.Lock()
215213
defer m.mu.Unlock()
214+
215+
rc.Lock()
216+
rc.BorrowCtx = nil
217+
rc.Unlock()
218+
216219
m.containers[rc.Name] = rc
217220
}
218221

@@ -348,6 +351,14 @@ func (m *DockerManager) createContainer(ctx context.Context, pipeline string, mo
348351
gpuOpts.Set("device=" + gpu)
349352
}
350353

354+
restartPolicy := container.RestartPolicy{
355+
Name: "on-failure",
356+
MaximumRetryCount: 3,
357+
}
358+
if keepWarm {
359+
restartPolicy = container.RestartPolicy{Name: "always"}
360+
}
361+
351362
hostConfig := &container.HostConfig{
352363
Resources: container.Resources{
353364
DeviceRequests: gpuOpts.Value(),
@@ -367,7 +378,7 @@ func (m *DockerManager) createContainer(ctx context.Context, pipeline string, mo
367378
},
368379
},
369380
},
370-
AutoRemove: true,
381+
RestartPolicy: restartPolicy,
371382
}
372383

373384
resp, err := m.dockerClient.ContainerCreate(ctx, containerConfig, hostConfig, nil, nil, containerName)
@@ -421,6 +432,8 @@ func (m *DockerManager) createContainer(ctx context.Context, pipeline string, mo
421432
m.containers[containerName] = rc
422433
m.gpuContainers[gpu] = containerName
423434

435+
go m.watchContainer(rc)
436+
424437
return rc, nil
425438
}
426439

@@ -479,7 +492,7 @@ func (m *DockerManager) destroyContainer(rc *RunnerContainer, locked bool) error
479492
// watchContainer monitors a container's running state and automatically cleans
480493
// up the internal state when the container stops. It will also monitor the
481494
// borrowCtx to return the container to the pool when it is done.
482-
func (m *DockerManager) watchContainer(rc *RunnerContainer, borrowCtx context.Context) {
495+
func (m *DockerManager) watchContainer(rc *RunnerContainer) {
483496
defer func() {
484497
if r := recover(); r != nil {
485498
slog.Error("Panic in container watch routine",
@@ -509,10 +522,19 @@ func (m *DockerManager) watchContainer(rc *RunnerContainer, borrowCtx context.Co
509522
return
510523
}
511524

525+
rc.RLock()
526+
// The BorrowCtx is set when the container has been borrowed for a request/stream. If it is not set (nil) it means
527+
// that it's not currently borrowed, so we don't need to wait for it to be done (hence using the background context).
528+
borrowCtx := rc.BorrowCtx
529+
if borrowCtx == nil {
530+
borrowCtx = context.Background()
531+
}
532+
rc.RUnlock()
533+
512534
select {
513535
case <-borrowCtx.Done():
514536
m.returnContainer(rc)
515-
return
537+
continue
516538
case <-ticker.C:
517539
ctx, cancel := context.WithTimeout(context.Background(), containerWatchInterval)
518540
health, err := rc.Client.HealthWithResponse(ctx)
@@ -543,7 +565,7 @@ func (m *DockerManager) watchContainer(rc *RunnerContainer, borrowCtx context.Co
543565
if time.Since(startTime) > pipelineStartGracePeriod {
544566
slog.Info("Container is idle, returning to pool", slog.String("container", rc.Name))
545567
m.returnContainer(rc)
546-
return
568+
continue
547569
}
548570
fallthrough
549571
case OK:

ai/worker/docker_test.go

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -741,17 +741,43 @@ func TestDockerManager_watchContainer(t *testing.T) {
741741
defer mockServer.AssertExpectations(t)
742742
defer mockDockerClient.AssertNotCalled(t, "ContainerRemove", mock.Anything, rc.Name, mock.Anything)
743743

744+
mockServer.On("ServeHTTP", "GET", "/health", mock.Anything).
745+
Return(200, "application/json", `{"status":"OK"}`)
746+
744747
borrowCtx, cancel := context.WithCancel(context.Background())
748+
rc.BorrowCtx = borrowCtx
745749

746-
go dockerManager.watchContainer(rc, borrowCtx)
750+
watchingCtx, stopWatching := context.WithCancel(context.Background())
751+
go func() {
752+
dockerManager.watchContainer(rc)
753+
stopWatching()
754+
}()
747755
cancel() // Cancel the context.
748756
time.Sleep(50 * time.Millisecond) // Ensure the ticker triggers.
749757

750758
// Verify that the container was returned.
751759
_, exists := dockerManager.containers[rc.Name]
752760
require.True(t, exists)
753761

754-
mockServer.AssertNotCalled(t, "ServeHTTP", "GET", "/health", mock.Anything)
762+
// Watch should keep running
763+
require.Nil(t, watchingCtx.Err())
764+
765+
// Borrow the container again
766+
borrowCtx, cancel = context.WithCancel(context.Background())
767+
delete(dockerManager.containers, rc.Name)
768+
rc.Lock()
769+
rc.BorrowCtx = borrowCtx
770+
rc.Unlock()
771+
772+
cancel() // Cancel the borrow context
773+
time.Sleep(50 * time.Millisecond) // Ensure the ticker triggers.
774+
775+
// Verify that the container was returned.
776+
_, exists = dockerManager.containers[rc.Name]
777+
require.True(t, exists)
778+
779+
// Watch should still keep running
780+
require.Nil(t, watchingCtx.Err())
755781
})
756782

757783
notHealthyTestCases := []struct {
@@ -818,8 +844,6 @@ func TestDockerManager_watchContainer(t *testing.T) {
818844
defer mockDockerClient.AssertExpectations(t)
819845
defer mockServer.AssertExpectations(t)
820846

821-
borrowCtx := context.Background()
822-
823847
tt.mockServerSetup(mockServer)
824848

825849
// Mock destroyContainer to verify it is called.
@@ -829,7 +853,7 @@ func TestDockerManager_watchContainer(t *testing.T) {
829853
done := make(chan struct{})
830854
go func() {
831855
defer close(done)
832-
dockerManager.watchContainer(rc, borrowCtx)
856+
dockerManager.watchContainer(rc)
833857
}()
834858
select {
835859
case <-done:
@@ -852,14 +876,12 @@ func TestDockerManager_watchContainer(t *testing.T) {
852876
defer mockDockerClient.AssertExpectations(t)
853877
defer mockServer.AssertExpectations(t)
854878

855-
borrowCtx := context.Background()
856-
857879
// Must fail twice before the watch routine gives up on it
858880
mockServer.On("ServeHTTP", "GET", "/health", mock.Anything).
859881
Return(200, "application/json", `{"status":"ERROR"}`).
860882
Times(4) // 4 calls during the grace period (first call only after 10ms)
861883

862-
go dockerManager.watchContainer(rc, borrowCtx)
884+
go dockerManager.watchContainer(rc)
863885
time.Sleep(40 * time.Millisecond) // Almost the entire grace period
864886

865887
// Make sure container wasn't destroyed yet
@@ -885,8 +907,6 @@ func TestDockerManager_watchContainer(t *testing.T) {
885907
defer mockServer.AssertExpectations(t)
886908
defer mockDockerClient.AssertNotCalled(t, "ContainerRemove", mock.Anything, rc.Name, mock.Anything)
887909

888-
borrowCtx := context.Background()
889-
890910
// Schedule:
891911
// - Return IDLE for the first 3 times during grace period (should not return)
892912
// - Then OK for the next 5 times (stayin' alive)
@@ -908,7 +928,7 @@ func TestDockerManager_watchContainer(t *testing.T) {
908928
time.Sleep(dur)
909929
}
910930
}
911-
go dockerManager.watchContainer(rc, borrowCtx)
931+
go dockerManager.watchContainer(rc)
912932
sleepUntil(30 * time.Millisecond) // Almost the entire grace period
913933

914934
// Verify that the container was not returned yet.
180 KB
Loading

doc/worker.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# AI Worker
2+
3+
The AI Worker node manages [`ai-runner`](https://github.com/livepeer/ai-runner) containers running on the host system.
4+
These containers are started, monitored and stopped dynamically depending on the usage.
5+
6+
This diagram describes the lifecycle of a container:
7+
8+
![ai-runner container lifecycle](./assets/ai-runner-container-lifecycle.jpg)
9+
10+
Source: [Miro Board](https://miro.com/app/board/uXjVIZ0vO4k=/?share_link_id=987855784886)
11+
12+
It can also be described by the following mermaid chart, but the rendered version is more confusing:
13+
```
14+
stateDiagram-v2
15+
direction TB
16+
[*] --> OFFLINE
17+
OFFLINE --> IDLE: Warm()->createCont()
18+
OFFLINE --> BORROWED: Borrow(ctx)->createCont()
19+
state RUNNING {
20+
[*] --> IDLE
21+
IDLE --> BORROWED: Borrow(ctx)
22+
BORROWED --> IDLE: BorrowCtx.Done()
23+
}
24+
hc: GET /health
25+
RUNNING --> hc
26+
state healthcheck <<choice>>
27+
hc --> healthcheck
28+
healthcheck --> OFFLINE: if error x2
29+
healthcheck --> RUNNING: if state=OK
30+
healthcheck --> IDLE: if state=IDLE
31+
```

0 commit comments

Comments
 (0)