@@ -29,6 +29,7 @@ type Swap struct {
2929// CidSwapper reads all the keys in a datastore and replaces
3030// them with their raw multihash.
3131type CidSwapper struct {
32+ Prefix ds.Key // A prefix/namespace to limit the query.
3233 Store ds.Batching // the datastore to migrate.
3334 SwapCh chan Swap // a channel that gets notified for every swap
3435}
@@ -39,12 +40,10 @@ type CidSwapper struct {
3940// Run returns the total number of keys swapped.
4041// The SwapCh is closed at the end of the run.
4142func (cswap * CidSwapper ) Run () (uint64 , error ) {
42- if cswap .SwapCh != nil {
43- defer close (cswap .SwapCh )
44- }
4543 // Query all keys. We will loop all keys
4644 // and swap those that can be parsed as CIDv1.
4745 queryAll := query.Query {
46+ Prefix : cswap .Prefix .String (),
4847 KeysOnly : true ,
4948 }
5049
@@ -54,48 +53,46 @@ func (cswap *CidSwapper) Run() (uint64, error) {
5453 }
5554 defer results .Close ()
5655 resultsCh := results .Next ()
57-
58- var total uint64
59- var nErrors uint64
60- var wg sync.WaitGroup
61- wg .Add (NWorkers )
62- for i := 0 ; i < NWorkers ; i ++ {
63- go func () {
64- defer wg .Done ()
65- n , e := cswap .swapWorker (resultsCh )
66- atomic .AddUint64 (& total , n )
67- atomic .AddUint64 (& nErrors , e )
68- }()
56+ swapWorkerFunc := func () (uint64 , uint64 ) {
57+ return cswap .swapWorker (resultsCh )
6958 }
70- wg .Wait ()
71- if nErrors > 0 {
72- return total , errors .New ("errors happened during the migration. Consider running it again" )
73- }
74-
75- return total , nil
59+ return cswap .runWorkers (NWorkers , swapWorkerFunc )
7660}
7761
7862// Revert allows to undo any operations made by Run(). The given channel should
7963// receive Swap objects as they were sent by Run. It returns the number of
8064// swap operations performed.
8165func (cswap * CidSwapper ) Revert (unswapCh <- chan Swap ) (uint64 , error ) {
66+ swapWorkerFunc := func () (uint64 , uint64 ) {
67+ return cswap .unswapWorker (unswapCh )
68+ }
69+ // We only run 1 worker for revert. Migrations
70+ // many-cid-to-one-multihash mappings, but reverts can have the
71+ // opposite. The unswapWorker keeps a cache to handle that, but
72+ // this only works with a single worker. Otherwise we'd need
73+ // complex syncing, or delayed removal (increased datastore size).
74+ return cswap .runWorkers (1 , swapWorkerFunc )
75+ }
76+
77+ // Run workers launches several workers to run the given function which returns
78+ // number of swapped items and number of errors.
79+ func (cswap * CidSwapper ) runWorkers (nWorkers int , f func () (uint64 , uint64 )) (uint64 , error ) {
8280 var total uint64
8381 var nErrors uint64
8482 var wg sync.WaitGroup
85- wg .Add (NWorkers )
86- for i := 0 ; i < NWorkers ; i ++ {
83+ wg .Add (nWorkers )
84+ for i := 0 ; i < nWorkers ; i ++ {
8785 go func () {
8886 defer wg .Done ()
89- n , e := cswap . unswapWorker ( unswapCh )
87+ n , e := f ( )
9088 atomic .AddUint64 (& total , n )
9189 atomic .AddUint64 (& nErrors , e )
9290 }()
9391 }
9492 wg .Wait ()
9593 if nErrors > 0 {
96- return total , errors .New ("errors happened during the revert migration. Consider running it again" )
94+ return total , errors .New ("errors happened during the migration. Consider running it again" )
9795 }
98-
9996 return total , nil
10097}
10198
@@ -106,8 +103,9 @@ func (cswap *CidSwapper) swapWorker(resultsCh <-chan query.Result) (uint64, uint
106103 var errored uint64
107104
108105 sw := & swapWorker {
109- store : cswap .Store ,
110- swapCh : cswap .SwapCh ,
106+ store : cswap .Store ,
107+ swapCh : cswap .SwapCh ,
108+ syncPrefix : cswap .Prefix ,
111109 }
112110
113111 // Process keys from the results channel
@@ -119,7 +117,7 @@ func (cswap *CidSwapper) swapWorker(resultsCh <-chan query.Result) (uint64, uint
119117 }
120118
121119 oldKey := ds .NewKey (res .Key )
122- c , err := dsKeyToCid (oldKey )
120+ c , err := dsKeyToCid (ds . NewKey ( oldKey . BaseNamespace ())) // remove prefix
123121 if err != nil {
124122 // complain if we find anything that is not a CID but
125123 // leave it as it is.
@@ -132,7 +130,8 @@ func (cswap *CidSwapper) swapWorker(resultsCh <-chan query.Result) (uint64, uint
132130
133131 // Cid Version > 0
134132 mh := c .Hash ()
135- newKey := dshelp .MultihashToDsKey (mh )
133+ // /path/to/old/<cid> -> /path/to/old/<multihash>
134+ newKey := oldKey .Parent ().Child (dshelp .MultihashToDsKey (mh ))
136135 err = sw .swap (oldKey , newKey )
137136 if err != nil {
138137 log .Error ("swapping %s for %s: %s" , oldKey , newKey , err )
@@ -158,22 +157,51 @@ func (cswap *CidSwapper) unswapWorker(unswapCh <-chan Swap) (uint64, uint64) {
158157 var errored uint64
159158
160159 swker := & swapWorker {
161- store : cswap .Store ,
162- swapCh : cswap .SwapCh ,
160+ store : cswap .Store ,
161+ swapCh : cswap .SwapCh ,
162+ syncPrefix : cswap .Prefix ,
163163 }
164164
165+ // A map from multihash to Cid
166+ unswappedMap := make (map [ds.Key ]ds.Key )
167+
165168 // Process keys from the results channel
166169 for sw := range unswapCh {
167170 err := swker .swap (sw .New , sw .Old )
171+
172+ // Handle the case where a block had actually multiple CIDs
173+ // and we already deleted the multihash-addressed block. This
174+ // needs a manual swap from the CID we reverted to before.
168175 if err == ds .ErrNotFound {
169- log .Log ("could not revert %s->%s. Was it already reverted? Ignoring..." , sw .Old , sw .New )
176+ // Is it because we swapped it already?
177+ swappedTo , ok := unswappedMap [sw .New ]
178+ if ! ok {
179+ log .Error ("could not revert %s->%s. Could not find %s" , sw .Old , sw .New , sw .New )
180+ errored ++
181+ continue
182+ }
183+ swker .sync ()
184+ log .VLog (" - %s is duplicated under additional CIDs (%s). This is ok." , sw .New , sw .Old )
185+ v , err := swker .store .Get (swappedTo )
186+ if err != nil {
187+ log .Error ("could not get previously reverted value %s: %s" , swappedTo , err )
188+ errored ++
189+ continue
190+ }
191+ if err := swker .store .Put (sw .Old , v ); err != nil {
192+ log .Error (err )
193+ errored ++
194+ }
195+ swker .swapped ++
170196 continue
171197 }
172198 if err != nil {
173199 log .Error ("swapping %s for %s: %s" , sw .New , sw .Old , err )
174200 errored ++
175201 continue
176202 }
203+ // Remember that we switched certain multiash for a Cid already
204+ unswappedMap [sw .New ] = sw .Old
177205 }
178206
179207 // final sync
@@ -192,8 +220,9 @@ type swapWorker struct {
192220 swapped uint64
193221 curSyncSize uint64
194222
195- swapCh chan Swap
196- store ds.Batching
223+ swapCh chan Swap
224+ store ds.Batching
225+ syncPrefix ds.Key
197226
198227 toDelete []ds.Key
199228}
@@ -232,7 +261,7 @@ func (sw *swapWorker) swap(old, new ds.Key) error {
232261
233262func (sw * swapWorker ) sync () error {
234263 // Sync all the new keys to disk
235- err := sw .store .Sync (ds . NewKey ( "/" ) )
264+ err := sw .store .Sync (sw . syncPrefix )
236265 if err != nil {
237266 log .Error (err )
238267 return err
@@ -247,7 +276,7 @@ func (sw *swapWorker) sync() error {
247276 sw .toDelete = nil
248277
249278 // Sync again.
250- err = sw .store .Sync (ds . NewKey ( "/" ) )
279+ err = sw .store .Sync (sw . syncPrefix )
251280 if err != nil {
252281 log .Error (err )
253282 return err
0 commit comments