Skip to content

Commit 11912f9

Browse files
committed
Add Tx Sender goroutine leak fix
1 parent 76d9596 commit 11912f9

File tree

2 files changed

+48
-10
lines changed

2 files changed

+48
-10
lines changed

multinode/transaction_sender.go

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ type TransactionSender[TX any, RESULT SendTxResult, CHAIN_ID ID, RPC SendTxRPCCl
9191
// * Otherwise, returns any (effectively random) of the errors.
9292
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) RESULT {
9393
var result RESULT
94+
ctx, cancel := txSender.chStop.Ctx(ctx)
95+
defer cancel()
9496
if !txSender.IfStarted(func() {
9597
txResults := make(chan RESULT)
9698
txResultsToReport := make(chan RESULT)
@@ -101,8 +103,6 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
101103
if isSendOnly {
102104
txSender.wg.Add(1)
103105
go func(ctx context.Context) {
104-
ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx))
105-
defer cancel()
106106
defer txSender.wg.Done()
107107
// Send-only nodes' results are ignored as they tend to return false-positive responses.
108108
// Broadcast to them is necessary to speed up the propagation of TX in the network.
@@ -115,8 +115,9 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
115115
healthyNodesNum++
116116
primaryNodeWg.Add(1)
117117
go func(ctx context.Context) {
118-
ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx))
119-
defer cancel()
118+
// Broadcasting transaction and results reporting for invariant detection are background jobs that must be detached from
119+
// caller's cancellation.
120+
// Reporting results to the SendTransaction caller must respect the caller's context to avoid a goroutine leak.
120121
defer primaryNodeWg.Done()
121122
r := txSender.broadcastTxAsync(ctx, rpc, tx)
122123
select {
@@ -126,6 +127,8 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
126127
case txResults <- r:
127128
}
128129

130+
ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx))
131+
defer cancel()
129132
select {
130133
case <-ctx.Done():
131134
txSender.lggr.Debugw("Failed to send tx results to report", "err", ctx.Err())
@@ -149,8 +152,13 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
149152
return
150153
}
151154

155+
if healthyNodesNum == 0 {
156+
result = txSender.newResult(ErroringNodeError)
157+
return
158+
}
159+
152160
txSender.wg.Add(1)
153-
go txSender.reportSendTxAnomalies(ctx, tx, txResultsToReport)
161+
go txSender.reportSendTxAnomalies(tx, txResultsToReport)
154162

155163
result = txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults)
156164
}) {
@@ -161,6 +169,9 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
161169
}
162170

163171
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) broadcastTxAsync(ctx context.Context, rpc RPC, tx TX) RESULT {
172+
// broadcast is a background job, so always detach from caller's cancellation
173+
ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx))
174+
defer cancel()
164175
result := rpc.SendTransaction(ctx, tx)
165176
txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", result.Error())
166177
if !slices.Contains(sendTxSuccessfulCodes, result.Code()) && ctx.Err() == nil {
@@ -169,16 +180,25 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) broadcastTxAsync(c
169180
return result
170181
}
171182

172-
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomalies(ctx context.Context, tx TX, txResults <-chan RESULT) {
183+
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomalies(tx TX, txResults <-chan RESULT) {
173184
defer txSender.wg.Done()
174185
resultsByCode := sendTxResults[RESULT]{}
175186
// txResults eventually will be closed
176187
for txResult := range txResults {
177188
resultsByCode[txResult.Code()] = append(resultsByCode[txResult.Code()], txResult)
178189
}
179190

191+
select {
192+
case <-txSender.chStop:
193+
// it's ok to receive no results if txSender is closing. Return early to prevent false reporting of invariant violation.
194+
if len(resultsByCode) == 0 {
195+
return
196+
}
197+
default:
198+
}
199+
180200
_, criticalErr := aggregateTxResults[RESULT](resultsByCode)
181-
if criticalErr != nil && ctx.Err() == nil {
201+
if criticalErr != nil {
182202
txSender.lggr.Criticalw("observed invariant violation on SendTransaction", "tx", tx, "resultsByCode", resultsByCode, "err", criticalErr)
183203
PromMultiNodeInvariantViolations.WithLabelValues(txSender.chainFamily, txSender.chainID.String(), criticalErr.Error()).Inc()
184204
}
@@ -216,9 +236,6 @@ func aggregateTxResults[RESULT any](resultsByCode sendTxResults[RESULT]) (result
216236
}
217237

218238
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) collectTxResults(ctx context.Context, tx TX, healthyNodesNum int, txResults <-chan RESULT) RESULT {
219-
if healthyNodesNum == 0 {
220-
return txSender.newResult(ErroringNodeError)
221-
}
222239
requiredResults := int(math.Ceil(float64(healthyNodesNum) * sendTxQuorum))
223240
errorsByCode := sendTxResults[RESULT]{}
224241
var softTimeoutChan <-chan time.Time

multinode/transaction_sender_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,27 @@ func TestTransactionSender_SendTransaction(t *testing.T) {
292292
require.NoError(t, result.Error())
293293
require.Equal(t, Successful, result.Code())
294294
})
295+
t.Run("All background jobs stop even if RPC returns result after soft timeout", func(t *testing.T) {
296+
chainID := RandomID()
297+
expectedError := errors.New("transaction failed")
298+
fastNode := newNode(t, expectedError, nil)
299+
300+
// hold reply from the node till SendTransaction returns result
301+
sendTxContext, sendTxCancel := context.WithCancel(tests.Context(t))
302+
slowNode := newNode(t, errors.New("transaction failed"), func(_ mock.Arguments) {
303+
<-sendTxContext.Done()
304+
})
305+
306+
lggr := logger.Test(t)
307+
308+
_, txSender := newTestTransactionSender(t, chainID, lggr, []Node[ID, TestSendTxRPCClient]{fastNode, slowNode}, nil)
309+
result := txSender.SendTransaction(sendTxContext, nil)
310+
sendTxCancel()
311+
require.EqualError(t, result.Error(), expectedError.Error())
312+
// TxSender should stop all background goroutines after SendTransaction is done and before the test is done.
313+
// Otherwise, it signals that we have a goroutine leak.
314+
txSender.wg.Wait()
315+
})
295316
}
296317

297318
func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) {

0 commit comments

Comments
 (0)