Skip to content
This repository was archived by the owner on Nov 27, 2023. It is now read-only.

Commit d8db079

Browse files
author
aiordache
committed
Add exec command
Signed-off-by: aiordache <[email protected]>
1 parent 7fd3c6f commit d8db079

File tree

2 files changed

+109
-29
lines changed

2 files changed

+109
-29
lines changed

kube/client/client.go

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@ import (
3131
"golang.org/x/sync/errgroup"
3232
corev1 "k8s.io/api/core/v1"
3333
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34+
"k8s.io/apimachinery/pkg/runtime"
3435
"k8s.io/cli-runtime/pkg/genericclioptions"
3536
"k8s.io/client-go/kubernetes"
3637
"k8s.io/client-go/rest"
3738
"k8s.io/client-go/tools/portforward"
39+
"k8s.io/client-go/tools/remotecommand"
3840
"k8s.io/client-go/transport/spdy"
3941
)
4042

@@ -43,6 +45,7 @@ type KubeClient struct {
4345
client *kubernetes.Clientset
4446
namespace string
4547
config *rest.Config
48+
ioStreams genericclioptions.IOStreams
4649
}
4750

4851
// NewKubeClient new kubernetes client
@@ -54,7 +57,7 @@ func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, erro
5457

5558
clientset, err := kubernetes.NewForConfig(restConfig)
5659
if err != nil {
57-
return nil, err
60+
return nil, fmt.Errorf("failed creating clientset. Error: %+v", err)
5861
}
5962

6063
namespace, _, err := config.ToRawKubeConfigLoader().Namespace()
@@ -66,9 +69,83 @@ func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, erro
6669
client: clientset,
6770
namespace: namespace,
6871
config: restConfig,
72+
ioStreams: genericclioptions.IOStreams{In: os.Stdin, Out: os.Stdout, ErrOut: os.Stderr},
6973
}, nil
7074
}
7175

76+
// GetContainers get containers for a given compose project
77+
func (kc KubeClient) GetPod(ctx context.Context, projectName, serviceName string) (*corev1.Pod, error) {
78+
pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
79+
LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, projectName),
80+
})
81+
if err != nil {
82+
return nil, err
83+
}
84+
if pods == nil {
85+
return nil, nil
86+
}
87+
var pod corev1.Pod
88+
for _, p := range pods.Items {
89+
service := p.Labels[compose.ServiceTag]
90+
if service == serviceName {
91+
pod = p
92+
break
93+
}
94+
}
95+
return &pod, nil
96+
}
97+
98+
// Exec executes a command in a container
99+
func (kc KubeClient) Exec(ctx context.Context, projectName string, opts compose.RunOptions) error {
100+
pod, err := kc.GetPod(ctx, projectName, opts.Service)
101+
if err != nil || pod == nil {
102+
return err
103+
}
104+
if len(pod.Spec.Containers) == 0 {
105+
return fmt.Errorf("no containers running in pod %s", pod.Name)
106+
}
107+
// get first container in the pod
108+
container := &pod.Spec.Containers[0]
109+
containerName := container.Name
110+
111+
req := kc.client.CoreV1().RESTClient().Post().
112+
Resource("pods").
113+
Name(pod.Name).
114+
Namespace(kc.namespace).
115+
SubResource("exec")
116+
117+
option := &corev1.PodExecOptions{
118+
Container: containerName,
119+
Command: opts.Command,
120+
Stdin: true,
121+
Stdout: true,
122+
Stderr: true,
123+
TTY: opts.Tty,
124+
}
125+
126+
if opts.Reader == nil {
127+
option.Stdin = false
128+
}
129+
130+
scheme := runtime.NewScheme()
131+
if err := corev1.AddToScheme(scheme); err != nil {
132+
return fmt.Errorf("error adding to scheme: %v", err)
133+
}
134+
parameterCodec := runtime.NewParameterCodec(scheme)
135+
req.VersionedParams(option, parameterCodec)
136+
137+
exec, err := remotecommand.NewSPDYExecutor(kc.config, "POST", req.URL())
138+
if err != nil {
139+
return err
140+
}
141+
return exec.Stream(remotecommand.StreamOptions{
142+
Stdin: opts.Reader,
143+
Stdout: opts.Writer,
144+
Stderr: opts.Writer,
145+
Tty: opts.Tty,
146+
})
147+
}
148+
72149
// GetContainers get containers for a given compose project
73150
func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all bool) ([]compose.ContainerSummary, error) {
74151
fieldSelector := ""
@@ -178,9 +255,13 @@ func (kc KubeClient) MapPorts(ctx context.Context, opts PortMappingOptions) erro
178255
for serviceName, servicePorts := range opts.Services {
179256
serviceName = serviceName
180257
servicePorts = servicePorts
258+
pod, err := kc.GetPod(ctx, opts.ProjectName, serviceName)
259+
if err != nil {
260+
return err
261+
}
181262
eg.Go(func() error {
182263

183-
req := kc.client.RESTClient().Post().Resource("services").Namespace(kc.namespace).Name(serviceName).SubResource("portforward")
264+
req := kc.client.RESTClient().Post().Resource("pods").Namespace(kc.namespace).Name(pod.Name).SubResource("portforward") //fmt.Sprintf("service/%s", serviceName)).SubResource("portforward")
184265
transport, upgrader, err := spdy.RoundTripperFor(kc.config)
185266
if err != nil {
186267
return err

kube/compose.go

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -122,35 +122,34 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
122122
w.Event(progress.NewEvent(pod, state, message))
123123
},
124124
})
125-
return err
126-
/*
127-
if err != nil {
128-
return err
129-
}
125+
//return err
126+
127+
if err != nil {
128+
return err
129+
}
130130

131-
// check if there is a port mapping
132-
services := map[string]client.Ports{}
133-
134-
for _, s := range project.Services {
135-
if len(s.Ports) > 0 {
136-
services[s.Name] = client.Ports{}
137-
for _, p := range s.Ports {
138-
services[s.Name] = append(services[s.Name], compose.PortPublisher{
139-
TargetPort: int(p.Target),
140-
PublishedPort: int(p.Published),
141-
Protocol: p.Protocol,
142-
})
143-
}
131+
// check if there is a port mapping
132+
services := map[string]client.Ports{}
133+
134+
for _, s := range project.Services {
135+
if len(s.Ports) > 0 {
136+
services[s.Name] = client.Ports{}
137+
for _, p := range s.Ports {
138+
services[s.Name] = append(services[s.Name], compose.PortPublisher{
139+
TargetPort: int(p.Target),
140+
PublishedPort: int(p.Published),
141+
Protocol: p.Protocol,
142+
})
144143
}
145144
}
146-
if len(services) > 0 {
147-
return s.client.MapPorts(ctx, client.PortMappingOptions{
148-
ProjectName: project.Name,
149-
Services: services,
150-
})
151-
}
152-
return nil
153-
*/
145+
}
146+
if len(services) > 0 {
147+
return s.client.MapPorts(ctx, client.PortMappingOptions{
148+
ProjectName: project.Name,
149+
Services: services,
150+
})
151+
}
152+
return nil
154153

155154
}
156155

@@ -311,7 +310,7 @@ func (s *composeService) Remove(ctx context.Context, project *types.Project, opt
311310

312311
// Exec executes a command in a running service container
313312
func (s *composeService) Exec(ctx context.Context, project *types.Project, opts compose.RunOptions) (int, error) {
314-
return 0, errdefs.ErrNotImplemented
313+
return 0, s.client.Exec(ctx, project.Name, opts)
315314
}
316315

317316
func (s *composeService) Pause(ctx context.Context, project string, options compose.PauseOptions) error {

0 commit comments

Comments
 (0)