Skip to content

Commit 9e520e7

Browse files
authored
feat(tx sender): add multiple write clients for more reliable tx submission (#1740)
1 parent de7f6e5 commit 9e520e7

File tree

4 files changed

+122
-6
lines changed

4 files changed

+122
-6
lines changed

common/version/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"runtime/debug"
66
)
77

8-
var tag = "v4.5.46"
8+
var tag = "v4.5.47"
99

1010
var commit = func() string {
1111
if info, ok := debug.ReadBuildInfo(); ok {

rollup/internal/config/relayer.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,14 @@ import (
77

88
// SenderConfig The config for transaction sender
99
type SenderConfig struct {
10-
// The RPC endpoint of the ethereum or scroll public node.
10+
// The RPC endpoint of the ethereum or scroll public node (for backward compatibility).
11+
// If WriteEndpoints is specified, this endpoint will be used only for reading.
12+
// If WriteEndpoints is empty, this endpoint will be used for both reading and writing.
1113
Endpoint string `json:"endpoint"`
14+
// The RPC endpoints to send transactions to (optional).
15+
// If specified, transactions will be sent to all these endpoints in parallel.
16+
// If empty, transactions will be sent to the Endpoint.
17+
WriteEndpoints []string `json:"write_endpoints,omitempty"`
1218
// The time to trigger check pending txs in sender.
1319
CheckPendingTime uint64 `json:"check_pending_time"`
1420
// The number of blocks to wait to escalate increase gas price of the transaction.

rollup/internal/controller/relayer/relayer_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func setupEnv(t *testing.T) {
5656

5757
cfg.L2Config.RelayerConfig.SenderConfig.Endpoint, err = testApps.GetPoSL1EndPoint()
5858
assert.NoError(t, err)
59+
cfg.L2Config.RelayerConfig.SenderConfig.WriteEndpoints = []string{cfg.L2Config.RelayerConfig.SenderConfig.Endpoint, cfg.L2Config.RelayerConfig.SenderConfig.Endpoint}
5960
cfg.L1Config.RelayerConfig.SenderConfig.Endpoint, err = testApps.GetL2GethEndPoint()
6061
assert.NoError(t, err)
6162

rollup/internal/controller/sender/sender.go

Lines changed: 113 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"math/big"
99
"strings"
10+
"sync"
1011
"time"
1112

1213
"github.com/holiman/uint256"
@@ -67,7 +68,8 @@ type FeeData struct {
6768
type Sender struct {
6869
config *config.SenderConfig
6970
gethClient *gethclient.Client
70-
client *ethclient.Client // The client to retrieve on chain data or send transaction.
71+
client *ethclient.Client // The client to retrieve on chain data (read-only)
72+
writeClients []*ethclient.Client // The clients to send transactions to (write operations)
7173
transactionSigner *TransactionSigner
7274
chainID *big.Int // The chain id of the endpoint
7375
ctx context.Context
@@ -90,9 +92,10 @@ func NewSender(ctx context.Context, config *config.SenderConfig, signerConfig *c
9092
return nil, fmt.Errorf("invalid params, EscalateMultipleNum; %v, EscalateMultipleDen: %v", config.EscalateMultipleNum, config.EscalateMultipleDen)
9193
}
9294

95+
// Initialize read client
9396
rpcClient, err := rpc.Dial(config.Endpoint)
9497
if err != nil {
95-
return nil, fmt.Errorf("failed to dial eth client, err: %w", err)
98+
return nil, fmt.Errorf("failed to dial read client, err: %w", err)
9699
}
97100

98101
client := ethclient.NewClient(rpcClient)
@@ -105,12 +108,42 @@ func NewSender(ctx context.Context, config *config.SenderConfig, signerConfig *c
105108
return nil, fmt.Errorf("failed to create transaction signer, err: %w", err)
106109
}
107110

111+
// Initialize write clients
112+
var writeClients []*ethclient.Client
113+
if len(config.WriteEndpoints) > 0 {
114+
// Use specified write endpoints
115+
for i, endpoint := range config.WriteEndpoints {
116+
writeRpcClient, err := rpc.Dial(endpoint)
117+
if err != nil {
118+
return nil, fmt.Errorf("failed to dial write client %d (endpoint: %s), err: %w", i, endpoint, err)
119+
}
120+
writeClient := ethclient.NewClient(writeRpcClient)
121+
122+
// Verify the write client is connected to the same chain
123+
writeChainID, err := writeClient.ChainID(ctx)
124+
if err != nil {
125+
return nil, fmt.Errorf("failed to get chain ID from write client %d (endpoint: %s), err: %w", i, endpoint, err)
126+
}
127+
if writeChainID.Cmp(chainID) != 0 {
128+
return nil, fmt.Errorf("write client %d (endpoint: %s) has different chain ID %s, expected %s", i, endpoint, writeChainID.String(), chainID.String())
129+
}
130+
131+
writeClients = append(writeClients, writeClient)
132+
}
133+
log.Info("initialized sender with multiple write clients", "service", service, "name", name, "readEndpoint", config.Endpoint, "writeEndpoints", config.WriteEndpoints)
134+
} else {
135+
// Use read client for writing (backward compatibility)
136+
writeClients = append(writeClients, client)
137+
log.Info("initialized sender with single client", "service", service, "name", name, "endpoint", config.Endpoint)
138+
}
139+
108140
// Create sender instance first and then initialize nonce
109141
sender := &Sender{
110142
ctx: ctx,
111143
config: config,
112144
gethClient: gethclient.New(rpcClient),
113145
client: client,
146+
writeClients: writeClients,
114147
chainID: chainID,
115148
transactionSigner: transactionSigner,
116149
db: db,
@@ -169,6 +202,82 @@ func (s *Sender) getFeeData(target *common.Address, data []byte, sidecar *gethTy
169202
}
170203
}
171204

205+
// sendTransactionToMultipleClients sends a transaction to all write clients in parallel
206+
// and returns success if at least one client succeeds
207+
func (s *Sender) sendTransactionToMultipleClients(signedTx *gethTypes.Transaction) error {
208+
ctx, cancel := context.WithTimeout(s.ctx, 15*time.Second)
209+
defer cancel()
210+
211+
if len(s.writeClients) == 1 {
212+
// Single client - use direct approach
213+
return s.writeClients[0].SendTransaction(ctx, signedTx)
214+
}
215+
216+
// Multiple clients - send in parallel
217+
type result struct {
218+
endpoint string
219+
err error
220+
}
221+
222+
resultChan := make(chan result, len(s.writeClients))
223+
var wg sync.WaitGroup
224+
225+
// Send transaction to all write clients in parallel
226+
for i, client := range s.writeClients {
227+
wg.Add(1)
228+
// Determine endpoint URL for this client
229+
endpoint := s.config.WriteEndpoints[i]
230+
231+
go func(ep string, writeClient *ethclient.Client) {
232+
defer wg.Done()
233+
err := writeClient.SendTransaction(ctx, signedTx)
234+
resultChan <- result{endpoint: ep, err: err}
235+
}(endpoint, client)
236+
}
237+
238+
// Wait for all goroutines to finish
239+
go func() {
240+
wg.Wait()
241+
close(resultChan)
242+
}()
243+
244+
// Collect results
245+
var errs []error
246+
for res := range resultChan {
247+
if res.err != nil {
248+
errs = append(errs, fmt.Errorf("%s: %w", res.endpoint, res.err))
249+
log.Warn("failed to send transaction to write client",
250+
"endpoint", res.endpoint,
251+
"txHash", signedTx.Hash().Hex(),
252+
"nonce", signedTx.Nonce(),
253+
"from", s.transactionSigner.GetAddr().String(),
254+
"error", res.err)
255+
} else {
256+
log.Info("successfully sent transaction to write client",
257+
"endpoint", res.endpoint,
258+
"txHash", signedTx.Hash().Hex(),
259+
"nonce", signedTx.Nonce(),
260+
"from", s.transactionSigner.GetAddr().String())
261+
}
262+
}
263+
264+
// Check if at least one client succeeded
265+
if len(errs) < len(s.writeClients) {
266+
successCount := len(s.writeClients) - len(errs)
267+
if len(errs) > 0 {
268+
log.Info("transaction partially succeeded",
269+
"txHash", signedTx.Hash().Hex(),
270+
"successCount", successCount,
271+
"totalClients", len(s.writeClients),
272+
"failures", errors.Join(errs...))
273+
}
274+
return nil
275+
}
276+
277+
// All clients failed
278+
return fmt.Errorf("failed to send transaction to all %d write clients: %w", len(s.writeClients), errors.Join(errs...))
279+
}
280+
172281
// SendTransaction send a signed L2tL1 transaction.
173282
func (s *Sender) SendTransaction(contextID string, target *common.Address, data []byte, blobs []*kzg4844.Blob) (common.Hash, uint64, error) {
174283
s.metrics.sendTransactionTotal.WithLabelValues(s.service, s.name).Inc()
@@ -230,7 +339,7 @@ func (s *Sender) SendTransaction(contextID string, target *common.Address, data
230339
return common.Hash{}, 0, fmt.Errorf("failed to insert transaction, err: %w", err)
231340
}
232341

233-
if err := s.client.SendTransaction(s.ctx, signedTx); err != nil {
342+
if err := s.sendTransactionToMultipleClients(signedTx); err != nil {
234343
// Delete the transaction from the pending transaction table if it fails to send.
235344
if updateErr := s.pendingTransactionOrm.DeleteTransactionByTxHash(s.ctx, signedTx.Hash()); updateErr != nil {
236345
log.Error("failed to delete transaction", "tx hash", signedTx.Hash().String(), "from", s.transactionSigner.GetAddr().String(), "nonce", signedTx.Nonce(), "err", updateErr)
@@ -645,7 +754,7 @@ func (s *Sender) checkPendingTransaction() {
645754
return
646755
}
647756

648-
if err := s.client.SendTransaction(s.ctx, newSignedTx); err != nil {
757+
if err := s.sendTransactionToMultipleClients(newSignedTx); err != nil {
649758
if strings.Contains(err.Error(), "nonce too low") {
650759
// When we receive a 'nonce too low' error but cannot find the transaction receipt, it indicates another transaction with this nonce has already been processed, so this transaction will never be mined and should be marked as failed.
651760
log.Warn("nonce too low detected, marking all non-confirmed transactions with same nonce as failed", "nonce", originalTx.Nonce(), "address", s.transactionSigner.GetAddr().Hex(), "txHash", originalTx.Hash().Hex(), "newTxHash", newSignedTx.Hash().Hex(), "err", err)

0 commit comments

Comments
 (0)