Skip to content
This repository was archived by the owner on Nov 27, 2023. It is now read-only.

Commit c16834c

Browse files
committed
attach to containers added by "scale"
Signed-off-by: Nicolas De Loof <[email protected]>
1 parent ccaa175 commit c16834c

File tree

8 files changed

+186
-131
lines changed

8 files changed

+186
-131
lines changed

api/compose/printer.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
Copyright 2020 Docker Compose CLI authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package compose
18+
19+
import (
20+
"fmt"
21+
22+
"github.com/sirupsen/logrus"
23+
)
24+
25+
// LogPrinter watch application containers an collect their logs
26+
type LogPrinter interface {
27+
HandleEvent(event ContainerEvent)
28+
Run(cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error)
29+
Cancel()
30+
}
31+
32+
// NewLogPrinter builds a LogPrinter passing containers logs to LogConsumer
33+
func NewLogPrinter(consumer LogConsumer) LogPrinter {
34+
queue := make(chan ContainerEvent)
35+
printer := printer{
36+
consumer: consumer,
37+
queue: queue,
38+
}
39+
return &printer
40+
}
41+
42+
func (p *printer) Cancel() {
43+
p.queue <- ContainerEvent{
44+
Type: UserCancel,
45+
}
46+
}
47+
48+
type printer struct {
49+
queue chan ContainerEvent
50+
consumer LogConsumer
51+
}
52+
53+
func (p *printer) HandleEvent(event ContainerEvent) {
54+
p.queue <- event
55+
}
56+
57+
func (p *printer) Run(cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error) {
58+
var (
59+
aborting bool
60+
exitCode int
61+
)
62+
containers := map[string]struct{}{}
63+
for {
64+
event := <-p.queue
65+
switch event.Type {
66+
case UserCancel:
67+
aborting = true
68+
case ContainerEventAttach:
69+
if _, ok := containers[event.Container]; ok {
70+
continue
71+
}
72+
containers[event.Container] = struct{}{}
73+
p.consumer.Register(event.Container)
74+
case ContainerEventExit:
75+
delete(containers, event.Container)
76+
if !aborting {
77+
p.consumer.Status(event.Container, fmt.Sprintf("exited with code %d", event.ExitCode))
78+
}
79+
if cascadeStop {
80+
if !aborting {
81+
aborting = true
82+
fmt.Println("Aborting on container exit...")
83+
err := stopFn()
84+
if err != nil {
85+
return 0, err
86+
}
87+
}
88+
if exitCodeFrom == "" {
89+
exitCodeFrom = event.Service
90+
}
91+
if exitCodeFrom == event.Service {
92+
logrus.Error(event.ExitCode)
93+
exitCode = event.ExitCode
94+
}
95+
}
96+
if len(containers) == 0 {
97+
// Last container terminated, done
98+
return exitCode, nil
99+
}
100+
case ContainerEventLog:
101+
if !aborting {
102+
p.consumer.Log(event.Container, event.Service, event.Line)
103+
}
104+
}
105+
}
106+
}

cli/cmd/compose/up.go

Lines changed: 7 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929

3030
"github.com/compose-spec/compose-go/types"
3131
"github.com/docker/cli/cli"
32-
"github.com/sirupsen/logrus"
3332
"github.com/spf13/cobra"
3433
"golang.org/x/sync/errgroup"
3534

@@ -275,13 +274,12 @@ func runCreateStart(ctx context.Context, backend compose.Service, opts upOptions
275274
return nil
276275
}
277276

278-
queue := make(chan compose.ContainerEvent)
279-
printer := printer{
280-
queue: queue,
281-
}
277+
consumer := formatter.NewLogConsumer(ctx, os.Stdout, !opts.noColor, !opts.noPrefix)
278+
printer := compose.NewLogPrinter(consumer)
282279

283280
signalChan := make(chan os.Signal, 1)
284281
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
282+
285283
stopFunc := func() error {
286284
ctx := context.Background()
287285
_, err := progress.Run(ctx, func(ctx context.Context) (string, error) {
@@ -296,27 +294,21 @@ func runCreateStart(ctx context.Context, backend compose.Service, opts upOptions
296294
}
297295
go func() {
298296
<-signalChan
299-
queue <- compose.ContainerEvent{
300-
Type: compose.UserCancel,
301-
}
297+
printer.Cancel()
302298
fmt.Println("Gracefully stopping... (press Ctrl+C again to force)")
303299
stopFunc() // nolint:errcheck
304300
}()
305301

306-
consumer := formatter.NewLogConsumer(ctx, os.Stdout, !opts.noColor, !opts.noPrefix)
307-
308302
var exitCode int
309303
eg, ctx := errgroup.WithContext(ctx)
310304
eg.Go(func() error {
311-
code, err := printer.run(opts.cascadeStop, opts.exitCodeFrom, consumer, stopFunc)
305+
code, err := printer.Run(opts.cascadeStop, opts.exitCodeFrom, stopFunc)
312306
exitCode = code
313307
return err
314308
})
315309

316310
err = backend.Start(ctx, project, compose.StartOptions{
317-
Attach: func(event compose.ContainerEvent) {
318-
queue <- event
319-
},
311+
Attach: printer.HandleEvent,
320312
Services: services,
321313
})
322314
if err != nil {
@@ -341,11 +333,7 @@ func setServiceScale(project *types.Project, name string, replicas int) error {
341333
if err != nil {
342334
return err
343335
}
344-
if service.Deploy == nil {
345-
service.Deploy = &types.DeployConfig{}
346-
}
347-
count := uint64(replicas)
348-
service.Deploy.Replicas = &count
336+
service.Scale = replicas
349337
project.Services[i] = service
350338
return nil
351339
}
@@ -392,49 +380,3 @@ func setup(opts composeOptions, services []string) (*types.Project, error) {
392380

393381
return project, nil
394382
}
395-
396-
type printer struct {
397-
queue chan compose.ContainerEvent
398-
}
399-
400-
func (p printer) run(cascadeStop bool, exitCodeFrom string, consumer compose.LogConsumer, stopFn func() error) (int, error) {
401-
var aborting bool
402-
var count int
403-
for {
404-
event := <-p.queue
405-
switch event.Type {
406-
case compose.UserCancel:
407-
aborting = true
408-
case compose.ContainerEventAttach:
409-
consumer.Register(event.Container)
410-
count++
411-
case compose.ContainerEventExit:
412-
if !aborting {
413-
consumer.Status(event.Container, fmt.Sprintf("exited with code %d", event.ExitCode))
414-
}
415-
if cascadeStop {
416-
if !aborting {
417-
aborting = true
418-
fmt.Println("Aborting on container exit...")
419-
err := stopFn()
420-
if err != nil {
421-
return 0, err
422-
}
423-
}
424-
if exitCodeFrom == "" || exitCodeFrom == event.Service {
425-
logrus.Error(event.ExitCode)
426-
return event.ExitCode, nil
427-
}
428-
}
429-
count--
430-
if count == 0 {
431-
// Last container terminated, done
432-
return 0, nil
433-
}
434-
case compose.ContainerEventLog:
435-
if !aborting {
436-
consumer.Log(event.Container, event.Service, event.Line)
437-
}
438-
}
439-
}
440-
}

cli/cmd/compose/up_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,5 @@ func TestApplyScaleOpt(t *testing.T) {
3939
assert.NilError(t, err)
4040
foo, err := p.GetService("foo")
4141
assert.NilError(t, err)
42-
assert.Check(t, *foo.Deploy.Replicas == 2)
42+
assert.Equal(t, foo.Scale, 2)
4343
}

local/compose/attach.go

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,6 @@ import (
3333
)
3434

3535
func (s *composeService) attach(ctx context.Context, project *types.Project, listener compose.ContainerEventListener, selectedServices []string) (Containers, error) {
36-
if len(selectedServices) == 0 {
37-
selectedServices = project.ServiceNames()
38-
}
39-
4036
containers, err := s.getContainers(ctx, project.Name, oneOffExclude, true, selectedServices...)
4137
if err != nil {
4238
return nil, err
@@ -57,44 +53,6 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, lis
5753
return nil, err
5854
}
5955
}
60-
61-
// Watch events to capture container restart and re-attach
62-
go func() {
63-
crashed := map[string]struct{}{}
64-
s.Events(ctx, project.Name, compose.EventsOptions{ // nolint: errcheck
65-
Services: selectedServices,
66-
Consumer: func(event compose.Event) error {
67-
if event.Status == "die" {
68-
crashed[event.Container] = struct{}{}
69-
return nil
70-
}
71-
if _, ok := crashed[event.Container]; ok {
72-
inspect, err := s.apiClient.ContainerInspect(ctx, event.Container)
73-
if err != nil {
74-
return err
75-
}
76-
77-
container := moby.Container{
78-
ID: event.Container,
79-
Names: []string{inspect.Name},
80-
State: convert.ContainerRunning,
81-
Labels: map[string]string{
82-
projectLabel: project.Name,
83-
serviceLabel: event.Service,
84-
},
85-
}
86-
87-
// Just ignore errors when reattaching to already crashed containers
88-
s.attachContainer(ctx, container, listener, project) // nolint: errcheck
89-
delete(crashed, event.Container)
90-
91-
s.waitContainer(container, listener)
92-
}
93-
return nil
94-
},
95-
})
96-
}()
97-
9856
return containers, err
9957
}
10058

local/compose/down.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -191,18 +191,22 @@ func (s *composeService) removeVolume(ctx context.Context, id string, w progress
191191
}
192192

193193
func (s *composeService) stopContainers(ctx context.Context, w progress.Writer, containers []moby.Container, timeout *time.Duration) error {
194+
eg, ctx := errgroup.WithContext(ctx)
194195
for _, container := range containers {
195-
toStop := container
196-
eventName := getContainerProgressName(toStop)
197-
w.Event(progress.StoppingEvent(eventName))
198-
err := s.apiClient.ContainerStop(ctx, toStop.ID, timeout)
199-
if err != nil {
200-
w.Event(progress.ErrorMessageEvent(eventName, "Error while Stopping"))
201-
return err
202-
}
203-
w.Event(progress.StoppedEvent(eventName))
196+
container := container
197+
eg.Go(func() error {
198+
eventName := getContainerProgressName(container)
199+
w.Event(progress.StoppingEvent(eventName))
200+
err := s.apiClient.ContainerStop(ctx, container.ID, timeout)
201+
if err != nil {
202+
w.Event(progress.ErrorMessageEvent(eventName, "Error while Stopping"))
203+
return err
204+
}
205+
w.Event(progress.StoppedEvent(eventName))
206+
return nil
207+
})
204208
}
205-
return nil
209+
return eg.Wait()
206210
}
207211

208212
func (s *composeService) removeContainers(ctx context.Context, w progress.Writer, containers []moby.Container, timeout *time.Duration) error {

0 commit comments

Comments
 (0)