Skip to content

Commit 0842ad2

Browse files
authored
Merge pull request #1071 from vlerdman/feat/add-size-progress-tracker
feat(ipld/merkledag): add total size of visited nodes in progress tracker
2 parents 23312e7 + 2e08ae7 commit 0842ad2

File tree

3 files changed

+57
-18
lines changed

3 files changed

+57
-18
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ The following emojis are used to highlight certain changes:
1818

1919
- `routing/http`: `GET /routing/v1/dht/closest/peers/{key}` per [IPIP-476](https://github.com/ipfs/specs/pull/476)
2020
- upgrade to `go-libp2p-kad-dht` [v0.36.0](https://github.com/libp2p/go-libp2p-kad-dht/releases/tag/v0.36.0)
21+
- `ipld/merkledag`: Added fetched node size reporting to the progress tracker. See [kubo#8915](https://github.com/ipfs/kubo/issues/8915)
2122

2223
### Changed
2324

ipld/merkledag/merkledag.go

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,20 @@ func GetLinksDirect(serv format.NodeGetter) GetLinks {
132132
}
133133
}
134134

135+
// GetLinksDirectWithProgressTracker creates a function as GetLinksDirect, but
136+
// updates the ProgressTracker with the raw block data size of the retrieved node.
137+
func GetLinksDirectWithProgressTracker(serv format.NodeGetter, tracker *ProgressTracker) GetLinks {
138+
return func(ctx context.Context, c cid.Cid) ([]*format.Link, error) {
139+
nd, err := serv.Get(ctx, c)
140+
if err != nil {
141+
return nil, err
142+
}
143+
// We don't use Size() as it returns cumulative size including linked nodes.
144+
tracker.Update(uint64(len(nd.RawData())))
145+
return nd.Links(), nil
146+
}
147+
}
148+
135149
type sesGetter struct {
136150
bs *bserv.Session
137151
decoder *legacy.Decoder
@@ -208,20 +222,13 @@ func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, s
208222
// We default to Concurrent() walk.
209223
opts = append([]WalkOption{Concurrent()}, opts...)
210224

211-
// If we have a ProgressTracker, we wrap the visit function to handle it.
225+
// If we have a ProgressTracker, we wrap the get links function to handle it.
212226
v, _ := ctx.Value(progressContextKey).(*ProgressTracker)
213227
if v == nil {
214228
return WalkDepth(ctx, GetLinksDirect(ng), root, visit, opts...)
215229
}
216230

217-
visitProgress := func(c cid.Cid, depth int) bool {
218-
if visit(c, depth) {
219-
v.Increment()
220-
return true
221-
}
222-
return false
223-
}
224-
return WalkDepth(ctx, GetLinksDirect(ng), root, visitProgress, opts...)
231+
return WalkDepth(ctx, GetLinksDirectWithProgressTracker(ng, v), root, visit, opts...)
225232
}
226233

227234
// GetMany gets many nodes from the DAG at once.
@@ -457,10 +464,18 @@ func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, d
457464
return nil
458465
}
459466

467+
// ProgressStat represents the progress of a fetch operation.
468+
type ProgressStat struct {
469+
// Nodes is the total number of nodes fetched.
470+
Nodes int
471+
// Bytes is the total bytes of raw block data.
472+
Bytes uint64
473+
}
474+
460475
// ProgressTracker is used to show progress when fetching nodes.
461476
type ProgressTracker struct {
462-
Total int
463-
lk sync.Mutex
477+
stat ProgressStat
478+
lk sync.Mutex
464479
}
465480

466481
// DeriveContext returns a new context with value "progress" derived from the
@@ -469,18 +484,26 @@ func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
469484
return context.WithValue(ctx, progressContextKey, p)
470485
}
471486

472-
// Increment adds one to the total progress.
473-
func (p *ProgressTracker) Increment() {
487+
// Update adds one to the total nodes and updates the total bytes.
488+
func (p *ProgressTracker) Update(bytes uint64) {
474489
p.lk.Lock()
475490
defer p.lk.Unlock()
476-
p.Total++
491+
p.stat.Nodes++
492+
p.stat.Bytes += bytes
477493
}
478494

479495
// Value returns the current progress.
480496
func (p *ProgressTracker) Value() int {
481497
p.lk.Lock()
482498
defer p.lk.Unlock()
483-
return p.Total
499+
return p.stat.Nodes
500+
}
501+
502+
// ProgressStat returns the current progress stat.
503+
func (p *ProgressTracker) ProgressStat() ProgressStat {
504+
p.lk.Lock()
505+
defer p.lk.Unlock()
506+
return p.stat
484507
}
485508

486509
func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, visit func(cid.Cid, int) bool, options *walkOptions) error {

ipld/merkledag/merkledag_test.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,7 +1161,7 @@ func TestProgressIndicatorNoChildren(t *testing.T) {
11611161
func testProgressIndicator(t *testing.T, depth int) {
11621162
ds := dstest.Mock()
11631163

1164-
top, numChildren := mkDag(ds, depth)
1164+
top, numChildren, totalSize := mkDag(ds, depth)
11651165

11661166
v := new(ProgressTracker)
11671167
ctx := v.DeriveContext(context.Background())
@@ -1175,9 +1175,19 @@ func testProgressIndicator(t *testing.T, depth int) {
11751175
t.Errorf("wrong number of children reported in progress indicator, expected %d, got %d",
11761176
numChildren+1, v.Value())
11771177
}
1178+
1179+
if v.ProgressStat().Nodes != numChildren+1 {
1180+
t.Errorf("wrong number of children reported in progress stat indicator, expected %d, got %d",
1181+
numChildren+1, v.ProgressStat().Nodes)
1182+
}
1183+
1184+
if v.ProgressStat().Bytes != totalSize {
1185+
t.Errorf("wrong bytes reported in progress stat indicator, expected %d, got %d",
1186+
totalSize, v.ProgressStat().Bytes)
1187+
}
11781188
}
11791189

1180-
func mkDag(ds ipld.DAGService, depth int) (cid.Cid, int) {
1190+
func mkDag(ds ipld.DAGService, depth int) (cid.Cid, int, uint64) {
11811191
ctx := context.Background()
11821192

11831193
totalChildren := 0
@@ -1213,7 +1223,12 @@ func mkDag(ds ipld.DAGService, depth int) (cid.Cid, int) {
12131223
panic(err)
12141224
}
12151225

1216-
return nd.Cid(), totalChildren
1226+
totalSize, err := nd.Size()
1227+
if err != nil {
1228+
panic(err)
1229+
}
1230+
1231+
return nd.Cid(), totalChildren, totalSize
12171232
}
12181233

12191234
func mkNodeWithChildren(getChild func() *ProtoNode, width int) *ProtoNode {

0 commit comments

Comments
 (0)