Skip to content

Commit 9fd0fc5

Browse files
authored
Merge pull request #278 from ziggie1984/map-backend-error
pushtx: map different backend err to internal err.
2 parents cb61d7f + eab6cc3 commit 9fd0fc5

File tree

2 files changed

+105
-10
lines changed

2 files changed

+105
-10
lines changed

pushtx/broadcaster.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ type Config struct {
5252
// RebroadcastInterval is the interval that we'll continually try to
5353
// re-broadcast transactions in-between new block arrival.
5454
RebroadcastInterval time.Duration
55+
56+
// MapCustomBroadcastError allows the Rebroadcaster to map broadcast
57+
// errors from other backends to the neutrino internal BroadcastError.
58+
// This allows the Rebroadcaster to behave consistently over different
59+
// backends.
60+
MapCustomBroadcastError func(error) error
5561
}
5662

5763
// Broadcaster is a subsystem responsible for reliably broadcasting transactions
@@ -171,10 +177,19 @@ func (b *Broadcaster) broadcastHandler(sub *blockntfns.Subscription) {
171177
// A new broadcast request was submitted by an external caller.
172178
case req := <-b.broadcastReqs:
173179
err := b.cfg.Broadcast(req.tx)
174-
if err != nil && !IsBroadcastError(err, Mempool) {
175-
log.Errorf("Broadcast attempt failed: %v", err)
176-
req.errChan <- err
177-
continue
180+
if err != nil {
181+
// We apply the custom err mapping function if
182+
// it was supplied which allows to map other
183+
// backend errors to the neutrino BroadcastError.
184+
if b.cfg.MapCustomBroadcastError != nil {
185+
err = b.cfg.MapCustomBroadcastError(err)
186+
}
187+
if !IsBroadcastError(err, Mempool) {
188+
log.Errorf("Broadcast attempt "+
189+
"failed: %v", err)
190+
req.errChan <- err
191+
continue
192+
}
178193
}
179194

180195
transactions[req.tx.TxHash()] = req.tx
@@ -231,6 +246,12 @@ func (b *Broadcaster) rebroadcast(txs map[chainhash.Hash]*wire.MsgTx,
231246
}
232247

233248
err := b.cfg.Broadcast(tx)
249+
// We apply the custom err mapping function if it was supplied
250+
// which allows to map other backend errors to the neutrino
251+
// BroadcastError.
252+
if err != nil && b.cfg.MapCustomBroadcastError != nil {
253+
err = b.cfg.MapCustomBroadcastError(err)
254+
}
234255
switch {
235256
// If the transaction has already confirmed on-chain, we can
236257
// stop broadcasting it further.

pushtx/broadcaster_test.go

Lines changed: 80 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package pushtx
22

33
import (
4+
"fmt"
45
"math/rand"
6+
"strings"
57
"testing"
68
"time"
79

10+
"github.com/btcsuite/btcd/btcjson"
811
"github.com/btcsuite/btcd/wire"
912
"github.com/lightninglabs/neutrino/blockntfns"
1013
)
@@ -78,7 +81,7 @@ func TestBroadcaster(t *testing.T) {
7881
func TestRebroadcast(t *testing.T) {
7982
t.Parallel()
8083

81-
const numTxs = 3
84+
const numTxs = 5
8285

8386
// We'll start by setting up the broadcaster with channels to mock the
8487
// behavior of its external dependencies.
@@ -177,8 +180,9 @@ func TestRebroadcast(t *testing.T) {
177180
wire.BlockHeader{}, 100, wire.BlockHeader{},
178181
)
179182

180-
// This time however, only the last two transactions will be rebroadcast
181-
// since the first one confirmed in the previous rebroadcast attempt.
183+
// This time however, only the last four transactions will be
184+
// rebroadcasted since the first one confirmed in the previous
185+
// rebroadcast attempt.
182186
assertBroadcastOrder(txs[1:])
183187

184188
// We now manually mark one of the transactions as confirmed.
@@ -187,16 +191,86 @@ func TestRebroadcast(t *testing.T) {
187191
// Trigger a new block notification to rebroadcast the transactions.
188192
ntfnChan <- blockntfns.NewBlockConnected(wire.BlockHeader{}, 101)
189193

190-
// We assert that only the last transaction is rebroadcast.
194+
// We assert that only the last three transactions are rebroadcasted.
191195
assertBroadcastOrder(txs[2:])
192196

193-
// Manually mark the last transaction as confirmed.
197+
// Manually mark the third transaction as confirmed.
194198
broadcaster.MarkAsConfirmed(txs[2].TxHash())
195199

200+
// Now we inject a custom error mapping function for backend errors
201+
// other than neutrino.
202+
broadcaster.cfg.MapCustomBroadcastError = func(err error) error {
203+
// match is a helper method to easily string match on the error
204+
// message.
205+
match := func(err error, s string) bool {
206+
return strings.Contains(strings.ToLower(err.Error()), s)
207+
}
208+
209+
switch {
210+
case match(err, "mempool min fee not met"):
211+
return &BroadcastError{
212+
Code: Mempool,
213+
Reason: err.Error(),
214+
}
215+
216+
case match(err, "transaction already exists"):
217+
return &BroadcastError{
218+
Code: Confirmed,
219+
Reason: err.Error(),
220+
}
221+
222+
default:
223+
return fmt.Errorf("unmatched backend error: %v", err)
224+
}
225+
}
226+
227+
// Now, we'll modify the Broadcast method to mark the fourth transaction
228+
// as confirmed but with a bitcoind backend notification to test that
229+
// the mapping between different backend errors and the neutrino
230+
// BroadcastError works as expected. We also mark the last transaction
231+
// with the bitcoind backend error for not having enough fees to be
232+
// included in the mempool. We expected that it gets rebroadcasted too.
233+
broadcaster.cfg.Broadcast = func(tx *wire.MsgTx) error {
234+
broadcastChan <- tx
235+
if tx.TxHash() == txs[3].TxHash() {
236+
return &btcjson.RPCError{
237+
Code: btcjson.ErrRPCVerifyAlreadyInChain,
238+
Message: "transaction already exists",
239+
}
240+
}
241+
if tx.TxHash() == txs[4].TxHash() {
242+
return &btcjson.RPCError{
243+
Code: btcjson.ErrRPCTxRejected,
244+
Message: "mempool min fee not met",
245+
}
246+
}
247+
248+
return nil
249+
}
250+
196251
// Trigger a new block notification.
197252
ntfnChan <- blockntfns.NewBlockConnected(wire.BlockHeader{}, 102)
198253

199-
// Assert that no transactions were rebroadcast.
254+
// We assert that only the last two transactions are rebroadcasted.
255+
assertBroadcastOrder(txs[3:])
256+
257+
// Trigger another block notification simulating a reorg in the chain.
258+
// The transactions should be rebroadcasted again to ensure they
259+
// properly propagate throughout the network.
260+
ntfnChan <- blockntfns.NewBlockDisconnected(
261+
wire.BlockHeader{}, 102, wire.BlockHeader{},
262+
)
263+
264+
// We assert that only the last transaction is rebroadcasted.
265+
assertBroadcastOrder(txs[4:])
266+
267+
// Manually mark the last transaction as confirmed.
268+
broadcaster.MarkAsConfirmed(txs[4].TxHash())
269+
270+
// Trigger a new block notification.
271+
ntfnChan <- blockntfns.NewBlockConnected(wire.BlockHeader{}, 103)
272+
273+
// Assert that no transactions were rebroadcasted.
200274
select {
201275
case tx := <-broadcastChan:
202276
t.Fatalf("unexpected rebroadcast of tx %s", tx.TxHash())

0 commit comments

Comments
 (0)