Skip to content

Commit 18be100

Browse files
authored
Decouple block processing (ethereum#129)
* tmp relay reporting via rpc * decouple consumeBuiltBlock service and init it * reg builder flag * fix lint * add retry logic * cleanup old test * fix tests * go mod tidy
1 parent 03ee71c commit 18be100

File tree

13 files changed

+86
-474
lines changed

13 files changed

+86
-474
lines changed

builder/builder.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import (
2929
"github.com/flashbots/go-boost-utils/utils"
3030
"github.com/holiman/uint256"
3131
"golang.org/x/time/rate"
32+
33+
"github.com/cenkalti/backoff/v4"
3234
)
3335

3436
const (
@@ -64,6 +66,7 @@ type IBuilder interface {
6466

6567
type Builder struct {
6668
ds flashbotsextra.IDatabaseService
69+
blockConsumer flashbotsextra.BlockConsumer
6770
relay IRelay
6871
eth IEthereumService
6972
dryRun bool
@@ -91,6 +94,7 @@ type Builder struct {
9194
type BuilderArgs struct {
9295
sk *bls.SecretKey
9396
ds flashbotsextra.IDatabaseService
97+
blockConsumer flashbotsextra.BlockConsumer
9498
relay IRelay
9599
builderSigningDomain phase0.Domain
96100
builderBlockResubmitInterval time.Duration
@@ -130,6 +134,7 @@ func NewBuilder(args BuilderArgs) (*Builder, error) {
130134
slotCtx, slotCtxCancel := context.WithCancel(context.Background())
131135
return &Builder{
132136
ds: args.ds,
137+
blockConsumer: args.blockConsumer,
133138
relay: args.relay,
134139
eth: args.eth,
135140
dryRun: args.dryRun,
@@ -267,7 +272,7 @@ func (b *Builder) submitBellatrixBlock(block *types.Block, blockValue *big.Int,
267272
log.Error("could not validate bellatrix block", "err", err)
268273
}
269274
} else {
270-
go b.ds.ConsumeBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
275+
go b.processBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
271276
err = b.relay.SubmitBlock(&blockSubmitReq, vd)
272277
if err != nil {
273278
log.Error("could not submit bellatrix block", "err", err, "#commitedBundles", len(commitedBundles))
@@ -326,7 +331,7 @@ func (b *Builder) submitCapellaBlock(block *types.Block, blockValue *big.Int, or
326331
log.Error("could not validate block for capella", "err", err)
327332
}
328333
} else {
329-
go b.ds.ConsumeBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
334+
go b.processBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
330335
err = b.relay.SubmitBlockCapella(&blockSubmitReq, vd)
331336
if err != nil {
332337
log.Error("could not submit capella block", "err", err, "#commitedBundles", len(commitedBundles))
@@ -338,6 +343,19 @@ func (b *Builder) submitCapellaBlock(block *types.Block, blockValue *big.Int, or
338343
return nil
339344
}
340345

346+
func (b *Builder) processBuiltBlock(block *types.Block, blockValue *big.Int, ordersClosedAt time.Time, sealedAt time.Time, commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, usedSbundles []types.UsedSBundle, bidTrace *apiv1.BidTrace) {
347+
back := backoff.NewExponentialBackOff()
348+
back.MaxInterval = 3 * time.Second
349+
back.MaxElapsedTime = 12 * time.Second
350+
err := backoff.Retry(func() error {
351+
return b.blockConsumer.ConsumeBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, bidTrace)
352+
}, back)
353+
if err != nil {
354+
log.Error("could not consume built block", "err", err)
355+
} else {
356+
log.Info("successfully relayed block data to consumer")
357+
}
358+
}
341359
func (b *Builder) OnPayloadAttribute(attrs *types.BuilderPayloadAttributes) error {
342360
if attrs == nil {
343361
return nil

builder/builder_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ func TestOnPayloadAttributes(t *testing.T) {
9797
validator: nil,
9898
beaconClient: &testBeacon,
9999
limiter: nil,
100+
blockConsumer: flashbotsextra.NilDbService{},
100101
}
101102
builder, err := NewBuilder(builderArgs)
102103
require.NoError(t, err)

builder/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type Config struct {
2828
BuilderSubmissionOffset time.Duration `toml:",omitempty"`
2929
DiscardRevertibleTxOnErr bool `toml:",omitempty"`
3030
EnableCancellations bool `toml:",omitempty"`
31+
BlockProcessorURL string `toml:",omitempty"`
3132
}
3233

3334
// DefaultConfig is the default config for the builder.

builder/local_relay_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func newTestBackend(t *testing.T, forkchoiceData *engine.ExecutableData, block *
5656
validator: nil,
5757
beaconClient: beaconClient,
5858
limiter: nil,
59+
blockConsumer: flashbotsextra.NilDbService{},
5960
}
6061
backend, _ := NewBuilder(builderArgs)
6162

builder/service.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,15 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error {
250250
submissionOffset = SubmissionOffsetFromEndOfSlotSecondsDefault
251251
}
252252

253+
var blockConsumer flashbotsextra.BlockConsumer
254+
rpcURL := cfg.BlockProcessorURL
255+
if rpcURL != "" {
256+
blockConsumer = flashbotsextra.NewRpcBlockClient(rpcURL)
257+
} else {
258+
log.Warn("Block consumer url is empty. Built block data reporting is essentially disabled")
259+
blockConsumer = flashbotsextra.NilDbService{}
260+
}
261+
253262
// TODO: move to proper flags
254263
var ds flashbotsextra.IDatabaseService
255264
dbDSN := os.Getenv("FLASHBOTS_POSTGRES_DSN")
@@ -282,6 +291,7 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error {
282291

283292
builderArgs := BuilderArgs{
284293
sk: builderSk,
294+
blockConsumer: blockConsumer,
285295
ds: ds,
286296
dryRun: cfg.DryRun,
287297
eth: ethereumService,

cmd/geth/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ var (
184184
utils.BuilderSubmissionOffset,
185185
utils.BuilderDiscardRevertibleTxOnErr,
186186
utils.BuilderEnableCancellations,
187+
utils.BuilderBlockProcessorURL,
187188
}
188189

189190
rpcFlags = []cli.Flag{

cmd/utils/flags.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -893,6 +893,12 @@ var (
893893
Category: flags.BuilderCategory,
894894
}
895895

896+
BuilderBlockProcessorURL = &cli.StringFlag{
897+
Name: "builder.block_processor_url",
898+
Usage: "RPC URL for the block processor",
899+
Category: flags.BuilderCategory,
900+
}
901+
896902
// RPC settings
897903
IPCDisabledFlag = &cli.BoolFlag{
898904
Name: "ipcdisable",
@@ -1724,6 +1730,8 @@ func SetBuilderConfig(ctx *cli.Context, cfg *builder.Config) {
17241730
cfg.DiscardRevertibleTxOnErr = ctx.Bool(BuilderDiscardRevertibleTxOnErr.Name)
17251731
cfg.EnableCancellations = ctx.IsSet(BuilderEnableCancellations.Name)
17261732
cfg.BuilderRateLimitResubmitInterval = ctx.String(BuilderBlockResubmitInterval.Name)
1733+
1734+
cfg.BlockProcessorURL = ctx.String(BuilderBlockProcessorURL.Name)
17271735
}
17281736

17291737
// SetNodeConfig applies node-related command line flags to the config.

0 commit comments

Comments
 (0)