Skip to content

Commit 1d08f4a

Browse files
committed
update code to skbn 0.3.0
1 parent 5f0dc03 commit 1d08f4a

File tree

4 files changed

+33
-24
lines changed

4 files changed

+33
-24
lines changed

cmd/cain.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,13 @@ func NewRootCmd(args []string) *cobra.Command {
3737
}
3838

3939
type backupCmd struct {
40-
namespace string
41-
selector string
42-
container string
43-
keyspace string
44-
dst string
45-
parallel int
40+
namespace string
41+
selector string
42+
container string
43+
keyspace string
44+
dst string
45+
parallel int
46+
bufferSize float64
4647

4748
out io.Writer
4849
}
@@ -56,7 +57,7 @@ func NewBackupCmd(out io.Writer) *cobra.Command {
5657
Short: "backup cassandra cluster to cloud storage",
5758
Long: ``,
5859
Run: func(cmd *cobra.Command, args []string) {
59-
if _, err := cain.Backup(b.namespace, b.selector, b.container, b.keyspace, b.dst, b.parallel); err != nil {
60+
if _, err := cain.Backup(b.namespace, b.selector, b.container, b.keyspace, b.dst, b.parallel, b.bufferSize); err != nil {
6061
log.Fatal(err)
6162
}
6263
},
@@ -69,18 +70,20 @@ func NewBackupCmd(out io.Writer) *cobra.Command {
6970
f.StringVarP(&b.keyspace, "keyspace", "k", "", "keyspace to act on")
7071
f.StringVar(&b.dst, "dst", "", "destination to backup to. Example: s3://bucket/cassandra")
7172
f.IntVarP(&b.parallel, "parallel", "p", 1, "number of files to copy in parallel. set this flag to 0 for full parallelism")
73+
f.Float64VarP(&b.bufferSize, "buffer-size", "b", 6.75, "in memory buffer size (MB) to use for files copy (buffer per file)")
7274

7375
return cmd
7476
}
7577

7678
type restoreCmd struct {
77-
src string
78-
keyspace string
79-
tag string
80-
namespace string
81-
selector string
82-
container string
83-
parallel int
79+
src string
80+
keyspace string
81+
tag string
82+
namespace string
83+
selector string
84+
container string
85+
parallel int
86+
bufferSize float64
8487

8588
out io.Writer
8689
}
@@ -94,7 +97,7 @@ func NewRestoreCmd(out io.Writer) *cobra.Command {
9497
Short: "restore cassandra cluster from cloud storage",
9598
Long: ``,
9699
Run: func(cmd *cobra.Command, args []string) {
97-
if err := cain.Restore(r.src, r.keyspace, r.tag, r.namespace, r.selector, r.container, r.parallel); err != nil {
100+
if err := cain.Restore(r.src, r.keyspace, r.tag, r.namespace, r.selector, r.container, r.parallel, r.bufferSize); err != nil {
98101
log.Fatal(err)
99102
}
100103
},
@@ -108,6 +111,7 @@ func NewRestoreCmd(out io.Writer) *cobra.Command {
108111
f.StringVarP(&r.selector, "selector", "l", "", "selector to filter on")
109112
f.StringVarP(&r.container, "container", "c", "cassandra", "container name to act on")
110113
f.IntVarP(&r.parallel, "parallel", "p", 1, "number of files to copy in parallel. set this flag to 0 for full parallelism")
114+
f.Float64VarP(&r.bufferSize, "buffer-size", "b", 6.75, "in memory buffer size (MB) to use for files copy (buffer per file)")
111115

112116
return cmd
113117
}

pkg/cain/cain.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
)
1010

1111
// Backup performs backup
12-
func Backup(namespace, selector, container, keyspace, dst string, parallel int) (string, error) {
12+
func Backup(namespace, selector, container, keyspace, dst string, parallel int, bufferSize float64) (string, error) {
1313
log.Println("Backup started!")
1414
dstPrefix, dstPath := utils.SplitInTwo(dst, "://")
1515

@@ -45,7 +45,7 @@ func Backup(namespace, selector, container, keyspace, dst string, parallel int)
4545
}
4646

4747
log.Println("Starting files copy")
48-
if err := skbn.PerformCopy(k8sClient, dstClient, "k8s", dstPrefix, fromToPathsAllPods, parallel); err != nil {
48+
if err := skbn.PerformCopy(k8sClient, dstClient, "k8s", dstPrefix, fromToPathsAllPods, parallel, bufferSize); err != nil {
4949
return "", err
5050
}
5151

@@ -57,7 +57,7 @@ func Backup(namespace, selector, container, keyspace, dst string, parallel int)
5757
}
5858

5959
// Restore performs restore
60-
func Restore(src, keyspace, tag, namespace, selector, container string, parallel int) error {
60+
func Restore(src, keyspace, tag, namespace, selector, container string, parallel int, bufferSize float64) error {
6161
log.Println("Restore started!")
6262
srcPrefix, srcBasePath := utils.SplitInTwo(src, "://")
6363

@@ -102,7 +102,7 @@ func Restore(src, keyspace, tag, namespace, selector, container string, parallel
102102
TruncateTables(k8sClient, namespace, container, keyspace, existingPods, tablesToRefresh, materializedViews)
103103

104104
log.Println("Starting files copy")
105-
if err := skbn.PerformCopy(srcClient, k8sClient, srcPrefix, "k8s", fromToPaths, parallel); err != nil {
105+
if err := skbn.PerformCopy(srcClient, k8sClient, srcPrefix, "k8s", fromToPaths, parallel, bufferSize); err != nil {
106106
return err
107107
}
108108

pkg/cain/cqlsh.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cain
22

33
import (
4+
"bytes"
45
"crypto/sha256"
56
"fmt"
67
"log"
@@ -26,7 +27,8 @@ func BackupKeyspaceSchema(iK8sClient, iDstClient interface{}, namespace, pod, co
2627
dstBasePath := filepath.Join(dstPath, namespace, clusterName, keyspace, sum)
2728
schemaToPath := filepath.Join(dstBasePath, "schema.cql")
2829

29-
if err := skbn.Upload(iDstClient, dstPrefix, schemaToPath, "", schema); err != nil {
30+
reader := bytes.NewReader(schema)
31+
if err := skbn.Upload(iDstClient, dstPrefix, schemaToPath, "", reader); err != nil {
3032
return "", nil
3133
}
3234

@@ -108,7 +110,8 @@ func Cqlsh(iK8sClient interface{}, namespace, pod, container string, command []s
108110
k8sClient := iK8sClient.(*skbn.K8sClient)
109111

110112
command = append([]string{"cqlsh", "-e"}, command...)
111-
stdout, stderr, err := skbn.Exec(*k8sClient, namespace, pod, container, command, nil)
113+
stdout := new(bytes.Buffer)
114+
stderr, err := skbn.Exec(*k8sClient, namespace, pod, container, command, nil, stdout)
112115

113116
if len(stderr) != 0 {
114117
return nil, fmt.Errorf("STDERR: " + (string)(stderr))
@@ -117,7 +120,7 @@ func Cqlsh(iK8sClient interface{}, namespace, pod, container string, command []s
117120
return nil, err
118121
}
119122

120-
return removeWarning(stdout), nil
123+
return removeWarning(stdout.Bytes()), nil
121124
}
122125

123126
func removeWarning(b []byte) []byte {

pkg/cain/nodetool.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cain
22

33
import (
4+
"bytes"
45
"fmt"
56
"log"
67
"strings"
@@ -123,15 +124,16 @@ func refreshTable(k8sClient *skbn.K8sClient, namespace, pod, container, keyspace
123124

124125
func nodetool(k8sClient *skbn.K8sClient, namespace, pod, container string, command []string) (string, error) {
125126
command = append([]string{"nodetool"}, command...)
126-
stdout, stderr, err := skbn.Exec(*k8sClient, namespace, pod, container, command, nil)
127+
stdout := new(bytes.Buffer)
128+
stderr, err := skbn.Exec(*k8sClient, namespace, pod, container, command, nil, stdout)
127129
if len(stderr) != 0 {
128130
return "", fmt.Errorf("STDERR: " + (string)(stderr))
129131
}
130132
if err != nil {
131133
return "", err
132134
}
133135

134-
return (string)(stdout), nil
136+
return stdout.String(), nil
135137
}
136138

137139
func printOutput(output, pod string) {

0 commit comments

Comments
 (0)