Skip to content

Commit 0ba7567

Browse files
authored
fix(revert): service not found error caused during configuration of externalNodes (#26)
1 parent d49691b commit 0ba7567

File tree

5 files changed

+51
-65
lines changed

5 files changed

+51
-65
lines changed

pkg/kube/apisix/apis/config/v2/types.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,8 @@ type ApisixUpstreamSpec struct {
505505
// ApisixUpstreamConfig contains rich features on APISIX Upstream, for instance
506506
// load balancer, health check, etc.
507507
type ApisixUpstreamConfig struct {
508-
Granularity string `json:"granularity,omitempty" yaml:"scheme,omitempty"`
508+
// TODO: Improve or abandon, the next step is to improve or abandon it
509+
Granularity string `json:"granularity,omitempty" yaml:"granularity,omitempty"`
509510
// LoadBalancer represents the load balancer configuration for Kubernetes Service.
510511
// The default strategy is round robin.
511512
// +optional

pkg/providers/apisix/apisix_upstream.go

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package apisix
1616

1717
import (
1818
"context"
19-
"errors"
2019
"fmt"
2120
"reflect"
2221
"strconv"
@@ -192,13 +191,6 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
192191
if au.Spec == nil {
193192
return nil
194193
}
195-
svc, err := c.SvcLister.Services(namespace).Get(name)
196-
if err != nil {
197-
log.Errorf("failed to get service %s: %s", key, err)
198-
errRecord = err
199-
goto updateStatus
200-
}
201-
202194
// We will prioritize ExternalNodes and Discovery.
203195
if len(au.Spec.ExternalNodes) != 0 || au.Spec.Discovery != nil {
204196
var newUps *apisixv1.Upstream
@@ -228,7 +220,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
228220
}
229221
// updateUpstream for real
230222
upsName := apisixv1.ComposeExternalUpstreamName(au.Namespace, au.Name)
231-
errRecord = c.updateUpstream(ctx, upsName, &au.Spec.ApisixUpstreamConfig, ev.Type.IsSyncEvent(), svc.Spec.ClusterIP)
223+
errRecord = c.updateUpstream(ctx, upsName, &au.Spec.ApisixUpstreamConfig, ev.Type.IsSyncEvent())
232224
if err == apisix.ErrNotFound {
233225
errRecord = fmt.Errorf("%s", "upstream doesn't exist. It will be created after ApisixRoute is created referencing it.")
234226
}
@@ -242,6 +234,14 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
242234
portLevelSettings[port.Port] = port.ApisixUpstreamConfig
243235
}
244236
}
237+
238+
svc, err := c.SvcLister.Services(namespace).Get(name)
239+
if err != nil {
240+
log.Errorf("failed to get service %s: %s", key, err)
241+
errRecord = err
242+
goto updateStatus
243+
}
244+
245245
var subsets []configv2.ApisixUpstreamSubset
246246
subsets = append(subsets, configv2.ApisixUpstreamSubset{})
247247
if len(au.Spec.Subsets) > 0 {
@@ -257,7 +257,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
257257
cfg = au.Spec.ApisixUpstreamConfig
258258
}
259259
}
260-
err := c.updateUpstream(ctx, apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port), &cfg, ev.Type.IsSyncEvent(), svc.Spec.ClusterIP)
260+
err := c.updateUpstream(ctx, apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port), &cfg, ev.Type.IsSyncEvent())
261261
if err != nil {
262262
if err == apisix.ErrNotFound {
263263
errRecord = fmt.Errorf("%s", "upstream doesn't exist. It will be created after ApisixRoute is created referencing it.")
@@ -325,7 +325,7 @@ func (c *apisixUpstreamController) updateStatus(obj kube.ApisixUpstream, statusE
325325
}
326326
}
327327

328-
func (c *apisixUpstreamController) updateUpstream(ctx context.Context, upsName string, cfg *configv2.ApisixUpstreamConfig, shouldCompare bool, svcClusterIP string) error {
328+
func (c *apisixUpstreamController) updateUpstream(ctx context.Context, upsName string, cfg *configv2.ApisixUpstreamConfig, shouldCompare bool) error {
329329
// TODO: multi cluster
330330
clusterName := c.Config.APISIX.DefaultClusterName
331331

@@ -348,24 +348,8 @@ func (c *apisixUpstreamController) updateUpstream(ctx context.Context, upsName s
348348
}
349349

350350
newUps.Metadata = ups.Metadata
351+
newUps.Nodes = ups.Nodes
351352

352-
if cfg.Granularity == types.ResolveGranularity.Service {
353-
if svcClusterIP == "" {
354-
log.Errorw("ApisixRoute refers to a headless service but want to use the service level resolve granularity",
355-
zap.String("ApisixUpstream name", upsName),
356-
)
357-
return errors.New("conflict headless service and backend resolve granularity")
358-
}
359-
for _, node := range ups.Nodes {
360-
newUps.Nodes = append(newUps.Nodes, apisixv1.UpstreamNode{
361-
Host: svcClusterIP,
362-
Port: node.Port,
363-
Weight: node.Weight,
364-
})
365-
}
366-
} else {
367-
newUps.Nodes = ups.Nodes
368-
}
369353
log.Debugw("updating upstream since ApisixUpstream changed",
370354
zap.Any("upstream", newUps),
371355
zap.String("ApisixUpstream name", upsName),

test/e2e/scaffold/k8s.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,25 @@ func (s *Scaffold) GetKubernetesClient() *kubernetes.Clientset {
783783
return client
784784
}
785785

786+
func (s *Scaffold) GetEndpoints(endpointsName string) (*corev1.Endpoints, error) {
787+
e := s.GetKubernetesClient()
788+
return e.CoreV1().Endpoints(s.Namespace()).Get(context.Background(), endpointsName, metav1.GetOptions{})
789+
}
790+
791+
func (s *Scaffold) GetEndpointIPs(endpointsName string) ([]string, error) {
792+
endpoints, err := s.GetEndpoints(endpointsName)
793+
if err != nil {
794+
return nil, err
795+
}
796+
ips := make([]string, 0)
797+
for _, subset := range endpoints.Subsets {
798+
for _, addr := range subset.Addresses {
799+
ips = append(ips, addr.IP)
800+
}
801+
}
802+
return ips, nil
803+
}
804+
786805
func (s *Scaffold) RunKubectlAndGetOutput(args ...string) (string, error) {
787806
return k8s.RunKubectlAndGetOutputE(ginkgo.GinkgoT(), s.kubectlOptions, args...)
788807
}

test/e2e/suite-features/external-service.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -115,19 +115,19 @@ spec:
115115
`, name, nodeType, nodeName)
116116
assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(au))
117117
}
118-
119-
PhaseCreateApisixUpstreamWithGranularity := func(s *scaffold.Scaffold, name string, nodeType v2.ApisixUpstreamExternalType, nodeName, granularity string) {
120-
au := fmt.Sprintf(`
121-
apiVersion: apisix.apache.org/v2
122-
kind: ApisixUpstream
123-
metadata:
124-
name: %s
125-
spec:
126-
granularity: %s
127-
`, name, granularity)
128-
assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(au))
129-
}
130-
118+
/*
119+
PhaseCreateApisixUpstreamWithGranularity := func(s *scaffold.Scaffold, name string, nodeType v2.ApisixUpstreamExternalType, nodeName, granularity string) {
120+
au := fmt.Sprintf(`
121+
apiVersion: apisix.apache.org/v2
122+
kind: ApisixUpstream
123+
metadata:
124+
name: %s
125+
spec:
126+
granularity: %s
127+
`, name, granularity)
128+
assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(au))
129+
}
130+
*/
131131
PhaseValidateNoUpstreams := func(s *scaffold.Scaffold) {
132132
ups, err := s.ListApisixUpstreams()
133133
assert.Nil(ginkgo.GinkgoT(), err)
@@ -475,19 +475,19 @@ spec:
475475
PhaseCreateApisixUpstream(s, "httpbin-upstream", v2.ExternalTypeDomain, "postman-echo.com")
476476
PhaseCreateApisixRouteWithHostRewriteAndBackend(s, "httpbin-route", "httpbin-upstream", "postman-echo.com", "httpbin-temp", 80)
477477
//configure the created upstream with granularity service
478-
PhaseCreateApisixUpstreamWithGranularity(s, "httpbin-temp", v2.ExternalTypeService, "httpbin-temp", "service")
478+
//PhaseCreateApisixUpstreamWithGranularity(s, "httpbin-temp", v2.ExternalTypeService, "httpbin-temp", "service")
479479
time.Sleep(time.Second * 6)
480480

481-
svc, err := s.GetServiceByName("httpbin-temp")
482-
assert.Nil(ginkgo.GinkgoT(), err, "get httpbin service")
483-
ip := svc.Spec.ClusterIP
481+
ips, err := s.GetEndpointIPs("httpbin-temp")
482+
assert.Nil(ginkgo.GinkgoT(), err, "get httpbin service endpoint")
483+
assert.Len(ginkgo.GinkgoT(), ips, 1, "httpbin service endpoint count")
484484

485485
upName := apisixv1.ComposeUpstreamName(s.Namespace(), "httpbin-temp", "", 80)
486486
upID := id.GenID(upName)
487487

488488
// -- validation --
489489
PhaseValidateTrafficSplit(s, 2, upID, map[string]*validateFactor{
490-
ip: {
490+
ips[0]: {
491491
port: 80,
492492
weight: translation.DefaultWeight,
493493
},

test/e2e/suite-plugins/suite-plugins-traffic/api_breaker.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,6 @@ var _ = ginkgo.Describe("suite-plugins-traffic: api-breaker plugin", func() {
3030
s := scaffoldFunc()
3131
ginkgo.It("sanity", func() {
3232
backendSvc, backendPorts := s.DefaultHTTPBackend()
33-
au := fmt.Sprintf(`
34-
apiVersion: apisix.apache.org/v2
35-
kind: ApisixUpstream
36-
metadata:
37-
name: %s
38-
spec:
39-
granularity: service
40-
`, backendSvc)
41-
assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(au))
4233
ar := fmt.Sprintf(`
4334
apiVersion: apisix.apache.org/v2
4435
kind: ApisixRoute
@@ -95,15 +86,6 @@ spec:
9586
})
9687
ginkgo.It("disable plugin", func() {
9788
backendSvc, backendPorts := s.DefaultHTTPBackend()
98-
au := fmt.Sprintf(`
99-
apiVersion: apisix.apache.org/v2
100-
kind: ApisixUpstream
101-
metadata:
102-
name: %s
103-
spec:
104-
granularity: service
105-
`, backendSvc)
106-
assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(au))
10789
ar := fmt.Sprintf(`
10890
apiVersion: apisix.apache.org/v2
10991
kind: ApisixRoute

0 commit comments

Comments
 (0)