Skip to content

Commit 7a02570

Browse files
authored
Merge pull request #473 from shutter-network/batch_transition
Avoid crash when sending from inactive keyper
2 parents 393eba9 + 24902ec commit 7a02570

File tree

3 files changed

+48
-34
lines changed

3 files changed

+48
-34
lines changed

rolling-shutter/keyper/fx/messagesender.go

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"encoding/base64"
88
"encoding/binary"
99
"fmt"
10+
"sync/atomic"
1011

1112
"github.com/pkg/errors"
1213
"github.com/rs/zerolog/log"
@@ -31,34 +32,6 @@ type RemoteError struct {
3132

3233
var _ IRetriable = &RemoteError{}
3334

34-
type NonRetriableError struct {
35-
Err error
36-
}
37-
38-
func (*NonRetriableError) IsRetriable() bool {
39-
return false
40-
}
41-
42-
func (e *NonRetriableError) Error() string {
43-
return e.Err.Error()
44-
}
45-
46-
func (e *NonRetriableError) Unwrap() error {
47-
return e.Err
48-
}
49-
50-
var _ IRetriable = &NonRetriableError{}
51-
52-
// IsRetriable checks if we should retry an action that resulted in the given error.
53-
func IsRetriable(err error) bool {
54-
switch e := err.(type) {
55-
case IRetriable:
56-
return e.IsRetriable()
57-
default:
58-
return true
59-
}
60-
}
61-
6235
// Error implements the error interface. It renders the message stored in the
// RemoteError behind a fixed "remote error: " prefix.
func (remoteError *RemoteError) Error() string {
	const format = "remote error: %s"
	return fmt.Sprintf(format, remoteError.msg)
}
@@ -74,9 +47,10 @@ type MessageSender interface {
7447

7548
// RPCMessageSender signs messages and sends them via RPC to shuttermint.
7649
type RPCMessageSender struct {
77-
rpcclient client.Client
78-
chainID string
79-
signingKey *ecdsa.PrivateKey
50+
rpcclient client.Client
51+
chainID string
52+
signingKey *ecdsa.PrivateKey
53+
AllowedToSend *atomic.Bool
8054
}
8155

8256
var _ MessageSender = &RPCMessageSender{}
@@ -92,15 +66,21 @@ var mockMessageSenderBufferSize = 0x10000
9266

9367
// NewRPCMessageSender creates a new RPCMessageSender.
9468
func NewRPCMessageSender(cl client.Client, signingKey *ecdsa.PrivateKey) RPCMessageSender {
95-
return RPCMessageSender{
69+
ms := RPCMessageSender{
9670
rpcclient: cl,
9771
chainID: "",
9872
signingKey: signingKey,
9973
}
74+
ms.AllowedToSend.Store(false)
75+
return ms
10076
}
10177

10278
// SendMessage signs the given shmsg.Message and sends the message to shuttermint.
10379
func (ms *RPCMessageSender) SendMessage(ctx context.Context, msg *shmsg.Message) error {
80+
if !ms.AllowedToSend.Load() {
81+
log.Info().Str("msg", msg.String()).Msg("not allowed to send")
82+
return nil
83+
}
10484
if err := ms.maybeFetchChainID(ctx); err != nil {
10585
return err
10686
}

rolling-shutter/keyper/fx/send.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,12 @@ func SendShutterMessages(
3131
}
3232
err = messageSender.SendMessage(ctx, msg)
3333
if err != nil {
34-
return err // XXX retry
34+
if !isRetrieable(msg) {
35+
log.Err(err).Str("msg", msg.String()).Msg("sending non-retrieable msg failed")
36+
return err
37+
}
38+
log.Info().Str("msg", msg.String()).Msg("msg not accepted, will be retried")
39+
return nil
3540
}
3641
log.Info().Int32("id", outgoing.ID).
3742
Str("description", outgoing.Description).
@@ -42,3 +47,8 @@ func SendShutterMessages(
4247
}
4348
}
4449
}
50+
51+
// FIXME: isRetrieable is a no-op so far.
52+
func isRetrieable(_ *shmsg.Message) bool {
53+
return true
54+
}

rolling-shutter/keyper/keyper.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,9 @@ func (kpr *KeyperCore) handleOnChainKeyperSetChanges(
289289
return err
290290
}
291291
if lastSent == keyperSet.KeyperConfigIndex {
292+
log.Debug().
293+
Int64("keyper-config-index", keyperSet.KeyperConfigIndex).
294+
Msg("batch config already sent (scheduled).")
292295
return nil
293296
}
294297

@@ -309,6 +312,10 @@ func (kpr *KeyperCore) handleOnChainKeyperSetChanges(
309312

310313
err = q.SetLastBatchConfigSent(ctx, keyperSet.KeyperConfigIndex)
311314
if err != nil {
315+
log.Warn().Err(err).
316+
Interface("keyper-set", keyperSet).
317+
Int64("keyper-config-index", keyperSet.KeyperConfigIndex).
318+
Msg("error when setting last batch config sent. Returning nil.")
312319
return nil
313320
}
314321

@@ -358,7 +365,9 @@ func (kpr *KeyperCore) operateShuttermint(ctx context.Context, _ service.Runner)
358365
if err != nil {
359366
return err
360367
}
361-
368+
if !kpr.messageSender.AllowedToSend.Load() {
369+
allowSendIfInKeyperSet(ctx, database.New(kpr.dbpool), syncBlockNumber, kpr)
370+
}
362371
err = fx.SendShutterMessages(ctx, database.New(kpr.dbpool), &kpr.messageSender)
363372
if err != nil {
364373
return err
@@ -370,3 +379,18 @@ func (kpr *KeyperCore) operateShuttermint(ctx context.Context, _ service.Runner)
370379
}
371380
}
372381
}
382+
383+
func allowSendIfInKeyperSet(ctx context.Context, queries *database.Queries, syncBlockNumber uint64, kpr *KeyperCore) {
384+
count, err := queries.CountBatchConfigsInBlockRangeWithKeyper(ctx,
385+
database.CountBatchConfigsInBlockRangeWithKeyperParams{
386+
KeyperAddress: []string{kpr.config.GetAddress().String()},
387+
StartBlock: 0,
388+
EndBlock: int64(syncBlockNumber),
389+
})
390+
if err != nil {
391+
log.Err(err).Msg("could not query if in keyper set")
392+
}
393+
if count > 0 {
394+
kpr.messageSender.AllowedToSend.Store(true)
395+
}
396+
}

0 commit comments

Comments
 (0)