Skip to content

Commit c7eda21

Browse files
authored
test: verifyWorkerRun and helptext (#11063)
1 parent 798b889 commit c7eda21

File tree

4 files changed

+1011
-24
lines changed

4 files changed

+1011
-24
lines changed

core/commands/repo.go

Lines changed: 253 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -5,20 +5,22 @@ import (
55
"errors"
66
"fmt"
77
"io"
8-
"os"
98
"runtime"
109
"strings"
1110
"sync"
1211
"text/tabwriter"
12+
"time"
1313

1414
oldcmds "github.com/ipfs/kubo/commands"
1515
cmdenv "github.com/ipfs/kubo/core/commands/cmdenv"
16+
coreiface "github.com/ipfs/kubo/core/coreiface"
1617
corerepo "github.com/ipfs/kubo/core/corerepo"
1718
fsrepo "github.com/ipfs/kubo/repo/fsrepo"
1819
"github.com/ipfs/kubo/repo/fsrepo/migrations"
1920

2021
humanize "github.com/dustin/go-humanize"
2122
bstore "github.com/ipfs/boxo/blockstore"
23+
"github.com/ipfs/boxo/path"
2224
cid "github.com/ipfs/go-cid"
2325
cmds "github.com/ipfs/go-ipfs-cmds"
2426
)
@@ -226,45 +228,137 @@ Version string The repo version.
226228
},
227229
}
228230

231+
// VerifyProgress is the value emitted by 'ipfs repo verify' while it runs.
232+
// Each value carries either a Msg (a per-block report or the final summary line) or a Progress counter.
229233
type VerifyProgress struct {
230-
Msg string
231-
Progress int
234+
Msg string // Per-block corruption/remediation report, or the final "verify complete" summary; empty on progress-only updates
235+
Progress int // Number of results consumed so far; emitted once per processed block
232236
}
233237

234-
func verifyWorkerRun(ctx context.Context, wg *sync.WaitGroup, keys <-chan cid.Cid, results chan<- string, bs bstore.Blockstore) {
238+
// verifyState represents the state of a block after verification.
239+
// It records the check outcome plus any remediation attempted; the command tallies these states to build its summary and decide success or failure.
240+
type verifyState int
241+
242+
const (
243+
verifyStateValid verifyState = iota // bs.Get succeeded; reported without a message
244+
verifyStateCorrupt // bs.Get failed and drop was not requested; block left in place
245+
verifyStateCorruptRemoved // bs.Get failed; block deleted, heal not requested
246+
verifyStateCorruptRemoveFailed // bs.Get failed and bs.DeleteBlock also returned an error
247+
verifyStateCorruptHealed // bs.Get failed; block deleted and re-fetched through the API
248+
verifyStateCorruptHealFailed // bs.Get failed; block deleted, but the re-fetch returned an error
249+
)
250+
251+
const (
252+
// verifyWorkerMultiplier scales the worker pool: verifyResultChan starts
253+
// runtime.NumCPU()*verifyWorkerMultiplier workers. Verification is treated as I/O-bound
254+
// (disk reads, plus network fetches when healing), hence more workers than CPU cores.
255+
verifyWorkerMultiplier = 2
256+
)
257+
258+
// verifyResult is what a verify worker sends on the results channel for each CID it checks.
259+
// It carries the block's CID, its final verifyState, and an optional
260+
// human-readable message (set for every corrupt state, empty for valid blocks).
261+
type verifyResult struct {
262+
cid cid.Cid // CID of the block that was verified
263+
state verifyState // Final state after verification and any remediation
264+
msg string // Human-readable report; workers leave it empty only for verifyStateValid
265+
}
266+
267+
// verifyWorkerRun drains keys, calls bs.Get on each CID, and sends one verifyResult per CID on results until keys closes or ctx is cancelled.
268+
// Any error returned by bs.Get is reported as corruption; if shouldDrop is true the block is then deleted with bs.DeleteBlock.
269+
// If shouldHeal is true, a deleted block is re-fetched via api.Block().Get; shouldHeal has no effect unless shouldDrop is also true (the caller sets drop whenever heal is requested).
270+
// The api parameter must be non-nil when shouldHeal is true.
271+
// healTimeout bounds each individual heal attempt (0 = no extra timeout, only ctx applies); wg.Done is called when the worker returns.
272+
func verifyWorkerRun(ctx context.Context, wg *sync.WaitGroup, keys <-chan cid.Cid, results chan<- *verifyResult, bs bstore.Blockstore, api coreiface.CoreAPI, shouldDrop, shouldHeal bool, healTimeout time.Duration) {
235273
defer wg.Done()
236274

275+
sendResult := func(r *verifyResult) bool {
276+
select {
277+
case results <- r:
278+
return true
279+
case <-ctx.Done():
280+
return false
281+
}
282+
}
283+
237284
for k := range keys {
238285
_, err := bs.Get(ctx, k)
239286
if err != nil {
240-
select {
241-
case results <- fmt.Sprintf("block %s was corrupt (%s)", k, err):
242-
case <-ctx.Done():
243-
return
287+
// Any Get error is reported as corruption; err is included in every message below
288+
result := &verifyResult{cid: k, state: verifyStateCorrupt}
289+
290+
if !shouldDrop {
291+
result.msg = fmt.Sprintf("block %s was corrupt (%s)", k, err)
292+
if !sendResult(result) {
293+
return
294+
}
295+
continue
296+
}
297+
298+
// Drop requested: delete the block; a delete failure ends handling of this CID (no heal attempt)
299+
if delErr := bs.DeleteBlock(ctx, k); delErr != nil {
300+
result.state = verifyStateCorruptRemoveFailed
301+
result.msg = fmt.Sprintf("block %s was corrupt (%s), failed to remove (%s)", k, err, delErr)
302+
if !sendResult(result) {
303+
return
304+
}
305+
continue
306+
}
307+
308+
if !shouldHeal {
309+
result.state = verifyStateCorruptRemoved
310+
result.msg = fmt.Sprintf("block %s was corrupt (%s), removed", k, err)
311+
if !sendResult(result) {
312+
return
313+
}
314+
continue
244315
}
245316

317+
// Try to heal by re-fetching through the API (the caller rejects a nil api when heal is requested)
318+
healCtx := ctx
319+
var healCancel context.CancelFunc
320+
if healTimeout > 0 {
321+
healCtx, healCancel = context.WithTimeout(ctx, healTimeout)
322+
}
323+
324+
if _, healErr := api.Block().Get(healCtx, path.FromCid(k)); healErr != nil {
325+
result.state = verifyStateCorruptHealFailed
326+
result.msg = fmt.Sprintf("block %s was corrupt (%s), removed, failed to heal (%s)", k, err, healErr)
327+
} else {
328+
result.state = verifyStateCorruptHealed
329+
result.msg = fmt.Sprintf("block %s was corrupt (%s), removed, healed", k, err)
330+
}
331+
332+
if healCancel != nil {
333+
healCancel()
334+
}
335+
336+
if !sendResult(result) {
337+
return
338+
}
246339
continue
247340
}
248341

249-
select {
250-
case results <- "":
251-
case <-ctx.Done():
342+
// Get succeeded: report the block as valid (no message)
343+
if !sendResult(&verifyResult{cid: k, state: verifyStateValid}) {
252344
return
253345
}
254346
}
255347
}
256348

257-
func verifyResultChan(ctx context.Context, keys <-chan cid.Cid, bs bstore.Blockstore) <-chan string {
258-
results := make(chan string)
349+
// verifyResultChan creates a channel of verification results by spawning multiple worker goroutines
350+
// to process blocks in parallel. It returns immediately with a channel that will receive results.
351+
func verifyResultChan(ctx context.Context, keys <-chan cid.Cid, bs bstore.Blockstore, api coreiface.CoreAPI, shouldDrop, shouldHeal bool, healTimeout time.Duration) <-chan *verifyResult {
352+
results := make(chan *verifyResult)
259353

260354
go func() {
261355
defer close(results)
262356

263357
var wg sync.WaitGroup
264358

265-
for i := 0; i < runtime.NumCPU()*2; i++ {
359+
for i := 0; i < runtime.NumCPU()*verifyWorkerMultiplier; i++ {
266360
wg.Add(1)
267-
go verifyWorkerRun(ctx, &wg, keys, results, bs)
361+
go verifyWorkerRun(ctx, &wg, keys, results, bs, api, shouldDrop, shouldHeal, healTimeout)
268362
}
269363

270364
wg.Wait()
@@ -276,13 +370,84 @@ func verifyResultChan(ctx context.Context, keys <-chan cid.Cid, bs bstore.Blocks
276370
var repoVerifyCmd = &cmds.Command{
277371
Helptext: cmds.HelpText{
278372
Tagline: "Verify all blocks in repo are not corrupted.",
373+
ShortDescription: `
374+
'ipfs repo verify' checks integrity of all blocks in the local datastore.
375+
Each block is read and validated against its CID to ensure data integrity.
376+
377+
Without any flags, this is a SAFE, read-only check that only reports corrupt
378+
blocks without modifying the repository. This can be used as a "dry run" to
379+
preview what --drop or --heal would do.
380+
381+
Use --drop to remove corrupt blocks, or --heal to remove and re-fetch from
382+
the network.
383+
384+
Examples:
385+
ipfs repo verify # safe read-only check, reports corrupt blocks
386+
ipfs repo verify --drop # remove corrupt blocks
387+
ipfs repo verify --heal # remove and re-fetch corrupt blocks
388+
389+
Exit Codes:
390+
0: All blocks are valid, OR all corrupt blocks were successfully remediated
391+
(with --drop or --heal)
392+
1: Corrupt blocks detected (without flags), OR remediation failed (block
393+
removal or healing failed with --drop or --heal)
394+
395+
Note: --heal requires the daemon to be running in online mode with network
396+
connectivity to nodes that have the missing blocks. Make sure the daemon is
397+
online and connected to other peers. Healing will attempt to re-fetch each
398+
corrupt block from the network after removing it. If a block cannot be found
399+
on the network, it will remain deleted.
400+
401+
WARNING: Both --drop and --heal are DESTRUCTIVE operations that permanently
402+
delete corrupt blocks from your repository. Once deleted, blocks cannot be
403+
recovered unless --heal successfully fetches them from the network. Blocks
404+
that cannot be healed will remain permanently deleted. Always backup your
405+
repository before using these options.
406+
`,
407+
},
408+
Options: []cmds.Option{
409+
cmds.BoolOption("drop", "Remove corrupt blocks from datastore (destructive operation)."),
410+
cmds.BoolOption("heal", "Remove corrupt blocks and re-fetch from network (destructive operation, implies --drop)."),
411+
cmds.StringOption("heal-timeout", "Maximum time to wait for each block heal (e.g., \"30s\"). Only applies with --heal.").WithDefault("30s"),
279412
},
280413
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
281414
nd, err := cmdenv.GetNode(env)
282415
if err != nil {
283416
return err
284417
}
285418

419+
drop, _ := req.Options["drop"].(bool)
420+
heal, _ := req.Options["heal"].(bool)
421+
422+
if heal {
423+
drop = true // heal implies drop
424+
}
425+
426+
// Parse and validate heal-timeout
427+
timeoutStr, _ := req.Options["heal-timeout"].(string)
428+
healTimeout, err := time.ParseDuration(timeoutStr)
429+
if err != nil {
430+
return fmt.Errorf("invalid heal-timeout: %w", err)
431+
}
432+
if healTimeout < 0 {
433+
return errors.New("heal-timeout must be >= 0")
434+
}
435+
436+
// Check online mode and API availability for healing operation
437+
var api coreiface.CoreAPI
438+
if heal {
439+
if !nd.IsOnline {
440+
return ErrNotOnline
441+
}
442+
api, err = cmdenv.GetApi(env, req)
443+
if err != nil {
444+
return err
445+
}
446+
if api == nil {
447+
return fmt.Errorf("healing requested but API is not available - make sure daemon is online and connected to other peers")
448+
}
449+
}
450+
286451
bs := &bstore.ValidatingBlockstore{Blockstore: bstore.NewBlockstore(nd.Repo.Datastore())}
287452

288453
keys, err := bs.AllKeysChan(req.Context)
@@ -291,17 +456,47 @@ var repoVerifyCmd = &cmds.Command{
291456
return err
292457
}
293458

294-
results := verifyResultChan(req.Context, keys, bs)
459+
results := verifyResultChan(req.Context, keys, bs, api, drop, heal, healTimeout)
295460

296-
var fails int
461+
// Track statistics for each type of outcome
462+
var corrupted, removed, removeFailed, healed, healFailed int
297463
var i int
298-
for msg := range results {
299-
if msg != "" {
300-
if err := res.Emit(&VerifyProgress{Msg: msg}); err != nil {
464+
465+
for result := range results {
466+
// Update counters based on the block's final state
467+
switch result.state {
468+
case verifyStateCorrupt:
469+
// Block is corrupt but no action was taken (--drop not specified)
470+
corrupted++
471+
case verifyStateCorruptRemoved:
472+
// Block was corrupt and successfully removed (--drop specified)
473+
corrupted++
474+
removed++
475+
case verifyStateCorruptRemoveFailed:
476+
// Block was corrupt but couldn't be removed
477+
corrupted++
478+
removeFailed++
479+
case verifyStateCorruptHealed:
480+
// Block was corrupt, removed, and successfully re-fetched (--heal specified)
481+
corrupted++
482+
removed++
483+
healed++
484+
case verifyStateCorruptHealFailed:
485+
// Block was corrupt and removed, but re-fetching failed
486+
corrupted++
487+
removed++
488+
healFailed++
489+
default:
490+
// verifyStateValid blocks are not counted (they're the expected case)
491+
}
492+
493+
// Emit progress message for corrupt blocks
494+
if result.state != verifyStateValid && result.msg != "" {
495+
if err := res.Emit(&VerifyProgress{Msg: result.msg}); err != nil {
301496
return err
302497
}
303-
fails++
304498
}
499+
305500
i++
306501
if err := res.Emit(&VerifyProgress{Progress: i}); err != nil {
307502
return err
@@ -312,8 +507,42 @@ var repoVerifyCmd = &cmds.Command{
312507
return err
313508
}
314509

315-
if fails != 0 {
316-
return errors.New("verify complete, some blocks were corrupt")
510+
if corrupted > 0 {
511+
// Build a summary of what happened with corrupt blocks
512+
summary := fmt.Sprintf("verify complete, %d blocks corrupt", corrupted)
513+
if removed > 0 {
514+
summary += fmt.Sprintf(", %d removed", removed)
515+
}
516+
if removeFailed > 0 {
517+
summary += fmt.Sprintf(", %d failed to remove", removeFailed)
518+
}
519+
if healed > 0 {
520+
summary += fmt.Sprintf(", %d healed", healed)
521+
}
522+
if healFailed > 0 {
523+
summary += fmt.Sprintf(", %d failed to heal", healFailed)
524+
}
525+
526+
// Determine success/failure based on operation mode
527+
shouldFail := false
528+
529+
if !drop {
530+
// Detection-only mode: always fail if corruption found
531+
shouldFail = true
532+
} else if heal {
533+
// Heal mode: fail if any removal or heal failed
534+
shouldFail = (removeFailed > 0 || healFailed > 0)
535+
} else {
536+
// Drop mode: fail if any removal failed
537+
shouldFail = (removeFailed > 0)
538+
}
539+
540+
if shouldFail {
541+
return errors.New(summary)
542+
}
543+
544+
// Success: emit summary as a message instead of error
545+
return res.Emit(&VerifyProgress{Msg: summary})
317546
}
318547

319548
return res.Emit(&VerifyProgress{Msg: "verify complete, all blocks validated."})
@@ -322,7 +551,7 @@ var repoVerifyCmd = &cmds.Command{
322551
Encoders: cmds.EncoderMap{
323552
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, obj *VerifyProgress) error {
324553
if strings.Contains(obj.Msg, "was corrupt") {
325-
fmt.Fprintln(os.Stdout, obj.Msg)
554+
fmt.Fprintln(w, obj.Msg)
326555
return nil
327556
}
328557

0 commit comments

Comments
 (0)