Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.

Commit ac37bb2

Browse files
daxmc99keegancsmithAlex Russell-Saw
authored
Switch to k8s client-go (#15486)
* Drop github.com/ericchiang/k8s for the official client * Remove github.com/ericchiang/k8s from endpoints * small fixes * tests pass * dependency update * update ns func * mod * tidy * pin dependency * Update cmd/frontend/internal/app/debugproxies/scanner.go Co-authored-by: Keegan Carruthers-Smith <[email protected]> * Update cmd/frontend/internal/app/debugproxies/scanner.go Co-authored-by: Keegan Carruthers-Smith <[email protected]> * comments Co-authored-by: Keegan Carruthers-Smith <[email protected]> Co-authored-by: Alex Russell-Saw <[email protected]>
1 parent aedcaf7 commit ac37bb2

File tree

5 files changed

+335
-547
lines changed

5 files changed

+335
-547
lines changed
Lines changed: 57 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
package debugproxies
22

33
import (
4-
"context"
54
"errors"
65
"fmt"
6+
"io/ioutil"
77
"strconv"
8+
"strings"
89
"time"
910

10-
"github.com/ericchiang/k8s"
11-
corev1 "github.com/ericchiang/k8s/apis/core/v1"
1211
"github.com/inconshreveable/log15"
12+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
"k8s.io/apimachinery/pkg/watch"
14+
"k8s.io/client-go/kubernetes"
15+
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
16+
"k8s.io/client-go/rest"
1317
)
1418

1519
// Represents an endpoint
@@ -26,47 +30,21 @@ type Endpoint struct {
2630
// ScanConsumer is the callback to consume scan results.
2731
type ScanConsumer func([]Endpoint)
2832

29-
// Declares methods we use with k8s.Client. Useful to plug testing replacements or even logging middleware.
30-
type kubernetesClient interface {
31-
Watch(ctx context.Context, namespace string, r k8s.Resource, options ...k8s.Option) (*k8s.Watcher, error)
32-
List(ctx context.Context, namespace string, resp k8s.ResourceList, options ...k8s.Option) error
33-
Get(ctx context.Context, namespace, name string, resp k8s.Resource, options ...k8s.Option) error
34-
Namespace() string
35-
}
36-
37-
// "real" implementation that sends calls to the k8s.Client
38-
type k8sClientImpl struct {
39-
client *k8s.Client
40-
}
41-
42-
func (kci *k8sClientImpl) Watch(ctx context.Context, namespace string, r k8s.Resource, options ...k8s.Option) (*k8s.Watcher, error) {
43-
return kci.client.Watch(ctx, namespace, r, options...)
44-
}
45-
46-
func (kci *k8sClientImpl) List(ctx context.Context, namespace string, resp k8s.ResourceList, options ...k8s.Option) error {
47-
return kci.client.List(ctx, namespace, resp, options...)
48-
}
49-
50-
func (kci *k8sClientImpl) Get(ctx context.Context, namespace, name string, resp k8s.Resource, options ...k8s.Option) error {
51-
return kci.client.Get(ctx, namespace, name, resp, options...)
52-
}
53-
54-
func (kci *k8sClientImpl) Namespace() string {
55-
return kci.client.Namespace
56-
}
57-
5833
// clusterScanner scans the cluster for endpoints belonging to services that have annotation sourcegraph.prometheus/scrape=true.
5934
// It runs an event loop that reacts to changes to the endpoints set. Everytime there is a change it calls the ScanConsumer.
6035
type clusterScanner struct {
61-
client kubernetesClient
62-
consume ScanConsumer
36+
client v1.CoreV1Interface
37+
namespace string
38+
consume ScanConsumer
6339
}
6440

6541
// Starts a cluster scanner with the specified client and consumer. Does not block.
66-
func startClusterScannerWithClient(client kubernetesClient, consumer ScanConsumer) error {
42+
func startClusterScannerWithClient(client *kubernetes.Clientset, ns string, consumer ScanConsumer) error {
43+
6744
cs := &clusterScanner{
68-
client: client,
69-
consume: consumer,
45+
client: client.CoreV1(),
46+
namespace: ns,
47+
consume: consumer,
7048
}
7149

7250
go cs.runEventLoop()
@@ -75,13 +53,18 @@ func startClusterScannerWithClient(client kubernetesClient, consumer ScanConsume
7553

7654
// Starts a cluster scanner with the specified consumer. Does not block.
7755
func StartClusterScanner(consumer ScanConsumer) error {
78-
client, err := k8s.NewInClusterClient()
56+
config, err := rest.InClusterConfig()
57+
if err != nil {
58+
return err
59+
}
60+
ns := namespace()
61+
// access to K8s clients
62+
clientset, err := kubernetes.NewForConfig(config)
7963
if err != nil {
8064
return err
8165
}
8266

83-
kci := &k8sClientImpl{client: client}
84-
return startClusterScannerWithClient(kci, consumer)
67+
return startClusterScannerWithClient(clientset, ns, consumer)
8568
}
8669

8770
// Runs the k8s.Watch endpoints event loop, and triggers a rescan of cluster when something changes with endpoints.
@@ -102,21 +85,23 @@ func (cs *clusterScanner) runEventLoop() {
10285
// watchEndpointEvents uses the k8s watch API operation to watch for endpoint events. Spins forever unless an error
10386
// occurs that would necessitate creating a new watcher. The caller will then call again creating the new watcher.
10487
func (cs *clusterScanner) watchEndpointEvents() (bool, error) {
105-
watcher, err := cs.client.Watch(context.Background(), cs.client.Namespace(), new(corev1.Endpoints))
88+
89+
// TODO(Dax): Rewrite this to used NewSharedInformerFactory from k8s/client-go
90+
91+
watcher, err := cs.client.Endpoints(metav1.NamespaceAll).Watch(metav1.ListOptions{})
10692
if err != nil {
10793
return false, fmt.Errorf("k8s client.Watch error: %w", err)
10894
}
109-
defer watcher.Close()
95+
defer watcher.Stop()
11096

11197
for {
112-
var eps corev1.Endpoints
113-
eventType, err := watcher.Next(&eps)
98+
event := <-watcher.ResultChan()
11499
if err != nil {
115100
// we need a new watcher
116101
return true, fmt.Errorf("k8s watcher.Next error: %w", err)
117102
}
118103

119-
if eventType == k8s.EventError {
104+
if event.Type == watch.Error {
120105
// we need a new watcher
121106
return true, errors.New("error event")
122107
}
@@ -128,66 +113,63 @@ func (cs *clusterScanner) watchEndpointEvents() (bool, error) {
128113
// scanCluster looks for endpoints belonging to services that have annotation sourcegraph.prometheus/scrape=true.
129114
// It derives the appropriate port from the prometheus.io/port annotation.
130115
func (cs *clusterScanner) scanCluster() {
131-
var services corev1.ServiceList
132116

133-
err := cs.client.List(context.Background(), cs.client.Namespace(), &services)
117+
// Get services from all namespaces
118+
services, err := cs.client.Services(cs.namespace).List(metav1.ListOptions{})
134119
if err != nil {
135120
log15.Error("k8s failed to list services", "error", err)
136-
return
137121
}
138122

139123
var scanResults []Endpoint
140124

141125
for _, svc := range services.Items {
142-
svcName := *svc.Metadata.Name
126+
svcName := svc.Name
143127

144128
// TODO(uwedeportivo): pgsql doesn't work, figure out why
145129
if svcName == "pgsql" {
146130
continue
147131
}
148132

149-
if svc.Metadata.Annotations["sourcegraph.prometheus/scrape"] != "true" {
133+
if svc.Annotations["sourcegraph.prometheus/scrape"] != "true" {
150134
continue
151135
}
152136

153137
var port int
154-
if portStr := svc.Metadata.Annotations["prometheus.io/port"]; portStr != "" {
138+
if portStr := svc.Annotations["prometheus.io/port"]; portStr != "" {
155139
port, err = strconv.Atoi(portStr)
156140
if err != nil {
157141
log15.Debug("k8s prometheus.io/port annotation for service is not an integer", "service", svcName, "port", portStr)
158142
continue
159143
}
160144
}
161145

162-
var endpoints corev1.Endpoints
163-
err = cs.client.Get(context.Background(), cs.client.Namespace(), svcName, &endpoints)
146+
endpoints, err := cs.client.Endpoints(cs.namespace).Get(svcName, metav1.GetOptions{})
164147
if err != nil {
165148
log15.Error("k8s failed to get endpoints", "error", err)
166149
return
167150
}
168-
169151
for _, subset := range endpoints.Subsets {
170152
var ports []int
171153
if port != 0 {
172154
ports = []int{port}
173155
} else {
174-
for _, port := range subset.GetPorts() {
175-
ports = append(ports, int(port.GetPort()))
156+
for _, port := range subset.Ports {
157+
ports = append(ports, int(port.Port))
176158
}
177159
}
178160

179161
for _, addr := range subset.Addresses {
180162
for _, port := range ports {
181-
addrStr := fromStrPtr(addr.Ip)
163+
addrStr := addr.IP
182164
if addrStr == "" {
183-
addrStr = fromStrPtr(addr.Hostname)
165+
addrStr = addr.Hostname
184166
}
185167

186168
if addrStr != "" {
187169
scanResults = append(scanResults, Endpoint{
188170
Service: svcName,
189171
Addr: fmt.Sprintf("%s:%d", addrStr, port),
190-
Hostname: fromStrPtr(addr.Hostname),
172+
Hostname: addr.Hostname,
191173
})
192174
}
193175
}
@@ -198,10 +180,21 @@ func (cs *clusterScanner) scanCluster() {
198180
cs.consume(scanResults)
199181
}
200182

201-
// fromStrPtr returns *s. If s is nil the empty string is returned.
202-
func fromStrPtr(s *string) string {
203-
if s == nil {
204-
return ""
183+
// namespace returns the namespace the pod is currently running in
184+
// this is done because the k8s client we previously used set the namespace
185+
// when the client was created, the official k8s client does not
186+
func namespace() string {
187+
const filename = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
188+
data, err := ioutil.ReadFile(filename)
189+
if err != nil {
190+
log15.Warn("scanner: falling back to kubernetes default namespace", "filename", filename, "error", err)
191+
return "default"
192+
}
193+
194+
ns := strings.TrimSpace(string(data))
195+
if ns == "" {
196+
log15.Warn("file: ", filename, " empty using \"default\" ns")
197+
return "default"
205198
}
206-
return *s
199+
return ns
207200
}

0 commit comments

Comments
 (0)