Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions internal/provider/adc/adc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ package adc
import (
"context"
"encoding/json"
"fmt"
"os"
"slices"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -286,6 +288,7 @@ func (d *adcClient) Start(ctx context.Context) error {
initalSyncDelay := d.InitSyncDelay
time.AfterFunc(initalSyncDelay, func() {
if err := d.Sync(ctx); err != nil {
log.Error(err)
return
}
})
Expand All @@ -299,7 +302,7 @@ func (d *adcClient) Start(ctx context.Context) error {
select {
case <-ticker.C:
if err := d.Sync(ctx); err != nil {
log.Errorw("failed to sync resources", zap.Error(err))
log.Error(err)
}
case <-ctx.Done():
return nil
Expand All @@ -324,25 +327,32 @@ func (d *adcClient) Sync(ctx context.Context) error {

log.Debugw("syncing resources with multiple configs", zap.Any("configs", cfg))

var failedConfigs []string
for name, config := range cfg {
resources, err := d.store.GetResources(name)
if err != nil {
return err
log.Errorw("failed to get resources from store", zap.String("name", name), zap.Error(err))
failedConfigs = append(failedConfigs, name)
continue
}
if resources == nil {
continue
}

err = d.sync(ctx, Task{
if err := d.sync(ctx, Task{
Name: name + "-sync",
configs: []adcConfig{config},
Resources: *resources,
})
if err != nil {
return err
}); err != nil {
log.Errorw("failed to sync resources", zap.String("name", name), zap.Error(err))
failedConfigs = append(failedConfigs, name)
}
}

if len(failedConfigs) > 0 {
return fmt.Errorf("failed to sync %d configs: %s",
len(failedConfigs),
strings.Join(failedConfigs, ", "))
}
return nil
}

Expand Down Expand Up @@ -390,13 +400,16 @@ func (d *adcClient) sync(ctx context.Context, task Task) error {

args := BuildADCExecuteArgs(syncFilePath, task.Labels, task.ResourceTypes)

log.Debugw("syncing resources with multiple configs", zap.Any("configs", task.configs))
var failedConfigs []string
for _, config := range task.configs {
if err := d.executor.Execute(ctx, d.BackendMode, config, args); err != nil {
log.Errorw("failed to execute adc command", zap.Error(err), zap.Any("config", config))
return err
failedConfigs = append(failedConfigs, config.Name)
}
}
if len(failedConfigs) > 0 {
return fmt.Errorf("failed to execute adc command for configs: %s", strings.Join(failedConfigs, ", "))
}
return nil
}

Expand Down
8 changes: 7 additions & 1 deletion internal/provider/adc/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"os/exec"
"strings"
Expand Down Expand Up @@ -50,11 +51,16 @@ func (e *DefaultADCExecutor) Execute(ctx context.Context, mode string, config ad
}

func (e *DefaultADCExecutor) runADC(ctx context.Context, mode string, config adcConfig, args []string) error {
var failedAddrs []string
for _, addr := range config.ServerAddrs {
if err := e.runForSingleServerWithTimeout(ctx, addr, mode, config, args); err != nil {
return err
log.Errorw("failed to run adc for server", zap.String("server", addr), zap.Error(err))
failedAddrs = append(failedAddrs, addr)
}
}
if len(failedAddrs) > 0 {
return fmt.Errorf("failed to run adc for servers: [%s]", strings.Join(failedAddrs, ", "))
}
return nil
}

Expand Down
153 changes: 37 additions & 116 deletions test/e2e/gatewayapi/gatewayproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package gatewayapi

import (
"fmt"
"net/http"
"time"

. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -103,64 +102,6 @@ spec:
headers:
X-Proxy-Test: "disabled"
`
var (
gatewayProxyWithPluginMetadata0 = `
apiVersion: apisix.apache.org/v1alpha1
kind: GatewayProxy
metadata:
name: apisix-proxy-config
spec:
provider:
type: ControlPlane
controlPlane:
endpoints:
- %s
auth:
type: AdminKey
adminKey:
value: "%s"
plugins:
- name: error-page
enabled: true
config: {}
pluginMetadata:
error-page: {
"enable": true,
"error_404": {
"body": "404 from plugin metadata",
"content-type": "text/plain"
}
}
`
gatewayProxyWithPluginMetadata1 = `
apiVersion: apisix.apache.org/v1alpha1
kind: GatewayProxy
metadata:
name: apisix-proxy-config
spec:
provider:
type: ControlPlane
controlPlane:
endpoints:
- %s
auth:
type: AdminKey
adminKey:
value: "%s"
plugins:
- name: error-page
enabled: true
config: {}
pluginMetadata:
error-page: {
"enable": false,
"error_404": {
"body": "404 from plugin metadata",
"content-type": "text/plain"
}
}
`
)

var httpRouteForTest = `
apiVersion: gateway.networking.k8s.io/v1
Expand Down Expand Up @@ -275,59 +216,48 @@ spec:
})
})

Context("Test Gateway with PluginMetadata", func() {
var (
err error
)

PIt("Should work OK with error-page", func() {
By("Update GatewayProxy with PluginMetadata")
err = s.CreateResourceFromString(fmt.Sprintf(gatewayProxyWithPluginMetadata0, s.Deployer.GetAdminEndpoint(), s.AdminKey()))
Expect(err).ShouldNot(HaveOccurred())
Context("Test GatewayProxy with invalid endpoint", func() {
var gatewayProxyWithInvalidEndpoint = `
apiVersion: apisix.apache.org/v1alpha1
kind: GatewayProxy
metadata:
name: apisix-proxy-config
spec:
provider:
type: ControlPlane
controlPlane:
endpoints:
- "http://invalid-endpoint:9180"
- %s
auth:
type: AdminKey
adminKey:
value: "%s"
`
It("Should fail to apply GatewayProxy with invalid endpoint", func() {
By("Update GatewayProxy with invalid endpoint")
err := s.CreateResourceFromString(fmt.Sprintf(gatewayProxyWithInvalidEndpoint, s.Deployer.GetAdminEndpoint(), s.AdminKey()))
Expect(err).NotTo(HaveOccurred(), "creating GatewayProxy with enabled plugin")
time.Sleep(5 * time.Second)

By("Create HTTPRoute for Gateway with GatewayProxy")
By("Create HTTPRoute")
resourceApplied("HTTPRoute", "test-route", fmt.Sprintf(httpRouteForTest, "apisix"), 1)

time.Sleep(5 * time.Second)
By("Check PluginMetadata working")
s.NewAPISIXClient().
GET("/not-found").
WithHost("example.com").
Expect().
Status(http.StatusNotFound).
Body().Contains("404 from plugin metadata")

By("Update GatewayProxy with PluginMetadata")
err = s.CreateResourceFromString(fmt.Sprintf(gatewayProxyWithPluginMetadata1, s.Deployer.GetAdminEndpoint(), s.AdminKey()))
Expect(err).ShouldNot(HaveOccurred())
time.Sleep(5 * time.Second)

By("Check PluginMetadata working")
s.NewAPISIXClient().
GET("/not-found").
WithHost("example.com").
Expect().
Status(http.StatusNotFound).
Body().Contains(`{"error_msg":"404 Route Not Found"}`)

By("Delete GatewayProxy")
err = s.DeleteResourceFromString(fmt.Sprintf(gatewayProxyWithPluginMetadata0, s.Deployer.GetAdminEndpoint(), s.AdminKey()))
Expect(err).ShouldNot(HaveOccurred())
time.Sleep(5 * time.Second)
expectRequest := func() bool {
resp := s.NewAPISIXClient().
GET("/get").
WithHost("example.com").
Expect().Raw()
return resp.StatusCode == 200 && resp.Header.Get("X-Proxy-Test") == ""
}

By("Check PluginMetadata is not working")
s.NewAPISIXClient().
GET("/not-found").
WithHost("example.com").
Expect().
Status(http.StatusNotFound).
Body().Contains(`{"error_msg":"404 Route Not Found"}`)
Eventually(expectRequest).WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(BeTrue())
})
})

var (
gatewayProxyWithInvalidProviderType = `
Context("Test GatewayProxy Provider Validation", func() {
var (
gatewayProxyWithInvalidProviderType = `
apiVersion: apisix.apache.org/v1alpha1
kind: GatewayProxy
metadata:
Expand All @@ -336,7 +266,7 @@ spec:
provider:
type: "InvalidType"
`
gatewayProxyWithMissingControlPlane = `
gatewayProxyWithMissingControlPlane = `
apiVersion: apisix.apache.org/v1alpha1
kind: GatewayProxy
metadata:
Expand All @@ -345,7 +275,7 @@ spec:
provider:
type: "ControlPlane"
`
gatewayProxyWithValidProvider = `
gatewayProxyWithValidProvider = `
apiVersion: apisix.apache.org/v1alpha1
kind: GatewayProxy
metadata:
Expand All @@ -361,16 +291,7 @@ spec:
adminKey:
value: "test-key"
`
)

Context("Test GatewayProxy Provider Validation", func() {
AfterEach(func() {
By("Clean up GatewayProxy resources")
_ = s.DeleteResourceFromString(gatewayProxyWithInvalidProviderType)
_ = s.DeleteResourceFromString(gatewayProxyWithMissingControlPlane)
_ = s.DeleteResourceFromString(gatewayProxyWithValidProvider)
})

)
It("Should reject invalid provider type", func() {
By("Create GatewayProxy with invalid provider type")
err := s.CreateResourceFromString(gatewayProxyWithInvalidProviderType)
Expand Down
Loading