Skip to content

Commit 7f20965

Browse files
committed
Add storage locking for index operations
1 parent 95f1339 commit 7f20965

File tree

8 files changed

+77
-25
lines changed

8 files changed

+77
-25
lines changed

module/state_synchronization/indexer/indexer_core.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,12 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
171171
return fmt.Errorf("could not collect scheduled transaction data: %w", err)
172172
}
173173

174+
lctx := c.lockManager.NewContext()
175+
defer lctx.Release()
176+
if err = lctx.AcquireLock(storage.LockIndexScheduledTransaction); err != nil {
177+
return fmt.Errorf("could not acquire lock for indexing scheduled transactions: %w", err)
178+
}
179+
174180
err = c.protocolDB.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
175181
err := c.events.BatchStore(data.BlockID, []flow.EventsList{events}, rw)
176182
if err != nil {
@@ -183,7 +189,7 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
183189
}
184190

185191
for txID, scheduledTxID := range scheduledTransactionData {
186-
err = c.scheduledTransactions.BatchIndex(data.BlockID, txID, scheduledTxID, rw)
192+
err = c.scheduledTransactions.BatchIndex(lctx, data.BlockID, txID, scheduledTxID, rw)
187193
if err != nil {
188194
return fmt.Errorf("could not index scheduled transaction (%d) %s at height %d: %w", scheduledTxID, txID, header.Height, err)
189195
}

module/state_synchronization/indexer/indexer_core_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func TestExecutionState_IndexBlockData(t *testing.T) {
245245
test.collections.On("StoreAndIndexByTransaction", mock.Anything, collection).Return(&flow.LightCollection{}, nil)
246246
}
247247
for txID, scheduledTxID := range tf.ExpectedScheduledTransactions {
248-
test.scheduledTransactions.On("BatchIndex", blockID, txID, scheduledTxID, mock.Anything).Return(nil)
248+
test.scheduledTransactions.On("BatchIndex", mock.Anything, blockID, txID, scheduledTxID, mock.Anything).Return(nil)
249249
}
250250

251251
err := test.indexer.IndexBlockData(tf.ExecutionDataEntity())

storage/locks.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@ const (
2727
LockInsertCollection = "lock_insert_collection"
2828
// LockBootstrapping protects data that is *exclusively* written during bootstrapping.
2929
LockBootstrapping = "lock_bootstrapping"
30-
// LockInsertChunkDataPack protects the insertion of chunk data packs (not yet used anywhere
30+
// LockInsertChunkDataPack protects the insertion of chunk data packs (not yet used anywhere)
3131
LockInsertChunkDataPack = "lock_insert_chunk_data_pack"
32+
// LockIndexScheduledTransaction protects the indexing of scheduled transactions.
33+
LockIndexScheduledTransaction = "lock_index_scheduled_transaction"
3234
)
3335

3436
// Locks returns a list of all named locks used by the storage layer.
@@ -42,6 +44,7 @@ func Locks() []string {
4244
LockInsertCollection,
4345
LockBootstrapping,
4446
LockInsertChunkDataPack,
47+
LockIndexScheduledTransaction,
4548
}
4649
}
4750

storage/mock/scheduled_transactions.go

Lines changed: 7 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

storage/operation/scheduled_transactions.go

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package operation
22

33
import (
4+
"fmt"
5+
6+
"github.com/jordanschalm/lockctx"
47
"github.com/onflow/flow-go/model/flow"
58
"github.com/onflow/flow-go/storage"
69
)
@@ -22,16 +25,44 @@ func RetrieveBlockIDByScheduledTransactionID(r storage.Reader, txID flow.Identif
2225
return RetrieveByKey(r, MakePrefix(codeBlockIDByScheduledTransactionID, txID), blockID)
2326
}
2427

25-
// BatchIndexScheduledTransactionID indexes the scheduled transaction's transaction ID by its scheduled transaction ID.
28+
// IndexScheduledTransactionID indexes the scheduled transaction's transaction ID by its scheduled transaction ID.
2629
//
27-
// No errors are expected during normal operation.
28-
func BatchIndexScheduledTransactionID(w storage.Writer, scheduledTxID uint64, txID flow.Identifier) error {
29-
return UpsertByKey(w, MakePrefix(codeTransactionIDByScheduledTransactionID, scheduledTxID), txID)
30+
// Expected error returns during normal operation:
31+
// - [storage.ErrAlreadyExists]: if the scheduled transaction ID is already indexed
32+
func IndexScheduledTransactionID(lctx lockctx.Proof, rw storage.ReaderBatchWriter, scheduledTxID uint64, txID flow.Identifier) error {
33+
if !lctx.HoldsLock(storage.LockIndexScheduledTransaction) {
34+
return fmt.Errorf("missing lock: %v", storage.LockIndexScheduledTransaction)
35+
}
36+
37+
key := MakePrefix(codeTransactionIDByScheduledTransactionID, scheduledTxID)
38+
exists, err := KeyExists(rw.GlobalReader(), key)
39+
if err != nil {
40+
return err
41+
}
42+
if exists {
43+
return fmt.Errorf("scheduled transaction ID already indexed: %w", storage.ErrAlreadyExists)
44+
}
45+
46+
return UpsertByKey(rw.Writer(), key, txID)
3047
}
3148

32-
// BatchIndexScheduledTransactionBlockID indexes the scheduled transaction's block ID by its transaction ID.
49+
// IndexScheduledTransactionBlockID indexes the scheduled transaction's block ID by its transaction ID.
3350
//
34-
// No errors are expected during normal operation.
35-
func BatchIndexScheduledTransactionBlockID(w storage.Writer, txID flow.Identifier, blockID flow.Identifier) error {
36-
return UpsertByKey(w, MakePrefix(codeBlockIDByScheduledTransactionID, txID), blockID)
51+
// Expected error returns during normal operation:
52+
// - [storage.ErrAlreadyExists]: if the scheduled transaction block ID is already indexed
53+
func IndexScheduledTransactionBlockID(lctx lockctx.Proof, rw storage.ReaderBatchWriter, txID flow.Identifier, blockID flow.Identifier) error {
54+
if !lctx.HoldsLock(storage.LockIndexScheduledTransaction) {
55+
return fmt.Errorf("missing lock: %v", storage.LockIndexScheduledTransaction)
56+
}
57+
58+
key := MakePrefix(codeBlockIDByScheduledTransactionID, txID)
59+
exists, err := KeyExists(rw.GlobalReader(), key)
60+
if err != nil {
61+
return err
62+
}
63+
if exists {
64+
return fmt.Errorf("scheduled transaction block ID already indexed: %w", storage.ErrAlreadyExists)
65+
}
66+
67+
return UpsertByKey(rw.Writer(), key, blockID)
3768
}

storage/scheduled_transactions.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package storage
22

33
import (
4+
"github.com/jordanschalm/lockctx"
45
"github.com/onflow/flow-go/model/flow"
56
)
67

@@ -30,7 +31,8 @@ type ScheduledTransactions interface {
3031
// BatchIndex indexes the scheduled transaction by its block ID, transaction ID, and scheduled transaction ID.
3132
// `scheduledTxID` is the uint64 id field returned by the system smart contract.
3233
// `txID` is be the TransactionBody.ID of the scheduled transaction.
34+
// Requires the lock: [storage.LockIndexScheduledTransaction]
3335
//
3436
// No errors are expected during normal operation.
35-
BatchIndex(blockID flow.Identifier, txID flow.Identifier, scheduledTxID uint64, batch ReaderBatchWriter) error
37+
BatchIndex(lctx lockctx.Proof, blockID flow.Identifier, txID flow.Identifier, scheduledTxID uint64, batch ReaderBatchWriter) error
3638
}

storage/store/scheduled_transactions.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package store
33
import (
44
"fmt"
55

6+
"github.com/jordanschalm/lockctx"
67
"github.com/onflow/flow-go/model/flow"
78
"github.com/onflow/flow-go/module"
89
"github.com/onflow/flow-go/module/metrics"
@@ -57,19 +58,19 @@ func NewScheduledTransactions(collector module.CacheMetrics, db storage.DB, cach
5758
}
5859

5960
// BatchIndex indexes the scheduled transaction by its block ID, transaction ID, and scheduled transaction ID.
60-
// `scheduledTxID` is the uint64 id field returned by the system smart contract.
6161
// `txID` is be the TransactionBody.ID of the scheduled transaction.
62+
// `scheduledTxID` is the uint64 id field returned by the system smart contract.
63+
// Requires the lock: [storage.LockIndexScheduledTransaction]
6264
//
63-
// No errors are expected during normal operation.
64-
func (st *ScheduledTransactions) BatchIndex(blockID flow.Identifier, txID flow.Identifier, scheduledTxID uint64, batch storage.ReaderBatchWriter) error {
65-
writer := batch.Writer()
66-
67-
err := operation.BatchIndexScheduledTransactionID(writer, scheduledTxID, txID)
65+
// Expected error returns during normal operation:
66+
// - [storage.ErrAlreadyExists]: if the scheduled transaction is already indexed
67+
func (st *ScheduledTransactions) BatchIndex(lctx lockctx.Proof, blockID flow.Identifier, txID flow.Identifier, scheduledTxID uint64, batch storage.ReaderBatchWriter) error {
68+
err := operation.IndexScheduledTransactionID(lctx, batch, scheduledTxID, txID)
6869
if err != nil {
6970
return fmt.Errorf("failed to batch index scheduled transaction: %w", err)
7071
}
7172

72-
err = operation.BatchIndexScheduledTransactionBlockID(writer, txID, blockID)
73+
err = operation.IndexScheduledTransactionBlockID(lctx, batch, txID, blockID)
7374
if err != nil {
7475
return fmt.Errorf("failed to batch index scheduled transaction block ID: %w", err)
7576
}

storage/store/scheduled_transactions_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,19 @@ func runTest(t *testing.T, g *fixtures.GeneratorSuite, db storage.DB, store *Sch
3838
data[txID] = g.Random().Uint64()
3939
}
4040

41+
lockManager := storage.NewTestingLockManager()
42+
lctx := lockManager.NewContext()
43+
defer lctx.Release()
44+
if err := lctx.AcquireLock(storage.LockIndexScheduledTransaction); err != nil {
45+
t.Fatalf("could not acquire lock for indexing scheduled transactions: %v", err)
46+
}
47+
4148
// index data within a batch
4249
batch := db.NewBatch()
4350
defer batch.Close()
4451

4552
for txID, scheduledTxID := range data {
46-
err := store.BatchIndex(blockID, txID, scheduledTxID, batch)
53+
err := store.BatchIndex(lctx, blockID, txID, scheduledTxID, batch)
4754
require.NoError(t, err)
4855
}
4956

0 commit comments

Comments
 (0)