Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
5aba904
Support sub scaling init
gloriousCode Feb 11, 2026
054d0d5
revert manager, rm conn assign
gloriousCode Feb 12, 2026
fb9ee7d
improve conn threading
gloriousCode Feb 12, 2026
525a88b
removing floaters with my fingers
gloriousCode Feb 12, 2026
eeba094
revert ratelimit change (for its own PR)
gloriousCode Feb 12, 2026
3d93fe9
rm poo poos from nappy of despair
gloriousCode Feb 12, 2026
b4e4fc3
sub limits, add gemini, rm leftover nugs
gloriousCode Feb 12, 2026
9c26628
DAMN THERES POOP EVERYWHERE
gloriousCode Feb 12, 2026
f973308
should, must fix nits
gloriousCode Feb 13, 2026
6ca9685
HEY WHY WERENT YOU INCLUDED
gloriousCode Feb 13, 2026
6e913ea
gemini test fixup
gloriousCode Feb 13, 2026
cb2c65c
fix sequence issue
gloriousCode Feb 13, 2026
dd01ee6
fix sequencing
gloriousCode Feb 13, 2026
5311c54
Fixing ai with ai.....
gloriousCode Feb 13, 2026
8745647
how dare you lint-check me
gloriousCode Feb 13, 2026
65d5f2f
SomeSamNits
gloriousCode Feb 16, 2026
5027fe2
mock conn
gloriousCode Feb 16, 2026
6e484cb
lint mint stint clint tint glint dint
gloriousCode Feb 17, 2026
3c1c117
expand coinbase ws testing, lint
gloriousCode Feb 17, 2026
016ccf9
a little nicer
gloriousCode Feb 17, 2026
e7d81dd
shazNOOT NOOT
gloriousCode Feb 19, 2026
e605bfe
minifixes
gloriousCode Feb 19, 2026
8832601
mini fixes
gloriousCode Feb 19, 2026
d0882b5
Merge remote-tracking branch 'upstream' into dom-subs
gloriousCode Feb 23, 2026
4cb4466
lint
gloriousCode Feb 23, 2026
d0383a0
deribit fix, test improvements
gloriousCode Feb 23, 2026
1557af7
bu, more like bs
gloriousCode Feb 24, 2026
e51cbf1
Merge remote-tracking branch 'upstream' into dom-subs
gloriousCode Mar 10, 2026
3c07995
merge stuff okay
gloriousCode Mar 10, 2026
49ec525
FIXING MY BUGS
gloriousCode Mar 10, 2026
a8bc068
mini bits
gloriousCode Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions exchange/websocket/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,6 @@ func (m *Manager) connect(ctx context.Context) error {
}

if len(subs) == 0 {
// If no subscriptions are generated, we skip the connection
if m.verbose {
log.Warnf(log.WebsocketMgr, "%s websocket: no subscriptions generated", m.exchangeName)
}
Expand Down Expand Up @@ -614,7 +613,6 @@ func (m *Manager) createConnectAndSubscribe(ctx context.Context, ws *websocket,
}
return nil
}

if err := ws.setup.Subscriber(ctx, conn, subs); err != nil {
return fmt.Errorf("%w: %w", ErrSubscriptionFailure, err)
}
Expand Down Expand Up @@ -795,7 +793,6 @@ func (m *Manager) SetWebsocketURL(u string, auth, reconnect bool) error {
if defaultVals {
u = m.defaultURLAuth
}

err := checkWebsocketURL(u)
if err != nil {
return err
Expand Down
159 changes: 106 additions & 53 deletions exchanges/bitfinex/bitfinex_test.go

Large diffs are not rendered by default.

240 changes: 126 additions & 114 deletions exchanges/bitfinex/bitfinex_websocket.go

Large diffs are not rendered by default.

49 changes: 24 additions & 25 deletions exchanges/bitfinex/bitfinex_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,39 +188,43 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
return err
}

wsEndpoint, err := e.API.Endpoints.GetURL(exchange.WebsocketSpot)
if err != nil {
return err
}

err = e.Websocket.Setup(&websocket.ManagerSetup{
ExchangeConfig: exch,
DefaultURL: publicBitfinexWebsocketEndpoint,
RunningURL: wsEndpoint,
Connector: e.WsConnect,
Subscriber: e.Subscribe,
Unsubscriber: e.Unsubscribe,
GenerateSubscriptions: e.generateSubscriptions,
Features: &e.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
Features: &e.Features.Supports.WebsocketCapabilities,
UseMultiConnectionManagement: true,
MaxWebsocketSubscriptionsPerConnection: 25, // https://docs.bitfinex.com/docs/requirements-and-limitations
})
if err != nil {
return err
}

err = e.Websocket.SetupNewConnection(&websocket.ConnectionSetup{
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
URL: publicBitfinexWebsocketEndpoint,
Connector: e.wsConnect,
Subscriber: e.subscribeForConnection,
Unsubscriber: e.unsubscribeForConnection,
GenerateSubscriptions: e.generatePublicSubscriptions,
Handler: e.wsHandleData,
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
URL: publicBitfinexWebsocketEndpoint,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to use e.API.Endpoints.GetURL(exchange.WebsocketSpot) so that the config can override this and for the auth below.

Copy link
Collaborator Author

@gloriousCode gloriousCode Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use gateio as an inspiration since you use it so much and I'm mislead by your implementation. This will not stand!

(I will update all implementations)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like the standard is GetURL is used for URL and is also the MessageFilter, I've rolled that out too, but let me know if that makes you mad and/or sad, especially gateio. The only exceptional circumstance is Bybit since we have 2 assets using the same connection

MessageFilter: publicBitfinexWebsocketEndpoint,
})
if err != nil {
return err
}

return e.Websocket.SetupNewConnection(&websocket.ConnectionSetup{
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
URL: authenticatedBitfinexWebsocketEndpoint,
Authenticated: true,
Connector: e.wsConnect,
Authenticate: e.wsAuthenticate,
Subscriber: e.subscribeForConnection,
Unsubscriber: e.unsubscribeForConnection,
GenerateSubscriptions: e.generatePrivateSubscriptions,
Handler: e.wsHandleData,
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
URL: authenticatedBitfinexWebsocketEndpoint,
Authenticated: true,
MessageFilter: authenticatedBitfinexWebsocketEndpoint,
})
}

Expand Down Expand Up @@ -952,11 +956,6 @@ func (e *Exchange) GetOrderHistory(ctx context.Context, req *order.MultiOrderReq
return req.Filter(e.Name, orders), nil
}

// AuthenticateWebsocket sends an authentication message to the websocket
func (e *Exchange) AuthenticateWebsocket(ctx context.Context) error {
return e.WsSendAuth(ctx)
}

// appendOptionalDelimiter ensures that a delimiter is present for long character currencies
func (e *Exchange) appendOptionalDelimiter(p *currency.Pair) {
if (len(p.Base.String()) > 3 && !p.Quote.IsEmpty()) ||
Expand Down
158 changes: 132 additions & 26 deletions exchanges/coinbase/coinbase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"github.com/thrasher-corp/gocryptotrader/exchanges/futures"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
Expand Down Expand Up @@ -107,7 +108,7 @@
sharedtestvalues.SkipTestIfCredentialsUnset(t, e)
exch := &Exchange{}
exch.Websocket = sharedtestvalues.NewTestWebsocket()
err := exch.WsConnect()
err := exch.Websocket.Connect(t.Context())
assert.ErrorIs(t, err, websocket.ErrWebsocketNotEnabled)
err = exchangeBaseHelper(exch)
require.NoError(t, err)
Expand Down Expand Up @@ -1512,12 +1513,14 @@
if e.Websocket.IsEnabled() && !e.API.AuthenticatedWebsocketSupport || !sharedtestvalues.AreAPICredentialsSet(e) {
t.Skip(websocket.ErrWebsocketNotEnabled.Error())
}
var dialer gws.Dialer
err := e.Websocket.Conn.Dial(t.Context(), &dialer, http.Header{})
err := e.Websocket.Connect(t.Context())
require.NoError(t, err)
e.Websocket.Wg.Add(1)
go e.wsReadData(t.Context())
err = e.Subscribe(subscription.List{
wsRunningURL, err := e.API.Endpoints.GetURL(exchange.WebsocketSpot)
require.NoError(t, err)
conn, err := e.Websocket.GetConnection(wsRunningURL)
require.NoError(t, err)
startWSReadLoop(t.Context(), e, conn)
err = subscribeForTest(t.Context(), e, subscription.List{
{
Channel: "myAccount",
Asset: asset.All,
Expand Down Expand Up @@ -1550,68 +1553,92 @@
}
}
}()
_, err := e.wsHandleData(t.Context(), nil)
err := e.wsHandleData(t.Context(), nil, nil)
var syntaxErr *json.SyntaxError
assert.True(t, errors.As(err, &syntaxErr) || strings.Contains(err.Error(), "Syntax error no sources available, the input json is empty"), errJSONUnmarshalUnexpected)
mockJSON := []byte(`{"type": "error"}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.Error(t, err)
mockJSON = []byte(`{"sequence_num": 0, "channel": "subscriptions"}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.NoError(t, err)
var unmarshalTypeErr *json.UnmarshalTypeError
mockJSON = []byte(`{"sequence_num": 0, "channel": "status", "events": [{"type": 1234}]}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.True(t, errors.As(err, &unmarshalTypeErr) || strings.Contains(err.Error(), "mismatched type with value"), errJSONUnmarshalUnexpected)
mockJSON = []byte(`{"sequence_num": 0, "channel": "status", "events": [{"type": "moo"}]}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.NoError(t, err)
mockJSON = []byte(`{"sequence_num": 0, "channel": "ticker", "events": [{"type": "moo", "tickers": false}]}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.True(t, errors.As(err, &unmarshalTypeErr) || strings.Contains(err.Error(), "mismatched type with value"), errJSONUnmarshalUnexpected)
mockJSON = []byte(`{"sequence_num": 0, "channel": "candles", "events": [{"type": false}]}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.True(t, errors.As(err, &unmarshalTypeErr) || strings.Contains(err.Error(), "mismatched type with value"), errJSONUnmarshalUnexpected)
mockJSON = []byte(`{"sequence_num": 0, "channel": "candles", "events": [{"type": "moo", "candles": [{"low": "1.1"}]}]}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.NoError(t, err)
mockJSON = []byte(`{"sequence_num": 0, "channel": "market_trades", "events": [{"type": false}]}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.True(t, errors.As(err, &unmarshalTypeErr) || strings.Contains(err.Error(), "mismatched type with value"), errJSONUnmarshalUnexpected)
mockJSON = []byte(`{"sequence_num": 0, "channel": "market_trades", "events": [{"type": "moo", "trades": [{"price": "1.1"}]}]}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.NoError(t, err)
mockJSON = []byte(`{"sequence_num": 0, "channel": "l2_data", "events": [{"type": false, "updates": [{"price_level": "1.1"}]}]}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.True(t, errors.As(err, &unmarshalTypeErr) || strings.Contains(err.Error(), "mismatched type with value"), errJSONUnmarshalUnexpected)
mockJSON = []byte(`{"sequence_num": 0, "channel": "l2_data", "timestamp": "2006-01-02T15:04:05Z", "events": [{"type": "moo", "updates": [{"price_level": "1.1"}]}]}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.ErrorIs(t, err, errUnknownL2DataType)
mockJSON = []byte(`{"sequence_num": 0, "channel": "l2_data", "timestamp": "2006-01-02T15:04:05Z", "events": [{"type": "snapshot", "product_id": "BTC-USD", "updates": [{"side": "bid", "price_level": "1.1", "new_quantity": "2.2"}]}]}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.NoError(t, err)
mockJSON = []byte(`{"sequence_num": 0, "channel": "l2_data", "timestamp": "2006-01-02T15:04:05Z", "events": [{"type": "update", "product_id": "BTC-USD", "updates": [{"side": "bid", "price_level": "1.1", "new_quantity": "2.2"}]}]}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.NoError(t, err)
mockJSON = []byte(`{"sequence_num": 0, "channel": "user", "events": [{"type": false}]}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.True(t, errors.As(err, &unmarshalTypeErr) || strings.Contains(err.Error(), "mismatched type with value"), errJSONUnmarshalUnexpected)
mockJSON = []byte(`{"sequence_num": 0, "channel": "user", "events": [{"type": "l", "orders": [{"limit_price": "2.2", "total_fees": "1.1", "post_only": true}], "positions": {"perpetual_futures_positions": [{"margin_type": "fakeMarginType"}], "expiring_futures_positions": [{}]}}]}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.ErrorIs(t, err, order.ErrUnrecognisedOrderType)
mockJSON = []byte(`{"sequence_num": 0, "channel": "fakechan", "events": [{"type": ""}]}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.ErrorIs(t, err, errChannelNameUnknown)
p, err := e.FormatExchangeCurrency(currency.NewBTCUSD(), asset.Spot)
require.NoError(t, err)
e.pairAliases.Load(map[currency.Pair]currency.Pairs{
p: {p},
})
mockJSON = []byte(`{"sequence_num": 0, "channel": "ticker", "events": [{"type": "moo", "tickers": [{"product_id": "BTC-USD", "price": "1.1"}]}]}`)
_, err = e.wsHandleData(t.Context(), mockJSON)
err = e.wsHandleData(t.Context(), nil, mockJSON)
assert.NoError(t, err)
}

func TestWsHandleDataSequence(t *testing.T) {
t.Parallel()
connA := &testWSConn{url: "ws://coinbase-seq-a"}
connB := &testWSConn{url: "ws://coinbase-seq-b"}
buildSubMsg := func(seq uint64) []byte {
return []byte(`{"sequence_num":` + strconv.FormatUint(seq, 10) + `,"channel":"subscriptions"}`)
}

err := e.wsHandleData(t.Context(), connA, buildSubMsg(7))
assert.NoError(t, err, "wsHandleData should not error for initial sequence")

err = e.wsHandleData(t.Context(), connA, buildSubMsg(8))
assert.NoError(t, err, "wsHandleData should not error for in-order sequence")

err = e.wsHandleData(t.Context(), connA, buildSubMsg(10))
assert.ErrorIs(t, err, errOutOfSequence, "wsHandleData should error for out-of-order sequence")

err = e.wsHandleData(t.Context(), connA, buildSubMsg(11))
assert.NoError(t, err, "wsHandleData should not error after sequence state is resynced")

err = e.wsHandleData(t.Context(), connB, buildSubMsg(3))
assert.NoError(t, err, "wsHandleData should not error for a different connection sequence state")
}

func TestProcessSnapshotUpdate(t *testing.T) {
t.Parallel()
req := WebsocketOrderbookDataHolder{Changes: []WebsocketOrderbookData{{Side: "fakeside", PriceLevel: 1.1, NewQuantity: 2.2}}, ProductID: currency.NewBTCUSD()}
Expand Down Expand Up @@ -1667,12 +1694,61 @@
t.Parallel()
sharedtestvalues.SkipTestIfCredentialsUnset(t, e)
req := subscription.List{{Channel: "heartbeat", Asset: asset.Spot, Pairs: currency.Pairs{currency.NewPairWithDelimiter(testCrypto.String(), testFiat.String(), "-")}}}
err := e.Subscribe(req)
err := subscribeForTest(t.Context(), e, req)
assert.NoError(t, err)
err = e.Unsubscribe(req)
err = unsubscribeForTest(t.Context(), e, req)
assert.NoError(t, err)
}

func subscribeForTest(ctx context.Context, e *Exchange, subs subscription.List) error {
wsRunningURL, err := e.API.Endpoints.GetURL(exchange.WebsocketSpot)
if err != nil {
return err
}
conn, err := e.Websocket.GetConnection(wsRunningURL)
if err != nil {
conn, err = e.Websocket.GetConnection(coinbaseWebsocketURL)
if err != nil {
return err
}
}
return e.subscribeForConnection(ctx, conn, subs)
}

func unsubscribeForTest(ctx context.Context, e *Exchange, subs subscription.List) error {
wsRunningURL, err := e.API.Endpoints.GetURL(exchange.WebsocketSpot)
if err != nil {
return err
}
conn, err := e.Websocket.GetConnection(wsRunningURL)
if err != nil {
conn, err = e.Websocket.GetConnection(coinbaseWebsocketURL)
if err != nil {
return err
}
}
return e.unsubscribeForConnection(ctx, conn, subs)
}

func startWSReadLoop(ctx context.Context, e *Exchange, conn websocket.Connection) {
e.Websocket.Wg.Add(1)
go func() {

Check failure on line 1735 in exchanges/coinbase/coinbase_test.go

View workflow job for this annotation

GitHub Actions / miscellaneous checks

Goroutine creation can be simplified using WaitGroup.Go
defer e.Websocket.Wg.Done()
for {
resp := conn.ReadMessage()
if resp.Raw == nil {
return
}
err := e.wsHandleData(ctx, conn, resp.Raw)
if err != nil {
if errSend := e.Websocket.DataHandler.Send(ctx, err); errSend != nil {
log.Printf("%s %s: %s %s", e.Name, conn.GetURL(), errSend, err)
}
}
}
}()
}

func TestCheckSubscriptions(t *testing.T) {
t.Parallel()
e := &Exchange{
Expand Down Expand Up @@ -2045,3 +2121,33 @@
require.NoError(t, err)
assert.NotEmpty(t, resp, errExpectedNonEmpty)
}

type testWSConn struct{ url string }

func (m *testWSConn) Dial(context.Context, *gws.Dialer, http.Header) error { return nil }
func (m *testWSConn) ReadMessage() websocket.Response { return websocket.Response{} }
func (m *testWSConn) SetupPingHandler(request.EndpointLimit, websocket.PingHandler) {
}

Check failure on line 2130 in exchanges/coinbase/coinbase_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofumpt)
func (m *testWSConn) SendMessageReturnResponse(context.Context, request.EndpointLimit, any, any) ([]byte, error) {
return nil, nil
}
func (m *testWSConn) SendMessageReturnResponses(context.Context, request.EndpointLimit, any, any, int) ([][]byte, error) {
return nil, nil
}
func (m *testWSConn) SendMessageReturnResponsesWithInspector(context.Context, request.EndpointLimit, any, any, int, websocket.Inspector) ([][]byte, error) {
return nil, nil
}
func (m *testWSConn) SendRawMessage(context.Context, request.EndpointLimit, int, []byte) error {
return nil
}
func (m *testWSConn) SendJSONMessage(context.Context, request.EndpointLimit, any) error { return nil }
func (m *testWSConn) SetURL(url string) { m.url = url }
func (m *testWSConn) SetProxy(string) {}
func (m *testWSConn) GetURL() string { return m.url }
func (m *testWSConn) Shutdown() error { return nil }
func (m *testWSConn) RequireMatchWithData(any, []byte) error { return nil }
func (m *testWSConn) IncomingWithData(any, []byte) bool { return false }
func (m *testWSConn) MatchReturnResponses(context.Context, any, int) (<-chan websocket.MatchedResponse, error) {
return nil, nil
}
func (m *testWSConn) Subscriptions() *subscription.Store { return nil }
3 changes: 3 additions & 0 deletions exchanges/coinbase/coinbase_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/encoding/json"
"github.com/thrasher-corp/gocryptotrader/exchange/websocket"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/types"
Expand All @@ -29,6 +30,8 @@ type Exchange struct {
exchange.Base
jwt jwtManager
pairAliases pairAliases
wsSeqState map[websocket.Connection]uint64
wsSeqMu sync.Mutex
}

// Version is used for the niche cases where the Version of the API must be specified and passed around for proper functionality
Expand Down
Loading
Loading