Skip to content

Commit f987634

Browse files
committed
add method for register a cluster into backup scheduler cron
1 parent b0ba103 commit f987634

File tree

3 files changed

+300
-0
lines changed

3 files changed

+300
-0
lines changed
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
Copyright 2018 Pressinfra SRL
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package clustercontroller
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
"time"
23+
24+
"github.com/golang/glog"
25+
//"github.com/robfig/cron"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/util/wait"
28+
29+
api "github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1"
30+
myclientset "github.com/presslabs/mysql-operator/pkg/generated/clientset/versioned"
31+
)
32+
33+
var (
34+
lockJobRegister sync.Mutex
35+
// polling time for backup to be completed
36+
backupPollingTime = time.Second
37+
// time to wait for a backup to be completed
38+
backupWatchTimeout = time.Hour
39+
)
40+
41+
// The job structure contains the context to schedule a backup
42+
type job struct {
43+
Name string
44+
Namespace string
45+
46+
BackupRunning *bool
47+
48+
lock *sync.Mutex
49+
myClient myclientset.Interface
50+
}
51+
52+
func (c *Controller) registerClusterInBackupCron(cluster *api.MysqlCluster) error {
53+
glog.Infof("Register cluster into cronjob: %s, crontab: %s",
54+
cluster.Name, cluster.Spec.BackupSchedule)
55+
56+
if len(cluster.Spec.BackupSchedule) == 0 {
57+
return nil
58+
}
59+
60+
lockJobRegister.Lock()
61+
defer lockJobRegister.Unlock()
62+
63+
for _, entry := range c.cron.Entries() {
64+
j, ok := entry.Job.(job)
65+
if ok && j.Name == cluster.Name && j.Namespace == cluster.Namespace {
66+
glog.V(3).Infof("Cluster %s already added to cron.", cluster.Name)
67+
return nil
68+
}
69+
}
70+
71+
return c.cron.AddJob(cluster.Spec.BackupSchedule, job{
72+
Name: cluster.Name,
73+
Namespace: cluster.Namespace,
74+
myClient: c.myClient,
75+
BackupRunning: new(bool),
76+
lock: new(sync.Mutex),
77+
})
78+
}
79+
80+
func (j job) Run() {
81+
backupName := fmt.Sprintf("%s-auto-backup-%s", j.Name, time.Now().Format("2006-01-02t15-04-05"))
82+
83+
// Wrap backup creation to ensure that lock is released when backup is
84+
// created
85+
created := func() bool {
86+
j.lock.Lock()
87+
defer j.lock.Unlock()
88+
89+
if *j.BackupRunning {
90+
glog.Infof("Last scheduled backup still running! Can't initiate a new backup for cluster: %s",
91+
j.Name)
92+
return false
93+
}
94+
95+
tries := 0
96+
for {
97+
var err error
98+
_, err = j.myClient.Mysql().MysqlBackups(j.Namespace).Create(&api.MysqlBackup{
99+
ObjectMeta: metav1.ObjectMeta{
100+
Name: backupName,
101+
Labels: map[string]string{
102+
"recurrent": "true",
103+
},
104+
},
105+
Spec: api.BackupSpec{
106+
ClusterName: j.Name,
107+
},
108+
})
109+
110+
if err == nil {
111+
break
112+
}
113+
glog.V(3).Infof("Fail to create backup %s, err: %s", backupName, err)
114+
115+
if tries > 5 {
116+
glog.Errorf("Fail to create backup for cluster: %s, err: %s, max tries %d exeded!",
117+
j.Name, err, tries)
118+
return false
119+
}
120+
121+
time.Sleep(5 * time.Second)
122+
tries += 1
123+
}
124+
125+
*j.BackupRunning = true
126+
return true
127+
}()
128+
129+
if !created {
130+
return
131+
}
132+
133+
defer func() {
134+
j.lock.Lock()
135+
defer j.lock.Unlock()
136+
*j.BackupRunning = false
137+
}()
138+
139+
err := wait.PollImmediate(backupPollingTime, backupWatchTimeout, func() (bool, error) {
140+
b, err := j.myClient.Mysql().MysqlBackups(j.Namespace).Get(backupName, metav1.GetOptions{})
141+
if err != nil {
142+
glog.Warningf("Failed to get backup %s, err %s", backupName, err)
143+
return false, nil
144+
}
145+
if b.Status.Completed {
146+
glog.Infof("Backup '%s' finished.", backupName)
147+
return true, nil
148+
}
149+
150+
return false, nil
151+
})
152+
153+
if err != nil {
154+
glog.Errorf("Waiting for backup '%s' for cluster '%s' to be ready failed: %s",
155+
backupName, j.Name, err)
156+
}
157+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
Copyright 2018 Pressinfra SRL
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package clustercontroller
18+
19+
import (
20+
"sync"
21+
"time"
22+
23+
"k8s.io/client-go/kubernetes/fake"
24+
"k8s.io/client-go/tools/record"
25+
26+
. "github.com/onsi/ginkgo"
27+
. "github.com/onsi/gomega"
28+
. "github.com/onsi/gomega/gstruct"
29+
"github.com/robfig/cron"
30+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31+
32+
api "github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1"
33+
fakeMyClient "github.com/presslabs/mysql-operator/pkg/generated/clientset/versioned/fake"
34+
"github.com/presslabs/mysql-operator/pkg/util/options"
35+
tutil "github.com/presslabs/mysql-operator/pkg/util/test"
36+
)
37+
38+
var _ = Describe("Test cluster reconciliation queue", func() {
39+
var (
40+
client *fake.Clientset
41+
myClient *fakeMyClient.Clientset
42+
rec *record.FakeRecorder
43+
cluster *api.MysqlCluster
44+
controller *Controller
45+
stop chan struct{}
46+
opt *options.Options
47+
)
48+
49+
BeforeEach(func() {
50+
opt = options.GetOptions()
51+
client = fake.NewSimpleClientset()
52+
myClient = fakeMyClient.NewSimpleClientset()
53+
rec = record.NewFakeRecorder(100)
54+
cluster = tutil.NewFakeCluster("backupscheduler")
55+
stop = make(chan struct{})
56+
controller = newController(stop, client, myClient, rec)
57+
backupPollingTime = 10 * time.Millisecond
58+
backupWatchTimeout = time.Second
59+
60+
})
61+
62+
AfterEach(func() {
63+
close(stop)
64+
})
65+
66+
Describe("Scheduled backups cron", func() {
67+
Context("cluster with schedule backups", func() {
68+
It("try to register multiple times", func() {
69+
cluster.Spec.BackupSchedule = "0 * * * *"
70+
_, err := myClient.MysqlV1alpha1().MysqlClusters(tutil.Namespace).Create(cluster)
71+
Expect(err).To(Succeed())
72+
73+
err = controller.registerClusterInBackupCron(cluster)
74+
Expect(err).To(Succeed())
75+
76+
Expect(controller.cron.Entries()).To(HaveLen(1))
77+
78+
err = controller.registerClusterInBackupCron(cluster)
79+
Expect(err).To(Succeed())
80+
81+
Expect(controller.cron.Entries()).To(HaveLen(1))
82+
Expect(controller.cron.Entries()).To(
83+
ContainElement(PointTo(MatchFields(IgnoreExtras, Fields{
84+
"Job": MatchFields(IgnoreExtras, Fields{
85+
"Name": Equal(cluster.Name),
86+
"Namespace": Equal(cluster.Namespace),
87+
"BackupRunning": PointTo(Equal(false)),
88+
}),
89+
}))))
90+
})
91+
It("try to register multiple times in parallel", func() {
92+
cluster2 := tutil.NewFakeCluster("bs2")
93+
94+
cluster.Spec.BackupSchedule = "0 * * * *"
95+
cluster2.Spec.BackupSchedule = "0 * * * *"
96+
97+
go controller.registerClusterInBackupCron(cluster)
98+
go controller.registerClusterInBackupCron(cluster2)
99+
go controller.registerClusterInBackupCron(cluster)
100+
go controller.registerClusterInBackupCron(cluster2)
101+
102+
Eventually(func() []*cron.Entry {
103+
lockJobRegister.Lock() // avoid a data race
104+
defer lockJobRegister.Unlock()
105+
return controller.cron.Entries()
106+
}).Should(HaveLen(2))
107+
})
108+
109+
It("start job to schedule a backup", func() {
110+
j := job{
111+
Name: cluster.Name,
112+
Namespace: cluster.Namespace,
113+
myClient: myClient,
114+
BackupRunning: new(bool),
115+
lock: new(sync.Mutex),
116+
}
117+
go j.Run()
118+
go j.Run()
119+
120+
Eventually(func() *bool {
121+
return j.BackupRunning
122+
}).Should(PointTo(Equal(true)))
123+
Eventually(func() []api.MysqlBackup {
124+
bs, _ := myClient.Mysql().MysqlBackups(j.Namespace).List(metav1.ListOptions{})
125+
return bs.Items
126+
}).Should(HaveLen(1))
127+
128+
bs, _ := myClient.Mysql().MysqlBackups(j.Namespace).List(metav1.ListOptions{})
129+
backup := bs.Items[0]
130+
backup.Status.Completed = true
131+
_, err := myClient.Mysql().MysqlBackups(j.Namespace).Update(&backup)
132+
Expect(err).To(Succeed())
133+
134+
Eventually(func() *bool {
135+
return j.BackupRunning
136+
}).Should(PointTo(Equal(false)))
137+
})
138+
})
139+
})
140+
})

pkg/controller/clustercontroller/controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424

2525
"github.com/golang/glog"
26+
"github.com/robfig/cron"
2627
apiext_clientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
2728
k8errors "k8s.io/apimachinery/pkg/api/errors"
2829
"k8s.io/apimachinery/pkg/util/runtime"
@@ -78,6 +79,7 @@ type Controller struct {
7879
clustersSync sync.Map
7980

8081
reconcileQueue workqueue.DelayingInterface
82+
cron *cron.Cron
8183
}
8284

8385
// New returns a new controller
@@ -91,6 +93,7 @@ func New(ctx *controllerpkg.Context) *Controller {
9193
CRDClient: ctx.CRDClient,
9294
KubeSharedInformerFactory: ctx.KubeSharedInformerFactory,
9395
SharedInformerFactory: ctx.SharedInformerFactory,
96+
cron: cron.New(),
9497
}
9598

9699
statefulSetInformer := ctx.KubeSharedInformerFactory.Apps().V1().StatefulSets()

0 commit comments

Comments
 (0)