Skip to content
This repository was archived by the owner on Nov 27, 2023. It is now read-only.

Commit 6f6ae07

Browse files
committed
improve container events watch robustness
Signed-off-by: Nicolas De Loof <[email protected]>
1 parent c16834c commit 6f6ae07

File tree

5 files changed

+94
-86
lines changed

5 files changed

+94
-86
lines changed

api/compose/api.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,9 @@ type ContainerEvent struct {
379379
Container string
380380
Service string
381381
Line string
382-
ExitCode int
382+
// ContainerEventExit only
383+
ExitCode int
384+
Restarting bool
383385
}
384386

385387
const (

api/compose/printer.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,19 +62,22 @@ func (p *printer) Run(cascadeStop bool, exitCodeFrom string, stopFn func() error
6262
containers := map[string]struct{}{}
6363
for {
6464
event := <-p.queue
65+
container := event.Container
6566
switch event.Type {
6667
case UserCancel:
6768
aborting = true
6869
case ContainerEventAttach:
69-
if _, ok := containers[event.Container]; ok {
70+
if _, ok := containers[container]; ok {
7071
continue
7172
}
72-
containers[event.Container] = struct{}{}
73-
p.consumer.Register(event.Container)
73+
containers[container] = struct{}{}
74+
p.consumer.Register(container)
7475
case ContainerEventExit:
75-
delete(containers, event.Container)
76+
if !event.Restarting {
77+
delete(containers, container)
78+
}
7679
if !aborting {
77-
p.consumer.Status(event.Container, fmt.Sprintf("exited with code %d", event.ExitCode))
80+
p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode))
7881
}
7982
if cascadeStop {
8083
if !aborting {
@@ -99,7 +102,7 @@ func (p *printer) Run(cascadeStop bool, exitCodeFrom string, stopFn func() error
99102
}
100103
case ContainerEventLog:
101104
if !aborting {
102-
p.consumer.Log(event.Container, event.Service, event.Line)
105+
p.consumer.Log(container, event.Service, event.Line)
103106
}
104107
}
105108
}

local/compose/start.go

Lines changed: 77 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -20,68 +20,30 @@ import (
2020
"context"
2121

2222
"github.com/docker/compose-cli/api/compose"
23-
convert "github.com/docker/compose-cli/local/moby"
2423
"github.com/docker/compose-cli/utils"
2524

2625
"github.com/compose-spec/compose-go/types"
2726
moby "github.com/docker/docker/api/types"
28-
"github.com/docker/docker/api/types/container"
27+
"github.com/pkg/errors"
2928
"golang.org/x/sync/errgroup"
3029
)
3130

3231
func (s *composeService) Start(ctx context.Context, project *types.Project, options compose.StartOptions) error {
32+
listener := options.Attach
3333
if len(options.Services) == 0 {
3434
options.Services = project.ServiceNames()
3535
}
3636

37-
var containers Containers
38-
if options.Attach != nil {
39-
attached, err := s.attach(ctx, project, options.Attach, options.Services)
37+
eg, ctx := errgroup.WithContext(ctx)
38+
if listener != nil {
39+
attached, err := s.attach(ctx, project, listener, options.Services)
4040
if err != nil {
4141
return err
4242
}
43-
containers = attached
4443

45-
// Watch events to capture container restart and re-attach
46-
go func() {
47-
watched := map[string]struct{}{}
48-
for _, c := range containers {
49-
watched[c.ID] = struct{}{}
50-
}
51-
s.Events(ctx, project.Name, compose.EventsOptions{ // nolint: errcheck
52-
Services: options.Services,
53-
Consumer: func(event compose.Event) error {
54-
if event.Status == "start" {
55-
inspect, err := s.apiClient.ContainerInspect(ctx, event.Container)
56-
if err != nil {
57-
return err
58-
}
59-
60-
container := moby.Container{
61-
ID: event.Container,
62-
Names: []string{inspect.Name},
63-
State: convert.ContainerRunning,
64-
Labels: map[string]string{
65-
projectLabel: project.Name,
66-
serviceLabel: event.Service,
67-
},
68-
}
69-
70-
// Just ignore errors when reattaching to already crashed containers
71-
s.attachContainer(ctx, container, options.Attach, project) // nolint: errcheck
72-
73-
if _, ok := watched[inspect.ID]; !ok {
74-
// a container has been added to the service, see --scale option
75-
watched[inspect.ID] = struct{}{}
76-
go func() {
77-
s.waitContainer(container, options.Attach) // nolint: errcheck
78-
}()
79-
}
80-
}
81-
return nil
82-
},
83-
})
84-
}()
44+
eg.Go(func() error {
45+
return s.watchContainers(project, options.Services, listener, attached)
46+
})
8547
}
8648

8749
err := InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error {
@@ -93,34 +55,79 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, opti
9355
if err != nil {
9456
return err
9557
}
58+
return eg.Wait()
59+
}
9660

97-
if options.Attach == nil {
98-
return nil
99-
}
100-
101-
eg, ctx := errgroup.WithContext(ctx)
61+
// watchContainers uses engine events to capture container start/die and notify ContainerEventListener
62+
func (s *composeService) watchContainers(project *types.Project, services []string, listener compose.ContainerEventListener, containers Containers) error {
63+
watched := map[string]int{}
10264
for _, c := range containers {
103-
c := c
104-
eg.Go(func() error {
105-
return s.waitContainer(c, options.Attach)
106-
})
65+
watched[c.ID] = 0
10766
}
108-
return eg.Wait()
109-
}
11067

111-
func (s *composeService) waitContainer(c moby.Container, listener compose.ContainerEventListener) error {
112-
statusC, errC := s.apiClient.ContainerWait(context.Background(), c.ID, container.WaitConditionNotRunning)
113-
name := getContainerNameWithoutProject(c)
114-
select {
115-
case status := <-statusC:
116-
listener(compose.ContainerEvent{
117-
Type: compose.ContainerEventExit,
118-
Container: name,
119-
Service: c.Labels[serviceLabel],
120-
ExitCode: int(status.StatusCode),
121-
})
68+
ctx, stop := context.WithCancel(context.Background())
69+
err := s.Events(ctx, project.Name, compose.EventsOptions{
70+
Services: services,
71+
Consumer: func(event compose.Event) error {
72+
inspected, err := s.apiClient.ContainerInspect(ctx, event.Container)
73+
if err != nil {
74+
return err
75+
}
76+
container := moby.Container{
77+
ID: inspected.ID,
78+
Names: []string{inspected.Name},
79+
Labels: inspected.Config.Labels,
80+
}
81+
name := getContainerNameWithoutProject(container)
82+
83+
if event.Status == "die" {
84+
restarted := watched[container.ID]
85+
watched[container.ID] = restarted + 1
86+
// Container terminated.
87+
willRestart := inspected.HostConfig.RestartPolicy.MaximumRetryCount > restarted
88+
89+
listener(compose.ContainerEvent{
90+
Type: compose.ContainerEventExit,
91+
Container: name,
92+
Service: container.Labels[serviceLabel],
93+
ExitCode: inspected.State.ExitCode,
94+
Restarting: willRestart,
95+
})
96+
97+
if !willRestart {
98+
// we're done with this one
99+
delete(watched, container.ID)
100+
}
101+
102+
if len(watched) == 0 {
103+
// all project containers stopped, we're done
104+
stop()
105+
}
106+
return nil
107+
}
108+
109+
if event.Status == "start" {
110+
count, ok := watched[container.ID]
111+
mustAttach := ok && count > 0 // Container restarted, need to re-attach
112+
if !ok {
113+
// A new container has just been added to service by scale
114+
watched[container.ID] = 0
115+
mustAttach = true
116+
}
117+
if mustAttach {
118+
// Container restarted, need to re-attach
119+
err := s.attachContainer(ctx, container, listener, project)
120+
if err != nil {
121+
return err
122+
}
123+
}
124+
}
125+
126+
return nil
127+
},
128+
})
129+
if errors.Is(ctx.Err(), context.Canceled) {
122130
return nil
123-
case err := <-errC:
124-
return err
125131
}
132+
return err
126133
}

local/e2e/compose/compose_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"testing"
2828
"time"
2929

30+
testify "github.com/stretchr/testify/assert"
3031
"gotest.tools/v3/assert"
3132
"gotest.tools/v3/icmd"
3233

@@ -197,18 +198,16 @@ func TestAttachRestart(t *testing.T) {
197198

198199
c.WaitForCondition(func() (bool, string) {
199200
debug := res.Combined()
200-
return strings.Count(res.Stdout(), "another_1 exited with code 1") == 3, fmt.Sprintf("'another_1 exited with code 1' not found 3 times in : \n%s\n", debug)
201+
return strings.Count(res.Stdout(), "failing_1 exited with code 1") == 3, fmt.Sprintf("'failing_1 exited with code 1' not found 3 times in : \n%s\n", debug)
201202
}, 2*time.Minute, 2*time.Second)
202203

203-
assert.Equal(t, strings.Count(res.Stdout(), "another_1 | world"), 3, res.Combined())
204+
assert.Equal(t, strings.Count(res.Stdout(), "failing_1 | world"), 3, res.Combined())
204205
}
205206

206207
func TestInitContainer(t *testing.T) {
207208
c := NewParallelE2eCLI(t, binDir)
208209

209210
res := c.RunDockerOrExitError("compose", "--ansi=never", "--project-directory", "./fixtures/init-container", "up")
210211
defer c.RunDockerOrExitError("compose", "-p", "init-container", "down")
211-
output := res.Stdout()
212-
213-
assert.Assert(t, strings.Contains(output, "foo_1 | hello\nbar_1 | world"), res.Combined())
212+
testify.Regexp(t, "foo_1 | hello(?m:.*)bar_1 | world", res.Stdout())
214213
}

local/e2e/compose/fixtures/attach-restart/compose.yaml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
services:
2-
simple:
3-
image: alpine
4-
command: sh -c "sleep infinity"
5-
another:
2+
failing:
63
image: alpine
74
command: sh -c "sleep 0.1 && echo world && /bin/false"
85
deploy:

0 commit comments

Comments
 (0)