@@ -10,7 +10,6 @@ import (
1010 "sync"
1111 "time"
1212
13- "github.com/sirupsen/logrus"
1413 cpov1 "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/apis/cpo.opensource.cybertec.at/v1"
1514 "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/apiserver"
1615 "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/cluster"
@@ -22,6 +21,8 @@ import (
2221 "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/constants"
2322 "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/k8sutil"
2423 "github.com/cybertec-postgresql/cybertec-pg-operator/pkg/util/ringlog"
24+ "github.com/sirupsen/logrus"
25+ appsv1 "k8s.io/api/apps/v1"
2526 v1 "k8s.io/api/core/v1"
2627 rbacv1 "k8s.io/api/rbac/v1"
2728 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -72,6 +73,59 @@ type Controller struct {
7273
7374 PodServiceAccount * v1.ServiceAccount
7475 PodServiceAccountRoleBinding * rbacv1.RoleBinding
76+
77+ curReconcile sync.Map
78+ }
79+
80+ // preparation for better reconcile
81+ type runningReconcile struct {
82+ cancel context.CancelFunc
83+ }
84+
85+ func (c * Controller ) statefulSetDelete (obj interface {}) {
86+ sts , ok := obj .(* appsv1.StatefulSet )
87+ if ! ok {
88+ c .logger .Warn ("statefulSetDelete: received object is not a StatefulSet" )
89+ return
90+ }
91+
92+ clusterName , exists := sts .Labels [c .opConfig .ClusterNameLabel ]
93+ if ! exists {
94+ c .logger .Warnf ("StatefulSet %s/%s has no cluster name label %q" , sts .Namespace , sts .Name , c .opConfig .ClusterNameLabel )
95+ return
96+ }
97+
98+ c .logger .Infof ("StatefulSet deleted: %s/%s, triggering reconcile" , sts .Namespace , sts .Name )
99+
100+ // Vorherige laufende Reconcile abbrechen
101+ if val , exists := c .curReconcile .Load (clusterName ); exists {
102+ c .logger .Infof ("Aborting running reconcile for cluster %s" , clusterName )
103+ val .(runningReconcile ).cancel ()
104+ }
105+
106+ // neuen Reconcile starten
107+ ctx , cancel := context .WithCancel (context .Background ())
108+ c .curReconcile .Store (clusterName , runningReconcile {cancel : cancel })
109+
110+ go func () {
111+ defer c .curReconcile .Delete (clusterName )
112+ c .reconcileCluster (ctx , clusterName )
113+ }()
114+ }
115+
116+ func (c * Controller ) reconcileCluster (ctx context.Context , clusterName string ) {
117+ c .logger .Infof ("Starting reconcile for cluster %s" , clusterName )
118+
119+ // Hier implementierst du das, was normal auch beim CRD-Reconcile passiert
120+ // z.B. Cluster prüfen, StatefulSet erstellen, Pods erstellen etc.
121+
122+ select {
123+ case <- ctx .Done ():
124+ c .logger .Infof ("Reconcile for cluster %s canceled" , clusterName )
125+ return
126+ default :
127+ // continue normal reconcile
128+ }
75129}
76130
77131// NewController creates a new controller
@@ -398,6 +452,23 @@ func (c *Controller) initSharedInformers() {
398452 })
399453 }
400454
455+ // STS
456+ statefulSetLW := & cache.ListWatch {
457+ ListFunc : c .statefulSetListFunc ,
458+ WatchFunc : c .statefulSetWatchFunc ,
459+ }
460+
461+ c .statefulSetInformer = cache .NewSharedIndexInformer (
462+ statefulSetLW ,
463+ & appsv1.StatefulSet {},
464+ constants .QueueResyncPeriodPod , // oder eigenes Intervall
465+ cache.Indexers {cache .NamespaceIndex : cache .MetaNamespaceIndexFunc },
466+ )
467+
468+ c .statefulSetInformer .AddEventHandler (cache.ResourceEventHandlerFuncs {
469+ DeleteFunc : c .statefulSetDelete ,
470+ })
471+
401472 // Pods
402473 podLw := & cache.ListWatch {
403474 ListFunc : c .podListFunc ,
@@ -452,6 +523,12 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
452523 }
453524
454525 wg .Add (5 + util .Bool2Int (c .opConfig .EnablePostgresTeamCRD ))
526+
527+ wg .Add (1 )
528+ go func () {
529+ defer wg .Done ()
530+ c .statefulSetInformer .Run (stopCh )
531+ }()
455532 go c .runPodInformer (stopCh , wg )
456533 go c .runPostgresqlInformer (stopCh , wg )
457534 go c .clusterResync (stopCh , wg )
0 commit comments