Skip to content

Commit 181831b

Browse files
authored
backport: part 5 (#248)
1 parent 522f08f commit 181831b

File tree

11 files changed

+525
-69
lines changed

11 files changed

+525
-69
lines changed

api/adc/types.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,20 @@ func (n *UpstreamNodes) UnmarshalJSON(p []byte) error {
505505
return nil
506506
}
507507

508+
// MarshalJSON implements the json.Marshaler interface for UpstreamNodes.
509+
// By default, Go serializes a nil slice as JSON null. However, for compatibility
510+
// with APISIX semantics, we want a nil UpstreamNodes to be encoded as an empty
511+
// array ([]) instead of null. Non-nil slices are marshaled as usual.
512+
//
513+
// See APISIX upstream nodes schema definition for details:
514+
// https://github.com/apache/apisix/blob/77dacda31277a31d6014b4970e36bae2a5c30907/apisix/schema_def.lua#L295-L338
515+
func (n UpstreamNodes) MarshalJSON() ([]byte, error) {
516+
if n == nil {
517+
return []byte("[]"), nil
518+
}
519+
return json.Marshal([]UpstreamNode(n))
520+
}
521+
508522
// ComposeRouteName uses namespace, name and rule name to compose
509523
// the route name.
510524
func ComposeRouteName(namespace, name string, rule string) string {
@@ -621,9 +635,9 @@ type ResponseRewriteConfig struct {
621635
}
622636

623637
type ResponseHeaders struct {
624-
Set map[string]string `json:"set" yaml:"set"`
625-
Add []string `json:"add" yaml:"add"`
626-
Remove []string `json:"remove" yaml:"remove"`
638+
Set map[string]string `json:"set,omitempty" yaml:"set,omitempty"`
639+
Add []string `json:"add,omitempty" yaml:"add,omitempty"`
640+
Remove []string `json:"remove,omitempty" yaml:"remove,omitempty"`
627641
}
628642

629643
// RequestMirror is the rule config for proxy-mirror plugin.

internal/adc/translator/httproute.go

Lines changed: 70 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333

3434
adctypes "github.com/apache/apisix-ingress-controller/api/adc"
3535
"github.com/apache/apisix-ingress-controller/api/v1alpha1"
36+
apiv2 "github.com/apache/apisix-ingress-controller/api/v2"
3637
"github.com/apache/apisix-ingress-controller/internal/controller/label"
3738
"github.com/apache/apisix-ingress-controller/internal/id"
3839
"github.com/apache/apisix-ingress-controller/internal/provider"
@@ -285,7 +286,7 @@ func (t *Translator) fillHTTPRoutePolicies(routes []*adctypes.Route, policies []
285286
}
286287

287288
func (t *Translator) translateEndpointSlice(portName *string, weight int, endpointSlices []discoveryv1.EndpointSlice, endpointFilter func(*discoveryv1.Endpoint) bool) adctypes.UpstreamNodes {
288-
var nodes adctypes.UpstreamNodes
289+
nodes := adctypes.UpstreamNodes{}
289290
if len(endpointSlices) == 0 {
290291
return nodes
291292
}
@@ -466,32 +467,89 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou
466467
labels := label.GenLabel(httpRoute)
467468

468469
for ruleIndex, rule := range rules {
469-
upstream := adctypes.NewDefaultUpstream()
470-
var backendErr error
470+
service := adctypes.NewDefaultService()
471+
service.Labels = labels
472+
473+
service.Name = adctypes.ComposeServiceNameWithRule(httpRoute.Namespace, httpRoute.Name, fmt.Sprintf("%d", ruleIndex))
474+
service.ID = id.GenID(service.Name)
475+
service.Hosts = hosts
476+
477+
var (
478+
upstreams = make([]*adctypes.Upstream, 0)
479+
weightedUpstreams = make([]adctypes.TrafficSplitConfigRuleWeightedUpstream, 0)
480+
backendErr error
481+
)
482+
471483
for _, backend := range rule.BackendRefs {
472484
if backend.Namespace == nil {
473485
namespace := gatewayv1.Namespace(httpRoute.Namespace)
474486
backend.Namespace = &namespace
475487
}
488+
upstream := adctypes.NewDefaultUpstream()
476489
upNodes, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter)
477490
if err != nil {
478491
backendErr = err
479492
continue
480493
}
494+
if len(upNodes) == 0 {
495+
continue
496+
}
497+
481498
t.AttachBackendTrafficPolicyToUpstream(backend.BackendRef, tctx.BackendTrafficPolicies, upstream)
482-
upstream.Nodes = append(upstream.Nodes, upNodes...)
499+
upstream.Nodes = upNodes
500+
upstreams = append(upstreams, upstream)
483501
}
484502

485-
// todo: support multiple backends
486-
service := adctypes.NewDefaultService()
487-
service.Labels = labels
503+
// Handle multiple backends with traffic-split plugin
504+
if len(upstreams) == 0 {
505+
// Create a default upstream if no valid backends
506+
upstream := adctypes.NewDefaultUpstream()
507+
service.Upstream = upstream
508+
} else if len(upstreams) == 1 {
509+
// Single backend - use directly as service upstream
510+
service.Upstream = upstreams[0]
511+
} else {
512+
// Multiple backends - use traffic-split plugin
513+
service.Upstream = upstreams[0]
514+
upstreams = upstreams[1:]
515+
516+
// Set weight in traffic-split for the default upstream
517+
weight := apiv2.DefaultWeight
518+
if rule.BackendRefs[0].Weight != nil {
519+
weight = int(*rule.BackendRefs[0].Weight)
520+
}
521+
weightedUpstreams = append(weightedUpstreams, adctypes.TrafficSplitConfigRuleWeightedUpstream{
522+
Weight: weight,
523+
})
488524

489-
service.Name = adctypes.ComposeServiceNameWithRule(httpRoute.Namespace, httpRoute.Name, fmt.Sprintf("%d", ruleIndex))
490-
service.ID = id.GenID(service.Name)
491-
service.Hosts = hosts
492-
service.Upstream = upstream
525+
// Set other upstreams in traffic-split
526+
for i, upstream := range upstreams {
527+
weight := apiv2.DefaultWeight
528+
// get weight from the backend refs starting from the second backend
529+
if i+1 < len(rule.BackendRefs) && rule.BackendRefs[i+1].Weight != nil {
530+
weight = int(*rule.BackendRefs[i+1].Weight)
531+
}
532+
weightedUpstreams = append(weightedUpstreams, adctypes.TrafficSplitConfigRuleWeightedUpstream{
533+
Upstream: upstream,
534+
Weight: weight,
535+
})
536+
}
537+
538+
if len(weightedUpstreams) > 0 {
539+
if service.Plugins == nil {
540+
service.Plugins = make(map[string]any)
541+
}
542+
service.Plugins["traffic-split"] = &adctypes.TrafficSplitConfig{
543+
Rules: []adctypes.TrafficSplitConfigRule{
544+
{
545+
WeightedUpstreams: weightedUpstreams,
546+
},
547+
},
548+
}
549+
}
550+
}
493551

494-
if backendErr != nil && len(upstream.Nodes) == 0 {
552+
if backendErr != nil && (service.Upstream == nil || len(service.Upstream.Nodes) == 0) {
495553
if service.Plugins == nil {
496554
service.Plugins = make(map[string]any)
497555
}

internal/adc/translator/ingress.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func (t *Translator) TranslateIngress(tctx *provider.TranslateContext, obj *netw
222222

223223
// translateEndpointSliceForIngress create upstream nodes from EndpointSlice
224224
func (t *Translator) translateEndpointSliceForIngress(weight int, endpointSlices []discoveryv1.EndpointSlice, servicePort *corev1.ServicePort) adctypes.UpstreamNodes {
225-
var nodes adctypes.UpstreamNodes
225+
nodes := adctypes.UpstreamNodes{}
226226
if len(endpointSlices) == 0 {
227227
return nodes
228228
}

internal/provider/apisix/provider.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,20 @@ import (
3838
"github.com/apache/apisix-ingress-controller/internal/controller/status"
3939
"github.com/apache/apisix-ingress-controller/internal/manager/readiness"
4040
"github.com/apache/apisix-ingress-controller/internal/provider"
41+
"github.com/apache/apisix-ingress-controller/internal/provider/common"
4142
"github.com/apache/apisix-ingress-controller/internal/types"
4243
"github.com/apache/apisix-ingress-controller/internal/utils"
4344
pkgutils "github.com/apache/apisix-ingress-controller/pkg/utils"
4445
)
4546

46-
const ProviderTypeAPISIX = "apisix"
47+
const (
48+
ProviderTypeAPISIX = "apisix"
49+
50+
RetryBaseDelay = 1 * time.Second
51+
RetryMaxDelay = 1000 * time.Second
52+
53+
MinSyncPeriod = 1 * time.Second
54+
)
4755

4856
type apisixProvider struct {
4957
provider.Options
@@ -229,33 +237,32 @@ func (d *apisixProvider) Start(ctx context.Context) error {
229237

230238
initalSyncDelay := d.InitSyncDelay
231239
if initalSyncDelay > 0 {
232-
time.AfterFunc(initalSyncDelay, func() {
233-
if err := d.sync(ctx); err != nil {
234-
log.Error(err)
235-
return
236-
}
237-
})
240+
time.AfterFunc(initalSyncDelay, d.syncNotify)
238241
}
239242

240-
if d.SyncPeriod < 1 {
241-
return nil
243+
syncPeriod := d.SyncPeriod
244+
if syncPeriod < MinSyncPeriod {
245+
syncPeriod = MinSyncPeriod
242246
}
243-
ticker := time.NewTicker(d.SyncPeriod)
247+
ticker := time.NewTicker(syncPeriod)
244248
defer ticker.Stop()
249+
250+
retrier := common.NewRetrier(common.NewExponentialBackoff(RetryBaseDelay, RetryMaxDelay))
251+
245252
for {
246-
synced := false
247253
select {
248254
case <-d.syncCh:
249-
synced = true
250255
case <-ticker.C:
251-
synced = true
256+
case <-retrier.C():
252257
case <-ctx.Done():
258+
retrier.Reset()
253259
return nil
254260
}
255-
if synced {
256-
if err := d.sync(ctx); err != nil {
257-
log.Error(err)
258-
}
261+
if err := d.sync(ctx); err != nil {
262+
log.Error(err)
263+
retrier.Next()
264+
} else {
265+
retrier.Reset()
259266
}
260267
}
261268
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package common
19+
20+
import (
21+
"sync"
22+
"time"
23+
)
24+
25+
type Backoff interface {
26+
Next() time.Duration
27+
Reset()
28+
}
29+
30+
type ExponentialBackoff struct {
31+
base, max, current time.Duration
32+
}
33+
34+
func NewExponentialBackoff(base, max time.Duration) *ExponentialBackoff {
35+
return &ExponentialBackoff{base: base, max: max, current: base}
36+
}
37+
38+
func (b *ExponentialBackoff) Next() time.Duration {
39+
delay := b.current
40+
b.current *= 2
41+
if b.current > b.max {
42+
b.current = b.max
43+
}
44+
return delay
45+
}
46+
47+
func (b *ExponentialBackoff) Reset() {
48+
b.current = b.base
49+
}
50+
51+
type Retrier struct {
52+
mu sync.Mutex
53+
ch chan struct{}
54+
timer *time.Timer
55+
backoff Backoff
56+
}
57+
58+
func NewRetrier(b Backoff) *Retrier {
59+
return &Retrier{
60+
ch: make(chan struct{}, 1),
61+
backoff: b,
62+
}
63+
}
64+
65+
func (r *Retrier) Reset() {
66+
r.mu.Lock()
67+
defer r.mu.Unlock()
68+
69+
if r.timer != nil {
70+
r.timer.Stop()
71+
r.timer = nil
72+
}
73+
r.backoff.Reset()
74+
}
75+
76+
func (r *Retrier) Next() {
77+
r.mu.Lock()
78+
defer r.mu.Unlock()
79+
80+
if r.timer != nil {
81+
r.timer.Stop()
82+
r.timer = nil
83+
}
84+
85+
delay := r.backoff.Next()
86+
r.timer = time.AfterFunc(delay, func() {
87+
select {
88+
case r.ch <- struct{}{}:
89+
default:
90+
}
91+
})
92+
}
93+
94+
func (r *Retrier) C() <-chan struct{} {
95+
return r.ch
96+
}

0 commit comments

Comments
 (0)