@@ -23,6 +23,8 @@ import (
2323 "encoding/json"
2424 "errors"
2525 "fmt"
26+ "io"
27+ "net/http"
2628 "os"
2729 "os/exec"
2830 "strings"
@@ -31,6 +33,7 @@ import (
3133
3234 "github.com/api7/gopkg/pkg/log"
3335 "go.uber.org/zap"
36+ "k8s.io/utils/ptr"
3437
3538 adctypes "github.com/apache/apisix-ingress-controller/api/adc"
3639 "github.com/apache/apisix-ingress-controller/internal/types"
@@ -194,3 +197,252 @@ func BuildADCExecuteArgs(filePath string, labels map[string]string, types []stri
194197 }
195198 return args
196199}
200+
201+ // ADCServerRequest represents the request body for ADC Server /sync endpoint
202+ type ADCServerRequest struct {
203+ Task ADCServerTask `json:"task"`
204+ }
205+
206+ // ADCServerTask represents the task configuration in ADC Server request
207+ type ADCServerTask struct {
208+ Opts ADCServerOpts `json:"opts"`
209+ Config adctypes.Resources `json:"config"`
210+ }
211+
212+ // ADCServerOpts represents the options in ADC Server task
213+ type ADCServerOpts struct {
214+ Backend string `json:"backend"`
215+ Server string `json:"server"`
216+ Token string `json:"token"`
217+ LabelSelector map [string ]string `json:"labelSelector,omitempty"`
218+ IncludeResourceType []string `json:"includeResourceType,omitempty"`
219+ TlsSkipVerify * bool `json:"tlsSkipVerify,omitempty"`
220+ }
221+
222+ // HTTPADCExecutor implements ADCExecutor interface using HTTP calls to ADC Server
223+ type HTTPADCExecutor struct {
224+ sync.Mutex
225+ httpClient * http.Client
226+ serverURL string
227+ }
228+
229+ // NewHTTPADCExecutor creates a new HTTPADCExecutor with the specified ADC Server URL
230+ func NewHTTPADCExecutor (serverURL string ) * HTTPADCExecutor {
231+ return & HTTPADCExecutor {
232+ httpClient : & http.Client {
233+ Timeout : 30 * time .Second ,
234+ },
235+ serverURL : serverURL ,
236+ }
237+ }
238+
239+ // Execute implements the ADCExecutor interface using HTTP calls
240+ func (e * HTTPADCExecutor ) Execute (ctx context.Context , mode string , config adcConfig , args []string ) error {
241+ e .Lock ()
242+ defer e .Unlock ()
243+
244+ return e .runHTTPSync (ctx , mode , config , args )
245+ }
246+
247+ // runHTTPSync performs HTTP sync to ADC Server for each server address
248+ func (e * HTTPADCExecutor ) runHTTPSync (ctx context.Context , mode string , config adcConfig , args []string ) error {
249+ var execErrs = types.ADCExecutionError {
250+ Name : config .Name ,
251+ }
252+
253+ for _ , addr := range config .ServerAddrs {
254+ if err := e .runHTTPSyncForSingleServer (ctx , addr , mode , config , args ); err != nil {
255+ log .Errorw ("failed to run http sync for server" , zap .String ("server" , addr ), zap .Error (err ))
256+ var execErr types.ADCExecutionServerAddrError
257+ if errors .As (err , & execErr ) {
258+ execErrs .FailedErrors = append (execErrs .FailedErrors , execErr )
259+ } else {
260+ execErrs .FailedErrors = append (execErrs .FailedErrors , types.ADCExecutionServerAddrError {
261+ ServerAddr : addr ,
262+ Err : err .Error (),
263+ })
264+ }
265+ }
266+ }
267+ if len (execErrs .FailedErrors ) > 0 {
268+ return execErrs
269+ }
270+ return nil
271+ }
272+
273+ // runHTTPSyncForSingleServer performs HTTP sync to a single ADC Server
274+ func (e * HTTPADCExecutor ) runHTTPSyncForSingleServer (ctx context.Context , serverAddr , mode string , config adcConfig , args []string ) error {
275+ ctx , cancel := context .WithTimeout (ctx , 15 * time .Second )
276+ defer cancel ()
277+
278+ // Parse args to extract labels, types, and file path
279+ labels , types , filePath , err := e .parseArgs (args )
280+ if err != nil {
281+ return fmt .Errorf ("failed to parse args: %w" , err )
282+ }
283+
284+ // Load resources from file
285+ resources , err := e .loadResourcesFromFile (filePath )
286+ if err != nil {
287+ return fmt .Errorf ("failed to load resources from file %s: %w" , filePath , err )
288+ }
289+
290+ // Build HTTP request
291+ req , err := e .buildHTTPRequest (ctx , serverAddr , mode , config , labels , types , resources )
292+ if err != nil {
293+ return fmt .Errorf ("failed to build HTTP request: %w" , err )
294+ }
295+
296+ // Send HTTP request
297+ resp , err := e .httpClient .Do (req )
298+ if err != nil {
299+ return fmt .Errorf ("failed to send HTTP request: %w" , err )
300+ }
301+ defer func () {
302+ if closeErr := resp .Body .Close (); closeErr != nil {
303+ log .Warnw ("failed to close response body" , zap .Error (closeErr ))
304+ }
305+ }()
306+
307+ // Handle HTTP response
308+ return e .handleHTTPResponse (resp , serverAddr )
309+ }
310+
311+ // parseArgs parses the command line arguments to extract labels, types, and file path
312+ func (e * HTTPADCExecutor ) parseArgs (args []string ) (map [string ]string , []string , string , error ) {
313+ labels := make (map [string ]string )
314+ var types []string
315+ var filePath string
316+
317+ for i := 0 ; i < len (args ); i ++ {
318+ switch args [i ] {
319+ case "-f" :
320+ if i + 1 < len (args ) {
321+ filePath = args [i + 1 ]
322+ i ++
323+ }
324+ case "--label-selector" :
325+ if i + 1 < len (args ) {
326+ labelPair := args [i + 1 ]
327+ parts := strings .SplitN (labelPair , "=" , 2 )
328+ if len (parts ) == 2 {
329+ labels [parts [0 ]] = parts [1 ]
330+ }
331+ i ++
332+ }
333+ case "--include-resource-type" :
334+ if i + 1 < len (args ) {
335+ types = append (types , args [i + 1 ])
336+ i ++
337+ }
338+ }
339+ }
340+
341+ if filePath == "" {
342+ return nil , nil , "" , errors .New ("file path not found in args" )
343+ }
344+
345+ return labels , types , filePath , nil
346+ }
347+
348+ // loadResourcesFromFile loads ADC resources from the specified file
349+ func (e * HTTPADCExecutor ) loadResourcesFromFile (filePath string ) (* adctypes.Resources , error ) {
350+ data , err := os .ReadFile (filePath )
351+ if err != nil {
352+ return nil , fmt .Errorf ("failed to read file: %w" , err )
353+ }
354+
355+ var resources adctypes.Resources
356+ if err := json .Unmarshal (data , & resources ); err != nil {
357+ return nil , fmt .Errorf ("failed to unmarshal resources: %w" , err )
358+ }
359+
360+ return & resources , nil
361+ }
362+
363+ // buildHTTPRequest builds the HTTP request for ADC Server
364+ func (e * HTTPADCExecutor ) buildHTTPRequest (ctx context.Context , serverAddr , mode string , config adcConfig , labels map [string ]string , types []string , resources * adctypes.Resources ) (* http.Request , error ) {
365+ // Prepare request body
366+ tlsVerify := config .TlsVerify
367+ reqBody := ADCServerRequest {
368+ Task : ADCServerTask {
369+ Opts : ADCServerOpts {
370+ Backend : mode ,
371+ Server : serverAddr ,
372+ Token : config .Token ,
373+ LabelSelector : labels ,
374+ IncludeResourceType : types ,
375+ TlsSkipVerify : ptr .To (! tlsVerify ),
376+ },
377+ Config : * resources ,
378+ },
379+ }
380+
381+ jsonData , err := json .Marshal (reqBody )
382+ if err != nil {
383+ return nil , fmt .Errorf ("failed to marshal request body: %w" , err )
384+ }
385+
386+ log .Debugw ("sending HTTP request to ADC Server" ,
387+ zap .String ("url" , e .serverURL + "/sync" ),
388+ zap .String ("server" , serverAddr ),
389+ zap .String ("mode" , mode ),
390+ zap .Any ("labelSelector" , labels ),
391+ zap .Strings ("includeResourceType" , types ),
392+ zap .Bool ("tlsSkipVerify" , ! tlsVerify ),
393+ )
394+
395+ // Create HTTP request
396+ req , err := http .NewRequestWithContext (ctx , "PUT" , e .serverURL + "/sync" , bytes .NewBuffer (jsonData ))
397+ if err != nil {
398+ return nil , fmt .Errorf ("failed to create HTTP request: %w" , err )
399+ }
400+
401+ req .Header .Set ("Content-Type" , "application/json" )
402+ return req , nil
403+ }
404+
405+ // handleHTTPResponse handles the HTTP response from ADC Server
406+ func (e * HTTPADCExecutor ) handleHTTPResponse (resp * http.Response , serverAddr string ) error {
407+ body , err := io .ReadAll (resp .Body )
408+ if err != nil {
409+ return fmt .Errorf ("failed to read response body: %w" , err )
410+ }
411+
412+ log .Debugw ("received HTTP response from ADC Server" ,
413+ zap .String ("server" , serverAddr ),
414+ zap .Int ("status" , resp .StatusCode ),
415+ zap .String ("response" , string (body )),
416+ )
417+
418+ // not only 200, HTTP 202 is also accepted
419+ if resp .StatusCode / 100 != 2 {
420+ return types.ADCExecutionServerAddrError {
421+ ServerAddr : serverAddr ,
422+ Err : fmt .Sprintf ("HTTP %d: %s" , resp .StatusCode , string (body )),
423+ }
424+ }
425+
426+ // Parse response body
427+ var result adctypes.SyncResult
428+ if err := json .Unmarshal (body , & result ); err != nil {
429+ log .Errorw ("failed to unmarshal ADC Server response" ,
430+ zap .Error (err ),
431+ zap .String ("response" , string (body )),
432+ )
433+ return fmt .Errorf ("failed to parse ADC Server response: %w" , err )
434+ }
435+
436+ // Check for sync failures
437+ if result .FailedCount > 0 && len (result .Failed ) > 0 {
438+ log .Errorw ("ADC Server sync failed" , zap .Any ("result" , result ))
439+ return types.ADCExecutionServerAddrError {
440+ ServerAddr : serverAddr ,
441+ Err : result .Failed [0 ].Reason ,
442+ FailedStatuses : result .Failed ,
443+ }
444+ }
445+
446+ log .Debugw ("ADC Server sync success" , zap .Any ("result" , result ))
447+ return nil
448+ }
0 commit comments