Skip to content

Commit 5a42b0f

Browse files
committed
add support for parallelism
Signed-off-by: Avi Deitcher <avi@deitcher.net>
1 parent 4beab70 commit 5a42b0f

File tree

9 files changed

+88
-18
lines changed

9 files changed

+88
-18
lines changed

cmd/dump.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,13 @@ func dumpCmd(passedExecs execs, cmdConfig *cmdConfiguration) (*cobra.Command, er
9595
if len(exclude) == 0 {
9696
exclude = nil
9797
}
98+
99+
// how many databases to back up in parallel
100+
parallel := v.GetInt("parallelism")
101+
if !v.IsSet("parallelism") && dumpConfig != nil && dumpConfig.Parallelism != nil {
102+
parallel = *dumpConfig.Parallelism
103+
}
104+
98105
preBackupScripts := v.GetString("pre-backup-scripts")
99106
if preBackupScripts == "" && scriptsConfig != nil && scriptsConfig.PreBackup != nil {
100107
preBackupScripts = *scriptsConfig.PreBackup
@@ -256,6 +263,7 @@ func dumpCmd(passedExecs execs, cmdConfig *cmdConfiguration) (*cobra.Command, er
256263
MaxAllowedPacket: maxAllowedPacket,
257264
Run: uid,
258265
FilenamePattern: filenamePattern,
266+
Parallelism: parallel,
259267
}
260268
_, err := executor.Dump(tracerCtx, dumpOpts)
261269
if err != nil {
@@ -311,6 +319,9 @@ S3: If it is a URL of the format s3://bucketname/path then it will connect via S
311319
// once
312320
flags.Bool("once", false, "Override all other settings and run the dump once immediately and exit. Useful if you use an external scheduler (e.g. as part of an orchestration solution like Cattle or Docker Swarm or [kubernetes cron jobs](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/)) and don't want the container to do the scheduling internally.")
313321

322+
// parallelism - how many databases (and therefore connections) to back up at once
323+
flags.Int("parallelism", 1, "How many databases to back up in parallel.")
324+
314325
// safechars
315326
flags.Bool("safechars", false, "The dump filename usually includes the character `:` in the date, to comply with RFC3339. Some systems and shells don't like that character. If true, will replace all `:` with `-`.")
316327

cmd/dump_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,27 +39,31 @@ func TestDumpCmd(t *testing.T) {
3939
Compressor: &compression.GzipCompressor{},
4040
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
4141
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
42+
Parallelism: 1,
4243
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},
4344
{"file URL with pass-file", []string{"--server", "abc", "--target", "file:///foo/bar", "--pass-file", "testdata/password.txt"}, "", false, core.DumpOptions{
4445
Targets: []storage.Storage{file.New(*fileTargetURL)},
4546
MaxAllowedPacket: defaultMaxAllowedPacket,
4647
Compressor: &compression.GzipCompressor{},
4748
DBConn: &database.Connection{Host: "abc", Port: defaultPort, Pass: "testpassword"},
4849
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
50+
Parallelism: 1,
4951
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},
5052
{"file URL with pass and pass-file (pass takes precedence)", []string{"--server", "abc", "--target", "file:///foo/bar", "--pass", "explicitpass", "--pass-file", "testdata/password.txt"}, "", false, core.DumpOptions{
5153
Targets: []storage.Storage{file.New(*fileTargetURL)},
5254
MaxAllowedPacket: defaultMaxAllowedPacket,
5355
Compressor: &compression.GzipCompressor{},
5456
DBConn: &database.Connection{Host: "abc", Port: defaultPort, Pass: "explicitpass"},
5557
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
58+
Parallelism: 1,
5659
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},
5760
{"file URL with prune", []string{"--server", "abc", "--target", "file:///foo/bar", "--retention", "1h"}, "", false, core.DumpOptions{
5861
Targets: []storage.Storage{file.New(*fileTargetURL)},
5962
MaxAllowedPacket: defaultMaxAllowedPacket,
6063
Compressor: &compression.GzipCompressor{},
6164
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
6265
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
66+
Parallelism: 1,
6367
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, &core.PruneOptions{Targets: []storage.Storage{file.New(*fileTargetURL)}, Retention: "1h"}},
6468

6569
// database name and port
@@ -69,13 +73,15 @@ func TestDumpCmd(t *testing.T) {
6973
Compressor: &compression.GzipCompressor{},
7074
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
7175
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
76+
Parallelism: 1,
7277
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},
7378
{"database explicit name with explicit port", []string{"--server", "abc", "--port", "3307", "--target", "file:///foo/bar"}, "", false, core.DumpOptions{
7479
Targets: []storage.Storage{file.New(*fileTargetURL)},
7580
MaxAllowedPacket: defaultMaxAllowedPacket,
7681
Compressor: &compression.GzipCompressor{},
7782
DBConn: &database.Connection{Host: "abc", Port: 3307},
7883
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
84+
Parallelism: 1,
7985
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},
8086

8187
// config file
@@ -85,20 +91,23 @@ func TestDumpCmd(t *testing.T) {
8591
Compressor: &compression.GzipCompressor{},
8692
DBConn: &database.Connection{Host: "abcd", Port: 3306, User: "user2", Pass: "xxxx2"},
8793
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
94+
Parallelism: 1,
8895
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, &core.PruneOptions{Targets: []storage.Storage{file.New(*fileTargetURL)}, Retention: "1h"}},
8996
{"config file with port override", []string{"--config-file", "testdata/config.yml", "--port", "3307"}, "", false, core.DumpOptions{
9097
Targets: []storage.Storage{file.New(*fileTargetURL)},
9198
MaxAllowedPacket: defaultMaxAllowedPacket,
9299
Compressor: &compression.GzipCompressor{},
93100
DBConn: &database.Connection{Host: "abcd", Port: 3307, User: "user2", Pass: "xxxx2"},
94101
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
102+
Parallelism: 1,
95103
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, &core.PruneOptions{Targets: []storage.Storage{file.New(*fileTargetURL)}, Retention: "1h"}},
96104
{"config file with filename pattern override", []string{"--config-file", "testdata/pattern.yml", "--port", "3307"}, "", false, core.DumpOptions{
97105
Targets: []storage.Storage{file.New(*fileTargetURL)},
98106
MaxAllowedPacket: defaultMaxAllowedPacket,
99107
Compressor: &compression.GzipCompressor{},
100108
DBConn: &database.Connection{Host: "abcd", Port: 3307, User: "user2", Pass: "xxxx2"},
101109
FilenamePattern: "foo_{{ .now }}.{{ .compression }}",
110+
Parallelism: 1,
102111
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, &core.PruneOptions{Targets: []storage.Storage{file.New(*fileTargetURL)}, Retention: "1h"}},
103112

104113
// timer options
@@ -108,27 +117,31 @@ func TestDumpCmd(t *testing.T) {
108117
Compressor: &compression.GzipCompressor{},
109118
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
110119
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
120+
Parallelism: 1,
111121
}, core.TimerOptions{Once: true, Frequency: defaultFrequency, Begin: defaultBegin}, nil},
112122
{"cron flag", []string{"--server", "abc", "--target", "file:///foo/bar", "--cron", "0 0 * * *"}, "", false, core.DumpOptions{
113123
Targets: []storage.Storage{file.New(*fileTargetURL)},
114124
MaxAllowedPacket: defaultMaxAllowedPacket,
115125
Compressor: &compression.GzipCompressor{},
116126
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
117127
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
128+
Parallelism: 1,
118129
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin, Cron: "0 0 * * *"}, nil},
119130
{"begin flag", []string{"--server", "abc", "--target", "file:///foo/bar", "--begin", "1234"}, "", false, core.DumpOptions{
120131
Targets: []storage.Storage{file.New(*fileTargetURL)},
121132
MaxAllowedPacket: defaultMaxAllowedPacket,
122133
Compressor: &compression.GzipCompressor{},
123134
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
124135
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
136+
Parallelism: 1,
125137
}, core.TimerOptions{Frequency: defaultFrequency, Begin: "1234"}, nil},
126138
{"frequency flag", []string{"--server", "abc", "--target", "file:///foo/bar", "--frequency", "10"}, "", false, core.DumpOptions{
127139
Targets: []storage.Storage{file.New(*fileTargetURL)},
128140
MaxAllowedPacket: defaultMaxAllowedPacket,
129141
Compressor: &compression.GzipCompressor{},
130142
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
131143
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
144+
Parallelism: 1,
132145
}, core.TimerOptions{Frequency: 10, Begin: defaultBegin}, nil},
133146
{"incompatible flags: once/cron", []string{"--server", "abc", "--target", "file:///foo/bar", "--once", "--cron", "0 0 * * *"}, "", true, core.DumpOptions{}, core.TimerOptions{}, nil},
134147
{"incompatible flags: once/begin", []string{"--server", "abc", "--target", "file:///foo/bar", "--once", "--begin", "1234"}, "", true, core.DumpOptions{}, core.TimerOptions{}, nil},
@@ -146,6 +159,7 @@ func TestDumpCmd(t *testing.T) {
146159
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
147160
PreBackupScripts: "/prebackup",
148161
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
162+
Parallelism: 1,
149163
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},
150164
{"postbackup scripts", []string{"--server", "abc", "--target", "file:///foo/bar", "--post-backup-scripts", "/postbackup"}, "", false, core.DumpOptions{
151165
Targets: []storage.Storage{file.New(*fileTargetURL)},
@@ -154,6 +168,7 @@ func TestDumpCmd(t *testing.T) {
154168
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
155169
PostBackupScripts: "/postbackup",
156170
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
171+
Parallelism: 1,
157172
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},
158173
{"prebackup and postbackup scripts", []string{"--server", "abc", "--target", "file:///foo/bar", "--post-backup-scripts", "/postbackup", "--pre-backup-scripts", "/prebackup"}, "", false, core.DumpOptions{
159174
Targets: []storage.Storage{file.New(*fileTargetURL)},
@@ -163,6 +178,7 @@ func TestDumpCmd(t *testing.T) {
163178
PreBackupScripts: "/prebackup",
164179
PostBackupScripts: "/postbackup",
165180
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
181+
Parallelism: 1,
166182
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},
167183
}
168184

docs/configuration.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ The following are the environment variables, CLI flags and configuration file op
8181
| where to put the dump file; see [backup](./backup.md) | BP | `dump --target` | `DB_DUMP_TARGET` | `dump.targets` | |
8282
| where the restore file exists; see [restore](./restore.md) | R | `restore --target` | `DB_RESTORE_TARGET` | `restore.target` | |
8383
| replace any `:` in the dump filename with `-` | BP | `dump --safechars` | `DB_DUMP_SAFECHARS` | `database.safechars` | `false` |
84+
| how many databases to back up in parallel; uses that number of threads and connections | B | `dump --parallelism` | `DB_DUMP_PARALLELISM` | `dump.parallelism` | `1` |
8485
| AWS access key ID, used only if a target does not have one | BRP | `aws-access-key-id` | `AWS_ACCESS_KEY_ID` | `dump.targets[s3-target].accessKeyID` | |
8586
| AWS secret access key, used only if a target does not have one | BRP | `aws-secret-access-key` | `AWS_SECRET_ACCESS_KEY` | `dump.targets[s3-target].secretAccessKey` | |
8687
| AWS default region, used only if a target does not have one | BRP | `aws-region` | `AWS_REGION` | `dump.targets[s3-target].region` | |
@@ -144,6 +145,7 @@ for details of each.
144145
* `preBackup`: string, path to directory with pre-backup scripts
145146
* `postBackup`: string, path to directory with post-backup scripts
146147
* `targets`: strings, list of names of known targets, defined in the `targets` section, where to save the backup
148+
* `parallelism`: int, how many databases to back up in parallel
147149
* `restore`: the restore configuration
148150
* `scripts`:
149151
* `preRestore`: string, path to directory with pre-restore scripts

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ require (
3333
require (
3434
filippo.io/age v1.2.1
3535
github.com/InfiniteLoopSpace/go_S-MIME v0.0.0-20181221134359-3f58f9a4b2b6
36-
github.com/databacker/api/go/api v0.0.0-20250423183243-7775066c265e
36+
github.com/databacker/api/go/api v0.0.0-20250818102239-219c793f2151
3737
github.com/google/go-cmp v0.7.0
3838
go.opentelemetry.io/otel v1.31.0
3939
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc
7878
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
7979
github.com/databacker/api/go/api v0.0.0-20250423183243-7775066c265e h1:5K7IbijS9p+dezx9m45CjFCR2Sf6BfT/tb540aEw66k=
8080
github.com/databacker/api/go/api v0.0.0-20250423183243-7775066c265e/go.mod h1:bQhbl71Lk1ATni0H+u249hjoQ8ShAdVNcNjnw6z+SbE=
81+
github.com/databacker/api/go/api v0.0.0-20250818102239-219c793f2151 h1:WuQNmzJiLSR0d2IpeifwK0E6eOLZQDxzbuHWIEN2/9U=
82+
github.com/databacker/api/go/api v0.0.0-20250818102239-219c793f2151/go.mod h1:bQhbl71Lk1ATni0H+u249hjoQ8ShAdVNcNjnw6z+SbE=
8183
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
8284
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
8385
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

pkg/core/dump.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func (e *Executor) Dump(ctx context.Context, opts DumpOptions) (DumpResults, err
4141
suppressUseDatabase := opts.SuppressUseDatabase
4242
maxAllowedPacket := opts.MaxAllowedPacket
4343
filenamePattern := opts.FilenamePattern
44+
parallelism := opts.Parallelism
4445
logger := e.Logger.WithField("run", opts.Run.String())
4546
logger.Level = e.Logger.Level
4647

@@ -112,6 +113,7 @@ func (e *Executor) Dump(ctx context.Context, opts DumpOptions) (DumpResults, err
112113
SuppressUseDatabase: suppressUseDatabase,
113114
MaxAllowedPacket: maxAllowedPacket,
114115
PostDumpDelay: opts.PostDumpDelay,
116+
Parallelism: parallelism,
115117
}, dw); err != nil {
116118
dbDumpSpan.SetStatus(codes.Error, err.Error())
117119
dbDumpSpan.End()

pkg/core/dumpoptions.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,6 @@ type DumpOptions struct {
2929
FilenamePattern string
3030
// PostDumpDelay after each dump is complete, while holding connection open. Do not use outside of tests.
3131
PostDumpDelay time.Duration
32+
// Parallelism is how many databases to back up at once, consuming that number of threads
33+
Parallelism int
3234
}

pkg/database/dump.go

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package database
33
import (
44
"context"
55
"fmt"
6+
"sync"
67
"time"
78

89
"github.com/databacker/mysql-backup/pkg/database/mysql"
@@ -16,6 +17,7 @@ type DumpOpts struct {
1617
MaxAllowedPacket int
1718
// PostDumpDelay after each dump is complete, while holding connection open. Do not use outside of tests.
1819
PostDumpDelay time.Duration
20+
Parallelism int
1921
}
2022

2123
func Dump(ctx context.Context, dbconn *Connection, opts DumpOpts, writers []DumpWriter) error {
@@ -31,25 +33,49 @@ func Dump(ctx context.Context, dbconn *Connection, opts DumpOpts, writers []Dump
3133
if err != nil {
3234
return fmt.Errorf("failed to open connection to database: %v", err)
3335
}
36+
37+
// limit to opts.Parallelism connections
38+
sem := make(chan struct{}, opts.Parallelism)
39+
errCh := make(chan error, len(writers))
40+
var wg sync.WaitGroup
3441
for _, writer := range writers {
35-
for _, schema := range writer.Schemas {
36-
dumper := &mysql.Data{
37-
Out: writer.Writer,
38-
Connection: db,
39-
Schema: schema,
40-
Host: dbconn.Host,
41-
Compact: opts.Compact,
42-
Triggers: opts.Triggers,
43-
Routines: opts.Routines,
44-
SuppressUseDatabase: opts.SuppressUseDatabase,
45-
MaxAllowedPacket: opts.MaxAllowedPacket,
46-
PostDumpDelay: opts.PostDumpDelay,
47-
}
48-
if err := dumper.Dump(); err != nil {
49-
return fmt.Errorf("failed to dump database %s: %v", schema, err)
42+
sem <- struct{}{} // acquire a slot
43+
wg.Add(1)
44+
go func(writer DumpWriter) {
45+
defer wg.Done()
46+
defer func() { <-sem }()
47+
for _, schema := range writer.Schemas {
48+
dumper := &mysql.Data{
49+
Out: writer.Writer,
50+
Connection: db,
51+
Schema: schema,
52+
Host: dbconn.Host,
53+
Compact: opts.Compact,
54+
Triggers: opts.Triggers,
55+
Routines: opts.Routines,
56+
SuppressUseDatabase: opts.SuppressUseDatabase,
57+
MaxAllowedPacket: opts.MaxAllowedPacket,
58+
PostDumpDelay: opts.PostDumpDelay,
59+
}
60+
// return on any error
61+
if err := dumper.Dump(); err != nil {
62+
errCh <- fmt.Errorf("failed to dump database %s: %v", schema, err)
63+
return
64+
}
5065
}
51-
}
66+
}(writer)
5267
}
68+
wg.Wait()
69+
close(errCh)
5370

71+
var errs []error
72+
for err := range errCh {
73+
if err != nil {
74+
errs = append(errs, err)
75+
}
76+
}
77+
if len(errs) > 0 {
78+
return fmt.Errorf("one or more errors occurred: %v", errs)
79+
}
5480
return nil
5581
}

test/backup_test.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -914,6 +914,7 @@ func TestIntegration(t *testing.T) {
914914
t.Fatalf("invalid target url: %v", err)
915915
}
916916

917+
parallelism := 4
917918
dumpOptions := core.DumpOptions{
918919
Compressor: &compression.GzipCompressor{},
919920
DBConn: &database.Connection{
@@ -923,7 +924,8 @@ func TestIntegration(t *testing.T) {
923924
Port: mysql.port,
924925
},
925926
Targets: []storage.Storage{store},
926-
PostDumpDelay: 5 * time.Second, // for testing only, make them delay 10 seconds
927+
PostDumpDelay: 5 * time.Second, // for testing only, make them delay a few seconds
928+
Parallelism: parallelism, // set
927929
}
928930
ctx := context.Background()
929931
start := time.Now()
@@ -964,6 +966,13 @@ func TestIntegration(t *testing.T) {
964966
t.Logf("[%s]\tthreads_running=%d\tthreads_connected=%d\topen_user=%d\tactive_user=%d\n",
965967
time.Now().Format("15:04:05"),
966968
tr, tc, uTotal, uActive)
969+
// threads connected should not be more than our parallel+1
970+
if tc > int64(parallelism)+1 {
971+
t.Errorf("too many threads connected: %d (max %d)", tc, parallelism+1)
972+
}
973+
if tc < int64(parallelism) {
974+
t.Errorf("too few threads connected: %d (min %d)", tc, parallelism)
975+
}
967976

968977
}
969978
}

0 commit comments

Comments
 (0)