From d17e31543ff860a429bbc88f400166ca33038062 Mon Sep 17 00:00:00 2001 From: shazbert Date: Tue, 9 Sep 2025 15:04:11 +1000 Subject: [PATCH 01/21] feat(gateio): add websocket subscription manager and orderbook update with snapshot functionality [spot] --- currency/pair.go | 8 +- currency/pair_methods.go | 2 +- currency/pair_test.go | 2 +- currency/pairs_test.go | 2 +- exchanges/gateio/gateio.go | 1 + exchanges/gateio/gateio_types.go | 11 ++ exchanges/gateio/gateio_websocket.go | 145 ++++++++++++++++----- exchanges/gateio/gateio_websocket_test.go | 71 ++++++++++ exchanges/gateio/gateio_wrapper.go | 1 + exchanges/gateio/ws_ob_sub_manager.go | 72 ++++++++++ exchanges/gateio/ws_ob_sub_manager_test.go | 79 +++++++++++ 11 files changed, 360 insertions(+), 34 deletions(-) create mode 100644 exchanges/gateio/gateio_websocket_test.go create mode 100644 exchanges/gateio/ws_ob_sub_manager.go create mode 100644 exchanges/gateio/ws_ob_sub_manager_test.go diff --git a/currency/pair.go b/currency/pair.go index f6829836176..b71919d3ea4 100644 --- a/currency/pair.go +++ b/currency/pair.go @@ -7,8 +7,12 @@ import ( "unicode" ) +// Public error vars +var ( + ErrCannotCreatePair = errors.New("cannot create currency pair") +) + var ( - errCannotCreatePair = errors.New("cannot create currency pair") errDelimiterNotFound = errors.New("delimiter not found") errDelimiterCannotBeEmpty = errors.New("delimiter cannot be empty") ) @@ -70,7 +74,7 @@ func NewPairWithDelimiter(base, quote, delimiter string) Pair { // with or without delimiter func NewPairFromString(currencyPair string) (Pair, error) { if len(currencyPair) < 3 { - return EMPTYPAIR, fmt.Errorf("%w from %s string too short to be a currency pair", errCannotCreatePair, currencyPair) + return EMPTYPAIR, fmt.Errorf("%w from %s string too short to be a currency pair", ErrCannotCreatePair, currencyPair) } for x := range currencyPair { diff --git a/currency/pair_methods.go b/currency/pair_methods.go index a8a3a6f0238..519493f23a8 100644 --- a/currency/pair_methods.go +++ b/currency/pair_methods.go @@ -61,7 +61,7 @@ func (p *Pair) UnmarshalJSON(d []byte) error { // incorrectly converted to DUS-KUSDT, ELKRW (Bithumb) which will convert // converted to ELK-RW and HTUSDT (Lbank) which will be incorrectly // converted to HTU-SDT. - return fmt.Errorf("%w from %s cannot ensure pair is in correct format, please use exchange method MatchSymbolWithAvailablePairs", errCannotCreatePair, pair) + return fmt.Errorf("%w from %s cannot ensure pair is in correct format, please use exchange method MatchSymbolWithAvailablePairs", ErrCannotCreatePair, pair) } // MarshalJSON conforms type to the marshaler interface diff --git a/currency/pair_test.go b/currency/pair_test.go index 126f4ef9fcb..7fefb731795 100644 --- a/currency/pair_test.go +++ b/currency/pair_test.go @@ -54,7 +54,7 @@ func TestPairUnmarshalJSON(t *testing.T) { assert.Equal(t, "usd", p.Quote.String(), "Quote should be correct") assert.Equal(t, "_", p.Delimiter, "Delimiter should be correct") - assert.ErrorIs(t, p.UnmarshalJSON([]byte(`"btcusd"`)), errCannotCreatePair, "UnmarshalJSON with no delimiter should error") + assert.ErrorIs(t, p.UnmarshalJSON([]byte(`"btcusd"`)), ErrCannotCreatePair, "UnmarshalJSON with no delimiter should error") assert.NoError(t, p.UnmarshalJSON([]byte(`""`)), "UnmarshalJSON should not error on empty value") assert.Equal(t, EMPTYPAIR, p, "UnmarshalJSON empty value should give EMPTYPAIR") diff --git a/currency/pairs_test.go b/currency/pairs_test.go index 426a40b85b3..b9ab830a8c3 100644 --- a/currency/pairs_test.go +++ b/currency/pairs_test.go @@ -54,7 +54,7 @@ func TestPairsFromString(t *testing.T) { _, err := NewPairsFromString("", "") assert.ErrorIs(t, err, errNoDelimiter) _, err = NewPairsFromString("", ",") - assert.ErrorIs(t, err, errCannotCreatePair) + assert.ErrorIs(t, err, ErrCannotCreatePair) pairs, err := NewPairsFromString("ALGO-AUD,BAT-AUD,BCH-AUD,BSV-AUD,BTC-AUD,COMP-AUD,ENJ-AUD,ETC-AUD,ETH-AUD,ETH-BTC,GNT-AUD,LINK-AUD,LTC-AUD,LTC-BTC,MCAU-AUD,OMG-AUD,POWR-AUD,UNI-AUD,USDT-AUD,XLM-AUD,XRP-AUD,XRP-BTC", ",") require.NoError(t, err) diff --git a/exchanges/gateio/gateio.go b/exchanges/gateio/gateio.go index 52adaf9518d..8944d9b2f9e 100644 --- a/exchanges/gateio/gateio.go +++ b/exchanges/gateio/gateio.go @@ -192,6 +192,7 @@ type Exchange struct { messageIDSeq common.Counter wsOBUpdateMgr *wsOBUpdateManager + wsOBSubMgr *wsSubscriptionManager } // ***************************************** SubAccounts ******************************** diff --git a/exchanges/gateio/gateio_types.go b/exchanges/gateio/gateio_types.go index ac2d94d7304..71218416970 100644 --- a/exchanges/gateio/gateio_types.go +++ b/exchanges/gateio/gateio_types.go @@ -2061,6 +2061,17 @@ type WsOrderbookUpdate struct { Asks [][2]types.Number `json:"a"` } +// WsOrderbookUpdateWithSnapshot represents websocket orderbook update push data +type WsOrderbookUpdateWithSnapshot struct { + UpdateTime types.Time `json:"t"` + Full bool `json:"full"` + Channel string `json:"s"` // returns ob.. which needs further processing + FirstUpdateID int64 `json:"U"` // First update order book id in this event since last update + LastUpdateID int64 `json:"u"` + Bids [][2]types.Number `json:"b"` + Asks [][2]types.Number `json:"a"` +} + // WsOrderbookSnapshot represents a websocket orderbook snapshot push data type WsOrderbookSnapshot struct { UpdateTime types.Time `json:"t"` diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 403785b7bba..e19c18e57ab 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -38,21 +38,22 @@ import ( const ( gateioWebsocketEndpoint = "wss://api.gateio.ws/ws/v4/" - spotPingChannel = "spot.ping" - spotPongChannel = "spot.pong" - spotTickerChannel = "spot.tickers" - spotTradesChannel = "spot.trades" - spotCandlesticksChannel = "spot.candlesticks" - spotOrderbookTickerChannel = "spot.book_ticker" // Best bid or ask price - spotOrderbookUpdateChannel = "spot.order_book_update" // Changed order book levels - spotOrderbookChannel = "spot.order_book" // Limited-Level Full Order Book Snapshot - spotOrdersChannel = "spot.orders" - spotUserTradesChannel = "spot.usertrades" - spotBalancesChannel = "spot.balances" - marginBalancesChannel = "spot.margin_balances" - spotFundingBalanceChannel = "spot.funding_balances" - crossMarginBalanceChannel = "spot.cross_balances" - crossMarginLoanChannel = "spot.cross_loan" + spotPingChannel = "spot.ping" + spotPongChannel = "spot.pong" + spotTickerChannel = "spot.tickers" + spotTradesChannel = "spot.trades" + spotCandlesticksChannel = "spot.candlesticks" + spotOrderbookTickerChannel = "spot.book_ticker" // Best bid or ask price + spotOrderbookUpdateChannel = "spot.order_book_update" // Changed order book levels + spotOrderbookChannel = "spot.order_book" // Limited-Level Full Order Book Snapshot + spotOrderbookUpdateWithSnapshotChannel = "spot.obu" + spotOrdersChannel = "spot.orders" + spotUserTradesChannel = "spot.usertrades" + spotBalancesChannel = "spot.balances" + marginBalancesChannel = "spot.margin_balances" + spotFundingBalanceChannel = "spot.funding_balances" + crossMarginBalanceChannel = "spot.cross_balances" + crossMarginLoanChannel = "spot.cross_loan" subscribeEvent = "subscribe" unsubscribeEvent = "unsubscribe" @@ -64,6 +65,7 @@ var defaultSubscriptions = subscription.List{ {Enabled: true, Channel: subscription.OrderbookChannel, Asset: asset.Spot, Interval: kline.HundredMilliseconds}, {Enabled: false, Channel: spotOrderbookTickerChannel, Asset: asset.Spot, Interval: kline.TenMilliseconds, Levels: 1}, {Enabled: false, Channel: spotOrderbookChannel, Asset: asset.Spot, Interval: kline.HundredMilliseconds, Levels: 100}, + {Enabled: false, Channel: spotOrderbookUpdateWithSnapshotChannel, Asset: asset.Spot, Levels: 50}, {Enabled: true, Channel: spotBalancesChannel, Asset: asset.Spot, Authenticated: true}, {Enabled: true, Channel: crossMarginBalanceChannel, Asset: asset.CrossMargin, Authenticated: true}, {Enabled: true, Channel: marginBalancesChannel, Asset: asset.Margin, Authenticated: true}, @@ -193,6 +195,8 @@ func (e *Exchange) WsHandleSpotData(ctx context.Context, conn websocket.Connecti return e.processOrderbookUpdate(ctx, push.Result, push.Time) case spotOrderbookChannel: return e.processOrderbookSnapshot(push.Result, push.Time) + case spotOrderbookUpdateWithSnapshotChannel: + return e.processOrderbookUpdateWithSnapshot(conn, push.Result, push.Time) case spotOrdersChannel: return e.processSpotOrders(respRaw) case spotUserTradesChannel: @@ -432,6 +436,75 @@ func (e *Exchange) processOrderbookSnapshot(incoming []byte, lastPushed time.Tim return nil } +func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, incoming []byte, lastPushed time.Time) error { + var data WsOrderbookUpdateWithSnapshot + if err := json.Unmarshal(incoming, &data); err != nil { + return err + } + + asks := make([]orderbook.Level, len(data.Asks)) + for x := range data.Asks { + asks[x].Price = data.Asks[x][0].Float64() + asks[x].Amount = data.Asks[x][1].Float64() + } + bids := make([]orderbook.Level, len(data.Bids)) + for x := range data.Bids { + bids[x].Price = data.Bids[x][0].Float64() + bids[x].Amount = data.Bids[x][1].Float64() + } + + pair, err := currency.NewPairFromString(strings.Split(data.Channel, ".")[1]) + if err != nil { + return err + } + + var errs error + for _, a := range standardMarginAssetTypes { + if enabled, _ := e.CurrencyPairs.IsPairEnabled(pair, a); !enabled { + continue + } + if data.Full { + if err := e.Websocket.Orderbook.LoadSnapshot(&orderbook.Book{ + Exchange: e.Name, + Pair: pair, + Asset: a, + LastUpdated: data.UpdateTime.Time(), + LastPushed: lastPushed, + LastUpdateID: data.LastUpdateID, + Bids: bids, + Asks: asks, + }); err != nil { + return err + } + e.wsOBSubMgr.CompletedResubscribe(pair, asset.Spot) // asset.Spot used so that all pathways don't compete + continue + } + + if e.wsOBSubMgr.IsResubscribing(pair, asset.Spot) { // asset.Spot used so that all pathways don't compete + continue // Drop incremental updates; waiting for a fresh snapshot + } + + if lastUpdateID, _ := e.Websocket.Orderbook.LastUpdateID(pair, a); lastUpdateID+1 != data.FirstUpdateID { + errs = common.AppendError(errs, e.wsOBSubMgr.Resubscribe(e, conn, data.Channel, pair, asset.Spot)) // asset.Spot used so that all pathways don't compete + continue + } + + if err := e.Websocket.Orderbook.Update(&orderbook.Update{ + Pair: pair, + Asset: a, + UpdateTime: data.UpdateTime.Time(), + LastPushed: lastPushed, + UpdateID: data.LastUpdateID, + Bids: bids, + Asks: asks, + AllowEmpty: true, + }); err != nil { + return err + } + } + return errs +} + func (e *Exchange) processSpotOrders(data []byte) error { resp := struct { Time types.Time `json:"time"` @@ -648,11 +721,12 @@ func (e *Exchange) GetSubscriptionTemplate(_ *subscription.Subscription) (*templ return template.New("master.tmpl"). Funcs(sprig.FuncMap()). Funcs(template.FuncMap{ - "channelName": channelName, - "singleSymbolChannel": singleSymbolChannel, - "orderbookInterval": orderbookChannelInterval, - "candlesInterval": candlesChannelInterval, - "levels": channelLevels, + "channelName": channelName, + "singleSymbolChannel": singleSymbolChannel, + "orderbookInterval": orderbookChannelInterval, + "candlesInterval": candlesChannelInterval, + "levels": channelLevels, + "compactOrderbookPayload": isCompactOrderbookPayload, }).Parse(subTplText) } @@ -740,7 +814,7 @@ func channelName(s *subscription.Subscription) string { // singleSymbolChannel returns if the channel should be fanned out into single symbol requests func singleSymbolChannel(name string) bool { switch name { - case spotCandlesticksChannel, spotOrderbookUpdateChannel, spotOrderbookChannel: + case spotCandlesticksChannel, spotOrderbookUpdateChannel, spotOrderbookChannel, spotOrderbookUpdateWithSnapshotChannel: return true } return false @@ -779,6 +853,7 @@ func isSingleOrderbookChannel(name string) bool { case spotOrderbookUpdateChannel, spotOrderbookChannel, spotOrderbookTickerChannel, + spotOrderbookUpdateWithSnapshotChannel, futuresOrderbookChannel, futuresOrderbookTickerChannel, futuresOrderbookUpdateChannel, @@ -849,9 +924,10 @@ func orderbookChannelInterval(s *subscription.Subscription, a asset.Item) (strin var channelLevelsMap = map[asset.Item]map[string][]int{ asset.Spot: { - spotOrderbookTickerChannel: {}, - spotOrderbookUpdateChannel: {}, - spotOrderbookChannel: {1, 5, 10, 20, 50, 100}, + spotOrderbookTickerChannel: {}, + spotOrderbookUpdateChannel: {}, + spotOrderbookChannel: {1, 5, 10, 20, 50, 100}, + spotOrderbookUpdateWithSnapshotChannel: {50, 400}, }, asset.Futures: { futuresOrderbookChannel: {1, 5, 10, 20, 50, 100}, @@ -892,16 +968,27 @@ func channelLevels(s *subscription.Subscription, a asset.Item) (string, error) { return strconv.Itoa(s.Levels), nil } +func isCompactOrderbookPayload(channel string) bool { + return channel == spotOrderbookUpdateWithSnapshotChannel +} + const subTplText = ` {{- with $name := channelName $.S }} {{- range $asset, $pairs := $.AssetPairs }} {{- if singleSymbolChannel $name }} {{- range $i, $p := $pairs -}} - {{- with $i := candlesInterval $.S }}{{ $i -}} , {{- end }} - {{- $p }} - {{- with $l := levels $.S $asset -}} , {{- $l }}{{ end }} - {{- with $i := orderbookInterval $.S $asset -}} , {{- $i }}{{- end }} - {{- $.PairSeparator }} + {{- if compactOrderbookPayload $name }} + {{- with $l := levels $.S $asset -}} + ob.{{ $p }}.{{ $l }} + {{- end -}} + {{- $.PairSeparator }} + {{- else }} + {{- with $i := candlesInterval $.S }}{{ $i -}} , {{- end }} + {{- $p }} + {{- with $l := levels $.S $asset -}} , {{- $l }}{{ end }} + {{- with $i := orderbookInterval $.S $asset -}} , {{- $i }}{{- end }} + {{- $.PairSeparator }} + {{- end }} {{- end }} {{- $.AssetSeparator }} {{- else }} diff --git a/exchanges/gateio/gateio_websocket_test.go b/exchanges/gateio/gateio_websocket_test.go new file mode 100644 index 00000000000..cf737906f23 --- /dev/null +++ b/exchanges/gateio/gateio_websocket_test.go @@ -0,0 +1,71 @@ +package gateio + +import ( + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/thrasher-corp/gocryptotrader/common/key" + "github.com/thrasher-corp/gocryptotrader/currency" + "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" + testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" +) + +func TestProcessOrderbookUpdateWithSnapshot(t *testing.T) { + t.Parallel() + + e := new(Exchange) + require.NoError(t, testexch.Setup(e)) + e.Name = "ProcessOrderbookUpdateWithSnapshot" + e.Features.Subscriptions = subscription.List{ + {Enabled: true, Channel: spotOrderbookUpdateWithSnapshotChannel, Asset: asset.Spot, Levels: 50}, + } + expanded, err := e.Features.Subscriptions.ExpandTemplates(e) + require.NoError(t, err) + + conn := &FixtureConnection{} + err = e.Websocket.AddSubscriptions(conn, expanded...) + require.NoError(t, err) + + e.wsOBSubMgr.lookup[key.PairAsset{Base: currency.BTC.Item, Quote: currency.USDT.Item, Asset: asset.Spot}] = true + + for _, tc := range []struct { + payload []byte + err error + }{ + {payload: []byte(`{"t":"bingbong"}`), err: strconv.ErrSyntax}, + {payload: []byte(`{"s":"ob..50"}`), err: currency.ErrCannotCreatePair}, + {payload: []byte(`{"s":"ob.BTC_USDT.50","full":true}`), err: orderbook.ErrLastUpdatedNotSet}, + {payload: []byte(`{"s":"ob.DING_USDT.50","full":true}`), err: nil}, // asset not enabled + { + // Simulate orderbook update already resubscribing + payload: []byte(`{"t":1757377580073,"s":"ob.BTC_USDT.50","u":27053258987,"U":27053258982,"b":[["111666","0.146841"]],"a":[["111666.1","0.791633"],["111676.8","0.014"]]}`), + err: nil, + }, + { + // Full snapshot will reset resubscribing state + payload: []byte(`{"t":1757377580046,"full":true,"s":"ob.BTC_USDT.50","u":27053258981,"b":[["111666","0.131287"],["111665.3","0.048403"],["111665.2","0.268681"],["111665.1","0.153269"],["111664.9","0.004"],["111663.8","0.010919"],["111663.7","0.214867"],["111661.8","0.268681"],["111659.4","0.01144"],["111659.3","0.184127"],["111658.4","0.268681"],["111658.3","0.11897"],["111656.9","0.00653"],["111656.7","0.184127"],["111656.1","0.040381"],["111655","0.044859"],["111654.9","0.268681"],["111654.8","0.033575"],["111653.9","0.184127"],["111653.6","0.601785"],["111653.5","0.017118"],["111651.7","0.160346"],["111651.6","0.184127"],["111651.5","0.268681"],["111650.1","0.09042"],["111647.9","0.191292"],["111647.5","0.268681"],["111646","0.098528"],["111645.9","0.1443"],["111645.6","0.184127"],["111643.8","1.015409"],["111643","0.099889"],["111641.5","0.004925"],["111641.2","0.179895"],["111641.1","0.184127"],["111640.7","0.268681"],["111638.6","0.184912"],["111638.4","0.010182"],["111637.6","0.026862"],["111637.5","0.09042"],["111636.6","0.184127"],["111634.8","0.129187"],["111634.7","0.014213"],["111633.9","0.268681"],["111632.1","0.184127"],["111631.8","0.1443"],["111631.6","0.027"],["111631.3","0.089539"],["111630.3","0.00001"],["111629.6","0.000029"]],"a":[["111666.1","0.818887"],["111668.3","0.008062"],["111668.5","0.005399"],["111670.3","0.043892"],["111670.4","0.019653"],["111673.7","0.046898"],["111674.1","0.004227"],["111674.4","0.026258"],["111674.8","0.09042"],["111674.9","0.268681"],["111675","0.004227"],["111676","0.004227"],["111676.8","0.005"],["111677","0.004227"],["111678.1","0.077789"],["111678.2","0.210991"],["111678.3","0.268681"],["111678.4","0.025039"],["111678.5","0.051456"],["111679.2","0.007163"],["111679.5","0.013019"],["111681.5","0.036343"],["111681.7","0.268681"],["111682.9","0.184127"],["111685.2","0.184127"],["111685.8","0.040538"],["111686.4","0.201931"],["111687.3","0.03"],["111687.4","0.09042"],["111687.5","0.452808"],["111687.6","1.815093"],["111691.9","0.139287"],["111692.2","0.184127"],["111693.7","0.268681"],["111694.3","1.05115"],["111694.5","0.184127"],["111697","0.184127"],["111697.1","0.268681"],["111697.4","0.0967"],["111698.7","0.1443"],["111699.5","0.014213"],["111700.2","0.601783"],["111700.7","0.09042"],["111700.9","0.367517"],["111701.5","0.184127"],["111705.2","0.017703"],["111706","0.184127"],["111707.6","0.268681"],["111709.9","0.1443"],["111710.2","0.004"]]}`), + err: nil, + }, + { + // Incremental update will apply correctly + payload: []byte(`{"t":1757377580073,"s":"ob.BTC_USDT.50","u":27053258987,"U":27053258982,"b":[["111666","0.146841"]],"a":[["111666.1","0.791633"],["111676.8","0.014"]]}`), + err: nil, + }, + { + // Incremental update out of order will force resubscription + payload: []byte(`{"t":1757377580073,"s":"ob.BTC_USDT.50","u":27053258987,"U":27053258982,"b":[["111666","0.146841"]],"a":[["111666.1","0.791633"],["111676.8","0.014"]]}`), + err: nil, + }, + } { + err := e.processOrderbookUpdateWithSnapshot(conn, tc.payload, time.Now()) + if tc.err != nil { + require.ErrorIs(t, err, tc.err) + continue + } + require.NoError(t, err) + } +} diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index f6a1a3239cf..bd494064b22 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -179,6 +179,7 @@ func (e *Exchange) SetDefaults() { e.WebsocketResponseCheckTimeout = exchange.DefaultWebsocketResponseCheckTimeout e.WebsocketOrderbookBufferLimit = exchange.DefaultWebsocketOrderbookBufferLimit e.wsOBUpdateMgr = newWsOBUpdateManager(defaultWSSnapshotSyncDelay) + e.wsOBSubMgr = newWSSubscriptionManager() } // Setup sets user configuration diff --git a/exchanges/gateio/ws_ob_sub_manager.go b/exchanges/gateio/ws_ob_sub_manager.go new file mode 100644 index 00000000000..2692f10d198 --- /dev/null +++ b/exchanges/gateio/ws_ob_sub_manager.go @@ -0,0 +1,72 @@ +package gateio + +import ( + "fmt" + "sync" + + "github.com/thrasher-corp/gocryptotrader/common/key" + "github.com/thrasher-corp/gocryptotrader/currency" + "github.com/thrasher-corp/gocryptotrader/exchange/websocket" + "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" +) + +type wsSubscriptionManager struct { + lookup map[key.PairAsset]bool + m sync.RWMutex +} + +func newWSSubscriptionManager() *wsSubscriptionManager { + return &wsSubscriptionManager{lookup: make(map[key.PairAsset]bool)} +} + +// IsResubscribing checks if a subscription is currently being resubscribed +func (m *wsSubscriptionManager) IsResubscribing(pair currency.Pair, a asset.Item) bool { + m.m.RLock() + defer m.m.RUnlock() + return m.lookup[key.PairAsset{Base: pair.Base.Item, Quote: pair.Quote.Item, Asset: a}] +} + +// Resubscribe marks a subscription as resubscribing and starts the unsubscribe/resubscribe process +func (m *wsSubscriptionManager) Resubscribe(e *Exchange, conn websocket.Connection, qualifiedChannel string, pair currency.Pair, a asset.Item) error { + m.m.Lock() + defer m.m.Unlock() + + sub := e.Websocket.GetSubscription(newQualifiedChannelKey(qualifiedChannel)) + if sub == nil { + return fmt.Errorf("%w: %q", subscription.ErrNotFound, qualifiedChannel) + } + + m.lookup[key.PairAsset{Base: pair.Base.Item, Quote: pair.Quote.Item, Asset: a}] = true + + go func() { // Has to be called in routine to not impede websocket throughput + if err := e.Websocket.ResubscribeToChannel(conn, sub); err != nil { + fmt.Printf("Failed to resubscribe to channel %q: %v\n", qualifiedChannel, err) + } + }() + + return nil +} + +// CompletedResubscribe marks a subscription as completed +func (m *wsSubscriptionManager) CompletedResubscribe(pair currency.Pair, a asset.Item) { + m.m.Lock() + defer m.m.Unlock() + delete(m.lookup, key.PairAsset{Base: pair.Base.Item, Quote: pair.Quote.Item, Asset: a}) +} + +func newQualifiedChannelKey(qualifiedChannel string) qualifiedChannelKey { + return qualifiedChannelKey{&subscription.Subscription{QualifiedChannel: qualifiedChannel}} +} + +type qualifiedChannelKey struct { + *subscription.Subscription +} + +func (k qualifiedChannelKey) Match(eachKey subscription.MatchableKey) bool { + return k.Subscription.QualifiedChannel == eachKey.GetSubscription().QualifiedChannel +} + +func (k qualifiedChannelKey) GetSubscription() *subscription.Subscription { + return k.Subscription +} diff --git a/exchanges/gateio/ws_ob_sub_manager_test.go b/exchanges/gateio/ws_ob_sub_manager_test.go new file mode 100644 index 00000000000..15ddc528dfe --- /dev/null +++ b/exchanges/gateio/ws_ob_sub_manager_test.go @@ -0,0 +1,79 @@ +package gateio + +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/thrasher-corp/gocryptotrader/common/key" + "github.com/thrasher-corp/gocryptotrader/currency" + "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" + testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" +) + +func TestNewWSSubscriptionManager(t *testing.T) { + t.Parallel() + + m := newWSSubscriptionManager() + require.NotNil(t, m) + require.NotNil(t, m.lookup) +} + +func TestIsResubscribing(t *testing.T) { + t.Parallel() + + m := newWSSubscriptionManager() + m.lookup[key.PairAsset{Base: currency.BTC.Item, Quote: currency.USDT.Item, Asset: asset.Spot}] = true + require.True(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) + require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Futures)) +} + +func TestResubscribe(t *testing.T) { + t.Parallel() + + m := newWSSubscriptionManager() + + conn := &FixtureConnection{} + err := m.Resubscribe(e, conn, "notfound", currency.NewBTCUSDT(), asset.Spot) + require.ErrorIs(t, err, subscription.ErrNotFound) + require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) + + e := new(Exchange) + require.NoError(t, testexch.Setup(e)) + e.Name = "Resubscribe" + e.Features.Subscriptions = subscription.List{ + {Enabled: true, Channel: spotOrderbookUpdateWithSnapshotChannel, Asset: asset.Spot, Levels: 50}, + } + expanded, err := e.Features.Subscriptions.ExpandTemplates(e) + require.NoError(t, err) + + err = e.Websocket.AddSubscriptions(conn, expanded...) + require.NoError(t, err) + + err = m.Resubscribe(e, conn, "ob.BTC_USDT.50", currency.NewBTCUSDT(), asset.Spot) + require.NoError(t, err) + require.True(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) +} + +func TestCompletedResubscribe(t *testing.T) { + t.Parallel() + + m := newWSSubscriptionManager() + m.CompletedResubscribe(currency.NewBTCUSDT(), asset.Spot) // no-op + require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) + m.lookup[key.PairAsset{Base: currency.BTC.Item, Quote: currency.USDT.Item, Asset: asset.Spot}] = true + require.True(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) + m.CompletedResubscribe(currency.NewBTCUSDT(), asset.Spot) + require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) +} + +func TestQualifiedChannelKey_Match(t *testing.T) { + t.Parallel() + + require.Implements(t, (*subscription.MatchableKey)(nil), new(qualifiedChannelKey)) + + key := qualifiedChannelKey{&subscription.Subscription{QualifiedChannel: "test.channel"}} + require.True(t, key.Match(key)) + require.False(t, key.Match(qualifiedChannelKey{&subscription.Subscription{QualifiedChannel: "TEST.channel"}})) + require.NotNil(t, key.GetSubscription()) +} From 07817a4dc0b16b877c3067916a390ff1e88dbe52 Mon Sep 17 00:00:00 2001 From: shazbert Date: Tue, 9 Sep 2025 15:42:51 +1000 Subject: [PATCH 02/21] linter + other fixes --- exchanges/gateio/gateio_websocket.go | 6 ++-- exchanges/gateio/gateio_websocket_test.go | 2 +- exchanges/gateio/ws_ob_sub_manager.go | 7 ++++- exchanges/gateio/ws_ob_sub_manager_test.go | 36 +++++++++++++++++++--- 4 files changed, 41 insertions(+), 10 deletions(-) diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index e19c18e57ab..3c5624be070 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -979,9 +979,9 @@ const subTplText = ` {{- range $i, $p := $pairs -}} {{- if compactOrderbookPayload $name }} {{- with $l := levels $.S $asset -}} - ob.{{ $p }}.{{ $l }} - {{- end -}} - {{- $.PairSeparator }} + ob.{{ $p }}.{{ $l }} + {{- end -}} + {{- $.PairSeparator }} {{- else }} {{- with $i := candlesInterval $.S }}{{ $i -}} , {{- end }} {{- $p }} diff --git a/exchanges/gateio/gateio_websocket_test.go b/exchanges/gateio/gateio_websocket_test.go index cf737906f23..6529ad64231 100644 --- a/exchanges/gateio/gateio_websocket_test.go +++ b/exchanges/gateio/gateio_websocket_test.go @@ -17,7 +17,7 @@ import ( func TestProcessOrderbookUpdateWithSnapshot(t *testing.T) { t.Parallel() - e := new(Exchange) + e := new(Exchange) //nolint:govet // Intentional shadow require.NoError(t, testexch.Setup(e)) e.Name = "ProcessOrderbookUpdateWithSnapshot" e.Features.Subscriptions = subscription.List{ diff --git a/exchanges/gateio/ws_ob_sub_manager.go b/exchanges/gateio/ws_ob_sub_manager.go index 2692f10d198..6119e75f246 100644 --- a/exchanges/gateio/ws_ob_sub_manager.go +++ b/exchanges/gateio/ws_ob_sub_manager.go @@ -9,6 +9,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchange/websocket" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" + "github.com/thrasher-corp/gocryptotrader/log" ) type wsSubscriptionManager struct { @@ -29,6 +30,10 @@ func (m *wsSubscriptionManager) IsResubscribing(pair currency.Pair, a asset.Item // Resubscribe marks a subscription as resubscribing and starts the unsubscribe/resubscribe process func (m *wsSubscriptionManager) Resubscribe(e *Exchange, conn websocket.Connection, qualifiedChannel string, pair currency.Pair, a asset.Item) error { + if err := e.Websocket.Orderbook.InvalidateOrderbook(pair, a); err != nil { + return err + } + m.m.Lock() defer m.m.Unlock() @@ -41,7 +46,7 @@ func (m *wsSubscriptionManager) Resubscribe(e *Exchange, conn websocket.Connecti go func() { // Has to be called in routine to not impede websocket throughput if err := e.Websocket.ResubscribeToChannel(conn, sub); err != nil { - fmt.Printf("Failed to resubscribe to channel %q: %v\n", qualifiedChannel, err) + log.Errorf(log.ExchangeSys, "Failed to resubscribe to channel %q: %v", qualifiedChannel, err) } }() diff --git a/exchanges/gateio/ws_ob_sub_manager_test.go b/exchanges/gateio/ws_ob_sub_manager_test.go index 15ddc528dfe..433efbfa5ea 100644 --- a/exchanges/gateio/ws_ob_sub_manager_test.go +++ b/exchanges/gateio/ws_ob_sub_manager_test.go @@ -2,11 +2,13 @@ package gateio import ( "testing" + "time" "github.com/stretchr/testify/require" "github.com/thrasher-corp/gocryptotrader/common/key" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" ) @@ -34,11 +36,26 @@ func TestResubscribe(t *testing.T) { m := newWSSubscriptionManager() conn := &FixtureConnection{} + err := m.Resubscribe(e, conn, "notfound", currency.NewBTCUSDT(), asset.Spot) + require.ErrorIs(t, err, orderbook.ErrDepthNotFound) + require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) + + err = e.Websocket.Orderbook.LoadSnapshot(&orderbook.Book{ + Asks: []orderbook.Level{{Price: 50000, Amount: 0.1}}, + Bids: []orderbook.Level{{Price: 49000, Amount: 0.2}}, + Exchange: e.Name, + Pair: currency.NewBTCUSDT(), + Asset: asset.Spot, + LastUpdated: time.Now(), + }) + require.NoError(t, err) + err = m.Resubscribe(e, conn, "notfound", currency.NewBTCUSDT(), asset.Spot) require.ErrorIs(t, err, subscription.ErrNotFound) + require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) - e := new(Exchange) + e := new(Exchange) //nolint:govet // Intentional shadow require.NoError(t, testexch.Setup(e)) e.Name = "Resubscribe" e.Features.Subscriptions = subscription.List{ @@ -50,6 +67,15 @@ func TestResubscribe(t *testing.T) { err = e.Websocket.AddSubscriptions(conn, expanded...) require.NoError(t, err) + err = e.Websocket.Orderbook.LoadSnapshot(&orderbook.Book{ + Asks: []orderbook.Level{{Price: 50000, Amount: 0.1}}, + Bids: []orderbook.Level{{Price: 49000, Amount: 0.2}}, + Exchange: e.Name, + Pair: currency.NewBTCUSDT(), + Asset: asset.Spot, + LastUpdated: time.Now(), + }) + require.NoError(t, err) err = m.Resubscribe(e, conn, "ob.BTC_USDT.50", currency.NewBTCUSDT(), asset.Spot) require.NoError(t, err) require.True(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) @@ -72,8 +98,8 @@ func TestQualifiedChannelKey_Match(t *testing.T) { require.Implements(t, (*subscription.MatchableKey)(nil), new(qualifiedChannelKey)) - key := qualifiedChannelKey{&subscription.Subscription{QualifiedChannel: "test.channel"}} - require.True(t, key.Match(key)) - require.False(t, key.Match(qualifiedChannelKey{&subscription.Subscription{QualifiedChannel: "TEST.channel"}})) - require.NotNil(t, key.GetSubscription()) + k := qualifiedChannelKey{&subscription.Subscription{QualifiedChannel: "test.channel"}} + require.True(t, k.Match(k)) + require.False(t, k.Match(qualifiedChannelKey{&subscription.Subscription{QualifiedChannel: "TEST.channel"}})) + require.NotNil(t, k.GetSubscription()) } From 4007fb29b62dd71e6f3feb0c16e8aea0c2edac64 Mon Sep 17 00:00:00 2001 From: shazbert Date: Thu, 11 Sep 2025 07:34:13 +1000 Subject: [PATCH 03/21] AI+Boss: nits --- exchanges/gateio/gateio_websocket.go | 10 +++++++--- exchanges/gateio/ws_ob_sub_manager.go | 1 + exchanges/gateio/ws_ob_sub_manager_test.go | 13 +++++++------ 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 3c5624be070..1d410c379ed 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -57,6 +57,10 @@ const ( subscribeEvent = "subscribe" unsubscribeEvent = "unsubscribe" + + // Used for orderbook resubscription management so as to not compete with margin/cross-margin updates as they use + // the same orderbook + defaultExclusiveAsset = asset.Spot ) var defaultSubscriptions = subscription.List{ @@ -476,16 +480,16 @@ func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, }); err != nil { return err } - e.wsOBSubMgr.CompletedResubscribe(pair, asset.Spot) // asset.Spot used so that all pathways don't compete + e.wsOBSubMgr.CompletedResubscribe(pair, defaultExclusiveAsset) continue } - if e.wsOBSubMgr.IsResubscribing(pair, asset.Spot) { // asset.Spot used so that all pathways don't compete + if e.wsOBSubMgr.IsResubscribing(pair, defaultExclusiveAsset) { continue // Drop incremental updates; waiting for a fresh snapshot } if lastUpdateID, _ := e.Websocket.Orderbook.LastUpdateID(pair, a); lastUpdateID+1 != data.FirstUpdateID { - errs = common.AppendError(errs, e.wsOBSubMgr.Resubscribe(e, conn, data.Channel, pair, asset.Spot)) // asset.Spot used so that all pathways don't compete + errs = common.AppendError(errs, e.wsOBSubMgr.Resubscribe(e, conn, data.Channel, pair, defaultExclusiveAsset)) continue } diff --git a/exchanges/gateio/ws_ob_sub_manager.go b/exchanges/gateio/ws_ob_sub_manager.go index 6119e75f246..5a93f71b62b 100644 --- a/exchanges/gateio/ws_ob_sub_manager.go +++ b/exchanges/gateio/ws_ob_sub_manager.go @@ -46,6 +46,7 @@ func (m *wsSubscriptionManager) Resubscribe(e *Exchange, conn websocket.Connecti go func() { // Has to be called in routine to not impede websocket throughput if err := e.Websocket.ResubscribeToChannel(conn, sub); err != nil { + m.CompletedResubscribe(pair, a) // Ensure we clear the map entry on failure too log.Errorf(log.ExchangeSys, "Failed to resubscribe to channel %q: %v", qualifiedChannel, err) } }() diff --git a/exchanges/gateio/ws_ob_sub_manager_test.go b/exchanges/gateio/ws_ob_sub_manager_test.go index 433efbfa5ea..1560588e6cd 100644 --- a/exchanges/gateio/ws_ob_sub_manager_test.go +++ b/exchanges/gateio/ws_ob_sub_manager_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/thrasher-corp/gocryptotrader/common/key" "github.com/thrasher-corp/gocryptotrader/currency" @@ -18,7 +19,7 @@ func TestNewWSSubscriptionManager(t *testing.T) { m := newWSSubscriptionManager() require.NotNil(t, m) - require.NotNil(t, m.lookup) + assert.NotNil(t, m.lookup) } func TestIsResubscribing(t *testing.T) { @@ -26,8 +27,8 @@ func TestIsResubscribing(t *testing.T) { m := newWSSubscriptionManager() m.lookup[key.PairAsset{Base: currency.BTC.Item, Quote: currency.USDT.Item, Asset: asset.Spot}] = true - require.True(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) - require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Futures)) + assert.True(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) + assert.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Futures)) } func TestResubscribe(t *testing.T) { @@ -78,7 +79,7 @@ func TestResubscribe(t *testing.T) { require.NoError(t, err) err = m.Resubscribe(e, conn, "ob.BTC_USDT.50", currency.NewBTCUSDT(), asset.Spot) require.NoError(t, err) - require.True(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) + assert.True(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) } func TestCompletedResubscribe(t *testing.T) { @@ -90,7 +91,7 @@ func TestCompletedResubscribe(t *testing.T) { m.lookup[key.PairAsset{Base: currency.BTC.Item, Quote: currency.USDT.Item, Asset: asset.Spot}] = true require.True(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) m.CompletedResubscribe(currency.NewBTCUSDT(), asset.Spot) - require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) + assert.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) } func TestQualifiedChannelKey_Match(t *testing.T) { @@ -101,5 +102,5 @@ func TestQualifiedChannelKey_Match(t *testing.T) { k := qualifiedChannelKey{&subscription.Subscription{QualifiedChannel: "test.channel"}} require.True(t, k.Match(k)) require.False(t, k.Match(qualifiedChannelKey{&subscription.Subscription{QualifiedChannel: "TEST.channel"}})) - require.NotNil(t, k.GetSubscription()) + assert.NotNil(t, k.GetSubscription()) } From ab9e7e0836748905bf78ffd651788502fa8f14b0 Mon Sep 17 00:00:00 2001 From: shazbert Date: Thu, 11 Sep 2025 15:31:34 +1000 Subject: [PATCH 04/21] ai+glorious: nits --- exchanges/gateio/gateio.go | 2 +- exchanges/gateio/gateio_websocket.go | 18 +++++++++++++----- exchanges/gateio/gateio_websocket_test.go | 3 ++- exchanges/gateio/gateio_wrapper.go | 2 +- ...b_sub_manager.go => ws_ob_resub_manager.go} | 12 ++++++------ ...ger_test.go => ws_ob_resub_manager_test.go} | 17 +++++++++-------- 6 files changed, 32 insertions(+), 22 deletions(-) rename exchanges/gateio/{ws_ob_sub_manager.go => ws_ob_resub_manager.go} (81%) rename exchanges/gateio/{ws_ob_sub_manager_test.go => ws_ob_resub_manager_test.go} (95%) diff --git a/exchanges/gateio/gateio.go b/exchanges/gateio/gateio.go index 8944d9b2f9e..a066fc4d230 100644 --- a/exchanges/gateio/gateio.go +++ b/exchanges/gateio/gateio.go @@ -192,7 +192,7 @@ type Exchange struct { messageIDSeq common.Counter wsOBUpdateMgr *wsOBUpdateManager - wsOBSubMgr *wsSubscriptionManager + wsOBResubMgr *wsObResubManager } // ***************************************** SubAccounts ******************************** diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 1c6791dad57..413b40421b8 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -88,7 +88,10 @@ var ( validPingChannels = []string{optionsPingChannel, futuresPingChannel, spotPingChannel} ) -var errInvalidPingChannel = errors.New("invalid ping channel") +var ( + errInvalidPingChannel = errors.New("invalid ping channel") + errMalformedData = errors.New("malformed data") +) // WsConnectSpot initiates a websocket connection func (e *Exchange) WsConnectSpot(ctx context.Context, conn websocket.Connection) error { @@ -455,7 +458,12 @@ func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, bids[x].Amount = data.Bids[x][1].Float64() } - pair, err := currency.NewPairFromString(strings.Split(data.Channel, ".")[1]) + splitChannel := strings.Split(data.Channel, ".") + if len(splitChannel) < 3 { + return fmt.Errorf("%w: %q", errMalformedData, data.Channel) + } + + pair, err := currency.NewPairFromString(splitChannel[1]) if err != nil { return err } @@ -478,16 +486,16 @@ func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, }); err != nil { return err } - e.wsOBSubMgr.CompletedResubscribe(pair, defaultExclusiveAsset) + e.wsOBResubMgr.CompletedResubscribe(pair, defaultExclusiveAsset) continue } - if e.wsOBSubMgr.IsResubscribing(pair, defaultExclusiveAsset) { + if e.wsOBResubMgr.IsResubscribing(pair, defaultExclusiveAsset) { continue // Drop incremental updates; waiting for a fresh snapshot } if lastUpdateID, _ := e.Websocket.Orderbook.LastUpdateID(pair, a); lastUpdateID+1 != data.FirstUpdateID { - errs = common.AppendError(errs, e.wsOBSubMgr.Resubscribe(e, conn, data.Channel, pair, defaultExclusiveAsset)) + errs = common.AppendError(errs, e.wsOBResubMgr.Resubscribe(e, conn, data.Channel, pair, defaultExclusiveAsset)) continue } diff --git a/exchanges/gateio/gateio_websocket_test.go b/exchanges/gateio/gateio_websocket_test.go index 97aa1392e05..b223386b2c9 100644 --- a/exchanges/gateio/gateio_websocket_test.go +++ b/exchanges/gateio/gateio_websocket_test.go @@ -55,13 +55,14 @@ func TestProcessOrderbookUpdateWithSnapshot(t *testing.T) { err = e.Websocket.AddSubscriptions(conn, expanded...) require.NoError(t, err) - e.wsOBSubMgr.lookup[key.PairAsset{Base: currency.BTC.Item, Quote: currency.USDT.Item, Asset: asset.Spot}] = true + e.wsOBResubMgr.lookup[key.PairAsset{Base: currency.BTC.Item, Quote: currency.USDT.Item, Asset: asset.Spot}] = true for _, tc := range []struct { payload []byte err error }{ {payload: []byte(`{"t":"bingbong"}`), err: strconv.ErrSyntax}, + {payload: []byte(`{"s":"ob.50"}`), err: errMalformedData}, {payload: []byte(`{"s":"ob..50"}`), err: currency.ErrCannotCreatePair}, {payload: []byte(`{"s":"ob.BTC_USDT.50","full":true}`), err: orderbook.ErrLastUpdatedNotSet}, {payload: []byte(`{"s":"ob.DING_USDT.50","full":true}`), err: nil}, // asset not enabled diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index bd494064b22..48ca5549d5e 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -179,7 +179,7 @@ func (e *Exchange) SetDefaults() { e.WebsocketResponseCheckTimeout = exchange.DefaultWebsocketResponseCheckTimeout e.WebsocketOrderbookBufferLimit = exchange.DefaultWebsocketOrderbookBufferLimit e.wsOBUpdateMgr = newWsOBUpdateManager(defaultWSSnapshotSyncDelay) - e.wsOBSubMgr = newWSSubscriptionManager() + e.wsOBResubMgr = newWSObResubManager() } // Setup sets user configuration diff --git a/exchanges/gateio/ws_ob_sub_manager.go b/exchanges/gateio/ws_ob_resub_manager.go similarity index 81% rename from exchanges/gateio/ws_ob_sub_manager.go rename to exchanges/gateio/ws_ob_resub_manager.go index 5a93f71b62b..cf2d1746b1b 100644 --- a/exchanges/gateio/ws_ob_sub_manager.go +++ b/exchanges/gateio/ws_ob_resub_manager.go @@ -12,24 +12,24 @@ import ( "github.com/thrasher-corp/gocryptotrader/log" ) -type wsSubscriptionManager struct { +type wsObResubManager struct { lookup map[key.PairAsset]bool m sync.RWMutex } -func newWSSubscriptionManager() *wsSubscriptionManager { - return &wsSubscriptionManager{lookup: make(map[key.PairAsset]bool)} +func newWSObResubManager() *wsObResubManager { + return &wsObResubManager{lookup: make(map[key.PairAsset]bool)} } // IsResubscribing checks if a subscription is currently being resubscribed -func (m *wsSubscriptionManager) IsResubscribing(pair currency.Pair, a asset.Item) bool { +func (m *wsObResubManager) IsResubscribing(pair currency.Pair, a asset.Item) bool { m.m.RLock() defer m.m.RUnlock() return m.lookup[key.PairAsset{Base: pair.Base.Item, Quote: pair.Quote.Item, Asset: a}] } // Resubscribe marks a subscription as resubscribing and starts the unsubscribe/resubscribe process -func (m *wsSubscriptionManager) Resubscribe(e *Exchange, conn websocket.Connection, qualifiedChannel string, pair currency.Pair, a asset.Item) error { +func (m *wsObResubManager) Resubscribe(e *Exchange, conn websocket.Connection, qualifiedChannel string, pair currency.Pair, a asset.Item) error { if err := e.Websocket.Orderbook.InvalidateOrderbook(pair, a); err != nil { return err } @@ -55,7 +55,7 @@ func (m *wsSubscriptionManager) Resubscribe(e *Exchange, conn websocket.Connecti } // CompletedResubscribe marks a subscription as completed -func (m *wsSubscriptionManager) CompletedResubscribe(pair currency.Pair, a asset.Item) { +func (m *wsObResubManager) CompletedResubscribe(pair currency.Pair, a asset.Item) { m.m.Lock() defer m.m.Unlock() delete(m.lookup, key.PairAsset{Base: pair.Base.Item, Quote: pair.Quote.Item, Asset: a}) diff --git a/exchanges/gateio/ws_ob_sub_manager_test.go b/exchanges/gateio/ws_ob_resub_manager_test.go similarity index 95% rename from exchanges/gateio/ws_ob_sub_manager_test.go rename to exchanges/gateio/ws_ob_resub_manager_test.go index 1560588e6cd..01c65f48927 100644 --- a/exchanges/gateio/ws_ob_sub_manager_test.go +++ b/exchanges/gateio/ws_ob_resub_manager_test.go @@ -14,10 +14,10 @@ import ( testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" ) -func TestNewWSSubscriptionManager(t *testing.T) { +func TestNewWSObResubManager(t *testing.T) { t.Parallel() - m := newWSSubscriptionManager() + m := newWSObResubManager() require.NotNil(t, m) assert.NotNil(t, m.lookup) } @@ -25,7 +25,7 @@ func TestNewWSSubscriptionManager(t *testing.T) { func TestIsResubscribing(t *testing.T) { t.Parallel() - m := newWSSubscriptionManager() + m := newWSObResubManager() m.lookup[key.PairAsset{Base: currency.BTC.Item, Quote: currency.USDT.Item, Asset: asset.Spot}] = true assert.True(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) assert.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Futures)) @@ -34,10 +34,14 @@ func TestIsResubscribing(t *testing.T) { func TestResubscribe(t *testing.T) { t.Parallel() - m := newWSSubscriptionManager() + m := newWSObResubManager() conn := &FixtureConnection{} + e := new(Exchange) //nolint:govet // Intentional shadow + require.NoError(t, testexch.Setup(e)) + e.Name = "Resubscribe" + err := m.Resubscribe(e, conn, "notfound", currency.NewBTCUSDT(), asset.Spot) require.ErrorIs(t, err, orderbook.ErrDepthNotFound) require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) @@ -56,9 +60,6 @@ func TestResubscribe(t *testing.T) { require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) - e := new(Exchange) //nolint:govet // Intentional shadow - require.NoError(t, testexch.Setup(e)) - e.Name = "Resubscribe" e.Features.Subscriptions = subscription.List{ {Enabled: true, Channel: spotOrderbookUpdateWithSnapshotChannel, Asset: asset.Spot, Levels: 50}, } @@ -85,7 +86,7 @@ func TestResubscribe(t *testing.T) { func TestCompletedResubscribe(t *testing.T) { t.Parallel() - m := newWSSubscriptionManager() + m := newWSObResubManager() m.CompletedResubscribe(currency.NewBTCUSDT(), asset.Spot) // no-op require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) m.lookup[key.PairAsset{Base: currency.BTC.Item, Quote: currency.USDT.Item, Asset: asset.Spot}] = true From 35eb48c95edc954d3d7e21eb5dfe330cbd999f47 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Wed, 17 Sep 2025 08:32:47 +1000 Subject: [PATCH 05/21] bossking: nits --- exchanges/gateio/ws_ob_resub_manager.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/exchanges/gateio/ws_ob_resub_manager.go b/exchanges/gateio/ws_ob_resub_manager.go index cf2d1746b1b..61e8ecbe683 100644 --- a/exchanges/gateio/ws_ob_resub_manager.go +++ b/exchanges/gateio/ws_ob_resub_manager.go @@ -34,14 +34,14 @@ func (m *wsObResubManager) Resubscribe(e *Exchange, conn websocket.Connection, q return err } - m.m.Lock() - defer m.m.Unlock() - - sub := e.Websocket.GetSubscription(newQualifiedChannelKey(qualifiedChannel)) + sub := e.Websocket.GetSubscription(qualifiedChannelKey{&subscription.Subscription{QualifiedChannel: qualifiedChannel}}) if sub == nil { return fmt.Errorf("%w: %q", subscription.ErrNotFound, qualifiedChannel) } + m.m.Lock() + defer m.m.Unlock() + m.lookup[key.PairAsset{Base: pair.Base.Item, Quote: pair.Quote.Item, Asset: a}] = true go func() { // Has to be called in routine to not impede websocket throughput @@ -61,10 +61,6 @@ func (m *wsObResubManager) CompletedResubscribe(pair currency.Pair, a asset.Item delete(m.lookup, key.PairAsset{Base: pair.Base.Item, Quote: pair.Quote.Item, Asset: a}) } -func newQualifiedChannelKey(qualifiedChannel string) qualifiedChannelKey { - return qualifiedChannelKey{&subscription.Subscription{QualifiedChannel: qualifiedChannel}} -} - type qualifiedChannelKey struct { *subscription.Subscription } From 36ba2ceba89b882c43763473281b73a6e767a8b0 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Wed, 17 Sep 2025 11:25:01 +1000 Subject: [PATCH 06/21] drop cross/margin handling for spot pathway as its turned off anyway, handle error from LastUpdateID --- exchanges/gateio/gateio_websocket.go | 60 ++++++++++------------- exchanges/gateio/gateio_websocket_test.go | 3 +- 2 files changed, 28 insertions(+), 35 deletions(-) diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index eb048f173de..33ddf6e6d20 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -201,7 +201,7 @@ func (e *Exchange) WsHandleSpotData(ctx context.Context, conn websocket.Connecti case spotOrderbookChannel: return e.processOrderbookSnapshot(push.Result, push.Time) case spotOrderbookUpdateWithSnapshotChannel: - return e.processOrderbookUpdateWithSnapshot(conn, push.Result, push.Time) + return e.processOrderbookUpdateWithSnapshot(conn, push.Result, push.Time, asset.Spot) case spotOrdersChannel: return e.processSpotOrders(respRaw) case spotUserTradesChannel: @@ -441,7 +441,7 @@ func (e *Exchange) processOrderbookSnapshot(incoming []byte, lastPushed time.Tim return nil } -func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, incoming []byte, lastPushed time.Time) error { +func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, incoming []byte, lastPushed time.Time, a asset.Item) error { var data WsOrderbookUpdateWithSnapshot if err := json.Unmarshal(incoming, &data); err != nil { return err @@ -468,38 +468,31 @@ func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, return err } - var errs error - for _, a := range standardMarginAssetTypes { - if enabled, _ := e.CurrencyPairs.IsPairEnabled(pair, a); !enabled { - continue - } - if data.Full { - if err := e.Websocket.Orderbook.LoadSnapshot(&orderbook.Book{ - Exchange: e.Name, - Pair: pair, - Asset: a, - LastUpdated: data.UpdateTime.Time(), - LastPushed: lastPushed, - LastUpdateID: data.LastUpdateID, - Bids: bids, - Asks: asks, - }); err != nil { - return err - } - e.wsOBResubMgr.CompletedResubscribe(pair, defaultExclusiveAsset) - continue + if data.Full { + if err := e.Websocket.Orderbook.LoadSnapshot(&orderbook.Book{ + Exchange: e.Name, + Pair: pair, + Asset: a, + LastUpdated: data.UpdateTime.Time(), + LastPushed: lastPushed, + LastUpdateID: data.LastUpdateID, + Bids: bids, + Asks: asks, + }); err != nil { + return err } + e.wsOBResubMgr.CompletedResubscribe(pair, a) + return nil + } - if e.wsOBResubMgr.IsResubscribing(pair, defaultExclusiveAsset) { - continue // Drop incremental updates; waiting for a fresh snapshot - } + if e.wsOBResubMgr.IsResubscribing(pair, a) { + return nil // Drop incremental updates; waiting for a fresh snapshot + } - if lastUpdateID, _ := e.Websocket.Orderbook.LastUpdateID(pair, a); lastUpdateID+1 != data.FirstUpdateID { - errs = common.AppendError(errs, e.wsOBResubMgr.Resubscribe(e, conn, data.Channel, pair, defaultExclusiveAsset)) - continue - } + lastUpdateID, err := e.Websocket.Orderbook.LastUpdateID(pair, a) - if err := e.Websocket.Orderbook.Update(&orderbook.Update{ + if lastUpdateID+1 == data.FirstUpdateID { + if err = e.Websocket.Orderbook.Update(&orderbook.Update{ Pair: pair, Asset: a, UpdateTime: data.UpdateTime.Time(), @@ -508,11 +501,12 @@ func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, Bids: bids, Asks: asks, AllowEmpty: true, - }); err != nil { - return err + }); err == nil { + return nil } } - return errs + + return common.AppendError(err, e.wsOBResubMgr.Resubscribe(e, conn, data.Channel, pair, a)) } func (e *Exchange) processSpotOrders(data []byte) error { diff --git a/exchanges/gateio/gateio_websocket_test.go b/exchanges/gateio/gateio_websocket_test.go index 7c937c24132..c24d103d100 100644 --- a/exchanges/gateio/gateio_websocket_test.go +++ b/exchanges/gateio/gateio_websocket_test.go @@ -229,7 +229,6 @@ func TestProcessOrderbookUpdateWithSnapshot(t *testing.T) { {payload: []byte(`{"s":"ob.50"}`), err: errMalformedData}, {payload: []byte(`{"s":"ob..50"}`), err: currency.ErrCannotCreatePair}, {payload: []byte(`{"s":"ob.BTC_USDT.50","full":true}`), err: orderbook.ErrLastUpdatedNotSet}, - {payload: []byte(`{"s":"ob.DING_USDT.50","full":true}`), err: nil}, // asset not enabled { // Simulate orderbook update already resubscribing payload: []byte(`{"t":1757377580073,"s":"ob.BTC_USDT.50","u":27053258987,"U":27053258982,"b":[["111666","0.146841"]],"a":[["111666.1","0.791633"],["111676.8","0.014"]]}`), @@ -251,7 +250,7 @@ func TestProcessOrderbookUpdateWithSnapshot(t *testing.T) { err: nil, }, } { - err := e.processOrderbookUpdateWithSnapshot(conn, tc.payload, time.Now()) + err := e.processOrderbookUpdateWithSnapshot(conn, tc.payload, time.Now(), asset.Spot) if tc.err != nil { require.ErrorIs(t, err, tc.err) continue From 52f9e6bc8e53f01ce6f5c17d077783aba252f92f Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Wed, 17 Sep 2025 15:39:20 +1000 Subject: [PATCH 07/21] linter: fix --- exchanges/gateio/gateio_websocket.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 33ddf6e6d20..e2fce57fee4 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -57,10 +57,6 @@ const ( subscribeEvent = "subscribe" unsubscribeEvent = "unsubscribe" - - // Used for orderbook resubscription management so as to not compete with margin/cross-margin updates as they use - // the same orderbook - defaultExclusiveAsset = asset.Spot ) var defaultSubscriptions = subscription.List{ From 298e90227441923ab34700c674095fffcb779567 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Tue, 23 Sep 2025 09:12:49 +1000 Subject: [PATCH 08/21] Update currency/pair.go Co-authored-by: Gareth Kirwan --- currency/pair.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/currency/pair.go b/currency/pair.go index b71919d3ea4..4f5957098c0 100644 --- a/currency/pair.go +++ b/currency/pair.go @@ -7,7 +7,7 @@ import ( "unicode" ) -// Public error vars +// Public errors var ( ErrCannotCreatePair = errors.New("cannot create currency pair") ) From d544b6177b536016d36adfab816bd614c87a05d9 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Tue, 23 Sep 2025 09:13:16 +1000 Subject: [PATCH 09/21] Update currency/pair.go Co-authored-by: Gareth Kirwan --- currency/pair.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/currency/pair.go b/currency/pair.go index 4f5957098c0..5c234b1e66f 100644 --- a/currency/pair.go +++ b/currency/pair.go @@ -9,7 +9,7 @@ import ( // Public errors var ( - ErrCannotCreatePair = errors.New("cannot create currency pair") + ErrCreatingPair = errors.New("error creating currency pair") ) var ( From c26163eaa5e362ab5b6ae025186d9de7c53e3949 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Tue, 23 Sep 2025 09:13:38 +1000 Subject: [PATCH 10/21] Update exchanges/gateio/gateio.go Co-authored-by: Gareth Kirwan --- exchanges/gateio/gateio.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exchanges/gateio/gateio.go b/exchanges/gateio/gateio.go index a066fc4d230..56409c27064 100644 --- a/exchanges/gateio/gateio.go +++ b/exchanges/gateio/gateio.go @@ -192,7 +192,7 @@ type Exchange struct { messageIDSeq common.Counter wsOBUpdateMgr *wsOBUpdateManager - wsOBResubMgr *wsObResubManager + wsOBResubMgr *wsOBResubManager } // ***************************************** SubAccounts ******************************** From 87fa5d15bfd9e20a82c143c84dbe9d1ff09d75f3 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Tue, 23 Sep 2025 09:14:10 +1000 Subject: [PATCH 11/21] Update exchanges/gateio/gateio_types.go Co-authored-by: Gareth Kirwan --- exchanges/gateio/gateio_types.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exchanges/gateio/gateio_types.go b/exchanges/gateio/gateio_types.go index a00d3cbc05d..b127795535f 100644 --- a/exchanges/gateio/gateio_types.go +++ b/exchanges/gateio/gateio_types.go @@ -2065,7 +2065,7 @@ type WsOrderbookUpdate struct { type WsOrderbookUpdateWithSnapshot struct { UpdateTime types.Time `json:"t"` Full bool `json:"full"` - Channel string `json:"s"` // returns ob.. which needs further processing + Channel string `json:"s"` FirstUpdateID int64 `json:"U"` // First update order book id in this event since last update LastUpdateID int64 `json:"u"` Bids [][2]types.Number `json:"b"` From 2f1d4e9b4bc85a3140272c96c510a34063670d85 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Tue, 23 Sep 2025 09:14:25 +1000 Subject: [PATCH 12/21] Update exchanges/gateio/gateio_types.go Co-authored-by: Gareth Kirwan --- exchanges/gateio/gateio_types.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exchanges/gateio/gateio_types.go b/exchanges/gateio/gateio_types.go index b127795535f..7edaa0ac6e7 100644 --- a/exchanges/gateio/gateio_types.go +++ b/exchanges/gateio/gateio_types.go @@ -2066,7 +2066,7 @@ type WsOrderbookUpdateWithSnapshot struct { UpdateTime types.Time `json:"t"` Full bool `json:"full"` Channel string `json:"s"` - FirstUpdateID int64 `json:"U"` // First update order book id in this event since last update + FirstUpdateID int64 `json:"U"` LastUpdateID int64 `json:"u"` Bids [][2]types.Number `json:"b"` Asks [][2]types.Number `json:"a"` From 7c324dc8ed6751305e512555d9541971a834c706 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Tue, 23 Sep 2025 09:14:39 +1000 Subject: [PATCH 13/21] Update exchanges/gateio/gateio_websocket.go Co-authored-by: Gareth Kirwan --- exchanges/gateio/gateio_websocket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index e2fce57fee4..d8b02fb5567 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -454,7 +454,7 @@ func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, bids[x].Amount = data.Bids[x][1].Float64() } - splitChannel := strings.Split(data.Channel, ".") + channelParts := strings.Split(data.Channel, ".") if len(splitChannel) < 3 { return fmt.Errorf("%w: %q", errMalformedData, data.Channel) } From 0b37b1c53f55007933479c430b228eb2ff04f75e Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Tue, 23 Sep 2025 09:15:47 +1000 Subject: [PATCH 14/21] Update exchanges/gateio/gateio_websocket_test.go Co-authored-by: Gareth Kirwan --- exchanges/gateio/gateio_websocket_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exchanges/gateio/gateio_websocket_test.go b/exchanges/gateio/gateio_websocket_test.go index c24d103d100..8f120b71c2d 100644 --- a/exchanges/gateio/gateio_websocket_test.go +++ b/exchanges/gateio/gateio_websocket_test.go @@ -212,7 +212,7 @@ func TestProcessOrderbookUpdateWithSnapshot(t *testing.T) { e.Features.Subscriptions = subscription.List{ {Enabled: true, Channel: spotOrderbookUpdateWithSnapshotChannel, Asset: asset.Spot, Levels: 50}, } - expanded, err := e.Features.Subscriptions.ExpandTemplates(e) + subs, err := e.Features.Subscriptions.ExpandTemplates(e) require.NoError(t, err) conn := &FixtureConnection{} From de2f1271da37a1641171882319d6a8b400f70a74 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Tue, 23 Sep 2025 09:16:02 +1000 Subject: [PATCH 15/21] Update exchanges/gateio/gateio_websocket_test.go Co-authored-by: Gareth Kirwan --- exchanges/gateio/gateio_websocket_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/exchanges/gateio/gateio_websocket_test.go b/exchanges/gateio/gateio_websocket_test.go index 8f120b71c2d..427825a519c 100644 --- a/exchanges/gateio/gateio_websocket_test.go +++ b/exchanges/gateio/gateio_websocket_test.go @@ -237,7 +237,6 @@ func TestProcessOrderbookUpdateWithSnapshot(t *testing.T) { { // Full snapshot will reset resubscribing state payload: []byte(`{"t":1757377580046,"full":true,"s":"ob.BTC_USDT.50","u":27053258981,"b":[["111666","0.131287"],["111665.3","0.048403"],["111665.2","0.268681"],["111665.1","0.153269"],["111664.9","0.004"],["111663.8","0.010919"],["111663.7","0.214867"],["111661.8","0.268681"],["111659.4","0.01144"],["111659.3","0.184127"],["111658.4","0.268681"],["111658.3","0.11897"],["111656.9","0.00653"],["111656.7","0.184127"],["111656.1","0.040381"],["111655","0.044859"],["111654.9","0.268681"],["111654.8","0.033575"],["111653.9","0.184127"],["111653.6","0.601785"],["111653.5","0.017118"],["111651.7","0.160346"],["111651.6","0.184127"],["111651.5","0.268681"],["111650.1","0.09042"],["111647.9","0.191292"],["111647.5","0.268681"],["111646","0.098528"],["111645.9","0.1443"],["111645.6","0.184127"],["111643.8","1.015409"],["111643","0.099889"],["111641.5","0.004925"],["111641.2","0.179895"],["111641.1","0.184127"],["111640.7","0.268681"],["111638.6","0.184912"],["111638.4","0.010182"],["111637.6","0.026862"],["111637.5","0.09042"],["111636.6","0.184127"],["111634.8","0.129187"],["111634.7","0.014213"],["111633.9","0.268681"],["111632.1","0.184127"],["111631.8","0.1443"],["111631.6","0.027"],["111631.3","0.089539"],["111630.3","0.00001"],["111629.6","0.000029"]],"a":[["111666.1","0.818887"],["111668.3","0.008062"],["111668.5","0.005399"],["111670.3","0.043892"],["111670.4","0.019653"],["111673.7","0.046898"],["111674.1","0.004227"],["111674.4","0.026258"],["111674.8","0.09042"],["111674.9","0.268681"],["111675","0.004227"],["111676","0.004227"],["111676.8","0.005"],["111677","0.004227"],["111678.1","0.077789"],["111678.2","0.210991"],["111678.3","0.268681"],["111678.4","0.025039"],["111678.5","0.051456"],["111679.2","0.007163"],["111679.5","0.013019"],["111681.5","0.036343"],["111681.7","0.268681"],["111682.9","0.184127"],["111685.2","0.184127"],["111685.8","0.040538"],["111686.4","0.201931"],["111687.3","0.03"],["111687.4","0.09042"],["111687.5","0.452808"],["111687.6","1.815093"],["111691.9","0.139287"],["111692.2","0.184127"],["111693.7","0.268681"],["111694.3","1.05115"],["111694.5","0.184127"],["111697","0.184127"],["111697.1","0.268681"],["111697.4","0.0967"],["111698.7","0.1443"],["111699.5","0.014213"],["111700.2","0.601783"],["111700.7","0.09042"],["111700.9","0.367517"],["111701.5","0.184127"],["111705.2","0.017703"],["111706","0.184127"],["111707.6","0.268681"],["111709.9","0.1443"],["111710.2","0.004"]]}`), - err: nil, }, { // Incremental update will apply correctly From 4c965e707e75813d1432814b758ac89207a74cec Mon Sep 17 00:00:00 2001 From: shazbert Date: Tue, 23 Sep 2025 09:43:55 +1000 Subject: [PATCH 16/21] gk: nits --- common/common.go | 1 + currency/pair.go | 2 +- currency/pair_methods.go | 2 +- currency/pair_test.go | 2 +- currency/pairs_test.go | 2 +- exchanges/deribit/deribit_types.go | 1 - exchanges/deribit/deribit_websocket.go | 36 +++++------ exchanges/gateio/gateio_websocket.go | 63 ++++++++++---------- exchanges/gateio/gateio_websocket_futures.go | 4 +- exchanges/gateio/gateio_websocket_test.go | 12 ++-- exchanges/gateio/gateio_wrapper.go | 2 +- exchanges/gateio/ws_ob_resub_manager.go | 12 ++-- exchanges/gateio/ws_ob_resub_manager_test.go | 12 ++-- exchanges/kucoin/kucoin_convert.go | 3 +- exchanges/kucoin/kucoin_types.go | 1 - exchanges/kucoin/kucoin_websocket.go | 2 +- 16 files changed, 77 insertions(+), 80 deletions(-) diff --git a/common/common.go b/common/common.go index 1f4641a4e7d..2ae21bd1cd0 100644 --- a/common/common.go +++ b/common/common.go @@ -75,6 +75,7 @@ var ( ErrGettingField = errors.New("error getting field") ErrSettingField = errors.New("error setting field") ErrParsingWSField = errors.New("error parsing websocket field") + ErrMalformedData = errors.New("malformed data") ) var ( diff --git a/currency/pair.go b/currency/pair.go index 5c234b1e66f..2554c752a6d 100644 --- a/currency/pair.go +++ b/currency/pair.go @@ -74,7 +74,7 @@ func NewPairWithDelimiter(base, quote, delimiter string) Pair { // with or without delimiter func NewPairFromString(currencyPair string) (Pair, error) { if len(currencyPair) < 3 { - return EMPTYPAIR, fmt.Errorf("%w from %s string too short to be a currency pair", ErrCannotCreatePair, currencyPair) + return EMPTYPAIR, fmt.Errorf("%w from %s string too short to be a currency pair", ErrCreatingPair, currencyPair) } for x := range currencyPair { diff --git a/currency/pair_methods.go b/currency/pair_methods.go index 519493f23a8..b346814d27b 100644 --- a/currency/pair_methods.go +++ b/currency/pair_methods.go @@ -61,7 +61,7 @@ func (p *Pair) UnmarshalJSON(d []byte) error { // incorrectly converted to DUS-KUSDT, ELKRW (Bithumb) which will convert // converted to ELK-RW and HTUSDT (Lbank) which will be incorrectly // converted to HTU-SDT. - return fmt.Errorf("%w from %s cannot ensure pair is in correct format, please use exchange method MatchSymbolWithAvailablePairs", ErrCannotCreatePair, pair) + return fmt.Errorf("%w from %s cannot ensure pair is in correct format, please use exchange method MatchSymbolWithAvailablePairs", ErrCreatingPair, pair) } // MarshalJSON conforms type to the marshaler interface diff --git a/currency/pair_test.go b/currency/pair_test.go index 7fefb731795..3146c75eb21 100644 --- a/currency/pair_test.go +++ b/currency/pair_test.go @@ -54,7 +54,7 @@ func TestPairUnmarshalJSON(t *testing.T) { assert.Equal(t, "usd", p.Quote.String(), "Quote should be correct") assert.Equal(t, "_", p.Delimiter, "Delimiter should be correct") - assert.ErrorIs(t, p.UnmarshalJSON([]byte(`"btcusd"`)), ErrCannotCreatePair, "UnmarshalJSON with no delimiter should error") + assert.ErrorIs(t, p.UnmarshalJSON([]byte(`"btcusd"`)), ErrCreatingPair, "UnmarshalJSON with no delimiter should error") assert.NoError(t, p.UnmarshalJSON([]byte(`""`)), "UnmarshalJSON should not error on empty value") assert.Equal(t, EMPTYPAIR, p, "UnmarshalJSON empty value should give EMPTYPAIR") diff --git a/currency/pairs_test.go b/currency/pairs_test.go index b9ab830a8c3..5b37d4112f4 100644 --- a/currency/pairs_test.go +++ b/currency/pairs_test.go @@ -54,7 +54,7 @@ func TestPairsFromString(t *testing.T) { _, err := NewPairsFromString("", "") assert.ErrorIs(t, err, errNoDelimiter) _, err = NewPairsFromString("", ",") - assert.ErrorIs(t, err, ErrCannotCreatePair) + assert.ErrorIs(t, err, ErrCreatingPair) pairs, err := NewPairsFromString("ALGO-AUD,BAT-AUD,BCH-AUD,BSV-AUD,BTC-AUD,COMP-AUD,ENJ-AUD,ETC-AUD,ETH-AUD,ETH-BTC,GNT-AUD,LINK-AUD,LTC-AUD,LTC-BTC,MCAU-AUD,OMG-AUD,POWR-AUD,UNI-AUD,USDT-AUD,XLM-AUD,XRP-AUD,XRP-BTC", ",") require.NoError(t, err) diff --git a/exchanges/deribit/deribit_types.go b/exchanges/deribit/deribit_types.go index 15815d82e68..b12a90ae240 100644 --- a/exchanges/deribit/deribit_types.go +++ b/exchanges/deribit/deribit_types.go @@ -41,7 +41,6 @@ var ( errInvalidID = errors.New("invalid id") errInvalidMarginModel = errors.New("missing margin model") errInvalidEmailAddress = errors.New("invalid email address") - errMalformedData = errors.New("malformed data") errWebsocketConnectionNotAuthenticated = errors.New("websocket connection is not authenticated") errResolutionNotSet = errors.New("resolution not set") errInvalidDestinationID = errors.New("invalid destination id") diff --git a/exchanges/deribit/deribit_websocket.go b/exchanges/deribit/deribit_websocket.go index 9d987bbcb2f..9e5843a25c0 100644 --- a/exchanges/deribit/deribit_websocket.go +++ b/exchanges/deribit/deribit_websocket.go @@ -338,7 +338,7 @@ func (e *Exchange) wsSendHeartbeat(ctx context.Context) { func (e *Exchange) processUserOrders(respRaw []byte, channels []string) error { if len(channels) != 4 && len(channels) != 5 { - return fmt.Errorf("%w, expected format 'user.orders.{instrument_name}.raw, user.orders.{instrument_name}.{interval}, user.orders.{kind}.{currency}.raw, or user.orders.{kind}.{currency}.{interval}', but found %s", errMalformedData, strings.Join(channels, ".")) + return fmt.Errorf("%w, expected format 'user.orders.{instrument_name}.raw, user.orders.{instrument_name}.{interval}, user.orders.{kind}.{currency}.raw, or user.orders.{kind}.{currency}.{interval}', but found %s", common.ErrMalformedData, strings.Join(channels, ".")) } var response wsResponse orderData := []WsOrder{} @@ -387,7 +387,7 @@ func (e *Exchange) processUserOrders(respRaw []byte, channels []string) error { func (e *Exchange) processUserOrderChanges(respRaw []byte, channels []string) error { if len(channels) < 4 || len(channels) > 5 { - return fmt.Errorf("%w, expected format 'trades.{instrument_name}.{interval} or trades.{kind}.{currency}.{interval}', but found %s", errMalformedData, strings.Join(channels, ".")) + return fmt.Errorf("%w, expected format 'trades.{instrument_name}.{interval} or trades.{kind}.{currency}.{interval}', but found %s", common.ErrMalformedData, strings.Join(channels, ".")) } var response wsResponse changeData := &wsChanges{} @@ -497,7 +497,7 @@ func (e *Exchange) processTrades(respRaw []byte, channels []string) error { } if len(channels) < 3 || len(channels) > 5 { - return fmt.Errorf("%w, expected format 'trades.{instrument_name}.{interval} or trades.{kind}.{currency}.{interval}', but found %s", errMalformedData, strings.Join(channels, ".")) + return fmt.Errorf("%w, expected format 'trades.{instrument_name}.{interval} or trades.{kind}.{currency}.{interval}', but found %s", common.ErrMalformedData, strings.Join(channels, ".")) } var response wsResponse var tradeList []wsTrade @@ -541,7 +541,7 @@ func (e *Exchange) processTrades(respRaw []byte, channels []string) error { func (e *Exchange) processIncrementalTicker(respRaw []byte, channels []string) error { if len(channels) != 2 { - return fmt.Errorf("%w, expected format 'incremental_ticker.{instrument_name}', but found %s", errMalformedData, strings.Join(channels, ".")) + return fmt.Errorf("%w, expected format 'incremental_ticker.{instrument_name}', but found %s", common.ErrMalformedData, strings.Join(channels, ".")) } a, cp, err := getAssetPairByInstrument(channels[1]) if err != nil { @@ -573,7 +573,7 @@ func (e *Exchange) processIncrementalTicker(respRaw []byte, channels []string) e func (e *Exchange) processInstrumentTicker(respRaw []byte, channels []string) error { if len(channels) != 3 { - return fmt.Errorf("%w, expected format 'ticker.{instrument_name}.{interval}', but found %s", errMalformedData, strings.Join(channels, ".")) + return fmt.Errorf("%w, expected format 'ticker.{instrument_name}.{interval}', but found %s", common.ErrMalformedData, strings.Join(channels, ".")) } return e.processTicker(respRaw, channels) } @@ -628,7 +628,7 @@ func (e *Exchange) processData(respRaw []byte, result any) error { func (e *Exchange) processCandleChart(respRaw []byte, channels []string) error { if len(channels) != 4 { - return fmt.Errorf("%w, expected format 'chart.trades.{instrument_name}.{resolution}', but found %s", errMalformedData, strings.Join(channels, ".")) + return fmt.Errorf("%w, expected format 'chart.trades.{instrument_name}.{resolution}', but found %s", common.ErrMalformedData, strings.Join(channels, ".")) } a, cp, err := getAssetPairByInstrument(channels[2]) if err != nil { @@ -671,15 +671,15 @@ func (e *Exchange) processOrderbook(respRaw []byte, channels []string) error { asks := make(orderbook.Levels, 0, len(orderbookData.Asks)) for x := range orderbookData.Asks { if len(orderbookData.Asks[x]) != 3 { - return errMalformedData + return common.ErrMalformedData } price, okay := orderbookData.Asks[x][1].(float64) if !okay { - return fmt.Errorf("%w, invalid orderbook price", errMalformedData) + return fmt.Errorf("%w, invalid orderbook price", common.ErrMalformedData) } amount, okay := orderbookData.Asks[x][2].(float64) if !okay { - return fmt.Errorf("%w, invalid amount", errMalformedData) + return fmt.Errorf("%w, invalid amount", common.ErrMalformedData) } asks = append(asks, orderbook.Level{ Price: price, @@ -689,17 +689,17 @@ func (e *Exchange) processOrderbook(respRaw []byte, channels []string) error { bids := make(orderbook.Levels, 0, len(orderbookData.Bids)) for x := range orderbookData.Bids { if len(orderbookData.Bids[x]) != 3 { - return errMalformedData + return common.ErrMalformedData } price, okay := orderbookData.Bids[x][1].(float64) if !okay { - return fmt.Errorf("%w, invalid orderbook price", errMalformedData) + return fmt.Errorf("%w, invalid orderbook price", common.ErrMalformedData) } else if price == 0.0 { continue } amount, okay := orderbookData.Bids[x][2].(float64) if !okay { - return fmt.Errorf("%w, invalid amount", errMalformedData) + return fmt.Errorf("%w, invalid amount", common.ErrMalformedData) } bids = append(bids, orderbook.Level{ Price: price, @@ -740,17 +740,17 @@ func (e *Exchange) processOrderbook(respRaw []byte, channels []string) error { asks := make(orderbook.Levels, 0, len(orderbookData.Asks)) for x := range orderbookData.Asks { if len(orderbookData.Asks[x]) != 2 { - return errMalformedData + return common.ErrMalformedData } price, okay := orderbookData.Asks[x][0].(float64) if !okay { - return fmt.Errorf("%w, invalid orderbook price", errMalformedData) + return fmt.Errorf("%w, invalid orderbook price", common.ErrMalformedData) } else if price == 0 { continue } amount, okay := orderbookData.Asks[x][1].(float64) if !okay { - return fmt.Errorf("%w, invalid amount", errMalformedData) + return fmt.Errorf("%w, invalid amount", common.ErrMalformedData) } asks = append(asks, orderbook.Level{ Price: price, @@ -760,17 +760,17 @@ func (e *Exchange) processOrderbook(respRaw []byte, channels []string) error { bids := make([]orderbook.Level, 0, len(orderbookData.Bids)) for x := range orderbookData.Bids { if len(orderbookData.Bids[x]) != 2 { - return errMalformedData + return common.ErrMalformedData } price, okay := orderbookData.Bids[x][0].(float64) if !okay { - return fmt.Errorf("%w, invalid orderbook price", errMalformedData) + return fmt.Errorf("%w, invalid orderbook price", common.ErrMalformedData) } else if price == 0 { continue } amount, okay := orderbookData.Bids[x][1].(float64) if !okay { - return fmt.Errorf("%w, invalid amount", errMalformedData) + return fmt.Errorf("%w, invalid amount", common.ErrMalformedData) } bids = append(bids, orderbook.Level{ Price: price, diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index d8b02fb5567..e4ada7a9270 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -38,22 +38,22 @@ import ( const ( gateioWebsocketEndpoint = "wss://api.gateio.ws/ws/v4/" - spotPingChannel = "spot.ping" - spotPongChannel = "spot.pong" - spotTickerChannel = "spot.tickers" - spotTradesChannel = "spot.trades" - spotCandlesticksChannel = "spot.candlesticks" - spotOrderbookTickerChannel = "spot.book_ticker" // Best bid or ask price - spotOrderbookUpdateChannel = "spot.order_book_update" // Changed order book levels - spotOrderbookChannel = "spot.order_book" // Limited-Level Full Order Book Snapshot - spotOrderbookUpdateWithSnapshotChannel = "spot.obu" - spotOrdersChannel = "spot.orders" - spotUserTradesChannel = "spot.usertrades" - spotBalancesChannel = "spot.balances" - marginBalancesChannel = "spot.margin_balances" - spotFundingBalanceChannel = "spot.funding_balances" - crossMarginBalanceChannel = "spot.cross_balances" - crossMarginLoanChannel = "spot.cross_loan" + spotPingChannel = "spot.ping" + spotPongChannel = "spot.pong" + spotTickerChannel = "spot.tickers" + spotTradesChannel = "spot.trades" + spotCandlesticksChannel = "spot.candlesticks" + spotOrderbookTickerChannel = "spot.book_ticker" // Best bid or ask price + spotOrderbookUpdateChannel = "spot.order_book_update" // Changed order book levels + spotOrderbookChannel = "spot.order_book" // Limited-Level Full Order Book Snapshot + spotOrderbookV2 = "spot.obu" + spotOrdersChannel = "spot.orders" + spotUserTradesChannel = "spot.usertrades" + spotBalancesChannel = "spot.balances" + marginBalancesChannel = "spot.margin_balances" + spotFundingBalanceChannel = "spot.funding_balances" + crossMarginBalanceChannel = "spot.cross_balances" + crossMarginLoanChannel = "spot.cross_loan" subscribeEvent = "subscribe" unsubscribeEvent = "unsubscribe" @@ -65,7 +65,7 @@ var defaultSubscriptions = subscription.List{ {Enabled: true, Channel: subscription.OrderbookChannel, Asset: asset.Spot, Interval: kline.HundredMilliseconds}, {Enabled: false, Channel: spotOrderbookTickerChannel, Asset: asset.Spot, Interval: kline.TenMilliseconds, Levels: 1}, {Enabled: false, Channel: spotOrderbookChannel, Asset: asset.Spot, Interval: kline.HundredMilliseconds, Levels: 100}, - {Enabled: false, Channel: spotOrderbookUpdateWithSnapshotChannel, Asset: asset.Spot, Levels: 50}, + {Enabled: false, Channel: spotOrderbookV2, Asset: asset.Spot, Levels: 50}, {Enabled: true, Channel: spotBalancesChannel, Asset: asset.Spot, Authenticated: true}, {Enabled: true, Channel: crossMarginBalanceChannel, Asset: asset.CrossMargin, Authenticated: true}, {Enabled: true, Channel: marginBalancesChannel, Asset: asset.Margin, Authenticated: true}, @@ -84,10 +84,7 @@ var ( validPingChannels = []string{optionsPingChannel, futuresPingChannel, spotPingChannel} ) -var ( - errInvalidPingChannel = errors.New("invalid ping channel") - errMalformedData = errors.New("malformed data") -) +var errInvalidPingChannel = errors.New("invalid ping channel") // WsConnectSpot initiates a websocket connection func (e *Exchange) WsConnectSpot(ctx context.Context, conn websocket.Connection) error { @@ -196,7 +193,7 @@ func (e *Exchange) WsHandleSpotData(ctx context.Context, conn websocket.Connecti return e.processOrderbookUpdate(ctx, push.Result, push.Time) case spotOrderbookChannel: return e.processOrderbookSnapshot(push.Result, push.Time) - case spotOrderbookUpdateWithSnapshotChannel: + case spotOrderbookV2: return e.processOrderbookUpdateWithSnapshot(conn, push.Result, push.Time, asset.Spot) case spotOrdersChannel: return e.processSpotOrders(respRaw) @@ -331,7 +328,7 @@ func (e *Exchange) processCandlestick(incoming []byte) error { } icp := strings.Split(data.NameOfSubscription, currency.UnderscoreDelimiter) if len(icp) < 3 { - return errors.New("malformed candlestick websocket push data") + return fmt.Errorf("%w: candlestick websocket", common.ErrMalformedData) } currencyPair, err := currency.NewPairFromString(strings.Join(icp[1:], currency.UnderscoreDelimiter)) if err != nil { @@ -455,11 +452,11 @@ func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, } channelParts := strings.Split(data.Channel, ".") - if len(splitChannel) < 3 { - return fmt.Errorf("%w: %q", errMalformedData, data.Channel) + if len(channelParts) < 3 { + return fmt.Errorf("%w: %q", common.ErrMalformedData, data.Channel) } - pair, err := currency.NewPairFromString(splitChannel[1]) + pair, err := currency.NewPairFromString(channelParts[1]) if err != nil { return err } @@ -811,7 +808,7 @@ func channelName(s *subscription.Subscription) string { // singleSymbolChannel returns if the channel should be fanned out into single symbol requests func singleSymbolChannel(name string) bool { switch name { - case spotCandlesticksChannel, spotOrderbookUpdateChannel, spotOrderbookChannel, spotOrderbookUpdateWithSnapshotChannel: + case spotCandlesticksChannel, spotOrderbookUpdateChannel, spotOrderbookChannel, spotOrderbookV2: return true } return false @@ -850,7 +847,7 @@ func isSingleOrderbookChannel(name string) bool { case spotOrderbookUpdateChannel, spotOrderbookChannel, spotOrderbookTickerChannel, - spotOrderbookUpdateWithSnapshotChannel, + spotOrderbookV2, futuresOrderbookChannel, futuresOrderbookTickerChannel, futuresOrderbookUpdateChannel, @@ -921,10 +918,10 @@ func orderbookChannelInterval(s *subscription.Subscription, a asset.Item) (strin var channelLevelsMap = map[asset.Item]map[string][]int{ asset.Spot: { - spotOrderbookTickerChannel: {}, - spotOrderbookUpdateChannel: {}, - spotOrderbookChannel: {1, 5, 10, 20, 50, 100}, - spotOrderbookUpdateWithSnapshotChannel: {50, 400}, + spotOrderbookTickerChannel: {}, + spotOrderbookUpdateChannel: {}, + spotOrderbookChannel: {1, 5, 10, 20, 50, 100}, + spotOrderbookV2: {50, 400}, }, asset.Futures: { futuresOrderbookChannel: {1, 5, 10, 20, 50, 100}, @@ -966,7 +963,7 @@ func channelLevels(s *subscription.Subscription, a asset.Item) (string, error) { } func isCompactOrderbookPayload(channel string) bool { - return channel == spotOrderbookUpdateWithSnapshotChannel + return channel == spotOrderbookV2 } const subTplText = ` diff --git a/exchanges/gateio/gateio_websocket_futures.go b/exchanges/gateio/gateio_websocket_futures.go index 1dd6ecebb70..c52622fde7c 100644 --- a/exchanges/gateio/gateio_websocket_futures.go +++ b/exchanges/gateio/gateio_websocket_futures.go @@ -3,12 +3,14 @@ package gateio import ( "context" "errors" + "fmt" "net/http" "strconv" "strings" "time" gws "github.com/gorilla/websocket" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/encoding/json" "github.com/thrasher-corp/gocryptotrader/exchange/websocket" @@ -367,7 +369,7 @@ func (e *Exchange) processFuturesCandlesticks(data []byte, assetType asset.Item) for x := range resp.Result { icp := strings.Split(resp.Result[x].Name, currency.UnderscoreDelimiter) if len(icp) < 3 { - return errors.New("malformed futures candlestick websocket push data") + return fmt.Errorf("%w: futures candlestick websocket", common.ErrMalformedData) } currencyPair, err := currency.NewPairFromString(strings.Join(icp[1:], currency.UnderscoreDelimiter)) if err != nil { diff --git a/exchanges/gateio/gateio_websocket_test.go b/exchanges/gateio/gateio_websocket_test.go index 427825a519c..63091e29c99 100644 --- a/exchanges/gateio/gateio_websocket_test.go +++ b/exchanges/gateio/gateio_websocket_test.go @@ -9,6 +9,7 @@ import ( gws "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/key" "github.com/thrasher-corp/gocryptotrader/currency" exchange "github.com/thrasher-corp/gocryptotrader/exchanges" @@ -210,13 +211,13 @@ func TestProcessOrderbookUpdateWithSnapshot(t *testing.T) { require.NoError(t, testexch.Setup(e)) e.Name = "ProcessOrderbookUpdateWithSnapshot" e.Features.Subscriptions = subscription.List{ - {Enabled: true, Channel: spotOrderbookUpdateWithSnapshotChannel, Asset: asset.Spot, Levels: 50}, + {Enabled: true, Channel: spotOrderbookV2, Asset: asset.Spot, Levels: 50}, } subs, err := e.Features.Subscriptions.ExpandTemplates(e) require.NoError(t, err) conn := &FixtureConnection{} - err = e.Websocket.AddSubscriptions(conn, expanded...) + err = e.Websocket.AddSubscriptions(conn, subs...) require.NoError(t, err) e.wsOBResubMgr.lookup[key.PairAsset{Base: currency.BTC.Item, Quote: currency.USDT.Item, Asset: asset.Spot}] = true @@ -226,13 +227,12 @@ func TestProcessOrderbookUpdateWithSnapshot(t *testing.T) { err error }{ {payload: []byte(`{"t":"bingbong"}`), err: strconv.ErrSyntax}, - {payload: []byte(`{"s":"ob.50"}`), err: errMalformedData}, - {payload: []byte(`{"s":"ob..50"}`), err: currency.ErrCannotCreatePair}, + {payload: []byte(`{"s":"ob.50"}`), err: common.ErrMalformedData}, + {payload: []byte(`{"s":"ob..50"}`), err: currency.ErrCreatingPair}, {payload: []byte(`{"s":"ob.BTC_USDT.50","full":true}`), err: orderbook.ErrLastUpdatedNotSet}, { // Simulate orderbook update already resubscribing payload: []byte(`{"t":1757377580073,"s":"ob.BTC_USDT.50","u":27053258987,"U":27053258982,"b":[["111666","0.146841"]],"a":[["111666.1","0.791633"],["111676.8","0.014"]]}`), - err: nil, }, { // Full snapshot will reset resubscribing state @@ -241,12 +241,10 @@ func TestProcessOrderbookUpdateWithSnapshot(t *testing.T) { { // Incremental update will apply correctly payload: []byte(`{"t":1757377580073,"s":"ob.BTC_USDT.50","u":27053258987,"U":27053258982,"b":[["111666","0.146841"]],"a":[["111666.1","0.791633"],["111676.8","0.014"]]}`), - err: nil, }, { // Incremental update out of order will force resubscription payload: []byte(`{"t":1757377580073,"s":"ob.BTC_USDT.50","u":27053258987,"U":27053258982,"b":[["111666","0.146841"]],"a":[["111666.1","0.791633"],["111676.8","0.014"]]}`), - err: nil, }, } { err := e.processOrderbookUpdateWithSnapshot(conn, tc.payload, time.Now(), asset.Spot) diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index eec5c7bf40c..fb8869b329d 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -179,7 +179,7 @@ func (e *Exchange) SetDefaults() { e.WebsocketResponseCheckTimeout = exchange.DefaultWebsocketResponseCheckTimeout e.WebsocketOrderbookBufferLimit = exchange.DefaultWebsocketOrderbookBufferLimit e.wsOBUpdateMgr = newWsOBUpdateManager(defaultWSSnapshotSyncDelay) - e.wsOBResubMgr = newWSObResubManager() + e.wsOBResubMgr = newWSOBResubManager() } // Setup sets user configuration diff --git a/exchanges/gateio/ws_ob_resub_manager.go b/exchanges/gateio/ws_ob_resub_manager.go index 61e8ecbe683..ba8f3f9048f 100644 --- a/exchanges/gateio/ws_ob_resub_manager.go +++ b/exchanges/gateio/ws_ob_resub_manager.go @@ -12,24 +12,24 @@ import ( "github.com/thrasher-corp/gocryptotrader/log" ) -type wsObResubManager struct { +type wsOBResubManager struct { lookup map[key.PairAsset]bool m sync.RWMutex } -func newWSObResubManager() *wsObResubManager { - return &wsObResubManager{lookup: make(map[key.PairAsset]bool)} +func newWSOBResubManager() *wsOBResubManager { + return &wsOBResubManager{lookup: make(map[key.PairAsset]bool)} } // IsResubscribing checks if a subscription is currently being resubscribed -func (m *wsObResubManager) IsResubscribing(pair currency.Pair, a asset.Item) bool { +func (m *wsOBResubManager) IsResubscribing(pair currency.Pair, a asset.Item) bool { m.m.RLock() defer m.m.RUnlock() return m.lookup[key.PairAsset{Base: pair.Base.Item, Quote: pair.Quote.Item, Asset: a}] } // Resubscribe marks a subscription as resubscribing and starts the unsubscribe/resubscribe process -func (m *wsObResubManager) Resubscribe(e *Exchange, conn websocket.Connection, qualifiedChannel string, pair currency.Pair, a asset.Item) error { +func (m *wsOBResubManager) Resubscribe(e *Exchange, conn websocket.Connection, qualifiedChannel string, pair currency.Pair, a asset.Item) error { if err := e.Websocket.Orderbook.InvalidateOrderbook(pair, a); err != nil { return err } @@ -55,7 +55,7 @@ func (m *wsObResubManager) Resubscribe(e *Exchange, conn websocket.Connection, q } // CompletedResubscribe marks a subscription as completed -func (m *wsObResubManager) CompletedResubscribe(pair currency.Pair, a asset.Item) { +func (m *wsOBResubManager) CompletedResubscribe(pair currency.Pair, a asset.Item) { m.m.Lock() defer m.m.Unlock() delete(m.lookup, key.PairAsset{Base: pair.Base.Item, Quote: pair.Quote.Item, Asset: a}) diff --git a/exchanges/gateio/ws_ob_resub_manager_test.go b/exchanges/gateio/ws_ob_resub_manager_test.go index 01c65f48927..c532cd29a20 100644 --- a/exchanges/gateio/ws_ob_resub_manager_test.go +++ b/exchanges/gateio/ws_ob_resub_manager_test.go @@ -14,10 +14,10 @@ import ( testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" ) -func TestNewWSObResubManager(t *testing.T) { +func TestNewWSOBResubManager(t *testing.T) { t.Parallel() - m := newWSObResubManager() + m := newWSOBResubManager() require.NotNil(t, m) assert.NotNil(t, m.lookup) } @@ -25,7 +25,7 @@ func TestNewWSObResubManager(t *testing.T) { func TestIsResubscribing(t *testing.T) { t.Parallel() - m := newWSObResubManager() + m := newWSOBResubManager() m.lookup[key.PairAsset{Base: currency.BTC.Item, Quote: currency.USDT.Item, Asset: asset.Spot}] = true assert.True(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) assert.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Futures)) @@ -34,7 +34,7 @@ func TestIsResubscribing(t *testing.T) { func TestResubscribe(t *testing.T) { t.Parallel() - m := newWSObResubManager() + m := newWSOBResubManager() conn := &FixtureConnection{} @@ -61,7 +61,7 @@ func TestResubscribe(t *testing.T) { require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) e.Features.Subscriptions = subscription.List{ - {Enabled: true, Channel: spotOrderbookUpdateWithSnapshotChannel, Asset: asset.Spot, Levels: 50}, + {Enabled: true, Channel: spotOrderbookV2, Asset: asset.Spot, Levels: 50}, } expanded, err := e.Features.Subscriptions.ExpandTemplates(e) require.NoError(t, err) @@ -86,7 +86,7 @@ func TestResubscribe(t *testing.T) { func TestCompletedResubscribe(t *testing.T) { t.Parallel() - m := newWSObResubManager() + m := newWSOBResubManager() m.CompletedResubscribe(currency.NewBTCUSDT(), asset.Spot) // no-op require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) m.lookup[key.PairAsset{Base: currency.BTC.Item, Quote: currency.USDT.Item, Asset: asset.Spot}] = true diff --git a/exchanges/kucoin/kucoin_convert.go b/exchanges/kucoin/kucoin_convert.go index 7bfb372920f..a65a844f0f5 100644 --- a/exchanges/kucoin/kucoin_convert.go +++ b/exchanges/kucoin/kucoin_convert.go @@ -3,6 +3,7 @@ package kucoin import ( "fmt" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/encoding/json" ) @@ -23,5 +24,5 @@ func (a *SubAccountsResponse) UnmarshalJSON(data []byte) error { } else if _, ok := result.([]any); ok { return nil } - return fmt.Errorf("%w can not unmarshal to SubAccountsResponse", errMalformedData) + return fmt.Errorf("%w can not unmarshal to SubAccountsResponse", common.ErrMalformedData) } diff --git a/exchanges/kucoin/kucoin_types.go b/exchanges/kucoin/kucoin_types.go index 5da468d647c..9f352c5055e 100644 --- a/exchanges/kucoin/kucoin_types.go +++ b/exchanges/kucoin/kucoin_types.go @@ -26,7 +26,6 @@ var ( errInvalidResponseReceiver = errors.New("invalid response receiver") errInvalidStopPriceType = errors.New("stopPriceType is required") - errMalformedData = errors.New("malformed data") errNoDepositAddress = errors.New("no deposit address found") errAddressRequired = errors.New("address is required") errMultipleDepositAddress = errors.New("multiple deposit addresses") diff --git a/exchanges/kucoin/kucoin_websocket.go b/exchanges/kucoin/kucoin_websocket.go index cd252016bc4..14ba9232952 100644 --- a/exchanges/kucoin/kucoin_websocket.go +++ b/exchanges/kucoin/kucoin_websocket.go @@ -248,7 +248,7 @@ func (e *Exchange) wsHandleData(ctx context.Context, respData []byte) error { case marketCandlesChannel: symbolAndInterval := strings.Split(topicInfo[1], currency.UnderscoreDelimiter) if len(symbolAndInterval) != 2 { - return errMalformedData + return common.ErrMalformedData } return e.processCandlesticks(resp.Data, symbolAndInterval[0], symbolAndInterval[1], topicInfo[0]) case marketMatchChannel: From 887eb7965611bb782d3217211279839e6edb1274 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Tue, 23 Sep 2025 15:37:53 +1000 Subject: [PATCH 17/21] Update exchanges/deribit/deribit_websocket.go Co-authored-by: Gareth Kirwan --- exchanges/deribit/deribit_websocket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exchanges/deribit/deribit_websocket.go b/exchanges/deribit/deribit_websocket.go index 9e5843a25c0..fa08da46967 100644 --- a/exchanges/deribit/deribit_websocket.go +++ b/exchanges/deribit/deribit_websocket.go @@ -628,7 +628,7 @@ func (e *Exchange) processData(respRaw []byte, result any) error { func (e *Exchange) processCandleChart(respRaw []byte, channels []string) error { if len(channels) != 4 { - return fmt.Errorf("%w, expected format 'chart.trades.{instrument_name}.{resolution}', but found %s", common.ErrMalformedData, strings.Join(channels, ".")) + return fmt.Errorf("%w, expected format 'chart.trades.{instrument_name}.{resolution}', but found %s", common.ErrInvalidResponse, strings.Join(channels, ".")) } a, cp, err := getAssetPairByInstrument(channels[2]) if err != nil { From dfa780240cd4666e3b20c1d893d6f686c3ddea93 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Thu, 6 Nov 2025 16:13:04 +1100 Subject: [PATCH 18/21] linter: fix --- exchanges/gateio/gateio_websocket_test.go | 2 +- exchanges/gateio/ws_ob_resub_manager_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/exchanges/gateio/gateio_websocket_test.go b/exchanges/gateio/gateio_websocket_test.go index 310fbd18d07..2917ebb8420 100644 --- a/exchanges/gateio/gateio_websocket_test.go +++ b/exchanges/gateio/gateio_websocket_test.go @@ -218,7 +218,7 @@ func checkAccountChange(ctx context.Context, t *testing.T, exch *Exchange, tc *w func TestProcessOrderbookUpdateWithSnapshot(t *testing.T) { t.Parallel() - e := new(Exchange) //nolint:govet // Intentional shadow + e := new(Exchange) require.NoError(t, testexch.Setup(e)) e.Name = "ProcessOrderbookUpdateWithSnapshot" e.Features.Subscriptions = subscription.List{ diff --git a/exchanges/gateio/ws_ob_resub_manager_test.go b/exchanges/gateio/ws_ob_resub_manager_test.go index c532cd29a20..ecc093d399b 100644 --- a/exchanges/gateio/ws_ob_resub_manager_test.go +++ b/exchanges/gateio/ws_ob_resub_manager_test.go @@ -38,7 +38,7 @@ func TestResubscribe(t *testing.T) { conn := &FixtureConnection{} - e := new(Exchange) //nolint:govet // Intentional shadow + e := new(Exchange) require.NoError(t, testexch.Setup(e)) e.Name = "Resubscribe" From 5265429078cb3e9c8a3480c65bfdd7d1ff1fcb4e Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Wed, 21 Jan 2026 09:39:42 +1100 Subject: [PATCH 19/21] Update exchanges/gateio/gateio_websocket.go Co-authored-by: Scott --- exchanges/gateio/gateio_websocket.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index ead6b9a931f..5f8953ddf16 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -463,8 +463,10 @@ func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, lastUpdateID, err := e.Websocket.Orderbook.LastUpdateID(pair, a) - if lastUpdateID+1 == data.FirstUpdateID { - if err = e.Websocket.Orderbook.Update(&orderbook.Update{ + if lastUpdateID+1 != data.FirstUpdateID { + return common.AppendError(err, e.wsOBResubMgr.Resubscribe(e, conn, data.Channel, pair, a)) + } + return e.Websocket.Orderbook.Update(&orderbook.Update{ Pair: pair, Asset: a, UpdateTime: data.UpdateTime.Time(), @@ -473,12 +475,7 @@ func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, Bids: bids, Asks: asks, AllowEmpty: true, - }); err == nil { - return nil - } - } - - return common.AppendError(err, e.wsOBResubMgr.Resubscribe(e, conn, data.Channel, pair, a)) + }) } func (e *Exchange) processSpotOrders(data []byte) error { From b7d921f54f5a49efd436d5684bc2256e4b67bd97 Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Wed, 21 Jan 2026 09:44:01 +1100 Subject: [PATCH 20/21] glorious: suggestion update --- exchanges/gateio/gateio_websocket.go | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 5f8953ddf16..3c24bc44256 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -462,20 +462,19 @@ func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, } lastUpdateID, err := e.Websocket.Orderbook.LastUpdateID(pair, a) - - if lastUpdateID+1 != data.FirstUpdateID { - return common.AppendError(err, e.wsOBResubMgr.Resubscribe(e, conn, data.Channel, pair, a)) - } - return e.Websocket.Orderbook.Update(&orderbook.Update{ - Pair: pair, - Asset: a, - UpdateTime: data.UpdateTime.Time(), - LastPushed: lastPushed, - UpdateID: data.LastUpdateID, - Bids: bids, - Asks: asks, - AllowEmpty: true, - }) + if err != nil || lastUpdateID+1 != data.FirstUpdateID { + return common.AppendError(err, e.wsOBResubMgr.Resubscribe(e, conn, data.Channel, pair, a)) + } + return e.Websocket.Orderbook.Update(&orderbook.Update{ + Pair: pair, + Asset: a, + UpdateTime: data.UpdateTime.Time(), + LastPushed: lastPushed, + UpdateID: data.LastUpdateID, + Bids: bids, + Asks: asks, + AllowEmpty: true, + }) } func (e *Exchange) processSpotOrders(data []byte) error { From 1838e73078ea8f0334fdadbb6e561097be12078f Mon Sep 17 00:00:00 2001 From: Ryan O'Hara-Reid Date: Wed, 21 Jan 2026 09:54:05 +1100 Subject: [PATCH 21/21] glorious: nits --- exchanges/gateio/gateio_types.go | 14 +++++++------- exchanges/gateio/gateio_websocket.go | 19 ++++--------------- exchanges/gateio/gateio_websocket_test.go | 1 + exchanges/gateio/ws_ob_resub_manager.go | 2 +- 4 files changed, 13 insertions(+), 23 deletions(-) diff --git a/exchanges/gateio/gateio_types.go b/exchanges/gateio/gateio_types.go index 34179d9ffd8..1bc926401b7 100644 --- a/exchanges/gateio/gateio_types.go +++ b/exchanges/gateio/gateio_types.go @@ -2094,13 +2094,13 @@ type WsOrderbookUpdate struct { // WsOrderbookUpdateWithSnapshot represents websocket orderbook update push data type WsOrderbookUpdateWithSnapshot struct { - UpdateTime types.Time `json:"t"` - Full bool `json:"full"` - Channel string `json:"s"` - FirstUpdateID int64 `json:"U"` - LastUpdateID int64 `json:"u"` - Bids [][2]types.Number `json:"b"` - Asks [][2]types.Number `json:"a"` + UpdateTime types.Time `json:"t"` + Full bool `json:"full"` + Channel string `json:"s"` + FirstUpdateID int64 `json:"U"` + LastUpdateID int64 `json:"u"` + Bids orderbook.LevelsArrayPriceAmount `json:"b"` + Asks orderbook.LevelsArrayPriceAmount `json:"a"` } // WsOrderbookSnapshot represents a websocket orderbook snapshot push data diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 3c24bc44256..06bc719d958 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -419,17 +419,6 @@ func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, return err } - asks := make([]orderbook.Level, len(data.Asks)) - for x := range data.Asks { - asks[x].Price = data.Asks[x][0].Float64() - asks[x].Amount = data.Asks[x][1].Float64() - } - bids := make([]orderbook.Level, len(data.Bids)) - for x := range data.Bids { - bids[x].Price = data.Bids[x][0].Float64() - bids[x].Amount = data.Bids[x][1].Float64() - } - channelParts := strings.Split(data.Channel, ".") if len(channelParts) < 3 { return fmt.Errorf("%w: %q", common.ErrMalformedData, data.Channel) @@ -448,8 +437,8 @@ func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, LastUpdated: data.UpdateTime.Time(), LastPushed: lastPushed, LastUpdateID: data.LastUpdateID, - Bids: bids, - Asks: asks, + Bids: data.Bids.Levels(), + Asks: data.Asks.Levels(), }); err != nil { return err } @@ -471,8 +460,8 @@ func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, UpdateTime: data.UpdateTime.Time(), LastPushed: lastPushed, UpdateID: data.LastUpdateID, - Bids: bids, - Asks: asks, + Bids: data.Bids.Levels(), + Asks: data.Asks.Levels(), AllowEmpty: true, }) } diff --git a/exchanges/gateio/gateio_websocket_test.go b/exchanges/gateio/gateio_websocket_test.go index 8640eff829c..6e8cb29614b 100644 --- a/exchanges/gateio/gateio_websocket_test.go +++ b/exchanges/gateio/gateio_websocket_test.go @@ -298,6 +298,7 @@ func TestProcessOrderbookUpdateWithSnapshot(t *testing.T) { payload: []byte(`{"t":1757377580073,"s":"ob.BTC_USDT.50","u":27053258987,"U":27053258982,"b":[["111666","0.146841"]],"a":[["111666.1","0.791633"],["111676.8","0.014"]]}`), }, } { + // Sequential tests, do not use t.Parallel(); Some timestamps are deliberately identical from trading activity err := e.processOrderbookUpdateWithSnapshot(conn, tc.payload, time.Now(), asset.Spot) if tc.err != nil { require.ErrorIs(t, err, tc.err) diff --git a/exchanges/gateio/ws_ob_resub_manager.go b/exchanges/gateio/ws_ob_resub_manager.go index ba8f3f9048f..ee9947e73d1 100644 --- a/exchanges/gateio/ws_ob_resub_manager.go +++ b/exchanges/gateio/ws_ob_resub_manager.go @@ -54,7 +54,7 @@ func (m *wsOBResubManager) Resubscribe(e *Exchange, conn websocket.Connection, q return nil } -// CompletedResubscribe marks a subscription as completed +// CompletedResubscribe removes a subscription from the resubscribing map func (m *wsOBResubManager) CompletedResubscribe(pair currency.Pair, a asset.Item) { m.m.Lock() defer m.m.Unlock()