Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,17 +361,19 @@ func (h *eventHandlerImpl) updateStatuses(ctx context.Context, gr *graph.Graph,
transitionTime,
h.cfg.gatewayCtlrName,
)
inferencePoolReqs := status.PrepareInferencePoolRequests(gr.ReferencedInferencePools, transitionTime)

reqs := make(
[]status.UpdateRequest,
0,
len(gcReqs)+len(routeReqs)+len(polReqs)+len(ngfPolReqs)+len(snippetsFilterReqs),
len(gcReqs)+len(routeReqs)+len(polReqs)+len(ngfPolReqs)+len(snippetsFilterReqs)+len(inferencePoolReqs),
)
reqs = append(reqs, gcReqs...)
reqs = append(reqs, routeReqs...)
reqs = append(reqs, polReqs...)
reqs = append(reqs, ngfPolReqs...)
reqs = append(reqs, snippetsFilterReqs...)
reqs = append(reqs, inferencePoolReqs...)

h.cfg.statusUpdater.UpdateGroup(ctx, groupAllExceptGateways, reqs...)

Expand Down
53 changes: 53 additions & 0 deletions internal/controller/state/conditions/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
inference "sigs.k8s.io/gateway-api-inference-extension/api/v1"
v1 "sigs.k8s.io/gateway-api/apis/v1"
"sigs.k8s.io/gateway-api/apis/v1alpha2"

Expand Down Expand Up @@ -1108,3 +1109,55 @@ func NewBackendTLSPolicyNoValidCACertificate(message string) Condition {
Message: message,
}
}

// NewInferencePoolAccepted returns a Condition that indicates that the InferencePool is accepted by the Gateway.
func NewInferencePoolAccepted() Condition {
return Condition{
Type: string(inference.InferencePoolConditionAccepted),
Status: metav1.ConditionTrue,
Reason: string(inference.InferencePoolConditionAccepted),
Message: "InferencePool is accepted by the Gateway.",
}
}

// NewInferencePoolResolvedRefs returns a Condition that
// indicates that all references in the InferencePool are resolved.
func NewInferencePoolResolvedRefs() Condition {
return Condition{
Type: string(inference.InferencePoolConditionResolvedRefs),
Status: metav1.ConditionTrue,
Reason: string(inference.InferencePoolConditionResolvedRefs),
Message: "Inference pool references a valid ExtensionRef.",
}
}

// NewDefaultInferenceConditions returns the default Conditions
// that must be present in the status of an InferencePool.
func NewDefaultInferenceConditions() []Condition {
return []Condition{
NewInferencePoolAccepted(),
NewInferencePoolResolvedRefs(),
}
}

// NewInferencePoolInvalidHTTPRouteNotAccepted returns a Condition that indicates that the InferencePool is not
// accepted because the associated HTTPRoute is not accepted by the Gateway.
func NewInferencePoolInvalidHTTPRouteNotAccepted(msg string) Condition {
return Condition{
Type: string(inference.InferencePoolConditionAccepted),
Status: metav1.ConditionFalse,
Reason: string(inference.InferencePoolReasonHTTPRouteNotAccepted),
Message: msg,
}
}

// NewInferencePoolInvalidExtensionref returns a Condition that indicates that the InferencePool is not
// accepted because the ExtensionRef is invalid.
func NewInferencePoolInvalidExtensionref(msg string) Condition {
return Condition{
Type: string(inference.InferencePoolConditionResolvedRefs),
Status: metav1.ConditionFalse,
Reason: string(inference.InferencePoolReasonInvalidExtensionRef),
Message: msg,
}
}
3 changes: 2 additions & 1 deletion internal/controller/state/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ func BuildGraph(
processedSnippetsFilters,
state.InferencePools,
)
referencedInferencePools := buildReferencedInferencePools(routes, gws, state.InferencePools)

referencedInferencePools := buildReferencedInferencePools(routes, gws, state.InferencePools, state.Services)

l4routes := buildL4RoutesForGateways(
state.TLSRoutes,
Expand Down
26 changes: 20 additions & 6 deletions internal/controller/state/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,15 @@ func TestBuildGraph(t *testing.T) {
Namespace: testNs,
Name: controller.CreateInferencePoolServiceName("ipool"),
},
ServicePort: v1.ServicePort{Port: 80},
Valid: true,
Weight: 1,
InvalidForGateways: map[types.NamespacedName]conditions.Condition{},
IsInferencePool: true,
EndpointPickerConfig: &inference.EndpointPickerRef{},
ServicePort: v1.ServicePort{Port: 80},
Valid: true,
Weight: 1,
InvalidForGateways: map[types.NamespacedName]conditions.Condition{},
IsInferencePool: true,
EndpointPickerConfig: &inference.EndpointPickerRef{
Kind: kinds.Service,
Name: inference.ObjectName(controller.CreateInferencePoolServiceName("ipool")),
},
},
}
rbrs := []RouteBackendRef{
Expand Down Expand Up @@ -389,6 +392,10 @@ func TestBuildGraph(t *testing.T) {
TargetPorts: []inference.Port{
{Number: 80},
},
EndpointPickerRef: inference.EndpointPickerRef{
Kind: kinds.Service,
Name: inference.ObjectName(controller.CreateInferencePoolServiceName("ipool")),
},
},
}

Expand Down Expand Up @@ -1325,6 +1332,13 @@ func TestBuildGraph(t *testing.T) {
ReferencedInferencePools: map[types.NamespacedName]*ReferencedInferencePool{
client.ObjectKeyFromObject(inferencePool): {
Source: inferencePool,
Gateways: []*gatewayv1.Gateway{
gw1.Source,
},
HTTPRoutes: []*L7Route{
inferenceRoute,
},
Conditions: []conditions.Condition{},
},
},
ReferencedCaCertConfigMaps: map[types.NamespacedName]*CaCertConfigMap{
Expand Down
100 changes: 97 additions & 3 deletions internal/controller/state/graph/inferencepools.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package graph

import (
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
inference "sigs.k8s.io/gateway-api-inference-extension/api/v1"
apiv1 "sigs.k8s.io/gateway-api/apis/v1"

"github.com/nginx/nginx-gateway-fabric/v2/internal/controller/state/conditions"
"github.com/nginx/nginx-gateway-fabric/v2/internal/framework/controller"
"github.com/nginx/nginx-gateway-fabric/v2/internal/framework/kinds"
)
Expand All @@ -14,6 +19,12 @@ import (
type ReferencedInferencePool struct {
// Source is the original InferencePool that this ReferencedInferencePool is based on.
Source *inference.InferencePool
// Gateways are the Gateways that this ReferencedInferencePool is attached to.
Gateways []*apiv1.Gateway
// HTTPRoutes are the HTTPRoutes that reference this InferencePool.
HTTPRoutes []*L7Route
// Conditions contains the conditions that should be applied to the InferencePool.
Conditions []conditions.Condition
}

// buildReferencedInferencePools builds a map of InferencePools that are referenced by HTTPRoutes
Expand All @@ -22,8 +33,9 @@ func buildReferencedInferencePools(
routes map[RouteKey]*L7Route,
gws map[types.NamespacedName]*Gateway,
inferencePools map[types.NamespacedName]*inference.InferencePool,
services map[types.NamespacedName]*v1.Service,
) map[types.NamespacedName]*ReferencedInferencePool {
referencedInferencePools := make(map[types.NamespacedName]*ReferencedInferencePool)
referencedInferencePools := make(map[types.NamespacedName]*ReferencedInferencePool, len(inferencePools))

for _, gw := range gws {
if gw == nil {
Expand All @@ -37,6 +49,17 @@ func buildReferencedInferencePools(
return nil
}

// validate each referenced InferencePool and add conditions.
for _, refPool := range referencedInferencePools {
if routeCond := validateInferencePoolRoutesAcceptance(refPool.Source, refPool.HTTPRoutes); routeCond != nil {
refPool.Conditions = append(refPool.Conditions, *routeCond)
}

if extensionRefCond := validateInferencePoolExtensionRef(refPool.Source, services); extensionRefCond != nil {
refPool.Conditions = append(refPool.Conditions, *extensionRefCond)
}
}

return referencedInferencePools
}

Expand All @@ -48,8 +71,9 @@ func processInferencePoolsForGateway(
inferencePools map[types.NamespacedName]*inference.InferencePool,
) {
gwKey := client.ObjectKeyFromObject(gw.Source)

for _, route := range routes {
if !route.Valid || !routeBelongsToGateway(route.ParentRefs, gwKey) {
if !routeBelongsToGateway(route.ParentRefs, gwKey) {
continue
}

Expand All @@ -70,13 +94,83 @@ func processInferencePoolsForGateway(
}

if _, referenced := referencedInferencePools[poolName]; !referenced {
referencedInferencePools[poolName] = &ReferencedInferencePool{}
referencedInferencePools[poolName] = &ReferencedInferencePool{
Conditions: make([]conditions.Condition, 0, 2),
Gateways: make([]*apiv1.Gateway, 0),
HTTPRoutes: make([]*L7Route, 0),
}
}

if pool, exists := inferencePools[poolName]; exists {
referencedInferencePools[poolName].Source = pool
referencedInferencePools[poolName].Gateways = append(
referencedInferencePools[poolName].Gateways,
gw.Source,
)
referencedInferencePools[poolName].HTTPRoutes = append(
referencedInferencePools[poolName].HTTPRoutes,
route,
)
}
}
}
}
}

// validateInferencePoolExtensionRef validates the ExtensionRef of the InferencePool.
func validateInferencePoolExtensionRef(
ip *inference.InferencePool,
svc map[types.NamespacedName]*v1.Service,
) *conditions.Condition {
var failingCond conditions.Condition
if ip == nil {
return nil
}

// if kind is empty, it defaults to Service
kind := string(ip.Spec.EndpointPickerRef.Kind)
if kind == "" {
kind = kinds.Service
}

if kind != kinds.Service {
failingCond = conditions.NewInferencePoolInvalidExtensionref("Invalid ExtensionRef kind: " + kind)
return &failingCond
}

eppNsName := types.NamespacedName{
Name: string(ip.Spec.EndpointPickerRef.Name),
Namespace: ip.GetNamespace(),
}

if _, ok := svc[eppNsName]; !ok {
failingCond = conditions.NewInferencePoolInvalidExtensionref("ExtensionRef Service not found: " + eppNsName.String())
return &failingCond
}

return nil
}

// validateInferencePoolRoutesAcceptance checks if the routes that reference the InferencePool
// are accepted by the Gateway.
func validateInferencePoolRoutesAcceptance(ip *inference.InferencePool, routes []*L7Route) *conditions.Condition {
if ip == nil || len(routes) == 0 {
return nil
}

// we do not need to validate that the route belongs to the gateway or not
// we only process routes that belong to the gateway in the first place
for _, route := range routes {
if !route.Valid {
cond := conditions.NewInferencePoolInvalidHTTPRouteNotAccepted(
fmt.Sprintf("Referenced HTTPRoute %s/%s is not accepted by the Gateway",
route.Source.GetNamespace(),
route.Source.GetName(),
),
)
return &cond
}
}

return nil
}
Loading
Loading