Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/apisix-e2e-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ jobs:
ARCH: amd64
ENABLE_PROXY: "false"
BASE_IMAGE_TAG: "debug"
ADC_VERSION: "dev"
run: |
echo "building images..."
make build-image
Expand Down
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -167,18 +167,23 @@ kind-down:
|| echo "kind cluster does not exist"

.PHONY: kind-load-images
kind-load-images: pull-infra-images kind-load-ingress-image
kind-load-images: pull-infra-images kind-load-ingress-image kind-load-adc-image
@kind load docker-image kennethreitz/httpbin:latest --name $(KIND_NAME)
@kind load docker-image jmalloc/echo-server:latest --name $(KIND_NAME)

.PHONY: kind-load-ingress-image
kind-load-ingress-image:
@kind load docker-image $(IMG) --name $(KIND_NAME)

.PHONY: kind-load-adc-image
kind-load-adc-image:
@kind load docker-image ghcr.io/api7/adc:dev --name $(KIND_NAME)

.PHONY: pull-infra-images
pull-infra-images:
@docker pull kennethreitz/httpbin:latest
@docker pull jmalloc/echo-server:latest
@docker pull ghcr.io/api7/adc:dev

##@ Build

Expand Down
1 change: 0 additions & 1 deletion api/adc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,6 @@ type StatusEvent struct {
type ResponseDetails struct {
Status int `json:"status"`
Headers map[string]string `json:"headers"`
Data ResponseData `json:"data"`
}

type ResponseData struct {
Expand Down
8 changes: 7 additions & 1 deletion internal/adc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,15 @@ type Client struct {
}

func New(mode string, timeout time.Duration) (*Client, error) {
serverURL := os.Getenv("ADC_SERVER_URL")
if serverURL == "" {
serverURL = defaultHTTPADCExecutorAddr
}

log.Infow("using HTTP ADC Executor", zap.String("server_url", serverURL))
return &Client{
Store: cache.NewStore(),
executor: &DefaultADCExecutor{},
executor: NewHTTPADCExecutor(serverURL, timeout),
BackendMode: mode,
ConfigManager: common.NewConfigManager[types.NamespacedNameKind, adctypes.Config](),
}, nil
Expand Down
268 changes: 265 additions & 3 deletions internal/adc/client/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"strings"
Expand All @@ -31,11 +33,16 @@ import (

"github.com/api7/gopkg/pkg/log"
"go.uber.org/zap"
"k8s.io/utils/ptr"

adctypes "github.com/apache/apisix-ingress-controller/api/adc"
"github.com/apache/apisix-ingress-controller/internal/types"
)

const (
defaultHTTPADCExecutorAddr = "http://127.0.0.1:3000"
)

type ADCExecutor interface {
Execute(ctx context.Context, mode string, config adctypes.Config, args []string) error
}
Expand All @@ -45,9 +52,6 @@ type DefaultADCExecutor struct {
}

func (e *DefaultADCExecutor) Execute(ctx context.Context, mode string, config adctypes.Config, args []string) error {
e.Lock()
defer e.Unlock()

return e.runADC(ctx, mode, config, args)
}

Expand Down Expand Up @@ -194,3 +198,261 @@ func BuildADCExecuteArgs(filePath string, labels map[string]string, types []stri
}
return args
}

// ADCServerRequest represents the request body for ADC Server /sync endpoint
type ADCServerRequest struct {
Task ADCServerTask `json:"task"`
}

// ADCServerTask represents the task configuration in ADC Server request
type ADCServerTask struct {
Opts ADCServerOpts `json:"opts"`
Config adctypes.Resources `json:"config"`
}

// ADCServerOpts represents the options in ADC Server task
type ADCServerOpts struct {
Backend string `json:"backend"`
Server []string `json:"server"`
Token string `json:"token"`
LabelSelector map[string]string `json:"labelSelector,omitempty"`
IncludeResourceType []string `json:"includeResourceType,omitempty"`
TlsSkipVerify *bool `json:"tlsSkipVerify,omitempty"`
CacheKey string `json:"cacheKey"`
}

// HTTPADCExecutor implements ADCExecutor interface using HTTP calls to ADC Server
type HTTPADCExecutor struct {
httpClient *http.Client
serverURL string
}

// NewHTTPADCExecutor creates a new HTTPADCExecutor with the specified ADC Server URL
func NewHTTPADCExecutor(serverURL string, timeout time.Duration) *HTTPADCExecutor {
return &HTTPADCExecutor{
httpClient: &http.Client{
Timeout: timeout,
},
serverURL: serverURL,
}
}

// Execute implements the ADCExecutor interface using HTTP calls
func (e *HTTPADCExecutor) Execute(ctx context.Context, mode string, config adctypes.Config, args []string) error {
return e.runHTTPSync(ctx, mode, config, args)
}

// runHTTPSync performs HTTP sync to ADC Server for each server address
func (e *HTTPADCExecutor) runHTTPSync(ctx context.Context, mode string, config adctypes.Config, args []string) error {
var execErrs = types.ADCExecutionError{
Name: config.Name,
}

serverAddrs := func() []string {
if mode == "apisix-standalone" {
return []string{strings.Join(config.ServerAddrs, ",")}
}
return config.ServerAddrs
}()
log.Debugw("running http sync", zap.Strings("serverAddrs", serverAddrs), zap.String("mode", mode))

for _, addr := range serverAddrs {
if err := e.runHTTPSyncForSingleServer(ctx, addr, mode, config, args); err != nil {
log.Errorw("failed to run http sync for server", zap.String("server", addr), zap.Error(err))
var execErr types.ADCExecutionServerAddrError
if errors.As(err, &execErr) {
execErrs.FailedErrors = append(execErrs.FailedErrors, execErr)
} else {
execErrs.FailedErrors = append(execErrs.FailedErrors, types.ADCExecutionServerAddrError{
ServerAddr: addr,
Err: err.Error(),
})
}
}
}
if len(execErrs.FailedErrors) > 0 {
return execErrs
}
return nil
}

// runHTTPSyncForSingleServer performs HTTP sync to a single ADC Server
func (e *HTTPADCExecutor) runHTTPSyncForSingleServer(ctx context.Context, serverAddr, mode string, config adctypes.Config, args []string) error {
ctx, cancel := context.WithTimeout(ctx, e.httpClient.Timeout)
defer cancel()

// Parse args to extract labels, types, and file path
labels, types, filePath, err := e.parseArgs(args)
if err != nil {
return fmt.Errorf("failed to parse args: %w", err)
}

// Load resources from file
resources, err := e.loadResourcesFromFile(filePath)
if err != nil {
return fmt.Errorf("failed to load resources from file %s: %w", filePath, err)
}

// Build HTTP request
req, err := e.buildHTTPRequest(ctx, serverAddr, mode, config, labels, types, resources)
if err != nil {
return fmt.Errorf("failed to build HTTP request: %w", err)
}

// Send HTTP request
resp, err := e.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to send HTTP request: %w", err)
}
defer func() {
if closeErr := resp.Body.Close(); closeErr != nil {
log.Warnw("failed to close response body", zap.Error(closeErr))
}
}()

// Handle HTTP response
return e.handleHTTPResponse(resp, serverAddr)
}

// parseArgs parses the command line arguments to extract labels, types, and file path
func (e *HTTPADCExecutor) parseArgs(args []string) (map[string]string, []string, string, error) {
labels := make(map[string]string)
var types []string
var filePath string

for i := 0; i < len(args); i++ {
switch args[i] {
case "-f":
if i+1 < len(args) {
filePath = args[i+1]
i++
}
case "--label-selector":
if i+1 < len(args) {
labelPair := args[i+1]
parts := strings.SplitN(labelPair, "=", 2)
if len(parts) == 2 {
labels[parts[0]] = parts[1]
}
i++
}
case "--include-resource-type":
if i+1 < len(args) {
types = append(types, args[i+1])
i++
}
}
}

if filePath == "" {
return nil, nil, "", errors.New("file path not found in args")
}

return labels, types, filePath, nil
}

// loadResourcesFromFile loads ADC resources from the specified file
func (e *HTTPADCExecutor) loadResourcesFromFile(filePath string) (*adctypes.Resources, error) {
data, err := os.ReadFile(filePath)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}

var resources adctypes.Resources
if err := json.Unmarshal(data, &resources); err != nil {
return nil, fmt.Errorf("failed to unmarshal resources: %w", err)
}

return &resources, nil
}

// buildHTTPRequest builds the HTTP request for ADC Server
func (e *HTTPADCExecutor) buildHTTPRequest(ctx context.Context, serverAddr, mode string, config adctypes.Config, labels map[string]string, types []string, resources *adctypes.Resources) (*http.Request, error) {
// Prepare request body
tlsVerify := config.TlsVerify
reqBody := ADCServerRequest{
Task: ADCServerTask{
Opts: ADCServerOpts{
Backend: mode,
Server: strings.Split(serverAddr, ","),
Token: config.Token,
LabelSelector: labels,
IncludeResourceType: types,
TlsSkipVerify: ptr.To(!tlsVerify),
CacheKey: config.Name,
},
Config: *resources,
},
}

jsonData, err := json.Marshal(reqBody)
if err != nil {
return nil, fmt.Errorf("failed to marshal request body: %w", err)
}

log.Debugw("request body", zap.String("body", string(jsonData)))

log.Debugw("sending HTTP request to ADC Server",
zap.String("url", e.serverURL+"/sync"),
zap.String("server", serverAddr),
zap.String("mode", mode),
zap.String("cacheKey", config.Name),
zap.Any("labelSelector", labels),
zap.Strings("includeResourceType", types),
zap.Bool("tlsSkipVerify", !tlsVerify),
)

// Create HTTP request
req, err := http.NewRequestWithContext(ctx, "PUT", e.serverURL+"/sync", bytes.NewBuffer(jsonData))
if err != nil {
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}

req.Header.Set("Content-Type", "application/json")
return req, nil
}

// handleHTTPResponse handles the HTTP response from ADC Server
func (e *HTTPADCExecutor) handleHTTPResponse(resp *http.Response, serverAddr string) error {
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %w", err)
}

log.Debugw("received HTTP response from ADC Server",
zap.String("server", serverAddr),
zap.Int("status", resp.StatusCode),
zap.String("response", string(body)),
)

// not only 200, HTTP 202 is also accepted
if resp.StatusCode/100 != 2 {
return types.ADCExecutionServerAddrError{
ServerAddr: serverAddr,
Err: fmt.Sprintf("HTTP %d: %s", resp.StatusCode, string(body)),
}
}

// Parse response body
var result adctypes.SyncResult
if err := json.Unmarshal(body, &result); err != nil {
log.Errorw("failed to unmarshal ADC Server response",
zap.Error(err),
zap.String("response", string(body)),
)
return fmt.Errorf("failed to parse ADC Server response: %w", err)
}

// Check for sync failures
if result.FailedCount > 0 && len(result.Failed) > 0 {
log.Errorw("ADC Server sync failed", zap.Any("result", result))
return types.ADCExecutionServerAddrError{
ServerAddr: serverAddr,
Err: result.Failed[0].Reason,
FailedStatuses: result.Failed,
}
}

log.Debugw("ADC Server sync success", zap.Any("result", result))
return nil
}
5 changes: 5 additions & 0 deletions internal/provider/apisix/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ func (d *apisixProvider) handleDetailedFailedStatuses(
statusUpdateMap map[types.NamespacedNameKind][]string,
) {
for _, status := range failedStatus.FailedStatuses {
// in the APISIX standalone mode, the related values in the sync failure event are empty.
if status.Event.ResourceType == "" {
d.handleEmptyFailedStatuses(configName, failedStatus, statusUpdateMap)
return
}
id := status.Event.ResourceID
labels, err := d.client.GetResourceLabel(configName, status.Event.ResourceType, id)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/framework/manifests/apisix-standalone.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ spec:
spec:
initContainers:
- name: config-setup
image: apache/apisix:3.13.0-ubuntu
image: apache/apisix:dev
command:
- sh
- -c
Expand All @@ -72,7 +72,7 @@ spec:
mountPath: /tmp/apisix-conf
containers:
- name: apisix
image: apache/apisix:3.13.0-ubuntu
image: apache/apisix:dev
ports:
- name: http
containerPort: 9080
Expand Down
Loading
Loading