Skip to content

Commit 7c815f3

Browse files
committed
Sync progress of optimistic block validation across builder instances via redis
1 parent d8a0d7b commit 7c815f3

File tree

2 files changed

+73
-2
lines changed

2 files changed

+73
-2
lines changed

datastore/redis.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ var (
2424
redisPrefix = "boost-relay"
2525

2626
expiryBidCache = 45 * time.Second
27+
expiryLock = 24 * time.Second
2728

2829
RedisConfigFieldPubkey = "pubkey"
2930
RedisStatsFieldLatestSlot = "latest-slot"
@@ -91,6 +92,7 @@ type RedisCache struct {
9192
prefixTopBidValue string
9293
prefixFloorBid string
9394
prefixFloorBidValue string
95+
prefixProcessingSlot string
9496

9597
// keys
9698
keyValidatorRegistrationTimestamp string
@@ -101,6 +103,8 @@ type RedisCache struct {
101103
keyBlockBuilderStatus string
102104
keyLastSlotDelivered string
103105
keyLastHashDelivered string
106+
107+
currentSlot uint64
104108
}
105109

106110
func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
@@ -132,6 +136,7 @@ func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
132136
prefixTopBidValue: fmt.Sprintf("%s/%s:top-bid-value", redisPrefix, prefix), // prefix:slot_parentHash_proposerPubkey
133137
prefixFloorBid: fmt.Sprintf("%s/%s:bid-floor", redisPrefix, prefix), // prefix:slot_parentHash_proposerPubkey
134138
prefixFloorBidValue: fmt.Sprintf("%s/%s:bid-floor-value", redisPrefix, prefix), // prefix:slot_parentHash_proposerPubkey
139+
prefixProcessingSlot: fmt.Sprintf("%s/%s:processing-slot", redisPrefix, prefix), // prefix:slot
135140

136141
keyValidatorRegistrationTimestamp: fmt.Sprintf("%s/%s:validator-registration-timestamp", redisPrefix, prefix),
137142
keyRelayConfig: fmt.Sprintf("%s/%s:relay-config", redisPrefix, prefix),
@@ -190,6 +195,11 @@ func (r *RedisCache) keyFloorBidValue(slot uint64, parentHash, proposerPubkey st
190195
return fmt.Sprintf("%s:%d_%s_%s", r.prefixFloorBidValue, slot, parentHash, proposerPubkey)
191196
}
192197

198+
// keyProcessingSlot returns the key for the counter of builder processes working on a given slot
199+
func (r *RedisCache) keyProcessingSlot(slot uint64) string {
200+
return fmt.Sprintf("%s:%d", r.prefixProcessingSlot, slot)
201+
}
202+
193203
func (r *RedisCache) GetObj(key string, obj any) (err error) {
194204
value, err := r.client.Get(context.Background(), key).Result()
195205
if err != nil {
@@ -800,6 +810,51 @@ func (r *RedisCache) SetFloorBidValue(slot uint64, parentHash, proposerPubkey, v
800810
return err
801811
}
802812

813+
// BeginProcessingSlot signals that a builder process is handling blocks for a given slot
814+
func (r *RedisCache) BeginProcessingSlot(ctx context.Context, slot uint64) (err error) {
815+
// Should never process more than one slot at a time
816+
if r.currentSlot != 0 {
817+
return fmt.Errorf("already processing slot %d", r.currentSlot)
818+
}
819+
820+
keyProcessingSlot := r.keyProcessingSlot(slot)
821+
err = r.client.Incr(ctx, keyProcessingSlot).Err()
822+
if err != nil {
823+
return err
824+
}
825+
r.currentSlot = slot
826+
err = r.client.Expire(ctx, keyProcessingSlot, expiryLock).Err()
827+
return err
828+
}
829+
830+
// EndProcessingSlot signals that a builder process is done handling blocks for the current slot
831+
func (r *RedisCache) EndProcessingSlot(ctx context.Context) (err error) {
832+
// Do not decrement if called multiple times
833+
if r.currentSlot == 0 {
834+
return nil
835+
}
836+
837+
keyProcessingSlot := r.keyProcessingSlot(r.currentSlot)
838+
err = r.client.Decr(ctx, keyProcessingSlot).Err()
839+
r.currentSlot = 0
840+
return err
841+
}
842+
843+
// WaitForSlotComplete waits for a slot to be completed by all builder processes
844+
func (r *RedisCache) WaitForSlotComplete(ctx context.Context, slot uint64) (err error) {
845+
keyProcessingSlot := r.keyProcessingSlot(slot)
846+
for {
847+
processing, err := r.client.Get(ctx, keyProcessingSlot).Uint64()
848+
if err != nil {
849+
return err
850+
}
851+
if processing == 0 {
852+
return nil
853+
}
854+
time.Sleep(50 * time.Millisecond)
855+
}
856+
}
857+
803858
func (r *RedisCache) NewPipeline() redis.Pipeliner { //nolint:ireturn,nolintlint
804859
return r.client.Pipeline()
805860
}

services/api/service.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,7 @@ func (api *RelayAPI) IsReady() bool {
516516
// - Stop returning bids
517517
// - Set ready /readyz to negative status
518518
// - Wait a bit to allow removal of service from load balancer and draining of requests
519+
// - If in the middle of proccessing optimistic blocks, wait for those to finish and release redis lock
519520
func (api *RelayAPI) StopServer() (err error) {
520521
// avoid running this twice. setting srvShutdown to true makes /readyz switch to negative status
521522
if wasStopping := api.srvShutdown.Swap(true); wasStopping {
@@ -538,6 +539,10 @@ func (api *RelayAPI) StopServer() (err error) {
538539
// wait for any active getPayload call to finish
539540
api.getPayloadCallsInFlight.Wait()
540541

542+
// wait for optimistic blocks
543+
api.optimisticBlocksWG.Wait()
544+
api.redis.EndProcessingSlot(context.Background())
545+
541546
// shutdown
542547
return api.srv.Shutdown(context.Background())
543548
}
@@ -826,10 +831,21 @@ func (api *RelayAPI) updateProposerDuties(headSlot uint64) {
826831
}
827832

828833
func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64) {
829-
// Wait until there are no optimistic blocks being processed. Then we can
830-
// safely update the slot.
834+
// First wait for this process to finish processing optimistic blocks
831835
api.optimisticBlocksWG.Wait()
836+
837+
// Now we release our lock and wait for all other builder processes to wrap up
838+
api.redis.EndProcessingSlot(context.Background())
839+
api.redis.WaitForSlotComplete(context.Background(), headSlot)
840+
841+
// Prevent race with StopServer, make sure we don't lock up redis if the server is shutting down
842+
if api.srvShutdown.Load() {
843+
return
844+
}
845+
846+
// Update the optimistic slot and signal processing of the next slot
832847
api.optimisticSlot.Store(headSlot + 1)
848+
api.redis.BeginProcessingSlot(context.Background(), headSlot + 1)
833849

834850
builders, err := api.db.GetBlockBuilders()
835851
if err != nil {

0 commit comments

Comments
 (0)