Skip to content

Commit 18955cd

Browse files
committed
feat: Add full support for ApisixUpstream in translator and e2e tests
- Implement `translateApisixUpstream` to support ApisixUpstream translation. - Add handling for externalNodes with types Domain and Service. - Update ApisixRoute translator to integrate ApisixUpstream references. - Introduce `SchemeToPort` and `MatchHostDef` utility functions for validation and defaults. - Enable e2e tests for ApisixRoute referencing ApisixUpstream.
1 parent fcd5f05 commit 18955cd

File tree

5 files changed

+238
-10
lines changed

5 files changed

+238
-10
lines changed

api/v2/shared_types.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,3 +125,29 @@ const (
125125
// PassHostPass represents rewrite option for pass_host Upstream settings.
126126
PassHostRewrite = "rewrite"
127127
)
128+
129+
const (
130+
// ExternalTypeDomain type is a domain
131+
// +k8s:deepcopy-gen=false
132+
ExternalTypeDomain ApisixUpstreamExternalType = "Domain"
133+
134+
// ExternalTypeService type is a K8s ExternalName service
135+
// +k8s:deepcopy-gen=false
136+
ExternalTypeService ApisixUpstreamExternalType = "Service"
137+
)
138+
139+
var schemeToPortMaps = map[string]int{
140+
SchemeHTTP: 80,
141+
SchemeHTTPS: 443,
142+
SchemeGRPC: 80,
143+
SchemeGRPCS: 443,
144+
}
145+
146+
// SchemeToPort scheme converts to the default port
147+
// ref https://github.com/apache/apisix/blob/c5fc10d9355a0c177a7532f01c77745ff0639a7f/apisix/upstream.lua#L167-L172
148+
func SchemeToPort(schema string) int {
149+
if val, ok := schemeToPortMaps[schema]; ok {
150+
return val
151+
}
152+
return 80
153+
}

internal/provider/adc/translator/apisixroute.go

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"cmp"
1717
"encoding/json"
1818
"fmt"
19+
"strconv"
1920

2021
"github.com/pkg/errors"
2122
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -27,11 +28,13 @@ import (
2728
apiv2 "github.com/apache/apisix-ingress-controller/api/v2"
2829
"github.com/apache/apisix-ingress-controller/internal/controller/label"
2930
"github.com/apache/apisix-ingress-controller/internal/provider"
31+
types2 "github.com/apache/apisix-ingress-controller/internal/types"
3032
"github.com/apache/apisix-ingress-controller/internal/utils"
3133
"github.com/apache/apisix-ingress-controller/pkg/id"
3234
pkgutils "github.com/apache/apisix-ingress-controller/pkg/utils"
3335
)
3436

37+
//nolint:gocyclo
3538
func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *apiv2.ApisixRoute) (result *TranslateResult, err error) {
3639
result = &TranslateResult{}
3740
for ruleIndex, rule := range ar.Spec.HTTP {
@@ -113,8 +116,16 @@ func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *a
113116
route.Timeout = timeout
114117
route.Uris = rule.Match.Paths
115118
route.Vars = vars
119+
for key, value := range ar.GetObjectMeta().GetLabels() {
120+
route.Labels[key] = value
121+
}
122+
123+
//nolint:staticcheck
124+
if rule.PluginConfigName != "" {
125+
// FIXME: handle PluginConfig
126+
}
116127

117-
// translate to adc.Upstream
128+
// translate backends
118129
var backendErr error
119130
for _, backend := range rule.Backends {
120131
var (
@@ -137,9 +148,61 @@ func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *a
137148
upstream.Nodes = append(upstream.Nodes, upNodes...)
138149
}
139150

140-
//nolint:staticcheck
141-
if len(rule.Backends) == 0 && len(rule.Upstreams) > 0 {
142-
// FIXME: when the API ApisixUpstream is supported
151+
var (
152+
apisixUpstreams []*apiv2.ApisixUpstream
153+
adcUpstreams []*adc.Upstream
154+
)
155+
for _, upstreamRef := range rule.Upstreams {
156+
refKey := types2.NamespacedNameKind{
157+
Namespace: ar.GetNamespace(),
158+
Name: upstreamRef.Name,
159+
Kind: "ApisixUpstream",
160+
}
161+
apisixUpstream, ok := tctx.Upstreams[refKey]
162+
if !ok {
163+
continue
164+
}
165+
166+
// todo: translate external upstream
167+
adcUpstream, err := t.translateApisixUpstream(tctx, apisixUpstream)
168+
if err != nil {
169+
t.Log.Error(err, "failed to translate ApisixUpstream", "ApisixUpstream", utils.NamespacedName(apisixUpstream))
170+
continue
171+
}
172+
173+
apisixUpstreams = append(apisixUpstreams, apisixUpstream)
174+
adcUpstreams = append(adcUpstreams, adcUpstream)
175+
}
176+
177+
_ = apisixUpstreams
178+
179+
// If no .http[].backends is used and only .http[].upstreams is used, the first valid upstream is used as service.upstream;
180+
// Other upstreams are configured in the traffic-split plugin
181+
if len(rule.Backends) == 0 && len(adcUpstreams) > 0 {
182+
service.Upstream = adcUpstreams[0]
183+
adcUpstreams = adcUpstreams[1:]
184+
}
185+
186+
var wups []adc.TrafficSplitConfigRuleWeightedUpstream
187+
for _, adcUpstream := range adcUpstreams {
188+
weight, err := strconv.Atoi(adcUpstream.Labels["meta_weight"])
189+
if err != nil {
190+
t.Log.Error(err, "failed to parse meta_weight from upstream labels", "labels", adcUpstream.GetLabels())
191+
weight = apiv2.DefaultWeight
192+
}
193+
wups = append(wups, adc.TrafficSplitConfigRuleWeightedUpstream{
194+
Upstream: adcUpstream,
195+
Weight: weight,
196+
})
197+
}
198+
if len(wups) > 0 {
199+
route.Plugins["traffic-split"] = &adc.TrafficSplitConfig{
200+
Rules: []adc.TrafficSplitConfigRule{
201+
{
202+
WeightedUpstreams: wups,
203+
},
204+
},
205+
}
143206
}
144207

145208
// translate to adc.Service

internal/provider/adc/translator/apisixupstream.go

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,23 @@ package translator
1414

1515
import (
1616
"cmp"
17-
"errors"
17+
"fmt"
18+
19+
"github.com/pkg/errors"
20+
corev1 "k8s.io/api/core/v1"
21+
"k8s.io/apimachinery/pkg/types"
1822

1923
"github.com/apache/apisix-ingress-controller/api/adc"
2024
apiv2 "github.com/apache/apisix-ingress-controller/api/v2"
25+
"github.com/apache/apisix-ingress-controller/internal/provider"
26+
"github.com/apache/apisix-ingress-controller/internal/utils"
2127
)
2228

23-
func (t *Translator) TranslateApisixUpstream(au *apiv2.ApisixUpstream) (ups *adc.Upstream, err error) {
29+
func (t *Translator) translateApisixUpstream(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream) (ups *adc.Upstream, err error) {
30+
if len(au.Spec.ExternalNodes) == 0 && au.Spec.Discovery == nil {
31+
return nil, errors.Errorf("%s has empty externalNodes or discovery configuration", utils.NamespacedName(au))
32+
}
33+
2434
ups = adc.NewDefaultUpstream()
2535
for _, f := range []func(*apiv2.ApisixUpstream, *adc.Upstream) error{
2636
translateApisixUpstreamScheme,
@@ -35,6 +45,13 @@ func (t *Translator) TranslateApisixUpstream(au *apiv2.ApisixUpstream) (ups *adc
3545
return
3646
}
3747
}
48+
for _, f := range []func(*provider.TranslateContext, *apiv2.ApisixUpstream, *adc.Upstream) error{
49+
translateApisixUpstreamExternalNodes,
50+
} {
51+
if err = f(tctx, au, ups); err != nil {
52+
return
53+
}
54+
}
3855

3956
return
4057
}
@@ -145,3 +162,78 @@ func translateApisixUpstreamDiscovery(upstream *apiv2.ApisixUpstream, upstream2
145162
// todo: no filed `.Discovery*` in adc.Upstream
146163
return nil
147164
}
165+
166+
func translateApisixUpstreamExternalNodes(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream, ups *adc.Upstream) error {
167+
for _, node := range au.Spec.ExternalNodes {
168+
switch node.Type {
169+
case apiv2.ExternalTypeDomain:
170+
if err := translateApisixUpstreamExternalNodesDomain(au, ups, node); err != nil {
171+
return err
172+
}
173+
default: // apiv2.ExternalTypeService or default
174+
if err := translateApisixUpstreamExternalNodesExternalName(tctx, au, ups, node); err != nil {
175+
return err
176+
}
177+
}
178+
}
179+
180+
return nil
181+
}
182+
func translateApisixUpstreamExternalNodesDomain(au *apiv2.ApisixUpstream, ups *adc.Upstream, node apiv2.ApisixUpstreamExternalNode) error {
183+
weight := apiv2.DefaultWeight
184+
if node.Weight != nil {
185+
weight = *node.Weight
186+
}
187+
188+
if !utils.MatchHostDef(node.Name) {
189+
return fmt.Errorf("ApisixUpstream %s/%s ExternalNodes[]'s name %s as Domain must match lowercase RFC 1123 subdomain. "+
190+
"a lowercase RFC 1123 subdomain must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character",
191+
au.Namespace, au.Name, node.Name)
192+
}
193+
194+
n := adc.UpstreamNode{
195+
Host: node.Name,
196+
Weight: weight,
197+
}
198+
199+
if node.Port != nil {
200+
n.Port = *node.Port
201+
} else {
202+
n.Port = apiv2.SchemeToPort(au.Spec.Scheme)
203+
}
204+
205+
ups.Nodes = append(ups.Nodes, n)
206+
207+
return nil
208+
}
209+
210+
func translateApisixUpstreamExternalNodesExternalName(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream, ups *adc.Upstream, node apiv2.ApisixUpstreamExternalNode) error {
211+
serviceNN := types.NamespacedName{Namespace: au.GetNamespace(), Name: node.Name}
212+
svc, ok := tctx.Services[serviceNN]
213+
if !ok {
214+
return errors.Errorf("service not found, service: %s", serviceNN)
215+
}
216+
217+
if svc.Spec.Type != corev1.ServiceTypeExternalName {
218+
return errors.Errorf("ApisixUpstream %s ExternalNodes[] must refers to a ExternalName service: %s", utils.NamespacedName(au), node.Name)
219+
}
220+
221+
weight := apiv2.DefaultWeight
222+
if node.Weight != nil {
223+
weight = *node.Weight
224+
}
225+
n := adc.UpstreamNode{
226+
Host: svc.Spec.ExternalName,
227+
Weight: weight,
228+
}
229+
230+
if node.Port != nil {
231+
n.Port = *node.Port
232+
} else {
233+
n.Port = apiv2.SchemeToPort(au.Spec.Scheme)
234+
}
235+
236+
ups.Nodes = append(ups.Nodes, n)
237+
238+
return nil
239+
}

internal/utils/k8s.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ package utils
1414

1515
import (
1616
"net"
17+
"regexp"
1718

1819
k8stypes "k8s.io/apimachinery/pkg/types"
1920
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -47,3 +48,14 @@ func ValidateRemoteAddrs(remoteAddrs []string) error {
4748
}
4849
return nil
4950
}
51+
52+
var hostDef = "^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
53+
var hostDefRegex = regexp.MustCompile(hostDef)
54+
55+
// MatchHostDef checks that host matches host's shcema
56+
// ref to : https://github.com/apache/apisix/blob/c5fc10d9355a0c177a7532f01c77745ff0639a7f/apisix/schema_def.lua#L40
57+
// ref to : https://github.com/kubernetes/kubernetes/blob/976a940f4a4e84fe814583848f97b9aafcdb083f/staging/src/k8s.io/apimachinery/pkg/util/validation/validation.go#L205
58+
// They define regex differently, but k8s's dns is more accurate
59+
func MatchHostDef(host string) bool {
60+
return hostDefRegex.MatchString(host)
61+
}

test/e2e/apisix/route.go

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -296,10 +296,45 @@ spec:
296296
// So the case is pending for now
297297
})
298298

299-
PIt("Test ApisixRoute reference ApisixUpstream", func() {
300-
// This case depends on ApisixUpstream.
301-
// ApisixUpstream is not implemented yet.
302-
// So the case is pending for now.
299+
It("Test ApisixRoute reference ApisixUpstream", func() {
300+
const apisixRouteSpec = `
301+
apiVersion: apisix.apache.org/v2
302+
kind: ApisixRoute
303+
metadata:
304+
name: default
305+
spec:
306+
ingressClassName: apisix
307+
http:
308+
- name: rule0
309+
match:
310+
paths:
311+
- /*
312+
upstreams:
313+
- name: default-upstream
314+
`
315+
const apisixUpstreamSpec = `
316+
apiVersion: apisix.apache.org/v2
317+
kind: ApisixUpstream
318+
metadata:
319+
name: default-upstream
320+
spec:
321+
ingressClassName: apisix
322+
externalNodes:
323+
- type: Service
324+
name: httpbin-service-e2e-test
325+
`
326+
By("create ApisixUpstream and ApisixRoute")
327+
err := s.CreateResourceFromString(apisixUpstreamSpec)
328+
Expect(err).ShouldNot(HaveOccurred(), "apply apisixUpstreamSpec")
329+
330+
var apisxiRoute apiv2.ApisixRoute
331+
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apisxiRoute, apisixRouteSpec)
332+
333+
By("verify ApisixRoute and ApisixUpstream works")
334+
request := func(path string) int {
335+
return s.NewAPISIXClient().GET(path).WithHost("httpbin").Expect().Raw().StatusCode
336+
}
337+
Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK))
303338
})
304339
})
305340
})

0 commit comments

Comments
 (0)