Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions api/adc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,22 @@ type Metadata struct {
}

type Resources struct {
ConsumerGroups []*ConsumerGroup `json:"consumer_groups,omitempty" yaml:"consumer_groups,omitempty"`
Consumers []*ConsumerElement `json:"consumers,omitempty" yaml:"consumers,omitempty"`
GlobalRules Plugins `json:"global_rules,omitempty" yaml:"global_rules,omitempty"`
PluginMetadata Plugins `json:"plugin_metadata,omitempty" yaml:"plugin_metadata,omitempty"`
Services []*Service `json:"services,omitempty" yaml:"services,omitempty"`
SSLs []*SSL `json:"ssls,omitempty" yaml:"ssls,omitempty"`
ConsumerGroups []*ConsumerGroup `json:"consumer_groups,omitempty" yaml:"consumer_groups,omitempty"`
Consumers []*Consumer `json:"consumers,omitempty" yaml:"consumers,omitempty"`
GlobalRules Plugins `json:"global_rules,omitempty" yaml:"global_rules,omitempty"`
PluginMetadata Plugins `json:"plugin_metadata,omitempty" yaml:"plugin_metadata,omitempty"`
Services []*Service `json:"services,omitempty" yaml:"services,omitempty"`
SSLs []*SSL `json:"ssls,omitempty" yaml:"ssls,omitempty"`
}

type ConsumerGroup struct {
Metadata `json:",inline" yaml:",inline"`
Consumers []ConsumerElement `json:"consumers,omitempty" yaml:"consumers,omitempty"`
Name string `json:"name" yaml:"name"`
Plugins Plugins `json:"plugins" yaml:"plugins"`
Consumers []Consumer `json:"consumers,omitempty" yaml:"consumers,omitempty"`
Name string `json:"name" yaml:"name"`
Plugins Plugins `json:"plugins" yaml:"plugins"`
}

type ConsumerElement struct {
type Consumer struct {
Credentials []Credential `json:"credentials,omitempty" yaml:"credentials,omitempty"`
Description string `json:"description,omitempty" yaml:"description,omitempty"`
Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
Expand Down Expand Up @@ -415,6 +415,19 @@ func ComposeServiceNameWithRule(namespace, name string, rule string) string {
return buf.String()
}

func ComposeConsumerName(namespace, name string) string {
// FIXME Use sync.Pool to reuse this buffer if the upstream
// name composing code path is hot.
p := make([]byte, 0, len(namespace)+len(name)+1)
buf := bytes.NewBuffer(p)

buf.WriteString(namespace)
buf.WriteByte('_')
buf.WriteString(name)

return buf.String()
}

// NewDefaultUpstream returns an empty Upstream with default values.
func NewDefaultService() *Service {
return &Service{
Expand Down
10 changes: 7 additions & 3 deletions api/v1alpha1/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ type ConsumerSpec struct {
}

type GatewayRef struct {
Name string `json:"name,omitempty"`
Kind string `json:"kind,omitempty"`
Group string `json:"group,omitempty"`
// +kubebuilder:validation:Required
// +kubebuilder:validation:MinLength=1
Name string `json:"name"`
// +kubebuilder:default=Gateway
Kind *string `json:"kind,omitempty"`
// +kubebuilder:default=gateway.networking.k8s.io
Group *string `json:"group,omitempty"`
Namespace *string `json:"namespace,omitempty"`
}

Expand Down
2 changes: 1 addition & 1 deletion api/v1alpha1/pluginconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Plugin struct {
// The plugin name.
Name string `json:"name" yaml:"name"`
// Plugin configuration.
Config apiextensionsv1.JSON `json:"config" yaml:"config"`
Config apiextensionsv1.JSON `json:"config,omitempty" yaml:"config,omitempty"`
}

func init() {
Expand Down
10 changes: 10 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion config/crd/bases/gateway.apisix.io_consumers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,18 @@ spec:
gatewayRef:
properties:
group:
default: gateway.networking.k8s.io
type: string
kind:
default: Gateway
type: string
name:
minLength: 1
type: string
namespace:
type: string
required:
- name
type: object
plugins:
items:
Expand All @@ -85,7 +90,6 @@ spec:
description: The plugin name.
type: string
required:
- config
- name
type: object
type: array
Expand Down
1 change: 0 additions & 1 deletion config/crd/bases/gateway.apisix.io_pluginconfigs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ spec:
description: The plugin name.
type: string
required:
- config
- name
type: object
type: array
Expand Down
160 changes: 159 additions & 1 deletion internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,22 @@ import (
"context"

"github.com/api7/api7-ingress-controller/api/v1alpha1"
"github.com/api7/api7-ingress-controller/internal/controller/indexer"
"github.com/api7/api7-ingress-controller/internal/provider"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
)

// ConsumerReconciler reconciles a Gateway object.
Expand All @@ -24,10 +34,59 @@ type ConsumerReconciler struct { //nolint:revive
// SetupWithManager sets up the controller with the Manager.
func (r *ConsumerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.Consumer{}).
For(&v1alpha1.Consumer{},
builder.WithPredicates(
predicate.NewPredicateFuncs(r.checkGatewayRef),
),
).
WithEventFilter(predicate.GenerationChangedPredicate{}).
Watches(&gatewayv1.Gateway{},
handler.EnqueueRequestsFromMapFunc(r.listConsumersForGateway),
builder.WithPredicates(
predicate.Funcs{
GenericFunc: func(e event.GenericEvent) bool {
return false
},
DeleteFunc: func(e event.DeleteEvent) bool {
return false
},
CreateFunc: func(e event.CreateEvent) bool {
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
return true
},
},
),
).
Complete(r)
}

func (r *ConsumerReconciler) listConsumersForGateway(ctx context.Context, obj client.Object) []reconcile.Request {
gateway, ok := obj.(*gatewayv1.Gateway)
if !ok {
r.Log.Error(nil, "failed to convert to Gateway", "object", obj)
return nil
}
consumerList := &v1alpha1.ConsumerList{}
if err := r.List(ctx, consumerList, client.MatchingFields{
indexer.ConsumerGatewayRef: indexer.GenIndexKey(gateway.Name, gateway.GetNamespace()),
}); err != nil {
r.Log.Error(err, "failed to list consumers")
return nil
}
requests := make([]reconcile.Request, 0, len(consumerList.Items))
for _, consumer := range consumerList.Items {
requests = append(requests, reconcile.Request{
NamespacedName: client.ObjectKey{
Name: consumer.Name,
Namespace: consumer.Namespace,
},
})
}
return requests
}

func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
consumer := new(v1alpha1.Consumer)
if err := r.Get(ctx, req.NamespacedName, consumer); err != nil {
Expand All @@ -43,8 +102,107 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if err := r.Provider.Delete(ctx, consumer); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

var statusErr error
tctx := provider.NewDefaultTranslateContext()

if err := r.processSpec(ctx, tctx, consumer); err != nil {
r.Log.Error(err, "failed to process consumer spec", "consumer", consumer)
statusErr = err
}

if err := r.Provider.Update(ctx, tctx, consumer); err != nil {
r.Log.Error(err, "failed to update consumer", "consumer", consumer)
statusErr = err
}

if err := r.updateStatus(ctx, consumer, statusErr); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

func (r *ConsumerReconciler) processSpec(ctx context.Context, tctx *provider.TranslateContext, consumer *v1alpha1.Consumer) error {
for _, credential := range consumer.Spec.Credentials {
if credential.SecretRef == nil {
continue
}
ns := consumer.GetNamespace()
if credential.SecretRef.Namespace != nil {
ns = *credential.SecretRef.Namespace
}
secret := corev1.Secret{}
if err := r.Get(ctx, client.ObjectKey{
Name: credential.SecretRef.Name,
Namespace: ns,
}, &secret); err != nil {
if client.IgnoreNotFound(err) == nil {
continue
}
r.Log.Error(err, "failed to get secret", "secret", credential.SecretRef.Name)
return err
}

tctx.Secrets[types.NamespacedName{
Namespace: ns,
Name: credential.SecretRef.Name,
}] = &secret

}
return nil
}

func (r *ConsumerReconciler) updateStatus(ctx context.Context, consumer *v1alpha1.Consumer, err error) error {
condition := NewCondition(consumer.Generation, true, "Successfully")
if err != nil {
condition = NewCondition(consumer.Generation, false, err.Error())
}
if !VerifyConditions(&consumer.Status.Conditions, condition) {
return nil
}
meta.SetStatusCondition(&consumer.Status.Conditions, condition)
if err := r.Status().Update(ctx, consumer); err != nil {
r.Log.Error(err, "failed to update consumer status", "consumer", consumer)
return err
}
return nil
}

func (r *ConsumerReconciler) checkGatewayRef(object client.Object) bool {
consumer, ok := object.(*v1alpha1.Consumer)
if !ok {
return false
}
if consumer.Spec.GatewayRef.Name == "" {
return false
}
if consumer.Spec.GatewayRef.Kind != nil && *consumer.Spec.GatewayRef.Kind != KindGateway {
return false
}
if consumer.Spec.GatewayRef.Group != nil && *consumer.Spec.GatewayRef.Group != gatewayv1.GroupName {
return false
}
ns := consumer.GetNamespace()
if consumer.Spec.GatewayRef.Namespace != nil {
ns = *consumer.Spec.GatewayRef.Namespace
}
gateway := &gatewayv1.Gateway{}
if err := r.Get(context.Background(), client.ObjectKey{
Name: consumer.Spec.GatewayRef.Name,
Namespace: ns,
}, gateway); err != nil {
r.Log.Error(err, "failed to get gateway", "gateway", consumer.Spec.GatewayRef.Name)
return false
}
gatewayClass := &gatewayv1.GatewayClass{}
if err := r.Client.Get(context.Background(), client.ObjectKey{Name: string(gateway.Spec.GatewayClassName)}, gatewayClass); err != nil {
r.Log.Error(err, "failed to get gateway class", "gateway", gateway.GetName(), "gatewayclass", gateway.Spec.GatewayClassName)
return false
}
return matchesController(string(gatewayClass.Spec.ControllerName))
}
44 changes: 37 additions & 7 deletions internal/controller/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@ package indexer
import (
"context"

"github.com/api7/api7-ingress-controller/api/v1alpha1"
networkingv1 "k8s.io/api/networking/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
)

const (
ServiceIndexRef = "serviceRefs"
ExtensionRef = "extensionRef"
ParametersRef = "parametersRef"
ParentRefs = "parentRefs"
IngressClass = "ingressClass"
SecretIndexRef = "secretRefs"
IngressClassRef = "ingressClassRef"
ServiceIndexRef = "serviceRefs"
ExtensionRef = "extensionRef"
ParametersRef = "parametersRef"
ParentRefs = "parentRefs"
IngressClass = "ingressClass"
SecretIndexRef = "secretRefs"
IngressClassRef = "ingressClassRef"
ConsumerGatewayRef = "consumerGatewayRef"
)

func SetupIndexer(mgr ctrl.Manager) error {
Expand All @@ -29,6 +31,9 @@ func SetupIndexer(mgr ctrl.Manager) error {
if err := setupIngressIndexer(mgr); err != nil {
return err
}
if err := setupConsumerIndexer(mgr); err != nil {
return err
}
return nil
}

Expand All @@ -44,6 +49,31 @@ func setupGatewayIndexer(mgr ctrl.Manager) error {
return nil
}

func setupConsumerIndexer(mgr ctrl.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(
context.Background(),
&v1alpha1.Consumer{},
ConsumerGatewayRef,
ConsumerGatewayRefIndexFunc,
); err != nil {
return err
}
return nil
}
func ConsumerGatewayRefIndexFunc(rawObj client.Object) []string {
consumer := rawObj.(*v1alpha1.Consumer)

if consumer.Spec.GatewayRef.Name == "" {
return nil
}

ns := consumer.GetNamespace()
if consumer.Spec.GatewayRef.Namespace != nil {
ns = *consumer.Spec.GatewayRef.Namespace
}
return []string{GenIndexKey(ns, consumer.Spec.GatewayRef.Name)}
}

func setupHTTPRouteIndexer(mgr ctrl.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(
context.Background(),
Expand Down
Loading
Loading