Skip to content

Commit 65165ba

Browse files
Merge pull request #1136 from ivov/add-mulitple-output-formats-to-database-dump-command
Add multiple output formats to `database dump` command
2 parents 285e227 + 4d3dc85 commit 65165ba

File tree

7 files changed

+512
-94
lines changed

7 files changed

+512
-94
lines changed

internal/cmd/database/dump.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,18 @@ import (
2626
)
2727

2828
// dumpFlags holds every command-line flag accepted by `database dump`.
type dumpFlags struct {
	localAddr    string // local address to bind to
	remoteAddr   string // remote database address to connect to
	keyspace     string // keyspace to dump
	shard        string // single shard to target (requires keyspace)
	replica      bool   // dump from a replica
	rdonly       bool   // dump from a rdonly tablet
	tables       string // tables to dump
	wheres       string // per-table WHERE clauses
	output       string // output directory of the dump
	threads      int    // number of concurrent dump threads
	schemaOnly   bool   // only dump schema, skip table data
	outputFormat string // output format for data: sql (default), json, or csv
}
4142

4243
// DumpCmd encapsulates the commands for dumping a database
@@ -67,6 +68,8 @@ func DumpCmd(ch *cmdutil.Helper) *cobra.Command {
6768
"Output directory of the dump. By default the dump is saved to a folder in the current directory.")
6869
cmd.PersistentFlags().IntVar(&f.threads, "threads", 16, "Number of concurrent threads to use to dump the database.")
6970
cmd.PersistentFlags().BoolVar(&f.schemaOnly, "schema-only", false, "Only dump schema, skip table data.")
71+
cmd.PersistentFlags().StringVar(&f.outputFormat, "output-format", "sql",
72+
"Output format for data: sql (for MySQL, default), json, or csv.")
7073

7174
return cmd
7275
}
@@ -87,6 +90,11 @@ func dump(ch *cmdutil.Helper, cmd *cobra.Command, flags *dumpFlags, args []strin
8790
return fmt.Errorf("to target a single shard, please pass the --keyspace flag")
8891
}
8992

93+
validFormats := map[string]bool{"sql": true, "json": true, "csv": true}
94+
if !validFormats[flags.outputFormat] {
95+
return fmt.Errorf("invalid output format: %s. Valid options are: sql, json, csv", flags.outputFormat)
96+
}
97+
9098
client, err := ch.Client()
9199
if err != nil {
92100
return err
@@ -239,6 +247,7 @@ func dump(ch *cmdutil.Helper, cmd *cobra.Command, flags *dumpFlags, args []strin
239247
cfg.SessionVars = []string{"set workload=olap;"}
240248
cfg.Outdir = dir
241249
cfg.SchemaOnly = flags.schemaOnly
250+
cfg.OutputFormat = flags.outputFormat
242251

243252
if flags.shard != "" {
244253
if flags.replica {

internal/dumper/csv_writer.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package dumper
2+
3+
import (
4+
"bytes"
5+
"encoding/csv"
6+
"fmt"
7+
8+
"github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes"
9+
)
10+
11+
type csvWriter struct {
12+
cfg *Config
13+
fieldNames []string
14+
csvBuffer bytes.Buffer
15+
writer *csv.Writer
16+
chunkbytes int
17+
}
18+
19+
func newCSVWriter(cfg *Config) *csvWriter {
20+
return &csvWriter{
21+
cfg: cfg,
22+
}
23+
}
24+
25+
func (w *csvWriter) Initialize(fieldNames []string) error {
26+
w.fieldNames = fieldNames
27+
w.csvBuffer.Reset()
28+
w.writer = csv.NewWriter(&w.csvBuffer)
29+
30+
if err := w.writer.Write(fieldNames); err != nil {
31+
return err
32+
}
33+
w.writer.Flush()
34+
return nil
35+
}
36+
37+
func (w *csvWriter) WriteRow(row []sqltypes.Value) (int, error) {
38+
csvRow := make([]string, len(row))
39+
for i, v := range row {
40+
if v.Raw() == nil {
41+
csvRow[i] = ""
42+
} else {
43+
csvRow[i] = v.String()
44+
}
45+
}
46+
47+
if err := w.writer.Write(csvRow); err != nil {
48+
return 0, err
49+
}
50+
w.writer.Flush()
51+
52+
rowBytes := w.csvBuffer.Len()
53+
bytesAdded := rowBytes - w.chunkbytes
54+
w.chunkbytes = rowBytes
55+
return bytesAdded, nil
56+
}
57+
58+
func (w *csvWriter) ShouldFlush() bool {
59+
return (w.chunkbytes / 1024 / 1024) >= w.cfg.ChunksizeInMB
60+
}
61+
62+
func (w *csvWriter) Flush(outdir, database, table string, fileNo int) error {
63+
file := fmt.Sprintf("%s/%s.%s.%05d.csv", outdir, database, table, fileNo)
64+
err := writeFile(file, w.csvBuffer.String())
65+
if err != nil {
66+
return err
67+
}
68+
69+
w.csvBuffer.Reset()
70+
w.writer = csv.NewWriter(&w.csvBuffer)
71+
if err := w.writer.Write(w.fieldNames); err != nil {
72+
return err
73+
}
74+
w.writer.Flush()
75+
w.chunkbytes = 0
76+
return nil
77+
}
78+
79+
func (w *csvWriter) Close(outdir, database, table string, fileNo int) error {
80+
if w.csvBuffer.Len() > 0 {
81+
return w.Flush(outdir, database, table, fileNo)
82+
}
83+
return nil
84+
}

internal/dumper/dumper.go

Lines changed: 71 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313

1414
"github.com/planetscale/cli/internal/cmdutil"
1515
"github.com/planetscale/cli/internal/printer"
16-
querypb "github.com/xelabs/go-mysqlstack/sqlparser/depends/query"
1716
"golang.org/x/sync/errgroup"
1817

1918
"go.uber.org/zap"
@@ -38,6 +37,7 @@ type Config struct {
3837
Shard string
3938
Table string
4039
Outdir string
40+
OutputFormat string
4141
SessionVars []string
4242
Threads int
4343
ChunksizeInMB int
@@ -64,7 +64,8 @@ type Config struct {
6464

6565
func NewDefaultConfig() *Config {
6666
return &Config{
67-
Threads: 1,
67+
Threads: 1,
68+
OutputFormat: "sql",
6869
}
6970
}
7071

@@ -80,6 +81,12 @@ func NewDumper(cfg *Config) (*Dumper, error) {
8081
}, nil
8182
}
8283

84+
// dumpContext carries the per-table query pieces computed once before a
// table is streamed: the output field names, the SELECT expressions, and
// the optional WHERE clause (including the leading " WHERE ").
type dumpContext struct {
	fieldNames []string // column names remaining after filters are applied
	selfields  []string // backquoted (or replaced, via AS) SELECT expressions
	where      string   // e.g. ` WHERE id > 10`, or empty for no filter
}
89+
8390
func (d *Dumper) Run(ctx context.Context) error {
8491
initPool, err := NewPool(d.log, d.cfg.Threads, d.cfg.Address, d.cfg.User, d.cfg.Password, nil, "")
8592
if err != nil {
@@ -262,51 +269,37 @@ func (d *Dumper) dumpTableSchema(conn *Connection, database string, table string
262269
return nil
263270
}
264271

265-
// Dump a table in "MySQL" (multi-inserts) format
272+
// Dump a table in the configured output format
266273
func (d *Dumper) dumpTable(ctx context.Context, conn *Connection, database string, table string) error {
267-
var allBytes uint64
268-
var allRows uint64
269-
var where string
270-
var selfields []string
271-
272-
fields := make([]string, 0)
273-
{
274-
flds, err := d.dumpableFieldNames(conn, table)
275-
if err != nil {
276-
return err
277-
}
278-
279-
for _, name := range flds {
280-
d.log.Debug("dump", zap.Any("filters", d.cfg.Filters), zap.String("table", table), zap.String("field_name", name))
281-
282-
if _, ok := d.cfg.Filters[table][name]; ok {
283-
continue
284-
}
274+
var writer TableWriter
275+
276+
switch d.cfg.OutputFormat {
277+
case "json":
278+
writer = newJSONWriter(d.cfg)
279+
case "csv":
280+
writer = newCSVWriter(d.cfg)
281+
default:
282+
writer = newSQLWriter(d.cfg, table)
283+
}
285284

286-
fields = append(fields, fmt.Sprintf("`%s`", name))
287-
replacement, ok := d.cfg.Selects[table][name]
288-
if ok {
289-
selfields = append(selfields, fmt.Sprintf("%s AS `%s`", replacement, name))
290-
} else {
291-
selfields = append(selfields, fmt.Sprintf("`%s`", name))
292-
}
293-
}
285+
dumpCtx, err := d.tableDumpContext(conn, table)
286+
if err != nil {
287+
return err
294288
}
295289

296-
if v, ok := d.cfg.Wheres[table]; ok {
297-
where = fmt.Sprintf(" WHERE %v", v)
290+
if err := writer.Initialize(dumpCtx.fieldNames); err != nil {
291+
return err
298292
}
299293

300-
cursor, err := conn.StreamFetch(fmt.Sprintf("SELECT %s FROM `%s`.`%s` %s", strings.Join(selfields, ", "), database, table, where))
294+
cursor, err := conn.StreamFetch(fmt.Sprintf("SELECT %s FROM `%s`.`%s` %s", strings.Join(dumpCtx.selfields, ", "), database, table, dumpCtx.where))
301295
if err != nil {
302296
return err
303297
}
298+
defer cursor.Close()
304299

300+
var allBytes uint64
301+
var allRows uint64
305302
fileNo := 1
306-
stmtsize := 0
307-
chunkbytes := 0
308-
rows := make([]string, 0, 256)
309-
inserts := make([]string, 0, 256)
310303
for cursor.Next() {
311304
row, err := cursor.RowValues()
312305
if err != nil {
@@ -318,42 +311,18 @@ func (d *Dumper) dumpTable(ctx context.Context, conn *Connection, database strin
318311
return ctx.Err()
319312
}
320313

321-
values := make([]string, 0, 16)
322-
for _, v := range row {
323-
if v.Raw() == nil {
324-
values = append(values, "NULL")
325-
} else {
326-
str := v.String()
327-
switch {
328-
case v.IsSigned(), v.IsUnsigned(), v.IsFloat(), v.IsIntegral(), v.Type() == querypb.Type_DECIMAL:
329-
values = append(values, str)
330-
default:
331-
values = append(values, fmt.Sprintf("\"%s\"", escapeBytes(v.Raw())))
332-
}
333-
}
314+
bytesAdded, err := writer.WriteRow(row)
315+
if err != nil {
316+
return err
334317
}
335-
r := "(" + strings.Join(values, ",") + ")"
336-
rows = append(rows, r)
337318

338319
allRows++
339-
stmtsize += len(r)
340-
chunkbytes += len(r)
341-
allBytes += uint64(len(r))
342-
atomic.AddUint64(&d.cfg.Allbytes, uint64(len(r)))
320+
allBytes += uint64(bytesAdded)
321+
atomic.AddUint64(&d.cfg.Allbytes, uint64(bytesAdded))
343322
atomic.AddUint64(&d.cfg.Allrows, 1)
344323

345-
if stmtsize >= d.cfg.StmtSize {
346-
insertone := fmt.Sprintf("INSERT INTO `%s`(%s) VALUES\n%s", table, strings.Join(fields, ","), strings.Join(rows, ",\n"))
347-
inserts = append(inserts, insertone)
348-
rows = rows[:0]
349-
stmtsize = 0
350-
}
351-
352-
if (chunkbytes / 1024 / 1024) >= d.cfg.ChunksizeInMB {
353-
query := strings.Join(inserts, ";\n") + ";\n"
354-
file := fmt.Sprintf("%s/%s.%s.%05d.sql", d.cfg.Outdir, database, table, fileNo)
355-
err = writeFile(file, query)
356-
if err != nil {
324+
if writer.ShouldFlush() {
325+
if err := writer.Flush(d.cfg.Outdir, database, table, fileNo); err != nil {
357326
return err
358327
}
359328

@@ -367,26 +336,11 @@ func (d *Dumper) dumpTable(ctx context.Context, conn *Connection, database strin
367336
zap.Int("thread_conn_id", conn.ID),
368337
)
369338

370-
inserts = inserts[:0]
371-
chunkbytes = 0
372339
fileNo++
373340
}
374341
}
375-
if chunkbytes > 0 {
376-
if len(rows) > 0 {
377-
insertone := fmt.Sprintf("INSERT INTO `%s`(%s) VALUES\n%s", table, strings.Join(fields, ","), strings.Join(rows, ",\n"))
378-
inserts = append(inserts, insertone)
379-
}
380342

381-
query := strings.Join(inserts, ";\n") + ";\n"
382-
file := fmt.Sprintf("%s/%s.%s.%05d.sql", d.cfg.Outdir, database, table, fileNo)
383-
err = writeFile(file, query)
384-
if err != nil {
385-
return err
386-
}
387-
}
388-
err = cursor.Close()
389-
if err != nil {
343+
if err := writer.Close(d.cfg.Outdir, database, table, fileNo); err != nil {
390344
return err
391345
}
392346

@@ -401,6 +355,40 @@ func (d *Dumper) dumpTable(ctx context.Context, conn *Connection, database strin
401355
return nil
402356
}
403357

358+
func (d *Dumper) tableDumpContext(conn *Connection, table string) (*dumpContext, error) {
359+
ctx := &dumpContext{}
360+
361+
flds, err := d.dumpableFieldNames(conn, table)
362+
if err != nil {
363+
return nil, err
364+
}
365+
366+
ctx.fieldNames = make([]string, 0)
367+
ctx.selfields = make([]string, 0)
368+
369+
for _, name := range flds {
370+
d.log.Debug("dump", zap.Any("filters", d.cfg.Filters), zap.String("table", table), zap.String("field_name", name))
371+
372+
if _, ok := d.cfg.Filters[table][name]; ok {
373+
continue
374+
}
375+
376+
ctx.fieldNames = append(ctx.fieldNames, name)
377+
replacement, ok := d.cfg.Selects[table][name]
378+
if ok {
379+
ctx.selfields = append(ctx.selfields, fmt.Sprintf("%s AS `%s`", replacement, name))
380+
} else {
381+
ctx.selfields = append(ctx.selfields, fmt.Sprintf("`%s`", name))
382+
}
383+
}
384+
385+
if v, ok := d.cfg.Wheres[table]; ok {
386+
ctx.where = fmt.Sprintf(" WHERE %v", v)
387+
}
388+
389+
return ctx, nil
390+
}
391+
404392
func (d *Dumper) allTables(conn *Connection, database string) ([]string, error) {
405393
qr, err := conn.Fetch(fmt.Sprintf("SHOW TABLES FROM `%s`", database))
406394
if err != nil {

0 commit comments

Comments (0)