Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/kthena-router/app/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ func startControllers(store datastore.Store, stop <-chan struct{}, enableGateway
}

gatewayInformerFactory := gatewayinformers.NewSharedInformerFactory(gatewayClient, 0)
gatewayController := controller.NewGatewayController(gatewayInformerFactory, store)
gatewayController := controller.NewGatewayController(gatewayClient, gatewayInformerFactory, store)

// Gateway API Inference Extension controllers are optional
var httpRouteController *controller.HTTPRouteController
if enableGatewayAPIInferenceExtension {
httpRouteController = controller.NewHTTPRouteController(gatewayInformerFactory, store)
httpRouteController = controller.NewHTTPRouteController(gatewayClient, gatewayInformerFactory, store)
}

// Start informer factory after all controllers that use it are created
Expand All @@ -136,7 +136,7 @@ func startControllers(store datastore.Store, stop <-chan struct{}, enableGateway
klog.Fatalf("Error building dynamic client: %s", err.Error())
}
dynamicInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)
inferencePoolController := controller.NewInferencePoolController(dynamicInformerFactory, store)
inferencePoolController := controller.NewInferencePoolController(dynamicClient, dynamicInformerFactory, store)

dynamicInformerFactory.Start(stop)

Expand Down
25 changes: 23 additions & 2 deletions cmd/kthena-router/app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ type PortListenerInfo struct {
Server *http.Server
ShutdownFunc context.CancelFunc
Listeners []ListenerConfig
LastError error
}

// ListenerManager manages Gateway listeners dynamically
Expand Down Expand Up @@ -409,6 +410,9 @@ func (lm *ListenerManager) removeListenerFromPort(port int32, configToRemove Lis
}
portInfo.Listeners = filtered
portInfo.mu.Unlock()

// Remove listener status from store to prevent memory leaks
lm.store.RemoveListenerStatus(configToRemove.GatewayKey, configToRemove.ListenerName)
}

// addListenerToPort adds a listener config to a port
Expand Down Expand Up @@ -437,7 +441,11 @@ func (lm *ListenerManager) addListenerToPort(port int32, config ListenerConfig,
portInfo.ShutdownFunc = cancel

// Start the server
go func(p int32, srv *http.Server, ctx context.Context, tls bool, cert, key string) {
// Set status to nil to indicate a clean state (no errors detected yet)
// This will translate to Accepted: True and Programmed: True in the Gateway status
lm.store.SetListenerStatus(config.GatewayKey, config.ListenerName, nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please explain why the status is set to nil here (e.g. add a comment such as `// Clear listener status in store`).


go func(p int32, srv *http.Server, pi *PortListenerInfo, tls bool, cert, key string) {
klog.Infof("Starting Gateway listener server on port %d", p)
var err error
if tls {
Expand All @@ -450,8 +458,18 @@ func (lm *ListenerManager) addListenerToPort(port int32, config ListenerConfig,
}
if err != nil && err != http.ErrServerClosed {
klog.Errorf("listen failed for port %d: %v", p, err)

// Update all listeners on this port with the error
// pi is a pointer to the PortListenerInfo, so we can lock it directly
// without needing to lock the top-level ListenerManager
pi.mu.Lock()
pi.LastError = err
for _, l := range pi.Listeners {
lm.store.SetListenerStatus(l.GatewayKey, l.ListenerName, err)
}
pi.mu.Unlock()
}
}(port, server, listenerCtx, enableTLS, tlsCertFile, tlsKeyFile)
}(port, server, portInfo, enableTLS, tlsCertFile, tlsKeyFile)

// Start graceful shutdown goroutine
go func(p int32, srv *http.Server, cancel context.CancelFunc) {
Expand All @@ -467,7 +485,10 @@ func (lm *ListenerManager) addListenerToPort(port int32, config ListenerConfig,
// Add listener to existing port
portInfo.mu.Lock()
portInfo.Listeners = append(portInfo.Listeners, config)
currentErr := portInfo.LastError
portInfo.mu.Unlock()

lm.store.SetListenerStatus(config.GatewayKey, config.ListenerName, currentErr)
klog.V(4).Infof("Added listener %s/%s to existing port %d", config.GatewayKey, config.ListenerName, port)
}
}
Expand Down
140 changes: 139 additions & 1 deletion pkg/kthena-router/controller/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,28 @@ limitations under the License.
package controller

import (
"context"
"fmt"
"sync/atomic"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
gatewayclientset "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
gatewayinformers "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions"
gatewaylisters "sigs.k8s.io/gateway-api/pkg/client/listers/apis/v1"

"github.com/volcano-sh/kthena/pkg/kthena-router/datastore"
)

type GatewayController struct {
gatewayClient gatewayclientset.Interface
gatewayLister gatewaylisters.GatewayLister
gatewaySynced cache.InformerSynced
registration cache.ResourceEventHandlerRegistration
Expand All @@ -45,12 +49,14 @@ type GatewayController struct {
}

func NewGatewayController(
gatewayClient gatewayclientset.Interface,
gatewayInformerFactory gatewayinformers.SharedInformerFactory,
store datastore.Store,
) *GatewayController {
gatewayInformer := gatewayInformerFactory.Gateway().V1().Gateways()

controller := &GatewayController{
gatewayClient: gatewayClient,
gatewayLister: gatewayInformer.Lister(),
gatewaySynced: gatewayInformer.Informer().HasSynced,
workqueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()),
Expand Down Expand Up @@ -160,7 +166,139 @@ func (c *GatewayController) syncHandler(key string) error {
return err
}

return nil
return c.updateGatewayStatus(gateway)
}

// updateGatewayStatus publishes the router's view of a Gateway to the
// Gateway's status subresource.
//
// It marks the Gateway itself as Accepted and Programmed, derives Accepted and
// Programmed conditions for every listener in the spec from the per-listener
// error recorded in the store (nil meaning no error), and then sends the
// result with a single UpdateStatus call. The error from that call, if any,
// is returned unchanged.
func (c *GatewayController) updateGatewayStatus(gateway *gatewayv1.Gateway) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add corresponding E2E to make sure the status setting for Gateway/HTTPRoute/InferencePool all work properly.

// Work on a copy so the object handed in by the caller is never mutated.
// NOTE(review): presumably the caller passes an object from the shared
// informer cache — confirm against syncHandler.
gateway = gateway.DeepCopy()

// Gateway-level conditions: both are always reported as True here, regardless
// of any per-listener errors handled further down.
acceptedCond := metav1.Condition{
Type: string(gatewayv1.GatewayConditionAccepted),
Status: metav1.ConditionTrue,
Reason: string(gatewayv1.GatewayReasonAccepted),
Message: "Gateway has been accepted by kthena-router",
LastTransitionTime: metav1.Now(),
ObservedGeneration: gateway.Generation,
}

programmedCond := metav1.Condition{
Type: string(gatewayv1.GatewayConditionProgrammed),
Status: metav1.ConditionTrue,
Reason: string(gatewayv1.GatewayReasonProgrammed),
Message: "Gateway has been programmed by kthena-router",
LastTransitionTime: metav1.Now(),
ObservedGeneration: gateway.Generation,
}

c.setGatewayCondition(gateway, acceptedCond)
c.setGatewayCondition(gateway, programmedCond)

// Listener-level conditions: the store is keyed by "<namespace>/<name>" of the
// Gateway plus the listener name.
gatewayKey := fmt.Sprintf("%s/%s", gateway.Namespace, gateway.Name)
for _, listener := range gateway.Spec.Listeners {
listenerErr := c.store.GetListenerStatus(gatewayKey, string(listener.Name))

// These two names shadow the Gateway-level conditions above for the rest of
// this loop iteration only.
acceptedCond, programmedCond := c.getGatewayListenerConditions(listenerErr, gateway.Generation)
c.setGatewayListenerStatus(gateway, listener.Name, []metav1.Condition{acceptedCond, programmedCond})
}

// Only the status subresource is written. context.Background() means the call
// is not tied to any caller-supplied cancellation or deadline.
_, err := c.gatewayClient.GatewayV1().Gateways(gateway.Namespace).UpdateStatus(context.Background(), gateway, metav1.UpdateOptions{})
return err
}

// setGatewayCondition stores newCond in gateway.Status.Conditions, replacing
// the existing condition of the same Type or appending when there is none.
//
// When a condition of that Type already exists and its Status is unchanged,
// the existing LastTransitionTime is carried over. Kubernetes condition
// conventions define LastTransitionTime as the last time the condition moved
// from one status to another, so a change of Reason, Message or
// ObservedGeneration alone must not reset it.
func (c *GatewayController) setGatewayCondition(gateway *gatewayv1.Gateway, newCond metav1.Condition) {
	for i := range gateway.Status.Conditions {
		existing := &gateway.Status.Conditions[i]
		if existing.Type != newCond.Type {
			continue
		}
		// Same status means no transition happened: keep the old timestamp.
		if existing.Status == newCond.Status {
			newCond.LastTransitionTime = existing.LastTransitionTime
		}
		*existing = newCond
		return
	}
	gateway.Status.Conditions = append(gateway.Status.Conditions, newCond)
}

// setGatewayListenerStatus merges conditions into the status entry of the
// listener called listenerName, creating that entry (advertising HTTPRoute as
// the only supported route kind) when gateway.Status.Listeners has none yet.
//
// Each condition replaces the existing one of the same Type or is appended.
// When the Status of an existing condition is unchanged its LastTransitionTime
// is preserved, because that field records the last status transition, not
// the last write.
func (c *GatewayController) setGatewayListenerStatus(gateway *gatewayv1.Gateway, listenerName gatewayv1.SectionName, conditions []metav1.Condition) {
	var listenerStatus *gatewayv1.ListenerStatus
	for i := range gateway.Status.Listeners {
		if gateway.Status.Listeners[i].Name == listenerName {
			listenerStatus = &gateway.Status.Listeners[i]
			break
		}
	}

	if listenerStatus == nil {
		// Copy the group name into a local so the status object does not hold a
		// pointer into the package-level gatewayv1.GroupVersion variable.
		group := gatewayv1.Group(gatewayv1.GroupVersion.Group)
		gateway.Status.Listeners = append(gateway.Status.Listeners, gatewayv1.ListenerStatus{
			Name: listenerName,
			SupportedKinds: []gatewayv1.RouteGroupKind{
				{
					Group: &group,
					Kind:  gatewayv1.Kind("HTTPRoute"),
				},
			},
		})
		// Re-take the pointer after append, which may have reallocated the slice.
		listenerStatus = &gateway.Status.Listeners[len(gateway.Status.Listeners)-1]
	}

	for _, newCond := range conditions {
		idx := -1
		for i := range listenerStatus.Conditions {
			if listenerStatus.Conditions[i].Type == newCond.Type {
				idx = i
				break
			}
		}
		if idx < 0 {
			listenerStatus.Conditions = append(listenerStatus.Conditions, newCond)
			continue
		}
		// Same status means no transition happened: keep the old timestamp.
		if listenerStatus.Conditions[idx].Status == newCond.Status {
			newCond.LastTransitionTime = listenerStatus.Conditions[idx].LastTransitionTime
		}
		listenerStatus.Conditions[idx] = newCond
	}
}

// getGatewayListenerConditions builds the Accepted and Programmed conditions
// (returned in that order) for a single listener.
//
// A nil listenerErr yields two True conditions. A non-nil listenerErr turns
// both False: Accepted carries reason "PortUnavailable" with the error text in
// its message, and Programmed carries reason "Invalid". Both conditions are
// stamped with the current time and the supplied generation.
func (c *GatewayController) getGatewayListenerConditions(listenerErr error, generation int64) (metav1.Condition, metav1.Condition) {
	// Start from the healthy case and downgrade below if an error was recorded.
	accepted := metav1.Condition{
		Type:               string(gatewayv1.ListenerConditionAccepted),
		Status:             metav1.ConditionTrue,
		Reason:             string(gatewayv1.ListenerReasonAccepted),
		Message:            "Listener has been accepted",
		LastTransitionTime: metav1.Now(),
		ObservedGeneration: generation,
	}
	programmed := metav1.Condition{
		Type:               string(gatewayv1.ListenerConditionProgrammed),
		Status:             metav1.ConditionTrue,
		Reason:             string(gatewayv1.ListenerReasonProgrammed),
		Message:            "Listener has been programmed",
		LastTransitionTime: metav1.Now(),
		ObservedGeneration: generation,
	}

	if listenerErr == nil {
		return accepted, programmed
	}

	accepted.Status = metav1.ConditionFalse
	accepted.Reason = "PortUnavailable"
	accepted.Message = fmt.Sprintf("Failed to start listener: %v", listenerErr)

	programmed.Status = metav1.ConditionFalse
	programmed.Reason = "Invalid"
	programmed.Message = "Listener could not be programmed due to error"

	return accepted, programmed
}

func (c *GatewayController) enqueueGateway(obj interface{}) {
Expand Down
Loading
Loading