119 changes: 94 additions & 25 deletions core/commands/dag/stat.go
@@ -6,17 +6,41 @@ import (
"os"

mdag "github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/merkledag/traverse"
cid "github.com/ipfs/go-cid"
cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/ipfs/kubo/core/commands/cmdutils"
"github.com/ipfs/kubo/core/commands/e"
)

// TODO cache every cid traversal in a dp cache
// if the cid exists in the cache, don't traverse it, and use the cached result
// to compute the new state
// cidStatCache memoizes per-CID subtree statistics so that subgraphs shared
// between the DAGs being measured are fetched and traversed only once.
type cidStatCache struct {
	stats map[string]*cachedStat // keyed by Cid.KeyString()
}

// cachedStat records the cumulative size and block count of a CID's subtree.
type cachedStat struct {
	size      uint64
	numBlocks int64
}

func newCidStatCache() *cidStatCache {
return &cidStatCache{
stats: make(map[string]*cachedStat),
}
}

// get returns the memoized stats for id, if present.
func (c *cidStatCache) get(id cid.Cid) (*cachedStat, bool) {
	stat, ok := c.stats[id.KeyString()]
	return stat, ok
}

// put memoizes the subtree stats for id.
func (c *cidStatCache) put(id cid.Cid, stat *cachedStat) {
	c.stats[id.KeyString()] = stat
}

func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
progressive := req.Options[progressOptionName].(bool)
@@ -27,6 +51,7 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)
nodeGetter := mdag.NewSession(req.Context, api.Dag())

cidSet := cid.NewSet()
cache := newCidStatCache()
dagStatSummary := &DagStatSummary{DagStatsArray: []*DagStat{}}
for _, a := range req.Arguments {
p, err := cmdutils.PathOrCidPath(a)
@@ -41,37 +66,81 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)
return fmt.Errorf("cannot return size for anything other than a DAG with a root CID")
}

obj, err := nodeGetter.Get(req.Context, rp.RootCid())
if err != nil {
return err
}
dagstats := &DagStat{Cid: rp.RootCid()}
dagStatSummary.appendStats(dagstats)
err = traverse.Traverse(obj, traverse.Options{
DAG: nodeGetter,
Order: traverse.DFSPre,
Func: func(current traverse.State) error {
currentNodeSize := uint64(len(current.Node.RawData()))
dagstats.Size += currentNodeSize
dagstats.NumBlocks++
if !cidSet.Has(current.Node.Cid()) {
dagStatSummary.incrementTotalSize(currentNodeSize)

		// Recursive depth-first traversal with memoization: each CID's
		// subtree stats are computed once, cached, and reused on every
		// later encounter.
		var traverseWithCache func(c cid.Cid) (*cachedStat, error)
		traverseWithCache = func(c cid.Cid) (*cachedStat, error) {
			// Cache hit: reuse the memoized stats without re-fetching any
			// blocks. Every block under c has already been fetched once, so
			// the whole subtree counts toward the redundant total.
			if cached, ok := cache.get(c); ok {
				dagStatSummary.incrementRedundantSize(cached.size)
dagStatSummary.incrementRedundantSize(currentNodeSize)
cidSet.Add(current.Node.Cid())

if progressive {
if err := res.Emit(dagStatSummary); err != nil {
return err
return nil, err
}
}
return nil
},
ErrFunc: nil,
SkipDuplicates: true,
})
return cached, nil
}

node, err := nodeGetter.Get(req.Context, c)
if err != nil {
return nil, err
}

nodeSize := uint64(len(node.RawData()))
subtreeSize := nodeSize
subtreeBlocks := int64(1)

			// The first encounter of a block across all argument DAGs counts
			// toward the deduplicated total; every encounter counts toward
			// the redundant total.
			if !cidSet.Has(c) {
				dagStatSummary.incrementTotalSize(nodeSize)
			}
			dagStatSummary.incrementRedundantSize(nodeSize)
			cidSet.Add(c)

// Recursively compute stats for all children
for _, link := range node.Links() {
childStats, err := traverseWithCache(link.Cid)
if err != nil {
return nil, err
}
subtreeSize += childStats.size
subtreeBlocks += childStats.numBlocks
}

			// Memoize this subtree's stats for reuse on later encounters.
			stat := &cachedStat{
				size:      subtreeSize,
				numBlocks: subtreeBlocks,
			}
			cache.put(c, stat)

if progressive {
if err := res.Emit(dagStatSummary); err != nil {
return nil, err
}
}

return stat, nil
}

rootStats, err := traverseWithCache(rp.RootCid())
if err != nil {
return fmt.Errorf("error traversing DAG: %w", err)
}

dagstats.Size = rootStats.size
dagstats.NumBlocks = rootStats.numBlocks
}

dagStatSummary.UniqueBlocks = cidSet.Len()
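
To see the memoization in isolation: the sketch below reduces the pattern above to a self-contained program. The node type, IDs, and byte sizes are hypothetical stand-ins for IPLD blocks (sizes chosen to mirror the test fixture below: two 46-byte roots sharing a 7-byte child); the point is that a subtree reachable from several roots is walked once and its cached totals are reused.

package main

import "fmt"

// node is a hypothetical stand-in for an IPLD block: a raw payload plus links.
type node struct {
	id   string
	data []byte
	kids []*node
}

// stat mirrors cachedStat: cumulative size and block count of a subtree.
type stat struct {
	size   uint64
	blocks int64
}

// sizeOf computes a subtree's cumulative stats, memoizing by node id so a
// subtree shared by several parents or roots is traversed only once.
func sizeOf(n *node, memo map[string]stat) stat {
	if s, ok := memo[n.id]; ok {
		return s // cache hit: no re-traversal
	}
	s := stat{size: uint64(len(n.data)), blocks: 1}
	for _, k := range n.kids {
		ks := sizeOf(k, memo)
		s.size += ks.size
		s.blocks += ks.blocks
	}
	memo[n.id] = s
	return s
}

func main() {
	shared := &node{id: "shared", data: make([]byte, 7)}
	a := &node{id: "a", data: make([]byte, 46), kids: []*node{shared}}
	b := &node{id: "b", data: make([]byte, 46), kids: []*node{shared}}

	memo := map[string]stat{}
	for _, root := range []*node{a, b} {
		s := sizeOf(root, memo)
		fmt.Printf("%s: size=%d blocks=%d\n", root.id, s.size, s.blocks) // both: size=53 blocks=2
	}
}
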
61 changes: 61 additions & 0 deletions test/cli/dag_test.go
@@ -302,3 +302,64 @@ func TestDagImportFastProvide(t *testing.T) {
require.Contains(t, daemonLog, "fast-provide-root: skipped")
})
}

func TestDagStatCaching(t *testing.T) {
t.Parallel()

t.Run("cache reuses stats for duplicate CIDs", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()

		// Import the fixture with a shared subgraph
r, err := os.Open(fixtureFile)
require.NoError(t, err)
defer r.Close()
err = node.IPFSDagImport(r, fixtureCid)
require.NoError(t, err)

		// Run dag stat on the same CID twice; the cached traversal must stay
		// deterministic, so both invocations should report identical stats
stat1 := node.RunIPFS("dag", "stat", "--progress=false", "--enc=json", node1Cid)
stat2 := node.RunIPFS("dag", "stat", "--progress=false", "--enc=json", node1Cid)

// Both should return identical results
assert.Equal(t, stat1.Stdout.Bytes(), stat2.Stdout.Bytes())

// Parse and verify the stats are correct
var data1, data2 Data
err = json.Unmarshal(stat1.Stdout.Bytes(), &data1)
require.NoError(t, err)
err = json.Unmarshal(stat2.Stdout.Bytes(), &data2)
require.NoError(t, err)

assert.Equal(t, data1, data2)
assert.Equal(t, 53, data1.TotalSize) // node1 size (46) + child (7)
assert.Equal(t, 2, data1.DagStats[0].NumBlocks)
})

t.Run("cache works across multiple CIDs with shared subgraph", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()

r, err := os.Open(fixtureFile)
require.NoError(t, err)
defer r.Close()
err = node.IPFSDagImport(r, fixtureCid)
require.NoError(t, err)

// When querying both nodes that share a child, the shared child should
// only be counted once in TotalSize but twice in redundant size
stat := node.RunIPFS("dag", "stat", "--progress=false", "--enc=json", node1Cid, node2Cid)
var data Data
err = json.Unmarshal(stat.Stdout.Bytes(), &data)
require.NoError(t, err)

// TotalSize should be: node1(46) + node2(46) + shared_child(7) = 99
assert.Equal(t, 99, data.TotalSize)
// SharedSize should be: shared_child(7) counted again = 7
assert.Equal(t, 7, data.SharedSize)
// Ratio should be (99+7)/99 ≈ 1.0707
expectedRatio := float64(99+7) / float64(99)
assert.Equal(t, testutils.FloatTruncate(expectedRatio, 4), testutils.FloatTruncate(data.Ratio, 4))
assert.Equal(t, 3, data.UniqueBlocks)
})
}
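
The Data and DagStat types these tests unmarshal into are defined elsewhere in dag_test.go; as a rough guide, the assertions above imply a shape along these lines (field names taken from the assertions — treat the exact definition as an assumption, not the file's actual declaration):

// Sketch of the decoded shape implied by the assertions above; the real
// Data type lives elsewhere in dag_test.go and may differ in detail.
type DagStat struct {
	Cid       string
	Size      int
	NumBlocks int
}

type Data struct {
	UniqueBlocks int
	TotalSize    int
	SharedSize   int
	Ratio        float64
	DagStats     []DagStat
}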