Skip to content

Commit 3485634

Browse files
committed
8-to-9: Pre-fill backup file. Reduce size. Better syncing.
1 parent 0cc7930 commit 3485634

File tree

2 files changed

+92
-51
lines changed

2 files changed

+92
-51
lines changed

ipfs-8-to-9/migration/migration.go

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ import (
99
"io"
1010
"os"
1111
"path/filepath"
12-
"strings"
1312

1413
migrate "github.com/ipfs/fs-repo-migrations/go-migrate"
1514
lock "github.com/ipfs/fs-repo-migrations/ipfs-1-to-2/repolock"
1615
"github.com/ipfs/fs-repo-migrations/mfsr"
1716
"github.com/ipfs/go-filestore"
17+
dshelp "github.com/ipfs/go-ipfs-ds-help"
1818

1919
log "github.com/ipfs/fs-repo-migrations/stump"
2020
ds "github.com/ipfs/go-datastore"
@@ -124,34 +124,47 @@ func (m Migration) Apply(opts migrate.Options) error {
124124
}
125125
defer f.Close()
126126
buf := bufio.NewWriter(f)
127-
defer buf.Flush()
128127

129-
// Will be closed by cidSwapper when it finish writing.
130128
swapCh := make(chan Swap, 1000)
131129

132130
writingDone := make(chan struct{})
133131
go func() {
134132
for sw := range swapCh {
135-
fmt.Fprint(buf, sw.Old.String()+","+sw.New.String()+"\n")
133+
// Only write the Old string (a CID). We can derive
134+
// the multihash from it.
135+
fmt.Fprint(buf, sw.Old.String(), "\n")
136136
}
137137
close(writingDone)
138138
}()
139139

140+
// Add all the keys to migrate to the backup file
140141
for _, prefix := range migrationPrefixes {
141-
log.VLog(" - migrating keys for prefix %s", prefix)
142+
log.VLog(" - Adding keys in prefix %s to backup file", prefix)
142143
cidSwapper := CidSwapper{Prefix: prefix, Store: dstore, SwapCh: swapCh}
143-
total, err := cidSwapper.Run()
144+
total, err := cidSwapper.Run(true) // DRY RUN
144145
if err != nil {
145146
close(swapCh)
146147
log.Error(err)
147148
return err
148149
}
149-
log.Log("%d CIDv1 keys swapped to raw multihashes for %s", total, prefix)
150+
log.Log("%d CIDv1 keys added to backup file for %s", total, prefix)
150151
}
151152
close(swapCh)
152-
// Wait for our writing to finish before doing the final flush
153-
// (deferred).
153+
// Wait for our writing to finish before doing the flushing.
154154
<-writingDone
155+
buf.Flush()
156+
157+
// The backup file is ready. Run the migration.
158+
for _, prefix := range migrationPrefixes {
159+
log.VLog(" - Migrating keys in prefix %s", prefix)
160+
cidSwapper := CidSwapper{Prefix: prefix, Store: dstore}
161+
total, err := cidSwapper.Run(false) // NOT a Dry Run
162+
if err != nil {
163+
log.Error(err)
164+
return err
165+
}
166+
log.Log("%d CIDv1 keys in %s have been migrated", total, prefix)
167+
}
155168

156169
if err := repo.WriteVersion("9"); err != nil {
157170
log.Error("failed to write version file")
@@ -198,18 +211,26 @@ func (m Migration) Revert(opts migrate.Options) error {
198211

199212
unswapCh := make(chan Swap, 1000)
200213
scanner := bufio.NewScanner(f)
214+
var scannerErr error
201215

202216
go func() {
203217
defer close(unswapCh)
204218

205219
for scanner.Scan() {
206-
line := scanner.Text()
207-
oldAndNew := strings.Split(line, ",")
208-
if len(oldAndNew) != 2 {
209-
log.Error("bad line in backup file: %s", line)
210-
continue
220+
cidPath := ds.NewKey(scanner.Text())
221+
cidKey := ds.NewKey(cidPath.BaseNamespace())
222+
prefix := cidPath.Parent()
223+
cid, err := dsKeyToCid(cidKey)
224+
if err != nil {
225+
log.Error("could not parse cid from backup file: %s", err)
226+
scannerErr = err
227+
break
211228
}
212-
sw := Swap{Old: ds.NewKey(oldAndNew[0]), New: ds.NewKey(oldAndNew[1])}
229+
mhashPath := prefix.Child(dshelp.MultihashToDsKey(cid.Hash()))
230+
// This is the original swap object which is what we
231+
// wanted to rebuild. Old is the old path and new is
232+
// the new path and the unswapper will revert this.
233+
sw := Swap{Old: cidPath, New: mhashPath}
213234
unswapCh <- sw
214235
}
215236
if err := scanner.Err(); err != nil {
@@ -219,13 +240,19 @@ func (m Migration) Revert(opts migrate.Options) error {
219240

220241
}()
221242

222-
// The backup file contains prefixed keys, so we do not need to set them.
243+
// The backup file contains prefixed keys, so we do not need to set
244+
// them.
223245
cidSwapper := CidSwapper{Store: dstore}
224246
total, err := cidSwapper.Revert(unswapCh)
225247
if err != nil {
226248
log.Error(err)
227249
return err
228250
}
251+
// Revert will only return after unswapCh is closed, so we know
252+
// scannerErr is safe to read at this point.
253+
if scannerErr != nil {
254+
return err
255+
}
229256

230257
log.Log("%d multihashes reverted to CidV1s", total)
231258
if err := repo.WriteVersion("8"); err != nil {

ipfs-8-to-9/migration/swapper.go

Lines changed: 49 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,11 @@ type CidSwapper struct {
3737

3838
// Run lists all the keys in the datastore and triggers a swap operation for
3939
// those corresponding to CIDv1s (replacing them by their raw multihash).
40+
// When dryRun is true, it will not perform any changes, but notify SwapCh
41+
// as if it would.
4042
//
4143
// Run returns the total number of keys swapped.
42-
// The SwapCh is closed at the end of the run.
43-
func (cswap *CidSwapper) Run() (uint64, error) {
44+
func (cswap *CidSwapper) Run(dryRun bool) (uint64, error) {
4445
// Query all keys. We will loop all keys
4546
// and swap those that can be parsed as CIDv1.
4647
queryAll := query.Query{
@@ -55,7 +56,7 @@ func (cswap *CidSwapper) Run() (uint64, error) {
5556
defer results.Close()
5657
resultsCh := results.Next()
5758
swapWorkerFunc := func() (uint64, uint64) {
58-
return cswap.swapWorker(resultsCh)
59+
return cswap.swapWorker(dryRun, resultsCh)
5960
}
6061
return cswap.runWorkers(NWorkers, swapWorkerFunc)
6162
}
@@ -100,12 +101,11 @@ func (cswap *CidSwapper) runWorkers(nWorkers int, f func() (uint64, uint64)) (ui
100101
// swapWorker reads query results from a channel and renames CIDv1 keys to
101102
// raw multihashes by reading the blocks and storing them with the new
102103
// key. Returns the number of keys swapped and the number of errors.
103-
func (cswap *CidSwapper) swapWorker(resultsCh <-chan query.Result) (uint64, uint64) {
104+
func (cswap *CidSwapper) swapWorker(dryRun bool, resultsCh <-chan query.Result) (uint64, uint64) {
104105
var errored uint64
105106

106107
sw := &swapWorker{
107108
store: cswap.Store,
108-
swapCh: cswap.SwapCh,
109109
syncPrefix: cswap.Prefix,
110110
}
111111

@@ -133,19 +133,34 @@ func (cswap *CidSwapper) swapWorker(resultsCh <-chan query.Result) (uint64, uint
133133
mh := c.Hash()
134134
// /path/to/old/<cid> -> /path/to/old/<multihash>
135135
newKey := oldKey.Parent().Child(dshelp.MultihashToDsKey(mh))
136-
err = sw.swap(oldKey, newKey)
137-
if err != nil {
138-
log.Error("swapping %s for %s: %s", oldKey, newKey, err)
139-
errored++
140-
continue
136+
if dryRun {
137+
sw.swapped++
138+
} else {
139+
err = sw.swap(oldKey, newKey)
140+
if err != nil {
141+
log.Error("swapping %s for %s: %s", oldKey, newKey, err)
142+
errored++
143+
continue
144+
}
145+
}
146+
147+
if cswap.SwapCh != nil {
148+
cswap.SwapCh <- Swap{Old: oldKey, New: newKey}
141149
}
142150
}
143151

144-
// final sync
145-
err := sw.sync()
146-
if err != nil {
147-
log.Error("error performing last sync: %s", err)
148-
errored++
152+
if !dryRun {
153+
// final sync
154+
err := sw.syncAndDelete()
155+
if err != nil {
156+
log.Error("error performing last sync: %s", err)
157+
errored++
158+
}
159+
err = sw.sync() // sync deleted items
160+
if err != nil {
161+
log.Error("error performing last sync for deletions: %s", err)
162+
errored++
163+
}
149164
}
150165

151166
return sw.swapped, errored
@@ -159,7 +174,6 @@ func (cswap *CidSwapper) unswapWorker(unswapCh <-chan Swap) (uint64, uint64) {
159174

160175
swker := &swapWorker{
161176
store: cswap.Store,
162-
swapCh: cswap.SwapCh,
163177
syncPrefix: cswap.Prefix,
164178
}
165179

@@ -194,23 +208,29 @@ func (cswap *CidSwapper) unswapWorker(unswapCh <-chan Swap) (uint64, uint64) {
194208
errored++
195209
}
196210
swker.swapped++
197-
continue
198-
}
199-
if err != nil {
211+
} else if err != nil {
200212
log.Error("swapping %s for %s: %s", sw.New, sw.Old, err)
201213
errored++
202214
continue
203215
}
216+
if cswap.SwapCh != nil {
217+
cswap.SwapCh <- Swap{Old: sw.New, New: sw.Old}
218+
}
204219
// Remember that we switched certain multihash for a Cid already
205220
unswappedMap[sw.New] = sw.Old
206221
}
207222

208-
// final sync
209-
err := swker.sync()
223+
// final sync for added things
224+
err := swker.syncAndDelete()
210225
if err != nil {
211226
log.Error("error performing last sync: %s", err)
212227
errored++
213228
}
229+
err = swker.sync() // final sync for deletes.
230+
if err != nil {
231+
log.Error("error performing last sync for deletions: %s", err)
232+
errored++
233+
}
214234

215235
return swker.swapped, errored
216236
}
@@ -221,7 +241,6 @@ type swapWorker struct {
221241
swapped uint64
222242
curSyncSize uint64
223243

224-
swapCh chan Swap
225244
store ds.Batching
226245
syncPrefix ds.Key
227246

@@ -245,27 +264,20 @@ func (sw *swapWorker) swap(old, new ds.Key) error {
245264
sw.swapped++
246265
sw.curSyncSize += vLen
247266

248-
if sw.swapCh != nil {
249-
sw.swapCh <- Swap{Old: old, New: new}
250-
}
251-
252267
// We have copied about 10MB
253268
if sw.curSyncSize >= SyncSize {
254269
sw.curSyncSize = 0
255-
err = sw.sync()
270+
err = sw.syncAndDelete()
256271
if err != nil {
257272
return err
258273
}
259274
}
260275
return nil
261276
}
262277

263-
func (sw *swapWorker) sync() error {
264-
log.Log("Syncing after %d objects migrated", sw.swapped)
265-
// Sync all the new keys to disk
266-
err := sw.store.Sync(sw.syncPrefix)
278+
func (sw *swapWorker) syncAndDelete() error {
279+
err := sw.sync()
267280
if err != nil {
268-
log.Error(err)
269281
return err
270282
}
271283

@@ -276,11 +288,13 @@ func (sw *swapWorker) sync() error {
276288
}
277289
}
278290
sw.toDelete = nil
291+
return nil
292+
}
279293

280-
// Sync again.
281-
err = sw.store.Sync(sw.syncPrefix)
294+
func (sw *swapWorker) sync() error {
295+
log.Log("Migration worker syncing after %d objects migrated", sw.swapped)
296+
err := sw.store.Sync(sw.syncPrefix)
282297
if err != nil {
283-
log.Error(err)
284298
return err
285299
}
286300
return nil

0 commit comments

Comments
 (0)