Skip to content

Commit f7da9a6

Browse files
committed
Allow resuming the badger migration from a given key
1 parent f7c33d0 commit f7da9a6

File tree

1 file changed

+123
-120
lines changed

1 file changed

+123
-120
lines changed

scripts/badger-migration/main.go

Lines changed: 123 additions & 120 deletions
Original file line number | Diff line number | Diff line change
@@ -2,11 +2,11 @@ package main
22

33
import (
44
"bytes"
5+
"encoding/base64"
56
"encoding/binary"
67
"errors"
78
"flag"
89
"fmt"
9-
"log"
1010
"os"
1111
"path/filepath"
1212

@@ -62,6 +62,7 @@ func main() {
6262
var v1, v2 bool
6363
var dir, valueDir string
6464
var typ, database string
65+
var key string
6566

6667
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
6768

@@ -71,26 +72,34 @@ func main() {
7172
fs.StringVar(&valueDir, "value-dir", "", "badger database value directory")
7273
fs.StringVar(&typ, "type", "", "the destination database type to use")
7374
fs.StringVar(&database, "database", "", "the destination driver-specific data source name")
75+
fs.StringVar(&key, "key", "", "the key used to resume the migration")
7476
fs.Usage = func() { usage(fs) }
7577
fs.Parse(os.Args[1:])
7678

7779
switch {
7880
case v1 == v2:
79-
fatal("flag --v1 or --v2 are required")
81+
fatal("flag -v1 or -v2 are required")
8082
case dir == "":
81-
fatal("flag --dir is required")
83+
fatal("flag -dir is required")
8284
case typ != "postgresql" && typ != "mysql":
83-
fatal(`flag --type must be "postgresql" or "mysql"`)
85+
fatal(`flag -type must be "postgresql" or "mysql"`)
8486
case database == "":
8587
fatal("flag --database required")
8688
}
8789

8890
var (
89-
err error
90-
v1DB *badgerv1.DB
91-
v2DB *badgerv2.DB
91+
err error
92+
v1DB *badgerv1.DB
93+
v2DB *badgerv2.DB
94+
lastKey []byte
9295
)
9396

97+
if key != "" {
98+
if lastKey, err = base64.StdEncoding.DecodeString(key); err != nil {
99+
fatal("error decoding key: %v", err)
100+
}
101+
}
102+
94103
if v1 {
95104
if v1DB, err = badgerV1Open(dir, valueDir); err != nil {
96105
fatal("error opening badger v1 database: %v", err)
@@ -109,30 +118,55 @@ func main() {
109118
allTables := append([]string{}, authorityTables...)
110119
allTables = append(allTables, acmeTables...)
111120

112-
for _, table := range allTables {
121+
// Convert table names to badger key prefixes
122+
badgerKeys := make([][]byte, len(allTables))
123+
for i, name := range allTables {
124+
badgerKeys[i], err = badgerEncode([]byte(name))
125+
if err != nil {
126+
fatal("error encoding table %s: %v", name, err)
127+
}
128+
}
129+
130+
for i, prefix := range badgerKeys {
131+
table := allTables[i]
132+
133+
// With the -key flag, skip every table until the one the key belongs to, then resume from that key
134+
if lastKey != nil {
135+
bucket, _ := parseBadgerEncode(lastKey)
136+
if table != string(bucket) {
137+
fmt.Printf("skipping table %s\n", table)
138+
continue
139+
}
140+
// Resume from the last key: use it as the iteration prefix and clear it so the remaining tables are migrated in full
141+
prefix = lastKey
142+
lastKey = nil
143+
}
144+
113145
var n int64
114-
fmt.Printf("migrating %s ...\n", table)
146+
fmt.Printf("migrating %s ...", table)
115147
if err := db.CreateTable([]byte(table)); err != nil {
116148
fatal("error creating table %s: %v", table, err)
117149
}
118150

119151
if v1 {
120-
if err := badgerV1Iterate(v1DB, []byte(table), func(bucket, key, value []byte) error {
152+
if badgerKey, err := badgerV1Iterate(v1DB, prefix, func(bucket, key, value []byte) error {
121153
n++
122154
return db.Set(bucket, key, value)
123155
}); err != nil {
124-
fatal("error inserting into %s: %v", table, err)
156+
fmt.Println()
157+
fatal("error inserting into %s: %v\nLast key: %s", table, err, base64.StdEncoding.EncodeToString(badgerKey))
125158
}
126159
} else {
127-
if err := badgerV2Iterate(v2DB, []byte(table), func(bucket, key, value []byte) error {
160+
if badgerKey, err := badgerV2Iterate(v2DB, prefix, func(bucket, key, value []byte) error {
128161
n++
129162
return db.Set(bucket, key, value)
130163
}); err != nil {
131-
fatal("error inserting into %s: %v", table, err)
164+
fmt.Println()
165+
fatal("error inserting into %s: %v\nLast key: %s", table, err, base64.StdEncoding.EncodeToString(badgerKey))
132166
}
133167
}
134168

135-
log.Printf("%d rows\n", n)
169+
fmt.Printf(" %d rows\n", n)
136170
}
137171
}
138172

@@ -158,95 +192,70 @@ func badgerV2Open(dir, valueDir string) (*badgerv2.DB, error) {
158192
return badgerv2.Open(opts)
159193
}
160194

161-
func badgerV1Iterate(db *badgerv1.DB, table []byte, fn func(table, key, value []byte) error) error {
162-
return db.View(func(txn *badgerv1.Txn) error {
163-
var tableExists bool
195+
type Iterator interface {
196+
Seek([]byte)
197+
ValidForPrefix([]byte) bool
198+
Next()
199+
}
200+
201+
type Item interface {
202+
KeyCopy([]byte) []byte
203+
ValueCopy([]byte) ([]byte, error)
204+
}
164205

206+
func badgerV1Iterate(db *badgerv1.DB, prefix []byte, fn func(bucket, key, value []byte) error) (badgerKey []byte, err error) {
207+
err = db.View(func(txn *badgerv1.Txn) error {
165208
it := txn.NewIterator(badgerv1.DefaultIteratorOptions)
166209
defer it.Close()
167-
168-
prefix, err := badgerEncode(table)
169-
if err != nil {
170-
return err
171-
}
172-
173-
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
174-
tableExists = true
175-
item := it.Item()
176-
bk := item.KeyCopy(nil)
177-
if isBadgerTable(bk) {
178-
continue
179-
}
180-
181-
bucket, key, err := fromBadgerKey(bk)
182-
if err != nil {
183-
return fmt.Errorf("error converting from badger key %s", bk)
184-
}
185-
if !bytes.Equal(table, bucket) {
186-
return fmt.Errorf("bucket names do not match; want %s, but got %s", table, bucket)
187-
}
188-
189-
v, err := item.ValueCopy(nil)
190-
if err != nil {
191-
return fmt.Errorf("error retrieving contents from database value: %w", err)
192-
}
193-
value := cloneBytes(v)
194-
195-
if err := fn(bucket, key, value); err != nil {
196-
return fmt.Errorf("error exporting %s[%s]=%v", table, key, value)
197-
}
198-
}
199-
200-
if !tableExists {
201-
fmt.Printf("bucket %s not found\n", table)
202-
}
203-
204-
return nil
210+
badgerKey, err = badgerIterate(it, prefix, fn)
211+
return err
205212
})
213+
return
206214
}
207215

208-
func badgerV2Iterate(db *badgerv2.DB, table []byte, fn func(table, key, value []byte) error) error {
209-
return db.View(func(txn *badgerv2.Txn) error {
210-
var tableExists bool
211-
216+
func badgerV2Iterate(db *badgerv2.DB, prefix []byte, fn func(bucket, key, value []byte) error) (badgerKey []byte, err error) {
217+
err = db.View(func(txn *badgerv2.Txn) error {
212218
it := txn.NewIterator(badgerv2.DefaultIteratorOptions)
213219
defer it.Close()
220+
badgerKey, err = badgerIterate(it, prefix, fn)
221+
return err
222+
})
223+
return
224+
}
214225

215-
prefix, err := badgerEncode(table)
216-
if err != nil {
217-
return err
226+
func badgerIterate(it Iterator, prefix []byte, fn func(bucket, key, value []byte) error) ([]byte, error) {
227+
var badgerKey []byte
228+
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
229+
var item Item
230+
switch itt := it.(type) {
231+
case *badgerv1.Iterator:
232+
item = itt.Item()
233+
case *badgerv2.Iterator:
234+
item = itt.Item()
235+
default:
236+
return badgerKey, fmt.Errorf("unexpected iterator type %T", it)
218237
}
219-
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
220-
tableExists = true
221-
item := it.Item()
222-
bk := item.KeyCopy(nil)
223-
if isBadgerTable(bk) {
224-
continue
225-
}
226238

227-
bucket, key, err := fromBadgerKey(bk)
228-
if err != nil {
229-
return fmt.Errorf("error converting from badgerKey %s: %w", bk, err)
230-
}
231-
if !bytes.Equal(table, bucket) {
232-
return fmt.Errorf("bucket names do not match; want %s, but got %s", table, bucket)
233-
}
234-
235-
v, err := item.ValueCopy(nil)
236-
if err != nil {
237-
return fmt.Errorf("error retrieving contents from database value: %w", err)
238-
}
239-
value := cloneBytes(v)
239+
badgerKey = item.KeyCopy(nil)
240+
if isBadgerTable(badgerKey) {
241+
continue
242+
}
240243

241-
if err := fn(bucket, key, value); err != nil {
242-
return fmt.Errorf("error exporting %s[%s]=%v", table, key, value)
243-
}
244+
bucket, key, err := fromBadgerKey(badgerKey)
245+
if err != nil {
246+
return badgerKey, fmt.Errorf("error converting from badger key %s", badgerKey)
244247
}
245-
if !tableExists {
246-
log.Printf("bucket %s not found", table)
248+
value, err := item.ValueCopy(nil)
249+
if err != nil {
250+
return badgerKey, fmt.Errorf("error retrieving contents from database value: %w", err)
247251
}
248-
return nil
249-
})
252+
253+
if err := fn(bucket, key, value); err != nil {
254+
return badgerKey, fmt.Errorf("error exporting %s[%s]=%x", bucket, key, value)
255+
}
256+
}
257+
258+
return badgerKey, nil
250259
}
251260

252261
// badgerEncode encodes a byte slice into a section of a BadgerKey.
@@ -267,6 +276,31 @@ func badgerEncode(val []byte) ([]byte, error) {
267276
}
268277
}
269278

279+
// parseBadgerEncode decodes the badger key and returns the bucket and the rest.
280+
func parseBadgerEncode(bk []byte) (value, rest []byte) {
281+
var (
282+
keyLen uint16
283+
start = uint16(2)
284+
length = uint16(len(bk))
285+
)
286+
if uint16(len(bk)) < start {
287+
return nil, bk
288+
}
289+
// The first 2 bytes store the length of the value.
290+
if err := binary.Read(bytes.NewReader(bk[:2]), binary.LittleEndian, &keyLen); err != nil {
291+
return nil, bk
292+
}
293+
end := start + keyLen
294+
switch {
295+
case length < end:
296+
return nil, bk
297+
case length == end:
298+
return bk[start:end], nil
299+
default:
300+
return bk[start:end], bk[end:]
301+
}
302+
}
303+
270304
// isBadgerTable returns true if the slice is a badgerTable token, false otherwise.
271305
// badgerTable means that the slice contains only the [size|value] of one section
272306
// of a badgerKey and no remainder. A badgerKey is [bucket|key], while a badgerTable
@@ -293,34 +327,3 @@ func fromBadgerKey(bk []byte) ([]byte, []byte, error) {
293327

294328
return bucket, key, nil
295329
}
296-
297-
// cloneBytes returns a copy of a given slice.
298-
func cloneBytes(v []byte) []byte {
299-
var clone = make([]byte, len(v))
300-
copy(clone, v)
301-
return clone
302-
}
303-
304-
func parseBadgerEncode(bk []byte) (value, rest []byte) {
305-
var (
306-
keyLen uint16
307-
start = uint16(2)
308-
length = uint16(len(bk))
309-
)
310-
if uint16(len(bk)) < start {
311-
return nil, bk
312-
}
313-
// The first 2 bytes store the length of the value.
314-
if err := binary.Read(bytes.NewReader(bk[:2]), binary.LittleEndian, &keyLen); err != nil {
315-
return nil, bk
316-
}
317-
end := start + keyLen
318-
switch {
319-
case length < end:
320-
return nil, bk
321-
case length == end:
322-
return bk[start:end], nil
323-
default:
324-
return bk[start:end], bk[end:]
325-
}
326-
}

0 commit comments

Comments
 (0)