Skip to content

Commit a171cf7

Browse files
committed
(WIP)feat: support adc server mode
Signed-off-by: Ashing Zheng <[email protected]>
1 parent 6ca4414 commit a171cf7

File tree

7 files changed

+309
-1
lines changed

7 files changed

+309
-1
lines changed

internal/controller/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ func NewDefaultConfig() *Config {
5454
Type: ProviderTypeAPI7EE,
5555
SyncPeriod: types.TimeDuration{Duration: 0},
5656
InitSyncDelay: types.TimeDuration{Duration: 20 * time.Minute},
57+
ADCServerURL: "http://127.0.0.1:3000",
58+
UseADCServer: true,
5759
},
5860
}
5961
}

internal/controller/config/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,6 @@ type ProviderConfig struct {
8484
Type ProviderType `json:"type" yaml:"type"`
8585
SyncPeriod types.TimeDuration `json:"sync_period" yaml:"sync_period"`
8686
InitSyncDelay types.TimeDuration `json:"init_sync_delay" yaml:"init_sync_delay"`
87+
ADCServerURL string `json:"adc_server_url,omitempty" yaml:"adc_server_url,omitempty"`
88+
UseADCServer bool `json:"use_adc_server,omitempty" yaml:"use_adc_server,omitempty"`
8789
}

internal/manager/run.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ func Run(ctx context.Context, logger logr.Logger) error {
173173
SyncPeriod: config.ControllerConfig.ProviderConfig.SyncPeriod.Duration,
174174
InitSyncDelay: config.ControllerConfig.ProviderConfig.InitSyncDelay.Duration,
175175
BackendMode: string(config.ControllerConfig.ProviderConfig.Type),
176+
ADCServerURL: config.ControllerConfig.ProviderConfig.ADCServerURL,
177+
UseADCServer: config.ControllerConfig.ProviderConfig.UseADCServer,
176178
})
177179
if err != nil {
178180
setupLog.Error(err, "unable to create provider")

internal/provider/adc/adc.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,23 @@ func New(updater status.Updater, readier readiness.ReadinessManager, opts ...Opt
115115
o := Options{}
116116
o.ApplyOptions(opts)
117117

118+
// Choose executor based on configuration
119+
var executor ADCExecutor
120+
if o.UseADCServer && o.ADCServerURL != "" {
121+
executor = NewHTTPADCExecutor(o.ADCServerURL)
122+
log.Infow("using HTTP ADC Executor", zap.String("server_url", o.ADCServerURL))
123+
} else {
124+
executor = &DefaultADCExecutor{}
125+
log.Infow("using default CLI ADC Executor")
126+
}
127+
118128
return &adcClient{
119129
Options: o,
120130
translator: &translator.Translator{},
121131
configs: make(map[types.NamespacedNameKind]adcConfig),
122132
parentRefs: make(map[types.NamespacedNameKind][]types.NamespacedNameKind),
123133
store: NewStore(),
124-
executor: &DefaultADCExecutor{},
134+
executor: executor,
125135
updater: updater,
126136
readier: readier,
127137
syncCh: make(chan struct{}, 1),

internal/provider/adc/executor.go

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"encoding/json"
2424
"errors"
2525
"fmt"
26+
"io"
27+
"net/http"
2628
"os"
2729
"os/exec"
2830
"strings"
@@ -194,3 +196,250 @@ func BuildADCExecuteArgs(filePath string, labels map[string]string, types []stri
194196
}
195197
return args
196198
}
199+
200+
// ADCServerRequest represents the request body for ADC Server /sync endpoint
201+
type ADCServerRequest struct {
202+
Task ADCServerTask `json:"task"`
203+
}
204+
205+
// ADCServerTask represents the task configuration in ADC Server request
206+
type ADCServerTask struct {
207+
Opts ADCServerOpts `json:"opts"`
208+
Config adctypes.Resources `json:"config"`
209+
}
210+
211+
// ADCServerOpts represents the options in ADC Server task
212+
type ADCServerOpts struct {
213+
Backend string `json:"backend"`
214+
Server string `json:"server"`
215+
Token string `json:"token"`
216+
Labels map[string]string `json:"labels,omitempty"`
217+
Types []string `json:"types,omitempty"`
218+
TlsVerify *bool `json:"tls_verify,omitempty"`
219+
}
220+
221+
// HTTPADCExecutor implements ADCExecutor interface using HTTP calls to ADC Server
222+
type HTTPADCExecutor struct {
223+
sync.Mutex
224+
httpClient *http.Client
225+
serverURL string
226+
}
227+
228+
// NewHTTPADCExecutor creates a new HTTPADCExecutor with the specified ADC Server URL
229+
func NewHTTPADCExecutor(serverURL string) *HTTPADCExecutor {
230+
return &HTTPADCExecutor{
231+
httpClient: &http.Client{
232+
Timeout: 30 * time.Second,
233+
},
234+
serverURL: serverURL,
235+
}
236+
}
237+
238+
// Execute implements the ADCExecutor interface using HTTP calls
239+
func (e *HTTPADCExecutor) Execute(ctx context.Context, mode string, config adcConfig, args []string) error {
240+
e.Lock()
241+
defer e.Unlock()
242+
243+
return e.runHTTPSync(ctx, mode, config, args)
244+
}
245+
246+
// runHTTPSync performs HTTP sync to ADC Server for each server address
247+
func (e *HTTPADCExecutor) runHTTPSync(ctx context.Context, mode string, config adcConfig, args []string) error {
248+
var execErrs = types.ADCExecutionError{
249+
Name: config.Name,
250+
}
251+
252+
for _, addr := range config.ServerAddrs {
253+
if err := e.runHTTPSyncForSingleServer(ctx, addr, mode, config, args); err != nil {
254+
log.Errorw("failed to run http sync for server", zap.String("server", addr), zap.Error(err))
255+
var execErr types.ADCExecutionServerAddrError
256+
if errors.As(err, &execErr) {
257+
execErrs.FailedErrors = append(execErrs.FailedErrors, execErr)
258+
} else {
259+
execErrs.FailedErrors = append(execErrs.FailedErrors, types.ADCExecutionServerAddrError{
260+
ServerAddr: addr,
261+
Err: err.Error(),
262+
})
263+
}
264+
}
265+
}
266+
if len(execErrs.FailedErrors) > 0 {
267+
return execErrs
268+
}
269+
return nil
270+
}
271+
272+
// runHTTPSyncForSingleServer performs HTTP sync to a single ADC Server
273+
func (e *HTTPADCExecutor) runHTTPSyncForSingleServer(ctx context.Context, serverAddr, mode string, config adcConfig, args []string) error {
274+
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
275+
defer cancel()
276+
277+
// Parse args to extract labels, types, and file path
278+
labels, types, filePath, err := e.parseArgs(args)
279+
if err != nil {
280+
return fmt.Errorf("failed to parse args: %w", err)
281+
}
282+
283+
// Load resources from file
284+
resources, err := e.loadResourcesFromFile(filePath)
285+
if err != nil {
286+
return fmt.Errorf("failed to load resources from file %s: %w", filePath, err)
287+
}
288+
289+
// Build HTTP request
290+
req, err := e.buildHTTPRequest(ctx, serverAddr, mode, config, labels, types, resources)
291+
if err != nil {
292+
return fmt.Errorf("failed to build HTTP request: %w", err)
293+
}
294+
295+
// Send HTTP request
296+
resp, err := e.httpClient.Do(req)
297+
if err != nil {
298+
return fmt.Errorf("failed to send HTTP request: %w", err)
299+
}
300+
defer func() {
301+
if closeErr := resp.Body.Close(); closeErr != nil {
302+
log.Warnw("failed to close response body", zap.Error(closeErr))
303+
}
304+
}()
305+
306+
// Handle HTTP response
307+
return e.handleHTTPResponse(resp, serverAddr)
308+
}
309+
310+
// parseArgs parses the command line arguments to extract labels, types, and file path
311+
func (e *HTTPADCExecutor) parseArgs(args []string) (map[string]string, []string, string, error) {
312+
labels := make(map[string]string)
313+
var types []string
314+
var filePath string
315+
316+
for i := 0; i < len(args); i++ {
317+
switch args[i] {
318+
case "-f":
319+
if i+1 < len(args) {
320+
filePath = args[i+1]
321+
i++
322+
}
323+
case "--label-selector":
324+
if i+1 < len(args) {
325+
labelPair := args[i+1]
326+
parts := strings.SplitN(labelPair, "=", 2)
327+
if len(parts) == 2 {
328+
labels[parts[0]] = parts[1]
329+
}
330+
i++
331+
}
332+
case "--include-resource-type":
333+
if i+1 < len(args) {
334+
types = append(types, args[i+1])
335+
i++
336+
}
337+
}
338+
}
339+
340+
if filePath == "" {
341+
return nil, nil, "", errors.New("file path not found in args")
342+
}
343+
344+
return labels, types, filePath, nil
345+
}
346+
347+
// loadResourcesFromFile loads ADC resources from the specified file
348+
func (e *HTTPADCExecutor) loadResourcesFromFile(filePath string) (*adctypes.Resources, error) {
349+
data, err := os.ReadFile(filePath)
350+
if err != nil {
351+
return nil, fmt.Errorf("failed to read file: %w", err)
352+
}
353+
354+
var resources adctypes.Resources
355+
if err := json.Unmarshal(data, &resources); err != nil {
356+
return nil, fmt.Errorf("failed to unmarshal resources: %w", err)
357+
}
358+
359+
return &resources, nil
360+
}
361+
362+
// buildHTTPRequest builds the HTTP request for ADC Server
363+
func (e *HTTPADCExecutor) buildHTTPRequest(ctx context.Context, serverAddr, mode string, config adcConfig, labels map[string]string, types []string, resources *adctypes.Resources) (*http.Request, error) {
364+
// Prepare request body
365+
tlsVerify := config.TlsVerify
366+
reqBody := ADCServerRequest{
367+
Task: ADCServerTask{
368+
Opts: ADCServerOpts{
369+
Backend: mode,
370+
Server: serverAddr,
371+
Token: config.Token,
372+
Labels: labels,
373+
Types: types,
374+
TlsVerify: &tlsVerify,
375+
},
376+
Config: *resources,
377+
},
378+
}
379+
380+
jsonData, err := json.Marshal(reqBody)
381+
if err != nil {
382+
return nil, fmt.Errorf("failed to marshal request body: %w", err)
383+
}
384+
385+
log.Debugw("sending HTTP request to ADC Server",
386+
zap.String("url", e.serverURL+"/sync"),
387+
zap.String("server", serverAddr),
388+
zap.String("mode", mode),
389+
zap.Any("labels", labels),
390+
zap.Strings("types", types),
391+
)
392+
393+
// Create HTTP request
394+
req, err := http.NewRequestWithContext(ctx, "PUT", e.serverURL+"/sync", bytes.NewBuffer(jsonData))
395+
if err != nil {
396+
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
397+
}
398+
399+
req.Header.Set("Content-Type", "application/json")
400+
return req, nil
401+
}
402+
403+
// handleHTTPResponse handles the HTTP response from ADC Server
404+
func (e *HTTPADCExecutor) handleHTTPResponse(resp *http.Response, serverAddr string) error {
405+
body, err := io.ReadAll(resp.Body)
406+
if err != nil {
407+
return fmt.Errorf("failed to read response body: %w", err)
408+
}
409+
410+
log.Debugw("received HTTP response from ADC Server",
411+
zap.String("server", serverAddr),
412+
zap.Int("status", resp.StatusCode),
413+
zap.String("response", string(body)),
414+
)
415+
416+
if resp.StatusCode != http.StatusOK {
417+
return types.ADCExecutionServerAddrError{
418+
ServerAddr: serverAddr,
419+
Err: fmt.Sprintf("HTTP %d: %s", resp.StatusCode, string(body)),
420+
}
421+
}
422+
423+
// Parse response body
424+
var result adctypes.SyncResult
425+
if err := json.Unmarshal(body, &result); err != nil {
426+
log.Errorw("failed to unmarshal ADC Server response",
427+
zap.Error(err),
428+
zap.String("response", string(body)),
429+
)
430+
return fmt.Errorf("failed to parse ADC Server response: %w", err)
431+
}
432+
433+
// Check for sync failures
434+
if result.FailedCount > 0 && len(result.Failed) > 0 {
435+
log.Errorw("ADC Server sync failed", zap.Any("result", result))
436+
return types.ADCExecutionServerAddrError{
437+
ServerAddr: serverAddr,
438+
Err: result.Failed[0].Reason,
439+
FailedStatuses: result.Failed,
440+
}
441+
}
442+
443+
log.Debugw("ADC Server sync success", zap.Any("result", result))
444+
return nil
445+
}

internal/provider/adc/options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ type Options struct {
2828
SyncPeriod time.Duration
2929
InitSyncDelay time.Duration
3030
BackendMode string
31+
ADCServerURL string
32+
UseADCServer bool
3133
}
3234

3335
func (o *Options) ApplyToList(lo *Options) {
@@ -43,6 +45,12 @@ func (o *Options) ApplyToList(lo *Options) {
4345
if o.BackendMode != "" {
4446
lo.BackendMode = o.BackendMode
4547
}
48+
if o.ADCServerURL != "" {
49+
lo.ADCServerURL = o.ADCServerURL
50+
}
51+
if o.UseADCServer {
52+
lo.UseADCServer = o.UseADCServer
53+
}
4654
}
4755

4856
func (o *Options) ApplyOptions(opts []Option) *Options {

test/e2e/framework/manifests/ingress.yaml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,8 @@ data:
342342
# The default value is 0 seconds, which means the controller will not sync.
343343
# If you want to enable the sync, set it to a positive value.
344344
init_sync_delay: {{ .InitSyncDelay | default "1m" }}
345+
adc_server_url: "http://127.0.0.1:3000"
346+
use_adc_server: true
345347
---
346348
apiVersion: v1
347349
kind: Service
@@ -422,6 +424,39 @@ spec:
422424
capabilities:
423425
drop:
424426
- ALL
427+
- image: ghcr.io/api7/adc:dev
428+
env:
429+
- name: ADC_RUNNING_MODE
430+
value: ingress
431+
- name: ADC_EXPERIMENTAL_FEATURE_FLAGS
432+
value: remote-state-file,parallel-backend-request
433+
name: adc-server
434+
args:
435+
- "server"
436+
- "--listen"
437+
- "http://127.0.0.1:3000"
438+
ports:
439+
- name: http
440+
containerPort: 3000
441+
protocol: TCP
442+
livenessProbe:
443+
httpGet:
444+
path: /healthz
445+
port: 3000
446+
initialDelaySeconds: 10
447+
periodSeconds: 10
448+
readinessProbe:
449+
httpGet:
450+
path: /healthz
451+
port: 3000
452+
initialDelaySeconds: 5
453+
periodSeconds: 5
454+
securityContext:
455+
allowPrivilegeEscalation: false
456+
capabilities:
457+
drop:
458+
- ALL
459+
runAsNonRoot: true
425460
volumes:
426461
- name: ingress-config
427462
configMap:

0 commit comments

Comments
 (0)