Skip to content

Commit 8a9e4de

Browse files
authored
Move headless-service-specific functions to cluster_watcher_headless.go (#7559)
This just unclutters a bit `cluster_watcher.go` by moving all the functions specific to headless services into the new `cluster_watch_headless.go` file.
1 parent 75903af commit 8a9e4de

File tree

2 files changed

+349
-335
lines changed

2 files changed

+349
-335
lines changed

multicluster/service-mirror/cluster_watcher.go

Lines changed: 0 additions & 335 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,298 +1066,6 @@ func (rcsw *RemoteClusterServiceWatcher) handleCreateOrUpdateEndpoints(
10661066
return err
10671067
}
10681068

1069-
// createOrUpdateHeadlessEndpoints processes endpoints objects for exported
1070-
// headless services. When an endpoints object is created or updated in the
1071-
// remote cluster, it will be processed here in order to reconcile the local
1072-
// cluster state with the remote cluster state.
1073-
//
1074-
// createOrUpdateHeadlessEndpoints is also responsible for creating the service
1075-
// mirror in the source cluster. In order for an exported headless service to be
1076-
// mirrored as headless, it must have at least one port defined and at least one
1077-
// named address in its endpoints object (e.g a deployment would not work since
1078-
// pods may not have arbitrary hostnames). As such, when an endpoints object is
1079-
// first processed, if there is no mirror service, we create one, by looking at
1080-
// the endpoints object itself. If the exported service is deemed to be valid
1081-
// for headless mirroring, then the function will create the headless mirror and
1082-
// then create an endpoints object for it in the source cluster. If it is not
1083-
// valid, the exported service will be mirrored as clusterIP and its endpoints
1084-
// will point to the gateway.
1085-
//
1086-
// When creating endpoints for a headless mirror, we also create an endpoint
1087-
// mirror (clusterIP) service for each of the endpoints' named addresses. If the
1088-
// headless mirror exists and has an endpoints object, we simply update by
1089-
// either creating or deleting endpoint mirror services.
1090-
func (rcsw *RemoteClusterServiceWatcher) createOrUpdateHeadlessEndpoints(ctx context.Context, exportedEndpoints *corev1.Endpoints) error {
1091-
exportedService, err := rcsw.remoteAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(exportedEndpoints.Name)
1092-
if err != nil {
1093-
rcsw.log.Debugf("failed to retrieve exported service %s/%s when updating its headless mirror endpoints: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err)
1094-
return fmt.Errorf("error retrieving exported service %s/%s: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err)
1095-
}
1096-
1097-
// Check whether the endpoints should be processed for a headless exported
1098-
// service. If the exported service does not have any ports exposed, then
1099-
// neither will its corresponding endpoint mirrors, it should not be created
1100-
// as a headless mirror. If the service does not have any named addresses in
1101-
// its Endpoints object, then the endpoints should not be processed.
1102-
if len(exportedService.Spec.Ports) == 0 {
1103-
rcsw.recorder.Event(exportedService, v1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: object spec has no exposed ports")
1104-
rcsw.log.Infof("Skipped creating headless mirror for %s/%s: service object spec has no exposed ports", exportedService.Namespace, exportedService.Name)
1105-
return nil
1106-
}
1107-
1108-
mirrorServiceName := rcsw.mirroredResourceName(exportedService.Name)
1109-
mirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedService.Namespace).Get(mirrorServiceName)
1110-
if err != nil {
1111-
if !kerrors.IsNotFound(err) {
1112-
return err
1113-
}
1114-
1115-
// If the mirror service does not exist, create it, either as clusterIP
1116-
// or as headless.
1117-
mirrorService, err = rcsw.createRemoteHeadlessService(ctx, exportedService, exportedEndpoints)
1118-
if err != nil {
1119-
return err
1120-
}
1121-
}
1122-
1123-
headlessMirrorEpName := rcsw.mirroredResourceName(exportedEndpoints.Name)
1124-
headlessMirrorEndpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(exportedEndpoints.Namespace).Get(headlessMirrorEpName)
1125-
if err != nil {
1126-
if !kerrors.IsNotFound(err) {
1127-
return err
1128-
}
1129-
1130-
if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone {
1131-
return rcsw.createGatewayEndpoints(ctx, exportedService)
1132-
}
1133-
1134-
// Create endpoint mirrors for headless mirror
1135-
if err := rcsw.createHeadlessMirrorEndpoints(ctx, exportedService, exportedEndpoints); err != nil {
1136-
rcsw.log.Debugf("failed to create headless mirrors for endpoints %s/%s: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err)
1137-
return err
1138-
}
1139-
1140-
return nil
1141-
}
1142-
1143-
// Past this point, we do not want to process a mirror service that is not
1144-
// headless. We want to process only endpoints for headless mirrors; before
1145-
// this point it would have been possible to have a clusterIP mirror, since
1146-
// we are creating the mirror service in the scope of the function.
1147-
if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone {
1148-
return nil
1149-
}
1150-
1151-
mirrorEndpoints := headlessMirrorEndpoints.DeepCopy()
1152-
endpointMirrors := make(map[string]struct{})
1153-
newSubsets := make([]corev1.EndpointSubset, 0, len(exportedEndpoints.Subsets))
1154-
for _, subset := range exportedEndpoints.Subsets {
1155-
newAddresses := make([]corev1.EndpointAddress, 0, len(subset.Addresses))
1156-
for _, address := range subset.Addresses {
1157-
if address.Hostname == "" {
1158-
continue
1159-
}
1160-
1161-
endpointMirrorName := rcsw.mirroredResourceName(address.Hostname)
1162-
endpointMirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(endpointMirrorName)
1163-
if err != nil {
1164-
if !kerrors.IsNotFound(err) {
1165-
return err
1166-
}
1167-
// If the error is 'NotFound' then the Endpoint Mirror service
1168-
// does not exist, so create it.
1169-
endpointMirrorService, err = rcsw.createEndpointMirrorService(ctx, address.Hostname, exportedEndpoints.ResourceVersion, endpointMirrorName, exportedService)
1170-
if err != nil {
1171-
return err
1172-
}
1173-
}
1174-
1175-
endpointMirrors[endpointMirrorName] = struct{}{}
1176-
newAddresses = append(newAddresses, corev1.EndpointAddress{
1177-
Hostname: address.Hostname,
1178-
IP: endpointMirrorService.Spec.ClusterIP,
1179-
})
1180-
}
1181-
1182-
if len(newAddresses) == 0 {
1183-
continue
1184-
}
1185-
1186-
// copy ports, create subset
1187-
newSubsets = append(newSubsets, corev1.EndpointSubset{
1188-
Addresses: newAddresses,
1189-
Ports: subset.DeepCopy().Ports,
1190-
})
1191-
}
1192-
1193-
headlessMirrorName := rcsw.mirroredResourceName(exportedService.Name)
1194-
matchLabels := map[string]string{
1195-
consts.MirroredHeadlessSvcNameLabel: headlessMirrorName,
1196-
}
1197-
1198-
// Fetch all Endpoint Mirror services that belong to the same Headless Mirror
1199-
endpointMirrorServices, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector())
1200-
if err != nil {
1201-
return err
1202-
}
1203-
1204-
var errors []error
1205-
for _, service := range endpointMirrorServices {
1206-
// If the service's name does not show up in the up-to-date map of
1207-
// Endpoint Mirror names, then we should delete it.
1208-
if _, found := endpointMirrors[service.Name]; found {
1209-
continue
1210-
}
1211-
err := rcsw.localAPIClient.Client.CoreV1().Services(service.Namespace).Delete(ctx, service.Name, metav1.DeleteOptions{})
1212-
if err != nil {
1213-
if !kerrors.IsNotFound(err) {
1214-
errors = append(errors, fmt.Errorf("error deleting Endpoint Mirror service %s/%s: %v", service.Namespace, service.Name, err))
1215-
}
1216-
}
1217-
}
1218-
1219-
if len(errors) > 0 {
1220-
return RetryableError{errors}
1221-
}
1222-
1223-
// Update
1224-
mirrorEndpoints.Subsets = newSubsets
1225-
_, err = rcsw.localAPIClient.Client.CoreV1().Endpoints(mirrorEndpoints.Namespace).Update(ctx, mirrorEndpoints, metav1.UpdateOptions{})
1226-
if err != nil {
1227-
return RetryableError{[]error{err}}
1228-
}
1229-
1230-
return nil
1231-
}
1232-
1233-
// createRemoteHeadlessService creates a mirror service for an exported headless
1234-
// service. Whether the mirror will be created as a headless or clusterIP
1235-
// service depends on the endpoints object associated with the exported service.
1236-
// If there is at least one named address, then the service will be mirrored as
1237-
// headless.
1238-
//
1239-
// Note: we do not check for any exposed ports because it was previously done
1240-
// when the service was picked up by the service mirror. We also do not need to
1241-
// check if the exported service is headless; its endpoints will be processed
1242-
// only if it is headless so we are certain at this point that is the case.
1243-
func (rcsw *RemoteClusterServiceWatcher) createRemoteHeadlessService(ctx context.Context, exportedService *corev1.Service, exportedEndpoints *corev1.Endpoints) (*corev1.Service, error) {
1244-
// If we don't have any subsets to process then avoid creating the service.
1245-
// We need at least one address to be make a decision (whether we should
1246-
// create as clusterIP or headless), rely on the endpoints being eventually
1247-
// consistent, will likely receive an update with subsets.
1248-
if len(exportedEndpoints.Subsets) == 0 {
1249-
return &corev1.Service{}, nil
1250-
}
1251-
1252-
remoteService := exportedService.DeepCopy()
1253-
serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name)
1254-
localServiceName := rcsw.mirroredResourceName(remoteService.Name)
1255-
1256-
if err := rcsw.mirrorNamespaceIfNecessary(ctx, remoteService.Namespace); err != nil {
1257-
return &corev1.Service{}, err
1258-
}
1259-
1260-
serviceToCreate := &corev1.Service{
1261-
ObjectMeta: metav1.ObjectMeta{
1262-
Name: localServiceName,
1263-
Namespace: remoteService.Namespace,
1264-
Annotations: rcsw.getMirroredServiceAnnotations(remoteService),
1265-
Labels: rcsw.getMirroredServiceLabels(remoteService),
1266-
},
1267-
Spec: corev1.ServiceSpec{
1268-
Ports: remapRemoteServicePorts(remoteService.Spec.Ports),
1269-
},
1270-
}
1271-
1272-
if shouldExportAsHeadlessService(exportedEndpoints, rcsw.log) {
1273-
serviceToCreate.Spec.ClusterIP = corev1.ClusterIPNone
1274-
rcsw.log.Infof("Creating a new headless service mirror for %s", serviceInfo)
1275-
} else {
1276-
rcsw.log.Infof("Creating a new service mirror for %s", serviceInfo)
1277-
}
1278-
1279-
svc, err := rcsw.localAPIClient.Client.CoreV1().Services(remoteService.Namespace).Create(ctx, serviceToCreate, metav1.CreateOptions{})
1280-
if err != nil {
1281-
if !kerrors.IsAlreadyExists(err) {
1282-
// we might have created it during earlier attempt, if that is not the case, we retry
1283-
return &corev1.Service{}, RetryableError{[]error{err}}
1284-
}
1285-
}
1286-
1287-
return svc, err
1288-
}
1289-
1290-
// createHeadlessMirrorEndpoints creates an endpoints object for a Headless
1291-
// Mirror service. The endpoints object will contain the same subsets and hosts
1292-
// as the endpoints object of the exported headless service. Each host in the
1293-
// Headless Mirror's endpoints object will point to an Endpoint Mirror service.
1294-
func (rcsw *RemoteClusterServiceWatcher) createHeadlessMirrorEndpoints(ctx context.Context, exportedService *corev1.Service, exportedEndpoints *corev1.Endpoints) error {
1295-
exportedServiceInfo := fmt.Sprintf("%s/%s", exportedService.Namespace, exportedService.Name)
1296-
endpointsHostnames := make(map[string]struct{})
1297-
subsetsToCreate := make([]corev1.EndpointSubset, 0, len(exportedEndpoints.Subsets))
1298-
for _, subset := range exportedEndpoints.Subsets {
1299-
newAddresses := make([]corev1.EndpointAddress, 0, len(subset.Addresses))
1300-
for _, addr := range subset.Addresses {
1301-
if addr.Hostname == "" {
1302-
continue
1303-
}
1304-
1305-
endpointMirrorName := rcsw.mirroredResourceName(addr.Hostname)
1306-
createdService, err := rcsw.createEndpointMirrorService(ctx, addr.Hostname, exportedEndpoints.ResourceVersion, endpointMirrorName, exportedService)
1307-
if err != nil {
1308-
rcsw.log.Errorf("error creating endpoint mirror service %s/%s for exported headless service %s: %v", endpointMirrorName, exportedService.Namespace, exportedServiceInfo, err)
1309-
continue
1310-
}
1311-
1312-
endpointsHostnames[addr.Hostname] = struct{}{}
1313-
newAddresses = append(newAddresses, corev1.EndpointAddress{
1314-
Hostname: addr.TargetRef.Name,
1315-
IP: createdService.Spec.ClusterIP,
1316-
})
1317-
1318-
}
1319-
1320-
if len(newAddresses) == 0 {
1321-
continue
1322-
}
1323-
1324-
subsetsToCreate = append(subsetsToCreate, corev1.EndpointSubset{
1325-
Addresses: newAddresses,
1326-
Ports: subset.DeepCopy().Ports,
1327-
})
1328-
}
1329-
1330-
headlessMirrorServiceName := rcsw.mirroredResourceName(exportedService.Name)
1331-
headlessMirrorEndpoints := &corev1.Endpoints{
1332-
ObjectMeta: metav1.ObjectMeta{
1333-
Name: headlessMirrorServiceName,
1334-
Namespace: exportedService.Namespace,
1335-
Labels: map[string]string{
1336-
consts.MirroredResourceLabel: "true",
1337-
consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
1338-
},
1339-
Annotations: map[string]string{
1340-
consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", exportedService.Name, exportedService.Namespace, rcsw.link.TargetClusterDomain),
1341-
},
1342-
},
1343-
Subsets: subsetsToCreate,
1344-
}
1345-
1346-
if rcsw.link.GatewayIdentity != "" {
1347-
headlessMirrorEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
1348-
}
1349-
1350-
rcsw.log.Infof("Creating a new headless mirror endpoints object for headless mirror %s/%s", headlessMirrorServiceName, exportedService.Namespace)
1351-
if _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(exportedService.Namespace).Create(ctx, headlessMirrorEndpoints, metav1.CreateOptions{}); err != nil {
1352-
// we clean up after ourselves
1353-
rcsw.localAPIClient.Client.CoreV1().Services(exportedService.Namespace).Delete(ctx, headlessMirrorServiceName, metav1.DeleteOptions{})
1354-
// and retry
1355-
return RetryableError{[]error{err}}
1356-
}
1357-
1358-
return nil
1359-
}
1360-
13611069
// createEndpointMirrorService creates a new Endpoint Mirror service and its
13621070
// corresponding endpoints object. It returns the newly created Endpoint Mirror
13631071
// service object. When a headless service is exported, we create a Headless
@@ -1435,30 +1143,6 @@ func (rcsw *RemoteClusterServiceWatcher) createEndpointMirrorService(ctx context
14351143
return createdService, nil
14361144
}
14371145

1438-
// shouldExportAsHeadlessService checks if an exported service should be
1439-
// mirrored as a headless service or as a clusterIP service, based on its
1440-
// endpoints object. For an exported service to be a headless mirror, it needs
1441-
// to have at least one named address in its endpoints (that is, a pod with a
1442-
// `hostname`). If the endpoints object does not contain at least one named
1443-
// address, it should be exported as clusterIP.
1444-
func shouldExportAsHeadlessService(endpoints *corev1.Endpoints, log *logging.Entry) bool {
1445-
for _, subset := range endpoints.Subsets {
1446-
for _, addr := range subset.Addresses {
1447-
if addr.Hostname != "" {
1448-
return true
1449-
}
1450-
}
1451-
1452-
for _, addr := range subset.NotReadyAddresses {
1453-
if addr.Hostname != "" {
1454-
return true
1455-
}
1456-
}
1457-
}
1458-
log.Infof("Service %s/%s should not be exported as headless: no named addresses in its endpoints object", endpoints.Namespace, endpoints.Name)
1459-
return false
1460-
}
1461-
14621146
func isExportedEndpoints(obj interface{}, log *logging.Entry) bool {
14631147
ep, ok := obj.(*corev1.Endpoints)
14641148
if !ok {
@@ -1473,22 +1157,3 @@ func isExportedEndpoints(obj interface{}, log *logging.Entry) bool {
14731157

14741158
return true
14751159
}
1476-
1477-
// isHeadlessEndpoints checks if an endpoints object belongs to a
1478-
// headless service.
1479-
func isHeadlessEndpoints(obj interface{}, log *logging.Entry) bool {
1480-
ep, ok := obj.(*corev1.Endpoints)
1481-
if !ok {
1482-
log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", ep)
1483-
return false
1484-
}
1485-
1486-
if _, found := ep.Labels[corev1.IsHeadlessService]; !found {
1487-
// Not an Endpoints object for a headless service? Then we likely don't want
1488-
// to update anything.
1489-
log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, corev1.IsHeadlessService)
1490-
return false
1491-
}
1492-
1493-
return true
1494-
}

0 commit comments

Comments
 (0)