Skip to content

Commit 084d5e3

Browse files
committed
Allow for coordinator to notify other relayers of an across message
1 parent 4d80bbd commit 084d5e3

File tree

5 files changed

+125
-19
lines changed

5 files changed

+125
-19
lines changed

cache/signature.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,21 @@ const (
1818

1919
type SignatureCache struct {
2020
sigCache *ttlcache.Cache[string, []byte]
21-
22-
comm comm.Communication
23-
subID comm.SubscriptionID
21+
comm comm.Communication
2422
}
2523

2624
func NewSignatureCache(ctx context.Context, c comm.Communication, sigChn chan interface{}) *SignatureCache {
2725
cache := ttlcache.New(
2826
ttlcache.WithTTL[string, []byte](SIGNATURE_TTL),
2927
)
3028

31-
msgChn := make(chan *comm.WrappedMessage)
32-
subID := c.Subscribe(comm.SignatureSessionID, comm.SignatureMsg, msgChn)
33-
3429
sc := &SignatureCache{
3530
sigCache: cache,
36-
subID: subID,
3731
comm: c,
3832
}
3933

4034
go cache.Start()
41-
go sc.watch(ctx, sigChn, msgChn)
35+
go sc.watch(ctx, sigChn)
4236
return sc
4337
}
4438

@@ -51,7 +45,10 @@ func (s *SignatureCache) Signature(id string) ([]byte, error) {
5145
return sig.Value(), nil
5246
}
5347

54-
func (s *SignatureCache) watch(ctx context.Context, sigChn chan interface{}, msgChn chan *comm.WrappedMessage) {
48+
func (s *SignatureCache) watch(ctx context.Context, sigChn chan interface{}) {
49+
msgChn := make(chan *comm.WrappedMessage)
50+
subID := s.comm.Subscribe(comm.SignatureSessionID, comm.SignatureMsg, msgChn)
51+
5552
for {
5653
select {
5754
case sig := <-sigChn:
@@ -73,7 +70,7 @@ func (s *SignatureCache) watch(ctx context.Context, sigChn chan interface{}, msg
7370
case <-ctx.Done():
7471
{
7572
s.sigCache.Stop()
76-
s.comm.UnSubscribe(s.subID)
73+
s.comm.UnSubscribe(subID)
7774
return
7875
}
7976
}

chains/evm/message/across.go

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@ import (
1111
"github.com/ethereum/go-ethereum/common"
1212
"github.com/ethereum/go-ethereum/core/types"
1313
"github.com/libp2p/go-libp2p/core/host"
14+
"github.com/libp2p/go-libp2p/core/peer"
15+
"github.com/rs/zerolog/log"
1416
"github.com/sprintertech/sprinter-signing/chains/evm/calls/events"
1517
"github.com/sprintertech/sprinter-signing/comm"
1618
"github.com/sprintertech/sprinter-signing/tss"
1719
"github.com/sprintertech/sprinter-signing/tss/ecdsa/signing"
20+
tssMessage "github.com/sprintertech/sprinter-signing/tss/message"
1821
"github.com/sygmaprotocol/sygma-core/relayer/message"
1922
"github.com/sygmaprotocol/sygma-core/relayer/proposal"
2023
)
@@ -28,7 +31,8 @@ type EventFilterer interface {
2831
}
2932

3033
type AcrossData struct {
31-
depositId *big.Int
34+
depositId *big.Int
35+
coordinator peer.ID
3236
}
3337

3438
func NewAcrossMessage(source uint8, destination uint8, acrossData AcrossData) *message.Message {
@@ -47,19 +51,56 @@ type AcrossMessageHandler struct {
4751
across common.Address
4852
abi abi.ABI
4953

50-
coordinator *tss.Coordinator
51-
host host.Host
52-
communication comm.Communication
53-
fetcher signing.SaveDataFetcher
54+
coordinator *tss.Coordinator
55+
host host.Host
56+
comm comm.Communication
57+
fetcher signing.SaveDataFetcher
5458

55-
resultChn chan interface{}
59+
sigChn chan interface{}
60+
}
61+
62+
func (h *AcrossMessageHandler) Listen(ctx context.Context) {
63+
msgChn := make(chan *comm.WrappedMessage)
64+
subID := h.comm.Subscribe(comm.AcrossSessionID, comm.AcrossMsg, msgChn)
65+
66+
for {
67+
select {
68+
case wMsg := <-msgChn:
69+
{
70+
acrossMsg, err := tssMessage.UnmarshalAcrossMessage(wMsg.Payload)
71+
if err != nil {
72+
log.Warn().Msgf("Failed unmarshaling across message: %s", err)
73+
continue
74+
}
75+
76+
msg := NewAcrossMessage(acrossMsg.Source, acrossMsg.Destination, AcrossData{
77+
depositId: acrossMsg.DepositId,
78+
coordinator: wMsg.From,
79+
})
80+
_, err = h.HandleMessage(msg)
81+
if err != nil {
82+
log.Err(err).Msgf("Failed handling across message %+v because of: %s", acrossMsg, err)
83+
}
84+
}
85+
case <-ctx.Done():
86+
{
87+
h.comm.UnSubscribe(subID)
88+
return
89+
}
90+
}
91+
}
5692
}
5793

5894
// HandleMessage finds the Across deposit with the according deposit ID and starts
5995
// the MPC signature process for it. The result will be saved into the signature
6096
// cache through the result channel.
6197
func (h *AcrossMessageHandler) HandleMessage(m *message.Message) (*proposal.Proposal, error) {
6298
data := m.Data.(AcrossData)
99+
err := h.notify(m, data)
100+
if err != nil {
101+
return nil, err
102+
}
103+
63104
d, err := h.deposit(data.depositId)
64105
if err != nil {
65106
return nil, err
@@ -70,19 +111,28 @@ func (h *AcrossMessageHandler) HandleMessage(m *message.Message) (*proposal.Prop
70111
data.depositId.Text(16),
71112
data.depositId.Text(16),
72113
h.host,
73-
h.communication,
114+
h.comm,
74115
h.fetcher)
75116
if err != nil {
76117
return nil, err
77118
}
78119

79-
err = h.coordinator.Execute(context.Background(), []tss.TssProcess{signing}, h.resultChn, h.host.ID())
120+
err = h.coordinator.Execute(context.Background(), []tss.TssProcess{signing}, h.sigChn, data.coordinator)
80121
if err != nil {
81122
return nil, err
82123
}
83124
return nil, nil
84125
}
85126

127+
func (h *AcrossMessageHandler) notify(m *message.Message, data AcrossData) error {
128+
msgBytes, err := tssMessage.MarshalAcrossMessage(data.depositId, m.Source, m.Destination)
129+
if err != nil {
130+
return err
131+
}
132+
133+
return h.comm.Broadcast(h.host.Peerstore().Peers(), msgBytes, comm.AcrossMsg, comm.AcrossSessionID)
134+
}
135+
86136
func (h *AcrossMessageHandler) deposit(depositId *big.Int) (*events.AcrossDeposit, error) {
87137
q := ethereum.FilterQuery{
88138
Addresses: []common.Address{

comm/messages.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,15 @@ const (
3535
CoordinatorPingResponseMsg
3636
// SignatureMsg message type is used to the share the signature to all relayers
3737
SignatureMsg
38+
// AcrossMsg message type is used for the process coordinator to share across data
39+
AcrossMsg
3840
// Unknown message type
3941
Unknown
4042
)
4143

4244
const (
4345
SignatureSessionID = "signatures"
46+
AcrossSessionID = "across"
4447
)
4548

4649
// String implements fmt.Stringer

tss/message/message.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package message
55

66
import (
77
"encoding/json"
8+
"math/big"
89
)
910

1011
type TssMessage struct {
@@ -91,3 +92,34 @@ func UnmarshalSignatureMessage(msgBytes []byte) (*SignatureMessage, error) {
9192

9293
return msg, nil
9394
}
95+
96+
type AcrossMessage struct {
97+
DepositId *big.Int `json:"depositId"`
98+
Source uint8 `json:"source"`
99+
Destination uint8 `json:"destination"`
100+
}
101+
102+
func MarshalAcrossMessage(depositId *big.Int, source, destination uint8) ([]byte, error) {
103+
signatureMessage := &AcrossMessage{
104+
DepositId: depositId,
105+
Source: source,
106+
Destination: destination,
107+
}
108+
109+
msgBytes, err := json.Marshal(signatureMessage)
110+
if err != nil {
111+
return []byte{}, err
112+
}
113+
114+
return msgBytes, nil
115+
}
116+
117+
func UnmarshalAcrossMessage(msgBytes []byte) (*AcrossMessage, error) {
118+
msg := &AcrossMessage{}
119+
err := json.Unmarshal(msgBytes, msg)
120+
if err != nil {
121+
return nil, err
122+
}
123+
124+
return msg, nil
125+
}

tss/message/message_test.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package message_test
55

66
import (
7+
"math/big"
78
"testing"
89

910
"github.com/sprintertech/sprinter-signing/tss/message"
@@ -58,7 +59,7 @@ type SignatureMessageTestSuite struct {
5859
}
5960

6061
func TestRunSignatureMessageTestSuite(t *testing.T) {
61-
suite.Run(t, new(StartMessageTestSuite))
62+
suite.Run(t, new(SignatureMessageTestSuite))
6263
}
6364

6465
func (s *SignatureMessageTestSuite) Test_UnmarshaledMessageShouldBeEqual() {
@@ -74,3 +75,26 @@ func (s *SignatureMessageTestSuite) Test_UnmarshaledMessageShouldBeEqual() {
7475

7576
s.Equal(originalMsg, unmarshaledMsg)
7677
}
78+
79+
type AcrossMessageTestSuite struct {
80+
suite.Suite
81+
}
82+
83+
func TestRunAcrossMessageTestSuite(t *testing.T) {
84+
suite.Run(t, new(AcrossMessageTestSuite))
85+
}
86+
87+
func (s *AcrossMessageTestSuite) Test_UnmarshaledMessageShouldBeEqual() {
88+
originalMsg := &message.AcrossMessage{
89+
DepositId: big.NewInt(100),
90+
Source: 1,
91+
Destination: 2,
92+
}
93+
msgBytes, err := message.MarshalAcrossMessage(originalMsg.DepositId, originalMsg.Source, originalMsg.Destination)
94+
s.Nil(err)
95+
96+
unmarshaledMsg, err := message.UnmarshalAcrossMessage(msgBytes)
97+
s.Nil(err)
98+
99+
s.Equal(originalMsg, unmarshaledMsg)
100+
}

0 commit comments

Comments
 (0)