Skip to content

Commit 890af8e

Browse files
authored
refactor TransportServer controller (#6389)
1 parent 53b27f7 commit 890af8e

File tree

3 files changed

+269
-253
lines changed

3 files changed

+269
-253
lines changed

internal/k8s/controller.go

Lines changed: 0 additions & 219 deletions
Original file line numberDiff line numberDiff line change
@@ -556,14 +556,6 @@ func (nsi *namespacedInformer) addPolicyHandler(handlers cache.ResourceEventHand
556556
nsi.cacheSyncs = append(nsi.cacheSyncs, informer.HasSynced)
557557
}
558558

559-
func (nsi *namespacedInformer) addTransportServerHandler(handlers cache.ResourceEventHandlerFuncs) {
560-
informer := nsi.confSharedInformerFactory.K8s().V1().TransportServers().Informer()
561-
informer.AddEventHandler(handlers)
562-
nsi.transportServerLister = informer.GetStore()
563-
564-
nsi.cacheSyncs = append(nsi.cacheSyncs, informer.HasSynced)
565-
}
566-
567559
func (lbc *LoadBalancerController) addNamespaceHandler(handlers cache.ResourceEventHandlerFuncs, nsLabel string) {
568560
optionsModifier := func(options *meta_v1.ListOptions) {
569561
options.LabelSelector = nsLabel
@@ -1238,35 +1230,6 @@ func (lbc *LoadBalancerController) syncPolicy(task task) {
12381230
// Note: updating the status of a policy based on a reload is not needed.
12391231
}
12401232

1241-
func (lbc *LoadBalancerController) syncTransportServer(task task) {
1242-
key := task.Key
1243-
var obj interface{}
1244-
var tsExists bool
1245-
var err error
1246-
1247-
ns, _, _ := cache.SplitMetaNamespaceKey(key)
1248-
obj, tsExists, err = lbc.getNamespacedInformer(ns).transportServerLister.GetByKey(key)
1249-
if err != nil {
1250-
lbc.syncQueue.Requeue(task, err)
1251-
return
1252-
}
1253-
1254-
var changes []ResourceChange
1255-
var problems []ConfigurationProblem
1256-
1257-
if !tsExists {
1258-
glog.V(2).Infof("Deleting TransportServer: %v\n", key)
1259-
changes, problems = lbc.configuration.DeleteTransportServer(key)
1260-
} else {
1261-
glog.V(2).Infof("Adding or Updating TransportServer: %v\n", key)
1262-
ts := obj.(*conf_v1.TransportServer)
1263-
changes, problems = lbc.configuration.AddOrUpdateTransportServer(ts)
1264-
}
1265-
1266-
lbc.processChanges(changes)
1267-
lbc.processProblems(problems)
1268-
}
1269-
12701233
func (lbc *LoadBalancerController) syncVirtualServer(task task) {
12711234
key := task.Key
12721235
var obj interface{}
@@ -1437,45 +1400,6 @@ func (lbc *LoadBalancerController) processChanges(changes []ResourceChange) {
14371400
}
14381401
}
14391402

1440-
func (lbc *LoadBalancerController) updateTransportServerStatusAndEventsOnDelete(tsConfig *TransportServerConfiguration, changeError string, deleteErr error) {
1441-
eventType := api_v1.EventTypeWarning
1442-
eventTitle := "Rejected"
1443-
eventWarningMessage := ""
1444-
var state string
1445-
1446-
// TransportServer either became invalid or lost its host or listener
1447-
if changeError != "" {
1448-
state = conf_v1.StateInvalid
1449-
eventWarningMessage = fmt.Sprintf("with error: %s", changeError)
1450-
} else if len(tsConfig.Warnings) > 0 {
1451-
state = conf_v1.StateWarning
1452-
eventWarningMessage = fmt.Sprintf("with warning(s): %s", formatWarningMessages(tsConfig.Warnings))
1453-
}
1454-
1455-
// we don't need to report anything if eventWarningMessage is empty
1456-
// in that case, the resource was deleted because its class became incorrect
1457-
// (some other Ingress Controller will handle it)
1458-
1459-
if eventWarningMessage != "" {
1460-
if deleteErr != nil {
1461-
eventType = api_v1.EventTypeWarning
1462-
eventTitle = "RejectedWithError"
1463-
eventWarningMessage = fmt.Sprintf("%s; but was not applied: %v", eventWarningMessage, deleteErr)
1464-
state = conf_v1.StateInvalid
1465-
}
1466-
1467-
msg := fmt.Sprintf("TransportServer %s was rejected %s", getResourceKey(&tsConfig.TransportServer.ObjectMeta), eventWarningMessage)
1468-
lbc.recorder.Eventf(tsConfig.TransportServer, eventType, eventTitle, msg)
1469-
1470-
if lbc.reportCustomResourceStatusEnabled() {
1471-
err := lbc.statusUpdater.UpdateTransportServerStatus(tsConfig.TransportServer, state, eventTitle, msg)
1472-
if err != nil {
1473-
glog.Errorf("Error when updating the status for TransportServer %v/%v: %v", tsConfig.TransportServer.Namespace, tsConfig.TransportServer.Name, err)
1474-
}
1475-
}
1476-
}
1477-
}
1478-
14791403
// UpdateVirtualServerStatusAndEventsOnDelete updates the virtual server status and events
14801404
func (lbc *LoadBalancerController) UpdateVirtualServerStatusAndEventsOnDelete(vsConfig *VirtualServerConfiguration, changeError string, deleteErr error) {
14811405
eventType := api_v1.EventTypeWarning
@@ -1688,44 +1612,6 @@ func (lbc *LoadBalancerController) updateRegularIngressStatusAndEvents(ingConfig
16881612
}
16891613
}
16901614

1691-
func (lbc *LoadBalancerController) updateTransportServerStatusAndEvents(tsConfig *TransportServerConfiguration, warnings configs.Warnings, operationErr error) {
1692-
eventTitle := "AddedOrUpdated"
1693-
eventType := api_v1.EventTypeNormal
1694-
eventWarningMessage := ""
1695-
state := conf_v1.StateValid
1696-
1697-
if len(tsConfig.Warnings) > 0 {
1698-
eventType = api_v1.EventTypeWarning
1699-
eventTitle = "AddedOrUpdatedWithWarning"
1700-
eventWarningMessage = fmt.Sprintf("with warning(s): %s", formatWarningMessages(tsConfig.Warnings))
1701-
state = conf_v1.StateWarning
1702-
}
1703-
1704-
if messages, ok := warnings[tsConfig.TransportServer]; ok {
1705-
eventType = api_v1.EventTypeWarning
1706-
eventTitle = "AddedOrUpdatedWithWarning"
1707-
eventWarningMessage = fmt.Sprintf("with warning(s): %s", formatWarningMessages(messages))
1708-
state = conf_v1.StateWarning
1709-
}
1710-
1711-
if operationErr != nil {
1712-
eventType = api_v1.EventTypeWarning
1713-
eventTitle = "AddedOrUpdatedWithError"
1714-
eventWarningMessage = fmt.Sprintf("%s; but was not applied: %v", eventWarningMessage, operationErr)
1715-
state = conf_v1.StateInvalid
1716-
}
1717-
1718-
msg := fmt.Sprintf("Configuration for %v was added or updated %s", getResourceKey(&tsConfig.TransportServer.ObjectMeta), eventWarningMessage)
1719-
lbc.recorder.Eventf(tsConfig.TransportServer, eventType, eventTitle, msg)
1720-
1721-
if lbc.reportCustomResourceStatusEnabled() {
1722-
err := lbc.statusUpdater.UpdateTransportServerStatus(tsConfig.TransportServer, state, eventTitle, msg)
1723-
if err != nil {
1724-
glog.Errorf("Error when updating the status for TransportServer %v/%v: %v", tsConfig.TransportServer.Namespace, tsConfig.TransportServer.Name, err)
1725-
}
1726-
}
1727-
}
1728-
17291615
func (lbc *LoadBalancerController) updateVirtualServerStatusAndEvents(vsConfig *VirtualServerConfiguration, warnings configs.Warnings, operationErr error) {
17301616
eventType := api_v1.EventTypeNormal
17311617
eventTitle := "AddedOrUpdated"
@@ -1870,15 +1756,6 @@ func (lbc *LoadBalancerController) updateVirtualServerMetrics() {
18701756
lbc.metricsCollector.SetVirtualServerRoutes(vsrCount)
18711757
}
18721758

1873-
func (lbc *LoadBalancerController) updateTransportServerMetrics() {
1874-
if !lbc.areCustomResourcesEnabled {
1875-
return
1876-
}
1877-
1878-
metrics := lbc.configuration.GetTransportServerMetrics()
1879-
lbc.metricsCollector.SetTransportServers(metrics.TotalTLSPassthrough, metrics.TotalTCP, metrics.TotalUDP)
1880-
}
1881-
18821759
func (lbc *LoadBalancerController) syncService(task task) {
18831760
key := task.Key
18841761

@@ -2214,45 +2091,6 @@ func (lbc *LoadBalancerController) updateVirtualServerRoutesStatusFromEvents() e
22142091
return nil
22152092
}
22162093

2217-
func (lbc *LoadBalancerController) updateTransportServersStatusFromEvents() error {
2218-
var allErrs []error
2219-
for _, nsi := range lbc.namespacedInformers {
2220-
for _, obj := range nsi.transportServerLister.List() {
2221-
ts := obj.(*conf_v1.TransportServer)
2222-
2223-
events, err := lbc.client.CoreV1().Events(ts.Namespace).List(context.TODO(),
2224-
meta_v1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%v,involvedObject.uid=%v", ts.Name, ts.UID)})
2225-
if err != nil {
2226-
allErrs = append(allErrs, fmt.Errorf("error trying to get events for TransportServer %v/%v: %w", ts.Namespace, ts.Name, err))
2227-
break
2228-
}
2229-
2230-
if len(events.Items) == 0 {
2231-
continue
2232-
}
2233-
2234-
var timestamp time.Time
2235-
var latestEvent api_v1.Event
2236-
for _, event := range events.Items {
2237-
if event.CreationTimestamp.After(timestamp) {
2238-
latestEvent = event
2239-
}
2240-
}
2241-
2242-
err = lbc.statusUpdater.UpdateTransportServerStatus(ts, getStatusFromEventTitle(latestEvent.Reason), latestEvent.Reason, latestEvent.Message)
2243-
if err != nil {
2244-
allErrs = append(allErrs, err)
2245-
}
2246-
}
2247-
}
2248-
2249-
if len(allErrs) > 0 {
2250-
return fmt.Errorf("not all TransportServers statuses were updated: %v", allErrs)
2251-
}
2252-
2253-
return nil
2254-
}
2255-
22562094
func getIPAddressesFromEndpoints(endpoints []podEndpoint) []string {
22572095
var endps []string
22582096
for _, ep := range endpoints {
@@ -3028,63 +2866,6 @@ func (lbc *LoadBalancerController) getTransportServerBackupEndpointsAndKey(trans
30282866
return bendps, backupEndpointsKey
30292867
}
30302868

3031-
func (lbc *LoadBalancerController) createTransportServerEx(transportServer *conf_v1.TransportServer, listenerPort int) *configs.TransportServerEx {
3032-
endpoints := make(map[string][]string)
3033-
externalNameSvcs := make(map[string]bool)
3034-
podsByIP := make(map[string]string)
3035-
disableIPV6 := lbc.configuration.isIPV6Disabled
3036-
3037-
for _, u := range transportServer.Spec.Upstreams {
3038-
podEndps, external, err := lbc.getEndpointsForUpstream(transportServer.Namespace, u.Service, uint16(u.Port))
3039-
if err == nil && external && lbc.isNginxPlus {
3040-
externalNameSvcs[configs.GenerateExternalNameSvcKey(transportServer.Namespace, u.Service)] = true
3041-
}
3042-
if err != nil {
3043-
glog.Warningf("Error getting Endpoints for Upstream %v: %v", u.Name, err)
3044-
}
3045-
3046-
// subselector is not supported yet in TransportServer upstreams. That's why we pass "nil" here
3047-
endpointsKey := configs.GenerateEndpointsKey(transportServer.Namespace, u.Service, nil, uint16(u.Port))
3048-
3049-
endps := getIPAddressesFromEndpoints(podEndps)
3050-
endpoints[endpointsKey] = endps
3051-
3052-
if lbc.isNginxPlus && lbc.isPrometheusEnabled {
3053-
for _, endpoint := range podEndps {
3054-
podsByIP[endpoint.Address] = endpoint.PodName
3055-
}
3056-
}
3057-
3058-
if u.Backup != "" && u.BackupPort != nil {
3059-
bendps, backupEndpointsKey := lbc.getTransportServerBackupEndpointsAndKey(transportServer, u, externalNameSvcs)
3060-
endpoints[backupEndpointsKey] = bendps
3061-
}
3062-
}
3063-
3064-
scrtRefs := make(map[string]*secrets.SecretReference)
3065-
3066-
if transportServer.Spec.TLS != nil && transportServer.Spec.TLS.Secret != "" {
3067-
scrtKey := transportServer.Namespace + "/" + transportServer.Spec.TLS.Secret
3068-
3069-
scrtRef := lbc.secretStore.GetSecret(scrtKey)
3070-
if scrtRef.Error != nil {
3071-
glog.Warningf("Error trying to get the secret %v for TransportServer %v: %v", scrtKey, transportServer.Name, scrtRef.Error)
3072-
}
3073-
3074-
scrtRefs[scrtKey] = scrtRef
3075-
}
3076-
3077-
return &configs.TransportServerEx{
3078-
ListenerPort: listenerPort,
3079-
TransportServer: transportServer,
3080-
Endpoints: endpoints,
3081-
PodsByIP: podsByIP,
3082-
ExternalNameSvcs: externalNameSvcs,
3083-
DisableIPV6: disableIPV6,
3084-
SecretRefs: scrtRefs,
3085-
}
3086-
}
3087-
30882869
func (lbc *LoadBalancerController) getEndpointsForUpstream(namespace string, upstreamService string, upstreamPort uint16) (endps []podEndpoint, isExternal bool, err error) {
30892870
svc, err := lbc.getServiceForUpstream(namespace, upstreamService, upstreamPort)
30902871
if err != nil {

internal/k8s/handlers.go

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -326,40 +326,6 @@ func createVirtualServerRouteHandlers(lbc *LoadBalancerController) cache.Resourc
326326
}
327327
}
328328

329-
func createTransportServerHandlers(lbc *LoadBalancerController) cache.ResourceEventHandlerFuncs {
330-
return cache.ResourceEventHandlerFuncs{
331-
AddFunc: func(obj interface{}) {
332-
ts := obj.(*conf_v1.TransportServer)
333-
glog.V(3).Infof("Adding TransportServer: %v", ts.Name)
334-
lbc.AddSyncQueue(ts)
335-
},
336-
DeleteFunc: func(obj interface{}) {
337-
ts, isTs := obj.(*conf_v1.TransportServer)
338-
if !isTs {
339-
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
340-
if !ok {
341-
glog.V(3).Infof("Error received unexpected object: %v", obj)
342-
return
343-
}
344-
ts, ok = deletedState.Obj.(*conf_v1.TransportServer)
345-
if !ok {
346-
glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-TransportServer object: %v", deletedState.Obj)
347-
return
348-
}
349-
}
350-
glog.V(3).Infof("Removing TransportServer: %v", ts.Name)
351-
lbc.AddSyncQueue(ts)
352-
},
353-
UpdateFunc: func(old, cur interface{}) {
354-
curTs := cur.(*conf_v1.TransportServer)
355-
if !reflect.DeepEqual(old, cur) {
356-
glog.V(3).Infof("TransportServer %v changed, syncing", curTs.Name)
357-
lbc.AddSyncQueue(curTs)
358-
}
359-
},
360-
}
361-
}
362-
363329
func createPolicyHandlers(lbc *LoadBalancerController) cache.ResourceEventHandlerFuncs {
364330
return cache.ResourceEventHandlerFuncs{
365331
AddFunc: func(obj interface{}) {

0 commit comments

Comments
 (0)