@@ -23,6 +23,8 @@ import (
2323 "encoding/json"
2424 "errors"
2525 "fmt"
26+ "io"
27+ "net/http"
2628 "os"
2729 "os/exec"
2830 "strings"
@@ -31,11 +33,16 @@ import (
3133
3234 "github.com/api7/gopkg/pkg/log"
3335 "go.uber.org/zap"
36+ "k8s.io/utils/ptr"
3437
3538 adctypes "github.com/apache/apisix-ingress-controller/api/adc"
3639 "github.com/apache/apisix-ingress-controller/internal/types"
3740)
3841
42+ const (
43+ defaultHTTPADCExecutorAddr = "http://127.0.0.1:3000"
44+ )
45+
3946type ADCExecutor interface {
4047 Execute (ctx context.Context , mode string , config adctypes.Config , args []string ) error
4148}
@@ -194,3 +201,252 @@ func BuildADCExecuteArgs(filePath string, labels map[string]string, types []stri
194201 }
195202 return args
196203}
204+
205+ // ADCServerRequest represents the request body for ADC Server /sync endpoint
206+ type ADCServerRequest struct {
207+ Task ADCServerTask `json:"task"`
208+ }
209+
210+ // ADCServerTask represents the task configuration in ADC Server request
211+ type ADCServerTask struct {
212+ Opts ADCServerOpts `json:"opts"`
213+ Config adctypes.Resources `json:"config"`
214+ }
215+
216+ // ADCServerOpts represents the options in ADC Server task
217+ type ADCServerOpts struct {
218+ Backend string `json:"backend"`
219+ Server string `json:"server"`
220+ Token string `json:"token"`
221+ LabelSelector map [string ]string `json:"labelSelector,omitempty"`
222+ IncludeResourceType []string `json:"includeResourceType,omitempty"`
223+ TlsSkipVerify * bool `json:"tlsSkipVerify,omitempty"`
224+ }
225+
226+ // HTTPADCExecutor implements ADCExecutor interface using HTTP calls to ADC Server
227+ type HTTPADCExecutor struct {
228+ sync.Mutex
229+ httpClient * http.Client
230+ serverURL string
231+ }
232+
233+ // NewHTTPADCExecutor creates a new HTTPADCExecutor with the specified ADC Server URL
234+ func NewHTTPADCExecutor (serverURL string ) * HTTPADCExecutor {
235+ return & HTTPADCExecutor {
236+ httpClient : & http.Client {
237+ Timeout : 30 * time .Second ,
238+ },
239+ serverURL : serverURL ,
240+ }
241+ }
242+
243+ // Execute implements the ADCExecutor interface using HTTP calls
244+ func (e * HTTPADCExecutor ) Execute (ctx context.Context , mode string , config adctypes.Config , args []string ) error {
245+ e .Lock ()
246+ defer e .Unlock ()
247+
248+ return e .runHTTPSync (ctx , mode , config , args )
249+ }
250+
251+ // runHTTPSync performs HTTP sync to ADC Server for each server address
252+ func (e * HTTPADCExecutor ) runHTTPSync (ctx context.Context , mode string , config adctypes.Config , args []string ) error {
253+ var execErrs = types.ADCExecutionError {
254+ Name : config .Name ,
255+ }
256+
257+ for _ , addr := range config .ServerAddrs {
258+ if err := e .runHTTPSyncForSingleServer (ctx , addr , mode , config , args ); err != nil {
259+ log .Errorw ("failed to run http sync for server" , zap .String ("server" , addr ), zap .Error (err ))
260+ var execErr types.ADCExecutionServerAddrError
261+ if errors .As (err , & execErr ) {
262+ execErrs .FailedErrors = append (execErrs .FailedErrors , execErr )
263+ } else {
264+ execErrs .FailedErrors = append (execErrs .FailedErrors , types.ADCExecutionServerAddrError {
265+ ServerAddr : addr ,
266+ Err : err .Error (),
267+ })
268+ }
269+ }
270+ }
271+ if len (execErrs .FailedErrors ) > 0 {
272+ return execErrs
273+ }
274+ return nil
275+ }
276+
277+ // runHTTPSyncForSingleServer performs HTTP sync to a single ADC Server
278+ func (e * HTTPADCExecutor ) runHTTPSyncForSingleServer (ctx context.Context , serverAddr , mode string , config adctypes.Config , args []string ) error {
279+ ctx , cancel := context .WithTimeout (ctx , 15 * time .Second )
280+ defer cancel ()
281+
282+ // Parse args to extract labels, types, and file path
283+ labels , types , filePath , err := e .parseArgs (args )
284+ if err != nil {
285+ return fmt .Errorf ("failed to parse args: %w" , err )
286+ }
287+
288+ // Load resources from file
289+ resources , err := e .loadResourcesFromFile (filePath )
290+ if err != nil {
291+ return fmt .Errorf ("failed to load resources from file %s: %w" , filePath , err )
292+ }
293+
294+ // Build HTTP request
295+ req , err := e .buildHTTPRequest (ctx , serverAddr , mode , config , labels , types , resources )
296+ if err != nil {
297+ return fmt .Errorf ("failed to build HTTP request: %w" , err )
298+ }
299+
300+ // Send HTTP request
301+ resp , err := e .httpClient .Do (req )
302+ if err != nil {
303+ return fmt .Errorf ("failed to send HTTP request: %w" , err )
304+ }
305+ defer func () {
306+ if closeErr := resp .Body .Close (); closeErr != nil {
307+ log .Warnw ("failed to close response body" , zap .Error (closeErr ))
308+ }
309+ }()
310+
311+ // Handle HTTP response
312+ return e .handleHTTPResponse (resp , serverAddr )
313+ }
314+
315+ // parseArgs parses the command line arguments to extract labels, types, and file path
316+ func (e * HTTPADCExecutor ) parseArgs (args []string ) (map [string ]string , []string , string , error ) {
317+ labels := make (map [string ]string )
318+ var types []string
319+ var filePath string
320+
321+ for i := 0 ; i < len (args ); i ++ {
322+ switch args [i ] {
323+ case "-f" :
324+ if i + 1 < len (args ) {
325+ filePath = args [i + 1 ]
326+ i ++
327+ }
328+ case "--label-selector" :
329+ if i + 1 < len (args ) {
330+ labelPair := args [i + 1 ]
331+ parts := strings .SplitN (labelPair , "=" , 2 )
332+ if len (parts ) == 2 {
333+ labels [parts [0 ]] = parts [1 ]
334+ }
335+ i ++
336+ }
337+ case "--include-resource-type" :
338+ if i + 1 < len (args ) {
339+ types = append (types , args [i + 1 ])
340+ i ++
341+ }
342+ }
343+ }
344+
345+ if filePath == "" {
346+ return nil , nil , "" , errors .New ("file path not found in args" )
347+ }
348+
349+ return labels , types , filePath , nil
350+ }
351+
352+ // loadResourcesFromFile loads ADC resources from the specified file
353+ func (e * HTTPADCExecutor ) loadResourcesFromFile (filePath string ) (* adctypes.Resources , error ) {
354+ data , err := os .ReadFile (filePath )
355+ if err != nil {
356+ return nil , fmt .Errorf ("failed to read file: %w" , err )
357+ }
358+
359+ var resources adctypes.Resources
360+ if err := json .Unmarshal (data , & resources ); err != nil {
361+ return nil , fmt .Errorf ("failed to unmarshal resources: %w" , err )
362+ }
363+
364+ return & resources , nil
365+ }
366+
367+ // buildHTTPRequest builds the HTTP request for ADC Server
368+ func (e * HTTPADCExecutor ) buildHTTPRequest (ctx context.Context , serverAddr , mode string , config adctypes.Config , labels map [string ]string , types []string , resources * adctypes.Resources ) (* http.Request , error ) {
369+ // Prepare request body
370+ tlsVerify := config .TlsVerify
371+ reqBody := ADCServerRequest {
372+ Task : ADCServerTask {
373+ Opts : ADCServerOpts {
374+ Backend : mode ,
375+ Server : serverAddr ,
376+ Token : config .Token ,
377+ LabelSelector : labels ,
378+ IncludeResourceType : types ,
379+ TlsSkipVerify : ptr .To (! tlsVerify ),
380+ },
381+ Config : * resources ,
382+ },
383+ }
384+
385+ jsonData , err := json .Marshal (reqBody )
386+ if err != nil {
387+ return nil , fmt .Errorf ("failed to marshal request body: %w" , err )
388+ }
389+
390+ log .Debugw ("sending HTTP request to ADC Server" ,
391+ zap .String ("url" , e .serverURL + "/sync" ),
392+ zap .String ("server" , serverAddr ),
393+ zap .String ("mode" , mode ),
394+ zap .Any ("labelSelector" , labels ),
395+ zap .Strings ("includeResourceType" , types ),
396+ zap .Bool ("tlsSkipVerify" , ! tlsVerify ),
397+ )
398+
399+ // Create HTTP request
400+ req , err := http .NewRequestWithContext (ctx , "PUT" , e .serverURL + "/sync" , bytes .NewBuffer (jsonData ))
401+ if err != nil {
402+ return nil , fmt .Errorf ("failed to create HTTP request: %w" , err )
403+ }
404+
405+ req .Header .Set ("Content-Type" , "application/json" )
406+ return req , nil
407+ }
408+
409+ // handleHTTPResponse handles the HTTP response from ADC Server
410+ func (e * HTTPADCExecutor ) handleHTTPResponse (resp * http.Response , serverAddr string ) error {
411+ body , err := io .ReadAll (resp .Body )
412+ if err != nil {
413+ return fmt .Errorf ("failed to read response body: %w" , err )
414+ }
415+
416+ log .Debugw ("received HTTP response from ADC Server" ,
417+ zap .String ("server" , serverAddr ),
418+ zap .Int ("status" , resp .StatusCode ),
419+ zap .String ("response" , string (body )),
420+ )
421+
422+ // not only 200, HTTP 202 is also accepted
423+ if resp .StatusCode / 100 != 2 {
424+ return types.ADCExecutionServerAddrError {
425+ ServerAddr : serverAddr ,
426+ Err : fmt .Sprintf ("HTTP %d: %s" , resp .StatusCode , string (body )),
427+ }
428+ }
429+
430+ // Parse response body
431+ var result adctypes.SyncResult
432+ if err := json .Unmarshal (body , & result ); err != nil {
433+ log .Errorw ("failed to unmarshal ADC Server response" ,
434+ zap .Error (err ),
435+ zap .String ("response" , string (body )),
436+ )
437+ return fmt .Errorf ("failed to parse ADC Server response: %w" , err )
438+ }
439+
440+ // Check for sync failures
441+ if result .FailedCount > 0 && len (result .Failed ) > 0 {
442+ log .Errorw ("ADC Server sync failed" , zap .Any ("result" , result ))
443+ return types.ADCExecutionServerAddrError {
444+ ServerAddr : serverAddr ,
445+ Err : result .Failed [0 ].Reason ,
446+ FailedStatuses : result .Failed ,
447+ }
448+ }
449+
450+ log .Debugw ("ADC Server sync success" , zap .Any ("result" , result ))
451+ return nil
452+ }
0 commit comments