Skip to content

Commit d0b043d

Browse files
committed
Fix deadlock when watching api-server resources (#313)
1 parent 6108a1b commit d0b043d

File tree

9 files changed

+545
-1175
lines changed

9 files changed

+545
-1175
lines changed

.licenserc.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,6 @@ header:
3838
- '**/README.md.gotmpl'
3939
- '**/*.json'
4040
- '**/.helmignore'
41+
- 'testbin/**'
4142

4243
comment: on-failure

controllers/apikey_controller.go

Lines changed: 149 additions & 255 deletions
Large diffs are not rendered by default.

controllers/flinkdeployment_controller.go

Lines changed: 81 additions & 205 deletions
Large diffs are not rendered by default.

controllers/secret_controller.go

Lines changed: 147 additions & 221 deletions
Large diffs are not rendered by default.

controllers/serviceaccount_controller.go

Lines changed: 10 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"context"
1919
"encoding/base64"
2020
"fmt"
21-
"sync"
2221
"time"
2322

2423
resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
@@ -29,24 +28,18 @@ import (
2928
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3029
"k8s.io/apimachinery/pkg/runtime"
3130
"k8s.io/apimachinery/pkg/types"
32-
"k8s.io/apimachinery/pkg/watch"
3331
ctrl "sigs.k8s.io/controller-runtime"
3432
"sigs.k8s.io/controller-runtime/pkg/client"
33+
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
3534
"sigs.k8s.io/controller-runtime/pkg/log"
3635
"sigs.k8s.io/controller-runtime/pkg/predicate"
37-
38-
cloudapi "github.com/streamnative/pulsar-resources-operator/pkg/streamnativecloud/apis/cloud/v1alpha1"
3936
)
4037

4138
// ServiceAccountReconciler reconciles a StreamNative Cloud ServiceAccount object
4239
type ServiceAccountReconciler struct {
4340
client.Client
4441
Scheme *runtime.Scheme
4542
ConnectionManager *ConnectionManager
46-
// watcherMap stores active watchers for ServiceAccounts
47-
watcherMap map[types.NamespacedName]watch.Interface
48-
// watcherMutex protects watcherMap
49-
watcherMutex sync.RWMutex
5043
}
5144

5245
const ServiceAccountFinalizer = "serviceaccount.resource.streamnative.io/finalizer"
@@ -57,103 +50,6 @@ const ServiceAccountFinalizer = "serviceaccount.resource.streamnative.io/finaliz
5750
//+kubebuilder:rbac:groups=resource.streamnative.io,resources=streamnativecloudconnections,verbs=get;list;watch
5851
//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete
5952

60-
// handleWatchEvents processes events from the watch interface
61-
func (r *ServiceAccountReconciler) handleWatchEvents(ctx context.Context, namespacedName types.NamespacedName, watcher watch.Interface) {
62-
logger := log.FromContext(ctx)
63-
defer watcher.Stop()
64-
65-
for {
66-
select {
67-
case <-ctx.Done():
68-
return
69-
case event, ok := <-watcher.ResultChan():
70-
if !ok {
71-
logger.Info("Watch channel closed", "namespace", namespacedName.Namespace, "name", namespacedName.Name)
72-
// Remove the watcher from the map
73-
r.watcherMutex.Lock()
74-
delete(r.watcherMap, namespacedName)
75-
r.watcherMutex.Unlock()
76-
return
77-
}
78-
79-
if event.Type == watch.Modified {
80-
// Check if the object is a ServiceAccount
81-
cloudSA, ok := event.Object.(*cloudapi.ServiceAccount)
82-
if !ok {
83-
logger.Error(fmt.Errorf("unexpected object type"), "Failed to convert object to ServiceAccount")
84-
continue
85-
}
86-
87-
// Get the local ServiceAccount
88-
localSA := &resourcev1alpha1.ServiceAccount{}
89-
if err := r.Get(ctx, namespacedName, localSA); err != nil {
90-
logger.Error(err, "Failed to get local ServiceAccount")
91-
continue
92-
}
93-
94-
// Update status
95-
r.updateServiceAccountStatus(ctx, localSA, nil, "Ready", "ServiceAccount synced successfully")
96-
97-
// Process credentials and create Secret if needed
98-
if cloudSA.Status.PrivateKeyType == utils.ServiceAccountCredentialsType && cloudSA.Status.PrivateKeyData != "" {
99-
r.processServiceAccountCredentials(ctx, localSA, cloudSA)
100-
}
101-
}
102-
}
103-
}
104-
}
105-
106-
// processServiceAccountCredentials handles credentials data and creates a Secret
107-
func (r *ServiceAccountReconciler) processServiceAccountCredentials(ctx context.Context, localSA *resourcev1alpha1.ServiceAccount, cloudSA *cloudapi.ServiceAccount) {
108-
logger := log.FromContext(ctx)
109-
110-
// Base64 decode the private key data
111-
credentialsData, err := base64.StdEncoding.DecodeString(cloudSA.Status.PrivateKeyData)
112-
if err != nil {
113-
logger.Error(err, "Failed to decode private key data")
114-
return
115-
}
116-
117-
// Create or update Secret with credentials
118-
if err := utils.CreateOrUpdateServiceAccountCredentialsSecret(ctx, r.Client, localSA, localSA.Namespace, localSA.Name, string(credentialsData)); err != nil {
119-
logger.Error(err, "Failed to create or update service account credentials secret")
120-
return
121-
}
122-
123-
logger.Info("Successfully created credentials secret for service account")
124-
}
125-
126-
// setupWatch creates a new watcher for a ServiceAccount
127-
func (r *ServiceAccountReconciler) setupWatch(ctx context.Context, serviceAccount *resourcev1alpha1.ServiceAccount, saClient *controllers2.ServiceAccountClient) error {
128-
namespacedName := types.NamespacedName{
129-
Namespace: serviceAccount.Namespace,
130-
Name: serviceAccount.Name,
131-
}
132-
133-
// Check if we already have a watcher
134-
r.watcherMutex.RLock()
135-
_, exists := r.watcherMap[namespacedName]
136-
r.watcherMutex.RUnlock()
137-
if exists {
138-
return nil
139-
}
140-
141-
// Create new watcher
142-
watcher, err := saClient.WatchServiceAccount(ctx, serviceAccount.Name)
143-
if err != nil {
144-
return fmt.Errorf("failed to create watcher: %w", err)
145-
}
146-
147-
// Store watcher in map
148-
r.watcherMutex.Lock()
149-
r.watcherMap[namespacedName] = watcher
150-
r.watcherMutex.Unlock()
151-
152-
// Start watching in a new goroutine
153-
go r.handleWatchEvents(ctx, namespacedName, watcher)
154-
return nil
155-
}
156-
15753
// Reconcile handles the reconciliation of ServiceAccount objects
15854
func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
15955
logger := log.FromContext(ctx)
@@ -166,13 +62,7 @@ func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req ctrl.Reque
16662
serviceAccount := &resourcev1alpha1.ServiceAccount{}
16763
if err := r.Get(ctx, req.NamespacedName, serviceAccount); err != nil {
16864
if apierrors.IsNotFound(err) {
169-
// Stop and remove watcher if it exists
170-
r.watcherMutex.Lock()
171-
if watcher, exists := r.watcherMap[req.NamespacedName]; exists {
172-
watcher.Stop()
173-
delete(r.watcherMap, req.NamespacedName)
174-
}
175-
r.watcherMutex.Unlock()
65+
logger.Info("ServiceAccount not found. Reconciliation will stop.", "namespace", req.Namespace, "name", req.Name)
17666
return ctrl.Result{}, nil
17767
}
17868
return ctrl.Result{}, err
@@ -220,7 +110,7 @@ func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req ctrl.Reque
220110

221111
// Handle deletion
222112
if !serviceAccount.DeletionTimestamp.IsZero() {
223-
if controllers2.ContainsString(serviceAccount.Finalizers, ServiceAccountFinalizer) {
113+
if controllerutil.ContainsFinalizer(serviceAccount, ServiceAccountFinalizer) {
224114
// Try to delete remote ServiceAccount
225115
if err := saClient.DeleteServiceAccount(ctx, serviceAccount); err != nil {
226116
if !apierrors.IsNotFound(err) {
@@ -234,7 +124,7 @@ func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req ctrl.Reque
234124
}
235125

236126
// Remove finalizer after successful deletion
237-
serviceAccount.Finalizers = controllers2.RemoveString(serviceAccount.Finalizers, ServiceAccountFinalizer)
127+
controllerutil.RemoveFinalizer(serviceAccount, ServiceAccountFinalizer)
238128
if err := r.Update(ctx, serviceAccount); err != nil {
239129
return ctrl.Result{}, err
240130
}
@@ -243,18 +133,20 @@ func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req ctrl.Reque
243133
}
244134

245135
// Add finalizer if it doesn't exist
246-
if !controllers2.ContainsString(serviceAccount.Finalizers, ServiceAccountFinalizer) {
247-
serviceAccount.Finalizers = append(serviceAccount.Finalizers, ServiceAccountFinalizer)
136+
if !controllerutil.ContainsFinalizer(serviceAccount, ServiceAccountFinalizer) {
137+
controllerutil.AddFinalizer(serviceAccount, ServiceAccountFinalizer)
248138
if err := r.Update(ctx, serviceAccount); err != nil {
249139
return ctrl.Result{}, err
250140
}
141+
// Requeue after adding finalizer to ensure the update is processed before proceeding
142+
return ctrl.Result{Requeue: true}, nil
251143
}
252144

253145
// Check if ServiceAccount exists
254146
existingSA, err := saClient.GetServiceAccount(ctx, serviceAccount.Name)
255147
if err != nil {
256-
logger.Info("Failed to get ServiceAccount", "error", err, "existingSA", existingSA)
257148
if !apierrors.IsNotFound(err) {
149+
logger.Info("Failed to get ServiceAccount", "error", err, "existingSA", existingSA)
258150
r.updateServiceAccountStatus(ctx, serviceAccount, err, "GetServiceAccountFailed",
259151
fmt.Sprintf("Failed to get ServiceAccount: %v", err))
260152
return ctrl.Result{}, client.IgnoreNotFound(err)
@@ -286,18 +178,11 @@ func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req ctrl.Reque
286178
} else {
287179
if err := utils.CreateOrUpdateServiceAccountCredentialsSecret(ctx, r.Client, serviceAccount, serviceAccount.Namespace, serviceAccount.Name, string(credentialsData)); err != nil {
288180
logger.Error(err, "Failed to create or update service account credentials secret")
289-
} else {
290-
logger.Info("Successfully created credentials secret for service account")
291181
}
292182
}
293183
}
294184
}
295185

296-
// Set up watch for ServiceAccount
297-
if err := r.setupWatch(ctx, serviceAccount, saClient); err != nil {
298-
logger.Error(err, "Failed to set up watch", "serviceAccount", serviceAccount.Name)
299-
}
300-
301186
// Update status
302187
r.updateServiceAccountStatus(ctx, serviceAccount, nil, "Ready", "ServiceAccount created successfully")
303188
} else {
@@ -316,20 +201,14 @@ func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req ctrl.Reque
316201
} else {
317202
if err := utils.CreateOrUpdateServiceAccountCredentialsSecret(ctx, r.Client, serviceAccount, serviceAccount.Namespace, serviceAccount.Name, string(credentialsData)); err != nil {
318203
logger.Error(err, "Failed to create or update service account credentials secret")
319-
} else {
320-
logger.Info("Successfully created credentials secret for service account")
321204
}
322205
}
323206
}
324207
}
325208

326-
// Set up watch for ServiceAccount
327-
if err := r.setupWatch(ctx, serviceAccount, saClient); err != nil {
328-
logger.Error(err, "Failed to set up watch", "serviceAccount", serviceAccount.Name)
329-
}
330-
331209
// Update status
332210
r.updateServiceAccountStatus(ctx, serviceAccount, nil, "Ready", "ServiceAccount synced successfully")
211+
logger.Info("ServiceAccount reconciled", "namespace", serviceAccount.Namespace, "name", serviceAccount.Name)
333212
}
334213

335214
return ctrl.Result{RequeueAfter: requeueInterval}, nil
@@ -388,7 +267,6 @@ func (r *ServiceAccountReconciler) updateServiceAccountStatus(
388267

389268
// SetupWithManager sets up the controller with the Manager.
390269
func (r *ServiceAccountReconciler) SetupWithManager(mgr ctrl.Manager) error {
391-
r.watcherMap = make(map[types.NamespacedName]watch.Interface)
392270
return ctrl.NewControllerManagedBy(mgr).
393271
For(&resourcev1alpha1.ServiceAccount{}).
394272
WithEventFilter(predicate.GenerationChangedPredicate{}).

0 commit comments

Comments
 (0)