Skip to content

Commit 4710141

Browse files
authored
refactor(debounce): add context param and check if error is recoverable (#471)
1 parent 19d154f commit 4710141

File tree

9 files changed

+85
-64
lines changed

9 files changed

+85
-64
lines changed

pkg/debounce/debounce.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,23 +141,28 @@ type rpcError struct {
141141

142142
// Debounce is a wrapper around the rate limiter
143143
// that retries the request if it fails with rate limit error.
144-
func Debounce(logger *log.ZapEventLogger, f func() error) error {
144+
func Debounce(
145+
ctx context.Context,
146+
logger *log.ZapEventLogger,
147+
f func(context.Context) error,
148+
) error {
145149
for {
146-
if err := rpcRateLimiter.Wait(context.TODO()); err != nil {
150+
if err := rpcRateLimiter.Wait(ctx); err != nil {
147151
if logger != nil {
148152
logger.Warnf("failed to acquire rate limiter: %s", err)
149153
}
150154
return err
151155
}
152156

153-
err := f()
157+
err := f(ctx)
154158
if err == nil {
155159
return nil
156160
}
157161

162+
// Search for the error in the list of known HTTP RPC errors
158163
for _, httpRpcError := range httpRpcErrors {
159164
for _, pattern := range httpRpcError.Patterns {
160-
if strings.Contains(err.Error(), pattern) {
165+
if strings.Contains(err.Error(), pattern) && httpRpcError.Recoverable {
161166
if logger != nil {
162167
logger.Warn("recoverable error",
163168
"message", httpRpcError.Message,

pkg/quotes/base_dex.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type baseDEX[Event any, Contract any, EventIterator dexEventIterator] struct {
3333

3434
// Hooks
3535
postStart func(*baseDEX[Event, Contract, EventIterator]) error
36-
getPool func(Market) ([]*dexPool[Event, EventIterator], error)
36+
getPool func(context.Context, Market) ([]*dexPool[Event, EventIterator], error)
3737
parse func(*Event, *dexPool[Event, EventIterator]) (TradeEvent, error)
3838
derefIter func(EventIterator) *Event
3939

@@ -71,7 +71,7 @@ type baseDexConfig[Event any, Contract any, EventIterator dexEventIterator] stru
7171

7272
// Hooks
7373
PostStartHook func(*baseDEX[Event, Contract, EventIterator]) error
74-
PoolGetter func(Market) ([]*dexPool[Event, EventIterator], error)
74+
PoolGetter func(context.Context, Market) ([]*dexPool[Event, EventIterator], error)
7575
EventParser func(*Event, *dexPool[Event, EventIterator]) (TradeEvent, error)
7676
IterDeref func(EventIterator) *Event
7777

@@ -239,6 +239,8 @@ func (b *baseDEX[Event, Contract, EventIterator]) Stop() error {
239239
}
240240

241241
func (b *baseDEX[Event, Contract, EventIterator]) Subscribe(market Market) error {
242+
ctx := context.TODO()
243+
242244
if !b.once.Subscribe() {
243245
return ErrNotStarted
244246
}
@@ -253,7 +255,7 @@ func (b *baseDEX[Event, Contract, EventIterator]) Subscribe(market Market) error
253255

254256
for _, mappedToken := range mappings {
255257
market := NewMarketWithMainQuote(market.Base(), mappedToken, market.Quote())
256-
if err := debounce.Debounce(b.logger, func() error { return b.Subscribe(market) }); err != nil {
258+
if err := debounce.Debounce(ctx, b.logger, func(_ context.Context) error { return b.Subscribe(market) }); err != nil {
257259
b.logger.Errorf("failed to subscribe to market %s: %s", market, err)
258260
mappingErr = err
259261
}
@@ -277,7 +279,7 @@ func (b *baseDEX[Event, Contract, EventIterator]) Subscribe(market Market) error
277279
return fmt.Errorf("%w: %s", ErrMarketDisabled, market)
278280
}
279281

280-
pools, err := b.getPool(market)
282+
pools, err := b.getPool(ctx, market)
281283
if err != nil {
282284
return fmt.Errorf("failed to get pool for market %s: %s", market.StringWithoutMain(), err)
283285
}
@@ -304,8 +306,8 @@ func (b *baseDEX[Event, Contract, EventIterator]) subscribePool(pool *dexPool[Ev
304306

305307
var sub event.Subscription
306308
var err error
307-
err = debounce.Debounce(b.logger, func() error {
308-
opts := &bind.WatchOpts{Context: watchCtx}
309+
err = debounce.Debounce(watchCtx, b.logger, func(ctx context.Context) error {
310+
opts := &bind.WatchOpts{Context: ctx}
309311
sub, err = pool.Contract.WatchSwap(opts, sink, []common.Address{}, []common.Address{})
310312
return err
311313
})
@@ -357,7 +359,7 @@ func (b *baseDEX[Event, Contract, EventIterator]) HistoricalData(ctx context.Con
357359
if strings.ToLower(market.Quote()) == "usd" {
358360
m = NewMarket(market.Base(), market.Quote()+"t") // convert USD quote to USDT
359361
}
360-
pools, err := b.getPool(m)
362+
pools, err := b.getPool(ctx, m)
361363
if err != nil {
362364
return nil, fmt.Errorf("failed to get pool for market %s: %w", m, err)
363365
}
@@ -373,7 +375,7 @@ func (b *baseDEX[Event, Contract, EventIterator]) HistoricalData(ctx context.Con
373375
for i, pool := range pools {
374376
var iter EventIterator
375377

376-
err = debounce.Debounce(b.logger, func() error {
378+
err = debounce.Debounce(ctx, b.logger, func(ctx context.Context) error {
377379
opts := &bind.FilterOpts{Start: block.Uint64(), Context: ctx}
378380
iter, err = pool.Contract.FilterSwap(opts, nil, nil)
379381
return err
@@ -426,7 +428,7 @@ func (b *baseDEX[Event, Contract, EventIterator]) findBlockByTimestamp(
426428

427429
var header *types.Header
428430
var err error
429-
err = debounce.Debounce(b.logger, func() error {
431+
err = debounce.Debounce(ctx, b.logger, func(ctx context.Context) error {
430432
header, err = client.HeaderByNumber(ctx, nil)
431433
return err
432434
})
@@ -442,7 +444,7 @@ func (b *baseDEX[Event, Contract, EventIterator]) findBlockByTimestamp(
442444
mid := new(big.Int).Add(low, high)
443445
mid.Div(mid, big.NewInt(2))
444446

445-
err = debounce.Debounce(b.logger, func() error {
447+
err = debounce.Debounce(ctx, b.logger, func(ctx context.Context) error {
446448
header, err = client.HeaderByNumber(ctx, mid)
447449
return err
448450
})

pkg/quotes/lynex_v2.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package quotes
22

33
import (
4+
"context"
45
"fmt"
6+
"github.com/ethereum/go-ethereum/accounts/abi/bind"
57

68
"github.com/ethereum/go-ethereum/common"
79
"github.com/ethereum/go-ethereum/ethclient"
@@ -85,7 +87,7 @@ func (l *lynexV2) postStart(driver *baseDEX[
8587
return nil
8688
}
8789

88-
func (l *lynexV2) getPool(market Market) ([]*dexPool[ilynex_v2_pair.ILynexPairSwap, *ilynex_v2_pair.ILynexPairSwapIterator], error) {
90+
func (l *lynexV2) getPool(ctx context.Context, market Market) ([]*dexPool[ilynex_v2_pair.ILynexPairSwap, *ilynex_v2_pair.ILynexPairSwapIterator], error) {
8991
baseToken, quoteToken, err := getTokens(l.assets, market, loggerLynexV2)
9092
if err != nil {
9193
return nil, fmt.Errorf("failed to get tokens: %w", err)
@@ -95,8 +97,8 @@ func (l *lynexV2) getPool(market Market) ([]*dexPool[ilynex_v2_pair.ILynexPairSw
9597
_, isStablePool := l.stablePoolMarkets[market]
9698

9799
loggerLynexV2.Infow("searching for pool", "market", market)
98-
err = debounce.Debounce(loggerLynexV2, func() error {
99-
poolAddress, err = l.factory.GetPair(nil, baseToken.Address, quoteToken.Address, isStablePool)
100+
err = debounce.Debounce(context.TODO(), loggerLynexV2, func(ctx context.Context) error {
101+
poolAddress, err = l.factory.GetPair(&bind.CallOpts{Context: ctx}, baseToken.Address, quoteToken.Address, isStablePool)
100102
return err
101103
})
102104
if err != nil {
@@ -118,17 +120,17 @@ func (l *lynexV2) getPool(market Market) ([]*dexPool[ilynex_v2_pair.ILynexPairSw
118120
}
119121

120122
var basePoolToken common.Address
121-
err = debounce.Debounce(loggerLynexV2, func() error {
122-
basePoolToken, err = poolContract.Token0(nil)
123+
err = debounce.Debounce(ctx, loggerLynexV2, func(ctx context.Context) error {
124+
basePoolToken, err = poolContract.Token0(&bind.CallOpts{Context: ctx})
123125
return err
124126
})
125127
if err != nil {
126128
return nil, fmt.Errorf("failed to get base token address for Lynex v2 pool: %w", err)
127129
}
128130

129131
var quotePoolToken common.Address
130-
err = debounce.Debounce(loggerLynexV2, func() error {
131-
quotePoolToken, err = poolContract.Token1(nil)
132+
err = debounce.Debounce(ctx, loggerLynexV2, func(ctx context.Context) error {
133+
quotePoolToken, err = poolContract.Token1(&bind.CallOpts{Context: ctx})
132134
return err
133135
})
134136
if err != nil {

pkg/quotes/lynex_v3.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package quotes
22

33
import (
4+
"context"
45
"fmt"
6+
"github.com/ethereum/go-ethereum/accounts/abi/bind"
57

68
"github.com/ethereum/go-ethereum/common"
79
"github.com/ethereum/go-ethereum/ethclient"
@@ -71,16 +73,16 @@ func (l *lynexV3) postStart(driver *baseDEX[
7173
return nil
7274
}
7375

74-
func (l *lynexV3) getPool(market Market) ([]*dexPool[ilynex_v3_pool.ILynexV3PoolSwap, *ilynex_v3_pool.ILynexV3PoolSwapIterator], error) {
76+
func (l *lynexV3) getPool(ctx context.Context, market Market) ([]*dexPool[ilynex_v3_pool.ILynexV3PoolSwap, *ilynex_v3_pool.ILynexV3PoolSwapIterator], error) {
7577
baseToken, quoteToken, err := getTokens(l.assets, market, loggerLynexV3)
7678
if err != nil {
7779
return nil, fmt.Errorf("failed to get tokens: %w", err)
7880
}
7981

8082
var poolAddress common.Address
8183
loggerLynexV3.Infow("searching for pool", "market", market)
82-
err = debounce.Debounce(loggerLynexV3, func() error {
83-
poolAddress, err = l.factory.PoolByPair(nil, baseToken.Address, quoteToken.Address)
84+
err = debounce.Debounce(ctx, loggerLynexV3, func(ctx context.Context) error {
85+
poolAddress, err = l.factory.PoolByPair(&bind.CallOpts{Context: ctx}, baseToken.Address, quoteToken.Address)
8486
return err
8587
})
8688
if err != nil {
@@ -101,17 +103,17 @@ func (l *lynexV3) getPool(market Market) ([]*dexPool[ilynex_v3_pool.ILynexV3Pool
101103
}
102104

103105
var basePoolToken common.Address
104-
err = debounce.Debounce(loggerLynexV3, func() error {
105-
basePoolToken, err = poolContract.Token0(nil)
106+
err = debounce.Debounce(ctx, loggerLynexV3, func(ctx context.Context) error {
107+
basePoolToken, err = poolContract.Token0(&bind.CallOpts{Context: ctx})
106108
return err
107109
})
108110
if err != nil {
109111
return nil, fmt.Errorf("failed to get base token address for Lynex v3 pool: %w", err)
110112
}
111113

112114
var quotePoolToken common.Address
113-
err = debounce.Debounce(loggerLynexV3, func() error {
114-
quotePoolToken, err = poolContract.Token1(nil)
115+
err = debounce.Debounce(ctx, loggerLynexV3, func(ctx context.Context) error {
116+
quotePoolToken, err = poolContract.Token1(&bind.CallOpts{Context: ctx})
115117
return err
116118
})
117119
if err != nil {

pkg/quotes/quickswap.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package quotes
22

33
import (
4+
"context"
45
"fmt"
6+
"github.com/ethereum/go-ethereum/accounts/abi/bind"
57

68
"github.com/ethereum/go-ethereum/common"
79
"github.com/ethereum/go-ethereum/ethclient"
@@ -70,15 +72,15 @@ func (s *quickswap) postStart(driver *baseDEX[
7072
return nil
7173
}
7274

73-
func (s *quickswap) getPool(market Market) ([]*dexPool[quickswap_v3_pool.IQuickswapV3PoolSwap, *quickswap_v3_pool.IQuickswapV3PoolSwapIterator], error) {
75+
func (s *quickswap) getPool(ctx context.Context, market Market) ([]*dexPool[quickswap_v3_pool.IQuickswapV3PoolSwap, *quickswap_v3_pool.IQuickswapV3PoolSwapIterator], error) {
7476
baseToken, quoteToken, err := getTokens(s.assets, market, loggerQuickswap)
7577
if err != nil {
7678
return nil, fmt.Errorf("failed to get tokens: %w", err)
7779
}
7880

7981
var poolAddress common.Address
80-
err = debounce.Debounce(loggerQuickswap, func() error {
81-
poolAddress, err = s.factory.PoolByPair(nil, baseToken.Address, quoteToken.Address)
82+
err = debounce.Debounce(ctx, loggerQuickswap, func(ctx context.Context) error {
83+
poolAddress, err = s.factory.PoolByPair(&bind.CallOpts{Context: ctx}, baseToken.Address, quoteToken.Address)
8284
return err
8385
})
8486
if err != nil {
@@ -97,17 +99,17 @@ func (s *quickswap) getPool(market Market) ([]*dexPool[quickswap_v3_pool.IQuicks
9799
}
98100

99101
var basePoolToken common.Address
100-
err = debounce.Debounce(loggerQuickswap, func() error {
101-
basePoolToken, err = poolContract.Token0(nil)
102+
err = debounce.Debounce(ctx, loggerQuickswap, func(ctx context.Context) error {
103+
basePoolToken, err = poolContract.Token0(&bind.CallOpts{Context: ctx})
102104
return err
103105
})
104106
if err != nil {
105107
return nil, fmt.Errorf("failed to get base token address for Quickswap pool: %w", err)
106108
}
107109

108110
var quotePoolToken common.Address
109-
err = debounce.Debounce(loggerQuickswap, func() error {
110-
quotePoolToken, err = poolContract.Token1(nil)
111+
err = debounce.Debounce(ctx, loggerQuickswap, func(ctx context.Context) error {
112+
quotePoolToken, err = poolContract.Token1(&bind.CallOpts{Context: ctx})
111113
return err
112114
})
113115
if err != nil {

pkg/quotes/secta_v2.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package quotes
22

33
import (
4+
"context"
45
"fmt"
6+
"github.com/ethereum/go-ethereum/accounts/abi/bind"
57

68
"github.com/ethereum/go-ethereum/common"
79
"github.com/ethereum/go-ethereum/ethclient"
@@ -69,15 +71,15 @@ func (s *sectaV2) postStart(driver *baseDEX[
6971
return nil
7072
}
7173

72-
func (s *sectaV2) getPool(market Market) ([]*dexPool[isecta_v2_pair.ISectaV2PairSwap, *isecta_v2_pair.ISectaV2PairSwapIterator], error) {
74+
func (s *sectaV2) getPool(ctx context.Context, market Market) ([]*dexPool[isecta_v2_pair.ISectaV2PairSwap, *isecta_v2_pair.ISectaV2PairSwapIterator], error) {
7375
baseToken, quoteToken, err := getTokens(s.assets, market, loggerSectaV2)
7476
if err != nil {
7577
return nil, fmt.Errorf("failed to get tokens: %w", err)
7678
}
7779

7880
var poolAddress common.Address
79-
err = debounce.Debounce(loggerSectaV2, func() error {
80-
poolAddress, err = s.factory.GetPair(nil, baseToken.Address, quoteToken.Address)
81+
err = debounce.Debounce(ctx, loggerSectaV2, func(ctx context.Context) error {
82+
poolAddress, err = s.factory.GetPair(&bind.CallOpts{Context: ctx}, baseToken.Address, quoteToken.Address)
8183
return err
8284
})
8385
if err != nil {
@@ -96,16 +98,16 @@ func (s *sectaV2) getPool(market Market) ([]*dexPool[isecta_v2_pair.ISectaV2Pair
9698
}
9799

98100
var basePoolToken common.Address
99-
err = debounce.Debounce(loggerSectaV2, func() error {
100-
basePoolToken, err = poolContract.Token0(nil)
101+
err = debounce.Debounce(ctx, loggerSectaV2, func(ctx context.Context) error {
102+
basePoolToken, err = poolContract.Token0(&bind.CallOpts{Context: ctx})
101103
return err
102104
})
103105
if err != nil {
104106
return nil, fmt.Errorf("failed to get base token address for Secta v2 pool: %w", err)
105107
}
106108

107109
var quotePoolToken common.Address
108-
err = debounce.Debounce(loggerSectaV2, func() error {
110+
err = debounce.Debounce(ctx, loggerSectaV2, func(ctx context.Context) error {
109111
quotePoolToken, err = poolContract.Token1(nil)
110112
return err
111113
})

pkg/quotes/secta_v3.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package quotes
22

33
import (
4+
"context"
45
"fmt"
6+
"github.com/ethereum/go-ethereum/accounts/abi/bind"
57
"math/big"
68
"strings"
79

@@ -75,7 +77,7 @@ func (s *sectaV3) postStart(driver *baseDEX[
7577
return nil
7678
}
7779

78-
func (s *sectaV3) getPool(market Market) ([]*dexPool[isecta_v3_pool.ISectaV3PoolSwap, *isecta_v3_pool.ISectaV3PoolSwapIterator], error) {
80+
func (s *sectaV3) getPool(ctx context.Context, market Market) ([]*dexPool[isecta_v3_pool.ISectaV3PoolSwap, *isecta_v3_pool.ISectaV3PoolSwapIterator], error) {
7981
baseToken, quoteToken, err := getTokens(s.assets, market, loggerSectaV3)
8082
if err != nil {
8183
return nil, fmt.Errorf("failed to get tokens: %w", err)
@@ -89,8 +91,8 @@ func (s *sectaV3) getPool(market Market) ([]*dexPool[isecta_v3_pool.ISectaV3Pool
8991
zeroAddress := common.HexToAddress("0x0")
9092
for _, feeTier := range sectaV3FeeTiers {
9193
var poolAddress common.Address
92-
err = debounce.Debounce(loggerSectaV3, func() error {
93-
poolAddress, err = s.factory.GetPool(nil, baseToken.Address, quoteToken.Address, big.NewInt(int64(feeTier)))
94+
err = debounce.Debounce(ctx, loggerSectaV3, func(ctx context.Context) error {
95+
poolAddress, err = s.factory.GetPool(&bind.CallOpts{Context: ctx}, baseToken.Address, quoteToken.Address, big.NewInt(int64(feeTier)))
9496
return err
9597
})
9698
if err != nil {
@@ -114,17 +116,17 @@ func (s *sectaV3) getPool(market Market) ([]*dexPool[isecta_v3_pool.ISectaV3Pool
114116
}
115117

116118
var basePoolToken common.Address
117-
err = debounce.Debounce(loggerSectaV3, func() error {
118-
basePoolToken, err = poolContract.Token0(nil)
119+
err = debounce.Debounce(ctx, loggerSectaV3, func(ctx context.Context) error {
120+
basePoolToken, err = poolContract.Token0(&bind.CallOpts{Context: ctx})
119121
return err
120122
})
121123
if err != nil {
122124
return nil, fmt.Errorf("failed to get base token address for Secta v3 pool: %w", err)
123125
}
124126

125127
var quotePoolToken common.Address
126-
err = debounce.Debounce(loggerSectaV3, func() error {
127-
quotePoolToken, err = poolContract.Token1(nil)
128+
err = debounce.Debounce(ctx, loggerSectaV3, func(ctx context.Context) error {
129+
quotePoolToken, err = poolContract.Token1(&bind.CallOpts{Context: ctx})
128130
return err
129131
})
130132
if err != nil {

0 commit comments

Comments
 (0)