Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions internal/kube/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,24 @@ func (c *Controller) checkListener(key string, listener *skupperv2alpha1.Listene
if listener != nil {
_, svcExists = c.observedServices[namespace+"/"+listener.Spec.Host]
}
return c.getSite(namespace).CheckListener(name, listener, svcExists)
err = c.getSite(namespace).CheckListener(name, listener, svcExists)
if err != nil {
return err
}
return nil
}

func (c *Controller) checkListenerService(key string, svc *corev1.Service) error {
c.log.Debug("checkListenerService", slog.String("key", key))
if svc == nil {
return nil
namespace, serviceName, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
if !c.namespaces.isControlled(namespace) {
return nil
}
return c.getSite(namespace).HandleDeletedListenerService(serviceName)
}
return c.getSite(svc.Namespace).CheckListenerService(svc)
}
Expand Down
73 changes: 73 additions & 0 deletions internal/kube/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/skupperproject/skupper/internal/kube/resource"
"github.com/skupperproject/skupper/internal/network"
"github.com/skupperproject/skupper/internal/qdr"
"github.com/skupperproject/skupper/internal/utils"
"github.com/skupperproject/skupper/internal/version"
skupperv2alpha1 "github.com/skupperproject/skupper/pkg/apis/skupper/v2alpha1"
)
Expand Down Expand Up @@ -624,6 +625,18 @@ func TestUpdate(t *testing.T) {
serviceCheck("adifferentsvc", "test").check,
negativeServiceCheck("mysvc", "test"),
},
}, {
name: "listener service recreated when deleted",
skupperObjects: []runtime.Object{
f.site("mysite", "test", "", false, false),
f.listener("mylistener", "test", "mysvc", 8080),
},
functions: []WaitFunction{
isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED),
serviceCheck("mysvc", "test").check,
deleteListenerService("mysvc", "test"),
waitForService("mysvc", "test"),
},
}, {
name: "exposePodsByName handles pod delete",
k8sObjects: []runtime.Object{
Expand Down Expand Up @@ -801,6 +814,47 @@ func TestUpdate(t *testing.T) {
}
}

func TestListenerServiceRecreatedWhenDeleted(t *testing.T) {
runListenerServiceRecreatedWithFakeClient(t)
}

func runListenerServiceRecreatedWithFakeClient(t *testing.T) {
t.Helper()
flags := flag.NewFlagSet("", flag.ContinueOnError)
config, err := BoundConfig(flags)
assert.NilError(t, err)
clients, err := fakeclient.NewFakeClient("test", nil, []runtime.Object{
f.site("mysite", "test", "", false, false),
f.listener("mylistener", "test", "mysvc", 8080),
}, "")
assert.NilError(t, err)
enableSSA(clients.GetDynamicClient())
ctrl, err := NewController(clients, config, func(e *watchers.EventProcessor) { e.SetResyncShort(time.Second) })
assert.NilError(t, err)
stopCh := make(chan struct{})
err = ctrl.init(stopCh)
assert.NilError(t, err)
for i := 0; i < 2; i++ {
ctrl.eventProcessor.TestProcess()
}
err = utils.Retry(100*time.Millisecond, 50, func() (bool, error) {
ctrl.eventProcessor.TestProcess()
return isListenerStatusConditionTrue("mylistener", "test", skupperv2alpha1.CONDITION_TYPE_CONFIGURED)(t, clients), nil
})
assert.NilError(t, err)
err = utils.Retry(100*time.Millisecond, 50, func() (bool, error) {
ctrl.eventProcessor.TestProcess()
return serviceCheck("mysvc", "test").check(t, clients), nil
})
assert.NilError(t, err)
deleteListenerService("mysvc", "test")(t, clients)
err = utils.Retry(100*time.Millisecond, 50, func() (bool, error) {
ctrl.eventProcessor.TestProcess()
return waitForService("mysvc", "test")(t, clients), nil
})
assert.NilError(t, err)
}

func deleteAttachedConnector(name string, namespace string) WaitFunction {
return func(t *testing.T, clients internalclient.Clients) bool {
err := clients.GetSkupperClient().SkupperV2alpha1().AttachedConnectors(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
Expand Down Expand Up @@ -1929,6 +1983,17 @@ func (s *ServiceCheck) checkAbsent(t *testing.T, clients internalclient.Clients)
return false
}

func waitForService(name string, namespace string) WaitFunction {
return func(t *testing.T, clients internalclient.Clients) bool {
_, err := clients.GetKubeClient().CoreV1().Services(namespace).Get(context.Background(), name, metav1.GetOptions{})
if errors.IsNotFound(err) {
return false
}
assert.Assert(t, err)
return true
}
}

func updateListener(name string, namespace string, host string, port int) WaitFunction {
return func(t *testing.T, clients internalclient.Clients) bool {
ctxt := context.Background()
Expand All @@ -1951,6 +2016,14 @@ func negativeServiceCheck(name string, namespace string) WaitFunction {
}
}

func deleteListenerService(serviceName string, namespace string) WaitFunction {
return func(t *testing.T, clients internalclient.Clients) bool {
err := clients.GetKubeClient().CoreV1().Services(namespace).Delete(context.Background(), serviceName, metav1.DeleteOptions{})
assert.Assert(t, err)
return true
}
}

func deleteTargetPod(name string, namespace string) WaitFunction {
return func(t *testing.T, clients internalclient.Clients) bool {
ctxt := context.Background()
Expand Down
43 changes: 24 additions & 19 deletions internal/kube/site/extended_bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,28 +103,33 @@ func (a *ExtendedBindings) ListenerUpdated(listener *skupperv2alpha1.Listener) {
slog.String("name", listener.Name),
slog.Any("error", err),
)
} else {
port := Port{
Name: listener.Name,
Port: listener.Spec.Port,
TargetPort: allocatedRouterPort,
Protocol: listener.Protocol(),
return
}
port := Port{
Name: listener.Name,
Port: listener.Spec.Port,
TargetPort: allocatedRouterPort,
Protocol: listener.Protocol(),
}
if exposed := a.exposed.Expose(listener.Spec.Host, port); exposed != nil {
if err := a.context.Expose(exposed); err != nil {
bindings_logger.Error("Error exposing listener",
slog.String("namespace", listener.Namespace),
slog.String("name", listener.Name),
slog.Any("error", err))
} else {
bindings_logger.Info("Exposed listener",
slog.String("namespace", listener.Namespace),
slog.String("name", listener.Name))
}
if exposed := a.exposed.Expose(listener.Spec.Host, port); exposed != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original code was only going to call context.Expose if a change is needed,
now it seems to be calling it either way. I see that the context.Expose has mechanisms
to detect if something has changed or not, but once it is called it tries to retrieve the latest
service from the kube api.
Did you find a reason to change this (sorry if it is obvious but I am not seeing it)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the only effect of the change is to recreate the Service earlier in the case where a Listener event is processed before the Service delete event, mainly added it as a defensive path during implementation. It does not add a path that’s required for correctness, the delete path (HandleDeletedListenerService) already guarantees recreation. Looking at it again, I do suppose this change here is unnecessary, so I can revert this. Thanks for pointing it out.

if err := a.context.Expose(exposed); err != nil {
//TODO: write error to listener status
bindings_logger.Error("Error exposing listener",
slog.String("namespace", listener.Namespace),
slog.String("name", listener.Name),
slog.Any("error", err))
} else {
bindings_logger.Info("Exposed listener",
slog.String("namespace", listener.Namespace),
slog.String("name", listener.Name))
}
}

}
}
func (a *ExtendedBindings) GetExposedPortSet(host string) *ExposedPortSet {
if existing, ok := a.exposed[host]; ok && !existing.empty() {
return existing
}
return nil
}

func (a *ExtendedBindings) ListenerDeleted(listener *skupperv2alpha1.Listener) {
Expand Down
11 changes: 11 additions & 0 deletions internal/kube/site/site.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,17 @@ func (s *Site) CheckListenerService(svc *corev1.Service) error {
return nil
}

func (s *Site) HandleDeletedListenerService(serviceName string) error {
if s.site == nil {
return nil
}
portSet := s.bindings.GetExposedPortSet(serviceName)
if portSet == nil {
return nil
}
return s.Expose(portSet)
}

func (s *Site) CheckListener(name string, listener *skupperv2alpha1.Listener, svcExists bool) error {
if s.site == nil {
if listener == nil {
Expand Down