Skip to content

Commit 029a340

Browse files
authored
Merge branch 'master' into gregor/callbacks/process-fallback
2 parents c49b7c2 + c127164 commit 029a340

29 files changed

+1156
-427
lines changed

cmd/util/cmd/read-light-block/read_light_block_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ func TestReadClusterRange(t *testing.T) {
2525
err := unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
2626
// add parent as boundary
2727
err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
28-
return operation.IndexClusterBlockHeight(lctx, rw.Writer(), parent.ChainID, parent.Height, parent.ID())
28+
return operation.IndexClusterBlockHeight(lctx, rw, parent.ChainID, parent.Height, parent.ID())
2929
})
3030
if err != nil {
3131
return err
3232
}
3333

3434
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
35-
return operation.UpsertClusterFinalizedHeight(lctx, rw.Writer(), parent.ChainID, parent.Height)
35+
return operation.BootstrapClusterFinalizedHeight(lctx, rw, parent.ChainID, parent.Height)
3636
})
3737
})
3838
require.NoError(t, err)

state/cluster/badger/mutator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ func (suite *MutatorSuite) TestExtend_Success() {
389389

390390
// the block should be indexed by its parent
391391
var childIDs flow.IdentifierList
392-
err = procedure.LookupBlockChildren(r, suite.genesis.ID(), &childIDs)
392+
err = operation.RetrieveBlockChildren(r, suite.genesis.ID(), &childIDs)
393393
suite.Assert().Nil(err)
394394
suite.Require().Len(childIDs, 1)
395395
suite.Assert().Equal(proposal.Block.ID(), childIDs[0])

state/cluster/badger/snapshot.go

Lines changed: 54 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,46 @@
11
package badger
22

33
import (
4+
"errors"
45
"fmt"
56

67
"github.com/onflow/flow-go/model/cluster"
78
"github.com/onflow/flow-go/model/flow"
9+
"github.com/onflow/flow-go/module/irrecoverable"
810
clusterState "github.com/onflow/flow-go/state/cluster"
11+
"github.com/onflow/flow-go/storage"
912
"github.com/onflow/flow-go/storage/operation"
1013
"github.com/onflow/flow-go/storage/procedure"
1114
)
1215

13-
// Snapshot represents a snapshot of chain state anchored at a particular
14-
// reference block.
16+
// Snapshot pertains to a specific fork of the collector cluster consensus. Specifically,
17+
// it references one block denoted as the `Head`. This Snapshot type is for collector
18+
// clusters, so we are referencing a cluster block, aka collection, here.
19+
//
20+
// This implementation must be used for KNOWN reference BLOCKs only.
1521
type Snapshot struct {
16-
err error
1722
state *State
1823
blockID flow.Identifier
1924
}
2025

2126
var _ clusterState.Snapshot = (*Snapshot)(nil)
2227

23-
func (s *Snapshot) Collection() (*flow.Collection, error) {
24-
if s.err != nil {
25-
return nil, s.err
28+
// newSnapshot instantiates a new snapshot for the given collection ID.
29+
// CAUTION: This constructor must be called for KNOWN blocks.
30+
// For unknown blocks, please use `invalid.NewSnapshot` or `invalid.NewSnapshotf`.
31+
func newSnapshot(state *State, blockID flow.Identifier) *Snapshot {
32+
return &Snapshot{
33+
state: state,
34+
blockID: blockID,
2635
}
36+
}
2737

38+
// Collection returns the collection designated as the reference for this
39+
// snapshot. Technically, this is a portion of the payload of a cluster block.
40+
//
41+
// By contract of the constructor, the blockID must correspond to a known collection in the database.
42+
// No error returns are expected during normal operation.
43+
func (s *Snapshot) Collection() (*flow.Collection, error) {
2844
// get the payload
2945
var payload cluster.Payload
3046
err := procedure.RetrieveClusterPayload(s.state.db.Reader(), s.blockID, &payload)
@@ -36,38 +52,50 @@ func (s *Snapshot) Collection() (*flow.Collection, error) {
3652
return &collection, nil
3753
}
3854

55+
// Head returns the header of the collection that designated as the reference for this
56+
// snapshot.
57+
//
58+
// By contract of the constructor, the blockID must correspond to a known collection in the database.
59+
// No error returns are expected during normal operation.
3960
func (s *Snapshot) Head() (*flow.Header, error) {
40-
if s.err != nil {
41-
return nil, s.err
42-
}
43-
4461
var head flow.Header
45-
err := s.head(&head)
62+
err := operation.RetrieveHeader(s.state.db.Reader(), s.blockID, &head)
63+
if err != nil {
64+
// `storage.ErrNotFound` is the only error that the storage layer may return other than exceptions.
65+
// In the context of this call, `s.blockID` should correspond to a known block, so receiving a
66+
// `storage.ErrNotFound` is an exception here.
67+
return nil, irrecoverable.NewExceptionf("could not retrieve header for block (%s): %w", s.blockID, err)
68+
}
4669
return &head, err
4770
}
4871

72+
// Pending returns the IDs of all collections descending from the snapshot's head collection.
73+
// The result is ordered such that parents are included before their children. While only valid
74+
// descendants will be returned, note that the descendants may not be finalized yet.
75+
// By contract of the constructor, the blockID must correspond to a known collection in the database.
76+
// No error returns are expected during normal operation.
4977
func (s *Snapshot) Pending() ([]flow.Identifier, error) {
50-
if s.err != nil {
51-
return nil, s.err
52-
}
5378
return s.pending(s.blockID)
5479
}
5580

56-
// head finds the header referenced by the snapshot.
57-
func (s *Snapshot) head(head *flow.Header) error {
58-
// get the snapshot header
59-
err := operation.RetrieveHeader(s.state.db.Reader(), s.blockID, head)
60-
if err != nil {
61-
return fmt.Errorf("could not retrieve header for block (%s): %w", s.blockID, err)
62-
}
63-
return nil
64-
}
65-
81+
// pending returns a slice with all blocks descending from the given blockID (children, grandchildren, etc).
82+
// CAUTION: this function behaves only correctly for known blocks, which should always be the case as
83+
// required by the constructor.
84+
// No error returns are expected during normal operation.
6685
func (s *Snapshot) pending(blockID flow.Identifier) ([]flow.Identifier, error) {
6786
var pendingIDs flow.IdentifierList
68-
err := procedure.LookupBlockChildren(s.state.db.Reader(), blockID, &pendingIDs)
87+
err := operation.RetrieveBlockChildren(s.state.db.Reader(), blockID, &pendingIDs)
6988
if err != nil {
70-
return nil, fmt.Errorf("could not get pending children: %w", err)
89+
if !errors.Is(err, storage.ErrNotFound) {
90+
return nil, fmt.Errorf("could not get pending block %v: %w", blockID, err)
91+
}
92+
93+
// The low-level storage returns `storage.ErrNotFound` in two cases:
94+
// 1. the block/collection is unknown
95+
// 2. the block/collection is known but no children have been indexed yet
96+
// By contract of the constructor, the blockID must correspond to a known collection in the database.
97+
// A snapshot with s.err == nil is only created for known blocks. Hence, only case 2 is
98+
// possible here, and we just return an empty list.
7199
}
72100

73101
for _, pendingID := range pendingIDs {

state/cluster/badger/state.go

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010
clustermodel "github.com/onflow/flow-go/model/cluster"
1111
"github.com/onflow/flow-go/model/flow"
1212
"github.com/onflow/flow-go/module"
13+
"github.com/onflow/flow-go/state"
1314
"github.com/onflow/flow-go/state/cluster"
15+
"github.com/onflow/flow-go/state/cluster/invalid"
1416
"github.com/onflow/flow-go/storage"
1517
"github.com/onflow/flow-go/storage/operation"
1618
"github.com/onflow/flow-go/storage/procedure"
@@ -22,6 +24,8 @@ type State struct {
2224
epoch uint64 // the operating epoch for the cluster
2325
}
2426

27+
var _ cluster.State = (*State)(nil)
28+
2529
// Bootstrap initializes the persistent cluster state with a genesis block.
2630
// The genesis block must have height 0, a parent hash of 32 zero bytes,
2731
// and an empty collection as payload.
@@ -62,13 +66,12 @@ func Bootstrap(db storage.DB, lockManager lockctx.Manager, stateRoot *StateRoot)
6266
return fmt.Errorf("could not insert genesis block: %w", err)
6367
}
6468
// insert block height -> ID mapping
65-
err = operation.IndexClusterBlockHeight(lctx, rw.Writer(), chainID, genesis.Height, genesis.ID())
69+
err = operation.IndexClusterBlockHeight(lctx, rw, chainID, genesis.Height, genesis.ID())
6670
if err != nil {
6771
return fmt.Errorf("failed to map genesis block height to block: %w", err)
6872
}
6973
// insert boundary
70-
err = operation.UpsertClusterFinalizedHeight(lctx, rw.Writer(), chainID, genesis.Height)
71-
// insert started view for hotstuff
74+
err = operation.BootstrapClusterFinalizedHeight(lctx, rw, chainID, genesis.Height)
7275
if err != nil {
7376
return fmt.Errorf("could not insert genesis boundary: %w", err)
7477
}
@@ -79,7 +82,7 @@ func Bootstrap(db storage.DB, lockManager lockctx.Manager, stateRoot *StateRoot)
7982
}
8083

8184
livenessData := &hotstuff.LivenessData{
82-
CurrentView: genesis.View + 1,
85+
CurrentView: genesis.View + 1, // starting view for hotstuff
8386
NewestQC: rootQC,
8487
}
8588
// insert safety data
@@ -131,42 +134,38 @@ func (s *State) Params() cluster.Params {
131134
}
132135

133136
func (s *State) Final() cluster.Snapshot {
134-
// get the finalized block ID
135-
var blockID flow.Identifier
136-
err := (func(r storage.Reader) error {
137-
var boundary uint64
138-
err := operation.RetrieveClusterFinalizedHeight(r, s.clusterID, &boundary)
139-
if err != nil {
140-
return fmt.Errorf("could not retrieve finalized boundary: %w", err)
141-
}
142-
143-
err = operation.LookupClusterBlockHeight(r, s.clusterID, boundary, &blockID)
144-
if err != nil {
145-
return fmt.Errorf("could not retrieve finalized ID: %w", err)
146-
}
147-
148-
return nil
149-
})(s.db.Reader())
150-
137+
// get height of latest finalized collection and then the ID of the collection with the corresponding height
138+
r := s.db.Reader()
139+
var latestFinalizedClusterHeight uint64
140+
err := operation.RetrieveClusterFinalizedHeight(r, s.clusterID, &latestFinalizedClusterHeight)
151141
if err != nil {
152-
return &Snapshot{
153-
err: err,
154-
}
142+
return invalid.NewSnapshotf("could not retrieve finalized boundary: %w", err)
155143
}
156144

157-
snapshot := &Snapshot{
158-
state: s,
159-
blockID: blockID,
145+
var blockID flow.Identifier
146+
err = operation.LookupClusterBlockHeight(r, s.clusterID, latestFinalizedClusterHeight, &blockID)
147+
if err != nil {
148+
return invalid.NewSnapshotf("could not retrieve finalized ID: %w", err)
160149
}
161-
return snapshot
150+
151+
return newSnapshot(s, blockID)
162152
}
163153

154+
// AtBlockID returns the snapshot of the persistent cluster at the given
155+
// block ID. It is available for any block that was introduced into the
156+
// cluster state, and can thus represent an ambiguous state that was or
157+
// will never be finalized.
158+
// If the block is unknown, it returns an invalid snapshot, which returns
159+
// state.ErrUnknownSnapshotReference for all methods
164160
func (s *State) AtBlockID(blockID flow.Identifier) cluster.Snapshot {
165-
snapshot := &Snapshot{
166-
state: s,
167-
blockID: blockID,
161+
exists, err := operation.BlockExists(s.db.Reader(), blockID)
162+
if err != nil {
163+
return invalid.NewSnapshotf("could not check existence of reference block: %w", err)
164+
}
165+
if !exists {
166+
return invalid.NewSnapshotf("unknown block %x: %w", blockID, state.ErrUnknownSnapshotReference)
168167
}
169-
return snapshot
168+
return newSnapshot(s, blockID)
170169
}
171170

172171
// IsBootstrapped returns whether the database contains a bootstrapped state.

state/cluster/badger/state_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package badger
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/onflow/flow-go/state"
10+
"github.com/onflow/flow-go/storage"
11+
"github.com/onflow/flow-go/storage/operation/dbtest"
12+
"github.com/onflow/flow-go/utils/unittest"
13+
)
14+
15+
// TestUnknownSnapshotReference verifies that AtBlockID returns a snapshot that
16+
// returns state.ErrUnknownSnapshotReference for all methods when given an unknown block ID.
17+
func TestUnknownSnapshotReference(t *testing.T) {
18+
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
19+
lockManager := storage.NewTestingLockManager()
20+
21+
// Setup
22+
genesis, err := unittest.ClusterBlock.Genesis()
23+
require.NoError(t, err)
24+
25+
root := unittest.RootSnapshotFixture(unittest.IdentityListFixture(5, unittest.WithAllRoles()))
26+
epochCounter := root.Encodable().SealingSegment.LatestProtocolStateEntry().EpochEntry.EpochCounter()
27+
28+
clusterStateRoot, err := NewStateRoot(genesis, unittest.QuorumCertificateFixture(), epochCounter)
29+
require.NoError(t, err)
30+
clusterState, err := Bootstrap(db, lockManager, clusterStateRoot)
31+
require.NoError(t, err)
32+
33+
// Test
34+
unknownBlockID := unittest.IdentifierFixture()
35+
snapshot := clusterState.AtBlockID(unknownBlockID)
36+
37+
// Verify that Collection() returns state.ErrUnknownSnapshotReference
38+
_, err = snapshot.Collection()
39+
assert.Error(t, err)
40+
assert.ErrorIs(t, err, state.ErrUnknownSnapshotReference)
41+
42+
// Verify that Head() returns state.ErrUnknownSnapshotReference
43+
_, err = snapshot.Head()
44+
assert.Error(t, err)
45+
assert.ErrorIs(t, err, state.ErrUnknownSnapshotReference)
46+
47+
// Verify that Pending() returns state.ErrUnknownSnapshotReference
48+
_, err = snapshot.Pending()
49+
assert.Error(t, err)
50+
assert.ErrorIs(t, err, state.ErrUnknownSnapshotReference)
51+
})
52+
}
53+
54+
// TestValidSnapshotReference verifies that AtBlockID returns a working snapshot
55+
// when given a valid block ID (the genesis block).
56+
func TestValidSnapshotReference(t *testing.T) {
57+
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
58+
lockManager := storage.NewTestingLockManager()
59+
60+
// Setup
61+
genesis, err := unittest.ClusterBlock.Genesis()
62+
require.NoError(t, err)
63+
64+
root := unittest.RootSnapshotFixture(unittest.IdentityListFixture(5, unittest.WithAllRoles()))
65+
epochCounter := root.Encodable().SealingSegment.LatestProtocolStateEntry().EpochEntry.EpochCounter()
66+
67+
clusterStateRoot, err := NewStateRoot(genesis, unittest.QuorumCertificateFixture(), epochCounter)
68+
require.NoError(t, err)
69+
clusterState, err := Bootstrap(db, lockManager, clusterStateRoot)
70+
require.NoError(t, err)
71+
72+
// Test with valid block ID (genesis block)
73+
snapshot := clusterState.AtBlockID(genesis.ID())
74+
75+
// Verify that Collection() works correctly
76+
collection, err := snapshot.Collection()
77+
assert.NoError(t, err)
78+
assert.Equal(t, &genesis.Payload.Collection, collection)
79+
80+
// Verify that Head() works correctly
81+
head, err := snapshot.Head()
82+
assert.NoError(t, err)
83+
assert.Equal(t, genesis.ToHeader().ID(), head.ID())
84+
85+
// Verify that Pending() works correctly (should return empty list for genesis)
86+
pending, err := snapshot.Pending()
87+
assert.NoError(t, err)
88+
assert.Empty(t, pending)
89+
})
90+
}

state/cluster/invalid/snapshot.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package invalid
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
7+
"github.com/onflow/flow-go/model/flow"
8+
"github.com/onflow/flow-go/state"
9+
"github.com/onflow/flow-go/state/cluster"
10+
)
11+
12+
// Snapshot represents a snapshot that does not exist or could not be queried.
13+
type Snapshot struct {
14+
err error
15+
}
16+
17+
// NewSnapshot returns a new invalid snapshot, containing an error describing why the
18+
// snapshot could not be retrieved. The following are typical
19+
// errors resulting in the construction of an invalid Snapshot:
20+
// - state.ErrUnknownSnapshotReference if the reference point for the snapshot
21+
// (height or block ID) does not resolve to a queriable block in the state.
22+
// - generic error in case of unexpected state inconsistencies or bugs
23+
func NewSnapshot(err error) *Snapshot {
24+
if errors.Is(err, state.ErrUnknownSnapshotReference) {
25+
return &Snapshot{err: err}
26+
}
27+
return &Snapshot{fmt.Errorf("critical unexpected error querying snapshot: %w", err)}
28+
}
29+
30+
var _ cluster.Snapshot = (*Snapshot)(nil)
31+
32+
// NewSnapshotf is NewSnapshot with ergonomic error formatting.
33+
func NewSnapshotf(msg string, args ...interface{}) *Snapshot {
34+
return NewSnapshot(fmt.Errorf(msg, args...))
35+
}
36+
37+
func (u *Snapshot) Collection() (*flow.Collection, error) {
38+
return nil, u.err
39+
}
40+
41+
func (u *Snapshot) Head() (*flow.Header, error) {
42+
return nil, u.err
43+
}
44+
45+
func (u *Snapshot) Pending() ([]flow.Identifier, error) {
46+
return nil, u.err
47+
}

0 commit comments

Comments
 (0)