Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit 0a681cd

Browse files
aybidipingsutw
andauthored
feat: add support for inter-cluster communication for Ray plugin (#321)
* feat: add inter-cluster com to ray plugin Signed-off-by: Abdullah Mobeen <amobeen@spotify.com> * test: add tests for remote cluster Signed-off-by: Abdullah Mobeen <amobeen@spotify.com> * refactor: move auth to config Signed-off-by: Abdullah Mobeen <amobeen@spotify.com> * fix description Co-authored-by: Kevin Su <pingsutw@gmail.com> Signed-off-by: Abdullah Mobeen <a.mobeenn@gmail.com> * refactor: move clusterconfig and auth to pluginmachinery k8s Signed-off-by: Abdullah Mobeen <amobeen@spotify.com> * refactor: move clusterconfig and auth to pluginmachinery k8s Signed-off-by: Abdullah Mobeen <amobeen@spotify.com> * chore: remove commented out code Signed-off-by: Abdullah Mobeen <amobeen@spotify.com> --------- Signed-off-by: Abdullah Mobeen <amobeen@spotify.com> Signed-off-by: Abdullah Mobeen <a.mobeenn@gmail.com> Co-authored-by: Kevin Su <pingsutw@gmail.com>
1 parent f0db511 commit 0a681cd

File tree

5 files changed

+109
-0
lines changed

5 files changed

+109
-0
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package k8s
2+
3+
import (
4+
"fmt"
5+
"io/ioutil"
6+
7+
"github.com/pkg/errors"
8+
restclient "k8s.io/client-go/rest"
9+
)
10+
11+
type ClusterConfig struct {
12+
Name string `json:"name" pflag:",Friendly name of the remote cluster"`
13+
Endpoint string `json:"endpoint" pflag:", Remote K8s cluster endpoint"`
14+
Auth Auth `json:"auth" pflag:"-, Auth setting for the cluster"`
15+
Enabled bool `json:"enabled" pflag:", Boolean flag to enable or disable"`
16+
}
17+
18+
type Auth struct {
19+
TokenPath string `json:"tokenPath" pflag:", Token path"`
20+
CaCertPath string `json:"caCertPath" pflag:", Certificate path"`
21+
}
22+
23+
func (auth Auth) GetCA() ([]byte, error) {
24+
cert, err := ioutil.ReadFile(auth.CaCertPath)
25+
if err != nil {
26+
return nil, errors.Wrap(err, "failed to read k8s CA cert from configured path")
27+
}
28+
return cert, nil
29+
}
30+
31+
func (auth Auth) GetToken() (string, error) {
32+
token, err := ioutil.ReadFile(auth.TokenPath)
33+
if err != nil {
34+
return "", errors.Wrap(err, "failed to read k8s bearer token from configured path")
35+
}
36+
return string(token), nil
37+
}
38+
39+
// KubeClientConfig ...
40+
func KubeClientConfig(host string, auth Auth) (*restclient.Config, error) {
41+
tokenString, err := auth.GetToken()
42+
if err != nil {
43+
return nil, errors.New(fmt.Sprintf("Failed to get auth token: %+v", err))
44+
}
45+
46+
caCert, err := auth.GetCA()
47+
if err != nil {
48+
return nil, errors.New(fmt.Sprintf("Failed to get auth CA: %+v", err))
49+
}
50+
51+
tlsClientConfig := restclient.TLSClientConfig{}
52+
tlsClientConfig.CAData = caCert
53+
return &restclient.Config{
54+
Host: host,
55+
TLSClientConfig: tlsClientConfig,
56+
BearerToken: tokenString,
57+
}, nil
58+
}

go/tasks/plugins/k8s/ray/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ray
22

33
import (
44
pluginsConfig "github.com/flyteorg/flyteplugins/go/tasks/config"
5+
pluginmachinery "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s"
56
)
67

78
//go:generate pflags Config --default-var=defaultConfig
@@ -40,6 +41,9 @@ type Config struct {
4041

4142
// NodeIPAddress the IP address of the head node. By default, this is pod ip address.
4243
NodeIPAddress string `json:"nodeIPAddress,omitempty"`
44+
45+
// Remote Ray Cluster Config
46+
RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"`
4347
}
4448

4549
func GetConfig() *Config {
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package ray
2+
3+
import (
4+
"testing"
5+
6+
pluginmachinery "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s"
7+
"gotest.tools/assert"
8+
)
9+
10+
func TestLoadConfig(t *testing.T) {
11+
rayConfig := GetConfig()
12+
assert.Assert(t, rayConfig != nil)
13+
14+
t.Run("remote cluster", func(t *testing.T) {
15+
config := GetConfig()
16+
remoteConfig := pluginmachinery.ClusterConfig{
17+
Enabled: false,
18+
Endpoint: "",
19+
Auth: pluginmachinery.Auth{
20+
TokenPath: "",
21+
CaCertPath: "",
22+
},
23+
}
24+
assert.DeepEqual(t, config.RemoteClusterConfig, remoteConfig)
25+
})
26+
}

go/tasks/plugins/k8s/ray/ray.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,5 +371,19 @@ func init() {
371371
ResourceToWatch: &rayv1alpha1.RayJob{},
372372
Plugin: rayJobResourceHandler{},
373373
IsDefault: false,
374+
CustomKubeClient: func(ctx context.Context) (pluginsCore.KubeClient, error) {
375+
remoteConfig := GetConfig().RemoteClusterConfig
376+
if !remoteConfig.Enabled {
377+
// use controller-runtime KubeClient
378+
return nil, nil
379+
}
380+
381+
kubeConfig, err := k8s.KubeClientConfig(remoteConfig.Endpoint, remoteConfig.Auth)
382+
if err != nil {
383+
return nil, err
384+
}
385+
386+
return k8s.NewDefaultKubeClient(kubeConfig)
387+
},
374388
})
375389
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
plugins:
2+
ray:
3+
remoteClusterConfig:
4+
endpoint: 127.0.0.1
5+
auth:
6+
tokenPath: /path/token
7+
caCertPath: /path/cert

0 commit comments

Comments
 (0)