Skip to content
This repository was archived by the owner on Nov 27, 2023. It is now read-only.

Commit 25c3df6

Browse files
authored
Merge pull request #1758 from aiordache/kube_hack_april
Kube backend updates
2 parents d0e8ebe + 0d25709 commit 25c3df6

File tree

5 files changed

+229
-15
lines changed

5 files changed

+229
-15
lines changed

kube/client/client.go

Lines changed: 156 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,31 @@ import (
2222
"context"
2323
"fmt"
2424
"io"
25+
"net/http"
26+
"os"
27+
"strings"
2528
"time"
2629

2730
"github.com/docker/compose-cli/api/compose"
2831
"github.com/docker/compose-cli/utils"
2932
"golang.org/x/sync/errgroup"
3033
corev1 "k8s.io/api/core/v1"
3134
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35+
"k8s.io/apimachinery/pkg/runtime"
3236
"k8s.io/cli-runtime/pkg/genericclioptions"
3337
"k8s.io/client-go/kubernetes"
38+
"k8s.io/client-go/rest"
39+
"k8s.io/client-go/tools/portforward"
40+
"k8s.io/client-go/tools/remotecommand"
41+
"k8s.io/client-go/transport/spdy"
3442
)
3543

3644
// KubeClient API to access kube objects
3745
type KubeClient struct {
3846
client *kubernetes.Clientset
3947
namespace string
48+
config *rest.Config
49+
ioStreams genericclioptions.IOStreams
4050
}
4151

4252
// NewKubeClient new kubernetes client
@@ -48,7 +58,7 @@ func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, erro
4858

4959
clientset, err := kubernetes.NewForConfig(restConfig)
5060
if err != nil {
51-
return nil, err
61+
return nil, fmt.Errorf("failed creating clientset. Error: %+v", err)
5262
}
5363

5464
namespace, _, err := config.ToRawKubeConfigLoader().Namespace()
@@ -59,9 +69,84 @@ func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, erro
5969
return &KubeClient{
6070
client: clientset,
6171
namespace: namespace,
72+
config: restConfig,
73+
ioStreams: genericclioptions.IOStreams{In: os.Stdin, Out: os.Stdout, ErrOut: os.Stderr},
6274
}, nil
6375
}
6476

77+
// GetPod retrieves a service pod
78+
func (kc KubeClient) GetPod(ctx context.Context, projectName, serviceName string) (*corev1.Pod, error) {
79+
pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
80+
LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, projectName),
81+
})
82+
if err != nil {
83+
return nil, err
84+
}
85+
if pods == nil {
86+
return nil, nil
87+
}
88+
var pod corev1.Pod
89+
for _, p := range pods.Items {
90+
service := p.Labels[compose.ServiceTag]
91+
if service == serviceName {
92+
pod = p
93+
break
94+
}
95+
}
96+
return &pod, nil
97+
}
98+
99+
// Exec executes a command in a container
100+
func (kc KubeClient) Exec(ctx context.Context, projectName string, opts compose.RunOptions) error {
101+
pod, err := kc.GetPod(ctx, projectName, opts.Service)
102+
if err != nil || pod == nil {
103+
return err
104+
}
105+
if len(pod.Spec.Containers) == 0 {
106+
return fmt.Errorf("no containers running in pod %s", pod.Name)
107+
}
108+
// get first container in the pod
109+
container := &pod.Spec.Containers[0]
110+
containerName := container.Name
111+
112+
req := kc.client.CoreV1().RESTClient().Post().
113+
Resource("pods").
114+
Name(pod.Name).
115+
Namespace(kc.namespace).
116+
SubResource("exec")
117+
118+
option := &corev1.PodExecOptions{
119+
Container: containerName,
120+
Command: opts.Command,
121+
Stdin: true,
122+
Stdout: true,
123+
Stderr: true,
124+
TTY: opts.Tty,
125+
}
126+
127+
if opts.Reader == nil {
128+
option.Stdin = false
129+
}
130+
131+
scheme := runtime.NewScheme()
132+
if err := corev1.AddToScheme(scheme); err != nil {
133+
return fmt.Errorf("error adding to scheme: %v", err)
134+
}
135+
parameterCodec := runtime.NewParameterCodec(scheme)
136+
req.VersionedParams(option, parameterCodec)
137+
138+
exec, err := remotecommand.NewSPDYExecutor(kc.config, "POST", req.URL())
139+
if err != nil {
140+
return err
141+
}
142+
return exec.Stream(remotecommand.StreamOptions{
143+
Stdin: opts.Reader,
144+
Stdout: opts.Writer,
145+
Stderr: opts.Writer,
146+
Tty: opts.Tty,
147+
})
148+
}
149+
65150
// GetContainers get containers for a given compose project
66151
func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all bool) ([]compose.ContainerSummary, error) {
67152
fieldSelector := ""
@@ -76,9 +161,39 @@ func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all
76161
if err != nil {
77162
return nil, err
78163
}
164+
services := map[string][]compose.PortPublisher{}
79165
result := []compose.ContainerSummary{}
80166
for _, pod := range pods.Items {
81-
result = append(result, podToContainerSummary(pod))
167+
summary := podToContainerSummary(pod)
168+
serviceName := pod.GetObjectMeta().GetLabels()[compose.ServiceTag]
169+
ports, ok := services[serviceName]
170+
if !ok {
171+
s, err := kc.client.CoreV1().Services(kc.namespace).Get(ctx, serviceName, metav1.GetOptions{})
172+
if err != nil {
173+
if !strings.Contains(err.Error(), "not found") {
174+
return nil, err
175+
}
176+
result = append(result, summary)
177+
continue
178+
}
179+
ports = []compose.PortPublisher{}
180+
if s != nil {
181+
if s.Spec.Type == corev1.ServiceTypeLoadBalancer {
182+
if len(s.Status.LoadBalancer.Ingress) > 0 {
183+
port := compose.PortPublisher{URL: s.Status.LoadBalancer.Ingress[0].IP}
184+
if len(s.Spec.Ports) > 0 {
185+
port.URL = fmt.Sprintf("%s:%d", port.URL, s.Spec.Ports[0].Port)
186+
port.TargetPort = s.Spec.Ports[0].TargetPort.IntValue()
187+
port.Protocol = string(s.Spec.Ports[0].Protocol)
188+
}
189+
ports = append(ports, port)
190+
}
191+
}
192+
}
193+
services[serviceName] = ports
194+
}
195+
summary.Publishers = ports
196+
result = append(result, summary)
82197
}
83198

84199
return result, nil
@@ -161,3 +276,42 @@ func (kc KubeClient) WaitForPodState(ctx context.Context, opts WaitForStatusOpti
161276
}
162277
return nil
163278
}
279+
280+
//MapPortsToLocalhost runs a port-forwarder daemon process
281+
func (kc KubeClient) MapPortsToLocalhost(ctx context.Context, opts PortMappingOptions) error {
282+
stopChannel := make(chan struct{}, 1)
283+
readyChannel := make(chan struct{})
284+
285+
eg, ctx := errgroup.WithContext(ctx)
286+
for serviceName, servicePorts := range opts.Services {
287+
serviceName := serviceName
288+
servicePorts := servicePorts
289+
pod, err := kc.GetPod(ctx, opts.ProjectName, serviceName)
290+
if err != nil {
291+
return err
292+
}
293+
eg.Go(func() error {
294+
ports := []string{}
295+
for _, p := range servicePorts {
296+
ports = append(ports, fmt.Sprintf("%d:%d", p.PublishedPort, p.TargetPort))
297+
}
298+
299+
req := kc.client.CoreV1().RESTClient().Post().
300+
Resource("pods").
301+
Name(pod.Name).
302+
Namespace(kc.namespace).
303+
SubResource("portforward")
304+
transport, upgrader, err := spdy.RoundTripperFor(kc.config)
305+
if err != nil {
306+
return err
307+
}
308+
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())
309+
fw, err := portforward.New(dialer, ports, stopChannel, readyChannel, os.Stdout, os.Stderr)
310+
if err != nil {
311+
return err
312+
}
313+
return fw.ForwardPorts()
314+
})
315+
}
316+
return eg.Wait()
317+
}

kube/client/utils.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,27 @@ import (
2828
)
2929

3030
func podToContainerSummary(pod corev1.Pod) compose.ContainerSummary {
31+
state := compose.RUNNING
32+
33+
if pod.DeletionTimestamp != nil {
34+
state = compose.REMOVING
35+
} else {
36+
for _, container := range pod.Status.ContainerStatuses {
37+
if container.State.Waiting != nil || container.State.Terminated != nil {
38+
state = compose.UPDATING
39+
break
40+
}
41+
}
42+
if state == compose.RUNNING && pod.Status.Phase != corev1.PodRunning {
43+
state = string(pod.Status.Phase)
44+
}
45+
}
46+
3147
return compose.ContainerSummary{
3248
ID: pod.GetObjectMeta().GetName(),
3349
Name: pod.GetObjectMeta().GetName(),
3450
Service: pod.GetObjectMeta().GetLabels()[compose.ServiceTag],
35-
State: string(pod.Status.Phase),
51+
State: state,
3652
Project: pod.GetObjectMeta().GetLabels()[compose.ProjectTag],
3753
}
3854
}
@@ -46,6 +62,13 @@ func checkPodsState(services []string, pods []corev1.Pod, status string) (bool,
4662
if len(services) > 0 && !utils.StringContains(services, service) {
4763
continue
4864
}
65+
containersRunning := true
66+
for _, container := range pod.Status.ContainerStatuses {
67+
if container.State.Running == nil {
68+
containersRunning = false
69+
break
70+
}
71+
}
4972
servicePods[service] = pod.Status.Message
5073

5174
if status == compose.REMOVING {
@@ -54,7 +77,7 @@ func checkPodsState(services []string, pods []corev1.Pod, status string) (bool,
5477
if pod.Status.Phase == corev1.PodFailed {
5578
return false, servicePods, fmt.Errorf(pod.Status.Reason)
5679
}
57-
if status == compose.RUNNING && pod.Status.Phase != corev1.PodRunning {
80+
if status == compose.RUNNING && (pod.Status.Phase != corev1.PodRunning || !containersRunning) {
5881
stateReached = false
5982
}
6083
}
@@ -75,3 +98,12 @@ type WaitForStatusOptions struct {
7598
Timeout *time.Duration
7699
Log LogFunc
77100
}
101+
102+
// Ports holds published ports data
103+
type Ports []compose.PortPublisher
104+
105+
// PortMappingOptions holds the port mapping for project services
106+
type PortMappingOptions struct {
107+
ProjectName string
108+
Services map[string]Ports
109+
}

kube/compose.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func NewComposeService() (compose.Service, error) {
7474
func (s *composeService) Up(ctx context.Context, project *types.Project, options compose.UpOptions) error {
7575
w := progress.ContextWriter(ctx)
7676

77-
eventName := "Convert to Helm charts"
77+
eventName := "Convert Compose file to Helm charts"
7878
w.Event(progress.CreatingEvent(eventName))
7979

8080
chart, err := helm.GetChartInMemory(project)
@@ -83,16 +83,31 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
8383
}
8484
w.Event(progress.NewEvent(eventName, progress.Done, ""))
8585

86-
eventName = "Install Helm charts"
87-
w.Event(progress.CreatingEvent(eventName))
88-
89-
err = s.sdk.InstallChart(project.Name, chart, func(format string, v ...interface{}) {
90-
message := fmt.Sprintf(format, v...)
91-
w.Event(progress.NewEvent(eventName, progress.Done, message))
92-
})
86+
stack, err := s.sdk.Get(project.Name)
87+
if err != nil || stack == nil {
88+
// install stack
89+
eventName = "Install Compose stack"
90+
w.Event(progress.CreatingEvent(eventName))
91+
92+
err = s.sdk.InstallChart(project.Name, chart, func(format string, v ...interface{}) {
93+
message := fmt.Sprintf(format, v...)
94+
w.Event(progress.NewEvent(eventName, progress.Done, message))
95+
})
96+
97+
} else {
98+
//update stack
99+
eventName = "Updating Compose stack"
100+
w.Event(progress.CreatingEvent(eventName))
101+
102+
err = s.sdk.UpdateChart(project.Name, chart, func(format string, v ...interface{}) {
103+
message := fmt.Sprintf(format, v...)
104+
w.Event(progress.NewEvent(eventName, progress.Done, message))
105+
})
106+
}
93107
if err != nil {
94108
return err
95109
}
110+
96111
w.Event(progress.NewEvent(eventName, progress.Done, ""))
97112

98113
return s.client.WaitForPodState(ctx, client.WaitForStatusOptions{
@@ -266,7 +281,7 @@ func (s *composeService) Remove(ctx context.Context, project *types.Project, opt
266281

267282
// Exec executes a command in a running service container
268283
func (s *composeService) Exec(ctx context.Context, project *types.Project, opts compose.RunOptions) (int, error) {
269-
return 0, errdefs.ErrNotImplemented
284+
return 0, s.client.Exec(ctx, project.Name, opts)
270285
}
271286

272287
func (s *composeService) Pause(ctx context.Context, project string, options compose.PauseOptions) error {

kube/helm/helm.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,19 @@ func (hc *Actions) InstallChart(name string, chart *chart.Chart, logger func(for
8484
return err
8585
}
8686

87+
// UpdateChart upgrades chart
88+
func (hc *Actions) UpdateChart(name string, chart *chart.Chart, logger func(format string, v ...interface{})) error {
89+
err := hc.initialize(logger)
90+
if err != nil {
91+
return err
92+
}
93+
94+
actUpgrade := action.NewUpgrade(hc.Config)
95+
actUpgrade.Namespace = hc.Namespace
96+
_, err = actUpgrade.Run(name, chart, map[string]interface{}{})
97+
return err
98+
}
99+
87100
// Uninstall uninstall chart
88101
func (hc *Actions) Uninstall(name string, logger func(format string, v ...interface{})) error {
89102
err := hc.initialize(logger)

kube/resources/kube.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ func mapToService(project *types.Project, service types.ServiceConfig) *core.Ser
9494
}
9595
ports = append(ports,
9696
core.ServicePort{
97-
Name: fmt.Sprintf("%d-%s", p.Target, strings.ToLower(p.Protocol)),
98-
Port: int32(p.Target),
97+
Name: fmt.Sprintf("%d-%s", p.Published, strings.ToLower(p.Protocol)),
98+
Port: int32(p.Published),
9999
TargetPort: intstr.FromInt(int(p.Target)),
100100
Protocol: toProtocol(p.Protocol),
101101
})

0 commit comments

Comments
 (0)