Skip to content

Commit fc5b515

Browse files
Add read leaf nodes from checkpoint file for Archive node use (#4040)
* add read leaf nodes * Add tests to leaf node reader * chnages per suggestions * correct lint * lint * lint * make scatch internally * Update ledger/complete/wal/checkpoint_v6_test.go Co-authored-by: Leo Zhang <[email protected]> * fix issues * fix timeout test * goimports --------- Co-authored-by: Leo Zhang (zhangchiqing) <[email protected]>
1 parent bbb57b6 commit fc5b515

File tree

3 files changed

+277
-32
lines changed

3 files changed

+277
-32
lines changed
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package wal
2+
3+
import (
4+
"fmt"
5+
"os"
6+
7+
"github.com/rs/zerolog"
8+
9+
"github.com/onflow/flow-go/ledger"
10+
"github.com/onflow/flow-go/ledger/common/hash"
11+
"github.com/onflow/flow-go/ledger/complete/mtrie/flattener"
12+
"github.com/onflow/flow-go/ledger/complete/mtrie/node"
13+
)
14+
15+
type LeafNode struct {
16+
Hash hash.Hash
17+
Path ledger.Path
18+
Payload *ledger.Payload
19+
}
20+
21+
type LeafNodeResult struct {
22+
LeafNode *LeafNode
23+
Err error
24+
}
25+
26+
func nodeToLeaf(leaf *node.Node) *LeafNode {
27+
return &LeafNode{
28+
Hash: leaf.Hash(),
29+
Path: *leaf.Path(),
30+
Payload: leaf.Payload(),
31+
}
32+
}
33+
34+
func OpenAndReadLeafNodesFromCheckpointV6(dir string, fileName string, logger *zerolog.Logger) (
35+
allLeafNodesCh <-chan LeafNodeResult, errToReturn error) {
36+
37+
filepath := filePathCheckpointHeader(dir, fileName)
38+
39+
f, err := os.Open(filepath)
40+
if err != nil {
41+
return nil, fmt.Errorf("could not open file %v: %w", filepath, err)
42+
}
43+
defer func(file *os.File) {
44+
errToReturn = closeAndMergeError(file, errToReturn)
45+
}(f)
46+
47+
subtrieChecksums, _, err := readCheckpointHeader(filepath, logger)
48+
if err != nil {
49+
return nil, fmt.Errorf("could not read header: %w", err)
50+
}
51+
52+
// ensure all checkpoint part file exists, might return os.ErrNotExist error
53+
// if a file is missing
54+
err = allPartFileExist(dir, fileName, len(subtrieChecksums))
55+
if err != nil {
56+
return nil, fmt.Errorf("fail to check all checkpoint part file exist: %w", err)
57+
}
58+
59+
bufSize := 1000
60+
leafNodesCh := make(chan LeafNodeResult, bufSize)
61+
allLeafNodesCh = leafNodesCh
62+
defer func() {
63+
close(leafNodesCh)
64+
}()
65+
66+
// push leaf nodes to allLeafNodesCh
67+
for i, checksum := range subtrieChecksums {
68+
readCheckpointSubTrieLeafNodes(leafNodesCh, dir, fileName, i, checksum, logger)
69+
}
70+
71+
return allLeafNodesCh, nil
72+
}
73+
74+
func readCheckpointSubTrieLeafNodes(leafNodesCh chan<- LeafNodeResult, dir string, fileName string, index int, checksum uint32, logger *zerolog.Logger) {
75+
err := processCheckpointSubTrie(dir, fileName, index, checksum, logger,
76+
func(reader *Crc32Reader, nodesCount uint64) error {
77+
scratch := make([]byte, 1024*4) // must not be less than 1024
78+
79+
logging := logProgress(fmt.Sprintf("reading %v-th sub trie roots", index), int(nodesCount), logger)
80+
dummyChild := &node.Node{}
81+
for i := uint64(1); i <= nodesCount; i++ {
82+
node, err := flattener.ReadNode(reader, scratch, func(nodeIndex uint64) (*node.Node, error) {
83+
if nodeIndex >= i {
84+
return nil, fmt.Errorf("sequence of serialized nodes does not satisfy Descendents-First-Relationship")
85+
}
86+
return dummyChild, nil
87+
})
88+
if err != nil {
89+
return fmt.Errorf("cannot read node %d: %w", i, err)
90+
}
91+
if node.IsLeaf() {
92+
leafNodesCh <- LeafNodeResult{
93+
LeafNode: nodeToLeaf(node),
94+
Err: nil,
95+
}
96+
}
97+
98+
logging(i)
99+
}
100+
return nil
101+
})
102+
103+
if err != nil {
104+
leafNodesCh <- LeafNodeResult{
105+
LeafNode: nil,
106+
Err: err,
107+
}
108+
}
109+
}

ledger/complete/wal/checkpoint_v6_reader.go

Lines changed: 62 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -325,22 +325,65 @@ func readSubTriesConcurrently(dir string, fileName string, subtrieChecksums []ui
325325
return nodesGroups, nil
326326
}
327327

328+
func readCheckpointSubTrie(dir string, fileName string, index int, checksum uint32, logger *zerolog.Logger) (
329+
[]*node.Node,
330+
error,
331+
) {
332+
var nodes []*node.Node
333+
err := processCheckpointSubTrie(dir, fileName, index, checksum, logger,
334+
func(reader *Crc32Reader, nodesCount uint64) error {
335+
scratch := make([]byte, 1024*4) // must not be less than 1024
336+
337+
nodes = make([]*node.Node, nodesCount+1) //+1 for 0 index meaning nil
338+
logging := logProgress(fmt.Sprintf("reading %v-th sub trie roots", index), int(nodesCount), logger)
339+
for i := uint64(1); i <= nodesCount; i++ {
340+
node, err := flattener.ReadNode(reader, scratch, func(nodeIndex uint64) (*node.Node, error) {
341+
if nodeIndex >= i {
342+
return nil, fmt.Errorf("sequence of serialized nodes does not satisfy Descendents-First-Relationship")
343+
}
344+
return nodes[nodeIndex], nil
345+
})
346+
if err != nil {
347+
return fmt.Errorf("cannot read node %d: %w", i, err)
348+
}
349+
nodes[i] = node
350+
logging(i)
351+
}
352+
return nil
353+
})
354+
355+
if err != nil {
356+
return nil, err
357+
}
358+
359+
// since nodes[0] is always `nil`, returning a slice without nodes[0] could simplify the
360+
// implementation of getNodeByIndex
361+
// return nodes[1:], nil
362+
return nodes[1:], nil
363+
}
364+
328365
// subtrie file contains:
329366
// 1. checkpoint version
330367
// 2. nodes
331368
// 3. node count
332369
// 4. checksum
333-
func readCheckpointSubTrie(dir string, fileName string, index int, checksum uint32, logger *zerolog.Logger) (
334-
subtrieRootNodes []*node.Node,
370+
func processCheckpointSubTrie(
371+
dir string,
372+
fileName string,
373+
index int,
374+
checksum uint32,
375+
logger *zerolog.Logger,
376+
processNode func(*Crc32Reader, uint64) error,
377+
) (
335378
errToReturn error,
336379
) {
337380
filepath, _, err := filePathSubTries(dir, fileName, index)
338381
if err != nil {
339-
return nil, err
382+
return err
340383
}
341384
f, err := os.Open(filepath)
342385
if err != nil {
343-
return nil, fmt.Errorf("could not open file %v: %w", filepath, err)
386+
return fmt.Errorf("could not open file %v: %w", filepath, err)
344387
}
345388
defer func(file *os.File) {
346389
evictErr := evictFileFromLinuxPageCache(file, false, logger)
@@ -354,81 +397,68 @@ func readCheckpointSubTrie(dir string, fileName string, index int, checksum uint
354397
// valite the magic bytes and version
355398
err = validateFileHeader(MagicBytesCheckpointSubtrie, VersionV6, f)
356399
if err != nil {
357-
return nil, err
400+
return err
358401
}
359402

360403
nodesCount, expectedSum, err := readSubTriesFooter(f)
361404
if err != nil {
362-
return nil, fmt.Errorf("cannot read sub trie node count: %w", err)
405+
return fmt.Errorf("cannot read sub trie node count: %w", err)
363406
}
364407

365408
if checksum != expectedSum {
366-
return nil, fmt.Errorf("mismatch checksum in subtrie file. checksum from checkpoint header %v does not "+
409+
return fmt.Errorf("mismatch checksum in subtrie file. checksum from checkpoint header %v does not "+
367410
"match with the checksum in subtrie file %v", checksum, expectedSum)
368411
}
369412

370-
// restart from the beginning of the file, make sure CRC32Reader has seen all the bytes
413+
// restart from the beginning of the file, make sure Crc32Reader has seen all the bytes
371414
// in order to compute the correct checksum
372415
_, err = f.Seek(0, io.SeekStart)
373416
if err != nil {
374-
return nil, fmt.Errorf("cannot seek to start of file: %w", err)
417+
return fmt.Errorf("cannot seek to start of file: %w", err)
375418
}
376419

377420
reader := NewCRC32Reader(bufio.NewReaderSize(f, defaultBufioReadSize))
378421

379422
// read version again for calculating checksum
380423
_, _, err = readFileHeader(reader)
381424
if err != nil {
382-
return nil, fmt.Errorf("could not read version again for subtrie: %w", err)
425+
return fmt.Errorf("could not read version again for subtrie: %w", err)
383426
}
384427

385428
// read file part index and verify
386-
scratch := make([]byte, 1024*4) // must not be less than 1024
387-
logging := logProgress(fmt.Sprintf("reading %v-th sub trie roots", index), int(nodesCount), logger)
388429

389-
nodes := make([]*node.Node, nodesCount+1) //+1 for 0 index meaning nil
390-
for i := uint64(1); i <= nodesCount; i++ {
391-
node, err := flattener.ReadNode(reader, scratch, func(nodeIndex uint64) (*node.Node, error) {
392-
if nodeIndex >= i {
393-
return nil, fmt.Errorf("sequence of serialized nodes does not satisfy Descendents-First-Relationship")
394-
}
395-
return nodes[nodeIndex], nil
396-
})
397-
if err != nil {
398-
return nil, fmt.Errorf("cannot read node %d: %w", i, err)
399-
}
400-
nodes[i] = node
401-
logging(i)
430+
err = processNode(reader, nodesCount)
431+
if err != nil {
432+
return err
402433
}
403434

435+
scratch := make([]byte, 1024)
404436
// read footer and discard, since we only care about checksum
405437
_, err = io.ReadFull(reader, scratch[:encNodeCountSize])
406438
if err != nil {
407-
return nil, fmt.Errorf("cannot read footer: %w", err)
439+
return fmt.Errorf("cannot read footer: %w", err)
408440
}
409441

410442
// calculate the actual checksum
411443
actualSum := reader.Crc32()
412444

413445
if actualSum != expectedSum {
414-
return nil, fmt.Errorf("invalid checksum in subtrie checkpoint, expected %v, actual %v",
446+
return fmt.Errorf("invalid checksum in subtrie checkpoint, expected %v, actual %v",
415447
expectedSum, actualSum)
416448
}
417449

418450
// read the checksum and discard, since we only care about whether ensureReachedEOF
419451
_, err = io.ReadFull(reader, scratch[:crc32SumSize])
420452
if err != nil {
421-
return nil, fmt.Errorf("could not read subtrie file's checksum: %w", err)
453+
return fmt.Errorf("could not read subtrie file's checksum: %w", err)
422454
}
423455

424456
err = ensureReachedEOF(reader)
425457
if err != nil {
426-
return nil, fmt.Errorf("fail to read %v-th sutrie file: %w", index, err)
458+
return fmt.Errorf("fail to read %v-th sutrie file: %w", index, err)
427459
}
428460

429-
// since nodes[0] is always `nil`, returning a slice without nodes[0] could simplify the
430-
// implementation of getNodeByIndex
431-
return nodes[1:], nil
461+
return nil
432462
}
433463

434464
func readSubTriesFooter(f *os.File) (uint64, uint32, error) {

0 commit comments

Comments
 (0)