Skip to content

Commit c746f03

Browse files
committed
Serve backups using http. Fixes #40
1 parent 43c65ba commit c746f03

File tree

7 files changed

+146
-93
lines changed

7 files changed

+146
-93
lines changed

cmd/mysql-helper/appclone/appclone.go

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package appclone
1818

1919
import (
2020
"fmt"
21+
"net/http"
2122
"os"
2223
"os/exec"
2324
"strings"
@@ -149,35 +150,24 @@ func cloneFromBucket(initBucket string) error {
149150

150151
func cloneFromSource(host string) error {
151152
glog.Infof("Cloning from node: %s", host)
152-
// ncat --recv-only {host} {port}
153-
// connects to host and get data from there
154-
ncat := exec.Command("ncat", "--recv-only", host, tb.BackupPort)
153+
154+
resp, err := http.Get(fmt.Sprintf("http://%s:%d%s", host, tb.ServerPort, tb.ServerBackupPath))
155+
if err != nil {
156+
return fmt.Errorf("fail to get backup: %s", err)
157+
}
155158

156159
// xbstream -x -C {mysql data target dir}
157160
// extracts files from stdin (-x) and writes them to mysql
158161
// data target dir
159162
xbstream := exec.Command("xbstream", "-x", "-C", tb.DataDir)
160163

161-
ncat.Stderr = os.Stderr
164+
xbstream.Stdin = resp.Body
162165
xbstream.Stderr = os.Stderr
163166

164-
var err error
165-
if xbstream.Stdin, err = ncat.StdoutPipe(); err != nil {
166-
return fmt.Errorf("set pipe, error: %s", err)
167-
}
168-
169-
if err := ncat.Start(); err != nil {
170-
return fmt.Errorf("ncat start error: %s", err)
171-
}
172-
173167
if err := xbstream.Start(); err != nil {
174168
return fmt.Errorf("xbstream start error: %s", err)
175169
}
176170

177-
if err := ncat.Wait(); err != nil {
178-
return fmt.Errorf("ncat wait error: %s", err)
179-
}
180-
181171
if err := xbstream.Wait(); err != nil {
182172
return fmt.Errorf("xbstream wait error: %s", err)
183173
}

cmd/mysql-helper/apphelper/apphelper.go

Lines changed: 3 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,7 @@ limitations under the License.
1717
package apphelper
1818

1919
import (
20-
"context"
2120
"fmt"
22-
"net/http"
23-
"os"
24-
"os/exec"
25-
"strings"
2621
"time"
2722

2823
"github.com/golang/glog"
@@ -87,12 +82,10 @@ func RunRunCommand(stopCh <-chan struct{}) error {
8782
}
8883
glog.V(2).Info("Configured read only flag...")
8984

90-
// start http server for readiness probe
91-
// here the server is ready to accept traffic
92-
httpServer(stopCh)
85+
srv := NewServer(stopCh)
86+
glog.V(2).Info("Starting http server...")
9387

94-
// now serve backups
95-
return startServeBackups()
88+
return srv.ListenAndServe()
9689
}
9790

9891
func configureOrchestratorUser() error {
@@ -134,47 +127,6 @@ func configureExporterUser() error {
134127
return nil
135128
}
136129

137-
func startServeBackups() error {
138-
glog.Infof("Serve backups command.")
139-
140-
xtrabackup_cmd := []string{"xtrabackup", "--backup", "--slave-info", "--stream=xbstream",
141-
"--host=127.0.0.1", fmt.Sprintf("--user=%s", tb.GetReplUser()),
142-
fmt.Sprintf("--password=%s", tb.GetReplPass())}
143-
144-
ncat := exec.Command("ncat", "--listen", "--keep-open", "--send-only", "--max-conns=1",
145-
tb.BackupPort, "-c", strings.Join(xtrabackup_cmd, " "))
146-
147-
ncat.Stderr = os.Stderr
148-
149-
return ncat.Run()
150-
151-
}
152-
153-
func httpServer(stop <-chan struct{}) {
154-
mux := http.NewServeMux()
155-
156-
// Add health endpoint
157-
mux.HandleFunc(tb.HelperProbePath, func(w http.ResponseWriter, r *http.Request) {
158-
w.Write([]byte("OK"))
159-
})
160-
161-
srv := &http.Server{
162-
Addr: fmt.Sprintf(":%d", tb.HelperProbePort),
163-
Handler: mux,
164-
}
165-
166-
// Shutdown gracefully the http server
167-
go func() {
168-
<-stop // wait for stop signal
169-
if err := srv.Shutdown(context.Background()); err != nil {
170-
glog.Errorf("Failed to stop probe server, err: %s", err)
171-
}
172-
}()
173-
go func() {
174-
glog.Fatal(srv.ListenAndServe())
175-
}()
176-
}
177-
178130
func waitForMysqlReady() error {
179131
glog.V(2).Info("Wait for mysql to be ready.")
180132

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
Copyright 2018 Pressinfra SRL
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package apphelper
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"io"
23+
"net/http"
24+
"os"
25+
"os/exec"
26+
27+
"github.com/golang/glog"
28+
tb "github.com/presslabs/mysql-operator/cmd/mysql-helper/util"
29+
)
30+
31+
type server struct {
32+
http.Server
33+
}
34+
35+
func NewServer(stop <-chan struct{}) *server {
36+
mux := http.NewServeMux()
37+
srv := &server{
38+
Server: http.Server{
39+
Addr: fmt.Sprintf(":%d", tb.ServerPort),
40+
Handler: mux,
41+
},
42+
}
43+
44+
// Add handle functions
45+
mux.HandleFunc(tb.ServerProbePath, srv.healthHandle)
46+
mux.Handle(tb.ServerBackupPath, tb.MaxClients(http.HandlerFunc(srv.serveBackupHandle), 1))
47+
48+
// Shutdown gracefully the http server
49+
go func() {
50+
<-stop // wait for stop signal
51+
if err := srv.Shutdown(context.Background()); err != nil {
52+
glog.Errorf("Failed to stop probe server, err: %s", err)
53+
}
54+
}()
55+
56+
return srv
57+
}
58+
59+
func (s *server) healthHandle(w http.ResponseWriter, r *http.Request) {
60+
w.WriteHeader(http.StatusOK)
61+
w.Write([]byte("OK"))
62+
}
63+
64+
func (s *server) serveBackupHandle(w http.ResponseWriter, r *http.Request) {
65+
66+
flusher, ok := w.(http.Flusher)
67+
if !ok {
68+
http.Error(w, "Streamming unsupported!", http.StatusInternalServerError)
69+
return
70+
}
71+
72+
w.Header().Set("Content-Type", "application/octet-stream")
73+
w.Header().Set("Connection", "keep-alive")
74+
75+
xtrabackup := exec.Command("xtrabackup", "--backup", "--slave-info", "--stream=xbstream",
76+
"--host=127.0.0.1", fmt.Sprintf("--user=%s", tb.GetReplUser()),
77+
fmt.Sprintf("--password=%s", tb.GetReplPass()))
78+
79+
xtrabackup.Stderr = os.Stderr
80+
81+
stdout, err := xtrabackup.StdoutPipe()
82+
if err != nil {
83+
glog.Errorf("Fail to create stdoutpipe: %s", err)
84+
http.Error(w, "xtrabackup failed", http.StatusInternalServerError)
85+
return
86+
}
87+
88+
defer stdout.Close()
89+
90+
if err := xtrabackup.Start(); err != nil {
91+
glog.Errorf("Fail to start xtrabackup cmd: %s", err)
92+
http.Error(w, "xtrabackup failed", http.StatusInternalServerError)
93+
return
94+
}
95+
96+
if _, err := io.Copy(w, stdout); err != nil {
97+
glog.Errorf("Fail to copy buffer: %s", err)
98+
http.Error(w, "buffer copy failed", http.StatusInternalServerError)
99+
return
100+
}
101+
102+
flusher.Flush()
103+
104+
if err := xtrabackup.Wait(); err != nil {
105+
glog.Errorf("Fail waiting for xtrabackup to finish: %s", err)
106+
http.Error(w, "xtrabackup failed", http.StatusInternalServerError)
107+
return
108+
}
109+
}

cmd/mysql-helper/apptakebackup/apptakebackup.go

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package apptakebackup
1818

1919
import (
2020
"fmt"
21+
"net/http"
2122
"os"
2223
"os/exec"
2324
"strings"
@@ -32,38 +33,30 @@ const (
3233
)
3334

3435
func RunTakeBackupCommand(stopCh <-chan struct{}, srcHost, destBucket string) error {
35-
glog.Infof("Take backup from '%s' to bucket '%s' started...", srcHost, destBucket)
36+
glog.Infof("Taking backup from '%s' to bucket '%s' ...", srcHost, destBucket)
3637
destBucket = normalizeBucketUri(destBucket)
3738
return pushBackupFromTo(srcHost, destBucket)
3839
}
3940

4041
func pushBackupFromTo(srcHost, destBucket string) error {
41-
// TODO: document each func
42-
ncat := exec.Command("ncat", "-i", ncatIdleTimeout, "--recv-only", srcHost, tb.BackupPort)
42+
resp, err := http.Get(fmt.Sprintf("http://%s:%d%s", srcHost, tb.ServerPort, tb.ServerBackupPath))
43+
if err != nil {
44+
return fmt.Errorf("fail to get backup: %s", err)
45+
}
4346

4447
gzip := exec.Command("gzip", "-c")
4548

4649
rclone := exec.Command("rclone",
4750
fmt.Sprintf("--config=%s", tb.RcloneConfigFile), "rcat", destBucket)
4851

49-
ncat.Stderr = os.Stderr
52+
gzip.Stdin = resp.Body
5053
gzip.Stderr = os.Stderr
5154
rclone.Stderr = os.Stderr
5255

53-
var err error
54-
// ncat | gzip | rclone
55-
if gzip.Stdin, err = ncat.StdoutPipe(); err != nil {
56-
return err
57-
}
58-
5956
if rclone.Stdin, err = gzip.StdoutPipe(); err != nil {
6057
return err
6158
}
6259

63-
if err := ncat.Start(); err != nil {
64-
return fmt.Errorf("ncat start error: %s", err)
65-
}
66-
6760
if err := gzip.Start(); err != nil {
6861
return fmt.Errorf("gzip start error: %s", err)
6962
}
@@ -72,11 +65,6 @@ func pushBackupFromTo(srcHost, destBucket string) error {
7265
return fmt.Errorf("rclone start error: %s", err)
7366
}
7467

75-
glog.V(2).Info("Wait for ncat to finish.")
76-
if err := ncat.Wait(); err != nil {
77-
return fmt.Errorf("ncat wait error: %s", err)
78-
}
79-
8068
glog.V(2).Info("Wait for gzip to finish.")
8169
if err := gzip.Wait(); err != nil {
8270
return fmt.Errorf("gzip wait error: %s", err)

cmd/mysql-helper/util/util.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"database/sql"
2222
"fmt"
2323
"io"
24+
"net/http"
2425
"os"
2526
"path"
2627
"strconv"
@@ -68,8 +69,10 @@ var (
6869

6970
NameOfStatefulSet = api.StatefulSet
7071

71-
HelperProbePath = mysqlcluster.HelperProbePath
72-
HelperProbePort = mysqlcluster.HelperProbePort
72+
// http server config
73+
ServerPort = mysqlcluster.HelperServerPort
74+
ServerProbePath = mysqlcluster.HelperServerProbePath
75+
ServerBackupPath = "/xtbackup"
7376
)
7477

7578
const (
@@ -284,3 +287,14 @@ func CopyFile(src, dst string) error {
284287
}
285288
return out.Close()
286289
}
290+
291+
func MaxClients(h http.Handler, n int) http.Handler {
292+
sema := make(chan struct{}, n)
293+
294+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
295+
sema <- struct{}{}
296+
defer func() { <-sema }()
297+
298+
h.ServeHTTP(w, r)
299+
})
300+
}

pkg/mysqlcluster/settings.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ const (
4040
// change
4141
ConfigVersion = "2018-03-23:12:33"
4242

43-
HelperProbePath = "/health"
44-
HelperProbePort = 8001
43+
HelperServerPort = 8088
44+
HelperServerProbePath = "/health"
4545

4646
ExporterPortName = "prometheus"
4747
ExporterPort = 9104

pkg/mysqlcluster/statefullset.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,8 +340,8 @@ func (f *cFactory) ensureContainersSpec(in []core.Container) []core.Container {
340340
// HELPER container
341341
helper.ReadinessProbe = ensureProbe(helper.ReadinessProbe, 5, 5, 10, core.Handler{
342342
HTTPGet: &core.HTTPGetAction{
343-
Path: HelperProbePath,
344-
Port: intstr.FromInt(HelperProbePort),
343+
Path: HelperServerProbePath,
344+
Port: intstr.FromInt(HelperServerPort),
345345
Scheme: core.URISchemeHTTP,
346346
},
347347
})

0 commit comments

Comments
 (0)