Skip to content

Commit c127164

Browse files
authored
Merge pull request #7954 from onflow/leo/refactor-index-cluster-height
[Storage] Refactor index cluster height
2 parents aeccab9 + 7224417 commit c127164

File tree

7 files changed

+194
-31
lines changed

7 files changed

+194
-31
lines changed

cmd/util/cmd/read-light-block/read_light_block_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ func TestReadClusterRange(t *testing.T) {
2525
err := unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
2626
// add parent as boundary
2727
err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
28-
return operation.IndexClusterBlockHeight(lctx, rw.Writer(), parent.ChainID, parent.Height, parent.ID())
28+
return operation.IndexClusterBlockHeight(lctx, rw, parent.ChainID, parent.Height, parent.ID())
2929
})
3030
if err != nil {
3131
return err
3232
}
3333

3434
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
35-
return operation.UpsertClusterFinalizedHeight(lctx, rw.Writer(), parent.ChainID, parent.Height)
35+
return operation.BootstrapClusterFinalizedHeight(lctx, rw, parent.ChainID, parent.Height)
3636
})
3737
})
3838
require.NoError(t, err)

state/cluster/badger/state.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,12 @@ func Bootstrap(db storage.DB, lockManager lockctx.Manager, stateRoot *StateRoot)
6666
return fmt.Errorf("could not insert genesis block: %w", err)
6767
}
6868
// insert block height -> ID mapping
69-
err = operation.IndexClusterBlockHeight(lctx, rw.Writer(), chainID, genesis.Height, genesis.ID())
69+
err = operation.IndexClusterBlockHeight(lctx, rw, chainID, genesis.Height, genesis.ID())
7070
if err != nil {
7171
return fmt.Errorf("failed to map genesis block height to block: %w", err)
7272
}
7373
// insert boundary
74-
err = operation.UpsertClusterFinalizedHeight(lctx, rw.Writer(), chainID, genesis.Height)
75-
// insert started view for hotstuff
74+
err = operation.BootstrapClusterFinalizedHeight(lctx, rw, chainID, genesis.Height)
7675
if err != nil {
7776
return fmt.Errorf("could not insert genesis boundary: %w", err)
7877
}
@@ -83,7 +82,7 @@ func Bootstrap(db storage.DB, lockManager lockctx.Manager, stateRoot *StateRoot)
8382
}
8483

8584
livenessData := &hotstuff.LivenessData{
86-
CurrentView: genesis.View + 1,
85+
CurrentView: genesis.View + 1, // starting view for hotstuff
8786
NewestQC: rootQC,
8887
}
8988
// insert safety data

storage/operation/cluster.go

Lines changed: 96 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package operation
22

33
import (
4+
"errors"
45
"fmt"
56

67
"github.com/jordanschalm/lockctx"
@@ -14,30 +15,117 @@ import (
1415
// for regular consensus, these functions include the cluster ID in order to
1516
// support storing multiple chains, for example during epoch switchover.
1617

17-
// IndexClusterBlockHeight indexes a cluster block from the specified cluster by its height.
18-
func IndexClusterBlockHeight(lctx lockctx.Proof, w storage.Writer, clusterID flow.ChainID, height uint64, blockID flow.Identifier) error {
18+
// IndexClusterBlockHeight indexes a cluster block ID by the cluster ID and block height.
19+
// The function ensures data integrity by first checking if a block ID already exists for the given
20+
// cluster and height, and rejecting overwrites with different values. This function is idempotent,
21+
// i.e. repeated calls with the *initially* indexed value are no-ops.
22+
//
23+
// CAUTION:
24+
// - Confirming that no value is already stored and the subsequent write must be atomic to prevent data corruption.
25+
// The caller must acquire the [storage.LockInsertOrFinalizeClusterBlock] and hold it until the database write has been committed.
26+
//
27+
// Expected error returns during normal operations:
28+
// - [storage.ErrDataMismatch] if a *different* block ID is already indexed for the same cluster and height
29+
func IndexClusterBlockHeight(lctx lockctx.Proof, rw storage.ReaderBatchWriter, clusterID flow.ChainID, height uint64, blockID flow.Identifier) error {
1930
if !lctx.HoldsLock(storage.LockInsertOrFinalizeClusterBlock) {
2031
return fmt.Errorf("missing lock: %v", storage.LockInsertOrFinalizeClusterBlock)
2132
}
2233

23-
return UpsertByKey(w, MakePrefix(codeFinalizedCluster, clusterID, height), blockID)
34+
key := MakePrefix(codeFinalizedCluster, clusterID, height)
35+
var existing flow.Identifier
36+
err := RetrieveByKey(rw.GlobalReader(), key, &existing)
37+
if err == nil {
38+
if existing != blockID {
39+
return fmt.Errorf("cluster block height already indexed with different block ID: %s vs %s: %w", existing, blockID, storage.ErrDataMismatch)
40+
}
41+
return nil // for the specified height, the finalized block is already set to `blockID`
42+
}
43+
// We do NOT want to continue with the WRITE UNLESS `storage.ErrNotFound` was received when checking for existing data.
44+
if !errors.Is(err, storage.ErrNotFound) {
45+
return fmt.Errorf("failed to check existing cluster block height index: %w", err)
46+
}
47+
48+
return UpsertByKey(rw.Writer(), key, blockID)
2449
}
2550

26-
// LookupClusterBlockHeight retrieves a block ID by height for the given cluster
27-
// (only finalized cluster blocks are indexed by height to guarantee uniqueness).
51+
// LookupClusterBlockHeight retrieves the ID of a finalized cluster block at the given height produced by the specified cluster.
52+
// Note that only finalized cluster blocks are indexed by height to guarantee uniqueness.
53+
//
54+
// Expected error returns during normal operations:
55+
// - [storage.ErrNotFound] if no finalized block from the specified cluster is known at the given height
2856
func LookupClusterBlockHeight(r storage.Reader, clusterID flow.ChainID, height uint64, blockID *flow.Identifier) error {
2957
return RetrieveByKey(r, MakePrefix(codeFinalizedCluster, clusterID, height), blockID)
3058
}
3159

32-
// UpsertClusterFinalizedHeight updates (overwrites!) the latest finalized cluster block height for the given cluster.
33-
func UpsertClusterFinalizedHeight(lctx lockctx.Proof, w storage.Writer, clusterID flow.ChainID, number uint64) error {
60+
// BootstrapClusterFinalizedHeight initializes the latest finalized cluster block height for the given cluster.
61+
//
62+
// CAUTION:
63+
// - This function is intended to be called during bootstrapping only. It expects that the height of the latest
64+
// known finalized cluster block has not yet been persisted.
65+
// - Confirming that no value is already stored and the subsequent write must be atomic to prevent data corruption.
66+
// Therefore, the caller must acquire the [storage.LockInsertOrFinalizeClusterBlock] and hold it until the database
67+
// write has been committed.
68+
//
69+
// No error returns expected during normal operations.
70+
func BootstrapClusterFinalizedHeight(lctx lockctx.Proof, rw storage.ReaderBatchWriter, clusterID flow.ChainID, number uint64) error {
71+
if !lctx.HoldsLock(storage.LockInsertOrFinalizeClusterBlock) {
72+
return fmt.Errorf("missing lock: %v", storage.LockInsertOrFinalizeClusterBlock)
73+
}
74+
75+
key := MakePrefix(codeClusterHeight, clusterID)
76+
77+
var existing uint64
78+
err := RetrieveByKey(rw.GlobalReader(), key, &existing)
79+
if err == nil {
80+
return fmt.Errorf("finalized height for cluster %v already initialized to %d", clusterID, existing)
81+
}
82+
83+
// We do NOT want to continue with the WRITE UNLESS `storage.ErrNotFound` was received when checking for existing data.
84+
if !errors.Is(err, storage.ErrNotFound) {
85+
return fmt.Errorf("failed to check existing finalized height: %w", err)
86+
}
87+
88+
return UpsertByKey(rw.Writer(), key, number)
89+
}
90+
91+
// UpdateClusterFinalizedHeight updates (overwrites!) the latest finalized cluster block height for the given cluster.
92+
//
93+
// CAUTION:
94+
// - This function is intended for normal operations after bootstrapping. It expects that the height of the
95+
// latest known finalized cluster block has already been persisted. This function guarantees that the height is updated
96+
// sequentially, i.e. the new height is equal to the old height plus one. Otherwise, an exception is returned.
97+
// - Reading the current height value, checking that it increases sequentially, and writing the new value must happen in one
98+
// atomic operation to prevent data corruption. Hence, the caller must acquire [storage.LockInsertOrFinalizeClusterBlock]
99+
// and hold it until the database write has been committed.
100+
//
101+
// No error returns expected during normal operations.
102+
func UpdateClusterFinalizedHeight(lctx lockctx.Proof, rw storage.ReaderBatchWriter, clusterID flow.ChainID, latestFinalizedHeight uint64) error {
34103
if !lctx.HoldsLock(storage.LockInsertOrFinalizeClusterBlock) {
35104
return fmt.Errorf("missing lock: %v", storage.LockInsertOrFinalizeClusterBlock)
36105
}
37-
return UpsertByKey(w, MakePrefix(codeClusterHeight, clusterID), number)
106+
107+
key := MakePrefix(codeClusterHeight, clusterID)
108+
109+
var existing uint64
110+
err := RetrieveByKey(rw.GlobalReader(), key, &existing)
111+
if err != nil {
112+
return fmt.Errorf("failed to check existing finalized height: %w", err)
113+
}
114+
115+
if existing+1 != latestFinalizedHeight {
116+
return fmt.Errorf("finalization isn't sequential: existing %d, new %d", existing, latestFinalizedHeight)
117+
}
118+
119+
return UpsertByKey(rw.Writer(), key, latestFinalizedHeight)
38120
}
39121

40122
// RetrieveClusterFinalizedHeight retrieves the latest finalized cluster block height of the given cluster.
123+
// For collector nodes in the specified cluster, this value should always exist (after bootstrapping).
124+
// However, other nodes outside the cluster typically do not track the latest finalized heights for the
125+
// different collector clusters.
126+
//
127+
// Expected error returns during normal operations:
128+
// - [storage.ErrNotFound] if the latest finalized height for the specified cluster is not present in the database
41129
func RetrieveClusterFinalizedHeight(r storage.Reader, clusterID flow.ChainID, height *uint64) error {
42130
return RetrieveByKey(r, MakePrefix(codeClusterHeight, clusterID), height)
43131
}

storage/operation/cluster_test.go

Lines changed: 85 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestClusterHeights(t *testing.T) {
3535
t.Run("insert/retrieve", func(t *testing.T) {
3636
err := unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
3737
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
38-
return operation.IndexClusterBlockHeight(lctx, rw.Writer(), clusterID, height, expected)
38+
return operation.IndexClusterBlockHeight(lctx, rw, clusterID, height, expected)
3939
})
4040
})
4141
require.NoError(t, err)
@@ -46,6 +46,32 @@ func TestClusterHeights(t *testing.T) {
4646
assert.Equal(t, expected, actual)
4747
})
4848

49+
t.Run("data mismatch error", func(t *testing.T) {
50+
// Use a different cluster ID and height to avoid conflicts with other tests
51+
testClusterID := flow.ChainID("test-cluster")
52+
testHeight := uint64(999)
53+
54+
// First index a block ID for the cluster and height
55+
firstBlockID := unittest.IdentifierFixture()
56+
err := unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
57+
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
58+
return operation.IndexClusterBlockHeight(lctx, rw, testClusterID, testHeight, firstBlockID)
59+
})
60+
})
61+
require.NoError(t, err)
62+
63+
// Try to index a different block ID for the same cluster and height
64+
differentBlockID := unittest.IdentifierFixture()
65+
err = unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
66+
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
67+
return operation.IndexClusterBlockHeight(lctx, rw, testClusterID, testHeight, differentBlockID)
68+
})
69+
})
70+
71+
require.Error(t, err)
72+
assert.ErrorIs(t, err, storage.ErrDataMismatch)
73+
})
74+
4975
t.Run("multiple chain IDs", func(t *testing.T) {
5076
// use different cluster ID but same block height
5177
// - we first index *all* three blocks from different clusters for the same height
@@ -61,7 +87,7 @@ func TestClusterHeights(t *testing.T) {
6187

6288
err := unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
6389
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
64-
return operation.IndexClusterBlockHeight(lctx, rw.Writer(), clusterIDs[i], height, clusterBlockIDs[i])
90+
return operation.IndexClusterBlockHeight(lctx, rw, clusterIDs[i], height, clusterBlockIDs[i])
6591
})
6692
})
6793
require.NoError(t, err)
@@ -81,11 +107,10 @@ func Test_RetrieveClusterFinalizedHeight(t *testing.T) {
81107
lockManager := storage.NewTestingLockManager()
82108
var (
83109
clusterID flow.ChainID = "cluster"
84-
expected uint64 = 42
85110
err error
86111
)
87112

88-
t.Run("retrieve non-existant", func(t *testing.T) {
113+
t.Run("retrieve non-existent", func(t *testing.T) {
89114
var actual uint64
90115
err = operation.RetrieveClusterFinalizedHeight(db.Reader(), clusterID, &actual)
91116
t.Log(err)
@@ -96,22 +121,29 @@ func Test_RetrieveClusterFinalizedHeight(t *testing.T) {
96121

97122
err := unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
98123
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
99-
return operation.UpsertClusterFinalizedHeight(lctx, rw.Writer(), clusterID, 21)
124+
return operation.BootstrapClusterFinalizedHeight(lctx, rw, clusterID, 20)
125+
})
126+
})
127+
require.NoError(t, err)
128+
129+
err = unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
130+
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
131+
return operation.UpdateClusterFinalizedHeight(lctx, rw, clusterID, 21)
100132
})
101133
})
102134
require.NoError(t, err)
103135

104136
err = unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
105137
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
106-
return operation.UpsertClusterFinalizedHeight(lctx, rw.Writer(), clusterID, expected)
138+
return operation.UpdateClusterFinalizedHeight(lctx, rw, clusterID, 22)
107139
})
108140
})
109141
require.NoError(t, err)
110142

111143
var actual uint64
112144
err = operation.RetrieveClusterFinalizedHeight(db.Reader(), clusterID, &actual)
113145
assert.NoError(t, err)
114-
assert.Equal(t, expected, actual)
146+
assert.Equal(t, uint64(22), actual)
115147
})
116148

117149
t.Run("multiple chain IDs", func(t *testing.T) {
@@ -129,7 +161,7 @@ func Test_RetrieveClusterFinalizedHeight(t *testing.T) {
129161

130162
err := unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
131163
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
132-
return operation.UpsertClusterFinalizedHeight(lctx, rw.Writer(), clusterIDs[i], clusterFinalizedHeights[i])
164+
return operation.BootstrapClusterFinalizedHeight(lctx, rw, clusterIDs[i], clusterFinalizedHeights[i])
133165
})
134166
})
135167
require.NoError(t, err)
@@ -140,6 +172,51 @@ func Test_RetrieveClusterFinalizedHeight(t *testing.T) {
140172
assert.Equal(t, clusterFinalizedHeights[i], actual)
141173
}
142174
})
175+
176+
t.Run("update to non-sequential finalized height returns error", func(t *testing.T) {
177+
// Use a different cluster ID to avoid conflicts with other tests
178+
testClusterID := flow.ChainID("test-cluster-non-sequential")
179+
180+
// First bootstrap a cluster with height 20
181+
err := unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
182+
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
183+
return operation.BootstrapClusterFinalizedHeight(lctx, rw, testClusterID, 20)
184+
})
185+
})
186+
require.NoError(t, err)
187+
188+
// Try to update to a non-sequential height (should fail)
189+
err = unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
190+
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
191+
return operation.UpdateClusterFinalizedHeight(lctx, rw, testClusterID, 25) // Should be 21, not 25
192+
})
193+
})
194+
require.Error(t, err)
195+
assert.Contains(t, err.Error(), "finalization isn't sequential")
196+
})
197+
198+
t.Run("bootstrap on non-empty key returns error", func(t *testing.T) {
199+
// Use a different cluster ID to avoid conflicts with other tests
200+
testClusterID := flow.ChainID("test-cluster-bootstrap-error")
201+
202+
// First bootstrap a cluster with height 30
203+
err := unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
204+
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
205+
return operation.BootstrapClusterFinalizedHeight(lctx, rw, testClusterID, 30)
206+
})
207+
})
208+
require.NoError(t, err)
209+
210+
// Try to bootstrap again (should fail)
211+
err = unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
212+
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
213+
return operation.BootstrapClusterFinalizedHeight(lctx, rw, testClusterID, 35)
214+
})
215+
})
216+
require.Error(t, err)
217+
assert.Contains(t, err.Error(), "finalized height for cluster")
218+
assert.Contains(t, err.Error(), "already initialized")
219+
})
143220
})
144221
}
145222

storage/procedure/cluster.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ func FinalizeClusterBlock(lctx lockctx.Proof, rw storage.ReaderBatchWriter, bloc
131131
}
132132

133133
r := rw.GlobalReader()
134-
writer := rw.Writer()
135134
// retrieve the header to check the parent
136135
var header flow.Header
137136
err := operation.RetrieveHeader(r, blockID, &header)
@@ -162,13 +161,13 @@ func FinalizeClusterBlock(lctx lockctx.Proof, rw storage.ReaderBatchWriter, bloc
162161
}
163162

164163
// index the block by its height
165-
err = operation.IndexClusterBlockHeight(lctx, writer, clusterID, header.Height, blockID)
164+
err = operation.IndexClusterBlockHeight(lctx, rw, clusterID, header.Height, blockID)
166165
if err != nil {
167166
return fmt.Errorf("could not index cluster block height: %w", err)
168167
}
169168

170169
// update the finalized boundary
171-
err = operation.UpsertClusterFinalizedHeight(lctx, writer, clusterID, header.Height)
170+
err = operation.UpdateClusterFinalizedHeight(lctx, rw, clusterID, header.Height)
172171
if err != nil {
173172
return fmt.Errorf("could not update finalized boundary: %w", err)
174173
}

storage/procedure/cluster_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ func TestFinalizeClusterBlock(t *testing.T) {
4949

5050
// index parent as latest finalized block (manually writing respective indexes like in bootstrapping to skip transitive consistency checks)
5151
require.NoError(t, db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
52-
return operation.IndexClusterBlockHeight(lctx, rw.Writer(), block.ChainID, parent.Height, parent.ID())
52+
return operation.IndexClusterBlockHeight(lctx, rw, block.ChainID, parent.Height, parent.ID())
5353
}))
5454
require.NoError(t, db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
55-
return operation.UpsertClusterFinalizedHeight(lctx, rw.Writer(), block.ChainID, parent.Height)
55+
return operation.BootstrapClusterFinalizedHeight(lctx, rw, block.ChainID, parent.Height)
5656
}))
5757

5858
// Insert new block and verify `FinalizeClusterBlock` procedure accepts it
@@ -148,10 +148,10 @@ func constructState(t *testing.T, db storage.DB, lctx lockctx.Proof) (blockA, bl
148148

149149
// index `blockA` as latest finalized block (manually writing respective indexes like in bootstrapping to skip transitive consistency checks)
150150
require.NoError(t, db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
151-
return operation.IndexClusterBlockHeight(lctx, rw.Writer(), blockA.ChainID, blockA.Height, blockA.ID())
151+
return operation.IndexClusterBlockHeight(lctx, rw, blockA.ChainID, blockA.Height, blockA.ID())
152152
}))
153153
require.NoError(t, db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
154-
return operation.UpsertClusterFinalizedHeight(lctx, rw.Writer(), blockA.ChainID, blockA.Height)
154+
return operation.BootstrapClusterFinalizedHeight(lctx, rw, blockA.ChainID, blockA.Height)
155155
}))
156156

157157
return blockA, blockB, blockC, blockD

storage/store/cluster_blocks_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ func TestClusterBlocks(t *testing.T) {
2424
// add parent and mark its height as the latest finalized block
2525
err := unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
2626
err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
27-
return operation.IndexClusterBlockHeight(lctx, rw.Writer(), parent.ChainID, parent.Height, parent.ID())
27+
return operation.IndexClusterBlockHeight(lctx, rw, parent.ChainID, parent.Height, parent.ID())
2828
})
2929
require.NoError(t, err)
3030

3131
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
32-
return operation.UpsertClusterFinalizedHeight(lctx, rw.Writer(), parent.ChainID, parent.Height)
32+
return operation.BootstrapClusterFinalizedHeight(lctx, rw, parent.ChainID, parent.Height)
3333
})
3434
})
3535
require.NoError(t, err)

0 commit comments

Comments
 (0)