Skip to content

Commit a4ae811

Browse files
committed
add cluster context to config resource to allow multi-cluster setup
1 parent 34dc9ba commit a4ae811

File tree

6 files changed

+65
-44
lines changed

6 files changed

+65
-44
lines changed

controller/controller_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func TestControllerRun(t *testing.T) {
3333
Resource: provider.Resource{
3434
Name: "a",
3535
Namespace: "y",
36+
Cluster: "m",
3637
},
3738
IPAddresses: map[string]*net.IPNet{
3839
netA.String(): netA,
@@ -55,6 +56,7 @@ func TestControllerRun(t *testing.T) {
5556
require.Contains(t, controller.configsCache, provider.Resource{
5657
Name: "a",
5758
Namespace: "y",
59+
Cluster: "m",
5860
})
5961

6062
// test adding the an egress config.
@@ -65,6 +67,7 @@ func TestControllerRun(t *testing.T) {
6567
Resource: provider.Resource{
6668
Name: "a",
6769
Namespace: "x",
70+
Cluster: "m",
6871
},
6972
IPAddresses: map[string]*net.IPNet{
7073
netA.String(): netA,
@@ -78,6 +81,7 @@ func TestControllerRun(t *testing.T) {
7881
require.Contains(t, controller.configsCache, provider.Resource{
7982
Name: "a",
8083
Namespace: "x",
84+
Cluster: "m",
8185
})
8286

8387
// test removing the config
@@ -88,6 +92,7 @@ func TestControllerRun(t *testing.T) {
8892
Resource: provider.Resource{
8993
Name: "a",
9094
Namespace: "x",
95+
Cluster: "m",
9196
},
9297
}
9398
cancel()

kube/configmap.go

Lines changed: 46 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ import (
1616
)
1717

1818
type ConfigMapWatcher struct {
19-
clients []kubernetes.Interface
19+
clients map[string]kubernetes.Interface
2020
namespace string
2121
selector fields.Selector
2222
configs chan provider.EgressConfig
2323
}
2424

25-
func NewConfigMapWatcher(clients []kubernetes.Interface, namespace, selectorStr string, configs chan provider.EgressConfig) (*ConfigMapWatcher, error) {
25+
func NewConfigMapWatcher(clients map[string]kubernetes.Interface, namespace, selectorStr string, configs chan provider.EgressConfig) (*ConfigMapWatcher, error) {
2626
selector, err := fields.ParseSelector(selectorStr)
2727
if err != nil {
2828
return nil, err
@@ -37,12 +37,12 @@ func NewConfigMapWatcher(clients []kubernetes.Interface, namespace, selectorStr
3737
}
3838

3939
func (c *ConfigMapWatcher) Run(ctx context.Context) {
40-
for _, client := range c.clients {
41-
c.runForClient(ctx, client)
40+
for cluster, client := range c.clients {
41+
c.runForClient(ctx, client, cluster)
4242
}
4343
}
4444

45-
func (c *ConfigMapWatcher) runForClient(ctx context.Context, client kubernetes.Interface) {
45+
func (c *ConfigMapWatcher) runForClient(ctx context.Context, client kubernetes.Interface, cluster string) {
4646
informer := cache.NewSharedIndexInformer(
4747
&cache.ListWatch{
4848
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
@@ -60,9 +60,9 @@ func (c *ConfigMapWatcher) runForClient(ctx context.Context, client kubernetes.I
6060
)
6161

6262
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
63-
AddFunc: c.add,
64-
UpdateFunc: c.update,
65-
DeleteFunc: c.del,
63+
AddFunc: c.add(cluster),
64+
UpdateFunc: c.update(cluster),
65+
DeleteFunc: c.del(cluster),
6666
})
6767

6868
go informer.Run(ctx.Done())
@@ -75,45 +75,52 @@ func (c *ConfigMapWatcher) runForClient(ctx context.Context, client kubernetes.I
7575
log.Info("Synced ConfigMap watcher")
7676
}
7777

78-
func (c *ConfigMapWatcher) add(obj interface{}) {
79-
cm, ok := obj.(*v1.ConfigMap)
80-
if !ok {
81-
log.Errorf("Failed to get ConfigMap object")
82-
return
83-
}
78+
func (c *ConfigMapWatcher) add(cluster string) func(obj interface{}) {
79+
return func(obj interface{}) {
80+
cm, ok := obj.(*v1.ConfigMap)
81+
if !ok {
82+
log.Errorf("Failed to get ConfigMap object")
83+
return
84+
}
8485

85-
c.configs <- configMapToEgressConfig(cm)
86+
c.configs <- configMapToEgressConfig(cm, cluster)
87+
}
8688
}
8789

88-
func (c *ConfigMapWatcher) update(oldObj, newObj interface{}) {
89-
newCM, ok := newObj.(*v1.ConfigMap)
90-
if !ok {
91-
log.Errorf("Failed to get new ConfigMap object")
92-
return
93-
}
90+
func (c *ConfigMapWatcher) update(cluster string) func(oldObj, newObj interface{}) {
91+
return func(oldObj, newObj interface{}) {
92+
newCM, ok := newObj.(*v1.ConfigMap)
93+
if !ok {
94+
log.Errorf("Failed to get new ConfigMap object")
95+
return
96+
}
9497

95-
c.configs <- configMapToEgressConfig(newCM)
98+
c.configs <- configMapToEgressConfig(newCM, cluster)
99+
}
96100
}
97101

98-
func (c *ConfigMapWatcher) del(obj interface{}) {
99-
cm, ok := obj.(*v1.ConfigMap)
100-
if !ok {
101-
log.Errorf("Failed to get ConfigMap object")
102-
return
103-
}
102+
func (c *ConfigMapWatcher) del(cluster string) func(obj interface{}) {
103+
return func(obj interface{}) {
104+
cm, ok := obj.(*v1.ConfigMap)
105+
if !ok {
106+
log.Errorf("Failed to get ConfigMap object")
107+
return
108+
}
104109

105-
c.configs <- provider.EgressConfig{
106-
Resource: provider.Resource{
107-
Name: cm.Name,
108-
Namespace: cm.Namespace,
109-
},
110+
c.configs <- provider.EgressConfig{
111+
Resource: provider.Resource{
112+
Name: cm.Name,
113+
Namespace: cm.Namespace,
114+
Cluster: cluster,
115+
},
116+
}
110117
}
111118
}
112119

113120
func (c *ConfigMapWatcher) ListConfigs(ctx context.Context) ([]provider.EgressConfig, error) {
114121
egressConfigs := []provider.EgressConfig{}
115-
for _, client := range c.clients {
116-
configs, err := c.listConfigsForClient(ctx, client)
122+
for cluster, client := range c.clients {
123+
configs, err := c.listConfigsForClient(ctx, client, cluster)
117124
if err != nil {
118125
return nil, err
119126
}
@@ -122,7 +129,7 @@ func (c *ConfigMapWatcher) ListConfigs(ctx context.Context) ([]provider.EgressCo
122129
return egressConfigs, nil
123130
}
124131

125-
func (c *ConfigMapWatcher) listConfigsForClient(ctx context.Context, client kubernetes.Interface) ([]provider.EgressConfig, error) {
132+
func (c *ConfigMapWatcher) listConfigsForClient(ctx context.Context, client kubernetes.Interface, cluster string) ([]provider.EgressConfig, error) {
126133
opts := metav1.ListOptions{
127134
LabelSelector: c.selector.String(),
128135
}
@@ -134,7 +141,7 @@ func (c *ConfigMapWatcher) listConfigsForClient(ctx context.Context, client kube
134141

135142
configs := make([]provider.EgressConfig, 0, len(configMaps.Items))
136143
for _, cm := range configMaps.Items {
137-
configs = append(configs, configMapToEgressConfig(&cm))
144+
configs = append(configs, configMapToEgressConfig(&cm, cluster))
138145
}
139146
return configs, nil
140147
}
@@ -143,7 +150,7 @@ func (c *ConfigMapWatcher) Config() <-chan provider.EgressConfig {
143150
return c.configs
144151
}
145152

146-
func configMapToEgressConfig(cm *v1.ConfigMap) provider.EgressConfig {
153+
func configMapToEgressConfig(cm *v1.ConfigMap, cluster string) provider.EgressConfig {
147154
ipAddresses := make(map[string]*net.IPNet)
148155
for key, cidr := range cm.Data {
149156
_, ipnet, err := net.ParseCIDR(cidr)
@@ -158,6 +165,7 @@ func configMapToEgressConfig(cm *v1.ConfigMap) provider.EgressConfig {
158165
Resource: provider.Resource{
159166
Name: cm.Name,
160167
Namespace: cm.Namespace,
168+
Cluster: cluster,
161169
},
162170
IPAddresses: ipAddresses,
163171
}

main.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ var (
3636
)
3737

3838
type Config struct {
39-
Masters []string
39+
Masters map[string]string
4040
KubeConfig string
4141
DryRun bool
4242
LogFormat string
@@ -121,7 +121,7 @@ Example:
121121
app.DefaultEnvars()
122122

123123
// Flags related to Kubernetes
124-
app.Flag("master", "The Kubernetes API server to connect to (default: auto-detect)").Default("").StringsVar(&cfg.Masters)
124+
app.Flag("master", "The Kubernetes API server to connect to (default: auto-detect)").Default("").StringMapVar(&cfg.Masters)
125125
app.Flag("kubeconfig", "Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect)").Default(defaultConfig.KubeConfig).StringVar(&cfg.KubeConfig)
126126
app.Flag("use-platform-credentials", "Use Platform credentials (default: disabled)").BoolVar(&cfg.UsePlatformCredentials)
127127
app.Flag("credentials-dir", "Directory where the Platform credentials are stored (default: /meta/credentials)").Default(auth.DefaultCredentialsDir).Envar(auth.CredentialsDirEnvar).StringVar(&cfg.CredentialsDir)
@@ -193,15 +193,15 @@ func main() {
193193
}
194194

195195
// newKubeClients returns multiple Kubernetes clients with the given config.
196-
func newKubeClients(cfg *Config) []kubernetes.Interface {
196+
func newKubeClients(cfg *Config) map[string]kubernetes.Interface {
197197
var kubeconfig string
198198
if _, err := os.Stat(clientcmd.RecommendedHomeFile); err == nil {
199199
kubeconfig = clientcmd.RecommendedHomeFile
200200
}
201201
log.Debugf("use config file %s", kubeconfig)
202-
var clients []kubernetes.Interface
203-
for _, master := range cfg.Masters {
204-
clients = append(clients, newKubeClient(cfg, master, kubeconfig))
202+
var clients map[string]kubernetes.Interface
203+
for cluster, master := range cfg.Masters {
204+
clients[cluster] = newKubeClient(cfg, master, kubeconfig)
205205
}
206206
return clients
207207
}

provider/aws/aws_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func TestGenerateStackSpec(t *testing.T) {
7373
{
7474
Name: "x",
7575
Namespace: "y",
76+
Cluster: "m",
7677
}: {
7778
netA.String(): netA,
7879
},
@@ -158,6 +159,7 @@ func TestGenerateTemplate(t *testing.T) {
158159
{
159160
Name: "x",
160161
Namespace: "y",
162+
Cluster: "m",
161163
}: {
162164
netA.String(): netA,
163165
},
@@ -325,6 +327,7 @@ func TestEnsure(tt *testing.T) {
325327
{
326328
Name: "a",
327329
Namespace: "x",
330+
Cluster: "m",
328331
}: {
329332
netA.String(): netA,
330333
},
@@ -431,6 +434,7 @@ func TestEnsure(tt *testing.T) {
431434
{
432435
Name: "a",
433436
Namespace: "x",
437+
Cluster: "m",
434438
}: {
435439
netA.String(): netA,
436440
netB.String(): netB,
@@ -499,6 +503,7 @@ func TestEnsure(tt *testing.T) {
499503
{
500504
Name: "a",
501505
Namespace: "x",
506+
Cluster: "m",
502507
}: {
503508
netA.String(): netA,
504509
netB.String(): netB,

provider/cidr_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func TestGenerateRoutes(tt *testing.T) {
2323
{
2424
Name: "a",
2525
Namespace: "x",
26+
Cluster: "m",
2627
}: {
2728
netA.String(): netA,
2829
netB.String(): netB,
@@ -38,6 +39,7 @@ func TestGenerateRoutes(tt *testing.T) {
3839
{
3940
Name: "a",
4041
Namespace: "x",
42+
Cluster: "m",
4143
}: {
4244
netA.String(): netA,
4345
netB.String(): netB,

provider/provider.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
type Resource struct {
99
Name string
1010
Namespace string
11+
Cluster string
1112
}
1213

1314
type EgressConfig struct {

0 commit comments

Comments
 (0)