Skip to content

Commit cee1d9b

Browse files
committed
Fix the issue that the cache is out of sync when hookManager's HasSynced is queried in CustomizedInterpreter
Signed-off-by: liaolecheng <[email protected]>
1 parent 64fbe75 commit cee1d9b

File tree

2 files changed

+139
-33
lines changed

2 files changed

+139
-33
lines changed

pkg/resourceinterpreter/customized/webhook/configmanager/manager.go

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ limitations under the License.
1717
package configmanager
1818

1919
import (
20+
"errors"
2021
"fmt"
2122
"sort"
2223
"sync/atomic"
2324

2425
"k8s.io/apimachinery/pkg/labels"
2526
"k8s.io/apimachinery/pkg/runtime/schema"
26-
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2727
"k8s.io/client-go/tools/cache"
2828
"k8s.io/klog/v2"
2929

@@ -48,6 +48,7 @@ type ConfigManager interface {
4848
// interpreterConfigManager collect the resource interpreter webhook configuration.
4949
type interpreterConfigManager struct {
5050
configuration atomic.Value
51+
informer genericmanager.SingleClusterInformerManager
5152
lister cache.GenericLister
5253
initialSynced atomic.Bool
5354
}
@@ -63,16 +64,12 @@ func (m *interpreterConfigManager) HasSynced() bool {
6364
return true
6465
}
6566

66-
if configuration, err := m.lister.List(labels.Everything()); err == nil && len(configuration) == 0 {
67-
// the empty list we initially stored is valid to use.
68-
// Setting initialSynced to true, so subsequent checks
69-
// would be able to take the fast path on the atomic boolean in a
70-
// cluster without any webhooks configured.
71-
m.initialSynced.Store(true)
72-
// the informer has synced, and we don't have any items
73-
return true
67+
err := m.updateConfiguration()
68+
if err != nil {
69+
klog.ErrorS(err, "error updating configuration")
70+
return false
7471
}
75-
return false
72+
return true
7673
}
7774

7875
// NewExploreConfigManager return a new interpreterConfigManager with resourceinterpreterwebhookconfigurations handlers.
@@ -83,42 +80,53 @@ func NewExploreConfigManager(inform genericmanager.SingleClusterInformerManager)
8380

8481
manager.configuration.Store([]WebhookAccessor{})
8582

83+
manager.informer = inform
8684
configHandlers := fedinformer.NewHandlerOnEvents(
87-
func(_ interface{}) { manager.updateConfiguration() },
88-
func(_, _ interface{}) { manager.updateConfiguration() },
89-
func(_ interface{}) { manager.updateConfiguration() })
85+
func(_ interface{}) { _ = manager.updateConfiguration() },
86+
func(_, _ interface{}) { _ = manager.updateConfiguration() },
87+
func(_ interface{}) { _ = manager.updateConfiguration() })
9088
inform.ForResource(resourceExploringWebhookConfigurationsGVR, configHandlers)
9189

9290
return manager
9391
}
9492

95-
func (m *interpreterConfigManager) updateConfiguration() {
93+
// updateConfiguration is used as the event handler for the ResourceInterpreterWebhookConfiguration resource.
94+
// Any changes (add, update, delete) to these resources will trigger this method, which loads all
95+
// ResourceInterpreterWebhookConfiguration resources and refreshes the internal cache accordingly.
96+
// Note: During startup, some events may be missed if the informer has not yet synced. If all events
97+
// are missed during startup, updateConfiguration will be called when HasSynced() is invoked for the
98+
// first time, ensuring the cache is updated on first use.
99+
func (m *interpreterConfigManager) updateConfiguration() error {
100+
if m.informer == nil {
101+
return errors.New("informer manager is not configured")
102+
}
103+
if !m.informer.IsInformerSynced(resourceExploringWebhookConfigurationsGVR) {
104+
return errors.New("informer of ResourceInterpreterWebhookConfiguration not synced")
105+
}
106+
96107
configurations, err := m.lister.List(labels.Everything())
97108
if err != nil {
98-
utilruntime.HandleError(fmt.Errorf("error updating configuration: %v", err))
99-
return
109+
return err
100110
}
101111

102112
configs := make([]*configv1alpha1.ResourceInterpreterWebhookConfiguration, 0)
103113
for _, c := range configurations {
104114
unstructuredConfig, err := helper.ToUnstructured(c)
105115
if err != nil {
106-
klog.Errorf("Failed to transform ResourceInterpreterWebhookConfiguration: %v", err)
107-
return
116+
return err
108117
}
109118

110119
config := &configv1alpha1.ResourceInterpreterWebhookConfiguration{}
111120
err = helper.ConvertToTypedObject(unstructuredConfig, config)
112121
if err != nil {
113-
gvk := unstructuredConfig.GroupVersionKind().String()
114-
klog.Errorf("Failed to convert object(%s), err: %v", gvk, err)
115-
return
122+
return err
116123
}
117124
configs = append(configs, config)
118125
}
119126

120127
m.configuration.Store(mergeResourceExploreWebhookConfigurations(configs))
121128
m.initialSynced.Store(true)
129+
return nil
122130
}
123131

124132
func mergeResourceExploreWebhookConfigurations(configurations []*configv1alpha1.ResourceInterpreterWebhookConfiguration) []WebhookAccessor {

pkg/resourceinterpreter/customized/webhook/configmanager/manager_test.go

Lines changed: 110 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@ limitations under the License.
1717
package configmanager
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"testing"
23+
"time"
2224

2325
"github.com/stretchr/testify/assert"
2426
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2527
"k8s.io/apimachinery/pkg/labels"
2628
"k8s.io/apimachinery/pkg/runtime"
2729
"k8s.io/apimachinery/pkg/runtime/schema"
30+
"k8s.io/client-go/dynamic"
2831
"k8s.io/client-go/tools/cache"
2932

3033
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
@@ -62,6 +65,10 @@ func TestNewExploreConfigManager(t *testing.T) {
6265

6366
assert.NotNil(t, manager, "Manager should not be nil")
6467
assert.NotNil(t, manager.HookAccessors(), "Accessors should be initialized")
68+
69+
internalManager, ok := manager.(*interpreterConfigManager)
70+
assert.True(t, ok)
71+
assert.Equal(t, informerManager, internalManager.informer)
6572
})
6673
}
6774
}
@@ -70,6 +77,7 @@ func TestHasSynced(t *testing.T) {
7077
tests := []struct {
7178
name string
7279
initialSynced bool
80+
informer genericmanager.SingleClusterInformerManager
7381
listErr error
7482
listResult []runtime.Object
7583
expectedSynced bool
@@ -80,24 +88,47 @@ func TestHasSynced(t *testing.T) {
8088
expectedSynced: true,
8189
},
8290
{
83-
name: "not synced but empty list",
91+
name: "informer not configured",
8492
initialSynced: false,
93+
informer: nil,
94+
expectedSynced: false,
95+
},
96+
{
97+
name: "informer not synced",
98+
initialSynced: false,
99+
informer: &mockSingleClusterInformerManager{
100+
isSynced: false,
101+
},
102+
expectedSynced: false,
103+
},
104+
{
105+
name: "sync with empty list",
106+
initialSynced: false,
107+
informer: &mockSingleClusterInformerManager{
108+
isSynced: true,
109+
},
85110
listResult: []runtime.Object{},
86111
expectedSynced: true,
87112
},
88113
{
89-
name: "not synced with items",
114+
name: "sync with items",
90115
initialSynced: false,
116+
informer: &mockSingleClusterInformerManager{
117+
isSynced: true,
118+
},
91119
listResult: []runtime.Object{
92120
&configv1alpha1.ResourceInterpreterWebhookConfiguration{
93121
ObjectMeta: metav1.ObjectMeta{Name: "test"},
94122
},
95123
},
96-
expectedSynced: false,
124+
expectedSynced: true,
97125
},
98126
{
99-
name: "list error",
100-
initialSynced: false,
127+
name: "list error",
128+
initialSynced: false,
129+
informer: &mockSingleClusterInformerManager{
130+
isSynced: true,
131+
},
101132
listErr: fmt.Errorf("test error"),
102133
expectedSynced: false,
103134
},
@@ -106,6 +137,7 @@ func TestHasSynced(t *testing.T) {
106137
for _, tt := range tests {
107138
t.Run(tt.name, func(t *testing.T) {
108139
manager := &interpreterConfigManager{
140+
informer: tt.informer,
109141
lister: &mockLister{
110142
err: tt.listErr,
111143
items: tt.listResult,
@@ -181,12 +213,30 @@ func TestUpdateConfiguration(t *testing.T) {
181213
name string
182214
configs []runtime.Object
183215
listErr error
216+
informer genericmanager.SingleClusterInformerManager
184217
expectedCount int
185218
wantSynced bool
186219
}{
187220
{
188-
name: "empty configuration",
189-
configs: []runtime.Object{},
221+
name: "informer not configured",
222+
informer: nil,
223+
expectedCount: 0,
224+
wantSynced: false,
225+
},
226+
{
227+
name: "informer not synced",
228+
informer: &mockSingleClusterInformerManager{
229+
isSynced: false,
230+
},
231+
expectedCount: 0,
232+
wantSynced: false,
233+
},
234+
{
235+
name: "empty configuration",
236+
configs: []runtime.Object{},
237+
informer: &mockSingleClusterInformerManager{
238+
isSynced: true,
239+
},
190240
expectedCount: 0,
191241
wantSynced: true,
192242
},
@@ -204,13 +254,19 @@ func TestUpdateConfiguration(t *testing.T) {
204254
},
205255
},
206256
},
257+
informer: &mockSingleClusterInformerManager{
258+
isSynced: true,
259+
},
207260
expectedCount: 1,
208261
wantSynced: true,
209262
},
210263
{
211-
name: "list error",
212-
configs: []runtime.Object{},
213-
listErr: fmt.Errorf("test error"),
264+
name: "list error",
265+
configs: []runtime.Object{},
266+
listErr: fmt.Errorf("test error"),
267+
informer: &mockSingleClusterInformerManager{
268+
isSynced: true,
269+
},
214270
expectedCount: 0,
215271
wantSynced: false,
216272
},
@@ -223,21 +279,63 @@ func TestUpdateConfiguration(t *testing.T) {
223279
items: tt.configs,
224280
err: tt.listErr,
225281
},
282+
informer: tt.informer,
226283
}
227284
manager.configuration.Store([]WebhookAccessor{})
228285
manager.initialSynced.Store(false)
229286

230-
manager.updateConfiguration()
287+
synced := manager.HasSynced()
288+
assert.Equal(t, tt.wantSynced, synced)
231289

232290
accessors := manager.HookAccessors()
233291
assert.Equal(t, tt.expectedCount, len(accessors))
234-
assert.Equal(t, tt.wantSynced, manager.HasSynced())
235292
})
236293
}
237294
}
238295

239296
// Mock Implementations
240297

298+
type mockSingleClusterInformerManager struct {
299+
isSynced bool
300+
}
301+
302+
func (m *mockSingleClusterInformerManager) IsInformerSynced(_ schema.GroupVersionResource) bool {
303+
return m.isSynced
304+
}
305+
306+
func (m *mockSingleClusterInformerManager) Lister(_ schema.GroupVersionResource) cache.GenericLister {
307+
return nil
308+
}
309+
310+
func (m *mockSingleClusterInformerManager) ForResource(_ schema.GroupVersionResource, _ cache.ResourceEventHandler) {
311+
}
312+
313+
func (m *mockSingleClusterInformerManager) Start() {
314+
}
315+
316+
func (m *mockSingleClusterInformerManager) Stop() {
317+
}
318+
319+
func (m *mockSingleClusterInformerManager) WaitForCacheSync() map[schema.GroupVersionResource]bool {
320+
return nil
321+
}
322+
323+
func (m *mockSingleClusterInformerManager) WaitForCacheSyncWithTimeout(_ time.Duration) map[schema.GroupVersionResource]bool {
324+
return nil
325+
}
326+
327+
func (m *mockSingleClusterInformerManager) Context() context.Context {
328+
return context.Background()
329+
}
330+
331+
func (m *mockSingleClusterInformerManager) GetClient() dynamic.Interface {
332+
return nil
333+
}
334+
335+
func (m *mockSingleClusterInformerManager) IsHandlerExist(_ schema.GroupVersionResource, _ cache.ResourceEventHandler) bool {
336+
return false
337+
}
338+
241339
type mockLister struct {
242340
items []runtime.Object
243341
err error

0 commit comments

Comments
 (0)