Skip to content

Commit bc7f546

Browse files
committed
Add more comments and cleanup code
1 parent 2ea97fd commit bc7f546

File tree

2 files changed

+213
-136
lines changed

2 files changed

+213
-136
lines changed

fs-repo-11-to-12/migration/migration.go

Lines changed: 63 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -7,45 +7,43 @@ import (
77
"bufio"
88
"context"
99
"fmt"
10-
"io"
1110
"os"
1211
"path/filepath"
12+
"sync"
1313

1414
log "github.com/ipfs/fs-repo-migrations/stump"
15-
format "github.com/ipfs/go-ipld-format"
1615

17-
ipfslite "github.com/hsanjuan/ipfs-lite"
1816
migrate "github.com/ipfs/fs-repo-migrations/go-migrate"
19-
lock "github.com/ipfs/fs-repo-migrations/ipfs-1-to-2/repolock"
2017
mfsr "github.com/ipfs/fs-repo-migrations/mfsr"
2118
cid "github.com/ipfs/go-cid"
2219
ds "github.com/ipfs/go-datastore"
2320
filestore "github.com/ipfs/go-filestore"
2421
dshelp "github.com/ipfs/go-ipfs-ds-help"
25-
ipfspinner "github.com/ipfs/go-ipfs-pinner"
26-
dspinner "github.com/ipfs/go-ipfs-pinner/dspinner"
2722
gc "github.com/ipfs/go-ipfs/gc"
28-
loader "github.com/ipfs/go-ipfs/plugin/loader"
29-
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
3023
)
3124

3225
const backupFile = "11-to-12-cids.txt"
3326

27+
// mfs root key path in the datastore
3428
var mfsRootKey = ds.NewKey("/local/filesroot")
3529

30+
// blocks prefix in the datastore
3631
var blocksPrefix = ds.NewKey("/blocks")
3732

33+
// filestore prefix in the datastore
3834
var filestorePrefix = filestore.FilestorePrefix
3935

36+
// keyspace that we will be migrating and that contains paths in the form
37+
// <prefix>/<cid>.
4038
var migrationPrefixes = []ds.Key{
4139
blocksPrefix,
4240
filestorePrefix,
4341
}
4442

4543
// Migration implements the migration described above.
4644
type Migration struct {
47-
plugins *loader.PluginLoader
48-
dstore ds.Batching
45+
loadPluginsOnce sync.Once
46+
dstore ds.Batching
4947
}
5048

5149
// Versions returns the current version string for this migration.
@@ -58,65 +56,12 @@ func (m *Migration) Reversible() bool {
5856
return true
5957
}
6058

61-
// lock the repo
62-
func (m *Migration) lock(opts migrate.Options) (io.Closer, error) {
63-
log.VLog("locking repo at %q", opts.Path)
64-
return lock.Lock2(opts.Path)
65-
}
66-
67-
func (m *Migration) setupPlugins(opts migrate.Options) error {
68-
if m.plugins != nil {
69-
return nil
70-
}
71-
72-
log.VLog(" - loading repo configurations")
73-
plugins, err := loader.NewPluginLoader(opts.Path)
74-
if err != nil {
75-
return fmt.Errorf("error loading plugins: %s", err)
76-
}
77-
m.plugins = plugins
78-
79-
if err := plugins.Initialize(); err != nil {
80-
return fmt.Errorf("error initializing plugins: %s", err)
81-
}
82-
83-
if err := plugins.Inject(); err != nil {
84-
return fmt.Errorf("error injecting plugins: %s", err)
85-
}
86-
87-
return nil
88-
}
89-
90-
// open the datastore
91-
func (m *Migration) open(opts migrate.Options) error {
92-
if err := m.setupPlugins(opts); err != nil {
93-
return err
94-
}
95-
96-
// assume already opened. We cannot initalize plugins twice.
97-
if m.dstore != nil {
98-
m.dstore.Close()
99-
}
100-
101-
cfg, err := fsrepo.ConfigAt(opts.Path)
102-
if err != nil {
103-
return err
104-
}
105-
106-
dsc, err := fsrepo.AnyDatastoreConfig(cfg.Datastore.Spec)
107-
if err != nil {
108-
return err
109-
}
110-
111-
dstore, err := dsc.Create(opts.Path)
112-
if err != nil {
113-
return err
114-
}
115-
m.dstore = dstore
116-
return nil
117-
}
118-
11959
// Apply runs the migration and writes a log file that can be used by Revert.
60+
// Steps:
61+
// - Open a raw datastore using go-ipfs settings
62+
// - Simulate the migration and save a backup log
63+
// - Run the migration by storing all CIDv1 addressed logs as raw-multihash
64+
// addressed.
12065
func (m *Migration) Apply(opts migrate.Options) error {
12166
log.Verbose = opts.Verbose
12267
log.Log("applying %s repo migration", m.Versions())
@@ -142,21 +87,7 @@ func (m *Migration) Apply(opts migrate.Options) error {
14287

14388
log.VLog(" - starting CIDv1 to raw multihash block migration")
14489

145-
// Prepare backing up of CIDs
146-
backupPath := filepath.Join(opts.Path, backupFile)
147-
log.VLog(" - backup file will be written to %s", backupPath)
148-
_, err = os.Stat(backupPath)
149-
if err != nil {
150-
if !os.IsNotExist(err) {
151-
log.Error(err)
152-
return err
153-
}
154-
} else { // backup file exists
155-
log.Log("WARN: backup file %s already exists. CIDs-Multihash pairs will be appended", backupPath)
156-
}
157-
158-
// If it exists, append to it.
159-
f, err := os.OpenFile(backupPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
90+
f, err := createBackupFile(opts.Path, backupFile)
16091
if err != nil {
16192
log.Error(err)
16293
return err
@@ -165,8 +96,9 @@ func (m *Migration) Apply(opts migrate.Options) error {
16596
buf := bufio.NewWriter(f)
16697

16798
swapCh := make(chan Swap, 1000)
168-
16999
writingDone := make(chan struct{})
100+
101+
// Write the old CIDv1-key of every swapped item to the buffer.
170102
go func() {
171103
for sw := range swapCh {
172104
// Only write the Old string (a CID). We can derive
@@ -176,7 +108,14 @@ func (m *Migration) Apply(opts migrate.Options) error {
176108
close(writingDone)
177109
}()
178110

179-
// Add all the keys to migrate to the backup file
111+
// DRY RUN: A swapper for each migration prefix and write everything
112+
// to the file.
113+
//
114+
// Reasoning: We do not want a broken or unwritten file if the
115+
// migration blows up. If it does blow up half-way through, and we
116+
// need to run it again, we will only append to this file. Having
117+
// potential duplicate entries in the backup file will not break
118+
// reverts.
180119
for _, prefix := range migrationPrefixes {
181120
log.VLog(" - Adding keys in prefix %s to backup file", prefix)
182121
cidSwapper := CidSwapper{Prefix: prefix, Store: m.dstore, SwapCh: swapCh}
@@ -193,7 +132,7 @@ func (m *Migration) Apply(opts migrate.Options) error {
193132
<-writingDone
194133
buf.Flush()
195134

196-
// The backup file is ready. Run the migration.
135+
// MIGRATION: Run the real migration.
197136
for _, prefix := range migrationPrefixes {
198137
log.VLog(" - Migrating keys in prefix %s", prefix)
199138
cidSwapper := CidSwapper{Prefix: prefix, Store: m.dstore}
@@ -205,6 +144,7 @@ func (m *Migration) Apply(opts migrate.Options) error {
205144
log.Log("%d CIDv1 keys in %s have been migrated", total, prefix)
206145
}
207146

147+
// Wrap up, we are now in repo-version 12.
208148
if err := repo.WriteVersion("12"); err != nil {
209149
log.Error("failed to write version file")
210150
return err
@@ -215,6 +155,12 @@ func (m *Migration) Apply(opts migrate.Options) error {
215155
}
216156

217157
// Revert attempts to undo the migration using the log file written by Apply.
158+
// Steps:
159+
// - Read the backup log and write all entries as a CIDv1-addressed block
160+
// - Do the same with the MFS root
161+
// - Do the same with all the CidV1 blocks recursively referenced in the pinset
162+
//
163+
// Revert does not delete blocks that are reverted so cover some corner cases.
218164
func (m *Migration) Revert(opts migrate.Options) error {
219165
log.Verbose = opts.Verbose
220166
log.Log("reverting %s repo migration", m.Versions())
@@ -241,8 +187,7 @@ func (m *Migration) Revert(opts migrate.Options) error {
241187

242188
// Open revert path for reading
243189
backupPath := filepath.Join(opts.Path, backupFile)
244-
log.VLog(" - backup file will be read from %s", backupPath)
245-
f, err := os.Open(backupPath)
190+
f, err := getBackupFile(backupPath)
246191
if err != nil {
247192
log.Error(err)
248193
return err
@@ -252,14 +197,21 @@ func (m *Migration) Revert(opts migrate.Options) error {
252197
scanner := bufio.NewScanner(f)
253198
var scannerErr error
254199

200+
// This will send swap objects to the Unswapper on unswapCh as they
201+
// are read from the backup file on disk. It will also send MFS and
202+
// pinset pins for reversal.
255203
go func() {
256204
defer close(unswapCh)
257205

206+
// Process backup file first.
258207
for scanner.Scan() {
259208
line := scanner.Text()
260209
if len(line) == 0 {
261210
continue
262211
}
212+
// The backup file only contains the original cidv1
213+
// path. Do the path massaging magic to figure out the
214+
// actual Cidv1 and derived Cidv0-path (current key).
263215
cidPath := ds.NewKey(line)
264216
cidKey := ds.NewKey(cidPath.BaseNamespace())
265217
prefix := cidPath.Parent()
@@ -281,8 +233,9 @@ func (m *Migration) Revert(opts migrate.Options) error {
281233
return
282234
}
283235

284-
// Ensure all pins/MFS which may have happened post-migration
285-
// are reverted.
236+
// Process MFS/pinset. We have to do this in cases the user
237+
// has been running with the migration for some time and made changes to
238+
// the pinset or the MFS root.
286239
if err := walkPinsAndMFS(unswapCh, m.dstore); err != nil {
287240
log.Error(err)
288241
return
@@ -291,7 +244,7 @@ func (m *Migration) Revert(opts migrate.Options) error {
291244
}()
292245

293246
// The backup file contains prefixed keys, so we do not need to set
294-
// them.
247+
// Prefix in the CidSwapper.
295248
cidSwapper := CidSwapper{Store: m.dstore}
296249
total, err := cidSwapper.Revert(unswapCh)
297250
if err != nil {
@@ -304,6 +257,7 @@ func (m *Migration) Revert(opts migrate.Options) error {
304257
return err
305258
}
306259

260+
// Wrap up the Revert. We are back at version 11.
307261
log.Log("%d multihashes reverted to CidV1s", total)
308262
if err := repo.WriteVersion("11"); err != nil {
309263
log.Error("failed to write version file")
@@ -316,6 +270,8 @@ func (m *Migration) Revert(opts migrate.Options) error {
316270
log.Error("could not close backup file")
317271
return err
318272
}
273+
274+
// Move the backup file out of the way.
319275
err = os.Rename(backupPath, backupPath+".reverted")
320276
if err != nil {
321277
log.Error("could not rename the backup file, but migration worked: %s", err)
@@ -324,52 +280,23 @@ func (m *Migration) Revert(opts migrate.Options) error {
324280
return nil
325281
}
326282

327-
func getPinner(ctx context.Context, dstore ds.Batching) (ipfspinner.Pinner, format.DAGService, error) {
328-
// Wrapping a datastore all the way up to a DagService.
329-
// This is the shortest way.
330-
dags, err := ipfslite.New(
331-
ctx,
332-
dstore,
333-
nil,
334-
nil,
335-
&ipfslite.Config{
336-
Offline: true,
337-
},
338-
)
339-
if err != nil {
340-
return nil, nil, err
341-
}
342-
343-
// Get a pinner.
344-
pinner, err := dspinner.New(ctx, dstore, dags)
345-
if err != nil {
346-
return nil, nil, err
347-
}
348-
return pinner, dags, nil
349-
}
350-
351-
func getMFSRoot(dstore ds.Batching) (cid.Cid, error) {
352-
// Find the MFS root.
353-
mfsRoot, err := dstore.Get(mfsRootKey)
354-
if err != nil {
355-
return cid.Undef, err
356-
}
357-
c, err := cid.Cast(mfsRoot)
358-
if err != nil {
359-
log.Error(err)
360-
return cid.Undef, err
361-
}
362-
return c, nil
363-
}
364-
283+
// sends all pins in the pinset and the MFS root to the unswap channel when
284+
// they are CidV1s. This is used during revert. Pins may have been added or
285+
// MFS root changed at some point between the migration and the revert. If we
286+
// do not revert those CIDv1s, we might find that go-ipfs does not know
287+
// anymore how to find those blocks (they should be reverted and addressed as
288+
// CIDv1 in the blockstore).
289+
//
290+
// In the best case, most of those blocks will already be stored correctly and
291+
// the revert can swiftly do nothing.
365292
func walkPinsAndMFS(unswapCh chan Swap, dstore ds.Batching) error {
366-
// The easiest way to get a dag service that we can use with the
367-
// pinner on top of the datastore we opened.
368293
ctx, cancel := context.WithCancel(context.Background())
369294
defer cancel()
370295

371296
var bestEffortRoots []cid.Cid
372297

298+
// There should always be an MFS root. We add it to the list of things
299+
// to revert.
373300
mfsRoot, err := getMFSRoot(dstore)
374301
if err == ds.ErrNotFound {
375302
log.Error("empty MFS root")
@@ -382,7 +309,7 @@ func walkPinsAndMFS(unswapCh chan Swap, dstore ds.Batching) error {
382309
log.Log("MFS Root: %s\n", mfsRoot)
383310
bestEffortRoots = append(bestEffortRoots, mfsRoot)
384311

385-
// Get a pinner.
312+
// Get a pinner so that we can recursively list all pins.
386313
pinner, dags, err := getPinner(ctx, dstore)
387314
if err != nil {
388315
return err
@@ -397,15 +324,15 @@ func walkPinsAndMFS(unswapCh chan Swap, dstore ds.Batching) error {
397324
}
398325
}()
399326

400-
// Obtain the total set of CIDs that we need to make sure are
401-
// not
327+
// This GC method returns adds the things that cannot be GC'ed (so all
328+
// the things that are pinned).
402329
gcs, err := gc.ColoredSet(ctx, pinner, dags, bestEffortRoots, output)
403330
if err != nil {
404331
return err
405332
}
406333

407-
// We have everything. We send unswap requests
408-
// for all these blocks.
334+
// We have everything. We send unswap requests for all these blocks,
335+
// when they are CIDv1s.
409336
err = gcs.ForEach(func(c cid.Cid) error {
410337
// CidV0s are always fine. We do not need to unswap them.
411338
if c.Version() == 0 {

0 commit comments

Comments
 (0)