Skip to content

Commit 8434bfb

Browse files
authored
Merge branch 'master' into janez/system-chunk-logging-changes
2 parents 0562459 + ef71573 commit 8434bfb

File tree

6 files changed

+188
-9
lines changed

6 files changed

+188
-9
lines changed

storage/locks.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package storage
22

33
import (
4+
"fmt"
45
"sync"
56

67
"github.com/jordanschalm/lockctx"
@@ -115,6 +116,25 @@ func NewTestingLockManager() lockctx.Manager {
115116
return lockctx.NewManager(Locks(), makeLockPolicy())
116117
}
117118

119+
// HeldOneLock checks that exactly one of the two specified locks is held in the provided lock context.
120+
func HeldOneLock(lctx lockctx.Proof, lockA string, lockB string) (bool, string) {
121+
heldLockA := lctx.HoldsLock(lockA)
122+
heldLockB := lctx.HoldsLock(lockB)
123+
if heldLockA {
124+
if heldLockB {
125+
return false, fmt.Sprintf("epxect to hold only one lock, but actually held both locks: %s and %s", lockA, lockB)
126+
} else {
127+
return true, ""
128+
}
129+
} else {
130+
if heldLockB {
131+
return true, ""
132+
} else {
133+
return false, fmt.Sprintf("expect to hold one of the locks: %s or %s, but actually held none", lockA, lockB)
134+
}
135+
}
136+
}
137+
118138
// WithLock is a helper function that creates a new lock context, acquires the specified lock,
119139
// and executes the provided function within that context.
120140
// This function passes through any errors returned by fn.

storage/locks_test.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package storage
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestHeldOneLock(t *testing.T) {
11+
lockManager := NewTestingLockManager()
12+
13+
t.Run("holds only lockA", func(t *testing.T) {
14+
lctx := lockManager.NewContext()
15+
defer lctx.Release()
16+
err := lctx.AcquireLock(LockInsertBlock)
17+
require.NoError(t, err)
18+
19+
held, msg := HeldOneLock(lctx, LockInsertBlock, LockFinalizeBlock)
20+
assert.True(t, held)
21+
assert.Empty(t, msg)
22+
})
23+
24+
t.Run("holds only lockB", func(t *testing.T) {
25+
lctx := lockManager.NewContext()
26+
defer lctx.Release()
27+
err := lctx.AcquireLock(LockFinalizeBlock)
28+
require.NoError(t, err)
29+
30+
held, msg := HeldOneLock(lctx, LockInsertBlock, LockFinalizeBlock)
31+
assert.True(t, held)
32+
assert.Empty(t, msg)
33+
})
34+
35+
t.Run("holds both locks", func(t *testing.T) {
36+
lctx := lockManager.NewContext()
37+
defer lctx.Release()
38+
err := lctx.AcquireLock(LockInsertBlock)
39+
require.NoError(t, err)
40+
err = lctx.AcquireLock(LockFinalizeBlock)
41+
require.NoError(t, err)
42+
43+
held, msg := HeldOneLock(lctx, LockInsertBlock, LockFinalizeBlock)
44+
assert.False(t, held)
45+
assert.Contains(t, msg, "epxect to hold only one lock, but actually held both locks")
46+
assert.Contains(t, msg, LockInsertBlock)
47+
assert.Contains(t, msg, LockFinalizeBlock)
48+
})
49+
50+
t.Run("holds neither lock", func(t *testing.T) {
51+
// Create a context that doesn't hold any locks
52+
lctx := lockManager.NewContext()
53+
defer lctx.Release()
54+
55+
held, msg := HeldOneLock(lctx, LockInsertBlock, LockFinalizeBlock)
56+
assert.False(t, held)
57+
assert.Contains(t, msg, "expect to hold one of the locks")
58+
assert.Contains(t, msg, LockInsertBlock)
59+
assert.Contains(t, msg, LockFinalizeBlock)
60+
})
61+
62+
t.Run("holds different lock", func(t *testing.T) {
63+
lctx := lockManager.NewContext()
64+
defer lctx.Release()
65+
err := lctx.AcquireLock(LockBootstrapping)
66+
require.NoError(t, err)
67+
68+
held, msg := HeldOneLock(lctx, LockInsertBlock, LockFinalizeBlock)
69+
assert.False(t, held)
70+
assert.Contains(t, msg, "expect to hold one of the locks")
71+
assert.Contains(t, msg, LockInsertBlock)
72+
assert.Contains(t, msg, LockFinalizeBlock)
73+
})
74+
75+
t.Run("with different lock combinations", func(t *testing.T) {
76+
lctx := lockManager.NewContext()
77+
defer lctx.Release()
78+
err := lctx.AcquireLock(LockInsertOwnReceipt)
79+
require.NoError(t, err)
80+
81+
held, msg := HeldOneLock(lctx, LockInsertOwnReceipt, LockInsertCollection)
82+
assert.True(t, held)
83+
assert.Empty(t, msg)
84+
})
85+
86+
t.Run("with cluster block locks", func(t *testing.T) {
87+
lctx := lockManager.NewContext()
88+
defer lctx.Release()
89+
err := lctx.AcquireLock(LockInsertOrFinalizeClusterBlock)
90+
require.NoError(t, err)
91+
92+
held, msg := HeldOneLock(lctx, LockInsertOrFinalizeClusterBlock, LockIndexResultApproval)
93+
assert.True(t, held)
94+
assert.Empty(t, msg)
95+
})
96+
97+
t.Run("error message format for both locks", func(t *testing.T) {
98+
lctx := lockManager.NewContext()
99+
defer lctx.Release()
100+
err := lctx.AcquireLock(LockInsertBlock)
101+
require.NoError(t, err)
102+
err = lctx.AcquireLock(LockFinalizeBlock)
103+
require.NoError(t, err)
104+
105+
// Check that both locks are actually held
106+
assert.True(t, lctx.HoldsLock(LockInsertBlock))
107+
assert.True(t, lctx.HoldsLock(LockFinalizeBlock))
108+
109+
held, msg := HeldOneLock(lctx, LockInsertBlock, LockFinalizeBlock)
110+
assert.False(t, held)
111+
require.NotEmpty(t, msg)
112+
assert.Contains(t, msg, "epxect to hold only one lock, but actually held both locks")
113+
assert.Contains(t, msg, LockInsertBlock)
114+
assert.Contains(t, msg, LockFinalizeBlock)
115+
})
116+
117+
t.Run("error message format for no locks", func(t *testing.T) {
118+
lctx := lockManager.NewContext()
119+
defer lctx.Release()
120+
121+
held, msg := HeldOneLock(lctx, "lockA", "lockB")
122+
assert.False(t, held)
123+
require.NotEmpty(t, msg)
124+
assert.Contains(t, msg, "expect to hold one of the locks")
125+
assert.Contains(t, msg, "lockA")
126+
assert.Contains(t, msg, "lockB")
127+
})
128+
}

storage/operation/proposal_signatures.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,31 @@
11
package operation
22

33
import (
4+
"fmt"
5+
6+
"github.com/jordanschalm/lockctx"
7+
48
"github.com/onflow/flow-go/model/flow"
59
"github.com/onflow/flow-go/storage"
610
)
711

812
// InsertProposalSignature inserts a proposal signature by block ID.
9-
// Returns storage.ErrAlreadyExists if a proposal signature has already been inserted for the block.
10-
func InsertProposalSignature(w storage.Writer, blockID flow.Identifier, sig *[]byte) error {
13+
//
14+
// CAUTION:
15+
// - The caller must acquire either the lock [storage.LockInsertBlock] or [storage.LockInsertOrFinalizeClusterBlock] (but not both)
16+
// and hold it until the database write has been committed.
17+
// - OVERWRITES existing data (potential for data corruption):
18+
// The lock proof serves as a reminder that the CALLER is responsible to ensure that the DEDUPLICATION CHECK
19+
// is done elsewhere ATOMICALLY with this write operation. It is intended that this function is called only for new
20+
// blocks, i.e. no signature was previously persisted for it.
21+
//
22+
// No errors are expected during normal operation.
23+
func InsertProposalSignature(lctx lockctx.Proof, w storage.Writer, blockID flow.Identifier, sig *[]byte) error {
24+
held, errStr := storage.HeldOneLock(lctx, storage.LockInsertBlock, storage.LockInsertOrFinalizeClusterBlock)
25+
if !held {
26+
return fmt.Errorf("%s", errStr)
27+
}
28+
1129
return UpsertByKey(w, MakePrefix(codeBlockIDToProposalSignature, blockID), sig)
1230
}
1331

storage/procedure/cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func InsertClusterBlock(lctx lockctx.Proof, rw storage.ReaderBatchWriter, propos
5353
}
5454

5555
// insert the block payload; without further overwrite checks (see above for explanation)
56-
err = operation.InsertProposalSignature(rw.Writer(), blockID, &proposal.ProposerSigData)
56+
err = operation.InsertProposalSignature(lctx, rw.Writer(), blockID, &proposal.ProposerSigData)
5757
if err != nil {
5858
return fmt.Errorf("could not insert proposer signature: %w", err)
5959
}

storage/store/headers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (h *Headers) storeTx(
8383
return err
8484
}
8585

86-
return h.sigs.storeTx(rw, blockID, proposalSig)
86+
return h.sigs.storeTx(lctx, rw, blockID, proposalSig)
8787
}
8888

8989
func (h *Headers) retrieveTx(blockID flow.Identifier) (*flow.Header, error) {

storage/store/proposal_signatures.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package store
22

33
import (
4+
"github.com/jordanschalm/lockctx"
5+
46
"github.com/onflow/flow-go/model/flow"
57
"github.com/onflow/flow-go/module"
68
"github.com/onflow/flow-go/module/metrics"
@@ -21,8 +23,8 @@ type proposalSignatures struct {
2123
// newProposalSignatures creates a proposalSignatures instance which is a database of block proposal signatures
2224
// which supports storing, caching and retrieving by block ID.
2325
func newProposalSignatures(collector module.CacheMetrics, db storage.DB) *proposalSignatures {
24-
store := func(rw storage.ReaderBatchWriter, blockID flow.Identifier, sig []byte) error {
25-
return operation.InsertProposalSignature(rw.Writer(), blockID, &sig)
26+
store := func(lctx lockctx.Proof, rw storage.ReaderBatchWriter, blockID flow.Identifier, sig []byte) error {
27+
return operation.InsertProposalSignature(lctx, rw.Writer(), blockID, &sig)
2628
}
2729

2830
retrieve := func(r storage.Reader, blockID flow.Identifier) ([]byte, error) {
@@ -35,13 +37,24 @@ func newProposalSignatures(collector module.CacheMetrics, db storage.DB) *propos
3537
db: db,
3638
cache: newCache(collector, metrics.ResourceProposalSignature,
3739
withLimit[flow.Identifier, []byte](4*flow.DefaultTransactionExpiry),
38-
withStore(store),
40+
withStoreWithLock(store),
3941
withRetrieve(retrieve)),
4042
}
4143
}
4244

43-
func (h *proposalSignatures) storeTx(rw storage.ReaderBatchWriter, blockID flow.Identifier, sig []byte) error {
44-
return h.cache.PutTx(rw, blockID, sig)
45+
// storeTx persists the given `sig` as the proposer's signature for the specified block.
46+
//
47+
// CAUTION:
48+
// - The caller must acquire either the lock [storage.LockInsertBlock] or [storage.LockInsertOrFinalizeClusterBlock]
49+
// but not both and hold the lock until the database write has been committed.
50+
// - OVERWRITES existing data (potential for data corruption):
51+
// The lock proof serves as a reminder that the CALLER is responsible to ensure that the DEDUPLICATION CHECK
52+
// is done elsewhere ATOMICALLY with this write operation. It is intended that this function is called only for new
53+
// blocks, i.e. no signature was previously persisted for it.
54+
//
55+
// No error returns expected during normal operations.
56+
func (h *proposalSignatures) storeTx(lctx lockctx.Proof, rw storage.ReaderBatchWriter, blockID flow.Identifier, sig []byte) error {
57+
return h.cache.PutWithLockTx(lctx, rw, blockID, sig)
4558
}
4659

4760
func (h *proposalSignatures) retrieveTx(blockID flow.Identifier) ([]byte, error) {

0 commit comments

Comments
 (0)