Skip to content

Commit 072904d

Browse files
committed
Merge branch 'master' into leo/refactor-index-protocol-kv-store
2 parents 30355af + 7b2b9e8 commit 072904d

File tree

110 files changed

+3288
-1235
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

110 files changed

+3288
-1235
lines changed

cmd/access/node_builder/access_node_builder.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ type AccessNodeConfig struct {
161161
rpcMetricsEnabled bool
162162
executionDataSyncEnabled bool
163163
publicNetworkExecutionDataEnabled bool
164-
executionDataDBMode string
165164
executionDataPrunerHeightRangeTarget uint64
166165
executionDataPrunerThreshold uint64
167166
executionDataPruningInterval time.Duration
@@ -277,7 +276,6 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
277276
MaxRetryDelay: edrequester.DefaultMaxRetryDelay,
278277
},
279278
executionDataIndexingEnabled: false,
280-
executionDataDBMode: execution_data.ExecutionDataDBModePebble.String(),
281279
executionDataPrunerHeightRangeTarget: 0,
282280
executionDataPrunerThreshold: pruner.DefaultThreshold,
283281
executionDataPruningInterval: pruner.DefaultPruningInterval,
@@ -591,7 +589,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
591589
Module("execution data datastore and blobstore", func(node *cmd.NodeConfig) error {
592590
var err error
593591
builder.ExecutionDatastoreManager, err = edstorage.CreateDatastoreManager(
594-
node.Logger, builder.executionDataDir, builder.executionDataDBMode)
592+
node.Logger, builder.executionDataDir)
595593
if err != nil {
596594
return fmt.Errorf("could not create execution data datastore manager: %w", err)
597595
}
@@ -1362,10 +1360,13 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
13621360
"execution-data-max-retry-delay",
13631361
defaultConfig.executionDataConfig.MaxRetryDelay,
13641362
"maximum delay for exponential backoff when fetching execution data fails e.g. 5m")
1365-
flags.StringVar(&builder.executionDataDBMode,
1363+
1364+
var builderexecutionDataDBMode string
1365+
flags.StringVar(&builderexecutionDataDBMode,
13661366
"execution-data-db",
1367-
defaultConfig.executionDataDBMode,
1368-
"[experimental] the DB type for execution datastore. One of [badger, pebble]")
1367+
"pebble",
1368+
"[deprecated] the DB type for execution datastore")
1369+
13691370
flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget,
13701371
"execution-data-height-range-target",
13711372
defaultConfig.executionDataPrunerHeightRangeTarget,

cmd/bootstrap/cmd/key.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,27 +100,30 @@ func keyCmdRun(_ *cobra.Command, _ []string) {
100100
}
101101
log.Info().Msgf("wrote file %s/%s", flagOutdir, model.PathNodeID)
102102

103-
err = common.WriteJSON(fmt.Sprintf(model.PathNodeInfoPriv, nodeInfo.NodeID), flagOutdir, private)
103+
privKeyPath := fmt.Sprintf(model.PathNodeInfoPriv, nodeInfo.NodeID)
104+
err = common.WriteJSON(privKeyPath, flagOutdir, private)
104105
if err != nil {
105106
log.Fatal().Err(err).Msg("failed to write json")
106107
}
107-
log.Info().Msgf("wrote file %s/%s", flagOutdir, model.PathNodeInfoPriv)
108+
log.Info().Msgf("wrote file %s/%s", flagOutdir, privKeyPath)
108109

109-
err = common.WriteText(fmt.Sprintf(model.PathSecretsEncryptionKey, nodeInfo.NodeID), flagOutdir, secretsDBKey)
110+
secretsKeyPath := fmt.Sprintf(model.PathSecretsEncryptionKey, nodeInfo.NodeID)
111+
err = common.WriteText(secretsKeyPath, flagOutdir, secretsDBKey)
110112
if err != nil {
111113
log.Fatal().Err(err).Msg("failed to write file")
112114
}
113-
log.Info().Msgf("wrote file %s/%s", flagOutdir, model.PathSecretsEncryptionKey)
115+
log.Info().Msgf("wrote file %s/%s", flagOutdir, secretsKeyPath)
114116

115117
public, err := nodeInfo.Public()
116118
if err != nil {
117119
log.Fatal().Err(err).Msg("could not access public keys")
118120
}
119-
err = common.WriteJSON(fmt.Sprintf(model.PathNodeInfoPub, nodeInfo.NodeID), flagOutdir, public)
121+
pubNodeInfoPath := fmt.Sprintf(model.PathNodeInfoPub, nodeInfo.NodeID)
122+
err = common.WriteJSON(pubNodeInfoPath, flagOutdir, public)
120123
if err != nil {
121124
log.Fatal().Err(err).Msg("failed to write json")
122125
}
123-
log.Info().Msgf("wrote file %s/%s", flagOutdir, model.PathNodeInfoPub)
126+
log.Info().Msgf("wrote file %s/%s", flagOutdir, pubNodeInfoPath)
124127

125128
// write machine account info
126129
if role == flow.RoleCollection || role == flow.RoleConsensus {
@@ -134,11 +137,12 @@ func keyCmdRun(_ *cobra.Command, _ []string) {
134137
log.Debug().Str("address", flagAddress).Msg("assembling machine account information")
135138
// write the public key to terminal for entry in Flow Port
136139
machineAccountPriv := assembleNodeMachineAccountKey(machineKey)
137-
err = common.WriteJSON(fmt.Sprintf(model.PathNodeMachineAccountPrivateKey, nodeInfo.NodeID), flagOutdir, machineAccountPriv)
140+
privateKeyPath := fmt.Sprintf(model.PathNodeMachineAccountPrivateKey, nodeInfo.NodeID)
141+
err = common.WriteJSON(privateKeyPath, flagOutdir, machineAccountPriv)
138142
if err != nil {
139143
log.Fatal().Err(err).Msg("failed to write json")
140144
}
141-
log.Info().Msgf("wrote file %s/%s", flagOutdir, model.PathNodeMachineAccountPrivateKey)
145+
log.Info().Msgf("wrote file %s/%s", flagOutdir, privateKeyPath)
142146
}
143147
}
144148

cmd/execution_builder.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ func (exeNode *ExecutionNode) LoadSyncCore(node *NodeConfig) error {
334334
func (exeNode *ExecutionNode) LoadExecutionStorage(
335335
node *NodeConfig,
336336
) error {
337+
var err error
337338
db := node.ProtocolDB
338339

339340
exeNode.events = store.NewEvents(node.Metrics.Cache, db)
@@ -342,7 +343,10 @@ func (exeNode *ExecutionNode) LoadExecutionStorage(
342343
exeNode.results = store.NewExecutionResults(node.Metrics.Cache, db)
343344
exeNode.receipts = store.NewExecutionReceipts(node.Metrics.Cache, db, exeNode.results, storage.DefaultCacheSize)
344345
exeNode.myReceipts = store.NewMyExecutionReceipts(node.Metrics.Cache, db, exeNode.receipts)
345-
exeNode.txResults = store.NewTransactionResults(node.Metrics.Cache, db, exeNode.exeConf.transactionResultsCacheSize)
346+
exeNode.txResults, err = store.NewTransactionResults(node.Metrics.Cache, db, exeNode.exeConf.transactionResultsCacheSize)
347+
if err != nil {
348+
return err
349+
}
346350
exeNode.eventsReader = exeNode.events
347351
exeNode.commitsReader = exeNode.commits
348352
exeNode.resultsReader = exeNode.results
@@ -716,7 +720,7 @@ func (exeNode *ExecutionNode) LoadExecutionDataDatastore(
716720
node *NodeConfig,
717721
) (err error) {
718722
exeNode.executionDataDatastore, err = edstorage.CreateDatastoreManager(
719-
node.Logger, exeNode.exeConf.executionDataDir, exeNode.exeConf.executionDataDBMode)
723+
node.Logger, exeNode.exeConf.executionDataDir)
720724
if err != nil {
721725
return fmt.Errorf("could not create execution data datastore manager: %w", err)
722726
}

cmd/execution_config.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ type ExecutionConfig struct {
5959
importCheckpointWorkerCount int
6060
transactionExecutionMetricsEnabled bool
6161
transactionExecutionMetricsBufferSize uint
62-
executionDataDBMode string
6362
scheduleCallbacksEnabled bool
6463

6564
computationConfig computation.ComputationConfig
@@ -136,10 +135,12 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
136135
flags.IntVar(&exeConf.importCheckpointWorkerCount, "import-checkpoint-worker-count", 10, "number of workers to import checkpoint file during bootstrap")
137136
flags.BoolVar(&exeConf.transactionExecutionMetricsEnabled, "tx-execution-metrics", true, "enable collection of transaction execution metrics")
138137
flags.UintVar(&exeConf.transactionExecutionMetricsBufferSize, "tx-execution-metrics-buffer-size", 200, "buffer size for transaction execution metrics. The buffer size is the number of blocks that are kept in memory by the metrics provider engine")
139-
flags.StringVar(&exeConf.executionDataDBMode,
138+
139+
var exeConfExecutionDataDBMode string
140+
flags.StringVar(&exeConfExecutionDataDBMode,
140141
"execution-data-db",
141142
execution_data.ExecutionDataDBModePebble.String(),
142-
"[experimental] the DB type for execution datastore. One of [badger, pebble]")
143+
"[deprecated] the DB type for execution datastore. it's been deprecated")
143144

144145
flags.BoolVar(&exeConf.onflowOnlyLNs, "temp-onflow-only-lns", false, "do not use unless required. forces node to only request collections from onflow collection nodes")
145146
flags.BoolVar(&exeConf.enableStorehouse, "enable-storehouse", false, "enable storehouse to store registers on disk, default is false")

cmd/observer/node_builder/observer_builder.go

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ type ObserverServiceConfig struct {
154154
logTxTimeToSealed bool
155155
executionDataSyncEnabled bool
156156
executionDataIndexingEnabled bool
157-
executionDataDBMode string
158157
executionDataPrunerHeightRangeTarget uint64
159158
executionDataPrunerThreshold uint64
160159
executionDataPruningInterval time.Duration
@@ -234,7 +233,6 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
234233
logTxTimeToSealed: false,
235234
executionDataSyncEnabled: false,
236235
executionDataIndexingEnabled: false,
237-
executionDataDBMode: execution_data.ExecutionDataDBModePebble.String(),
238236
executionDataPrunerHeightRangeTarget: 0,
239237
executionDataPrunerThreshold: pruner.DefaultThreshold,
240238
executionDataPruningInterval: pruner.DefaultPruningInterval,
@@ -712,10 +710,9 @@ func (builder *ObserverServiceBuilder) extraFlags() {
712710
flags.BoolVar(&builder.localServiceAPIEnabled, "local-service-api-enabled", defaultConfig.localServiceAPIEnabled, "whether to use local indexed data for api queries")
713711
flags.StringVar(&builder.registersDBPath, "execution-state-dir", defaultConfig.registersDBPath, "directory to use for execution-state database")
714712
flags.StringVar(&builder.checkpointFile, "execution-state-checkpoint", defaultConfig.checkpointFile, "execution-state checkpoint file")
715-
flags.StringVar(&builder.executionDataDBMode,
716-
"execution-data-db",
717-
defaultConfig.executionDataDBMode,
718-
"[experimental] the DB type for execution datastore. One of [badger, pebble]")
713+
714+
var builderExecutionDataDBMode string
715+
flags.StringVar(&builderExecutionDataDBMode, "execution-data-db", "pebble", "[deprecated] the DB type for execution datastore.")
719716

720717
// Execution data pruner
721718
flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget,
@@ -1110,7 +1107,6 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
11101107
var execDataDistributor *edrequester.ExecutionDataDistributor
11111108
var execDataCacheBackend *herocache.BlockExecutionData
11121109
var executionDataStoreCache *execdatacache.ExecutionDataCache
1113-
var executionDataDBMode execution_data.ExecutionDataDBMode
11141110

11151111
// setup dependency chain to ensure indexer starts after the requester
11161112
requesterDependable := module.NewProxiedReadyDoneAware()
@@ -1129,20 +1125,11 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
11291125
return err
11301126
}
11311127

1132-
executionDataDBMode, err = execution_data.ParseExecutionDataDBMode(builder.executionDataDBMode)
1128+
builder.ExecutionDatastoreManager, err = edstorage.NewPebbleDatastoreManager(
1129+
node.Logger.With().Str("pebbledb", "endata").Logger(),
1130+
datastoreDir, nil)
11331131
if err != nil {
1134-
return fmt.Errorf("could not parse execution data DB mode: %w", err)
1135-
}
1136-
1137-
if executionDataDBMode == execution_data.ExecutionDataDBModePebble {
1138-
builder.ExecutionDatastoreManager, err = edstorage.NewPebbleDatastoreManager(
1139-
node.Logger.With().Str("pebbledb", "endata").Logger(),
1140-
datastoreDir, nil)
1141-
if err != nil {
1142-
return fmt.Errorf("could not create PebbleDatastoreManager for execution data: %w", err)
1143-
}
1144-
} else {
1145-
return fmt.Errorf("datastore with badger has been deprecated, please use pebble instead")
1132+
return fmt.Errorf("could not create PebbleDatastoreManager for execution data: %w", err)
11461133
}
11471134
ds = builder.ExecutionDatastoreManager.Datastore()
11481135

cmd/util/cmd/read-badger/cmd/transaction_results.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ var transactionResultsCmd = &cobra.Command{
2525
Short: "get transaction-result by block ID",
2626
RunE: func(cmd *cobra.Command, args []string) error {
2727
return common.WithStorage(flagDatadir, func(db storage.DB) error {
28-
transactionResults := store.NewTransactionResults(metrics.NewNoopCollector(), db, 1)
28+
transactionResults, err := store.NewTransactionResults(metrics.NewNoopCollector(), db, 1)
29+
if err != nil {
30+
return err
31+
}
2932
storages := common.InitStorages(db)
3033
log.Info().Msgf("got flag block id: %s", flagBlockID)
3134
blockID, err := flow.HexStringToIdentifier(flagBlockID)

cmd/util/cmd/read-light-block/read_light_block_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ func TestReadClusterRange(t *testing.T) {
2525
err := unittest.WithLock(t, lockManager, storage.LockInsertOrFinalizeClusterBlock, func(lctx lockctx.Context) error {
2626
// add parent as boundary
2727
err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
28-
return operation.IndexClusterBlockHeight(lctx, rw.Writer(), parent.ChainID, parent.Height, parent.ID())
28+
return operation.IndexClusterBlockHeight(lctx, rw, parent.ChainID, parent.Height, parent.ID())
2929
})
3030
if err != nil {
3131
return err
3232
}
3333

3434
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
35-
return operation.UpsertClusterFinalizedHeight(lctx, rw.Writer(), parent.ChainID, parent.Height)
35+
return operation.BootstrapClusterFinalizedHeight(lctx, rw, parent.ChainID, parent.Height)
3636
})
3737
})
3838
require.NoError(t, err)

cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ func runE(*cobra.Command, []string) error {
6767

6868
metrics := &metrics.NoopCollector{}
6969

70-
transactionResults := store.NewTransactionResults(metrics, db, badger.DefaultCacheSize)
70+
transactionResults, err := store.NewTransactionResults(metrics, db, badger.DefaultCacheSize)
71+
if err != nil {
72+
return err
73+
}
7174
commits := store.NewCommits(metrics, db)
7275
results := store.NewExecutionResults(metrics, db)
7376
receipts := store.NewExecutionReceipts(metrics, db, results, badger.DefaultCacheSize)

cmd/util/cmd/rollback-executed-height/cmd/rollback_executed_height_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ func TestReExecuteBlock(t *testing.T) {
4141
all := store.InitAll(metrics, db)
4242
headers := all.Headers
4343
blocks := all.Blocks
44-
txResults := store.NewTransactionResults(metrics, db, store.DefaultCacheSize)
44+
txResults, err := store.NewTransactionResults(metrics, db, store.DefaultCacheSize)
45+
require.NoError(t, err)
4546
commits := store.NewCommits(metrics, db)
4647
chunkDataPacks := store.NewChunkDataPacks(metrics, pebbleimpl.ToDB(pdb), store.NewCollections(db, store.NewTransactions(metrics, db)), store.DefaultCacheSize)
4748
results := all.Results
@@ -201,7 +202,8 @@ func TestReExecuteBlockWithDifferentResult(t *testing.T) {
201202
transactions := store.NewTransactions(metrics, db)
202203
collections := store.NewCollections(db, transactions)
203204
chunkDataPacks := store.NewChunkDataPacks(metrics, pebbleimpl.ToDB(pdb), collections, bstorage.DefaultCacheSize)
204-
txResults := store.NewTransactionResults(metrics, db, bstorage.DefaultCacheSize)
205+
txResults, err := store.NewTransactionResults(metrics, db, bstorage.DefaultCacheSize)
206+
require.NoError(t, err)
205207

206208
err = unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
207209
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {

engine/access/access_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package access_test
33
import (
44
"context"
55
"encoding/json"
6-
"os"
76
"testing"
87

98
"github.com/cockroachdb/pebble/v2"
@@ -93,7 +92,7 @@ func TestAccess(t *testing.T) {
9392

9493
func (suite *Suite) SetupTest() {
9594
suite.lockManager = storage.NewTestingLockManager()
96-
suite.log = zerolog.New(os.Stderr)
95+
suite.log = unittest.Logger()
9796
suite.net = new(mocknetwork.EngineRegistry)
9897
suite.state = new(protocol.State)
9998
suite.finalSnapshot = new(protocol.Snapshot)
@@ -774,6 +773,10 @@ func (suite *Suite) TestGetSealedTransaction() {
774773
ctx := irrecoverable.NewMockSignalerContext(suite.T(), background)
775774
ingestEng.Start(ctx)
776775
<-ingestEng.Ready()
776+
defer func() {
777+
cancel()
778+
<-ingestEng.Done()
779+
}()
777780

778781
// 2. Ingest engine was notified by the follower engine about a new block.
779782
// Follower engine --> Ingest engine

0 commit comments

Comments
 (0)