Skip to content

Commit 407a0d5

Browse files
authored
up: fix various race/deadlock conditions on exit (docker#10934)
If running `up` in foreground mode (i.e. not `-d`), when exiting via `Ctrl-C`, Compose stops all the services it launched directly as part of that `up` command. In one of the E2E tests (`TestUpDependenciesNotStopped`), this was occasionally flaking because the stop behavior was racy: the return might not block on the stop operation because it gets added to the error group in a goroutine. As a result, it was possible for no services to get terminated on exit. There were a few other related pieces here that I uncovered and tried to fix while stressing this. For example, the printer could cause a deadlock if an event was sent to it after it stopped. Also, an error group wasn't really appropriate here; each goroutine is a different operation for printing, signal-handling, etc. If one part fails, we don't actually want printing to stop, for example. This has been switched to a `multierror.Group`, which has the same API but coalesces errors instead of canceling a context the moment the first one fails and returning that single error. Signed-off-by: Milas Bowman <[email protected]>
1 parent d0dfb84 commit 407a0d5

File tree

4 files changed

+105
-82
lines changed

4 files changed

+105
-82
lines changed

pkg/compose/printer.go

Lines changed: 56 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package compose
1818

1919
import (
2020
"fmt"
21+
"sync/atomic"
2122

2223
"github.com/docker/compose/v2/pkg/api"
2324
)
@@ -33,32 +34,37 @@ type logPrinter interface {
3334
type printer struct {
3435
queue chan api.ContainerEvent
3536
consumer api.LogConsumer
36-
stopCh chan struct{}
37+
stopped atomic.Bool
3738
}
3839

3940
// newLogPrinter builds a LogPrinter passing containers logs to LogConsumer
4041
func newLogPrinter(consumer api.LogConsumer) logPrinter {
4142
queue := make(chan api.ContainerEvent)
42-
stopCh := make(chan struct{}, 1) // printer MAY stop on his own, so Stop MUST not be blocking
4343
printer := printer{
4444
consumer: consumer,
4545
queue: queue,
46-
stopCh: stopCh,
4746
}
4847
return &printer
4948
}
5049

5150
func (p *printer) Cancel() {
52-
p.queue <- api.ContainerEvent{
53-
Type: api.UserCancel,
54-
}
51+
// note: HandleEvent is used to ensure this doesn't deadlock
52+
p.HandleEvent(api.ContainerEvent{Type: api.UserCancel})
5553
}
5654

5755
func (p *printer) Stop() {
58-
p.stopCh <- struct{}{}
56+
if p.stopped.CompareAndSwap(false, true) {
57+
// only close if this is the first call to stop
58+
close(p.queue)
59+
}
5960
}
6061

6162
func (p *printer) HandleEvent(event api.ContainerEvent) {
63+
// prevent deadlocking, if the printer is done, there's no reader for
64+
// queue, so this write could block indefinitely
65+
if p.stopped.Load() {
66+
return
67+
}
6268
p.queue <- event
6369
}
6470

@@ -69,61 +75,57 @@ func (p *printer) Run(cascadeStop bool, exitCodeFrom string, stopFn func() error
6975
exitCode int
7076
)
7177
containers := map[string]struct{}{}
72-
for {
73-
select {
74-
case <-p.stopCh:
75-
return exitCode, nil
76-
case event := <-p.queue:
77-
container, id := event.Container, event.ID
78-
switch event.Type {
79-
case api.UserCancel:
80-
aborting = true
81-
case api.ContainerEventAttach:
82-
if _, ok := containers[id]; ok {
83-
continue
84-
}
85-
containers[id] = struct{}{}
86-
p.consumer.Register(container)
87-
case api.ContainerEventExit, api.ContainerEventStopped, api.ContainerEventRecreated:
88-
if !event.Restarting {
89-
delete(containers, id)
78+
for event := range p.queue {
79+
container, id := event.Container, event.ID
80+
switch event.Type {
81+
case api.UserCancel:
82+
aborting = true
83+
case api.ContainerEventAttach:
84+
if _, ok := containers[id]; ok {
85+
continue
86+
}
87+
containers[id] = struct{}{}
88+
p.consumer.Register(container)
89+
case api.ContainerEventExit, api.ContainerEventStopped, api.ContainerEventRecreated:
90+
if !event.Restarting {
91+
delete(containers, id)
92+
}
93+
if !aborting {
94+
p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode))
95+
if event.Type == api.ContainerEventRecreated {
96+
p.consumer.Status(container, "has been recreated")
9097
}
98+
}
99+
if cascadeStop {
91100
if !aborting {
92-
p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode))
93-
if event.Type == api.ContainerEventRecreated {
94-
p.consumer.Status(container, "has been recreated")
101+
aborting = true
102+
err := stopFn()
103+
if err != nil {
104+
return 0, err
95105
}
96106
}
97-
if cascadeStop {
98-
if !aborting {
99-
aborting = true
100-
err := stopFn()
101-
if err != nil {
102-
return 0, err
103-
}
107+
if event.Type == api.ContainerEventExit {
108+
if exitCodeFrom == "" {
109+
exitCodeFrom = event.Service
104110
}
105-
if event.Type == api.ContainerEventExit {
106-
if exitCodeFrom == "" {
107-
exitCodeFrom = event.Service
108-
}
109-
if exitCodeFrom == event.Service {
110-
exitCode = event.ExitCode
111-
}
111+
if exitCodeFrom == event.Service {
112+
exitCode = event.ExitCode
112113
}
113114
}
114-
if len(containers) == 0 {
115-
// Last container terminated, done
116-
return exitCode, nil
117-
}
118-
case api.ContainerEventLog:
119-
if !aborting {
120-
p.consumer.Log(container, event.Line)
121-
}
122-
case api.ContainerEventErr:
123-
if !aborting {
124-
p.consumer.Err(container, event.Line)
125-
}
115+
}
116+
if len(containers) == 0 {
117+
// Last container terminated, done
118+
return exitCode, nil
119+
}
120+
case api.ContainerEventLog:
121+
if !aborting {
122+
p.consumer.Log(container, event.Line)
123+
}
124+
case api.ContainerEventErr:
125+
if !aborting {
126+
p.consumer.Err(container, event.Line)
126127
}
127128
}
128129
}
130+
return exitCode, nil
129131
}

pkg/compose/up.go

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@ import (
2121
"fmt"
2222
"os"
2323
"os/signal"
24+
"sync"
2425
"syscall"
2526

26-
"github.com/docker/compose/v2/internal/tracing"
27-
2827
"github.com/compose-spec/compose-go/types"
2928
"github.com/docker/cli/cli"
29+
"github.com/docker/compose/v2/internal/tracing"
3030
"github.com/docker/compose/v2/pkg/api"
3131
"github.com/docker/compose/v2/pkg/progress"
32-
"golang.org/x/sync/errgroup"
32+
"github.com/hashicorp/go-multierror"
3333
)
3434

3535
func (s *composeService) Up(ctx context.Context, project *types.Project, options api.UpOptions) error {
@@ -55,39 +55,60 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
5555
return err
5656
}
5757

58-
printer := newLogPrinter(options.Start.Attach)
59-
60-
signalChan := make(chan os.Signal, 1)
58+
// if we get a second signal during shutdown, we kill the services
59+
// immediately, so the channel needs to have sufficient capacity or
60+
// we might miss a signal while setting up the second channel read
61+
// (this is also why signal.Notify is used vs signal.NotifyContext)
62+
signalChan := make(chan os.Signal, 2)
6163
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
64+
signalCancel := sync.OnceFunc(func() {
65+
signal.Stop(signalChan)
66+
close(signalChan)
67+
})
68+
defer signalCancel()
6269

70+
printer := newLogPrinter(options.Start.Attach)
6371
stopFunc := func() error {
6472
fmt.Fprintln(s.stdinfo(), "Aborting on container exit...")
6573
ctx := context.Background()
6674
return progress.Run(ctx, func(ctx context.Context) error {
75+
// race two goroutines - one that blocks until another signal is received
76+
// and then does a Kill() and one that immediately starts a friendly Stop()
77+
errCh := make(chan error, 1)
6778
go func() {
68-
<-signalChan
69-
s.Kill(ctx, project.Name, api.KillOptions{ //nolint:errcheck
79+
if _, ok := <-signalChan; !ok {
80+
// channel closed, so the outer function is done, which
81+
// means the other goroutine (calling Stop()) finished
82+
return
83+
}
84+
errCh <- s.Kill(ctx, project.Name, api.KillOptions{
7085
Services: options.Create.Services,
7186
Project: project,
7287
})
7388
}()
7489

75-
return s.Stop(ctx, project.Name, api.StopOptions{
76-
Services: options.Create.Services,
77-
Project: project,
78-
})
90+
go func() {
91+
errCh <- s.Stop(ctx, project.Name, api.StopOptions{
92+
Services: options.Create.Services,
93+
Project: project,
94+
})
95+
}()
96+
return <-errCh
7997
}, s.stdinfo())
8098
}
8199

82100
var isTerminated bool
83-
eg, ctx := errgroup.WithContext(ctx)
84-
go func() {
85-
<-signalChan
101+
var eg multierror.Group
102+
eg.Go(func() error {
103+
if _, ok := <-signalChan; !ok {
104+
// function finished without receiving a signal
105+
return nil
106+
}
86107
isTerminated = true
87108
printer.Cancel()
88109
fmt.Fprintln(s.stdinfo(), "Gracefully stopping... (press Ctrl+C again to force)")
89-
eg.Go(stopFunc)
90-
}()
110+
return stopFunc()
111+
})
91112

92113
var exitCode int
93114
eg.Go(func() error {
@@ -101,8 +122,10 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
101122
return err
102123
}
103124

125+
// signal for the goroutines to stop & wait for them to finish any remaining work
126+
signalCancel()
104127
printer.Stop()
105-
err = eg.Wait()
128+
err = eg.Wait().ErrorOrNil()
106129
if exitCode != 0 {
107130
errMsg := ""
108131
if err != nil {

pkg/e2e/assert.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ import (
2828
// (running or exited).
2929
func RequireServiceState(t testing.TB, cli *CLI, service string, state string) {
3030
t.Helper()
31-
psRes := cli.RunDockerComposeCmd(t, "ps", "--format=json", service)
31+
psRes := cli.RunDockerComposeCmd(t, "ps", "--all", "--format=json", service)
3232
var svc map[string]interface{}
3333
require.NoError(t, json.Unmarshal([]byte(psRes.Stdout()), &svc),
34-
"Invalid `compose ps` JSON output")
34+
"Invalid `compose ps` JSON: command output: %s",
35+
psRes.Combined())
3536

3637
require.Equal(t, service, svc["Service"],
3738
"Found ps output for unexpected service")

pkg/e2e/up_test.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ package e2e
2121

2222
import (
2323
"context"
24-
"os"
2524
"os/exec"
2625
"strings"
2726
"syscall"
@@ -45,9 +44,6 @@ func TestUpServiceUnhealthy(t *testing.T) {
4544
}
4645

4746
func TestUpDependenciesNotStopped(t *testing.T) {
48-
if _, ok := os.LookupEnv("CI"); ok {
49-
t.Skip("Skipping test on CI... flaky")
50-
}
5147
c := NewParallelCLI(t, WithEnv(
5248
"COMPOSE_PROJECT_NAME=up-deps-stop",
5349
))
@@ -76,8 +72,8 @@ func TestUpDependenciesNotStopped(t *testing.T) {
7672
"app",
7773
)
7874

79-
ctx, cancel := context.WithCancel(context.Background())
80-
defer cancel()
75+
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
76+
t.Cleanup(cancel)
8177

8278
cmd, err := StartWithNewGroupID(ctx, testCmd, upOut, nil)
8379
assert.NilError(t, err, "Failed to run compose up")
@@ -91,12 +87,13 @@ func TestUpDependenciesNotStopped(t *testing.T) {
9187
require.NoError(t, syscall.Kill(-cmd.Process.Pid, syscall.SIGINT),
9288
"Failed to send SIGINT to compose up process")
9389

94-
time.AfterFunc(5*time.Second, cancel)
95-
9690
t.Log("Waiting for `compose up` to exit")
9791
err = cmd.Wait()
9892
if err != nil {
9993
exitErr := err.(*exec.ExitError)
94+
if exitErr.ExitCode() == -1 {
95+
t.Fatalf("`compose up` was killed: %v", err)
96+
}
10097
require.EqualValues(t, exitErr.ExitCode(), 130)
10198
}
10299

0 commit comments

Comments
 (0)