Skip to content

Commit dc33d1d

Browse files
ronethingAlinsRan
andauthored
fix: full sync during restart results in loss of dataplane traffic (#2489) (#211)
Signed-off-by: Ashing Zheng <[email protected]> Co-authored-by: AlinsRan <[email protected]>
1 parent 9c57124 commit dc33d1d

24 files changed

+717
-944
lines changed

internal/controller/apisixconsumer_controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
apiv2 "github.com/apache/apisix-ingress-controller/api/v2"
4040
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
4141
"github.com/apache/apisix-ingress-controller/internal/controller/status"
42+
"github.com/apache/apisix-ingress-controller/internal/manager/readiness"
4243
"github.com/apache/apisix-ingress-controller/internal/provider"
4344
"github.com/apache/apisix-ingress-controller/internal/utils"
4445
)
@@ -51,10 +52,12 @@ type ApisixConsumerReconciler struct {
5152

5253
Provider provider.Provider
5354
Updater status.Updater
55+
Readier readiness.ReadinessManager
5456
}
5557

5658
// Reconcile FIXME: implement the reconcile logic (For now, it dose nothing other than directly accepting)
5759
func (r *ApisixConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
60+
defer r.Readier.Done(&apiv2.ApisixConsumer{}, req.NamespacedName)
5861
r.Log.Info("reconcile", "request", req.NamespacedName)
5962

6063
ac := &apiv2.ApisixConsumer{}

internal/controller/apisixglobalrule_controller.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"github.com/apache/apisix-ingress-controller/internal/controller/config"
3939
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
4040
"github.com/apache/apisix-ingress-controller/internal/controller/status"
41+
"github.com/apache/apisix-ingress-controller/internal/manager/readiness"
4142
"github.com/apache/apisix-ingress-controller/internal/provider"
4243
"github.com/apache/apisix-ingress-controller/internal/utils"
4344
)
@@ -49,10 +50,13 @@ type ApisixGlobalRuleReconciler struct {
4950
Log logr.Logger
5051
Provider provider.Provider
5152
Updater status.Updater
53+
54+
Readier readiness.ReadinessManager
5255
}
5356

5457
// Reconcile implements the reconciliation logic for ApisixGlobalRule
5558
func (r *ApisixGlobalRuleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
59+
defer r.Readier.Done(&apiv2.ApisixGlobalRule{}, req.NamespacedName)
5660
var globalRule apiv2.ApisixGlobalRule
5761
if err := r.Get(ctx, req.NamespacedName, &globalRule); err != nil {
5862
if client.IgnoreNotFound(err) == nil {

internal/controller/apisixroute_controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
apiv2 "github.com/apache/apisix-ingress-controller/api/v2"
4545
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
4646
"github.com/apache/apisix-ingress-controller/internal/controller/status"
47+
"github.com/apache/apisix-ingress-controller/internal/manager/readiness"
4748
"github.com/apache/apisix-ingress-controller/internal/provider"
4849
"github.com/apache/apisix-ingress-controller/internal/types"
4950
"github.com/apache/apisix-ingress-controller/internal/utils"
@@ -57,6 +58,7 @@ type ApisixRouteReconciler struct {
5758
Log logr.Logger
5859
Provider provider.Provider
5960
Updater status.Updater
61+
Readier readiness.ReadinessManager
6062
}
6163

6264
// SetupWithManager sets up the controller with the Manager.
@@ -97,6 +99,7 @@ func (r *ApisixRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
9799
}
98100

99101
func (r *ApisixRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
102+
defer r.Readier.Done(&apiv2.ApisixRoute{}, req.NamespacedName)
100103
var ar apiv2.ApisixRoute
101104
if err := r.Get(ctx, req.NamespacedName, &ar); err != nil {
102105
if client.IgnoreNotFound(err) == nil {

internal/controller/apisixtls_controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/apache/apisix-ingress-controller/internal/controller/config"
4040
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
4141
"github.com/apache/apisix-ingress-controller/internal/controller/status"
42+
"github.com/apache/apisix-ingress-controller/internal/manager/readiness"
4243
"github.com/apache/apisix-ingress-controller/internal/provider"
4344
"github.com/apache/apisix-ingress-controller/internal/utils"
4445
)
@@ -50,6 +51,7 @@ type ApisixTlsReconciler struct {
5051
Log logr.Logger
5152
Provider provider.Provider
5253
Updater status.Updater
54+
Readier readiness.ReadinessManager
5355
}
5456

5557
// SetupWithManager sets up the controller with the Manager.
@@ -85,6 +87,7 @@ func (r *ApisixTlsReconciler) SetupWithManager(mgr ctrl.Manager) error {
8587

8688
// Reconcile implements the reconciliation logic for ApisixTls
8789
func (r *ApisixTlsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
90+
defer r.Readier.Done(&apiv2.ApisixTls{}, req.NamespacedName)
8891
var tls apiv2.ApisixTls
8992
if err := r.Get(ctx, req.NamespacedName, &tls); err != nil {
9093
if client.IgnoreNotFound(err) == nil {

internal/controller/consumer_controller.go

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"github.com/apache/apisix-ingress-controller/api/v1alpha1"
3939
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
4040
"github.com/apache/apisix-ingress-controller/internal/controller/status"
41+
"github.com/apache/apisix-ingress-controller/internal/manager/readiness"
4142
"github.com/apache/apisix-ingress-controller/internal/provider"
4243
"github.com/apache/apisix-ingress-controller/internal/utils"
4344
)
@@ -51,6 +52,7 @@ type ConsumerReconciler struct { //nolint:revive
5152
Provider provider.Provider
5253

5354
Updater status.Updater
55+
Readier readiness.ReadinessManager
5456
}
5557

5658
// SetupWithManager sets up the controller with the Manager.
@@ -181,6 +183,7 @@ func (r *ConsumerReconciler) listConsumersForGatewayProxy(ctx context.Context, o
181183
}
182184

183185
func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
186+
defer r.Readier.Done(&v1alpha1.Consumer{}, req.NamespacedName)
184187
consumer := new(v1alpha1.Consumer)
185188
if err := r.Get(ctx, req.NamespacedName, consumer); err != nil {
186189
if client.IgnoreNotFound(err) == nil {
@@ -303,31 +306,5 @@ func (r *ConsumerReconciler) checkGatewayRef(object client.Object) bool {
303306
if !ok {
304307
return false
305308
}
306-
if consumer.Spec.GatewayRef.Name == "" {
307-
return false
308-
}
309-
if consumer.Spec.GatewayRef.Kind != nil && *consumer.Spec.GatewayRef.Kind != KindGateway {
310-
return false
311-
}
312-
if consumer.Spec.GatewayRef.Group != nil && *consumer.Spec.GatewayRef.Group != gatewayv1.GroupName {
313-
return false
314-
}
315-
ns := consumer.GetNamespace()
316-
if consumer.Spec.GatewayRef.Namespace != nil {
317-
ns = *consumer.Spec.GatewayRef.Namespace
318-
}
319-
gateway := &gatewayv1.Gateway{}
320-
if err := r.Get(context.Background(), client.ObjectKey{
321-
Name: consumer.Spec.GatewayRef.Name,
322-
Namespace: ns,
323-
}, gateway); err != nil {
324-
r.Log.Error(err, "failed to get gateway", "gateway", consumer.Spec.GatewayRef.Name)
325-
return false
326-
}
327-
gatewayClass := &gatewayv1.GatewayClass{}
328-
if err := r.Get(context.Background(), client.ObjectKey{Name: string(gateway.Spec.GatewayClassName)}, gatewayClass); err != nil {
329-
r.Log.Error(err, "failed to get gateway class", "gateway", gateway.GetName(), "gatewayclass", gateway.Spec.GatewayClassName)
330-
return false
331-
}
332-
return matchesController(string(gatewayClass.Spec.ControllerName))
309+
return MatchConsumerGatewayRef(context.Background(), r.Client, r.Log, consumer)
333310
}

internal/controller/httproute_controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import (
4848
"github.com/apache/apisix-ingress-controller/api/v1alpha1"
4949
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
5050
"github.com/apache/apisix-ingress-controller/internal/controller/status"
51+
"github.com/apache/apisix-ingress-controller/internal/manager/readiness"
5152
"github.com/apache/apisix-ingress-controller/internal/provider"
5253
"github.com/apache/apisix-ingress-controller/internal/types"
5354
"github.com/apache/apisix-ingress-controller/internal/utils"
@@ -65,6 +66,7 @@ type HTTPRouteReconciler struct { //nolint:revive
6566
genericEvent chan event.GenericEvent
6667

6768
Updater status.Updater
69+
Readier readiness.ReadinessManager
6870
}
6971

7072
// SetupWithManager sets up the controller with the Manager.
@@ -130,6 +132,7 @@ func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
130132
}
131133

132134
func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
135+
defer r.Readier.Done(&gatewayv1.HTTPRoute{}, req.NamespacedName)
133136
hr := new(gatewayv1.HTTPRoute)
134137
if err := r.Get(ctx, req.NamespacedName, hr); err != nil {
135138
if client.IgnoreNotFound(err) == nil {

internal/controller/indexer/indexer.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ const (
4747
GatewayClassIndexRef = "gatewayClassRef"
4848
ApisixUpstreamRef = "apisixUpstreamRef"
4949
PluginConfigIndexRef = "pluginConfigRefs"
50+
ControllerName = "controllerName"
5051
)
5152

5253
func SetupIndexer(mgr ctrl.Manager) error {
@@ -59,11 +60,11 @@ func SetupIndexer(mgr ctrl.Manager) error {
5960
setupIngressClassIndexer,
6061
setupGatewayProxyIndexer,
6162
setupGatewaySecretIndex,
62-
setupGatewayClassIndexer,
6363
setupApisixRouteIndexer,
6464
setupApisixPluginConfigIndexer,
6565
setupApisixTlsIndexer,
6666
setupApisixConsumerIndexer,
67+
setupGatewayClassIndexer,
6768
} {
6869
if err := setup(mgr); err != nil {
6970
return err
@@ -81,6 +82,17 @@ func setupGatewayIndexer(mgr ctrl.Manager) error {
8182
); err != nil {
8283
return err
8384
}
85+
86+
if err := mgr.GetFieldIndexer().IndexField(
87+
context.Background(),
88+
&gatewayv1.Gateway{},
89+
GatewayClassIndexRef,
90+
func(obj client.Object) (requests []string) {
91+
return []string{string(obj.(*gatewayv1.Gateway).Spec.GatewayClassName)}
92+
},
93+
); err != nil {
94+
return err
95+
}
8496
return nil
8597
}
8698

@@ -273,10 +285,10 @@ func setupGatewaySecretIndex(mgr ctrl.Manager) error {
273285
func setupGatewayClassIndexer(mgr ctrl.Manager) error {
274286
return mgr.GetFieldIndexer().IndexField(
275287
context.Background(),
276-
&gatewayv1.Gateway{},
277-
GatewayClassIndexRef,
278-
func(obj client.Object) (requests []string) {
279-
return []string{string(obj.(*gatewayv1.Gateway).Spec.GatewayClassName)}
288+
&gatewayv1.GatewayClass{},
289+
ControllerName,
290+
func(obj client.Object) []string {
291+
return []string{string(obj.(*gatewayv1.GatewayClass).Spec.ControllerName)}
280292
},
281293
)
282294
}

internal/controller/ingress_controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"github.com/apache/apisix-ingress-controller/api/v1alpha1"
4646
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
4747
"github.com/apache/apisix-ingress-controller/internal/controller/status"
48+
"github.com/apache/apisix-ingress-controller/internal/manager/readiness"
4849
"github.com/apache/apisix-ingress-controller/internal/provider"
4950
"github.com/apache/apisix-ingress-controller/internal/utils"
5051
)
@@ -59,6 +60,7 @@ type IngressReconciler struct { //nolint:revive
5960
genericEvent chan event.GenericEvent
6061

6162
Updater status.Updater
63+
Readier readiness.ReadinessManager
6264
}
6365

6466
// SetupWithManager sets up the controller with the Manager.
@@ -117,6 +119,7 @@ func (r *IngressReconciler) SetupWithManager(mgr ctrl.Manager) error {
117119

118120
// Reconcile handles the reconciliation of Ingress resources
119121
func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
122+
defer r.Readier.Done(&networkingv1.Ingress{}, req.NamespacedName)
120123
ingress := new(networkingv1.Ingress)
121124
if err := r.Get(ctx, req.NamespacedName, ingress); err != nil {
122125
if client.IgnoreNotFound(err) == nil {

internal/controller/status/updater.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,10 @@ func (u *UpdateHandler) Start(ctx context.Context) error {
150150
}
151151
}
152152

153+
func (u *UpdateHandler) NeedsLeaderElection() bool {
154+
return true
155+
}
156+
153157
func (u *UpdateHandler) Writer() Updater {
154158
return &UpdateWriter{
155159
updateChannel: u.updateChannel,

internal/controller/utils.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
corev1 "k8s.io/api/core/v1"
3636
discoveryv1 "k8s.io/api/discovery/v1"
3737
networkingv1 "k8s.io/api/networking/v1"
38+
k8serrors "k8s.io/apimachinery/pkg/api/errors"
3839
"k8s.io/apimachinery/pkg/api/meta"
3940
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4041
"k8s.io/apimachinery/pkg/labels"
@@ -1448,3 +1449,37 @@ func TypePredicate[T client.Object]() func(obj client.Object) bool {
14481449
return ok
14491450
}
14501451
}
1452+
1453+
func MatchConsumerGatewayRef(ctx context.Context, c client.Client, log logr.Logger, consumer *v1alpha1.Consumer) bool {
1454+
if consumer.Spec.GatewayRef.Name == "" {
1455+
return false
1456+
}
1457+
if consumer.Spec.GatewayRef.Kind != nil && *consumer.Spec.GatewayRef.Kind != KindGateway {
1458+
return false
1459+
}
1460+
if consumer.Spec.GatewayRef.Group != nil && *consumer.Spec.GatewayRef.Group != gatewayv1.GroupName {
1461+
return false
1462+
}
1463+
ns := consumer.GetNamespace()
1464+
if consumer.Spec.GatewayRef.Namespace != nil {
1465+
ns = *consumer.Spec.GatewayRef.Namespace
1466+
}
1467+
gateway := &gatewayv1.Gateway{}
1468+
if err := c.Get(context.Background(), client.ObjectKey{
1469+
Name: consumer.Spec.GatewayRef.Name,
1470+
Namespace: ns,
1471+
}, gateway); err != nil {
1472+
if !k8serrors.IsNotFound(err) {
1473+
log.Error(err, "failed to get gateway", "gateway", consumer.Spec.GatewayRef.Name)
1474+
}
1475+
return false
1476+
}
1477+
gatewayClass := &gatewayv1.GatewayClass{}
1478+
if err := c.Get(context.Background(), client.ObjectKey{Name: string(gateway.Spec.GatewayClassName)}, gatewayClass); err != nil {
1479+
if !k8serrors.IsNotFound(err) {
1480+
log.Error(err, "failed to get gateway class", "gateway", gateway.GetName(), "gatewayclass", gateway.Spec.GatewayClassName)
1481+
}
1482+
return false
1483+
}
1484+
return matchesController(string(gatewayClass.Spec.ControllerName))
1485+
}

0 commit comments

Comments
 (0)