Skip to content

Commit a8fc366

Browse files
committed
Use context with timeout for SQL reconciliation
1 parent 177e5ae commit a8fc366

File tree

3 files changed

+62
-61
lines changed

3 files changed

+62
-61
lines changed

pkg/controller/node/node_controller.go

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
var log = logf.Log.WithName(controllerName)
4848

4949
const controllerName = "controller.mysqlNode"
50+
const mysqlReconciliationTimeout = 30 * time.Second
5051

5152
// Add creates a new MysqlCluster Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
5253
// and Start it when the Manager is Started.
@@ -136,8 +137,11 @@ type ReconcileMysqlNode struct {
136137
// Automatically generate RBAC rules to allow the Controller to read and write Deployments
137138
// +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;list;watch;create;update;patch;delete
138139
func (r *ReconcileMysqlNode) Reconcile(request reconcile.Request) (reconcile.Result, error) {
140+
ctx, cancel := context.WithTimeout(context.TODO(), mysqlReconciliationTimeout)
141+
defer cancel()
142+
139143
pod := &corev1.Pod{}
140-
err := r.Get(context.TODO(), request.NamespacedName, pod)
144+
err := r.Get(ctx, request.NamespacedName, pod)
141145
if err != nil {
142146
if errors.IsNotFound(err) {
143147
// Object not found, return. Created objects are automatically garbage collected.
@@ -151,7 +155,7 @@ func (r *ReconcileMysqlNode) Reconcile(request reconcile.Request) (reconcile.Res
151155
log.Info("syncing MySQL Node", "pod", request.NamespacedName.String())
152156

153157
var cluster *mysqlcluster.MysqlCluster
154-
cluster, err = r.getNodeCluster(pod)
158+
cluster, err = r.getNodeCluster(ctx, pod)
155159
if err != nil {
156160
log.Info("cluster is not found")
157161
return reconcile.Result{}, err
@@ -163,20 +167,20 @@ func (r *ReconcileMysqlNode) Reconcile(request reconcile.Request) (reconcile.Res
163167
}
164168

165169
var creds *credentials
166-
creds, err = r.getCredsSecret(cluster)
170+
creds, err = r.getCredsSecret(ctx, cluster)
167171
if err != nil {
168172
return reconcile.Result{}, err
169173
}
170174

171175
// initialize SQL interface
172176
sql := r.getMySQLConnection(cluster, pod, creds)
173177

174-
err = r.initializeMySQL(sql, cluster, creds)
178+
err = r.initializeMySQL(ctx, sql, cluster, creds)
175179
if err != nil {
176180
updatePodStatusCondition(pod, mysqlcluster.NodeInitializedConditionType,
177181
corev1.ConditionFalse, "mysqlInitializationFailed", err.Error())
178182

179-
if uErr := r.updatePod(pod); uErr != nil {
183+
if uErr := r.updatePod(ctx, pod); uErr != nil {
180184
return reconcile.Result{}, uErr
181185
}
182186

@@ -186,16 +190,16 @@ func (r *ReconcileMysqlNode) Reconcile(request reconcile.Request) (reconcile.Res
186190
updatePodStatusCondition(pod, mysqlcluster.NodeInitializedConditionType,
187191
corev1.ConditionTrue, "mysqlInitializationSucceeded", "success")
188192

189-
return reconcile.Result{}, r.updatePod(pod)
193+
return reconcile.Result{}, r.updatePod(ctx, pod)
190194
}
191195

192-
func (r *ReconcileMysqlNode) initializeMySQL(sql SQLInterface, cluster *mysqlcluster.MysqlCluster, c *credentials) error {
196+
func (r *ReconcileMysqlNode) initializeMySQL(ctx context.Context, sql SQLInterface, cluster *mysqlcluster.MysqlCluster, c *credentials) error {
193197
// wait for mysql to be ready
194-
if err := sql.Wait(); err != nil {
198+
if err := sql.Wait(ctx); err != nil {
195199
return err
196200
}
197201

198-
enableSuperReadOnly, err := sql.DisableSuperReadOnly()
202+
enableSuperReadOnly, err := sql.DisableSuperReadOnly(ctx)
199203
if err != nil {
200204
return err
201205
}
@@ -204,24 +208,24 @@ func (r *ReconcileMysqlNode) initializeMySQL(sql SQLInterface, cluster *mysqlclu
204208
// is slave node?
205209
if cluster.GetMasterHost() != sql.Host() {
206210
log.Info("configure pod as slave", "pod", sql.Host(), "master", cluster.GetMasterHost())
207-
if err := sql.SetPurgedGTID(); err != nil {
211+
if err := sql.SetPurgedGTID(ctx); err != nil {
208212
return err
209213
}
210214

211-
if err := sql.ChangeMasterTo(cluster.GetMasterHost(), c.ReplicationUser, c.ReplicationPassword); err != nil {
215+
if err := sql.ChangeMasterTo(ctx, cluster.GetMasterHost(), c.ReplicationUser, c.ReplicationPassword); err != nil {
212216
return err
213217
}
214218
}
215219

216-
if err := sql.MarkConfigurationDone(); err != nil {
220+
if err := sql.MarkConfigurationDone(ctx); err != nil {
217221
return err
218222
}
219223

220224
return nil
221225
}
222226

223227
// getNodeCluster returns the node related MySQL cluster
224-
func (r *ReconcileMysqlNode) getNodeCluster(pod *corev1.Pod) (*mysqlcluster.MysqlCluster, error) {
228+
func (r *ReconcileMysqlNode) getNodeCluster(ctx context.Context, pod *corev1.Pod) (*mysqlcluster.MysqlCluster, error) {
225229
re := regexp.MustCompile(`^([\w-]+)-mysql-\d*$`)
226230
indexStrs := re.FindStringSubmatch(pod.Name)
227231
if len(indexStrs) != 2 {
@@ -233,7 +237,7 @@ func (r *ReconcileMysqlNode) getNodeCluster(pod *corev1.Pod) (*mysqlcluster.Mysq
233237
Namespace: pod.Namespace,
234238
}
235239
cluster := mysqlcluster.New(&api.MysqlCluster{})
236-
err := r.Get(context.TODO(), clusterKey, cluster.Unwrap())
240+
err := r.Get(ctx, clusterKey, cluster.Unwrap())
237241
return cluster, err
238242
}
239243

@@ -257,13 +261,13 @@ type credentials struct {
257261
ReplicationPassword string
258262
}
259263

260-
func (r *ReconcileMysqlNode) getCredsSecret(cluster *mysqlcluster.MysqlCluster) (*credentials, error) {
264+
func (r *ReconcileMysqlNode) getCredsSecret(ctx context.Context, cluster *mysqlcluster.MysqlCluster) (*credentials, error) {
261265
secretKey := types.NamespacedName{
262266
Name: cluster.GetNameForResource(mysqlcluster.Secret),
263267
Namespace: cluster.Namespace,
264268
}
265269
secret := &corev1.Secret{}
266-
if err := r.Get(context.TODO(), secretKey, secret); err != nil {
270+
if err := r.Get(ctx, secretKey, secret); err != nil {
267271
return nil, err
268272
}
269273

@@ -277,8 +281,8 @@ func (r *ReconcileMysqlNode) getCredsSecret(cluster *mysqlcluster.MysqlCluster)
277281
return creds, creds.Validate()
278282
}
279283

280-
func (r *ReconcileMysqlNode) updatePod(pod *corev1.Pod) error {
281-
return r.Status().Update(context.TODO(), pod)
284+
func (r *ReconcileMysqlNode) updatePod(ctx context.Context, pod *corev1.Pod) error {
285+
return r.Status().Update(ctx, pod)
282286
}
283287

284288
func (c *credentials) Validate() error {
@@ -297,6 +301,7 @@ func anyIsEmpty(ss ...string) bool {
297301
return zero
298302
}
299303

304+
// nolint: unparam
300305
func updatePodStatusCondition(pod *corev1.Pod, condType corev1.PodConditionType,
301306
status corev1.ConditionStatus, reason, msg string) {
302307
newCondition := corev1.PodCondition{

pkg/controller/node/sql.go

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package node
1818

1919
import (
20+
"context"
2021
"database/sql"
2122
"fmt"
2223
"strings"
@@ -29,19 +30,17 @@ import (
2930
)
3031

3132
const (
32-
// mysqlReadyTries represents the number of tries with 1 second sleep between them to check if MySQL is ready
33-
mysqlReadyTries = 10
3433
// connRetry represents the number of tries to connect to master server
3534
connRetry = 10
3635
)
3736

3837
// SQLInterface expose abstract operations that can be applied on a MySQL node
3938
type SQLInterface interface {
40-
Wait() error
41-
DisableSuperReadOnly() (func(), error)
42-
ChangeMasterTo(string, string, string) error
43-
MarkConfigurationDone() error
44-
SetPurgedGTID() error
39+
Wait(ctx context.Context) error
40+
DisableSuperReadOnly(ctx context.Context) (func(), error)
41+
ChangeMasterTo(ctx context.Context, host string, user string, pass string) error
42+
MarkConfigurationDone(ctx context.Context) error
43+
SetPurgedGTID(ctx context.Context) error
4544
Host() string
4645
}
4746

@@ -62,38 +61,35 @@ func newNodeConn(dsn, host string) SQLInterface {
6261
}
6362
}
6463

65-
func (r *nodeSQLRunner) Wait() error {
64+
// Wait method pings MySQL until it's ready (runs SELECT 1;). The timeout should be set in ctx context.Context
65+
func (r *nodeSQLRunner) Wait(ctx context.Context) error {
6666
log.V(1).Info("wait for mysql to be ready")
6767

68-
for i := 0; i < mysqlReadyTries; i++ {
69-
if err := r.runQuery("SELECT 1"); err == nil {
70-
break
68+
for {
69+
select {
70+
case <-ctx.Done():
71+
// timeout expired
72+
return fmt.Errorf("timeout: mysql is not ready")
73+
case <-time.After(time.Second):
74+
if err := r.runQuery(ctx, "SELECT 1"); err == nil {
75+
return nil
76+
}
7177
}
72-
// wait a second
73-
time.Sleep(1 * time.Second)
7478
}
75-
if err := r.runQuery("SELECT 1"); err != nil {
76-
log.V(1).Info("mysql is not ready", "error", err)
77-
return err
78-
}
79-
80-
log.V(1).Info("mysql is ready")
81-
return nil
82-
8379
}
8480

85-
func (r *nodeSQLRunner) DisableSuperReadOnly() (func(), error) {
81+
func (r *nodeSQLRunner) DisableSuperReadOnly(ctx context.Context) (func(), error) {
8682
enable := func() {
87-
err := r.runQuery("SET GLOBAL SUPER_READ_ONLY = 0;")
83+
err := r.runQuery(ctx, "SET GLOBAL SUPER_READ_ONLY = 0;")
8884
if err != nil {
8985
log.Error(err, "failed to set node super read only", "node", r.Host())
9086
}
9187
}
92-
return enable, r.runQuery("SET GLOBAL READ_ONLY = 1; SET GLOBAL SUPER_READ_ONLY = 0;")
88+
return enable, r.runQuery(ctx, "SET GLOBAL READ_ONLY = 1; SET GLOBAL SUPER_READ_ONLY = 0;")
9389
}
9490

9591
// ChangeMasterTo changes the master host and starts slave.
96-
func (r *nodeSQLRunner) ChangeMasterTo(masterHost, user, pass string) error {
92+
func (r *nodeSQLRunner) ChangeMasterTo(ctx context.Context, masterHost, user, pass string) error {
9793
// slave node
9894
query := `
9995
STOP SLAVE;
@@ -103,14 +99,14 @@ func (r *nodeSQLRunner) ChangeMasterTo(masterHost, user, pass string) error {
10399
MASTER_PASSWORD=?,
104100
MASTER_CONNECT_RETRY=?;
105101
`
106-
if err := r.runQuery(query,
102+
if err := r.runQuery(ctx, query,
107103
masterHost, user, pass, connRetry,
108104
); err != nil {
109105
return fmt.Errorf("failed to configure slave node, err: %s", err)
110106
}
111107

112108
query = "START SLAVE;"
113-
if err := r.runQuery(query); err != nil {
109+
if err := r.runQuery(ctx, query); err != nil {
114110
log.Info("failed to start slave in the simple mode, trying a second method")
115111
// TODO: https://bugs.mysql.com/bug.php?id=83713
116112
query2 := `
@@ -120,14 +116,14 @@ func (r *nodeSQLRunner) ChangeMasterTo(masterHost, user, pass string) error {
120116
reset slave;
121117
start slave;
122118
`
123-
if err := r.runQuery(query2); err != nil {
119+
if err := r.runQuery(ctx, query2); err != nil {
124120
return fmt.Errorf("failed to start slave node, err: %s", err)
125121
}
126122
}
127123
return nil
128124
}
129125

130-
func (r *nodeSQLRunner) MarkConfigurationDone() error {
126+
func (r *nodeSQLRunner) MarkConfigurationDone(ctx context.Context) error {
131127
query := `
132128
CREATE TABLE IF NOT EXISTS %s.%s (
133129
ok tinyint(1) NOT NULL
@@ -137,7 +133,7 @@ func (r *nodeSQLRunner) MarkConfigurationDone() error {
137133
`
138134
query = fmt.Sprintf(query, constants.OperatorDbName, "readiness")
139135

140-
if err := r.runQuery(query); err != nil {
136+
if err := r.runQuery(ctx, query); err != nil {
141137
return fmt.Errorf("failed to mark configuration done, err: %s", err)
142138
}
143139
return nil
@@ -148,7 +144,7 @@ func (r *nodeSQLRunner) Host() string {
148144
}
149145

150146
// runQuery executes a query
151-
func (r *nodeSQLRunner) runQuery(q string, args ...interface{}) error {
147+
func (r *nodeSQLRunner) runQuery(ctx context.Context, q string, args ...interface{}) error {
152148
db, close, err := r.dbConn()
153149
if err != nil {
154150
return err
@@ -160,24 +156,23 @@ func (r *nodeSQLRunner) runQuery(q string, args ...interface{}) error {
160156
if !r.enableBinLog {
161157
q = "SET @@SESSION.SQL_LOG_BIN = 0;\n" + q
162158
}
163-
if _, err := db.Exec(q, args...); err != nil {
159+
if _, err := db.ExecContext(ctx, q, args...); err != nil {
164160
return err
165161
}
166162

167163
return nil
168164
}
169165

170166
// readFromMysql executes the given query and loads the values into give variables
171-
func (r *nodeSQLRunner) readFromMysql(query string, values ...interface{}) error {
167+
func (r *nodeSQLRunner) readFromMysql(ctx context.Context, query string, values ...interface{}) error {
172168
db, close, err := r.dbConn()
173169
if err != nil {
174170
return err
175171
}
176172
defer close()
177173

178-
// nolint: gosec
179174
log.V(1).Info("running query", "query", query)
180-
row := db.QueryRow(query)
175+
row := db.QueryRowContext(ctx, query)
181176
if row == nil {
182177
return fmt.Errorf("no row found")
183178
}
@@ -206,14 +201,14 @@ func (r *nodeSQLRunner) dbConn() (*sql.DB, func(), error) {
206201
return db, close, nil
207202
}
208203

209-
func (r *nodeSQLRunner) SetPurgedGTID() error {
204+
func (r *nodeSQLRunner) SetPurgedGTID(ctx context.Context) error {
210205
// first check if the GTID should be set, if the table exists or if the GTID was set before (used)
211206
// nolint: gosec
212207
qq := fmt.Sprintf("SELECT used FROM %[1]s.%[2]s WHERE id=1",
213208
constants.OperatorDbName, constants.OperatorGtidsTableName)
214209

215210
var used bool
216-
if err := r.readFromMysql(qq, &used); err != nil {
211+
if err := r.readFromMysql(ctx, qq, &used); err != nil {
217212
// if it's a: "Table doesn't exist" error then GTID should not be set, it's a master case.
218213
if isMySQLError(err, 1146) {
219214
log.V(1).Info("GTID purged table does not exists", "host", r.Host())
@@ -239,7 +234,7 @@ func (r *nodeSQLRunner) SetPurgedGTID() error {
239234
COMMIT;
240235
`, constants.OperatorDbName, constants.OperatorGtidsTableName)
241236

242-
if err := r.runQuery(query); err != nil {
237+
if err := r.runQuery(ctx, query); err != nil {
243238
return err
244239
}
245240

pkg/controller/node/sql_fake_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package node
1818

1919
import (
20+
"context"
2021
"fmt"
2122
. "github.com/onsi/ginkgo"
2223
. "github.com/onsi/gomega"
@@ -27,27 +28,27 @@ type fakeSQLRunner struct{}
2728
// test if fakeer implements interface
2829
var _ SQLInterface = &fakeSQLRunner{}
2930

30-
func (f *fakeSQLRunner) Wait() error {
31+
func (f *fakeSQLRunner) Wait(ctx context.Context) error {
3132
return nil
3233
}
3334

34-
func (f *fakeSQLRunner) DisableSuperReadOnly() (func(), error) {
35+
func (f *fakeSQLRunner) DisableSuperReadOnly(ctx context.Context) (func(), error) {
3536
return func() {}, nil
3637
}
3738

38-
func (f *fakeSQLRunner) ChangeMasterTo(host, user, pass string) error {
39+
func (f *fakeSQLRunner) ChangeMasterTo(ctx context.Context, host, user, pass string) error {
3940
return nil
4041
}
4142

42-
func (f *fakeSQLRunner) MarkConfigurationDone() error {
43+
func (f *fakeSQLRunner) MarkConfigurationDone(ctx context.Context) error {
4344
return nil
4445
}
4546

4647
func (f *fakeSQLRunner) Host() string {
4748
return ""
4849
}
4950

50-
func (f *fakeSQLRunner) SetPurgedGTID() error {
51+
func (f *fakeSQLRunner) SetPurgedGTID(ctx context.Context) error {
5152
return nil
5253
}
5354

0 commit comments

Comments
 (0)