Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,10 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
opts = append(opts, blob.WithReprovideInterval(-1))
}

if !builder.BitswapBloomCacheEnabled {
opts = append(opts, blob.WithSkipBloomCache(true))
}

var err error
bs, err = node.EngineRegistry.RegisterBlobService(channels.ExecutionDataService, builder.ExecutionDatastoreManager.Datastore(), opts...)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,10 @@ func (exeNode *ExecutionNode) LoadBlobService(
opts = append(opts, blob.WithReprovideInterval(-1))
}

if !node.BitswapBloomCacheEnabled {
opts = append(opts, blob.WithSkipBloomCache(true))
}

if exeNode.exeConf.blobstoreRateLimit > 0 && exeNode.exeConf.blobstoreBurstLimit > 0 {
opts = append(opts, blob.WithRateLimit(float64(exeNode.exeConf.blobstoreRateLimit), exeNode.exeConf.blobstoreBurstLimit))
}
Expand Down
20 changes: 14 additions & 6 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ type BaseConfig struct {
// This is only meaningful to Access and Execution nodes.
BitswapReprovideEnabled bool

// BitswapBloomCacheEnabled configures whether the Bitswap bloom cache is enabled.
// When disabled, uses a plain blockstore instead of cached blockstore, avoiding
// the CPU cost of building the bloom filter on startup. Pebble's built-in bloom
// filters (persisted in SSTables) are still used for efficient lookups.
// This is only meaningful to Access and Execution nodes.
BitswapBloomCacheEnabled bool

TransactionFeesDisabled bool
}

Expand Down Expand Up @@ -297,12 +304,13 @@ func DefaultBaseConfig() *BaseConfig {
Duration: 10 * time.Second,
},

HeroCacheMetricsEnable: false,
SyncCoreConfig: chainsync.DefaultConfig(),
CodecFactory: codecFactory,
ComplianceConfig: compliance.DefaultConfig(),
DhtSystemEnabled: true,
BitswapReprovideEnabled: true,
HeroCacheMetricsEnable: false,
SyncCoreConfig: chainsync.DefaultConfig(),
CodecFactory: codecFactory,
ComplianceConfig: compliance.DefaultConfig(),
DhtSystemEnabled: true,
BitswapReprovideEnabled: true,
BitswapBloomCacheEnabled: true, // default: use cached blockstore TODO leo: change default to false
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test cases also passed when I changed the default to false.

}
}

Expand Down
4 changes: 4 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,10 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
),
}

if !builder.BitswapBloomCacheEnabled {
opts = append(opts, blob.WithSkipBloomCache(true))
}

var err error
bs, err = node.EngineRegistry.RegisterBlobService(channels.PublicExecutionDataService, ds, opts...)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ func (fnb *FlowNodeBuilder) BaseFlags() {
"bitswap-reprovide-enabled",
defaultConfig.BitswapReprovideEnabled,
"[experimental] whether to enable bitswap reproviding. This is an experimental feature. Use with caution.")
fnb.flags.BoolVar(&fnb.BaseConfig.BitswapBloomCacheEnabled,
"bitswap-bloom-cache-enabled",
defaultConfig.BitswapBloomCacheEnabled,
"[experimental] whether to enable bitswap bloom cache. When disabled, uses a plain blockstore instead of cached blockstore, avoiding the CPU cost of building the bloom filter on startup. Pebble's built-in bloom filters (persisted in SSTables) are still used. This is an experimental feature. Use with caution.")

// dynamic node startup flags
fnb.flags.StringVar(&fnb.BaseConfig.DynamicStartupANPubkey,
Expand Down
12 changes: 12 additions & 0 deletions integration/tests/access/cohort3/execution_state_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,25 @@ func (s *ExecutionStateSyncSuite) executionDataForHeight(ctx context.Context, no
BlockId: header.ID[:],
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
})

if err != nil {
s.log.Info().
Uint64("height", height).
Hex("block_id", header.ID[:]).
Err(err).
Msg("failed to get execution data")
return err
}

blockED, err = convert.MessageToBlockExecutionData(ed.GetBlockExecutionData(), flow.Localnet.Chain())
s.Require().NoError(err, "could not convert execution data")

s.log.Info().
Uint64("height", height).
Hex("block_id", header.ID[:]).
Int("chunks", len(blockED.ChunkExecutionDatas)).
Msg("successfully retrieved execution data")

return err
}), "could not get execution data for block %d", height)

Expand Down
35 changes: 27 additions & 8 deletions network/p2p/blob/blob_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var _ component.Component = (*blobService)(nil)
type BlobServiceConfig struct {
ReprovideInterval time.Duration // the interval at which the DHT provider entries are refreshed
BitswapOptions []bitswap.Option // options to pass to the Bitswap service
SkipBloomCache bool // if true, skip the bloom cache and use plain blockstore
}

// WithReprovideInterval sets the interval at which DHT provider entries are refreshed
Expand Down Expand Up @@ -98,6 +99,15 @@ func WithRateLimit(r float64, b int) network.BlobServiceOption {
}
}

// WithSkipBloomCache disables the bloom cache, using a plain blockstore instead.
// This avoids the CPU cost of building the bloom filter on startup by scanning all keys.
// Pebble's built-in bloom filters (persisted in SSTables) are still used for efficient lookups.
func WithSkipBloomCache(skip bool) network.BlobServiceOption {
return func(bs network.BlobService) {
bs.(*blobService).config.SkipBloomCache = skip
}
}

// NewBlobService creates a new BlobService.
func NewBlobService(
host host.Host,
Expand All @@ -109,26 +119,35 @@ func NewBlobService(
opts ...network.BlobServiceOption,
) (*blobService, error) {
bsNetwork := bsnet.NewFromIpfsHost(host, r, bsnet.Prefix(protocol.ID(prefix)))
blockStore, err := blockstore.CachedBlockstore(
context.Background(),
blockstore.NewBlockstore(ds),
blockstore.DefaultCacheOpts(),
)
if err != nil {
return nil, fmt.Errorf("failed to create cached blockstore: %w", err)
}

blockStore := blockstore.NewBlockstore(ds)

bs := &blobService{
prefix: prefix,
config: &BlobServiceConfig{
ReprovideInterval: DefaultReprovideInterval,
SkipBloomCache: false, // default: use cached blockstore
},
blockStore: blockStore,
}

// Apply options before creating blockstore, as SkipBloomCache affects blockstore creation
for _, opt := range opts {
opt(bs)
}

if bs.config.SkipBloomCache {
cachedBlockStore, err := blockstore.CachedBlockstore(
context.Background(),
blockStore,
blockstore.DefaultCacheOpts(),
)
if err != nil {
return nil, fmt.Errorf("failed to create cached blockstore: %w", err)
}
bs.blockStore = cachedBlockStore
}

cm := component.NewComponentManagerBuilder().
AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
btswp := bitswap.New(ctx, bsNetwork, bs.blockStore, bs.config.BitswapOptions...)
Expand Down
Loading