Skip to content

Commit 17c964d

Browse files
committed
DockerIM should use operation mechanism properly
1 parent 7237685 commit 17c964d

File tree

2 files changed

+139
-157
lines changed

2 files changed

+139
-157
lines changed

pkg/app/instances/docker.go

Lines changed: 139 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/docker/docker/api/types/mount"
3636
"github.com/docker/docker/api/types/volume"
3737
"github.com/docker/docker/client"
38+
"github.com/google/uuid"
3839
)
3940

4041
const DockerIMType IMType = "docker"
@@ -54,18 +55,12 @@ const uaMountTarget = "/var/lib/cuttlefish-common/userartifacts"
5455

5556
// Docker implementation of the instance manager.
5657
type DockerInstanceManager struct {
57-
Config Config
58-
Client *client.Client
59-
mutexes sync.Map
58+
Config Config
59+
Client *client.Client
60+
mutexes sync.Map
61+
operations sync.Map
6062
}
6163

62-
type OPType string
63-
64-
const (
65-
CreateHostOPType OPType = "createhost"
66-
DeleteHostOPType OPType = "deletehost"
67-
)
68-
6964
func NewDockerInstanceManager(cfg Config, cli *client.Client) *DockerInstanceManager {
7065
return &DockerInstanceManager{
7166
Config: cfg,
@@ -85,26 +80,14 @@ func (m *DockerInstanceManager) CreateHost(zone string, _ *apiv1.CreateHostReque
8580
if zone != "local" {
8681
return nil, errors.NewBadRequestError("Invalid zone. It should be 'local'.", nil)
8782
}
88-
mu := m.getRWMutex(user)
89-
mu.RLock()
90-
defer mu.RUnlock()
91-
ctx := context.TODO()
92-
if err := m.downloadDockerImageIfNeeded(ctx); err != nil {
93-
return nil, fmt.Errorf("failed to retrieve docker image name: %w", err)
94-
}
95-
// A docker volume is shared across all hosts under each user. If no volume
96-
// exists for given user, create it.
97-
if err := m.createDockerVolumeIfNeeded(ctx, user); err != nil {
98-
return nil, fmt.Errorf("failed to prepare docker volume: %w", err)
99-
}
100-
host, err := m.createDockerContainer(ctx, user)
101-
if err != nil {
102-
return nil, fmt.Errorf("failed to prepare docker container: %w", err)
103-
}
104-
return &apiv1.Operation{
105-
Name: EncodeOperationName(CreateHostOPType, host),
106-
Done: true,
107-
}, nil
83+
op := m.newOperation()
84+
go func() {
85+
val, err := m.createHost(user)
86+
if opErr := m.completeOperation(op.Name, &operationResult{Error: err, Value: val}); opErr != nil {
87+
log.Printf("error completing operation %q: %v\n", op.Name, opErr)
88+
}
89+
}()
90+
return &op, nil
10891
}
10992

11093
func (m *DockerInstanceManager) ListHosts(zone string, user accounts.User, _ *ListHostsRequest) (*apiv1.ListHostsResponse, error) {
@@ -141,88 +124,25 @@ func (m *DockerInstanceManager) DeleteHost(zone string, user accounts.User, host
141124
if zone != "local" {
142125
return nil, errors.NewBadRequestError("Invalid zone. It should be 'local'.", nil)
143126
}
144-
ctx := context.TODO()
145-
if err := m.deleteDockerContainer(ctx, user, host); err != nil {
146-
return nil, fmt.Errorf("failed to delete docker container: %w", err)
147-
}
148-
// A docker volume is shared across all hosts under each user. If no host
149-
// exists for given user, delete volume afterwards to cleanup.
150-
if err := m.deleteDockerVolumeIfNeeded(ctx, user); err != nil {
151-
return nil, fmt.Errorf("failed to cleanup docker volume: %w", err)
152-
}
153-
return &apiv1.Operation{
154-
Name: EncodeOperationName(DeleteHostOPType, host),
155-
Done: true,
156-
}, nil
157-
}
158-
159-
func EncodeOperationName(opType OPType, host string) string {
160-
return string(opType) + "_" + host
161-
}
162-
163-
func DecodeOperationName(name string) (OPType, string, error) {
164-
words := strings.SplitN(name, "_", 2)
165-
if len(words) == 2 {
166-
return OPType(words[0]), words[1], nil
167-
} else {
168-
return "", "", errors.NewBadRequestError(fmt.Sprintf("cannot parse operation from %q", name), nil)
169-
}
170-
}
171-
172-
func (m *DockerInstanceManager) waitCreateHostOperation(host string) (*apiv1.HostInstance, error) {
173-
ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Minute)
174-
defer cancel()
175-
for {
176-
select {
177-
case <-ctx.Done():
178-
return nil, errors.NewServiceUnavailableError("Wait for operation timed out", nil)
179-
default:
180-
res, err := m.Client.ContainerInspect(ctx, host)
181-
if err != nil {
182-
return nil, fmt.Errorf("failed to inspect docker container: %w", err)
183-
}
184-
if res.State.Running {
185-
return &apiv1.HostInstance{
186-
Name: host,
187-
}, nil
188-
}
189-
time.Sleep(time.Second)
127+
op := m.newOperation()
128+
go func() {
129+
val, err := m.deleteHost(user, host)
130+
if opErr := m.completeOperation(op.Name, &operationResult{Error: err, Value: val}); opErr != nil {
131+
log.Printf("error completing operation %q: %v\n", op.Name, opErr)
190132
}
191-
}
192-
}
193-
194-
func (m *DockerInstanceManager) waitDeleteHostOperation(host string) (*apiv1.HostInstance, error) {
195-
ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Minute)
196-
defer cancel()
197-
resCh, errCh := m.Client.ContainerWait(ctx, host, "")
198-
select {
199-
case <-ctx.Done():
200-
return nil, errors.NewServiceUnavailableError("Wait for operation timed out", nil)
201-
case err := <-errCh:
202-
return nil, fmt.Errorf("error is thrown while waiting for deleting host: %w", err)
203-
case <-resCh:
204-
return &apiv1.HostInstance{
205-
Name: host,
206-
}, nil
207-
}
133+
}()
134+
return &op, nil
208135
}
209136

210137
func (m *DockerInstanceManager) WaitOperation(zone string, _ accounts.User, name string) (any, error) {
211138
if zone != "local" {
212139
return nil, errors.NewBadRequestError("Invalid zone. It should be 'local'.", nil)
213140
}
214-
opType, host, err := DecodeOperationName(name)
141+
val, err := m.waitOperation(name, 3*time.Minute)
215142
if err != nil {
216143
return nil, err
217144
}
218-
switch opType {
219-
case CreateHostOPType:
220-
return m.waitCreateHostOperation(host)
221-
case DeleteHostOPType:
222-
return m.waitDeleteHostOperation(host)
223-
default:
224-
return nil, errors.NewBadRequestError(fmt.Sprintf("operation type %s not found.", opType), nil)
225-
}
145+
return val.Value, val.Error
226146
}
227147

228148
func (m *DockerInstanceManager) getIpAddr(container *types.Container) (string, error) {
@@ -318,6 +238,28 @@ func (m *DockerInstanceManager) getContainerLabel(host string, key string) (stri
318238
return value, nil
319239
}
320240

241+
func (m *DockerInstanceManager) createHost(user accounts.User) (*apiv1.HostInstance, error) {
242+
mu := m.getRWMutex(user)
243+
mu.RLock()
244+
defer mu.RUnlock()
245+
ctx := context.TODO()
246+
if err := m.downloadDockerImageIfNeeded(ctx); err != nil {
247+
return nil, fmt.Errorf("failed to retrieve docker image name: %w", err)
248+
}
249+
// A docker volume is shared across all hosts under each user. If no volume
250+
// exists for given user, create it.
251+
if err := m.createDockerVolumeIfNeeded(ctx, user); err != nil {
252+
return nil, fmt.Errorf("failed to prepare docker volume: %w", err)
253+
}
254+
host, err := m.createDockerContainer(ctx, user)
255+
if err != nil {
256+
return nil, fmt.Errorf("failed to prepare docker container: %w", err)
257+
}
258+
return &apiv1.HostInstance{
259+
Name: host,
260+
}, nil
261+
}
262+
321263
func (m *DockerInstanceManager) downloadDockerImageIfNeeded(ctx context.Context) error {
322264
listRes, err := m.Client.ImageList(ctx, image.ListOptions{})
323265
if err != nil {
@@ -364,6 +306,8 @@ func (m *DockerInstanceManager) createDockerVolumeIfNeeded(ctx context.Context,
364306
return nil
365307
}
366308

309+
// containerInspectRetryLimit is how many times createDockerContainer retries
// ContainerInspect, one second apart, after the first attempt while waiting
// for a newly created container to report that it is running.
const containerInspectRetryLimit = 5
310+
367311
func (m *DockerInstanceManager) createDockerContainer(ctx context.Context, user accounts.User) (string, error) {
368312
config := &container.Config{
369313
AttachStdin: true,
@@ -401,7 +345,31 @@ func (m *DockerInstanceManager) createDockerContainer(ctx context.Context, user
401345
if err := m.Client.ContainerExecStart(ctx, execRes.ID, container.ExecStartOptions{}); err != nil {
402346
return "", fmt.Errorf("failed to start container execution %q: %w", strings.Join(execConfig.Cmd, " "), err)
403347
}
404-
return createRes.ID, nil
348+
for i := 0; ; i++ {
349+
res, err := m.Client.ContainerInspect(ctx, createRes.ID)
350+
if err == nil && res.State.Running {
351+
return createRes.ID, nil
352+
}
353+
if i >= containerInspectRetryLimit {
354+
return "", fmt.Errorf("failed to inspect docker container: %w", err)
355+
}
356+
time.Sleep(time.Second)
357+
}
358+
}
359+
360+
func (m *DockerInstanceManager) deleteHost(user accounts.User, host string) (*apiv1.HostInstance, error) {
361+
ctx := context.TODO()
362+
if err := m.deleteDockerContainer(ctx, user, host); err != nil {
363+
return nil, fmt.Errorf("failed to delete docker container: %w", err)
364+
}
365+
// A docker volume is shared across all hosts under each user. If no host
366+
// exists for given user, delete volume afterwards to cleanup.
367+
if err := m.deleteDockerVolumeIfNeeded(ctx, user); err != nil {
368+
return nil, fmt.Errorf("failed to cleanup docker volume: %w", err)
369+
}
370+
return &apiv1.HostInstance{
371+
Name: host,
372+
}, nil
405373
}
406374

407375
func (m *DockerInstanceManager) deleteDockerContainer(ctx context.Context, user accounts.User, host string) error {
@@ -454,3 +422,71 @@ func (m *DockerInstanceManager) getRWMutex(user accounts.User) *sync.RWMutex {
454422
mu, _ := m.mutexes.LoadOrStore(user.Username(), &sync.RWMutex{})
455423
return mu.(*sync.RWMutex)
456424
}
425+
426+
// operationResult is the outcome recorded for a finished operation.
type operationResult struct {
	// Error is the error the operation ended with, or nil.
	Error error
	// Value is the value the operation produced.
	Value any
}
430+
431+
type operationEntry struct {
432+
op apiv1.Operation
433+
result *operationResult
434+
mutex sync.RWMutex
435+
done chan struct{}
436+
}
437+
438+
// newOperationRetryLimit is the maximum number of generated names newOperation
// tries to register before it panics.
const newOperationRetryLimit = 100
439+
440+
func (m *DockerInstanceManager) newOperation() apiv1.Operation {
441+
for i := 0; i < newOperationRetryLimit; i++ {
442+
name := uuid.New().String()
443+
newEntry := &operationEntry{
444+
op: apiv1.Operation{
445+
Name: name,
446+
Done: false,
447+
},
448+
mutex: sync.RWMutex{},
449+
done: make(chan struct{}),
450+
}
451+
entry, loaded := m.operations.LoadOrStore(name, newEntry)
452+
if !loaded {
453+
// It succeeded to store a new operation entry.
454+
return entry.(*operationEntry).op
455+
}
456+
}
457+
panic("Reached newOperationRetryLimit")
458+
}
459+
460+
func (m *DockerInstanceManager) completeOperation(name string, result *operationResult) error {
461+
val, loaded := m.operations.Load(name)
462+
if !loaded {
463+
return fmt.Errorf("Operation not found for %q", name)
464+
}
465+
entry := val.(*operationEntry)
466+
467+
entry.mutex.Lock()
468+
defer entry.mutex.Unlock()
469+
entry.op.Done = true
470+
entry.result = result
471+
close(entry.done)
472+
return nil
473+
}
474+
475+
func (m *DockerInstanceManager) waitOperation(name string, dt time.Duration) (*operationResult, error) {
476+
val, loaded := m.operations.Load(name)
477+
if !loaded {
478+
return nil, fmt.Errorf("Operation not found for %q", name)
479+
}
480+
entry := val.(*operationEntry)
481+
ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Minute)
482+
defer cancel()
483+
select {
484+
case <-entry.done:
485+
entry.mutex.RLock()
486+
result := entry.result
487+
entry.mutex.RUnlock()
488+
return result, nil
489+
case <-ctx.Done():
490+
return nil, fmt.Errorf("Reached timeout for %q", name)
491+
}
492+
}

pkg/app/instances/docker_test.go

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)