Skip to content

Commit c86cfcb

Browse files
committed
feat: support consumer translator
1 parent b03aa71 commit c86cfcb

File tree

17 files changed

+411
-34
lines changed

17 files changed

+411
-34
lines changed

api/adc/types.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,22 +68,22 @@ type Metadata struct {
6868
}
6969

7070
type Resources struct {
71-
ConsumerGroups []*ConsumerGroup `json:"consumer_groups,omitempty" yaml:"consumer_groups,omitempty"`
72-
Consumers []*ConsumerElement `json:"consumers,omitempty" yaml:"consumers,omitempty"`
73-
GlobalRules Plugins `json:"global_rules,omitempty" yaml:"global_rules,omitempty"`
74-
PluginMetadata Plugins `json:"plugin_metadata,omitempty" yaml:"plugin_metadata,omitempty"`
75-
Services []*Service `json:"services,omitempty" yaml:"services,omitempty"`
76-
SSLs []*SSL `json:"ssls,omitempty" yaml:"ssls,omitempty"`
71+
ConsumerGroups []*ConsumerGroup `json:"consumer_groups,omitempty" yaml:"consumer_groups,omitempty"`
72+
Consumers []*Consumer `json:"consumers,omitempty" yaml:"consumers,omitempty"`
73+
GlobalRules Plugins `json:"global_rules,omitempty" yaml:"global_rules,omitempty"`
74+
PluginMetadata Plugins `json:"plugin_metadata,omitempty" yaml:"plugin_metadata,omitempty"`
75+
Services []*Service `json:"services,omitempty" yaml:"services,omitempty"`
76+
SSLs []*SSL `json:"ssls,omitempty" yaml:"ssls,omitempty"`
7777
}
7878

7979
type ConsumerGroup struct {
8080
Metadata `json:",inline" yaml:",inline"`
81-
Consumers []ConsumerElement `json:"consumers,omitempty" yaml:"consumers,omitempty"`
82-
Name string `json:"name" yaml:"name"`
83-
Plugins Plugins `json:"plugins" yaml:"plugins"`
81+
Consumers []Consumer `json:"consumers,omitempty" yaml:"consumers,omitempty"`
82+
Name string `json:"name" yaml:"name"`
83+
Plugins Plugins `json:"plugins" yaml:"plugins"`
8484
}
8585

86-
type ConsumerElement struct {
86+
type Consumer struct {
8787
Credentials []Credential `json:"credentials,omitempty" yaml:"credentials,omitempty"`
8888
Description string `json:"description,omitempty" yaml:"description,omitempty"`
8989
Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
@@ -415,6 +415,19 @@ func ComposeServiceNameWithRule(namespace, name string, rule string) string {
415415
return buf.String()
416416
}
417417

418+
func ComposeConsumerName(namespace, name string) string {
419+
// FIXME Use sync.Pool to reuse this buffer if the upstream
420+
// name composing code path is hot.
421+
p := make([]byte, 0, len(namespace)+len(name)+1)
422+
buf := bytes.NewBuffer(p)
423+
424+
buf.WriteString(namespace)
425+
buf.WriteByte('_')
426+
buf.WriteString(name)
427+
428+
return buf.String()
429+
}
430+
418431
// NewDefaultUpstream returns an empty Upstream with default values.
419432
func NewDefaultService() *Service {
420433
return &Service{

api/v1alpha1/consumer_types.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,13 @@ type ConsumerSpec struct {
2222
}
2323

2424
type GatewayRef struct {
25-
Name string `json:"name,omitempty"`
26-
Kind string `json:"kind,omitempty"`
27-
Group string `json:"group,omitempty"`
25+
// +kubebuilder:validation:Required
26+
// +kubebuilder:validation:MinLength=1
27+
Name string `json:"name"`
28+
// +kubebuilder:default=Gateway
29+
Kind *string `json:"kind,omitempty"`
30+
// +kubebuilder:default=gateway.networking.k8s.io
31+
Group *string `json:"group,omitempty"`
2832
Namespace *string `json:"namespace,omitempty"`
2933
}
3034

api/v1alpha1/pluginconfig_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type Plugin struct {
3636
// The plugin name.
3737
Name string `json:"name" yaml:"name"`
3838
// Plugin configuration.
39-
Config apiextensionsv1.JSON `json:"config" yaml:"config"`
39+
Config apiextensionsv1.JSON `json:"config,omitempty" yaml:"config,omitempty"`
4040
}
4141

4242
func init() {

api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/gateway.apisix.io_consumers.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,18 @@ spec:
6767
gatewayRef:
6868
properties:
6969
group:
70+
default: gateway.networking.k8s.io
7071
type: string
7172
kind:
73+
default: Gateway
7274
type: string
7375
name:
76+
minLength: 1
7477
type: string
7578
namespace:
7679
type: string
80+
required:
81+
- name
7782
type: object
7883
plugins:
7984
items:
@@ -85,7 +90,6 @@ spec:
8590
description: The plugin name.
8691
type: string
8792
required:
88-
- config
8993
- name
9094
type: object
9195
type: array

config/crd/bases/gateway.apisix.io_pluginconfigs.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ spec:
4949
description: The plugin name.
5050
type: string
5151
required:
52-
- config
5352
- name
5453
type: object
5554
type: array

internal/controller/consumer_controller.go

Lines changed: 159 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,22 @@ import (
44
"context"
55

66
"github.com/api7/api7-ingress-controller/api/v1alpha1"
7+
"github.com/api7/api7-ingress-controller/internal/controller/indexer"
78
"github.com/api7/api7-ingress-controller/internal/provider"
89
"github.com/go-logr/logr"
10+
corev1 "k8s.io/api/core/v1"
11+
"k8s.io/apimachinery/pkg/api/meta"
912
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1013
"k8s.io/apimachinery/pkg/runtime"
14+
"k8s.io/apimachinery/pkg/types"
1115
ctrl "sigs.k8s.io/controller-runtime"
16+
"sigs.k8s.io/controller-runtime/pkg/builder"
1217
"sigs.k8s.io/controller-runtime/pkg/client"
18+
"sigs.k8s.io/controller-runtime/pkg/event"
19+
"sigs.k8s.io/controller-runtime/pkg/handler"
20+
"sigs.k8s.io/controller-runtime/pkg/predicate"
21+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
22+
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
1323
)
1424

1525
// ConsumerReconciler reconciles a Gateway object.
@@ -24,10 +34,59 @@ type ConsumerReconciler struct { //nolint:revive
2434
// SetupWithManager sets up the controller with the Manager.
2535
func (r *ConsumerReconciler) SetupWithManager(mgr ctrl.Manager) error {
2636
return ctrl.NewControllerManagedBy(mgr).
27-
For(&v1alpha1.Consumer{}).
37+
For(&v1alpha1.Consumer{},
38+
builder.WithPredicates(
39+
predicate.NewPredicateFuncs(r.checkGatewayRef),
40+
),
41+
).
42+
WithEventFilter(predicate.GenerationChangedPredicate{}).
43+
Watches(&gatewayv1.Gateway{},
44+
handler.EnqueueRequestsFromMapFunc(r.listConsumersForGateway),
45+
builder.WithPredicates(
46+
predicate.Funcs{
47+
GenericFunc: func(e event.GenericEvent) bool {
48+
return false
49+
},
50+
DeleteFunc: func(e event.DeleteEvent) bool {
51+
return false
52+
},
53+
CreateFunc: func(e event.CreateEvent) bool {
54+
return true
55+
},
56+
UpdateFunc: func(e event.UpdateEvent) bool {
57+
return true
58+
},
59+
},
60+
),
61+
).
2862
Complete(r)
2963
}
3064

65+
func (r *ConsumerReconciler) listConsumersForGateway(ctx context.Context, obj client.Object) []reconcile.Request {
66+
gateway, ok := obj.(*gatewayv1.Gateway)
67+
if !ok {
68+
r.Log.Error(nil, "failed to convert to Gateway", "object", obj)
69+
return nil
70+
}
71+
consumerList := &v1alpha1.ConsumerList{}
72+
if err := r.List(ctx, consumerList, client.MatchingFields{
73+
indexer.ConsumerGatewayRef: indexer.GenIndexKey(gateway.Name, gateway.GetNamespace()),
74+
}); err != nil {
75+
r.Log.Error(err, "failed to list consumers")
76+
return nil
77+
}
78+
var requests []reconcile.Request
79+
for _, consumer := range consumerList.Items {
80+
requests = append(requests, reconcile.Request{
81+
NamespacedName: client.ObjectKey{
82+
Name: consumer.Name,
83+
Namespace: consumer.Namespace,
84+
},
85+
})
86+
}
87+
return requests
88+
}
89+
3190
func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
3291
consumer := new(v1alpha1.Consumer)
3392
if err := r.Get(ctx, req.NamespacedName, consumer); err != nil {
@@ -41,10 +100,109 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
41100
}
42101

43102
if err := r.Provider.Delete(ctx, consumer); err != nil {
103+
r.Log.Error(err, "failed to delete consumer", "consumer", consumer)
44104
return ctrl.Result{}, err
45105
}
106+
return ctrl.Result{}, nil
46107
}
108+
r.Log.Error(err, "failed to get consumer", "consumer", consumer)
47109
return ctrl.Result{}, err
48110
}
111+
112+
var statusErr error
113+
tctx := provider.NewDefaultTranslateContext()
114+
115+
if err := r.processSepc(ctx, tctx, consumer); err != nil {
116+
r.Log.Error(err, "failed to process consumer spec", "consumer", consumer)
117+
statusErr = err
118+
}
119+
120+
if err := r.Provider.Update(ctx, tctx, consumer); err != nil {
121+
r.Log.Error(err, "failed to update consumer", "consumer", consumer)
122+
statusErr = err
123+
}
124+
125+
if err := r.updateStatus(ctx, consumer, statusErr); err != nil {
126+
r.Log.Error(err, "failed to update consumer status", "consumer", consumer)
127+
return ctrl.Result{}, err
128+
}
129+
49130
return ctrl.Result{}, nil
50131
}
132+
133+
func (r *ConsumerReconciler) processSepc(ctx context.Context, tctx *provider.TranslateContext, consumer *v1alpha1.Consumer) error {
134+
r.Log.Info("Processing consumer", "name", consumer.Name, "namespace", consumer.Namespace)
135+
136+
for _, credential := range consumer.Spec.Credentials {
137+
if credential.SecretRef == nil {
138+
continue
139+
}
140+
ns := consumer.GetNamespace()
141+
if credential.SecretRef.Namespace != nil {
142+
ns = string(*credential.SecretRef.Namespace)
143+
}
144+
secret := corev1.Secret{}
145+
if err := r.Get(ctx, client.ObjectKey{
146+
Name: credential.SecretRef.Name,
147+
Namespace: ns,
148+
}, &secret); err != nil {
149+
if client.IgnoreNotFound(err) == nil {
150+
continue
151+
}
152+
r.Log.Error(err, "failed to get secret", "secret", credential.SecretRef.Name)
153+
return err
154+
}
155+
156+
tctx.Secrets[types.NamespacedName{
157+
Namespace: ns,
158+
Name: credential.SecretRef.Name,
159+
}] = &secret
160+
161+
}
162+
return nil
163+
}
164+
165+
func (r *ConsumerReconciler) updateStatus(ctx context.Context, consumer *v1alpha1.Consumer, err error) error {
166+
condition := NewCondition(consumer.Generation, true, "Successfully")
167+
if err != nil {
168+
condition = NewCondition(consumer.Generation, false, err.Error())
169+
}
170+
if !VerifyConditions(&consumer.Status.Conditions, condition) {
171+
return nil
172+
}
173+
meta.SetStatusCondition(&consumer.Status.Conditions, condition)
174+
if err := r.Status().Update(ctx, consumer); err != nil {
175+
r.Log.Error(err, "failed to update consumer status", "consumer", consumer)
176+
return err
177+
}
178+
return nil
179+
}
180+
181+
func (r *ConsumerReconciler) checkGatewayRef(object client.Object) bool {
182+
consumer, ok := object.(*v1alpha1.Consumer)
183+
if !ok {
184+
return false
185+
}
186+
if consumer.Spec.GatewayRef.Name == "" {
187+
return false
188+
}
189+
if consumer.Spec.GatewayRef.Kind != nil && *consumer.Spec.GatewayRef.Kind != KindGateway {
190+
return false
191+
}
192+
if consumer.Spec.GatewayRef.Group != nil && *consumer.Spec.GatewayRef.Group != gatewayv1.GroupName {
193+
return false
194+
}
195+
ns := consumer.GetNamespace()
196+
if consumer.Spec.GatewayRef.Namespace != nil {
197+
ns = string(*consumer.Spec.GatewayRef.Namespace)
198+
}
199+
gateway := &gatewayv1.Gateway{}
200+
if err := r.Get(context.TODO(), client.ObjectKey{
201+
Name: consumer.Spec.GatewayRef.Name,
202+
Namespace: ns,
203+
}, gateway); err != nil {
204+
r.Log.Error(err, "failed to get gateway", "gateway", consumer.Spec.GatewayRef.Name)
205+
return false
206+
}
207+
return true
208+
}

internal/controller/indexer/indexer.go

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,22 @@ package indexer
33
import (
44
"context"
55

6+
"github.com/api7/api7-ingress-controller/api/v1alpha1"
67
networkingv1 "k8s.io/api/networking/v1"
78
ctrl "sigs.k8s.io/controller-runtime"
89
"sigs.k8s.io/controller-runtime/pkg/client"
910
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
1011
)
1112

1213
const (
13-
ServiceIndexRef = "serviceRefs"
14-
ExtensionRef = "extensionRef"
15-
ParametersRef = "parametersRef"
16-
ParentRefs = "parentRefs"
17-
IngressClass = "ingressClass"
18-
SecretIndexRef = "secretRefs"
19-
IngressClassRef = "ingressClassRef"
14+
ServiceIndexRef = "serviceRefs"
15+
ExtensionRef = "extensionRef"
16+
ParametersRef = "parametersRef"
17+
ParentRefs = "parentRefs"
18+
IngressClass = "ingressClass"
19+
SecretIndexRef = "secretRefs"
20+
IngressClassRef = "ingressClassRef"
21+
ConsumerGatewayRef = "consumerGatewayRef"
2022
)
2123

2224
func SetupIndexer(mgr ctrl.Manager) error {
@@ -29,6 +31,9 @@ func SetupIndexer(mgr ctrl.Manager) error {
2931
if err := setupIngressIndexer(mgr); err != nil {
3032
return err
3133
}
34+
if err := setupConsumerIndexer(mgr); err != nil {
35+
return err
36+
}
3237
return nil
3338
}
3439

@@ -44,6 +49,31 @@ func setupGatewayIndexer(mgr ctrl.Manager) error {
4449
return nil
4550
}
4651

52+
func setupConsumerIndexer(mgr ctrl.Manager) error {
53+
if err := mgr.GetFieldIndexer().IndexField(
54+
context.Background(),
55+
&v1alpha1.Consumer{},
56+
ConsumerGatewayRef,
57+
ConsumerGatewayRefIndexFunc,
58+
); err != nil {
59+
return err
60+
}
61+
return nil
62+
}
63+
func ConsumerGatewayRefIndexFunc(rawObj client.Object) []string {
64+
consumer := rawObj.(*v1alpha1.Consumer)
65+
66+
if consumer.Spec.GatewayRef.Name == "" {
67+
return nil
68+
}
69+
70+
ns := consumer.GetNamespace()
71+
if consumer.Spec.GatewayRef.Namespace != nil {
72+
ns = string(*consumer.Spec.GatewayRef.Namespace)
73+
}
74+
return []string{GenIndexKey(ns, consumer.Spec.GatewayRef.Name)}
75+
}
76+
4777
func setupHTTPRouteIndexer(mgr ctrl.Manager) error {
4878
if err := mgr.GetFieldIndexer().IndexField(
4979
context.Background(),

0 commit comments

Comments
 (0)