diff --git a/internal/provider/adc/adc.go b/internal/provider/adc/adc.go index 6b217ef4a..4750be4d0 100644 --- a/internal/provider/adc/adc.go +++ b/internal/provider/adc/adc.go @@ -94,6 +94,8 @@ type adcClient struct { updater status.Updater statusUpdateMap map[types.NamespacedNameKind][]string + + syncCh chan struct{} } type Task struct { @@ -116,6 +118,7 @@ func New(updater status.Updater, opts ...Option) (provider.Provider, error) { store: NewStore(), executor: &DefaultADCExecutor{}, updater: updater, + syncCh: make(chan struct{}, 1), }, nil } @@ -220,6 +223,7 @@ func (d *adcClient) Update(ctx context.Context, tctx *provider.TranslateContext, // which only needs to be saved in cache // and triggered by a timer for synchronization if d.BackendMode == BackendModeAPISIXStandalone || d.BackendMode == BackendModeAPISIX { + d.syncNotify() return nil } @@ -289,6 +293,8 @@ func (d *adcClient) Delete(ctx context.Context, obj client.Object) error { Name: obj.GetName(), configs: configs, }) + } else { + d.syncNotify() } return nil case BackendModeAPI7EE: @@ -306,12 +312,14 @@ func (d *adcClient) Delete(ctx context.Context, obj client.Object) error { func (d *adcClient) Start(ctx context.Context) error { initalSyncDelay := d.InitSyncDelay - time.AfterFunc(initalSyncDelay, func() { - if err := d.Sync(ctx); err != nil { - log.Error(err) - return - } - }) + if initalSyncDelay > 0 { + time.AfterFunc(initalSyncDelay, func() { + if err := d.Sync(ctx); err != nil { + log.Error(err) + return + } + }) + } if d.SyncPeriod < 1 { return nil @@ -319,13 +327,19 @@ func (d *adcClient) Start(ctx context.Context) error { ticker := time.NewTicker(d.SyncPeriod) defer ticker.Stop() for { + synced := false select { + case <-d.syncCh: + synced = true case <-ticker.C: + synced = true + case <-ctx.Done(): + return nil + } + if synced { if err := d.Sync(ctx); err != nil { log.Error(err) } - case <-ctx.Done(): - return nil } } } @@ -506,6 +520,13 @@ func (d *adcClient) sync(ctx context.Context, task Task) error { return nil } +func (d *adcClient) syncNotify() { + select { + case d.syncCh <- struct{}{}: + default: + } +} + func prepareSyncFile(resources any) (string, func(), error) { data, err := json.Marshal(resources) if err != nil { diff --git a/internal/provider/adc/config.go b/internal/provider/adc/config.go index 6617e28c9..20bc3f648 100644 --- a/internal/provider/adc/config.go +++ b/internal/provider/adc/config.go @@ -207,6 +207,8 @@ func (d *adcClient) updateConfigForGatewayProxy(tctx *provider.TranslateContext, for _, ref := range referrers { d.configs[ref] = *config } + + d.syncNotify() return nil } diff --git a/test/conformance/apisix/suite_test.go b/test/conformance/apisix/suite_test.go index 2f9c47c7b..2d5dc6fe6 100644 --- a/test/conformance/apisix/suite_test.go +++ b/test/conformance/apisix/suite_test.go @@ -165,9 +165,9 @@ func TestMain(m *testing.M) { ControllerName: s.GetControllerName(), Namespace: namespace, StatusAddress: address, - InitSyncDelay: 1 * time.Minute, + InitSyncDelay: 20 * time.Minute, ProviderType: framework.ProviderType, - ProviderSyncPeriod: 10 * time.Millisecond, + ProviderSyncPeriod: 1 * time.Hour, }) adminEndpoint := fmt.Sprintf("http://%s.%s:9180", svc.Name, namespace) diff --git a/test/e2e/scaffold/apisix_deployer.go b/test/e2e/scaffold/apisix_deployer.go index fb22200b2..597ebae21 100644 --- a/test/e2e/scaffold/apisix_deployer.go +++ b/test/e2e/scaffold/apisix_deployer.go @@ -260,7 +260,7 @@ func (s *APISIXDeployer) DeployIngress() { s.Framework.DeployIngress(framework.IngressDeployOpts{ ControllerName: s.opts.ControllerName, ProviderType: framework.ProviderType, - ProviderSyncPeriod: 200 * time.Millisecond, + ProviderSyncPeriod: 1 * time.Hour, Namespace: s.namespace, Replicas: 1, }) @@ -270,7 +270,7 @@ func (s *APISIXDeployer) ScaleIngress(replicas int) { s.Framework.DeployIngress(framework.IngressDeployOpts{ ControllerName: s.opts.ControllerName, ProviderType: framework.ProviderType, - ProviderSyncPeriod: 200 * time.Millisecond, + ProviderSyncPeriod: 1 * time.Hour, Namespace: s.namespace, Replicas: replicas, })