Skip to content

Commit 23119d7

Browse files
committed
feat: Add timeout support for reconcile loops
1 parent eecd79b commit 23119d7

File tree

5 files changed

+181
-11
lines changed

5 files changed

+181
-11
lines changed

cmd/main.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,14 @@ func init() {
6464
} //nolint: wsl // Needed by operator
6565

6666
func main() {
67-
var metricsAddr, probeAddr, resyncPeriodStr string
67+
var metricsAddr, probeAddr, resyncPeriodStr, reconcileTimeoutStr string
6868

6969
var enableLeaderElection bool
7070

7171
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
7272
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
7373
flag.StringVar(&resyncPeriodStr, "resync-period", "30s", "The resync period to reload all resources for auto-heal procedures.")
74+
flag.StringVar(&reconcileTimeoutStr, "reconcile-timeout", "5s", "The reconcile max timeout.")
7475
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
7576
"Enable leader election for controller manager. "+
7677
"Enabling this will ensure there is only one active controller manager.")
@@ -90,13 +91,20 @@ func main() {
9091
setupLog.Error(err, "unable to parse resync period")
9192
os.Exit(1)
9293
}
94+
// Parse duration
95+
reconcileTimeout, err := time.ParseDuration(reconcileTimeoutStr)
96+
// Check error
97+
if err != nil {
98+
setupLog.Error(err, "unable to parse reconcile timeout")
99+
os.Exit(1)
100+
}
93101
// Log
94102
setupLog.Info(fmt.Sprintf("Starting manager with %s resync period", resyncPeriodStr))
95103

96104
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
97105
Scheme: scheme,
98106
MetricsBindAddress: metricsAddr,
99-
Port: 9443, //nolint: gomnd // Because generated
107+
Port: 9443,
100108
HealthProbeBindAddress: probeAddr,
101109
SyncPeriod: &resyncPeriod,
102110
LeaderElection: enableLeaderElection,
@@ -132,6 +140,7 @@ func main() {
132140
),
133141
ControllerRuntimeDetailedErrorTotal: controllerRuntimeDetailedErrorTotal,
134142
ControllerName: "postgresqlengineconfiguration",
143+
ReconcileTimeout: reconcileTimeout,
135144
}).SetupWithManager(mgr); err != nil {
136145
setupLog.Error(err, "unable to create controller", "controller", "PostgresqlEngineConfiguration")
137146
os.Exit(1)
@@ -151,6 +160,7 @@ func main() {
151160
),
152161
ControllerRuntimeDetailedErrorTotal: controllerRuntimeDetailedErrorTotal,
153162
ControllerName: "postgresqldatabase",
163+
ReconcileTimeout: reconcileTimeout,
154164
}).SetupWithManager(mgr); err != nil {
155165
setupLog.Error(err, "unable to create controller", "controller", "PostgresqlDatabase")
156166
os.Exit(1)
@@ -170,6 +180,7 @@ func main() {
170180
),
171181
ControllerRuntimeDetailedErrorTotal: controllerRuntimeDetailedErrorTotal,
172182
ControllerName: "postgresqluserrole",
183+
ReconcileTimeout: reconcileTimeout,
173184
}).SetupWithManager(mgr); err != nil {
174185
setupLog.Error(err, "unable to create controller", "controller", "PostgresqlUserRole")
175186
os.Exit(1)
@@ -189,6 +200,7 @@ func main() {
189200
),
190201
ControllerRuntimeDetailedErrorTotal: controllerRuntimeDetailedErrorTotal,
191202
ControllerName: "postgresqlpublication",
203+
ReconcileTimeout: reconcileTimeout,
192204
}).SetupWithManager(mgr); err != nil {
193205
setupLog.Error(err, "unable to create controller", "controller", "PostgresqlPublication")
194206
os.Exit(1)

internal/controller/postgresql/postgresqldatabase_controller.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"reflect"
23+
"time"
2324

2425
"k8s.io/apimachinery/pkg/api/errors"
2526
"k8s.io/apimachinery/pkg/runtime"
@@ -51,6 +52,7 @@ type PostgresqlDatabaseReconciler struct {
5152
ControllerRuntimeDetailedErrorTotal *prometheus.CounterVec
5253
Log logr.Logger
5354
ControllerName string
55+
ReconcileTimeout time.Duration
5456
}
5557

5658
//+kubebuilder:rbac:groups=postgresql.easymile.com,resources=postgresqldatabases,verbs=get;list;watch;create;update;patch;delete
@@ -93,11 +95,49 @@ func (r *PostgresqlDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.R
9395
// Original patch
9496
originalPatch := client.MergeFrom(instance.DeepCopy())
9597

98+
// Create timeout in ctx
99+
timeoutCtx, cancel := context.WithTimeout(ctx, r.ReconcileTimeout)
100+
// Defer cancel
101+
defer cancel()
102+
103+
// Init result
104+
var res ctrl.Result
105+
106+
errC := make(chan error, 1)
107+
108+
// Create wrapping function
109+
cb := func() {
110+
a, err := r.mainReconcile(timeoutCtx, reqLogger, instance, originalPatch)
111+
// Save result
112+
res = a
113+
// Send error
114+
errC <- err
115+
}
116+
117+
// Start wrapped function
118+
go cb()
119+
120+
// Run or timeout
121+
select {
122+
case <-timeoutCtx.Done():
123+
// Note: use the parent context here; the timeout context is already done, so an update made with it to record the error would be aborted
124+
return r.manageError(ctx, reqLogger, instance, originalPatch, timeoutCtx.Err())
125+
case err := <-errC:
126+
return res, err
127+
}
128+
}
129+
130+
func (r *PostgresqlDatabaseReconciler) mainReconcile(
131+
ctx context.Context,
132+
reqLogger logr.Logger,
133+
instance *postgresqlv1alpha1.PostgresqlDatabase,
134+
originalPatch client.Patch,
135+
) (ctrl.Result, error) {
96136
// Deletion case
97137
if !instance.GetDeletionTimestamp().IsZero() {
98138
// Deletion in progress detected
99139
// Check whether the database should be dropped
100-
shouldDelete, err := r.shouldDropDatabase(ctx, instance) //nolint:govet // Shadow err
140+
shouldDelete, err := r.shouldDropDatabase(ctx, instance)
101141
if err != nil {
102142
return r.manageError(ctx, reqLogger, instance, originalPatch, err)
103143
}

internal/controller/postgresql/postgresqlengineconfiguration_controller.go

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type PostgresqlEngineConfigurationReconciler struct {
5050
ControllerRuntimeDetailedErrorTotal *prometheus.CounterVec
5151
Log logr.Logger
5252
ControllerName string
53+
ReconcileTimeout time.Duration
5354
}
5455

5556
//+kubebuilder:rbac:groups=postgresql.easymile.com,resources=postgresqlengineconfigurations,verbs=get;list;watch;create;update;patch;delete
@@ -93,13 +94,51 @@ func (r *PostgresqlEngineConfigurationReconciler) Reconcile(ctx context.Context,
9394
// Original patch
9495
originalPatch := client.MergeFrom(instance.DeepCopy())
9596

97+
// Create timeout in ctx
98+
timeoutCtx, cancel := context.WithTimeout(ctx, r.ReconcileTimeout)
99+
// Defer cancel
100+
defer cancel()
101+
102+
// Init result
103+
var res ctrl.Result
104+
105+
errC := make(chan error, 1)
106+
107+
// Create wrapping function
108+
cb := func() {
109+
a, err := r.mainReconcile(timeoutCtx, reqLogger, instance, originalPatch)
110+
// Save result
111+
res = a
112+
// Send error
113+
errC <- err
114+
}
115+
116+
// Start wrapped function
117+
go cb()
118+
119+
// Run or timeout
120+
select {
121+
case <-timeoutCtx.Done():
122+
// Note: use the parent context here; the timeout context is already done, so an update made with it to record the error would be aborted
123+
return r.manageError(ctx, reqLogger, instance, originalPatch, timeoutCtx.Err())
124+
case err := <-errC:
125+
return res, err
126+
}
127+
}
128+
129+
func (r *PostgresqlEngineConfigurationReconciler) mainReconcile(
130+
ctx context.Context,
131+
reqLogger logr.Logger,
132+
instance *postgresqlv1alpha1.PostgresqlEngineConfiguration,
133+
originalPatch client.Patch,
134+
) (ctrl.Result, error) {
96135
// Deletion case
97136
if !instance.GetDeletionTimestamp().IsZero() {
98137
// Need to delete
99138
// Check if wait linked resources deletion flag is enabled
100139
if instance.Spec.WaitLinkedResourcesDeletion {
101140
// Check whether any database is still linked to this instance
102-
existingDB, err := r.getAnyDatabaseLinked(ctx, instance) //nolint:govet // Shadow err
141+
existingDB, err := r.getAnyDatabaseLinked(ctx, instance)
103142
if err != nil {
104143
return r.manageError(ctx, reqLogger, instance, originalPatch, err)
105144
}
@@ -112,7 +151,7 @@ func (r *PostgresqlEngineConfigurationReconciler) Reconcile(ctx context.Context,
112151
}
113152
}
114153
// Close all saved pools for that pgec
115-
err = postgres.CloseAllSavedPoolsForName(
154+
err := postgres.CloseAllSavedPoolsForName(
116155
utils.CreateNameKeyForSavedPools(instance.Name, instance.Namespace),
117156
)
118157
// Check error
@@ -134,7 +173,7 @@ func (r *PostgresqlEngineConfigurationReconciler) Reconcile(ctx context.Context,
134173

135174
// Check whether the reconcile loop was re-triggered only because of a status update
136175
if instance.Status.Phase == postgresqlv1alpha1.EngineValidatedPhase && instance.Status.LastValidatedTime != "" {
137-
dur, err := time.ParseDuration(instance.Spec.CheckInterval) //nolint:govet // Shadow err
176+
dur, err := time.ParseDuration(instance.Spec.CheckInterval)
138177
if err != nil {
139178
return r.manageError(ctx, reqLogger, instance, originalPatch, errors.NewInternalError(err))
140179
}

internal/controller/postgresql/postgresqlpublication_controller.go

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package postgresql
1919
import (
2020
"context"
2121
"reflect"
22+
"time"
2223

2324
"k8s.io/apimachinery/pkg/api/errors"
2425
"k8s.io/apimachinery/pkg/runtime"
@@ -47,6 +48,7 @@ type PostgresqlPublicationReconciler struct {
4748
ControllerRuntimeDetailedErrorTotal *prometheus.CounterVec
4849
Log logr.Logger
4950
ControllerName string
51+
ReconcileTimeout time.Duration
5052
}
5153

5254
//+kubebuilder:rbac:groups=postgresql.easymile.com,resources=postgresqlpublications,verbs=get;list;watch;create;update;patch;delete
@@ -89,14 +91,52 @@ func (r *PostgresqlPublicationReconciler) Reconcile(ctx context.Context, req ctr
8991
// Original patch
9092
originalPatch := client.MergeFrom(instance.DeepCopy())
9193

94+
// Create timeout in ctx
95+
timeoutCtx, cancel := context.WithTimeout(ctx, r.ReconcileTimeout)
96+
// Defer cancel
97+
defer cancel()
98+
99+
// Init result
100+
var res ctrl.Result
101+
102+
errC := make(chan error, 1)
103+
104+
// Create wrapping function
105+
cb := func() {
106+
a, err := r.mainReconcile(timeoutCtx, reqLogger, instance, originalPatch)
107+
// Save result
108+
res = a
109+
// Send error
110+
errC <- err
111+
}
112+
113+
// Start wrapped function
114+
go cb()
115+
116+
// Run or timeout
117+
select {
118+
case <-timeoutCtx.Done():
119+
// Note: use the parent context here; the timeout context is already done, so an update made with it to record the error would be aborted
120+
return r.manageError(ctx, reqLogger, instance, originalPatch, timeoutCtx.Err())
121+
case err := <-errC:
122+
return res, err
123+
}
124+
}
125+
126+
func (r *PostgresqlPublicationReconciler) mainReconcile(
127+
ctx context.Context,
128+
reqLogger logr.Logger,
129+
instance *v1alpha1.PostgresqlPublication,
130+
originalPatch client.Patch,
131+
) (ctrl.Result, error) {
92132
// Deletion case
93133
if !instance.GetDeletionTimestamp().IsZero() { //nolint:wsl
94134
// Deletion detected
95135

96136
// Check if drop on delete is enabled
97137
if instance.Spec.DropOnDelete {
98138
// Delete publication
99-
err = r.manageDropPublication(ctx, reqLogger, instance)
139+
err := r.manageDropPublication(ctx, reqLogger, instance)
100140
if err != nil {
101141
return r.manageError(ctx, reqLogger, instance, originalPatch, err)
102142
}
@@ -106,7 +146,7 @@ func (r *PostgresqlPublicationReconciler) Reconcile(ctx context.Context, req ctr
106146
controllerutil.RemoveFinalizer(instance, config.Finalizer)
107147

108148
// Update CR
109-
err = r.Update(ctx, instance)
149+
err := r.Update(ctx, instance)
110150
if err != nil {
111151
return r.manageError(ctx, reqLogger, instance, originalPatch, err)
112152
}
@@ -119,7 +159,7 @@ func (r *PostgresqlPublicationReconciler) Reconcile(ctx context.Context, req ctr
119159
// Creation / Update case
120160

121161
// Validate
122-
err = r.validate(instance)
162+
err := r.validate(instance)
123163
// Check error
124164
if err != nil {
125165
return r.manageError(ctx, reqLogger, instance, originalPatch, err)

internal/controller/postgresql/postgresqluserrole_controller.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ type PostgresqlUserRoleReconciler struct {
7474
ControllerRuntimeDetailedErrorTotal *prometheus.CounterVec
7575
Log logr.Logger
7676
ControllerName string
77+
ReconcileTimeout time.Duration
7778
}
7879

7980
type dbPrivilegeCache struct {
@@ -123,6 +124,44 @@ func (r *PostgresqlUserRoleReconciler) Reconcile(ctx context.Context, req ctrl.R
123124
// Original patch
124125
originalPatch := client.MergeFrom(instance.DeepCopy())
125126

127+
// Create timeout in ctx
128+
timeoutCtx, cancel := context.WithTimeout(ctx, r.ReconcileTimeout)
129+
// Defer cancel
130+
defer cancel()
131+
132+
// Init result
133+
var res ctrl.Result
134+
135+
errC := make(chan error, 1)
136+
137+
// Create wrapping function
138+
cb := func() {
139+
a, err := r.mainReconcile(timeoutCtx, reqLogger, instance, originalPatch)
140+
// Save result
141+
res = a
142+
// Send error
143+
errC <- err
144+
}
145+
146+
// Start wrapped function
147+
go cb()
148+
149+
// Run or timeout
150+
select {
151+
case <-timeoutCtx.Done():
152+
// Note: use the parent context here; the timeout context is already done, so an update made with it to record the error would be aborted
153+
return r.manageError(ctx, reqLogger, instance, originalPatch, timeoutCtx.Err())
154+
case err := <-errC:
155+
return res, err
156+
}
157+
}
158+
159+
func (r *PostgresqlUserRoleReconciler) mainReconcile(
160+
ctx context.Context,
161+
reqLogger logr.Logger,
162+
instance *v1alpha1.PostgresqlUserRole,
163+
originalPatch client.Patch,
164+
) (ctrl.Result, error) {
126165
// Deletion case
127166
if !instance.GetDeletionTimestamp().IsZero() { //nolint:wsl // it is like that
128167
// Deletion detected
@@ -138,7 +177,7 @@ func (r *PostgresqlUserRoleReconciler) Reconcile(ctx context.Context, req ctrl.R
138177
// Get needed items
139178

140179
// Find PG Database cache
141-
dbCache, pgecDBPrivilegeCache, err := r.getDatabaseInstances(ctx, instance, true) //nolint:govet // Allow err shadow
180+
dbCache, pgecDBPrivilegeCache, err := r.getDatabaseInstances(ctx, instance, true)
142181
// Check error
143182
if err != nil {
144183
return r.manageError(ctx, reqLogger, instance, originalPatch, err)
@@ -184,7 +223,7 @@ func (r *PostgresqlUserRoleReconciler) Reconcile(ctx context.Context, req ctrl.R
184223
// Creation case
185224

186225
// Validate
187-
err = r.validateInstance(ctx, instance)
226+
err := r.validateInstance(ctx, instance)
188227
// Check error
189228
if err != nil {
190229
return r.manageError(ctx, reqLogger, instance, originalPatch, err)

0 commit comments

Comments
 (0)