Skip to content

Commit 2ece87e

Browse files
authored
pilot: watch meshConfig in remote clusters (#58455)
* Add missing permission for watching remote configmaps Signed-off-by: Jacek Ewertowski <[email protected]> * Implement watching remote trust domain Signed-off-by: Jacek Ewertowski <[email protected]> * Add a release note Signed-off-by: Jacek Ewertowski <[email protected]> * Implement restricted mesh watcher Signed-off-by: Jacek Ewertowski <[email protected]> * Make TestWatcher an impl of RestrictedConfigWatcher Signed-off-by: Jacek Ewertowski <[email protected]> --------- Signed-off-by: Jacek Ewertowski <[email protected]>
1 parent e8b2548 commit 2ece87e

File tree

12 files changed

+221
-27
lines changed

12 files changed

+221
-27
lines changed

manifests/charts/istio-control/istio-discovery/templates/reader-clusterrole.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ rules:
2222
resources: ["*"]
2323
verbs: ["get", "list", "watch"]
2424
- apiGroups: [""]
25-
resources: ["endpoints", "pods", "services", "nodes", "replicationcontrollers", "namespaces", "secrets"]
25+
resources: ["endpoints", "pods", "services", "nodes", "replicationcontrollers", "namespaces", "secrets", "configmaps"]
2626
verbs: ["get", "list", "watch"]
2727
- apiGroups: ["networking.istio.io"]
2828
verbs: [ "get", "watch", "list" ]

pilot/pkg/serviceregistry/aggregate/controller.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,12 @@ func mergeService(dst, src *model.Service, srcRegistry serviceregistry.Instance)
378378
newAddresses := src.ClusterVIPs.GetAddressesFor(clusterID)
379379
dst.ClusterVIPs.SetAddressesFor(clusterID, newAddresses)
380380
}
381+
// Merge service accounts from different clusters
382+
// Each cluster may have a different trust domain, so we need to collect all unique service accounts
383+
if len(src.ServiceAccounts) > 0 {
384+
dst.ServiceAccounts = append(dst.ServiceAccounts, src.ServiceAccounts...)
385+
dst.ServiceAccounts = slices.FilterDuplicates(dst.ServiceAccounts)
386+
}
381387
}
382388

383389
// NetworkGateways merges the service-based cross-network gateways from each registry.

pilot/pkg/serviceregistry/aggregate/controller_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,3 +477,118 @@ func TestDeferredRun(t *testing.T) {
477477
expectRunningOrFail(t, ctrl, true)
478478
})
479479
}
480+
481+
func TestMergeServiceWithSameTrustDomain(t *testing.T) {
482+
svc1 := mock.MakeService(mock.ServiceArgs{
483+
Hostname: "test.default.svc.cluster.local",
484+
Address: "10.1.0.1",
485+
ServiceAccounts: []string{"spiffe://cluster.local/ns/default/sa/test-sa"},
486+
ClusterID: "cluster-1",
487+
})
488+
svc2 := mock.MakeService(mock.ServiceArgs{
489+
Hostname: "test.default.svc.cluster.local",
490+
Address: "10.2.0.1",
491+
ServiceAccounts: []string{"spiffe://cluster.local/ns/default/sa/test-sa"},
492+
ClusterID: "cluster-2",
493+
})
494+
495+
discovery1 := memory.NewServiceDiscovery(svc1)
496+
discovery2 := memory.NewServiceDiscovery(svc2)
497+
498+
registry1 := serviceregistry.Simple{
499+
ProviderID: provider.Kubernetes,
500+
ClusterID: "cluster-1",
501+
DiscoveryController: discovery1,
502+
}
503+
504+
registry2 := serviceregistry.Simple{
505+
ProviderID: provider.Kubernetes,
506+
ClusterID: "cluster-2",
507+
DiscoveryController: discovery2,
508+
}
509+
510+
ctrl := NewController(Options{
511+
MeshHolder: &mockMeshConfigHolder{},
512+
})
513+
ctrl.AddRegistry(registry1)
514+
ctrl.AddRegistry(registry2)
515+
516+
mergedSvc := ctrl.GetService(svc1.Hostname)
517+
if mergedSvc == nil {
518+
t.Fatal("Failed to get merged service")
519+
}
520+
521+
expectedClusterVIPs := map[cluster.ID][]string{
522+
"cluster-1": {"10.1.0.1"},
523+
"cluster-2": {"10.2.0.1"},
524+
}
525+
if !reflect.DeepEqual(mergedSvc.ClusterVIPs.Addresses, expectedClusterVIPs) {
526+
t.Errorf("ClusterVIPs mismatch.\nGot: %v\nWant: %v",
527+
mergedSvc.ClusterVIPs.Addresses, expectedClusterVIPs)
528+
}
529+
530+
expectedServiceAccounts := []string{"spiffe://cluster.local/ns/default/sa/test-sa"}
531+
if !reflect.DeepEqual(mergedSvc.ServiceAccounts, expectedServiceAccounts) {
532+
t.Errorf("ServiceAccounts mismatch.\nGot: %v\nWant: %v",
533+
mergedSvc.ServiceAccounts, expectedServiceAccounts)
534+
}
535+
}
536+
537+
func TestMergeServiceWithDistinctTrustDomains(t *testing.T) {
538+
svc1 := mock.MakeService(mock.ServiceArgs{
539+
Hostname: "test.default.svc.cluster.local",
540+
Address: "10.1.0.1",
541+
ServiceAccounts: []string{"spiffe://mesh.east/ns/default/sa/test-sa"},
542+
ClusterID: "cluster-east",
543+
})
544+
svc2 := mock.MakeService(mock.ServiceArgs{
545+
Hostname: "test.default.svc.cluster.local",
546+
Address: "10.2.0.1",
547+
ServiceAccounts: []string{"spiffe://mesh.west/ns/default/sa/test-sa"},
548+
ClusterID: "cluster-west",
549+
})
550+
551+
discovery1 := memory.NewServiceDiscovery(svc1)
552+
discovery2 := memory.NewServiceDiscovery(svc2)
553+
554+
registry1 := serviceregistry.Simple{
555+
ProviderID: provider.Kubernetes,
556+
ClusterID: "cluster-east",
557+
DiscoveryController: discovery1,
558+
}
559+
560+
registry2 := serviceregistry.Simple{
561+
ProviderID: provider.Kubernetes,
562+
ClusterID: "cluster-west",
563+
DiscoveryController: discovery2,
564+
}
565+
566+
ctrl := NewController(Options{
567+
MeshHolder: &mockMeshConfigHolder{},
568+
})
569+
ctrl.AddRegistry(registry1)
570+
ctrl.AddRegistry(registry2)
571+
572+
mergedSvc := ctrl.GetService(svc1.Hostname)
573+
if mergedSvc == nil {
574+
t.Fatal("Failed to get merged service")
575+
}
576+
577+
expectedClusterVIPs := map[cluster.ID][]string{
578+
"cluster-east": {"10.1.0.1"},
579+
"cluster-west": {"10.2.0.1"},
580+
}
581+
if !reflect.DeepEqual(mergedSvc.ClusterVIPs.Addresses, expectedClusterVIPs) {
582+
t.Errorf("ClusterVIPs mismatch.\nGot: %v\nWant: %v",
583+
mergedSvc.ClusterVIPs.Addresses, expectedClusterVIPs)
584+
}
585+
586+
expectedServiceAccounts := []string{
587+
"spiffe://mesh.east/ns/default/sa/test-sa",
588+
"spiffe://mesh.west/ns/default/sa/test-sa",
589+
}
590+
if !reflect.DeepEqual(mergedSvc.ServiceAccounts, expectedServiceAccounts) {
591+
t.Errorf("ServiceAccounts mismatch.\nGot: %v\nWant: %v",
592+
mergedSvc.ServiceAccounts, expectedServiceAccounts)
593+
}
594+
}

pilot/pkg/serviceregistry/kube/controller/controller.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ type Controller struct {
234234

235235
// initialSyncTimedout is set to true after performing an initial processing timed out.
236236
initialSyncTimedout *atomic.Bool
237-
meshWatcher mesh.Watcher
237+
meshWatcher mesh.RestrictedConfigWatcher
238238

239239
podsClient kclient.Client[*v1.Pod]
240240

@@ -331,7 +331,7 @@ func NewController(kubeClient kubelib.Client, options Options) *Controller {
331331
c.exports = newServiceExportCache(c)
332332
c.imports = newServiceImportCache(c)
333333

334-
c.meshWatcher = options.MeshWatcher
334+
c.meshWatcher = mesh.NewRestrictedConfigWatcher(options.MeshWatcher)
335335
if c.opts.MeshNetworksWatcher != nil {
336336
c.networksHandlerRegistration = c.opts.MeshNetworksWatcher.AddNetworksHandler(func() {
337337
c.reloadMeshNetworks()
@@ -420,7 +420,7 @@ func (c *Controller) onServiceEvent(pre, curr *v1.Service, event model.Event) er
420420
log.Debugf("Handle event %s for service %s in namespace %s", event, curr.Name, curr.Namespace)
421421

422422
// Create the standard (cluster.local) service.
423-
svcConv := kube.ConvertService(*curr, c.opts.DomainSuffix, c.Cluster(), c.meshWatcher.Mesh())
423+
svcConv := kube.ConvertService(*curr, c.opts.DomainSuffix, c.Cluster(), c.meshWatcher.TrustDomain())
424424

425425
switch event {
426426
case model.EventDelete:

pilot/pkg/serviceregistry/kube/controller/controller_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2813,16 +2813,16 @@ func TestServiceUpdateNeedsPush(t *testing.T) {
28132813
name: "target ports changed",
28142814
prev: &svc,
28152815
curr: &updatedSvc,
2816-
prevConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, "", nil),
2817-
currConv: kube.ConvertService(updatedSvc, constants.DefaultClusterLocalDomain, "", nil),
2816+
prevConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, "", ""),
2817+
currConv: kube.ConvertService(updatedSvc, constants.DefaultClusterLocalDomain, "", ""),
28182818
expect: true,
28192819
},
28202820
testcase{
28212821
name: "target ports unchanged",
28222822
prev: &svc,
28232823
curr: &svc,
2824-
prevConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, "", nil),
2825-
currConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, "", nil),
2824+
prevConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, "", ""),
2825+
currConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, "", ""),
28262826
expect: false,
28272827
})
28282828

pilot/pkg/serviceregistry/kube/controller/endpoint_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (c *Controller) NewEndpointBuilder(pod *v1.Pod) *EndpointBuilder {
5252
var podLabels labels.Instance
5353
if pod != nil {
5454
locality = c.getPodLocality(pod)
55-
sa = kube.SecureNamingSAN(pod, c.meshWatcher.Mesh())
55+
sa = kube.SecureNamingSAN(pod, c.meshWatcher.TrustDomain())
5656
podLabels = pod.Labels
5757
namespace = pod.Namespace
5858
subdomain = pod.Spec.Subdomain

pilot/pkg/serviceregistry/kube/controller/multicluster.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,12 @@ import (
3333
"istio.io/istio/pilot/pkg/serviceregistry/provider"
3434
"istio.io/istio/pilot/pkg/serviceregistry/serviceentry"
3535
"istio.io/istio/pkg/backoff"
36+
"istio.io/istio/pkg/config/mesh/kubemesh"
37+
"istio.io/istio/pkg/config/mesh/meshwatcher"
3638
"istio.io/istio/pkg/config/schema/collection"
3739
"istio.io/istio/pkg/config/schema/collections"
3840
kubelib "istio.io/istio/pkg/kube"
41+
"istio.io/istio/pkg/kube/krt"
3942
"istio.io/istio/pkg/kube/multicluster"
4043
"istio.io/istio/pkg/webhooks"
4144
)
@@ -123,6 +126,26 @@ func NewMulticluster(
123126
}
124127
log.Infof("Initializing Kubernetes service registry %q", options.ClusterID)
125128
options.ConfigCluster = configCluster
129+
130+
// Create per-cluster mesh watcher for remote clusters
131+
// This allows controllers to detect cluster-specific settings that may be relevant for the local proxies, e.g. trust domain.
132+
if !configCluster {
133+
meshConfigMapName := mc.getMeshConfigMapName()
134+
meshSource := kubemesh.NewConfigMapSource(
135+
client,
136+
options.SystemNamespace,
137+
meshConfigMapName,
138+
kubemesh.MeshConfigKey,
139+
krt.NewOptionsBuilder(stop, "", nil),
140+
)
141+
remoteMeshCollection := meshwatcher.NewCollection(
142+
krt.NewOptionsBuilder(stop, "", nil),
143+
meshSource,
144+
)
145+
options.MeshWatcher = meshwatcher.ConfigAdapter(remoteMeshCollection)
146+
log.Infof("Created mesh watcher for remote cluster %q", cluster.ID)
147+
}
148+
126149
kubeRegistry := NewController(client, options)
127150
kubeController := &kubeController{
128151
MeshServiceController: opts.MeshServiceController,
@@ -136,6 +159,15 @@ func NewMulticluster(
136159
return mc
137160
}
138161

162+
// getMeshConfigMapName returns the mesh ConfigMap name based on the revision
163+
func (m *Multicluster) getMeshConfigMapName() string {
164+
name := "istio"
165+
if m.revision == "" || m.revision == "default" {
166+
return name
167+
}
168+
return name + "-" + m.revision
169+
}
170+
139171
// initializeCluster initializes the cluster by setting various handlers.
140172
func (m *Multicluster) initializeCluster(cluster *multicluster.Cluster, kubeController *kubeController, kubeRegistry *Controller,
141173
options Options, configCluster bool, clusterStopCh <-chan struct{},

pilot/pkg/serviceregistry/kube/conversion.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525

2626
"istio.io/api/annotation"
2727
"istio.io/api/label"
28-
meshconfig "istio.io/api/mesh/v1alpha1"
2928
"istio.io/istio/pilot/pkg/model"
3029
"istio.io/istio/pilot/pkg/serviceregistry/provider"
3130
"istio.io/istio/pkg/cluster"
@@ -45,7 +44,7 @@ func convertPort(port corev1.ServicePort) *model.Port {
4544
}
4645
}
4746

48-
func ConvertService(svc corev1.Service, domainSuffix string, clusterID cluster.ID, mesh *meshconfig.MeshConfig) *model.Service {
47+
func ConvertService(svc corev1.Service, domainSuffix string, clusterID cluster.ID, trustDomain string) *model.Service {
4948
addrs := []string{constants.UnspecifiedIP}
5049
resolution := model.ClientSideLB
5150
externalName := ""
@@ -80,7 +79,7 @@ func ConvertService(svc corev1.Service, domainSuffix string, clusterID cluster.I
8079
}
8180
if svc.Annotations[annotation.AlphaKubernetesServiceAccounts.Name] != "" {
8281
for _, ksa := range strings.Split(svc.Annotations[annotation.AlphaKubernetesServiceAccounts.Name], ",") {
83-
serviceaccounts = append(serviceaccounts, kubeToIstioServiceAccount(ksa, svc.Namespace, mesh))
82+
serviceaccounts = append(serviceaccounts, kubeToIstioServiceAccount(ksa, svc.Namespace, trustDomain))
8483
}
8584
}
8685
if svc.Annotations[annotation.NetworkingExportTo.Name] != "" {
@@ -179,13 +178,13 @@ func ServiceHostnameForKR(obj metav1.Object, domainSuffix string) host.Name {
179178
}
180179

181180
// kubeToIstioServiceAccount converts a K8s service account to an Istio service account
182-
func kubeToIstioServiceAccount(saname string, ns string, mesh *meshconfig.MeshConfig) string {
183-
return spiffe.MustGenSpiffeURI(mesh, ns, saname)
181+
func kubeToIstioServiceAccount(saname string, ns string, trustDomain string) string {
182+
return spiffe.MustGenSpiffeURIForTrustDomain(trustDomain, ns, saname)
184183
}
185184

186185
// SecureNamingSAN creates the secure naming used for SAN verification from pod metadata
187-
func SecureNamingSAN(pod *corev1.Pod, mesh *meshconfig.MeshConfig) string {
188-
return spiffe.MustGenSpiffeURI(mesh, pod.Namespace, pod.Spec.ServiceAccountName)
186+
func SecureNamingSAN(pod *corev1.Pod, trustDomain string) string {
187+
return spiffe.MustGenSpiffeURIForTrustDomain(trustDomain, pod.Namespace, pod.Spec.ServiceAccountName)
189188
}
190189

191190
// PodTLSMode returns the tls mode associated with the pod if pod has been injected with sidecar

pilot/pkg/serviceregistry/kube/conversion_test.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626

2727
"istio.io/api/annotation"
28-
meshconfig "istio.io/api/mesh/v1alpha1"
2928
"istio.io/istio/pkg/cluster"
3029
"istio.io/istio/pkg/config/kube"
3130
"istio.io/istio/pkg/config/protocol"
@@ -169,7 +168,7 @@ func TestServiceConversion(t *testing.T) {
169168
},
170169
}
171170

172-
service := ConvertService(localSvc, domainSuffix, clusterID, &meshconfig.MeshConfig{TrustDomain: domainSuffix})
171+
service := ConvertService(localSvc, domainSuffix, clusterID, domainSuffix)
173172
if service == nil {
174173
t.Fatal("could not convert service")
175174
}
@@ -255,7 +254,7 @@ func TestServiceConversionWithEmptyServiceAccountsAnnotation(t *testing.T) {
255254
},
256255
}
257256

258-
service := ConvertService(localSvc, domainSuffix, clusterID, nil)
257+
service := ConvertService(localSvc, domainSuffix, clusterID, "")
259258
if service == nil {
260259
t.Fatal("could not convert service")
261260
}
@@ -310,7 +309,7 @@ func TestServiceConversionWithExportToAnnotation(t *testing.T) {
310309
}
311310
for _, test := range tests {
312311
localSvc.Annotations[annotation.NetworkingExportTo.Name] = test.Annotation
313-
service := ConvertService(localSvc, domainSuffix, clusterID, nil)
312+
service := ConvertService(localSvc, domainSuffix, clusterID, "")
314313
if service == nil {
315314
t.Fatal("could not convert service")
316315
}
@@ -343,7 +342,7 @@ func TestExternalServiceConversion(t *testing.T) {
343342
},
344343
}
345344

346-
service := ConvertService(extSvc, domainSuffix, clusterID, nil)
345+
service := ConvertService(extSvc, domainSuffix, clusterID, "")
347346
if service == nil {
348347
t.Fatal("could not convert external service")
349348
}
@@ -393,7 +392,7 @@ func TestExternalClusterLocalServiceConversion(t *testing.T) {
393392

394393
domainSuffix := "cluster.local"
395394

396-
service := ConvertService(extSvc, domainSuffix, clusterID, nil)
395+
service := ConvertService(extSvc, domainSuffix, clusterID, "")
397396
if service == nil {
398397
t.Fatal("could not convert external service")
399398
}
@@ -454,7 +453,7 @@ func TestLBServiceConversion(t *testing.T) {
454453
},
455454
}
456455

457-
service := ConvertService(extSvc, domainSuffix, clusterID, nil)
456+
service := ConvertService(extSvc, domainSuffix, clusterID, "")
458457
if service == nil {
459458
t.Fatal("could not convert external service")
460459
}
@@ -500,7 +499,7 @@ func TestInternalTrafficPolicyServiceConversion(t *testing.T) {
500499
},
501500
}
502501

503-
service := ConvertService(svc, domainSuffix, clusterID, nil)
502+
service := ConvertService(svc, domainSuffix, clusterID, "")
504503
if service == nil {
505504
t.Fatal("could not convert service")
506505
}
@@ -520,9 +519,7 @@ func TestSecureNamingSAN(t *testing.T) {
520519
pod.Namespace = ns
521520
pod.Spec.ServiceAccountName = sa
522521

523-
mesh := &meshconfig.MeshConfig{TrustDomain: "td.local"}
524-
525-
san := SecureNamingSAN(pod, mesh)
522+
san := SecureNamingSAN(pod, "td.local")
526523

527524
expectedSAN := fmt.Sprintf("spiffe://td.local/ns/%v/sa/%v", ns, sa)
528525

0 commit comments

Comments
 (0)