diff --git a/common/common.go b/common/common.go index f6f2df6e84b..2e395bba913 100644 --- a/common/common.go +++ b/common/common.go @@ -75,6 +75,7 @@ var ( ErrGettingField = errors.New("error getting field") ErrSettingField = errors.New("error setting field") ErrParsingWSField = errors.New("error parsing websocket field") + ErrMalformedData = errors.New("malformed data") ) var ( diff --git a/currency/pair.go b/currency/pair.go index f6829836176..2554c752a6d 100644 --- a/currency/pair.go +++ b/currency/pair.go @@ -7,8 +7,12 @@ import ( "unicode" ) +// Public errors +var ( + ErrCreatingPair = errors.New("error creating currency pair") +) + var ( - errCannotCreatePair = errors.New("cannot create currency pair") errDelimiterNotFound = errors.New("delimiter not found") errDelimiterCannotBeEmpty = errors.New("delimiter cannot be empty") ) @@ -70,7 +74,7 @@ func NewPairWithDelimiter(base, quote, delimiter string) Pair { // with or without delimiter func NewPairFromString(currencyPair string) (Pair, error) { if len(currencyPair) < 3 { - return EMPTYPAIR, fmt.Errorf("%w from %s string too short to be a currency pair", errCannotCreatePair, currencyPair) + return EMPTYPAIR, fmt.Errorf("%w from %s string too short to be a currency pair", ErrCreatingPair, currencyPair) } for x := range currencyPair { diff --git a/currency/pair_methods.go b/currency/pair_methods.go index a8a3a6f0238..b346814d27b 100644 --- a/currency/pair_methods.go +++ b/currency/pair_methods.go @@ -61,7 +61,7 @@ func (p *Pair) UnmarshalJSON(d []byte) error { // incorrectly converted to DUS-KUSDT, ELKRW (Bithumb) which will convert // converted to ELK-RW and HTUSDT (Lbank) which will be incorrectly // converted to HTU-SDT. - return fmt.Errorf("%w from %s cannot ensure pair is in correct format, please use exchange method MatchSymbolWithAvailablePairs", errCannotCreatePair, pair) + return fmt.Errorf("%w from %s cannot ensure pair is in correct format, please use exchange method MatchSymbolWithAvailablePairs", ErrCreatingPair, pair) } // MarshalJSON conforms type to the marshaler interface diff --git a/currency/pair_test.go b/currency/pair_test.go index 126f4ef9fcb..3146c75eb21 100644 --- a/currency/pair_test.go +++ b/currency/pair_test.go @@ -54,7 +54,7 @@ func TestPairUnmarshalJSON(t *testing.T) { assert.Equal(t, "usd", p.Quote.String(), "Quote should be correct") assert.Equal(t, "_", p.Delimiter, "Delimiter should be correct") - assert.ErrorIs(t, p.UnmarshalJSON([]byte(`"btcusd"`)), errCannotCreatePair, "UnmarshalJSON with no delimiter should error") + assert.ErrorIs(t, p.UnmarshalJSON([]byte(`"btcusd"`)), ErrCreatingPair, "UnmarshalJSON with no delimiter should error") assert.NoError(t, p.UnmarshalJSON([]byte(`""`)), "UnmarshalJSON should not error on empty value") assert.Equal(t, EMPTYPAIR, p, "UnmarshalJSON empty value should give EMPTYPAIR") diff --git a/currency/pairs_test.go b/currency/pairs_test.go index 426a40b85b3..5b37d4112f4 100644 --- a/currency/pairs_test.go +++ b/currency/pairs_test.go @@ -54,7 +54,7 @@ func TestPairsFromString(t *testing.T) { _, err := NewPairsFromString("", "") assert.ErrorIs(t, err, errNoDelimiter) _, err = NewPairsFromString("", ",") - assert.ErrorIs(t, err, errCannotCreatePair) + assert.ErrorIs(t, err, ErrCreatingPair) pairs, err := NewPairsFromString("ALGO-AUD,BAT-AUD,BCH-AUD,BSV-AUD,BTC-AUD,COMP-AUD,ENJ-AUD,ETC-AUD,ETH-AUD,ETH-BTC,GNT-AUD,LINK-AUD,LTC-AUD,LTC-BTC,MCAU-AUD,OMG-AUD,POWR-AUD,UNI-AUD,USDT-AUD,XLM-AUD,XRP-AUD,XRP-BTC", ",") require.NoError(t, err) diff --git a/exchanges/deribit/deribit_types.go b/exchanges/deribit/deribit_types.go index f61ae96e8e8..733b3582479 100644 --- a/exchanges/deribit/deribit_types.go +++ b/exchanges/deribit/deribit_types.go @@ -42,7 +42,6 @@ var ( errInvalidID = errors.New("invalid id") errInvalidMarginModel = errors.New("missing margin model") errInvalidEmailAddress = errors.New("invalid email address") - errMalformedData = errors.New("malformed data") errWebsocketConnectionNotAuthenticated = errors.New("websocket connection is not authenticated") errResolutionNotSet = errors.New("resolution not set") errInvalidDestinationID = errors.New("invalid destination id") diff --git a/exchanges/deribit/deribit_websocket.go b/exchanges/deribit/deribit_websocket.go index 8efdd5b6fed..86c34334f13 100644 --- a/exchanges/deribit/deribit_websocket.go +++ b/exchanges/deribit/deribit_websocket.go @@ -333,7 +333,7 @@ func (e *Exchange) wsSendHeartbeat(ctx context.Context) { func (e *Exchange) processUserOrders(respRaw []byte, channels []string) error { if len(channels) != 4 && len(channels) != 5 { - return fmt.Errorf("%w, expected format 'user.orders.{instrument_name}.raw, user.orders.{instrument_name}.{interval}, user.orders.{kind}.{currency}.raw, or user.orders.{kind}.{currency}.{interval}', but found %s", errMalformedData, strings.Join(channels, ".")) + return fmt.Errorf("%w, expected format 'user.orders.{instrument_name}.raw, user.orders.{instrument_name}.{interval}, user.orders.{kind}.{currency}.raw, or user.orders.{kind}.{currency}.{interval}', but found %s", common.ErrMalformedData, strings.Join(channels, ".")) } var response wsResponse orderData := []WsOrder{} @@ -382,7 +382,7 @@ func (e *Exchange) processUserOrders(respRaw []byte, channels []string) error { func (e *Exchange) processUserOrderChanges(respRaw []byte, channels []string) error { if len(channels) < 4 || len(channels) > 5 { - return fmt.Errorf("%w, expected format 'trades.{instrument_name}.{interval} or trades.{kind}.{currency}.{interval}', but found %s", errMalformedData, strings.Join(channels, ".")) + return fmt.Errorf("%w, expected format 'trades.{instrument_name}.{interval} or trades.{kind}.{currency}.{interval}', but found %s", common.ErrMalformedData, strings.Join(channels, ".")) } var response wsResponse changeData := &wsChanges{} @@ -492,7 +492,7 @@ func (e *Exchange) processTrades(respRaw []byte, channels []string) error { } if len(channels) < 3 || len(channels) > 5 { - return fmt.Errorf("%w, expected format 'trades.{instrument_name}.{interval} or trades.{kind}.{currency}.{interval}', but found %s", errMalformedData, strings.Join(channels, ".")) + return fmt.Errorf("%w, expected format 'trades.{instrument_name}.{interval} or trades.{kind}.{currency}.{interval}', but found %s", common.ErrMalformedData, strings.Join(channels, ".")) } var response wsResponse var tradeList []wsTrade @@ -536,7 +536,7 @@ func (e *Exchange) processTrades(respRaw []byte, channels []string) error { func (e *Exchange) processIncrementalTicker(respRaw []byte, channels []string) error { if len(channels) != 2 { - return fmt.Errorf("%w, expected format 'incremental_ticker.{instrument_name}', but found %s", errMalformedData, strings.Join(channels, ".")) + return fmt.Errorf("%w, expected format 'incremental_ticker.{instrument_name}', but found %s", common.ErrMalformedData, strings.Join(channels, ".")) } a, cp, err := getAssetPairByInstrument(channels[1]) if err != nil { @@ -568,7 +568,7 @@ func (e *Exchange) processIncrementalTicker(respRaw []byte, channels []string) e func (e *Exchange) processInstrumentTicker(respRaw []byte, channels []string) error { if len(channels) != 3 { - return fmt.Errorf("%w, expected format 'ticker.{instrument_name}.{interval}', but found %s", errMalformedData, strings.Join(channels, ".")) + return fmt.Errorf("%w, expected format 'ticker.{instrument_name}.{interval}', but found %s", common.ErrMalformedData, strings.Join(channels, ".")) } return e.processTicker(respRaw, channels) } @@ -623,7 +623,7 @@ func (e *Exchange) processData(respRaw []byte, result any) error { func (e *Exchange) processCandleChart(respRaw []byte, channels []string) error { if len(channels) != 4 { - return fmt.Errorf("%w, expected format 'chart.trades.{instrument_name}.{resolution}', but found %s", errMalformedData, strings.Join(channels, ".")) + return fmt.Errorf("%w, expected format 'chart.trades.{instrument_name}.{resolution}', but found %s", common.ErrInvalidResponse, strings.Join(channels, ".")) } a, cp, err := getAssetPairByInstrument(channels[2]) if err != nil { @@ -666,15 +666,15 @@ func (e *Exchange) processOrderbook(respRaw []byte, channels []string) error { asks := make(orderbook.Levels, 0, len(orderbookData.Asks)) for x := range orderbookData.Asks { if len(orderbookData.Asks[x]) != 3 { - return errMalformedData + return common.ErrMalformedData } price, okay := orderbookData.Asks[x][1].(float64) if !okay { - return fmt.Errorf("%w, invalid orderbook price", errMalformedData) + return fmt.Errorf("%w, invalid orderbook price", common.ErrMalformedData) } amount, okay := orderbookData.Asks[x][2].(float64) if !okay { - return fmt.Errorf("%w, invalid amount", errMalformedData) + return fmt.Errorf("%w, invalid amount", common.ErrMalformedData) } asks = append(asks, orderbook.Level{ Price: price, @@ -684,17 +684,17 @@ func (e *Exchange) processOrderbook(respRaw []byte, channels []string) error { bids := make(orderbook.Levels, 0, len(orderbookData.Bids)) for x := range orderbookData.Bids { if len(orderbookData.Bids[x]) != 3 { - return errMalformedData + return common.ErrMalformedData } price, okay := orderbookData.Bids[x][1].(float64) if !okay { - return fmt.Errorf("%w, invalid orderbook price", errMalformedData) + return fmt.Errorf("%w, invalid orderbook price", common.ErrMalformedData) } else if price == 0.0 { continue } amount, okay := orderbookData.Bids[x][2].(float64) if !okay { - return fmt.Errorf("%w, invalid amount", errMalformedData) + return fmt.Errorf("%w, invalid amount", common.ErrMalformedData) } bids = append(bids, orderbook.Level{ Price: price, @@ -735,17 +735,17 @@ func (e *Exchange) processOrderbook(respRaw []byte, channels []string) error { asks := make(orderbook.Levels, 0, len(orderbookData.Asks)) for x := range orderbookData.Asks { if len(orderbookData.Asks[x]) != 2 { - return errMalformedData + return common.ErrMalformedData } price, okay := orderbookData.Asks[x][0].(float64) if !okay { - return fmt.Errorf("%w, invalid orderbook price", errMalformedData) + return fmt.Errorf("%w, invalid orderbook price", common.ErrMalformedData) } else if price == 0 { continue } amount, okay := orderbookData.Asks[x][1].(float64) if !okay { - return fmt.Errorf("%w, invalid amount", errMalformedData) + return fmt.Errorf("%w, invalid amount", common.ErrMalformedData) } asks = append(asks, orderbook.Level{ Price: price, @@ -755,17 +755,17 @@ func (e *Exchange) processOrderbook(respRaw []byte, channels []string) error { bids := make([]orderbook.Level, 0, len(orderbookData.Bids)) for x := range orderbookData.Bids { if len(orderbookData.Bids[x]) != 2 { - return errMalformedData + return common.ErrMalformedData } price, okay := orderbookData.Bids[x][0].(float64) if !okay { - return fmt.Errorf("%w, invalid orderbook price", errMalformedData) + return fmt.Errorf("%w, invalid orderbook price", common.ErrMalformedData) } else if price == 0 { continue } amount, okay := orderbookData.Bids[x][1].(float64) if !okay { - return fmt.Errorf("%w, invalid amount", errMalformedData) + return fmt.Errorf("%w, invalid amount", common.ErrMalformedData) } bids = append(bids, orderbook.Level{ Price: price, diff --git a/exchanges/gateio/gateio.go b/exchanges/gateio/gateio.go index e89b61c6e00..cb2b99f34db 100644 --- a/exchanges/gateio/gateio.go +++ b/exchanges/gateio/gateio.go @@ -195,6 +195,7 @@ type Exchange struct { messageIDSeq common.Counter wsOBUpdateMgr *wsOBUpdateManager + wsOBResubMgr *wsOBResubManager } // ***************************************** SubAccounts ******************************** diff --git a/exchanges/gateio/gateio_types.go b/exchanges/gateio/gateio_types.go index 97293514160..1bc926401b7 100644 --- a/exchanges/gateio/gateio_types.go +++ b/exchanges/gateio/gateio_types.go @@ -2092,6 +2092,17 @@ type WsOrderbookUpdate struct { Asks orderbook.LevelsArrayPriceAmount `json:"a"` } +// WsOrderbookUpdateWithSnapshot represents websocket orderbook update push data +type WsOrderbookUpdateWithSnapshot struct { + UpdateTime types.Time `json:"t"` + Full bool `json:"full"` + Channel string `json:"s"` + FirstUpdateID int64 `json:"U"` + LastUpdateID int64 `json:"u"` + Bids orderbook.LevelsArrayPriceAmount `json:"b"` + Asks orderbook.LevelsArrayPriceAmount `json:"a"` +} + // WsOrderbookSnapshot represents a websocket orderbook snapshot push data type WsOrderbookSnapshot struct { UpdateTime types.Time `json:"t"` diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 4f6c3805744..06bc719d958 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -46,6 +46,7 @@ const ( spotOrderbookTickerChannel = "spot.book_ticker" // Best bid or ask price spotOrderbookUpdateChannel = "spot.order_book_update" // Changed order book levels spotOrderbookChannel = "spot.order_book" // Limited-Level Full Order Book Snapshot + spotOrderbookV2 = "spot.obu" spotOrdersChannel = "spot.orders" spotUserTradesChannel = "spot.usertrades" spotBalancesChannel = "spot.balances" @@ -64,6 +65,7 @@ var defaultSubscriptions = subscription.List{ {Enabled: true, Channel: subscription.OrderbookChannel, Asset: asset.Spot, Interval: kline.HundredMilliseconds}, {Enabled: false, Channel: spotOrderbookTickerChannel, Asset: asset.Spot, Interval: kline.TenMilliseconds, Levels: 1}, {Enabled: false, Channel: spotOrderbookChannel, Asset: asset.Spot, Interval: kline.HundredMilliseconds, Levels: 100}, + {Enabled: false, Channel: spotOrderbookV2, Asset: asset.Spot, Levels: 50}, {Enabled: true, Channel: spotBalancesChannel, Asset: asset.Spot, Authenticated: true}, {Enabled: true, Channel: crossMarginBalanceChannel, Asset: asset.CrossMargin, Authenticated: true}, {Enabled: true, Channel: marginBalancesChannel, Asset: asset.Margin, Authenticated: true}, @@ -191,6 +193,8 @@ func (e *Exchange) WsHandleSpotData(ctx context.Context, conn websocket.Connecti return e.processOrderbookUpdate(ctx, push.Result, push.Time) case spotOrderbookChannel: return e.processOrderbookSnapshot(push.Result, push.Time) + case spotOrderbookV2: + return e.processOrderbookUpdateWithSnapshot(conn, push.Result, push.Time, asset.Spot) case spotOrdersChannel: return e.processSpotOrders(respRaw) case spotUserTradesChannel: @@ -324,7 +328,7 @@ func (e *Exchange) processCandlestick(incoming []byte) error { } icp := strings.Split(data.NameOfSubscription, currency.UnderscoreDelimiter) if len(icp) < 3 { - return errors.New("malformed candlestick websocket push data") + return fmt.Errorf("%w: candlestick websocket", common.ErrMalformedData) } currencyPair, err := currency.NewPairFromString(strings.Join(icp[1:], currency.UnderscoreDelimiter)) if err != nil { @@ -409,6 +413,59 @@ func (e *Exchange) processOrderbookSnapshot(incoming []byte, lastPushed time.Tim return nil } +func (e *Exchange) processOrderbookUpdateWithSnapshot(conn websocket.Connection, incoming []byte, lastPushed time.Time, a asset.Item) error { + var data WsOrderbookUpdateWithSnapshot + if err := json.Unmarshal(incoming, &data); err != nil { + return err + } + + channelParts := strings.Split(data.Channel, ".") + if len(channelParts) < 3 { + return fmt.Errorf("%w: %q", common.ErrMalformedData, data.Channel) + } + + pair, err := currency.NewPairFromString(channelParts[1]) + if err != nil { + return err + } + + if data.Full { + if err := e.Websocket.Orderbook.LoadSnapshot(&orderbook.Book{ + Exchange: e.Name, + Pair: pair, + Asset: a, + LastUpdated: data.UpdateTime.Time(), + LastPushed: lastPushed, + LastUpdateID: data.LastUpdateID, + Bids: data.Bids.Levels(), + Asks: data.Asks.Levels(), + }); err != nil { + return err + } + e.wsOBResubMgr.CompletedResubscribe(pair, a) + return nil + } + + if e.wsOBResubMgr.IsResubscribing(pair, a) { + return nil // Drop incremental updates; waiting for a fresh snapshot + } + + lastUpdateID, err := e.Websocket.Orderbook.LastUpdateID(pair, a) + if err != nil || lastUpdateID+1 != data.FirstUpdateID { + return common.AppendError(err, e.wsOBResubMgr.Resubscribe(e, conn, data.Channel, pair, a)) + } + return e.Websocket.Orderbook.Update(&orderbook.Update{ + Pair: pair, + Asset: a, + UpdateTime: data.UpdateTime.Time(), + LastPushed: lastPushed, + UpdateID: data.LastUpdateID, + Bids: data.Bids.Levels(), + Asks: data.Asks.Levels(), + AllowEmpty: true, + }) +} + func (e *Exchange) processSpotOrders(data []byte) error { resp := struct { Time types.Time `json:"time"` @@ -608,11 +665,12 @@ func (e *Exchange) GetSubscriptionTemplate(_ *subscription.Subscription) (*templ return template.New("master.tmpl"). Funcs(sprig.FuncMap()). Funcs(template.FuncMap{ - "channelName": channelName, - "singleSymbolChannel": singleSymbolChannel, - "orderbookInterval": orderbookChannelInterval, - "candlesInterval": candlesChannelInterval, - "levels": channelLevels, + "channelName": channelName, + "singleSymbolChannel": singleSymbolChannel, + "orderbookInterval": orderbookChannelInterval, + "candlesInterval": candlesChannelInterval, + "levels": channelLevels, + "compactOrderbookPayload": isCompactOrderbookPayload, }).Parse(subTplText) } @@ -700,7 +758,7 @@ func channelName(s *subscription.Subscription) string { // singleSymbolChannel returns if the channel should be fanned out into single symbol requests func singleSymbolChannel(name string) bool { switch name { - case spotCandlesticksChannel, spotOrderbookUpdateChannel, spotOrderbookChannel: + case spotCandlesticksChannel, spotOrderbookUpdateChannel, spotOrderbookChannel, spotOrderbookV2: return true } return false @@ -739,6 +797,7 @@ func isSingleOrderbookChannel(name string) bool { case spotOrderbookUpdateChannel, spotOrderbookChannel, spotOrderbookTickerChannel, + spotOrderbookV2, futuresOrderbookChannel, futuresOrderbookTickerChannel, futuresOrderbookUpdateChannel, @@ -812,6 +871,7 @@ var channelLevelsMap = map[asset.Item]map[string][]int{ spotOrderbookTickerChannel: {}, spotOrderbookUpdateChannel: {}, spotOrderbookChannel: {1, 5, 10, 20, 50, 100}, + spotOrderbookV2: {50, 400}, }, asset.Futures: { futuresOrderbookChannel: {1, 5, 10, 20, 50, 100}, @@ -852,16 +912,27 @@ func channelLevels(s *subscription.Subscription, a asset.Item) (string, error) { return strconv.Itoa(s.Levels), nil } +func isCompactOrderbookPayload(channel string) bool { + return channel == spotOrderbookV2 +} + const subTplText = ` {{- with $name := channelName $.S }} {{- range $asset, $pairs := $.AssetPairs }} {{- if singleSymbolChannel $name }} {{- range $i, $p := $pairs -}} - {{- with $i := candlesInterval $.S }}{{ $i -}} , {{- end }} - {{- $p }} - {{- with $l := levels $.S $asset -}} , {{- $l }}{{ end }} - {{- with $i := orderbookInterval $.S $asset -}} , {{- $i }}{{- end }} - {{- $.PairSeparator }} + {{- if compactOrderbookPayload $name }} + {{- with $l := levels $.S $asset -}} + ob.{{ $p }}.{{ $l }} + {{- end -}} + {{- $.PairSeparator }} + {{- else }} + {{- with $i := candlesInterval $.S }}{{ $i -}} , {{- end }} + {{- $p }} + {{- with $l := levels $.S $asset -}} , {{- $l }}{{ end }} + {{- with $i := orderbookInterval $.S $asset -}} , {{- $i }}{{- end }} + {{- $.PairSeparator }} + {{- end }} {{- end }} {{- $.AssetSeparator }} {{- else }} diff --git a/exchanges/gateio/gateio_websocket_futures.go b/exchanges/gateio/gateio_websocket_futures.go index 4c010fbe847..4511965f1e9 100644 --- a/exchanges/gateio/gateio_websocket_futures.go +++ b/exchanges/gateio/gateio_websocket_futures.go @@ -3,12 +3,14 @@ package gateio import ( "context" "errors" + "fmt" "net/http" "strconv" "strings" "time" gws "github.com/gorilla/websocket" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/encoding/json" "github.com/thrasher-corp/gocryptotrader/exchange/accounts" @@ -367,7 +369,7 @@ func (e *Exchange) processFuturesCandlesticks(data []byte, assetType asset.Item) for x := range resp.Result { icp := strings.Split(resp.Result[x].Name, currency.UnderscoreDelimiter) if len(icp) < 3 { - return errors.New("malformed futures candlestick websocket push data") + return fmt.Errorf("%w: futures candlestick websocket", common.ErrMalformedData) } currencyPair, err := currency.NewPairFromString(strings.Join(icp[1:], currency.UnderscoreDelimiter)) if err != nil { diff --git a/exchanges/gateio/gateio_websocket_test.go b/exchanges/gateio/gateio_websocket_test.go index 2fec131e17a..6e8cb29614b 100644 --- a/exchanges/gateio/gateio_websocket_test.go +++ b/exchanges/gateio/gateio_websocket_test.go @@ -11,11 +11,14 @@ import ( gws "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thrasher-corp/gocryptotrader/common" + "github.com/thrasher-corp/gocryptotrader/common/key" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchange/accounts" exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/kline" + "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" ) @@ -251,3 +254,56 @@ func TestExtractOrderbookLimit(t *testing.T) { require.Equal(t, tc.exp, limit) } } + +func TestProcessOrderbookUpdateWithSnapshot(t *testing.T) { + t.Parallel() + + e := new(Exchange) + require.NoError(t, testexch.Setup(e)) + e.Name = "ProcessOrderbookUpdateWithSnapshot" + e.Features.Subscriptions = subscription.List{ + {Enabled: true, Channel: spotOrderbookV2, Asset: asset.Spot, Levels: 50}, + } + subs, err := e.Features.Subscriptions.ExpandTemplates(e) + require.NoError(t, err) + + conn := &FixtureConnection{} + err = e.Websocket.AddSubscriptions(conn, subs...) + require.NoError(t, err) + + e.wsOBResubMgr.lookup[key.PairAsset{Base: currency.BTC.Item, Quote: currency.USDT.Item, Asset: asset.Spot}] = true + + for _, tc := range []struct { + payload []byte + err error + }{ + {payload: []byte(`{"t":"bingbong"}`), err: strconv.ErrSyntax}, + {payload: []byte(`{"s":"ob.50"}`), err: common.ErrMalformedData}, + {payload: []byte(`{"s":"ob..50"}`), err: currency.ErrCreatingPair}, + {payload: []byte(`{"s":"ob.BTC_USDT.50","full":true}`), err: orderbook.ErrLastUpdatedNotSet}, + { + // Simulate orderbook update already resubscribing + payload: []byte(`{"t":1757377580073,"s":"ob.BTC_USDT.50","u":27053258987,"U":27053258982,"b":[["111666","0.146841"]],"a":[["111666.1","0.791633"],["111676.8","0.014"]]}`), + }, + { + // Full snapshot will reset resubscribing state + payload: []byte(`{"t":1757377580046,"full":true,"s":"ob.BTC_USDT.50","u":27053258981,"b":[["111666","0.131287"],["111665.3","0.048403"],["111665.2","0.268681"],["111665.1","0.153269"],["111664.9","0.004"],["111663.8","0.010919"],["111663.7","0.214867"],["111661.8","0.268681"],["111659.4","0.01144"],["111659.3","0.184127"],["111658.4","0.268681"],["111658.3","0.11897"],["111656.9","0.00653"],["111656.7","0.184127"],["111656.1","0.040381"],["111655","0.044859"],["111654.9","0.268681"],["111654.8","0.033575"],["111653.9","0.184127"],["111653.6","0.601785"],["111653.5","0.017118"],["111651.7","0.160346"],["111651.6","0.184127"],["111651.5","0.268681"],["111650.1","0.09042"],["111647.9","0.191292"],["111647.5","0.268681"],["111646","0.098528"],["111645.9","0.1443"],["111645.6","0.184127"],["111643.8","1.015409"],["111643","0.099889"],["111641.5","0.004925"],["111641.2","0.179895"],["111641.1","0.184127"],["111640.7","0.268681"],["111638.6","0.184912"],["111638.4","0.010182"],["111637.6","0.026862"],["111637.5","0.09042"],["111636.6","0.184127"],["111634.8","0.129187"],["111634.7","0.014213"],["111633.9","0.268681"],["111632.1","0.184127"],["111631.8","0.1443"],["111631.6","0.027"],["111631.3","0.089539"],["111630.3","0.00001"],["111629.6","0.000029"]],"a":[["111666.1","0.818887"],["111668.3","0.008062"],["111668.5","0.005399"],["111670.3","0.043892"],["111670.4","0.019653"],["111673.7","0.046898"],["111674.1","0.004227"],["111674.4","0.026258"],["111674.8","0.09042"],["111674.9","0.268681"],["111675","0.004227"],["111676","0.004227"],["111676.8","0.005"],["111677","0.004227"],["111678.1","0.077789"],["111678.2","0.210991"],["111678.3","0.268681"],["111678.4","0.025039"],["111678.5","0.051456"],["111679.2","0.007163"],["111679.5","0.013019"],["111681.5","0.036343"],["111681.7","0.268681"],["111682.9","0.184127"],["111685.2","0.184127"],["111685.8","0.040538"],["111686.4","0.201931"],["111687.3","0.03"],["111687.4","0.09042"],["111687.5","0.452808"],["111687.6","1.815093"],["111691.9","0.139287"],["111692.2","0.184127"],["111693.7","0.268681"],["111694.3","1.05115"],["111694.5","0.184127"],["111697","0.184127"],["111697.1","0.268681"],["111697.4","0.0967"],["111698.7","0.1443"],["111699.5","0.014213"],["111700.2","0.601783"],["111700.7","0.09042"],["111700.9","0.367517"],["111701.5","0.184127"],["111705.2","0.017703"],["111706","0.184127"],["111707.6","0.268681"],["111709.9","0.1443"],["111710.2","0.004"]]}`), + }, + { + // Incremental update will apply correctly + payload: []byte(`{"t":1757377580073,"s":"ob.BTC_USDT.50","u":27053258987,"U":27053258982,"b":[["111666","0.146841"]],"a":[["111666.1","0.791633"],["111676.8","0.014"]]}`), + }, + { + // Incremental update out of order will force resubscription + payload: []byte(`{"t":1757377580073,"s":"ob.BTC_USDT.50","u":27053258987,"U":27053258982,"b":[["111666","0.146841"]],"a":[["111666.1","0.791633"],["111676.8","0.014"]]}`), + }, + } { + // Sequential tests, do not use t.Parallel(); Some timestamps are deliberately identical from trading activity + err := e.processOrderbookUpdateWithSnapshot(conn, tc.payload, time.Now(), asset.Spot) + if tc.err != nil { + require.ErrorIs(t, err, tc.err) + continue + } + require.NoError(t, err) + } +} diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index ccd12990d30..e74bb4be795 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -181,6 +181,7 @@ func (e *Exchange) SetDefaults() { e.WebsocketResponseCheckTimeout = exchange.DefaultWebsocketResponseCheckTimeout e.WebsocketOrderbookBufferLimit = exchange.DefaultWebsocketOrderbookBufferLimit e.wsOBUpdateMgr = newWsOBUpdateManager(defaultWsOrderbookUpdateTimeDelay, defaultWSOrderbookUpdateDeadline) + e.wsOBResubMgr = newWSOBResubManager() } // Setup sets user configuration diff --git a/exchanges/gateio/ws_ob_resub_manager.go b/exchanges/gateio/ws_ob_resub_manager.go new file mode 100644 index 00000000000..ee9947e73d1 --- /dev/null +++ b/exchanges/gateio/ws_ob_resub_manager.go @@ -0,0 +1,74 @@ +package gateio + +import ( + "fmt" + "sync" + + "github.com/thrasher-corp/gocryptotrader/common/key" + "github.com/thrasher-corp/gocryptotrader/currency" + "github.com/thrasher-corp/gocryptotrader/exchange/websocket" + "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" + "github.com/thrasher-corp/gocryptotrader/log" +) + +type wsOBResubManager struct { + lookup map[key.PairAsset]bool + m sync.RWMutex +} + +func newWSOBResubManager() *wsOBResubManager { + return &wsOBResubManager{lookup: make(map[key.PairAsset]bool)} +} + +// IsResubscribing checks if a subscription is currently being resubscribed +func (m *wsOBResubManager) IsResubscribing(pair currency.Pair, a asset.Item) bool { + m.m.RLock() + defer m.m.RUnlock() + return m.lookup[key.PairAsset{Base: pair.Base.Item, Quote: pair.Quote.Item, Asset: a}] +} + +// Resubscribe marks a subscription as resubscribing and starts the unsubscribe/resubscribe process +func (m *wsOBResubManager) Resubscribe(e *Exchange, conn websocket.Connection, qualifiedChannel string, pair currency.Pair, a asset.Item) error { + if err := e.Websocket.Orderbook.InvalidateOrderbook(pair, a); err != nil { + return err + } + + sub := e.Websocket.GetSubscription(qualifiedChannelKey{&subscription.Subscription{QualifiedChannel: qualifiedChannel}}) + if sub == nil { + return fmt.Errorf("%w: %q", subscription.ErrNotFound, qualifiedChannel) + } + + m.m.Lock() + defer m.m.Unlock() + + m.lookup[key.PairAsset{Base: pair.Base.Item, Quote: pair.Quote.Item, Asset: a}] = true + + go func() { // Has to be called in routine to not impede websocket throughput + if err := e.Websocket.ResubscribeToChannel(conn, sub); err != nil { + m.CompletedResubscribe(pair, a) // Ensure we clear the map entry on failure too + log.Errorf(log.ExchangeSys, "Failed to resubscribe to channel %q: %v", qualifiedChannel, err) + } + }() + + return nil +} + +// CompletedResubscribe removes a subscription from the resubscribing map +func (m *wsOBResubManager) CompletedResubscribe(pair currency.Pair, a asset.Item) { + m.m.Lock() + defer m.m.Unlock() + delete(m.lookup, key.PairAsset{Base: pair.Base.Item, Quote: pair.Quote.Item, Asset: a}) +} + +type qualifiedChannelKey struct { + *subscription.Subscription +} + +func (k qualifiedChannelKey) Match(eachKey subscription.MatchableKey) bool { + return k.Subscription.QualifiedChannel == eachKey.GetSubscription().QualifiedChannel +} + +func (k qualifiedChannelKey) GetSubscription() *subscription.Subscription { + return k.Subscription +} diff --git a/exchanges/gateio/ws_ob_resub_manager_test.go b/exchanges/gateio/ws_ob_resub_manager_test.go new file mode 100644 index 00000000000..ecc093d399b --- /dev/null +++ b/exchanges/gateio/ws_ob_resub_manager_test.go @@ -0,0 +1,107 @@ +package gateio + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thrasher-corp/gocryptotrader/common/key" + "github.com/thrasher-corp/gocryptotrader/currency" + "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" + "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" + testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange" +) + +func TestNewWSOBResubManager(t *testing.T) { + t.Parallel() + + m := newWSOBResubManager() + require.NotNil(t, m) + assert.NotNil(t, m.lookup) +} + +func TestIsResubscribing(t *testing.T) { + t.Parallel() + + m := newWSOBResubManager() + m.lookup[key.PairAsset{Base: currency.BTC.Item, Quote: currency.USDT.Item, Asset: asset.Spot}] = true + assert.True(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) + assert.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Futures)) +} + +func TestResubscribe(t *testing.T) { + t.Parallel() + + m := newWSOBResubManager() + + conn := &FixtureConnection{} + + e := new(Exchange) + require.NoError(t, testexch.Setup(e)) + e.Name = "Resubscribe" + + err := m.Resubscribe(e, conn, "notfound", currency.NewBTCUSDT(), asset.Spot) + require.ErrorIs(t, err, orderbook.ErrDepthNotFound) + require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) + + err = e.Websocket.Orderbook.LoadSnapshot(&orderbook.Book{ + Asks: []orderbook.Level{{Price: 50000, Amount: 0.1}}, + Bids: []orderbook.Level{{Price: 49000, Amount: 0.2}}, + Exchange: e.Name, + Pair: currency.NewBTCUSDT(), + Asset: asset.Spot, + LastUpdated: time.Now(), + }) + require.NoError(t, err) + err = m.Resubscribe(e, conn, "notfound", currency.NewBTCUSDT(), asset.Spot) + require.ErrorIs(t, err, subscription.ErrNotFound) + + require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) + + e.Features.Subscriptions = subscription.List{ + {Enabled: true, Channel: spotOrderbookV2, Asset: asset.Spot, Levels: 50}, + } + expanded, err := e.Features.Subscriptions.ExpandTemplates(e) + require.NoError(t, err) + + err = e.Websocket.AddSubscriptions(conn, expanded...) + require.NoError(t, err) + + err = e.Websocket.Orderbook.LoadSnapshot(&orderbook.Book{ + Asks: []orderbook.Level{{Price: 50000, Amount: 0.1}}, + Bids: []orderbook.Level{{Price: 49000, Amount: 0.2}}, + Exchange: e.Name, + Pair: currency.NewBTCUSDT(), + Asset: asset.Spot, + LastUpdated: time.Now(), + }) + require.NoError(t, err) + err = m.Resubscribe(e, conn, "ob.BTC_USDT.50", currency.NewBTCUSDT(), asset.Spot) + require.NoError(t, err) + assert.True(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) +} + +func TestCompletedResubscribe(t *testing.T) { + t.Parallel() + + m := newWSOBResubManager() + m.CompletedResubscribe(currency.NewBTCUSDT(), asset.Spot) // no-op + require.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) + m.lookup[key.PairAsset{Base: currency.BTC.Item, Quote: currency.USDT.Item, Asset: asset.Spot}] = true + require.True(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) + m.CompletedResubscribe(currency.NewBTCUSDT(), asset.Spot) + assert.False(t, m.IsResubscribing(currency.NewBTCUSDT(), asset.Spot)) +} + +func TestQualifiedChannelKey_Match(t *testing.T) { + t.Parallel() + + require.Implements(t, (*subscription.MatchableKey)(nil), new(qualifiedChannelKey)) + + k := qualifiedChannelKey{&subscription.Subscription{QualifiedChannel: "test.channel"}} + require.True(t, k.Match(k)) + require.False(t, k.Match(qualifiedChannelKey{&subscription.Subscription{QualifiedChannel: "TEST.channel"}})) + assert.NotNil(t, k.GetSubscription()) +} diff --git a/exchanges/kucoin/kucoin_convert.go b/exchanges/kucoin/kucoin_convert.go index 7bfb372920f..a65a844f0f5 100644 --- a/exchanges/kucoin/kucoin_convert.go +++ b/exchanges/kucoin/kucoin_convert.go @@ -3,6 +3,7 @@ package kucoin import ( "fmt" + "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/encoding/json" ) @@ -23,5 +24,5 @@ func (a *SubAccountsResponse) UnmarshalJSON(data []byte) error { } else if _, ok := result.([]any); ok { return nil } - return fmt.Errorf("%w can not unmarshal to SubAccountsResponse", errMalformedData) + return fmt.Errorf("%w can not unmarshal to SubAccountsResponse", common.ErrMalformedData) } diff --git a/exchanges/kucoin/kucoin_types.go b/exchanges/kucoin/kucoin_types.go index 76ac4c0254a..1a8f97d57eb 100644 --- a/exchanges/kucoin/kucoin_types.go +++ b/exchanges/kucoin/kucoin_types.go @@ -26,7 +26,6 @@ var ( errInvalidResponseReceiver = errors.New("invalid response receiver") errInvalidStopPriceType = errors.New("stopPriceType is required") - errMalformedData = errors.New("malformed data") errNoDepositAddress = errors.New("no deposit address found") errAddressRequired = errors.New("address is required") errMultipleDepositAddress = errors.New("multiple deposit addresses") diff --git a/exchanges/kucoin/kucoin_websocket.go b/exchanges/kucoin/kucoin_websocket.go index 812c2535bb7..dbcf63c0cea 100644 --- a/exchanges/kucoin/kucoin_websocket.go +++ b/exchanges/kucoin/kucoin_websocket.go @@ -242,7 +242,7 @@ func (e *Exchange) wsHandleData(ctx context.Context, respData []byte) error { case marketCandlesChannel: symbolAndInterval := strings.Split(topicInfo[1], currency.UnderscoreDelimiter) if len(symbolAndInterval) != 2 { - return errMalformedData + return common.ErrMalformedData } return e.processCandlesticks(resp.Data, symbolAndInterval[0], symbolAndInterval[1], topicInfo[0]) case marketMatchChannel: