@@ -23,6 +23,8 @@ import (
2323 "encoding/json"
2424 "errors"
2525 "fmt"
26+ "io"
27+ "net/http"
2628 "os"
2729 "os/exec"
2830 "strings"
@@ -31,11 +33,16 @@ import (
3133
3234 "github.com/api7/gopkg/pkg/log"
3335 "go.uber.org/zap"
36+ "k8s.io/utils/ptr"
3437
3538 adctypes "github.com/apache/apisix-ingress-controller/api/adc"
3639 "github.com/apache/apisix-ingress-controller/internal/types"
3740)
3841
42+ const (
43+ defaultHTTPADCExecutorAddr = "http://127.0.0.1:3000"
44+ )
45+
3946type ADCExecutor interface {
4047 Execute (ctx context.Context , mode string , config adctypes.Config , args []string ) error
4148}
@@ -45,9 +52,6 @@ type DefaultADCExecutor struct {
4552}
4653
4754func (e * DefaultADCExecutor ) Execute (ctx context.Context , mode string , config adctypes.Config , args []string ) error {
48- e .Lock ()
49- defer e .Unlock ()
50-
5155 return e .runADC (ctx , mode , config , args )
5256}
5357
@@ -194,3 +198,261 @@ func BuildADCExecuteArgs(filePath string, labels map[string]string, types []stri
194198 }
195199 return args
196200}
201+
202+ // ADCServerRequest represents the request body for ADC Server /sync endpoint
203+ type ADCServerRequest struct {
204+ Task ADCServerTask `json:"task"`
205+ }
206+
207+ // ADCServerTask represents the task configuration in ADC Server request
208+ type ADCServerTask struct {
209+ Opts ADCServerOpts `json:"opts"`
210+ Config adctypes.Resources `json:"config"`
211+ }
212+
213+ // ADCServerOpts represents the options in ADC Server task
214+ type ADCServerOpts struct {
215+ Backend string `json:"backend"`
216+ Server []string `json:"server"`
217+ Token string `json:"token"`
218+ LabelSelector map [string ]string `json:"labelSelector,omitempty"`
219+ IncludeResourceType []string `json:"includeResourceType,omitempty"`
220+ TlsSkipVerify * bool `json:"tlsSkipVerify,omitempty"`
221+ CacheKey string `json:"cacheKey"`
222+ }
223+
224+ // HTTPADCExecutor implements ADCExecutor interface using HTTP calls to ADC Server
225+ type HTTPADCExecutor struct {
226+ httpClient * http.Client
227+ serverURL string
228+ }
229+
230+ // NewHTTPADCExecutor creates a new HTTPADCExecutor with the specified ADC Server URL
231+ func NewHTTPADCExecutor (serverURL string , timeout time.Duration ) * HTTPADCExecutor {
232+ return & HTTPADCExecutor {
233+ httpClient : & http.Client {
234+ Timeout : timeout ,
235+ },
236+ serverURL : serverURL ,
237+ }
238+ }
239+
240+ // Execute implements the ADCExecutor interface using HTTP calls
241+ func (e * HTTPADCExecutor ) Execute (ctx context.Context , mode string , config adctypes.Config , args []string ) error {
242+ return e .runHTTPSync (ctx , mode , config , args )
243+ }
244+
245+ // runHTTPSync performs HTTP sync to ADC Server for each server address
246+ func (e * HTTPADCExecutor ) runHTTPSync (ctx context.Context , mode string , config adctypes.Config , args []string ) error {
247+ var execErrs = types.ADCExecutionError {
248+ Name : config .Name ,
249+ }
250+
251+ serverAddrs := func () []string {
252+ if mode == "apisix-standalone" {
253+ return []string {strings .Join (config .ServerAddrs , "," )}
254+ }
255+ return config .ServerAddrs
256+ }()
257+ log .Debugw ("running http sync" , zap .Strings ("serverAddrs" , serverAddrs ), zap .String ("mode" , mode ))
258+
259+ for _ , addr := range serverAddrs {
260+ if err := e .runHTTPSyncForSingleServer (ctx , addr , mode , config , args ); err != nil {
261+ log .Errorw ("failed to run http sync for server" , zap .String ("server" , addr ), zap .Error (err ))
262+ var execErr types.ADCExecutionServerAddrError
263+ if errors .As (err , & execErr ) {
264+ execErrs .FailedErrors = append (execErrs .FailedErrors , execErr )
265+ } else {
266+ execErrs .FailedErrors = append (execErrs .FailedErrors , types.ADCExecutionServerAddrError {
267+ ServerAddr : addr ,
268+ Err : err .Error (),
269+ })
270+ }
271+ }
272+ }
273+ if len (execErrs .FailedErrors ) > 0 {
274+ return execErrs
275+ }
276+ return nil
277+ }
278+
279+ // runHTTPSyncForSingleServer performs HTTP sync to a single ADC Server
280+ func (e * HTTPADCExecutor ) runHTTPSyncForSingleServer (ctx context.Context , serverAddr , mode string , config adctypes.Config , args []string ) error {
281+ ctx , cancel := context .WithTimeout (ctx , e .httpClient .Timeout )
282+ defer cancel ()
283+
284+ // Parse args to extract labels, types, and file path
285+ labels , types , filePath , err := e .parseArgs (args )
286+ if err != nil {
287+ return fmt .Errorf ("failed to parse args: %w" , err )
288+ }
289+
290+ // Load resources from file
291+ resources , err := e .loadResourcesFromFile (filePath )
292+ if err != nil {
293+ return fmt .Errorf ("failed to load resources from file %s: %w" , filePath , err )
294+ }
295+
296+ // Build HTTP request
297+ req , err := e .buildHTTPRequest (ctx , serverAddr , mode , config , labels , types , resources )
298+ if err != nil {
299+ return fmt .Errorf ("failed to build HTTP request: %w" , err )
300+ }
301+
302+ // Send HTTP request
303+ resp , err := e .httpClient .Do (req )
304+ if err != nil {
305+ return fmt .Errorf ("failed to send HTTP request: %w" , err )
306+ }
307+ defer func () {
308+ if closeErr := resp .Body .Close (); closeErr != nil {
309+ log .Warnw ("failed to close response body" , zap .Error (closeErr ))
310+ }
311+ }()
312+
313+ // Handle HTTP response
314+ return e .handleHTTPResponse (resp , serverAddr )
315+ }
316+
317+ // parseArgs parses the command line arguments to extract labels, types, and file path
318+ func (e * HTTPADCExecutor ) parseArgs (args []string ) (map [string ]string , []string , string , error ) {
319+ labels := make (map [string ]string )
320+ var types []string
321+ var filePath string
322+
323+ for i := 0 ; i < len (args ); i ++ {
324+ switch args [i ] {
325+ case "-f" :
326+ if i + 1 < len (args ) {
327+ filePath = args [i + 1 ]
328+ i ++
329+ }
330+ case "--label-selector" :
331+ if i + 1 < len (args ) {
332+ labelPair := args [i + 1 ]
333+ parts := strings .SplitN (labelPair , "=" , 2 )
334+ if len (parts ) == 2 {
335+ labels [parts [0 ]] = parts [1 ]
336+ }
337+ i ++
338+ }
339+ case "--include-resource-type" :
340+ if i + 1 < len (args ) {
341+ types = append (types , args [i + 1 ])
342+ i ++
343+ }
344+ }
345+ }
346+
347+ if filePath == "" {
348+ return nil , nil , "" , errors .New ("file path not found in args" )
349+ }
350+
351+ return labels , types , filePath , nil
352+ }
353+
354+ // loadResourcesFromFile loads ADC resources from the specified file
355+ func (e * HTTPADCExecutor ) loadResourcesFromFile (filePath string ) (* adctypes.Resources , error ) {
356+ data , err := os .ReadFile (filePath )
357+ if err != nil {
358+ return nil , fmt .Errorf ("failed to read file: %w" , err )
359+ }
360+
361+ var resources adctypes.Resources
362+ if err := json .Unmarshal (data , & resources ); err != nil {
363+ return nil , fmt .Errorf ("failed to unmarshal resources: %w" , err )
364+ }
365+
366+ return & resources , nil
367+ }
368+
369+ // buildHTTPRequest builds the HTTP request for ADC Server
370+ func (e * HTTPADCExecutor ) buildHTTPRequest (ctx context.Context , serverAddr , mode string , config adctypes.Config , labels map [string ]string , types []string , resources * adctypes.Resources ) (* http.Request , error ) {
371+ // Prepare request body
372+ tlsVerify := config .TlsVerify
373+ reqBody := ADCServerRequest {
374+ Task : ADCServerTask {
375+ Opts : ADCServerOpts {
376+ Backend : mode ,
377+ Server : strings .Split (serverAddr , "," ),
378+ Token : config .Token ,
379+ LabelSelector : labels ,
380+ IncludeResourceType : types ,
381+ TlsSkipVerify : ptr .To (! tlsVerify ),
382+ CacheKey : config .Name ,
383+ },
384+ Config : * resources ,
385+ },
386+ }
387+
388+ jsonData , err := json .Marshal (reqBody )
389+ if err != nil {
390+ return nil , fmt .Errorf ("failed to marshal request body: %w" , err )
391+ }
392+
393+ log .Debugw ("request body" , zap .String ("body" , string (jsonData )))
394+
395+ log .Debugw ("sending HTTP request to ADC Server" ,
396+ zap .String ("url" , e .serverURL + "/sync" ),
397+ zap .String ("server" , serverAddr ),
398+ zap .String ("mode" , mode ),
399+ zap .String ("cacheKey" , config .Name ),
400+ zap .Any ("labelSelector" , labels ),
401+ zap .Strings ("includeResourceType" , types ),
402+ zap .Bool ("tlsSkipVerify" , ! tlsVerify ),
403+ )
404+
405+ // Create HTTP request
406+ req , err := http .NewRequestWithContext (ctx , "PUT" , e .serverURL + "/sync" , bytes .NewBuffer (jsonData ))
407+ if err != nil {
408+ return nil , fmt .Errorf ("failed to create HTTP request: %w" , err )
409+ }
410+
411+ req .Header .Set ("Content-Type" , "application/json" )
412+ return req , nil
413+ }
414+
415+ // handleHTTPResponse handles the HTTP response from ADC Server
416+ func (e * HTTPADCExecutor ) handleHTTPResponse (resp * http.Response , serverAddr string ) error {
417+ body , err := io .ReadAll (resp .Body )
418+ if err != nil {
419+ return fmt .Errorf ("failed to read response body: %w" , err )
420+ }
421+
422+ log .Debugw ("received HTTP response from ADC Server" ,
423+ zap .String ("server" , serverAddr ),
424+ zap .Int ("status" , resp .StatusCode ),
425+ zap .String ("response" , string (body )),
426+ )
427+
428+ // not only 200, HTTP 202 is also accepted
429+ if resp .StatusCode / 100 != 2 {
430+ return types.ADCExecutionServerAddrError {
431+ ServerAddr : serverAddr ,
432+ Err : fmt .Sprintf ("HTTP %d: %s" , resp .StatusCode , string (body )),
433+ }
434+ }
435+
436+ // Parse response body
437+ var result adctypes.SyncResult
438+ if err := json .Unmarshal (body , & result ); err != nil {
439+ log .Errorw ("failed to unmarshal ADC Server response" ,
440+ zap .Error (err ),
441+ zap .String ("response" , string (body )),
442+ )
443+ return fmt .Errorf ("failed to parse ADC Server response: %w" , err )
444+ }
445+
446+ // Check for sync failures
447+ if result .FailedCount > 0 && len (result .Failed ) > 0 {
448+ log .Errorw ("ADC Server sync failed" , zap .Any ("result" , result ))
449+ return types.ADCExecutionServerAddrError {
450+ ServerAddr : serverAddr ,
451+ Err : result .Failed [0 ].Reason ,
452+ FailedStatuses : result .Failed ,
453+ }
454+ }
455+
456+ log .Debugw ("ADC Server sync success" , zap .Any ("result" , result ))
457+ return nil
458+ }
0 commit comments