Skip to content

Commit 3613795

Browse files
AlinsRanronething
authored andcommitted
feat: support retry in case of sync failure (#2534)
Signed-off-by: Ashing Zheng <[email protected]>
1 parent 522f08f commit 3613795

File tree

3 files changed

+179
-17
lines changed

3 files changed

+179
-17
lines changed

internal/provider/apisix/provider.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,20 @@ import (
3838
"github.com/apache/apisix-ingress-controller/internal/controller/status"
3939
"github.com/apache/apisix-ingress-controller/internal/manager/readiness"
4040
"github.com/apache/apisix-ingress-controller/internal/provider"
41+
"github.com/apache/apisix-ingress-controller/internal/provider/common"
4142
"github.com/apache/apisix-ingress-controller/internal/types"
4243
"github.com/apache/apisix-ingress-controller/internal/utils"
4344
pkgutils "github.com/apache/apisix-ingress-controller/pkg/utils"
4445
)
4546

46-
const ProviderTypeAPISIX = "apisix"
47+
const (
48+
ProviderTypeAPISIX = "apisix"
49+
50+
RetryBaseDelay = 1 * time.Second
51+
RetryMaxDelay = 1000 * time.Second
52+
53+
MinSyncPeriod = 1 * time.Second
54+
)
4755

4856
type apisixProvider struct {
4957
provider.Options
@@ -229,33 +237,32 @@ func (d *apisixProvider) Start(ctx context.Context) error {
229237

230238
initalSyncDelay := d.InitSyncDelay
231239
if initalSyncDelay > 0 {
232-
time.AfterFunc(initalSyncDelay, func() {
233-
if err := d.sync(ctx); err != nil {
234-
log.Error(err)
235-
return
236-
}
237-
})
240+
time.AfterFunc(initalSyncDelay, d.syncNotify)
238241
}
239242

240-
if d.SyncPeriod < 1 {
241-
return nil
243+
syncPeriod := d.SyncPeriod
244+
if syncPeriod < MinSyncPeriod {
245+
syncPeriod = MinSyncPeriod
242246
}
243-
ticker := time.NewTicker(d.SyncPeriod)
247+
ticker := time.NewTicker(syncPeriod)
244248
defer ticker.Stop()
249+
250+
retrier := common.NewRetrier(common.NewExponentialBackoff(RetryBaseDelay, RetryMaxDelay))
251+
245252
for {
246-
synced := false
247253
select {
248254
case <-d.syncCh:
249-
synced = true
250255
case <-ticker.C:
251-
synced = true
256+
case <-retrier.C():
252257
case <-ctx.Done():
258+
retrier.Reset()
253259
return nil
254260
}
255-
if synced {
256-
if err := d.sync(ctx); err != nil {
257-
log.Error(err)
258-
}
261+
if err := d.sync(ctx); err != nil {
262+
log.Error(err)
263+
retrier.Next()
264+
} else {
265+
retrier.Reset()
259266
}
260267
}
261268
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package common
19+
20+
import (
21+
"sync"
22+
"time"
23+
)
24+
25+
type Backoff interface {
26+
Next() time.Duration
27+
Reset()
28+
}
29+
30+
type ExponentialBackoff struct {
31+
base, max, current time.Duration
32+
}
33+
34+
func NewExponentialBackoff(base, max time.Duration) *ExponentialBackoff {
35+
return &ExponentialBackoff{base: base, max: max, current: base}
36+
}
37+
38+
func (b *ExponentialBackoff) Next() time.Duration {
39+
delay := b.current
40+
b.current *= 2
41+
if b.current > b.max {
42+
b.current = b.max
43+
}
44+
return delay
45+
}
46+
47+
func (b *ExponentialBackoff) Reset() {
48+
b.current = b.base
49+
}
50+
51+
type Retrier struct {
52+
mu sync.Mutex
53+
ch chan struct{}
54+
timer *time.Timer
55+
backoff Backoff
56+
}
57+
58+
func NewRetrier(b Backoff) *Retrier {
59+
return &Retrier{
60+
ch: make(chan struct{}, 1),
61+
backoff: b,
62+
}
63+
}
64+
65+
func (r *Retrier) Reset() {
66+
r.mu.Lock()
67+
defer r.mu.Unlock()
68+
69+
if r.timer != nil {
70+
r.timer.Stop()
71+
r.timer = nil
72+
}
73+
r.backoff.Reset()
74+
}
75+
76+
func (r *Retrier) Next() {
77+
r.mu.Lock()
78+
defer r.mu.Unlock()
79+
80+
if r.timer != nil {
81+
r.timer.Stop()
82+
r.timer = nil
83+
}
84+
85+
delay := r.backoff.Next()
86+
r.timer = time.AfterFunc(delay, func() {
87+
select {
88+
case r.ch <- struct{}{}:
89+
default:
90+
}
91+
})
92+
}
93+
94+
func (r *Retrier) C() <-chan struct{} {
95+
return r.ch
96+
}

test/e2e/crds/v2/route.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"net"
2626
"net/http"
2727
"net/url"
28+
"os"
2829
"strings"
2930
"time"
3031

@@ -1762,4 +1763,62 @@ spec:
17621763
})
17631764
})
17641765
})
1766+
1767+
Context("Exception Test", func() {
1768+
const apisixRouteSpec = `
1769+
apiVersion: apisix.apache.org/v2
1770+
kind: ApisixRoute
1771+
metadata:
1772+
name: default
1773+
spec:
1774+
ingressClassName: %s
1775+
http:
1776+
- name: rule0
1777+
match:
1778+
hosts:
1779+
- httpbin
1780+
paths:
1781+
- /*
1782+
backends:
1783+
- serviceName: httpbin-service-e2e-test
1784+
servicePort: 80
1785+
`
1786+
It("try again when sync failed", func() {
1787+
if os.Getenv("PROVIDER_TYPE") == framework.ProviderTypeAPI7EE {
1788+
Skip("skipping test in API7EE mode")
1789+
}
1790+
s.Deployer.ScaleDataplane(0)
1791+
1792+
err := s.CreateResourceFromString(fmt.Sprintf(apisixRouteSpec, s.Namespace()))
1793+
Expect(err).NotTo(HaveOccurred(), "creating ApisixRoute")
1794+
1795+
By("check ApisixRoute status")
1796+
s.RetryAssertion(func() string {
1797+
output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml", "-n", s.Namespace())
1798+
return output
1799+
}).WithTimeout(30 * time.Second).
1800+
Should(
1801+
And(
1802+
ContainSubstring(`status: "False"`),
1803+
ContainSubstring(`reason: SyncFailed`),
1804+
),
1805+
)
1806+
1807+
s.Deployer.ScaleDataplane(1)
1808+
1809+
s.RetryAssertion(func() string {
1810+
output, _ := s.GetOutputFromString("ar", "default", "-o", "yaml", "-n", s.Namespace())
1811+
return output
1812+
}).WithTimeout(60 * time.Second).
1813+
Should(ContainSubstring(`status: "True"`))
1814+
1815+
By("check route in APISIX")
1816+
s.RequestAssert(&scaffold.RequestAssert{
1817+
Method: "GET",
1818+
Path: "/get",
1819+
Host: "httpbin",
1820+
Check: scaffold.WithExpectedStatus(200),
1821+
})
1822+
})
1823+
})
17651824
})

0 commit comments

Comments
 (0)