Skip to content

Commit 02932e0

Browse files
authored
Merge pull request #4212 from onflow/leo/fix-read-leaf-nodes
Fix read leaf nodes
2 parents 55f5744 + ea88fef commit 02932e0

File tree

2 files changed

+48
-48
lines changed

2 files changed

+48
-48
lines changed

ledger/complete/wal/checkpoint_v6_leaf_reader.go

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,6 @@ type LeafNode struct {
1818
Payload *ledger.Payload
1919
}
2020

21-
type LeafNodeResult struct {
22-
LeafNode *LeafNode
23-
Err error
24-
}
25-
2621
func nodeToLeaf(leaf *node.Node) *LeafNode {
2722
return &LeafNode{
2823
Hash: leaf.Hash(),
@@ -31,48 +26,50 @@ func nodeToLeaf(leaf *node.Node) *LeafNode {
3126
}
3227
}
3328

34-
func OpenAndReadLeafNodesFromCheckpointV6(dir string, fileName string, logger *zerolog.Logger) (
35-
allLeafNodesCh <-chan LeafNodeResult, errToReturn error) {
29+
// OpenAndReadLeafNodesFromCheckpointV6 takes a channel for pushing the leaf nodes that are read from
30+
// the given checkpoint file specified by dir and fileName.
31+
// It returns when finish reading the checkpoint file and the input channel can be closed.
32+
func OpenAndReadLeafNodesFromCheckpointV6(allLeafNodesCh chan<- *LeafNode, dir string, fileName string, logger *zerolog.Logger) (errToReturn error) {
33+
// we are the only sender of the channel, closing it after done
34+
defer func() {
35+
close(allLeafNodesCh)
36+
}()
3637

3738
filepath := filePathCheckpointHeader(dir, fileName)
3839

3940
f, err := os.Open(filepath)
4041
if err != nil {
41-
return nil, fmt.Errorf("could not open file %v: %w", filepath, err)
42+
return fmt.Errorf("could not open file %v: %w", filepath, err)
4243
}
4344
defer func(file *os.File) {
4445
errToReturn = closeAndMergeError(file, errToReturn)
4546
}(f)
4647

4748
subtrieChecksums, _, err := readCheckpointHeader(filepath, logger)
4849
if err != nil {
49-
return nil, fmt.Errorf("could not read header: %w", err)
50+
return fmt.Errorf("could not read header: %w", err)
5051
}
5152

5253
// ensure all checkpoint part file exists, might return os.ErrNotExist error
5354
// if a file is missing
5455
err = allPartFileExist(dir, fileName, len(subtrieChecksums))
5556
if err != nil {
56-
return nil, fmt.Errorf("fail to check all checkpoint part file exist: %w", err)
57+
return fmt.Errorf("fail to check all checkpoint part file exist: %w", err)
5758
}
5859

59-
bufSize := 1000
60-
leafNodesCh := make(chan LeafNodeResult, bufSize)
61-
allLeafNodesCh = leafNodesCh
62-
defer func() {
63-
close(leafNodesCh)
64-
}()
65-
6660
// push leaf nodes to allLeafNodesCh
6761
for i, checksum := range subtrieChecksums {
68-
readCheckpointSubTrieLeafNodes(leafNodesCh, dir, fileName, i, checksum, logger)
62+
err := readCheckpointSubTrieLeafNodes(allLeafNodesCh, dir, fileName, i, checksum, logger)
63+
if err != nil {
64+
return fmt.Errorf("fail to read checkpoint leaf nodes from %v-th subtrie file: %w", i, err)
65+
}
6966
}
7067

71-
return allLeafNodesCh, nil
68+
return nil
7269
}
7370

74-
func readCheckpointSubTrieLeafNodes(leafNodesCh chan<- LeafNodeResult, dir string, fileName string, index int, checksum uint32, logger *zerolog.Logger) {
75-
err := processCheckpointSubTrie(dir, fileName, index, checksum, logger,
71+
func readCheckpointSubTrieLeafNodes(leafNodesCh chan<- *LeafNode, dir string, fileName string, index int, checksum uint32, logger *zerolog.Logger) error {
72+
return processCheckpointSubTrie(dir, fileName, index, checksum, logger,
7673
func(reader *Crc32Reader, nodesCount uint64) error {
7774
scratch := make([]byte, 1024*4) // must not be less than 1024
7875

@@ -89,21 +86,11 @@ func readCheckpointSubTrieLeafNodes(leafNodesCh chan<- LeafNodeResult, dir strin
8986
return fmt.Errorf("cannot read node %d: %w", i, err)
9087
}
9188
if node.IsLeaf() {
92-
leafNodesCh <- LeafNodeResult{
93-
LeafNode: nodeToLeaf(node),
94-
Err: nil,
95-
}
89+
leafNodesCh <- nodeToLeaf(node)
9690
}
9791

9892
logging(i)
9993
}
10094
return nil
10195
})
102-
103-
if err != nil {
104-
leafNodesCh <- LeafNodeResult{
105-
LeafNode: nil,
106-
Err: err,
107-
}
108-
}
10996
}

ledger/complete/wal/checkpoint_v6_test.go

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func createMultipleRandomTriesMini(t *testing.T) []*trie.MTrie {
140140
var err error
141141
// add tries with no shared paths
142142
for i := 0; i < 5; i++ {
143-
paths, payloads := randNPathPayloads(10)
143+
paths, payloads := randNPathPayloads(20)
144144
activeTrie, _, err = trie.NewTrieWithUpdatedRegisters(activeTrie, paths, payloads, false)
145145
require.NoError(t, err, "update registers")
146146
tries = append(tries, activeTrie)
@@ -318,9 +318,14 @@ func TestWriteAndReadCheckpointV6LeafEmptyTrie(t *testing.T) {
318318
fileName := "checkpoint-empty-trie"
319319
logger := unittest.Logger()
320320
require.NoErrorf(t, StoreCheckpointV6Concurrently(tries, dir, fileName, &logger), "fail to store checkpoint")
321-
resultChan, err := OpenAndReadLeafNodesFromCheckpointV6(dir, fileName, &logger)
322-
require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName)
323-
for range resultChan {
321+
322+
bufSize := 10
323+
leafNodesCh := make(chan *LeafNode, bufSize)
324+
go func() {
325+
err := OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, &logger)
326+
require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName)
327+
}()
328+
for range leafNodesCh {
324329
require.Fail(t, "should not return any nodes")
325330
}
326331
})
@@ -332,14 +337,17 @@ func TestWriteAndReadCheckpointV6LeafSimpleTrie(t *testing.T) {
332337
fileName := "checkpoint"
333338
logger := unittest.Logger()
334339
require.NoErrorf(t, StoreCheckpointV6Concurrently(tries, dir, fileName, &logger), "fail to store checkpoint")
335-
resultChan, err := OpenAndReadLeafNodesFromCheckpointV6(dir, fileName, &logger)
336-
require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName)
340+
bufSize := 1
341+
leafNodesCh := make(chan *LeafNode, bufSize)
342+
go func() {
343+
err := OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, &logger)
344+
require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName)
345+
}()
337346
resultPayloads := make([]ledger.Payload, 0)
338-
for readResult := range resultChan {
339-
require.NoError(t, readResult.Err, "no errors in read results")
347+
for leafNode := range leafNodesCh {
340348
// avoid dummy payload from empty trie
341-
if readResult.LeafNode.Payload != nil {
342-
resultPayloads = append(resultPayloads, *readResult.LeafNode.Payload)
349+
if leafNode.Payload != nil {
350+
resultPayloads = append(resultPayloads, *leafNode.Payload)
343351
}
344352
}
345353
require.EqualValues(t, tries[1].AllPayloads(), resultPayloads)
@@ -352,12 +360,15 @@ func TestWriteAndReadCheckpointV6LeafMultipleTries(t *testing.T) {
352360
tries := createMultipleRandomTriesMini(t)
353361
logger := unittest.Logger()
354362
require.NoErrorf(t, StoreCheckpointV6Concurrently(tries, dir, fileName, &logger), "fail to store checkpoint")
355-
resultChan, err := OpenAndReadLeafNodesFromCheckpointV6(dir, fileName, &logger)
356-
require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName)
363+
bufSize := 5
364+
leafNodesCh := make(chan *LeafNode, bufSize)
365+
go func() {
366+
err := OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, &logger)
367+
require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName)
368+
}()
357369
resultPayloads := make([]ledger.Payload, 0)
358-
for readResult := range resultChan {
359-
require.NoError(t, readResult.Err, "no errors in read results")
360-
resultPayloads = append(resultPayloads, *readResult.LeafNode.Payload)
370+
for leafNode := range leafNodesCh {
371+
resultPayloads = append(resultPayloads, *leafNode.Payload)
361372
}
362373
require.NotEmpty(t, resultPayloads)
363374
})
@@ -528,7 +539,9 @@ func TestAllPartFileExistLeafReader(t *testing.T) {
528539
err = os.Remove(fileToDelete)
529540
require.NoError(t, err, "fail to remove part file")
530541

531-
_, err = OpenAndReadLeafNodesFromCheckpointV6(dir, fileName, &logger)
542+
bufSize := 10
543+
leafNodesCh := make(chan *LeafNode, bufSize)
544+
err = OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, &logger)
532545
require.ErrorIs(t, err, os.ErrNotExist, "wrong error type returned")
533546
}
534547
})

0 commit comments

Comments
 (0)