Skip to content

Commit a3e79da

Browse files
committed
refactor(console): replace steve aggregator with proxy manager for cluster console
- Replace steve server aggregation with k8s proxy implementation - Add new ProxyManager to handle cluster console requests - Remove unused steve wrapper and related dependencies - Update import paths to use ticket constants instead of apistructs directly - Add proper error handling for missing or invalid cluster names - Improve probe agent pod lookup with dedicated helper function - Add constants for probe agent labels and container configuration - Refactor exec URL construction and query parameter building - Update Dockerfile base image sources to use mirror registries - Add .dockerignore file with build cache directories - Update .gitignore with go cache directories - Remove syntax directive from Dockerfile - Change yaml.v2 dependency from indirect to direct in go.mod
1 parent 860011c commit a3e79da

File tree

9 files changed

+281
-271
lines changed

9 files changed

+281
-271
lines changed

.dockerignore

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
dist/
2+
3+
# build output
4+
/bin
5+
bin
6+
7+
# go cache
8+
.gocache/
9+
.gomodcache/

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,7 @@ go.sum
4040
# for local debug
4141
/.env
4242
/.env.*
43+
44+
# go cache
45+
.gocache/
46+
.gomodcache/

Dockerfile

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
# syntax = docker/dockerfile:1.2
2-
31
# Build the manager binary
4-
FROM golang:1.16 as builder
2+
FROM m.daocloud.io/docker.io/golang:1.16 AS builder
53

64
ARG APP
75
WORKDIR /workspace
@@ -14,8 +12,7 @@ RUN --mount=type=cache,target=/root/.cache/go-build \
1412
--mount=type=cache,target=/go/pkg/mod \
1513
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -mod readonly -a -o ${APP} ./cmd/${APP}/${APP}.go
1614

17-
#FROM centos:7
18-
FROM kubeprober/alpine:v3.9
15+
FROM registry.erda.cloud/retag/kubeprober/alpine:v3.9
1916

2017
ARG ARCH=amd64
2118
ARG APP

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ require (
2727
github.com/spf13/viper v1.8.1
2828
github.com/stretchr/testify v1.7.0
2929
go.uber.org/zap v1.17.0
30-
gopkg.in/yaml.v2 v2.4.0 // indirect
30+
gopkg.in/yaml.v2 v2.4.0
3131
gotest.tools v2.2.0+incompatible
3232
k8s.io/api v0.21.2
3333
k8s.io/apimachinery v0.21.2
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package ticket
2+
3+
import erda_api "github.com/erda-project/erda/apistructs"
4+
5+
type IssuePriority = erda_api.IssuePriority
6+
type IssueType = erda_api.IssueType
7+
8+
const (
9+
IssueTypeTicket = erda_api.IssueTypeTicket
10+
IssuePriorityUrgent = erda_api.IssuePriorityUrgent
11+
IssuePriorityHigh = erda_api.IssuePriorityHigh
12+
IssuePriorityNormal = erda_api.IssuePriorityNormal
13+
IssuePriorityLow = erda_api.IssuePriorityLow
14+
)
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
package handler
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"net/url"
8+
"strings"
9+
10+
"github.com/sirupsen/logrus"
11+
"k8s.io/apimachinery/pkg/util/proxy"
12+
"k8s.io/client-go/rest"
13+
"k8s.io/client-go/transport"
14+
15+
kubeproberv1 "github.com/erda-project/kubeprober/apis/v1"
16+
"github.com/erda-project/kubeprober/pkg/probe-master/k8sclient"
17+
dialclient "github.com/erda-project/kubeprober/pkg/probe-master/tunnel-client"
18+
)
19+
20+
type ProxyManager struct {
21+
Ctx context.Context
22+
}
23+
24+
var proxyManager *ProxyManager
25+
26+
// NewProxyManager initializes the proxy manager used by console/exec proxying.
27+
func NewProxyManager(ctx context.Context) *ProxyManager {
28+
proxyManager = &ProxyManager{Ctx: ctx}
29+
return proxyManager
30+
}
31+
32+
func (p *ProxyManager) ProxyRequest(rw http.ResponseWriter, req *http.Request, clusterName string) {
33+
if clusterName == "" {
34+
http.Error(rw, "cluster name is required", http.StatusNotFound)
35+
return
36+
}
37+
38+
cluster, err := k8sclient.GetCluster(clusterName)
39+
if err != nil {
40+
logrus.Errorf("failed to get cluster %s, %v", clusterName, err)
41+
http.Error(rw, "Internal server error", http.StatusInternalServerError)
42+
return
43+
}
44+
if cluster == nil {
45+
http.Error(rw, fmt.Sprintf("cluster %s not found", clusterName), http.StatusNotFound)
46+
return
47+
}
48+
49+
handler, err := newClusterProxyHandler(cluster)
50+
if err != nil {
51+
logrus.Errorf("failed to create proxy handler for cluster %s, %v", clusterName, err)
52+
http.Error(rw, "Internal server error", http.StatusInternalServerError)
53+
return
54+
}
55+
56+
handler.ServeHTTP(rw, req)
57+
}
58+
59+
func newClusterProxyHandler(cluster *kubeproberv1.Cluster) (http.Handler, error) {
60+
restConfig, err := dialclient.GenerateProbeClientConf(cluster)
61+
if err != nil {
62+
return nil, err
63+
}
64+
65+
return newK8sProxyHandler(cluster.Name, restConfig)
66+
}
67+
68+
func newK8sProxyHandler(clusterName string, cfg *rest.Config) (http.Handler, error) {
69+
host := cfg.Host
70+
if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") {
71+
host = "https://" + host
72+
}
73+
if !strings.HasSuffix(host, "/") {
74+
host = host + "/"
75+
}
76+
target, err := url.Parse(host)
77+
if err != nil {
78+
return nil, err
79+
}
80+
81+
transportRT, err := rest.TransportFor(cfg)
82+
if err != nil {
83+
return nil, err
84+
}
85+
86+
upgradeTransport, err := makeUpgradeTransport(cfg, transportRT)
87+
if err != nil {
88+
return nil, err
89+
}
90+
91+
proxyHandler := proxy.NewUpgradeAwareHandler(target, transportRT, false, false, &proxyResponder{})
92+
proxyHandler.UpgradeTransport = upgradeTransport
93+
proxyHandler.UseRequestLocation = true
94+
proxyHandler.UseLocationHost = true
95+
96+
handler := http.Handler(proxyHandler)
97+
if len(target.Path) > 1 {
98+
handler = prependPath(target.Path[:len(target.Path)-1], handler)
99+
}
100+
101+
prefix := clusterURLPrefix(clusterName)
102+
if len(prefix) > 2 {
103+
handler = stripLeaveSlash(prefix, handler)
104+
}
105+
106+
return proxyHeaders(handler), nil
107+
}
108+
109+
type proxyResponder struct{}
110+
111+
func (r *proxyResponder) Error(w http.ResponseWriter, req *http.Request, err error) {
112+
logrus.Errorf("error while proxying request: %v", err)
113+
http.Error(w, err.Error(), http.StatusInternalServerError)
114+
}
115+
116+
func clusterURLPrefix(clusterName string) string {
117+
return "/api/k8s/clusters/" + clusterName
118+
}
119+
120+
func proxyHeaders(handler http.Handler) http.Handler {
121+
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
122+
req.Header.Del("Authorization")
123+
if req.Header.Get("X-Forwarded-Proto") == "" && req.TLS != nil {
124+
req.Header.Set("X-Forwarded-Proto", "https")
125+
}
126+
handler.ServeHTTP(rw, req)
127+
})
128+
}
129+
130+
func prependPath(prefix string, handler http.Handler) http.Handler {
131+
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
132+
if len(req.URL.Path) > 1 {
133+
req.URL.Path = prefix + req.URL.Path
134+
} else {
135+
req.URL.Path = prefix
136+
}
137+
handler.ServeHTTP(rw, req)
138+
})
139+
}
140+
141+
func stripLeaveSlash(prefix string, handler http.Handler) http.Handler {
142+
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
143+
path := strings.TrimPrefix(req.URL.Path, prefix)
144+
if len(path) > 0 && path[:1] != "/" {
145+
path = "/" + path
146+
}
147+
req.URL.Path = path
148+
handler.ServeHTTP(rw, req)
149+
})
150+
}
151+
152+
func makeUpgradeTransport(cfg *rest.Config, rt http.RoundTripper) (proxy.UpgradeRequestRoundTripper, error) {
153+
transportConfig, err := cfg.TransportConfig()
154+
if err != nil {
155+
return nil, err
156+
}
157+
158+
upgrader, err := transport.HTTPWrappersForConfig(transportConfig, proxy.MirrorRequest)
159+
if err != nil {
160+
return nil, err
161+
}
162+
163+
return proxy.NewUpgradeRequestRoundTripper(rt, upgrader), nil
164+
}

pkg/probe-master/tunnel-server/handler/console.go

Lines changed: 78 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -25,93 +25,115 @@ import (
2525
v1 "k8s.io/api/core/v1"
2626
"sigs.k8s.io/controller-runtime/pkg/client"
2727

28-
"github.com/erda-project/erda/apistructs"
2928
"github.com/erda-project/kubeprober/pkg/probe-master/k8sclient"
3029
dialclient "github.com/erda-project/kubeprober/pkg/probe-master/tunnel-client"
3130
)
3231

32+
const (
33+
probeAgentLabelKey = "app"
34+
probeAgentLabelValue = "probe-agent"
35+
probeAgentContainer = "probe-agent"
36+
execCommandScript = "kubectl-shell.sh"
37+
)
38+
3339
func ClusterConsole(rw http.ResponseWriter, req *http.Request) {
34-
// TODO make blow correct
35-
vars := mux.Vars(req)
36-
clusterName := vars["clusterName"]
40+
clusterName := mux.Vars(req)["clusterName"]
41+
if clusterName == "" {
42+
http.Error(rw, "cluster name is required", http.StatusNotFound)
43+
return
44+
}
3745

3846
cluster, err := k8sclient.GetCluster(clusterName)
3947
if err != nil {
40-
errMsg := fmt.Sprintf("[cluster console] failed to list cluster with name: %s", clusterName)
41-
logrus.Errorf(errMsg)
42-
rw.Write([]byte(errMsg))
43-
rw.WriteHeader(http.StatusInternalServerError)
48+
logrus.Errorf("[cluster console] failed to get cluster %s: %v", clusterName, err)
49+
http.Error(rw, "Internal server error", http.StatusInternalServerError)
4450
return
4551
}
4652
if cluster == nil {
47-
errMsg := fmt.Sprintf("[cluster console] failed to find cluster with name: %s\n", clusterName)
48-
rw.Write([]byte(errMsg))
49-
rw.WriteHeader(http.StatusBadRequest)
50-
return
51-
}
52-
53-
token := cluster.Spec.ClusterConfig.Token
54-
if token == "" {
55-
errMsg := fmt.Sprintf("[cluster console] invalid token for cluster with name: %s\n", clusterName)
56-
rw.Write([]byte(errMsg))
57-
rw.WriteHeader(http.StatusInternalServerError)
53+
http.Error(rw, fmt.Sprintf("cluster %s not found", clusterName), http.StatusBadRequest)
5854
return
5955
}
6056

61-
t, err := base64.StdEncoding.DecodeString(cluster.Spec.ClusterConfig.Token)
57+
token, err := decodeClusterToken(clusterName, cluster.Spec.ClusterConfig.Token)
6258
if err != nil {
63-
errMsg := fmt.Sprintf("[cluster console] invalid token for cluster: %s\n", clusterName)
64-
rw.Write([]byte(errMsg))
65-
rw.WriteHeader(http.StatusInternalServerError)
59+
logrus.Errorf("[cluster console] invalid token for cluster %s: %v", clusterName, err)
60+
http.Error(rw, "Internal server error", http.StatusInternalServerError)
6661
return
6762
}
68-
token = string(t)
6963

7064
clusterclient, err := dialclient.GenerateProbeClient(cluster)
7165
if err != nil {
72-
errMsg := fmt.Sprintf("[cluster console] invalid token for cluster with name: %s\n", clusterName)
73-
rw.Write([]byte(errMsg))
74-
rw.WriteHeader(http.StatusInternalServerError)
66+
logrus.Errorf("[cluster console] failed to build k8s client for cluster %s: %v", clusterName, err)
67+
http.Error(rw, "Internal server error", http.StatusInternalServerError)
7568
return
7669
}
77-
podList := &v1.PodList{}
7870

79-
err = clusterclient.List(context.Background(), podList,
80-
client.InNamespace(cluster.Spec.ClusterConfig.ProbeNamespaces),
81-
client.MatchingLabels{"app": "probe-agent"})
71+
pod, err := findRunningProbeAgent(req.Context(), clusterclient, cluster.Spec.ClusterConfig.ProbeNamespaces)
8272
if err != nil {
83-
errMsg := fmt.Sprintf("[cluster console] failed to find probe-agent pod for cluster with name: %s\n", clusterName)
84-
rw.Write([]byte(errMsg))
85-
rw.WriteHeader(http.StatusInternalServerError)
73+
logrus.Errorf("[cluster console] failed to list probe-agent pods for cluster %s: %v", clusterName, err)
74+
http.Error(rw, "Internal server error", http.StatusInternalServerError)
75+
return
76+
}
77+
if pod == nil {
78+
logrus.Errorf("failed to find a ready probe-agent pod for cluster %s", clusterName)
79+
http.Error(rw, fmt.Sprintf("cluster %s does not have a ready probe-agent pod", clusterName), http.StatusInternalServerError)
8680
return
8781
}
8882

89-
for _, pod := range podList.Items {
90-
if pod.Status.Phase != v1.PodRunning {
91-
continue
92-
}
83+
req.URL.Path = execURLPath(clusterName, pod.Namespace, pod.Name)
84+
req.URL.RawQuery = execQuery(token).Encode()
85+
86+
if proxyManager == nil {
87+
logrus.Errorf("proxy manager not initialized for cluster %s", clusterName)
88+
http.Error(rw, "Internal server error", http.StatusInternalServerError)
89+
return
90+
}
9391

94-
vars := url.Values{}
95-
vars.Add("container", "probe-agent")
96-
vars.Add("stdout", "1")
97-
vars.Add("stdin", "1")
98-
vars.Add("stderr", "1")
99-
vars.Add("tty", "1")
100-
vars.Add("command", "kubectl-shell.sh")
101-
vars.Add("command", token)
92+
proxyManager.ProxyRequest(rw, req, clusterName)
93+
}
10294

103-
path := fmt.Sprintf("/api/k8s/clusters/%s/api/v1/namespaces/%s/pods/%s/exec", clusterName, pod.Namespace, pod.Name)
95+
func decodeClusterToken(clusterName, encoded string) (string, error) {
96+
if encoded == "" {
97+
return "", fmt.Errorf("empty token for cluster %s", clusterName)
98+
}
99+
decoded, err := base64.StdEncoding.DecodeString(encoded)
100+
if err != nil {
101+
return "", err
102+
}
103+
return string(decoded), nil
104+
}
104105

105-
req.URL.Path = path
106-
req.URL.RawQuery = vars.Encode()
106+
func findRunningProbeAgent(ctx context.Context, c client.Client, namespace string) (*v1.Pod, error) {
107+
podList := &v1.PodList{}
108+
err := c.List(ctx, podList,
109+
client.InNamespace(namespace),
110+
client.MatchingLabels{probeAgentLabelKey: probeAgentLabelValue})
111+
if err != nil {
112+
return nil, err
113+
}
107114

108-
a.ServeHTTP(rw, req)
109-
return
115+
for i := range podList.Items {
116+
pod := &podList.Items[i]
117+
if pod.Status.Phase == v1.PodRunning {
118+
return pod, nil
119+
}
110120
}
111121

112-
logrus.Errorf("failed to find a ready probe-agent pod for cluster %s", clusterName)
113-
rw.WriteHeader(http.StatusInternalServerError)
114-
rw.Write(apistructs.NewSteveError(apistructs.ServerError,
115-
fmt.Sprintf("cluster %s does not have a ready probe-agent pod", clusterName)).JSON())
116-
return
122+
return nil, nil
123+
}
124+
125+
func execQuery(token string) url.Values {
126+
query := url.Values{}
127+
query.Add("container", probeAgentContainer)
128+
query.Add("stdout", "1")
129+
query.Add("stdin", "1")
130+
query.Add("stderr", "1")
131+
query.Add("tty", "1")
132+
query.Add("command", execCommandScript)
133+
query.Add("command", token)
134+
return query
135+
}
136+
137+
func execURLPath(clusterName, namespace, podName string) string {
138+
return fmt.Sprintf("/api/k8s/clusters/%s/api/v1/namespaces/%s/pods/%s/exec", clusterName, namespace, podName)
117139
}

0 commit comments

Comments
 (0)