Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions multicluster/cmd/service-mirror/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func Main(args []string) {
repairPeriod := cmd.Duration("endpoint-refresh-period", 1*time.Minute, "frequency to refresh endpoint resolution")
enableHeadlessSvc := cmd.Bool("enable-headless-services", false, "toggle support for headless service mirroring")
enableNamespaceCreation := cmd.Bool("enable-namespace-creation", false, "toggle support for namespace creation")
enableEndpointSlices := cmd.Bool("enable-endpoint-slices", true, "Use EndpointSlice resources instead of Endpoints")
enablePprof := cmd.Bool("enable-pprof", false, "Enable pprof endpoints on the admin server")
localMirror := cmd.Bool("local-mirror", false, "watch the local cluster for federated service members")
federatedServiceSelector := cmd.String("federated-service-selector", k8s.DefaultFederatedServiceSelector, "Selector (label query) for federated service members in the local cluster")
Expand Down Expand Up @@ -92,14 +93,23 @@ func Main(args []string) {
// controllerK8sAPI is used by the cluster watcher to manage
// mirror resources such as services, namespaces, and endpoints.

// Build the list of resources to watch based on configuration
localAPIResources := []controllerK8s.APIResource{
controllerK8s.NS,
controllerK8s.Svc,
}
if *enableEndpointSlices {
localAPIResources = append(localAPIResources, controllerK8s.ES)
} else {
localAPIResources = append(localAPIResources, controllerK8s.Endpoint)
}

controllerK8sAPI, err := controllerK8s.InitializeAPI(
rootCtx,
*kubeConfigPath,
false,
"local",
controllerK8s.NS,
controllerK8s.Svc,
controllerK8s.Endpoint,
localAPIResources...,
)
if err != nil {
log.Fatalf("Failed to initialize K8s API: %s", err)
Expand Down Expand Up @@ -153,7 +163,7 @@ func Main(args []string) {
ExcludedLabels: excludedLabelList,
},
}
err = startLocalClusterWatcher(ctx, *namespace, controllerK8sAPI, linksAPI, *requeueLimit, *repairPeriod, *enableHeadlessSvc, *enableNamespaceCreation, link)
err = startLocalClusterWatcher(ctx, *namespace, controllerK8sAPI, linksAPI, *requeueLimit, *repairPeriod, *enableHeadlessSvc, *enableNamespaceCreation, *enableEndpointSlices, link)
if err != nil {
log.Fatalf("Failed to start local cluster watcher: %s", err)
}
Expand Down Expand Up @@ -200,7 +210,7 @@ func Main(args []string) {
if err != nil {
log.Errorf("Failed to load remote cluster credentials: %s", err)
}
err = restartClusterWatcher(ctx, link, *namespace, *probeSvc, creds, controllerK8sAPI, linksAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc, *enableNamespaceCreation)
err = restartClusterWatcher(ctx, link, *namespace, *probeSvc, creds, controllerK8sAPI, linksAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc, *enableNamespaceCreation, *enableEndpointSlices)
if err != nil {
// failed to restart cluster watcher; give a bit of slack
// and requeue the link to give it another try
Expand Down Expand Up @@ -329,6 +339,7 @@ func restartClusterWatcher(
metrics servicemirror.ProbeMetricVecs,
enableHeadlessSvc bool,
enableNamespaceCreation bool,
enableEndpointSlices bool,
) error {

cleanupWorkers()
Expand All @@ -352,7 +363,14 @@ func restartClusterWatcher(
if err != nil {
return fmt.Errorf("unable to parse kube config: %w", err)
}
remoteAPI, err := controllerK8s.InitializeAPIForConfig(ctx, cfg, false, link.Spec.TargetClusterName, controllerK8s.Svc, controllerK8s.Endpoint)
// Build the list of resources to watch based on configuration
remoteAPIResources := []controllerK8s.APIResource{controllerK8s.Svc}
if enableEndpointSlices {
remoteAPIResources = append(remoteAPIResources, controllerK8s.ES)
} else {
remoteAPIResources = append(remoteAPIResources, controllerK8s.Endpoint)
}
remoteAPI, err := controllerK8s.InitializeAPIForConfig(ctx, cfg, false, link.Spec.TargetClusterName, remoteAPIResources...)
if err != nil {
return fmt.Errorf("cannot initialize api for target cluster %s: %w", link.Spec.TargetClusterName, err)
}
Expand All @@ -369,6 +387,7 @@ func restartClusterWatcher(
ch,
enableHeadlessSvc,
enableNamespaceCreation,
enableEndpointSlices,
)
if err != nil {
return fmt.Errorf("unable to create cluster watcher: %w", err)
Expand All @@ -391,6 +410,7 @@ func startLocalClusterWatcher(
repairPeriod time.Duration,
enableHeadlessSvc bool,
enableNamespaceCreation bool,
enableEndpointSlices bool,
link v1alpha3.Link,
) error {
cw, err := servicemirror.NewRemoteClusterServiceWatcher(
Expand All @@ -406,6 +426,7 @@ func startLocalClusterWatcher(
make(chan bool),
enableHeadlessSvc,
enableNamespaceCreation,
enableEndpointSlices,
)
if err != nil {
return fmt.Errorf("unable to create cluster watcher: %w", err)
Expand Down
Loading