Skip to content
This repository was archived by the owner on Nov 27, 2023. It is now read-only.

Commit ff5e3a5

Browse files
authored
Merge pull request #1245 from aiordache/kube_logs_cmd
Kube backend: Implement `compose logs`
2 parents 9f56fc9 + 9be77be commit ff5e3a5

File tree

6 files changed

+122
-33
lines changed

6 files changed

+122
-33
lines changed

kube/client/client.go

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@ package client
2121
import (
2222
"context"
2323
"fmt"
24+
"io"
2425

25-
v1 "k8s.io/api/core/v1"
26+
"github.com/docker/compose-cli/api/compose"
27+
"github.com/docker/compose-cli/utils"
28+
"golang.org/x/sync/errgroup"
29+
corev1 "k8s.io/api/core/v1"
2630
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2731
"k8s.io/cli-runtime/pkg/genericclioptions"
2832
"k8s.io/client-go/kubernetes"
29-
30-
"github.com/docker/compose-cli/api/compose"
3133
)
3234

3335
// KubeClient API to access kube objects
@@ -81,7 +83,7 @@ func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all
8183
return result, nil
8284
}
8385

84-
func podToContainerSummary(pod v1.Pod) compose.ContainerSummary {
86+
func podToContainerSummary(pod corev1.Pod) compose.ContainerSummary {
8587
return compose.ContainerSummary{
8688
ID: pod.GetObjectMeta().GetName(),
8789
Name: pod.GetObjectMeta().GetName(),
@@ -90,3 +92,30 @@ func podToContainerSummary(pod v1.Pod) compose.ContainerSummary {
9092
Project: pod.GetObjectMeta().GetLabels()[compose.ProjectTag],
9193
}
9294
}
95+
96+
// GetLogs retrieves pod logs
97+
func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer compose.LogConsumer, follow bool) error {
98+
pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
99+
LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, projectName),
100+
})
101+
if err != nil {
102+
return err
103+
}
104+
eg, ctx := errgroup.WithContext(ctx)
105+
for _, pod := range pods.Items {
106+
request := kc.client.CoreV1().Pods(kc.namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: follow})
107+
service := pod.Labels[compose.ServiceTag]
108+
w := utils.GetWriter(service, pod.Name, consumer)
109+
110+
eg.Go(func() error {
111+
r, err := request.Stream(ctx)
112+
defer r.Close() // nolint errcheck
113+
if err != nil {
114+
return err
115+
}
116+
_, err = io.Copy(w, r)
117+
return err
118+
})
119+
}
120+
return eg.Wait()
121+
}

kube/compose.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/docker/compose-cli/kube/client"
3434
"github.com/docker/compose-cli/kube/helm"
3535
"github.com/docker/compose-cli/kube/resources"
36+
"github.com/docker/compose-cli/utils"
3637
)
3738

3839
type composeService struct {
@@ -154,7 +155,10 @@ func (s *composeService) Stop(ctx context.Context, project *types.Project) error
154155

155156
// Logs executes the equivalent to a `compose logs`
156157
func (s *composeService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer, options compose.LogOptions) error {
157-
return errdefs.ErrNotImplemented
158+
if len(options.Services) > 0 {
159+
consumer = utils.FilteredLogConsumer(consumer, options.Services)
160+
}
161+
return s.client.GetLogs(ctx, projectName, consumer, options.Follow)
158162
}
159163

160164
// Ps executes the equivalent to a `compose ps`

kube/e2e/compose_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ func TestComposeUp(t *testing.T) {
108108
c.WaitForCmdResult(icmd.Command("docker", "--context", "default", "exec", "e2e-control-plane", "curl", endpoint), StdoutContains(`"word":`), 3*time.Minute, 3*time.Second)
109109
})
110110

111+
t.Run("compose logs web", func(t *testing.T) {
112+
res := c.RunDockerCmd("compose", "--project-name", projectName, "logs", "web")
113+
assert.Assert(t, strings.Contains(res.Stdout(), "Listening on port 80"), res.Stdout())
114+
})
115+
111116
t.Run("down", func(t *testing.T) {
112117
_ = c.RunDockerCmd("compose", "--project-name", projectName, "down")
113118
})

local/compose/attach.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"github.com/docker/compose-cli/api/compose"
2626
convert "github.com/docker/compose-cli/local/moby"
27+
"github.com/docker/compose-cli/utils"
2728

2829
"github.com/compose-spec/compose-go/types"
2930
moby "github.com/docker/docker/api/types"
@@ -62,7 +63,7 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, con
6263

6364
func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer compose.LogConsumer, project *types.Project) error {
6465
serviceName := container.Labels[serviceLabel]
65-
w := getWriter(serviceName, getCanonicalContainerName(container), consumer)
66+
w := utils.GetWriter(serviceName, getCanonicalContainerName(container), consumer)
6667

6768
service, err := project.GetService(serviceName)
6869
if err != nil {

local/compose/logs.go

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
package compose
1818

1919
import (
20-
"bytes"
2120
"context"
2221
"io"
2322

2423
"github.com/docker/compose-cli/api/compose"
24+
"github.com/docker/compose-cli/utils"
2525

2626
"github.com/docker/docker/api/types"
2727
"github.com/docker/docker/api/types/filters"
@@ -73,7 +73,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
7373
if err != nil {
7474
return err
7575
}
76-
w := getWriter(service, container.Name[1:], consumer)
76+
w := utils.GetWriter(service, container.Name[1:], consumer)
7777
if container.Config.Tty {
7878
_, err = io.Copy(w, r)
7979
} else {
@@ -84,28 +84,3 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
8484
}
8585
return eg.Wait()
8686
}
87-
88-
type splitBuffer struct {
89-
service string
90-
container string
91-
consumer compose.LogConsumer
92-
}
93-
94-
// getWriter creates a io.Writer that will actually split by line and format by LogConsumer
95-
func getWriter(service, container string, l compose.LogConsumer) io.Writer {
96-
return splitBuffer{
97-
service: service,
98-
container: container,
99-
consumer: l,
100-
}
101-
}
102-
103-
func (s splitBuffer) Write(b []byte) (n int, err error) {
104-
split := bytes.Split(b, []byte{'\n'})
105-
for _, line := range split {
106-
if len(line) != 0 {
107-
s.consumer.Log(s.service, s.container, string(line))
108-
}
109-
}
110-
return len(b), nil
111-
}

utils/logconsumer.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
Copyright 2020 Docker Compose CLI authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package utils
18+
19+
import (
20+
"bytes"
21+
"io"
22+
23+
"github.com/docker/compose-cli/api/compose"
24+
)
25+
26+
// GetWriter creates a io.Writer that will actually split by line and format by LogConsumer
27+
func GetWriter(service, container string, l compose.LogConsumer) io.Writer {
28+
return splitBuffer{
29+
service: service,
30+
container: container,
31+
consumer: l,
32+
}
33+
}
34+
35+
// FilteredLogConsumer filters logs for given services
36+
func FilteredLogConsumer(consumer compose.LogConsumer, services []string) compose.LogConsumer {
37+
if len(services) == 0 {
38+
return consumer
39+
}
40+
allowed := map[string]bool{}
41+
for _, s := range services {
42+
allowed[s] = true
43+
}
44+
return &allowListLogConsumer{
45+
allowList: allowed,
46+
delegate: consumer,
47+
}
48+
}
49+
50+
type allowListLogConsumer struct {
51+
allowList map[string]bool
52+
delegate compose.LogConsumer
53+
}
54+
55+
func (a *allowListLogConsumer) Log(service, container, message string) {
56+
if a.allowList[service] {
57+
a.delegate.Log(service, container, message)
58+
}
59+
}
60+
61+
type splitBuffer struct {
62+
service string
63+
container string
64+
consumer compose.LogConsumer
65+
}
66+
67+
func (s splitBuffer) Write(b []byte) (n int, err error) {
68+
split := bytes.Split(b, []byte{'\n'})
69+
for _, line := range split {
70+
if len(line) != 0 {
71+
s.consumer.Log(s.service, s.container, string(line))
72+
}
73+
}
74+
return len(b), nil
75+
}

0 commit comments

Comments
 (0)