From 6ecec53305441a478911b47d3e3310f54b7d71c1 Mon Sep 17 00:00:00 2001
From: sneax
Date: Tue, 2 Dec 2025 19:19:58 +0530
Subject: [PATCH] Implement a dynamic programming cache for CID traversal
 in `ipfs dag stat` to avoid redundant traversals when multiple DAGs
 share common subgraphs

Resolves the TODO in `core/commands/dag/stat.go:17-18`:

> "cache every cid traversal in a dp cache. if the cid exists in the
> cache, don't traverse it, and use the cached result to compute the
> new state"

- **Added a `cidStatCache` structure** to memoize subtree statistics for
  each CID
- **Replaced the linear traversal** with a recursive DP algorithm that:
  - Checks the cache before traversing any node
  - Skips the entire subtree traversal for cached CIDs
  - Computes and caches subtree stats (size + block count) for each node
- **Maintains correct accounting** for TotalSize, SharedSize, and
  deduplication ratios
- Added `TestDagStatCaching` with two test cases:
  - Cache consistency when querying duplicate CIDs
  - Correct deduplication stats with a shared subgraph

Signed-off-by: sneax
---
 core/commands/dag/stat.go | 119 ++++++++++++++++++++++++++++++--------
 test/cli/dag_test.go      |  61 +++++++++++++++++++
 2 files changed, 155 insertions(+), 25 deletions(-)

diff --git a/core/commands/dag/stat.go b/core/commands/dag/stat.go
index bb9be7e0d90..f1115aea978 100644
--- a/core/commands/dag/stat.go
+++ b/core/commands/dag/stat.go
@@ -6,7 +6,6 @@ import (
 	"os"
 
 	mdag "github.com/ipfs/boxo/ipld/merkledag"
-	"github.com/ipfs/boxo/ipld/merkledag/traverse"
 	cid "github.com/ipfs/go-cid"
 	cmds "github.com/ipfs/go-ipfs-cmds"
 	"github.com/ipfs/kubo/core/commands/cmdenv"
@@ -14,9 +13,34 @@ import (
 	"github.com/ipfs/kubo/core/commands/e"
 )
 
-// TODO cache every cid traversal in a dp cache
-// if the cid exists in the cache, don't traverse it, and use the cached result
-// to compute the new state
+// cidStatCache caches the statistics for already-traversed CIDs to avoid
+// redundant traversals when multiple DAGs share common subgraphs
+type cidStatCache struct {
+	stats map[string]*cachedStat
+}
+
+type cachedStat struct {
+	size      uint64
+	numBlocks int64
+}
+
+func newCidStatCache() *cidStatCache {
+	return &cidStatCache{
+		stats: make(map[string]*cachedStat),
+	}
+}
+
+func (c *cidStatCache) get(cid cid.Cid) (*cachedStat, bool) {
+	stat, ok := c.stats[cid.String()]
+	return stat, ok
+}
+
+func (c *cidStatCache) put(cid cid.Cid, size uint64, numBlocks int64) {
+	c.stats[cid.String()] = &cachedStat{
+		size:      size,
+		numBlocks: numBlocks,
+	}
+}
 
 func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
 	progressive := req.Options[progressOptionName].(bool)
@@ -27,6 +51,7 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)
 
 	nodeGetter := mdag.NewSession(req.Context, api.Dag())
 	cidSet := cid.NewSet()
+	cache := newCidStatCache()
 	dagStatSummary := &DagStatSummary{DagStatsArray: []*DagStat{}}
 	for _, a := range req.Arguments {
 		p, err := cmdutils.PathOrCidPath(a)
@@ -41,37 +66,81 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)
 			return fmt.Errorf("cannot return size for anything other than a DAG with a root CID")
 		}
 
-		obj, err := nodeGetter.Get(req.Context, rp.RootCid())
-		if err != nil {
-			return err
-		}
 		dagstats := &DagStat{Cid: rp.RootCid()}
 		dagStatSummary.appendStats(dagstats)
-		err = traverse.Traverse(obj, traverse.Options{
-			DAG:   nodeGetter,
-			Order: traverse.DFSPre,
-			Func: func(current traverse.State) error {
-				currentNodeSize := uint64(len(current.Node.RawData()))
-				dagstats.Size += currentNodeSize
-				dagstats.NumBlocks++
-				if !cidSet.Has(current.Node.Cid()) {
-					dagStatSummary.incrementTotalSize(currentNodeSize)
+
+		// Use a custom recursive traversal with DP caching
+		var traverseWithCache func(c cid.Cid) (*cachedStat, error)
+		traverseWithCache = func(c cid.Cid) (*cachedStat, error) {
+			// Check cache first - this is the DP optimization
+			// On a hit, reuse the cached subtree stats; only redundant size still accrues
+			if cached, ok := cache.get(c); ok {
+				// Still need to track redundant access
+				node, err := nodeGetter.Get(req.Context, c)
+				if err != nil {
+					return nil, err
 				}
-				dagStatSummary.incrementRedundantSize(currentNodeSize)
-				cidSet.Add(current.Node.Cid())
+				nodeSize := uint64(len(node.RawData()))
+				dagStatSummary.incrementRedundantSize(nodeSize)
+				cidSet.Add(c)
+
 				if progressive {
 					if err := res.Emit(dagStatSummary); err != nil {
-						return err
+						return nil, err
 					}
 				}
-				return nil
-			},
-			ErrFunc:        nil,
-			SkipDuplicates: true,
-		})
+				return cached, nil
+			}
+
+			node, err := nodeGetter.Get(req.Context, c)
+			if err != nil {
+				return nil, err
+			}
+
+			nodeSize := uint64(len(node.RawData()))
+			subtreeSize := nodeSize
+			subtreeBlocks := int64(1)
+
+			// Update global tracking for this new node
+			if !cidSet.Has(c) {
+				dagStatSummary.incrementTotalSize(nodeSize)
+			}
+			dagStatSummary.incrementRedundantSize(nodeSize)
+			cidSet.Add(c)
+
+			// Recursively compute stats for all children
+			for _, link := range node.Links() {
+				childStats, err := traverseWithCache(link.Cid)
+				if err != nil {
+					return nil, err
+				}
+				subtreeSize += childStats.size
+				subtreeBlocks += childStats.numBlocks
+			}
+
+			// Cache this node's subtree stats
+			stat := &cachedStat{
+				size:      subtreeSize,
+				numBlocks: subtreeBlocks,
+			}
+			cache.put(c, subtreeSize, subtreeBlocks)
+
+			if progressive {
+				if err := res.Emit(dagStatSummary); err != nil {
+					return nil, err
+				}
+			}
+
+			return stat, nil
+		}
+
+		rootStats, err := traverseWithCache(rp.RootCid())
 		if err != nil {
 			return fmt.Errorf("error traversing DAG: %w", err)
 		}
+
+		dagstats.Size = rootStats.size
+		dagstats.NumBlocks = rootStats.numBlocks
 	}
 
 	dagStatSummary.UniqueBlocks = cidSet.Len()
diff --git a/test/cli/dag_test.go b/test/cli/dag_test.go
index f6758a71037..3773bda8500 100644
--- a/test/cli/dag_test.go
+++ b/test/cli/dag_test.go
@@ -302,3 +302,64 @@ func TestDagImportFastProvide(t *testing.T) {
 		require.Contains(t, daemonLog, "fast-provide-root: skipped")
 	})
 }
+
+func TestDagStatCaching(t *testing.T) {
+	t.Parallel()
+
+	t.Run("cache reuses stats for duplicate CIDs", func(t *testing.T) {
+		t.Parallel()
+		node := harness.NewT(t).NewNode().Init().StartDaemon()
+
+		// Import the fixture with shared subgraph
+		r, err := os.Open(fixtureFile)
+		require.NoError(t, err)
+		defer r.Close()
+		err = node.IPFSDagImport(r, fixtureCid)
+		require.NoError(t, err)
+
+		// Run dag stat on the same CID twice - the cached traversal must be deterministic
+		stat1 := node.RunIPFS("dag", "stat", "--progress=false", "--enc=json", node1Cid)
+		stat2 := node.RunIPFS("dag", "stat", "--progress=false", "--enc=json", node1Cid)
+
+		// Both should return identical results
+		assert.Equal(t, stat1.Stdout.Bytes(), stat2.Stdout.Bytes())
+
+		// Parse and verify the stats are correct
+		var data1, data2 Data
+		err = json.Unmarshal(stat1.Stdout.Bytes(), &data1)
+		require.NoError(t, err)
+		err = json.Unmarshal(stat2.Stdout.Bytes(), &data2)
+		require.NoError(t, err)
+
+		assert.Equal(t, data1, data2)
+		assert.Equal(t, 53, data1.TotalSize) // node1 size (46) + child (7)
+		assert.Equal(t, 2, data1.DagStats[0].NumBlocks)
+	})
+
+	t.Run("cache works across multiple CIDs with shared subgraph", func(t *testing.T) {
+		t.Parallel()
+		node := harness.NewT(t).NewNode().Init().StartDaemon()
+
+		r, err := os.Open(fixtureFile)
+		require.NoError(t, err)
+		defer r.Close()
+		err = node.IPFSDagImport(r, fixtureCid)
+		require.NoError(t, err)
+
+		// When querying both nodes that share a child, the shared child should
+		// only be counted once in TotalSize but twice in redundant size
+		stat := node.RunIPFS("dag", "stat", "--progress=false", "--enc=json", node1Cid, node2Cid)
+		var data Data
+		err = json.Unmarshal(stat.Stdout.Bytes(), &data)
+		require.NoError(t, err)
+
+		// TotalSize should be: node1(46) + node2(46) + shared_child(7) = 99
+		assert.Equal(t, 99, data.TotalSize)
+		// SharedSize should be: shared_child(7) counted again = 7
+		assert.Equal(t, 7, data.SharedSize)
+		// Ratio should be (99+7)/99 ≈ 1.0707
+		expectedRatio := float64(99+7) / float64(99)
+		assert.Equal(t, testutils.FloatTruncate(expectedRatio, 4), testutils.FloatTruncate(data.Ratio, 4))
+		assert.Equal(t, 3, data.UniqueBlocks)
+	})
+}
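
As an illustration of the memoization pattern `traverseWithCache` applies, here is a minimal, self-contained sketch on an in-memory DAG. It is not part of the patch; the `node`, `stat`, and `statDAG` names are invented for this example, and only the caching pattern mirrors the diff above.

```go
package main

import "fmt"

// node stands in for an IPLD block: a payload plus links to children.
type node struct {
	data     []byte
	children []*node
}

// stat mirrors the patch's cachedStat: the aggregate size and block count
// of a subtree, counting shared nodes once per path that reaches them.
type stat struct {
	size      uint64
	numBlocks int64
}

// statDAG is the DP core: each distinct node is walked once, and repeat
// visits are answered from the memo table instead of re-traversing.
func statDAG(n *node, memo map[*node]stat) stat {
	if s, ok := memo[n]; ok {
		return s // cache hit: skip the entire subtree
	}
	s := stat{size: uint64(len(n.data)), numBlocks: 1}
	for _, c := range n.children {
		cs := statDAG(c, memo)
		s.size += cs.size
		s.numBlocks += cs.numBlocks
	}
	memo[n] = s
	return s
}

func main() {
	// Diamond-shaped DAG: root -> a, root -> b, and both a and b link
	// to the same shared node.
	shared := &node{data: []byte("shared payload")}
	a := &node{data: []byte("a"), children: []*node{shared}}
	b := &node{data: []byte("b"), children: []*node{shared}}
	root := &node{data: []byte("root"), children: []*node{a, b}}

	memo := make(map[*node]stat)
	s := statDAG(root, memo)
	// Prints size=34 blocks=5 distinct=4: the aggregate counts the shared
	// node once per path, but the walk visited only 4 distinct nodes.
	fmt.Printf("size=%d blocks=%d distinct=%d\n", s.size, s.numBlocks, len(memo))
}
```

As in the patch's per-root `DagStat` accounting, the aggregate counts a shared subtree once per incoming path, while the memo table keeps the walk linear in distinct nodes rather than paths. The patch's cached branch differs in one respect: it still re-fetches the cached block itself to update the redundant-size counter, but never re-walks that block's children.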