Skip to content

Commit ea9a798

Browse files
Prevent nonce gaps for untracked transactions (#646)
* Prevent nonce gaps for untracked transactions * Update nonce management logic * Update tests --------- Co-authored-by: Augustus <[email protected]>
1 parent 1f21ec7 commit ea9a798

File tree

5 files changed

+110
-133
lines changed

5 files changed

+110
-133
lines changed

relayer/pkg/chainlink/txm/txm.go

Lines changed: 28 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ import (
2323
)
2424

2525
const (
26-
MaxQueueLen = 1000
26+
MaxQueueLen = 1000
27+
ConfirmationThreshold = 4
2728
)
2829

2930
type TxManager interface {
@@ -188,7 +189,7 @@ func (txm *starktxm) broadcast(ctx context.Context, publicKey *felt.Felt, accoun
188189
if accountNonceErr != nil {
189190
return txhash, fmt.Errorf("failed to check account nonce during TxStore creation: %+w", accountNonceErr)
190191
}
191-
newTxStore, createErr := txm.accountStore.CreateTxStore(accountAddress, initialNonce)
192+
newTxStore, createErr := txm.accountStore.CreateTxStore(accountAddress, initialNonce, txm.lggr)
192193
if createErr != nil {
193194
return txhash, fmt.Errorf("failed to create TxStore: %+w", createErr)
194195
}
@@ -273,7 +274,7 @@ func (txm *starktxm) broadcast(ctx context.Context, publicKey *felt.Felt, accoun
273274
broadcastTxnV3.InvokeTxnV3.ResourceBounds.L1Gas.MaxPricePerUnit = txm.updateMaxPriceUnitBounds(L1GasPrice, 150)
274275
broadcastTxnV3.InvokeTxnV3.ResourceBounds.L2Gas.MaxPricePerUnit = txm.updateMaxPriceUnitBounds(L2GasPrice, 150)
275276

276-
txm.lggr.Infow("Set resource bounds", "L1MaxAmount", tx.ResourceBounds.L1Gas.MaxAmount, "L1MaxPricePerUnit", tx.ResourceBounds.L1Gas.MaxPricePerUnit)
277+
txm.lggr.Infow("Set resource bounds", "L1MaxAmount", tx.ResourceBounds.L1Gas.MaxAmount, "L1MaxPricePerUnit", tx.ResourceBounds.L1Gas.MaxPricePerUnit, "FinalNonce", nonce)
277278

278279
L1DataGasConsumed := friEstimate.L1DataGasConsumed.BigInt(new(big.Int))
279280
L1DataGasPrice := friEstimate.L1DataGasPrice.BigInt(new(big.Int))
@@ -293,17 +294,8 @@ func (txm *starktxm) broadcast(ctx context.Context, publicKey *felt.Felt, accoun
293294
// finally, transmit the invoke
294295
res, err := account.Provider.AddInvokeTransaction(execCtx, &broadcastTxnV3)
295296
if err != nil {
296-
// TODO: handle initial broadcast errors - what kind of errors occur?
297-
var dataErr *starknetrpc.RPCError
298-
var dataStr string
299-
if !errors.As(err, &dataErr) {
300-
return txhash, fmt.Errorf("failed to read EstimateFee error: %T %+v", err, err)
301-
}
302-
data := dataErr.Data
303-
dataStr = fmt.Sprintf("%+v", data)
304-
txm.lggr.Errorw("failed to invoke tx", "accountAddress", accountAddress, "error", err, "data", dataStr)
305-
306-
if strings.Contains(dataStr, RPCNonceErrMsg) {
297+
txm.lggr.Errorw("failed to invoke tx", "accountAddress", accountAddress, "error", err)
298+
if strings.Contains(err.Error(), RPCNonceErrMsg) {
307299
// if we see an invalid nonce error at the broadcast stage, that means that we are out of sync.
308300
// see the comment at resyncNonce for more details.
309301
if resyncErr := txm.resyncNonce(ctx, client, accountAddress); resyncErr != nil {
@@ -362,56 +354,34 @@ func (txm *starktxm) confirmLoop() {
362354
break
363355
}
364356

365-
allUnconfirmedTxs := txm.accountStore.GetAllUnconfirmed()
366-
for accountAddressStr, unconfirmedTxs := range allUnconfirmedTxs {
357+
for _, accountAddressStr := range txm.accountStore.Accounts() {
367358
accountAddress, err := new(felt.Felt).SetString(accountAddressStr)
368359
// this should never occur because the acccount address string key was created from the account address felt.
369360
if err != nil {
370361
txm.lggr.Errorw("could not recreate account address felt", "accountAddress", accountAddressStr)
371362
continue
372363
}
373-
for _, unconfirmedTx := range unconfirmedTxs {
374-
hash := unconfirmedTx.Hash
375-
f, err := starknetutils.HexToFelt(hash)
376-
if err != nil {
377-
txm.lggr.Errorw("invalid felt value", "hash", hash)
378-
continue
379-
}
380-
response, err := client.Provider.GetTransactionStatus(ctx, f)
381-
382-
// tx can be rejected due to a nonce error. but we cannot know from the Starknet RPC directly so we have to wait for
383-
// a broadcasted tx to fail in order to fix the nonce errors
384-
385-
if err != nil {
386-
txm.lggr.Errorw("failed to fetch transaction status", "hash", hash, "nonce", unconfirmedTx.Nonce, "error", err)
387-
continue
388-
}
389-
390-
finalityStatus := response.FinalityStatus
391-
executionStatus := response.ExecutionStatus
392-
393-
// any finalityStatus other than received
394-
if finalityStatus == starknetrpc.TxnStatus_Accepted_On_L1 || finalityStatus == starknetrpc.TxnStatus_Accepted_On_L2 || finalityStatus == starknetrpc.TxnStatus_Rejected {
395-
txm.lggr.Debugw(fmt.Sprintf("tx confirmed: %s", finalityStatus), "hash", hash, "nonce", unconfirmedTx.Nonce, "finalityStatus", finalityStatus)
396-
if err := txm.accountStore.GetTxStore(accountAddress).Confirm(unconfirmedTx.Nonce, hash); err != nil {
397-
txm.lggr.Errorw("failed to confirm tx in TxStore", "hash", hash, "accountAddress", accountAddress, "error", err)
398-
}
399-
}
400-
401-
// currently, feeder client is only way to get rejected reason
402-
if finalityStatus == starknetrpc.TxnStatus_Rejected {
403-
// we assume that all rejected transactions results in a unused rejected nonce, so
404-
// resync. see the comment at resyncNonce for more details.
405-
if resyncErr := txm.resyncNonce(ctx, client, accountAddress); resyncErr != nil {
406-
txm.lggr.Errorw("resync failed for rejected tx", "error", resyncErr)
407-
}
408-
409-
go txm.logFeederError(ctx, hash, f)
410-
}
411-
412-
if executionStatus == starknetrpc.TxnExecutionStatusREVERTED {
413-
// TODO: get revert reason?
414-
txm.lggr.Errorw("transaction reverted", "hash", hash)
364+
nonce, err := client.AccountNonceLatest(ctx, accountAddress)
365+
if err != nil {
366+
txm.lggr.Errorf("failed to fetch latest nonce for account %v, err: %v", accountAddress, err)
367+
continue
368+
}
369+
// Confirm all transactions with nonce lower than the latest.
370+
confirmed, highestUnconfirmed := txm.accountStore.GetTxStore(accountAddress).Confirm(nonce)
371+
txm.lggr.Infow("Confirmation loop", "accountAddress", accountAddress, "latestNonce", nonce,
372+
"transactionsConfirmed", confirmed, "highestUnconfirmed", highestUnconfirmed)
373+
374+
// We add a maximum threshold between latest nonce and highest unconfirmed. This prevents the TXM from sending a very large
375+
// number of unconfirmed transactions in the mempool and triggers a resync to prevent nonce gaps since the RPC responses are unreliable.
376+
// The nonce stored here won't necessarily be picked up by the next transaction since there is a fast-forward functionality in broadcasting.
377+
// But it ensures that if for whatever reason the diff between mined and uncofirmed transactions starts to grow, the TXM will be able to
378+
// go back on the nonce and fill any nonce gaps.
379+
hu := highestUnconfirmed.BigInt(new(big.Int))
380+
n := nonce.BigInt(new(big.Int))
381+
threshold := big.NewInt(ConfirmationThreshold)
382+
if new(big.Int).Sub(hu, n).Cmp(threshold) == 1 {
383+
if resyncErr := txm.resyncNonce(ctx, client, accountAddress); resyncErr != nil {
384+
txm.lggr.Errorw("resync failed for rejected tx", "error", resyncErr)
415385
}
416386
}
417387
}
@@ -424,22 +394,6 @@ func (txm *starktxm) confirmLoop() {
424394
}
425395
}
426396

427-
func (txm *starktxm) logFeederError(ctx context.Context, hash string, f *felt.Felt) {
428-
feederClient, err := txm.feederClient.Get()
429-
if err != nil {
430-
txm.lggr.Errorw("failed to load feeder client", "error", err)
431-
return
432-
}
433-
434-
rejectedTx, err := feederClient.TransactionFailure(ctx, f)
435-
if err != nil {
436-
txm.lggr.Errorw("failed to fetch reason for transaction failure", "hash", hash, "error", err)
437-
return
438-
}
439-
440-
txm.lggr.Errorw("feeder rejected reason", "hash", hash, "errorMessage", rejectedTx.ErrorMessage)
441-
}
442-
443397
func (txm *starktxm) resyncNonce(ctx context.Context, client *starknet.Client, accountAddress *felt.Felt) error {
444398
/*
445399
the follow errors indicate that there could be a problem with our locally tracked nonce value:

relayer/pkg/chainlink/txm/txm_test.go

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"testing"
1010
"time"
1111

12+
"github.com/NethermindEth/juno/core/felt"
1213
"github.com/NethermindEth/starknet.go/curve"
1314
"github.com/NethermindEth/starknet.go/devnet"
1415
starknetrpc "github.com/NethermindEth/starknet.go/rpc"
@@ -27,7 +28,7 @@ import (
2728

2829
func TestIntegration_Txm(t *testing.T) {
2930
ctx := t.Context()
30-
n := 2 // number of txs per key
31+
var nTransactions uint64 = 2 // Number of txs per key. If you increase that you might have to increase the confirmation timeout
3132
// url := SetupLocalStarknetNode(t)
3233
url := "http://127.0.0.1:5050"
3334
devnet := devnet.NewDevNet(url)
@@ -59,7 +60,7 @@ func TestIntegration_Txm(t *testing.T) {
5960
})
6061
ksAdapter := NewKeystoreAdapter(looppKs)
6162

62-
lggr, observer := logger.TestObserved(t, zapcore.DebugLevel)
63+
lggr, _ := logger.TestObserved(t, zapcore.DebugLevel)
6364
timeout := 10 * time.Second
6465
client, err := starknet.NewClient("SN_SEPOLIA", url+"/rpc", "", lggr, &timeout)
6566
require.NoError(t, err)
@@ -86,47 +87,47 @@ func TestIntegration_Txm(t *testing.T) {
8687
// start txm + checks
8788
require.NoError(t, txm.Start(context.Background()))
8889
require.NoError(t, txm.Ready())
89-
fmt.Println("sss")
9090

91+
accountAddresses := make(map[*felt.Felt]*felt.Felt) // address -> latestNonce
9192
for publicKeyStr := range localKeys {
9293
publicKey, err := starknetutils.HexToFelt(publicKeyStr)
9394
require.NoError(t, err)
9495

9596
accountAddress, err := starknetutils.HexToFelt(localKeys[publicKeyStr].Account)
9697
require.NoError(t, err)
9798

99+
c, err := getClient()
100+
require.NoError(t, err)
101+
latestNonce, err := c.AccountNonceLatest(ctx, accountAddress)
102+
require.NoError(t, err)
103+
accountAddresses[accountAddress] = latestNonce
104+
98105
contractAddress, err := starknetutils.HexToFelt("0x49D36570D4E46F48E99674BD3FCC84644DDD6B96F7C741B1562B82F9E004DC7")
99106
require.NoError(t, err)
100107

101108
selector := starknetutils.GetSelectorFromNameFelt("totalSupply")
102109

103-
for i := 0; i < n; i++ {
110+
for range nTransactions {
104111
require.NoError(t, txm.Enqueue(ctx, accountAddress, publicKey, starknetrpc.FunctionCall{
105112
ContractAddress: contractAddress, // send to ETH token contract
106113
EntryPointSelector: selector,
107114
}))
108115
}
109116
}
110-
var empty bool
111-
for i := 0; i < 30; i++ {
112-
time.Sleep(500 * time.Millisecond)
113-
queued, unconfirmed := txm.InflightCount()
114-
accepted := len(observer.FilterMessageSnippet("ACCEPTED_ON_L2").All())
115-
t.Logf("inflight count: queued (%d), unconfirmed (%d), accepted (%d)", queued, unconfirmed, accepted)
116-
117-
// check queue + tx store counts are 0, accepted txs == total txs broadcast
118-
if queued == 0 && unconfirmed == 0 && n*len(localKeys) == accepted {
119-
empty = true
120-
break
121-
}
122-
}
123117

124-
// stop txm
125-
assert.True(t, empty, "txm timed out while trying to confirm transactions")
118+
assert.Eventually(t, func() bool {
119+
queued, unconfirmed := txm.InflightCount()
120+
return queued == 0 && unconfirmed == 0
121+
}, 15*time.Second, 500*time.Millisecond)
126122
require.NoError(t, txm.Close())
127-
require.Error(t, txm.Ready())
128-
assert.Equal(t, 0, observer.FilterLevelExact(zapcore.ErrorLevel).Len()) // assert no error logs
129-
assert.Equal(t, n*len(localKeys), len(observer.FilterMessageSnippet("ACCEPTED_ON_L2").All())) // validate txs were successfully included on chain
123+
// Ensure all transactions are confirmed via nonce
124+
for accountAddress, initialNonce := range accountAddresses {
125+
c, err := getClient()
126+
require.NoError(t, err)
127+
latestNonce, err := c.AccountNonceLatest(ctx, accountAddress)
128+
require.NoError(t, err)
129+
require.Equal(t, int(0), new(felt.Felt).Add(initialNonce, new(felt.Felt).SetUint64(nTransactions)).Cmp(latestNonce))
130+
}
130131
}
131132

132133
// LooppKeystore implements [loop.Keystore] interface and the requirements

relayer/pkg/chainlink/txm/txstore.go

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"github.com/NethermindEth/juno/core/felt"
99
starknetrpc "github.com/NethermindEth/starknet.go/rpc"
1010
"golang.org/x/exp/maps"
11+
12+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
1113
)
1214

1315
type UnconfirmedTx struct {
@@ -20,15 +22,17 @@ type UnconfirmedTx struct {
2022
// TxStore tracks broadcast & unconfirmed txs per account address per chain id
2123
type TxStore struct {
2224
lock sync.RWMutex
25+
lggr logger.Logger
2326

2427
nextNonce *felt.Felt
2528
unconfirmedNonces map[string]*UnconfirmedTx
2629
}
2730

28-
func NewTxStore(initialNonce *felt.Felt) *TxStore {
31+
func NewTxStore(initialNonce *felt.Felt, lggr logger.Logger) *TxStore {
2932
return &TxStore{
3033
nextNonce: new(felt.Felt).Set(initialNonce),
3134
unconfirmedNonces: map[string]*UnconfirmedTx{},
35+
lggr: lggr,
3236
}
3337
}
3438

@@ -75,7 +79,7 @@ func (s *TxStore) AddUnconfirmed(nonce *felt.Felt, hash string, call starknetrpc
7579

7680
nonceStr := nonce.String()
7781
if h, exists := s.unconfirmedNonces[nonceStr]; exists {
78-
return fmt.Errorf("nonce used: tried to use nonce (%s) for tx (%s), already used by (%s)", nonce, h.Hash, h)
82+
s.lggr.Warnf("nonce used: replacing tx (hash: %s) with nonce (%s) for tx with hash (%s)", h.Hash, nonce, hash)
7983
}
8084

8185
s.unconfirmedNonces[nonceStr] = &UnconfirmedTx{
@@ -89,21 +93,24 @@ func (s *TxStore) AddUnconfirmed(nonce *felt.Felt, hash string, call starknetrpc
8993
return nil
9094
}
9195

92-
func (s *TxStore) Confirm(nonce *felt.Felt, hash string) error {
96+
func (s *TxStore) Confirm(latestNonce *felt.Felt) (int, *felt.Felt) {
9397
s.lock.Lock()
9498
defer s.lock.Unlock()
9599

96-
nonceStr := nonce.String()
97-
unconfirmed, exists := s.unconfirmedNonces[nonceStr]
98-
if !exists {
99-
return fmt.Errorf("no such unconfirmed nonce: %s", nonce)
100-
}
101-
// sanity check that the hash matches
102-
if unconfirmed.Hash != hash {
103-
return fmt.Errorf("unexpected tx hash: expected %s, got %s", unconfirmed.Hash, hash)
100+
// confirm all transactions with a nonce lower than the latest nonce
101+
confirmed := 0
102+
highestUnconfirmed := new(felt.Felt).SetUint64(0)
103+
for nonceStr, tx := range s.unconfirmedNonces {
104+
if tx.Nonce.Cmp(latestNonce) < 0 {
105+
confirmed++
106+
delete(s.unconfirmedNonces, nonceStr)
107+
}
108+
if highestUnconfirmed.Cmp(tx.Nonce) < 0 {
109+
highestUnconfirmed = tx.Nonce
110+
}
104111
}
105-
delete(s.unconfirmedNonces, nonceStr)
106-
return nil
112+
113+
return confirmed, highestUnconfirmed
107114
}
108115

109116
func (s *TxStore) GetUnconfirmed() []*UnconfirmedTx {
@@ -137,15 +144,22 @@ func NewAccountStore() *AccountStore {
137144
}
138145
}
139146

140-
func (c *AccountStore) CreateTxStore(accountAddress *felt.Felt, initialNonce *felt.Felt) (*TxStore, error) {
147+
func (c *AccountStore) Accounts() []string {
148+
c.lock.Lock()
149+
defer c.lock.Unlock()
150+
151+
return maps.Keys(c.store)
152+
}
153+
154+
func (c *AccountStore) CreateTxStore(accountAddress *felt.Felt, initialNonce *felt.Felt, lggr logger.Logger) (*TxStore, error) {
141155
c.lock.Lock()
142156
defer c.lock.Unlock()
143157
addressStr := accountAddress.String()
144158
_, ok := c.store[addressStr]
145159
if ok {
146160
return nil, fmt.Errorf("TxStore already exists: %s", accountAddress)
147161
}
148-
store := NewTxStore(initialNonce)
162+
store := NewTxStore(initialNonce, logger.Named(lggr, "TxStore"))
149163
c.store[addressStr] = store
150164
return store, nil
151165
}

0 commit comments

Comments
 (0)