Skip to content

Commit 076315b

Browse files
authored
Try fixing registerValidator cache misses (#720)
* maybe fix for regval cache misses
* more logs
* more logging
* cleanup
* cache full registration, instead of timestamp only
1 parent fdbb01a commit 076315b

File tree

5 files changed

+118
-111
lines changed

5 files changed

+118
-111
lines changed

datastore/datastore.go

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -192,9 +192,7 @@ func (ds *Datastore) SaveValidatorRegistration(entry builderApiV1.SignedValidato
192192
return errors.Wrap(err, "failed saving validator registration to database")
193193
}
194194

195-
// then save in redis
196-
pk := common.NewPubkeyHex(entry.Message.Pubkey.String())
197-
err = ds.redis.SetValidatorRegistrationTimestampIfNewer(pk, uint64(entry.Message.Timestamp.Unix())) //nolint:gosec
195+
err = ds.redis.SetValidatorRegistrationData(entry.Message)
198196
if err != nil {
199197
return errors.Wrap(err, "failed saving validator registration to redis")
200198
}

datastore/redis.go

Lines changed: 17 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@ import (
1212

1313
builderApi "github.com/attestantio/go-builder-client/api"
1414
builderApiDeneb "github.com/attestantio/go-builder-client/api/deneb"
15+
builderApiV1 "github.com/attestantio/go-builder-client/api/v1"
1516
builderSpec "github.com/attestantio/go-builder-client/spec"
1617
"github.com/attestantio/go-eth2-client/spec"
1718
"github.com/attestantio/go-eth2-client/spec/capella"
@@ -95,7 +96,7 @@ type RedisCache struct {
9596
prefixFloorBidValue string
9697

9798
// keys
98-
keyValidatorRegistrationTimestamp string
99+
keyValidatorRegistrationData string
99100

100101
keyRelayConfig string
101102
keyStats string
@@ -136,8 +137,8 @@ func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
136137
prefixFloorBid: fmt.Sprintf("%s/%s:bid-floor", redisPrefix, prefix), // prefix:slot_parentHash_proposerPubkey
137138
prefixFloorBidValue: fmt.Sprintf("%s/%s:bid-floor-value", redisPrefix, prefix), // prefix:slot_parentHash_proposerPubkey
138139

139-
keyValidatorRegistrationTimestamp: fmt.Sprintf("%s/%s:validator-registration-timestamp", redisPrefix, prefix),
140-
keyRelayConfig: fmt.Sprintf("%s/%s:relay-config", redisPrefix, prefix),
140+
keyValidatorRegistrationData: fmt.Sprintf("%s/%s:validator-registration-data", redisPrefix, prefix),
141+
keyRelayConfig: fmt.Sprintf("%s/%s:relay-config", redisPrefix, prefix),
141142

142143
keyStats: fmt.Sprintf("%s/%s:stats", redisPrefix, prefix),
143144
keyProposerDuties: fmt.Sprintf("%s/%s:proposer-duties", redisPrefix, prefix),
@@ -239,27 +240,26 @@ func (r *RedisCache) HSetObj(key, field string, value any, expiration time.Durat
239240
return r.client.Expire(context.Background(), key, expiration).Err()
240241
}
241242

242-
func (r *RedisCache) GetValidatorRegistrationTimestamp(proposerPubkey common.PubkeyHex) (uint64, error) {
243-
timestamp, err := r.client.HGet(context.Background(), r.keyValidatorRegistrationTimestamp, strings.ToLower(proposerPubkey.String())).Uint64()
243+
func (r *RedisCache) GetValidatorRegistrationData(proposerPubkey common.PubkeyHex) (*builderApiV1.ValidatorRegistration, error) {
244+
data := new(builderApiV1.ValidatorRegistration)
245+
pk := strings.ToLower(proposerPubkey.String())
246+
247+
dataRaw, err := r.client.HGet(context.Background(), r.keyValidatorRegistrationData, pk).Result()
244248
if errors.Is(err, redis.Nil) {
245-
return 0, nil
249+
return nil, nil
246250
}
247-
return timestamp, err
251+
err = data.UnmarshalSSZ([]byte(dataRaw))
252+
return data, err
248253
}
249254

250-
func (r *RedisCache) SetValidatorRegistrationTimestampIfNewer(proposerPubkey common.PubkeyHex, timestamp uint64) error {
251-
knownTimestamp, err := r.GetValidatorRegistrationTimestamp(proposerPubkey)
255+
func (r *RedisCache) SetValidatorRegistrationData(data *builderApiV1.ValidatorRegistration) error {
256+
pk := strings.ToLower(data.Pubkey.String())
257+
258+
dataBytes, err := data.MarshalSSZ()
252259
if err != nil {
253260
return err
254261
}
255-
if knownTimestamp >= timestamp {
256-
return nil
257-
}
258-
return r.SetValidatorRegistrationTimestamp(proposerPubkey, timestamp)
259-
}
260-
261-
func (r *RedisCache) SetValidatorRegistrationTimestamp(proposerPubkey common.PubkeyHex, timestamp uint64) error {
262-
return r.client.HSet(context.Background(), r.keyValidatorRegistrationTimestamp, proposerPubkey.String(), timestamp).Err()
262+
return r.client.HSet(context.Background(), r.keyValidatorRegistrationData, pk, dataBytes).Err()
263263
}
264264

265265
func (r *RedisCache) CheckAndSetLastSlotAndHashDelivered(slot uint64, hash string) (err error) {

datastore/redis_test.go

Lines changed: 16 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -38,52 +38,36 @@ func TestRedisValidatorRegistration(t *testing.T) {
3838
cache := setupTestRedis(t)
3939

4040
t.Run("Can save and get validator registration from cache", func(t *testing.T) {
41-
key := common.ValidPayloadRegisterValidator.Message.Pubkey
42-
value := common.ValidPayloadRegisterValidator
43-
pkHex := common.NewPubkeyHex(key.String())
44-
err := cache.SetValidatorRegistrationTimestamp(pkHex, uint64(value.Message.Timestamp.Unix())) //nolint:gosec
41+
data := common.ValidPayloadRegisterValidator.Message
42+
43+
err := cache.SetValidatorRegistrationData(data)
4544
require.NoError(t, err)
46-
result, err := cache.GetValidatorRegistrationTimestamp(common.NewPubkeyHex(key.String()))
45+
46+
result, err := cache.GetValidatorRegistrationData(common.NewPubkeyHex(data.Pubkey.String()))
4747
require.NoError(t, err)
48-
require.Equal(t, result, uint64(value.Message.Timestamp.Unix())) //nolint:gosec
48+
require.NotNil(t, result)
49+
require.Equal(t, result.Timestamp.UTC(), data.Timestamp.UTC())
50+
require.Equal(t, result.FeeRecipient, data.FeeRecipient)
51+
require.Equal(t, result.GasLimit, data.GasLimit)
4952
})
5053

5154
t.Run("Returns nil if validator registration is not in cache", func(t *testing.T) {
5255
key := phase0.BLSPubKey{}
53-
result, err := cache.GetValidatorRegistrationTimestamp(common.NewPubkeyHex(key.String()))
56+
result, err := cache.GetValidatorRegistrationData(common.NewPubkeyHex(key.String()))
5457
require.NoError(t, err)
55-
require.Equal(t, uint64(0), result)
58+
require.Nil(t, result)
5659
})
5760

5861
t.Run("test SetValidatorRegistrationTimestampIfNewer", func(t *testing.T) {
59-
key := common.ValidPayloadRegisterValidator.Message.Pubkey
60-
value := common.ValidPayloadRegisterValidator
61-
62-
pkHex := common.NewPubkeyHex(key.String())
63-
timestamp := uint64(value.Message.Timestamp.Unix()) //nolint:gosec
64-
65-
err := cache.SetValidatorRegistrationTimestampIfNewer(pkHex, timestamp)
66-
require.NoError(t, err)
62+
data := *common.ValidPayloadRegisterValidator.Message
63+
pkHex := common.NewPubkeyHex(data.Pubkey.String())
6764

68-
result, err := cache.GetValidatorRegistrationTimestamp(common.NewPubkeyHex(key.String()))
65+
err := cache.SetValidatorRegistrationData(&data)
6966
require.NoError(t, err)
70-
require.Equal(t, result, timestamp)
7167

72-
// Try to set an older timestamp (should not work)
73-
timestamp2 := timestamp - 10
74-
err = cache.SetValidatorRegistrationTimestampIfNewer(pkHex, timestamp2)
75-
require.NoError(t, err)
76-
result, err = cache.GetValidatorRegistrationTimestamp(common.NewPubkeyHex(key.String()))
77-
require.NoError(t, err)
78-
require.Equal(t, result, timestamp)
79-
80-
// Try to set an older timestamp (should not work)
81-
timestamp3 := timestamp + 10
82-
err = cache.SetValidatorRegistrationTimestampIfNewer(pkHex, timestamp3)
83-
require.NoError(t, err)
84-
result, err = cache.GetValidatorRegistrationTimestamp(common.NewPubkeyHex(key.String()))
68+
result, err := cache.GetValidatorRegistrationData(pkHex)
8569
require.NoError(t, err)
86-
require.Equal(t, result, timestamp3)
70+
require.Equal(t, result.Timestamp.UTC(), common.ValidPayloadRegisterValidator.Message.Timestamp.UTC())
8771
})
8872
}
8973

services/api/service.go

Lines changed: 75 additions & 58 deletions
Original file line number | Diff line number | Diff line change
@@ -997,17 +997,13 @@ func (api *RelayAPI) handleRegisterValidator(w http.ResponseWriter, req *http.Re
997997
start := time.Now().UTC()
998998
registrationTimestampUpperBound := start.Unix() + 10 // 10 seconds from now
999999

1000-
numRegTotal := 0
10011000
numRegProcessed := 0
1002-
numRegActive := 0
10031001
numRegNew := 0
1004-
processingStoppedByError := false
10051002

10061003
// Setup error handling
1007-
handleError := func(_log *logrus.Entry, code int, msg string) {
1008-
processingStoppedByError = true
1009-
_log.Warnf("error: %s", msg)
1010-
api.RespondError(w, code, msg)
1004+
logAndReturnError := func(_log *logrus.Entry, code int, userMsg string, err error) {
1005+
_log.WithError(err).Warnf("error: %s", userMsg)
1006+
api.RespondError(w, code, userMsg)
10111007
}
10121008

10131009
// Start processing
@@ -1036,29 +1032,53 @@ func (api *RelayAPI) handleRegisterValidator(w http.ResponseWriter, req *http.Re
10361032
}
10371033
req.Body.Close()
10381034

1035+
//
10391036
// Parse the registrations
1040-
signedValidatorRegistrations, err := api.fastRegistrationParsing(regBytes, proposerContentType)
1041-
if err != nil {
1042-
handleError(log, http.StatusBadRequest, err.Error())
1043-
return
1044-
}
1037+
//
1038+
signedValidatorRegistrations := new(builderApiV1.SignedValidatorRegistrations)
1039+
if proposerContentType == ApplicationOctetStream {
1040+
// Registrations in SSZ
1041+
log = log.WithField("is_ssz", true)
1042+
log.Debug("Parsing registrations as SSZ")
10451043

1046-
// Iterate over the registrations
1047-
for regIndex, signedValidatorRegistration := range signedValidatorRegistrations.Registrations {
1048-
numRegTotal += 1
1049-
if processingStoppedByError {
1044+
timeStart := time.Now()
1045+
err := signedValidatorRegistrations.UnmarshalSSZ(regBytes)
1046+
if err != nil {
1047+
logAndReturnError(log, http.StatusBadRequest, err.Error(), err)
10501048
return
10511049
}
1050+
log.WithFields(logrus.Fields{
1051+
"sszDecodeDurationMs": time.Since(timeStart).Milliseconds(),
1052+
"numRegistrations": len(signedValidatorRegistrations.Registrations),
1053+
}).Debug("Parsed registrations as SSZ")
1054+
} else {
1055+
// Registrations in JSON
1056+
log = log.WithField("is_ssz", false)
1057+
api.log.Debug("Parsing registrations as JSON")
1058+
1059+
timeStart := time.Now()
1060+
signedValidatorRegistrations, err = api.parseValidatorRegistrationsJSON(regBytes)
1061+
if err != nil {
1062+
logAndReturnError(log, http.StatusBadRequest, err.Error(), err)
1063+
return
1064+
}
1065+
log.WithFields(logrus.Fields{
1066+
"jsonDecodeDurationMs": time.Since(timeStart).Milliseconds(),
1067+
"numRegistrations": len(signedValidatorRegistrations.Registrations),
1068+
}).Debug("Parsed registrations as JSON")
1069+
}
1070+
1071+
//
1072+
// Iterate over the registrations and process them
1073+
//
1074+
for regIndex, signedValidatorRegistration := range signedValidatorRegistrations.Registrations {
10521075
numRegProcessed += 1
1076+
pkHex := common.NewPubkeyHex(signedValidatorRegistration.Message.Pubkey.String())
1077+
10531078
regLog := log.WithFields(logrus.Fields{
10541079
"regIndex": regIndex,
1055-
"numRegistrationsSoFar": numRegTotal,
10561080
"numRegistrationsProcessed": numRegProcessed,
1057-
})
10581081

1059-
// Add validator pubkey to logs
1060-
pkHex := common.PubkeyHex(signedValidatorRegistration.Message.Pubkey.String())
1061-
regLog = regLog.WithFields(logrus.Fields{
10621082
"pubkey": pkHex,
10631083
"signature": signedValidatorRegistration.Signature.String(),
10641084
"feeRecipient": signedValidatorRegistration.Message.FeeRecipient.String(),
@@ -1069,45 +1089,60 @@ func (api *RelayAPI) handleRegisterValidator(w http.ResponseWriter, req *http.Re
10691089
// Ensure a valid timestamp (not too early, and not too far in the future)
10701090
registrationTimestamp := signedValidatorRegistration.Message.Timestamp.Unix()
10711091
if registrationTimestamp < int64(api.genesisInfo.Data.GenesisTime) { //nolint:gosec
1072-
handleError(regLog, http.StatusBadRequest, "timestamp too early")
1073-
return
1092+
logAndReturnError(regLog, http.StatusBadRequest, "timestamp too early", nil)
1093+
break
10741094
} else if registrationTimestamp > registrationTimestampUpperBound {
1075-
handleError(regLog, http.StatusBadRequest, "timestamp too far in the future")
1076-
return
1095+
logAndReturnError(regLog, http.StatusBadRequest, "timestamp too far in the future", nil)
1096+
break
10771097
}
10781098

10791099
// Check if a real validator
10801100
isKnownValidator := api.datastore.IsKnownValidator(pkHex)
10811101
if !isKnownValidator {
1082-
handleError(regLog, http.StatusBadRequest, fmt.Sprintf("not a known validator: %s", pkHex))
1083-
return
1102+
logAndReturnError(regLog, http.StatusBadRequest, fmt.Sprintf("not a known validator: %s", pkHex), nil)
1103+
break
10841104
}
10851105

1086-
// Check for a previous registration timestamp
1087-
prevTimestamp, err := api.redis.GetValidatorRegistrationTimestamp(pkHex)
1106+
// Check for a previous registration timestamp and see if fields changed
1107+
cachedRegistrationData, err := api.redis.GetValidatorRegistrationData(pkHex)
1108+
haveCachedRegistration := cachedRegistrationData != nil
1109+
10881110
if err != nil {
1089-
regLog.WithError(err).Error("error getting last registration timestamp")
1090-
} else if prevTimestamp >= uint64(signedValidatorRegistration.Message.Timestamp.Unix()) { //nolint:gosec
1091-
// abort if the current registration timestamp is older or equal to the last known one
1092-
return
1111+
regLog.WithError(err).Error("error getting last registration") // maybe a Redis error. continue to validation + processing
1112+
} else if haveCachedRegistration {
1113+
// See if we can discard (if no fields changed, or old timestamp)
1114+
isChangedFeeRecipient := cachedRegistrationData.FeeRecipient != signedValidatorRegistration.Message.FeeRecipient
1115+
isChangedGasLimit := cachedRegistrationData.GasLimit != signedValidatorRegistration.Message.GasLimit
1116+
isNewerTimestamp := signedValidatorRegistration.Message.Timestamp.UTC().Unix() > cachedRegistrationData.Timestamp.UTC().Unix()
1117+
1118+
// If key fields haven't changed, can just discard without signature validation
1119+
if !isChangedFeeRecipient && !isChangedGasLimit {
1120+
continue
1121+
}
1122+
1123+
// Ensure it's not a replay of an old registration
1124+
if !isNewerTimestamp {
1125+
continue
1126+
}
10931127
}
10941128

10951129
// Verify the signature
1130+
regLog.Debug("verifying BLS signature...")
10961131
ok, err := ssz.VerifySignature(signedValidatorRegistration.Message, api.opts.EthNetDetails.DomainBuilder, signedValidatorRegistration.Message.Pubkey[:], signedValidatorRegistration.Signature[:])
10971132
if err != nil {
10981133
regLog.WithError(err).Error("error verifying registerValidator signature")
1099-
return
1134+
break
11001135
} else if !ok {
11011136
regLog.Info("invalid validator signature")
11021137
if api.ffRegValContinueOnInvalidSig {
1103-
return
1138+
continue
11041139
} else {
1105-
handleError(regLog, http.StatusBadRequest, "failed to verify validator signature for "+signedValidatorRegistration.Message.Pubkey.String())
1106-
return
1140+
logAndReturnError(regLog, http.StatusBadRequest, "failed to verify validator signature for "+signedValidatorRegistration.Message.Pubkey.String(), err)
1141+
break
11071142
}
11081143
}
11091144

1110-
// Now we have a new registration to process
1145+
// Now we have a new registration to process (store in DB + Cache)
11111146
numRegNew += 1
11121147

11131148
// Save to database
@@ -1121,18 +1156,11 @@ func (api *RelayAPI) handleRegisterValidator(w http.ResponseWriter, req *http.Re
11211156
log = log.WithFields(logrus.Fields{
11221157
"timeNeededSec": time.Since(start).Seconds(),
11231158
"timeNeededMs": time.Since(start).Milliseconds(),
1124-
"numRegistrations": numRegTotal,
1125-
"numRegistrationsActive": numRegActive,
1159+
"numRegistrations": len(signedValidatorRegistrations.Registrations),
11261160
"numRegistrationsProcessed": numRegProcessed,
11271161
"numRegistrationsNew": numRegNew,
1128-
"processingStoppedByError": processingStoppedByError,
11291162
})
11301163

1131-
if err != nil {
1132-
handleError(log, http.StatusBadRequest, "error in traversing json")
1133-
return
1134-
}
1135-
11361164
// notify that new registrations are available
11371165
select {
11381166
case api.validatorUpdateCh <- struct{}{}:
@@ -1143,19 +1171,9 @@ func (api *RelayAPI) handleRegisterValidator(w http.ResponseWriter, req *http.Re
11431171
w.WriteHeader(http.StatusOK)
11441172
}
11451173

1146-
func (api *RelayAPI) fastRegistrationParsing(regBytes []byte, contentType string) (*builderApiV1.SignedValidatorRegistrations, error) {
1174+
func (api *RelayAPI) parseValidatorRegistrationsJSON(regBytes []byte) (*builderApiV1.SignedValidatorRegistrations, error) {
11471175
signedValidatorRegistrations := new(builderApiV1.SignedValidatorRegistrations)
11481176

1149-
// Parse registrations as SSZ
1150-
if contentType == ApplicationOctetStream {
1151-
api.log.Debug("Parsing registrations as SSZ")
1152-
err := signedValidatorRegistrations.UnmarshalSSZ(regBytes)
1153-
if err != nil {
1154-
return nil, err
1155-
}
1156-
return signedValidatorRegistrations, nil
1157-
}
1158-
11591177
// Parse registrations as JSON
11601178
parseRegistration := func(value []byte) (reg *builderApiV1.SignedValidatorRegistration, err error) {
11611179
// Pubkey
@@ -1231,7 +1249,6 @@ func (api *RelayAPI) fastRegistrationParsing(regBytes []byte, contentType string
12311249
}
12321250

12331251
var parseErr error
1234-
api.log.Debug("Parsing registrations as JSON")
12351252
_, forEachErr := jsonparser.ArrayEach(regBytes, func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
12361253
if err != nil {
12371254
parseErr = err

services/housekeeper/housekeeper.go

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -260,7 +260,15 @@ func (hk *Housekeeper) updateValidatorRegistrationsInRedis() {
260260
timeStarted := time.Now()
261261

262262
for _, reg := range regs {
263-
err = hk.redis.SetValidatorRegistrationTimestampIfNewer(common.NewPubkeyHex(reg.Pubkey), reg.Timestamp)
263+
// convert DB data to original struct
264+
data, err := reg.ToSignedValidatorRegistration()
265+
if err != nil {
266+
hk.log.WithError(err).Error("failed to convert validator registration entry to signed validator registration")
267+
continue
268+
}
269+
270+
// save to Redis
271+
err = hk.redis.SetValidatorRegistrationData(data.Message)
264272
if err != nil {
265273
hk.log.WithError(err).Error("failed to set validator registration")
266274
continue

0 commit comments

Comments
 (0)