Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
5aba904
Support sub scaling init
gloriousCode Feb 11, 2026
054d0d5
revert manager, rm conn assign
gloriousCode Feb 12, 2026
fb9ee7d
improve conn threading
gloriousCode Feb 12, 2026
525a88b
removing floaters with my fingers
gloriousCode Feb 12, 2026
eeba094
revert ratelimit change (for its own PR)
gloriousCode Feb 12, 2026
3d93fe9
rm poo poos from nappy of despair
gloriousCode Feb 12, 2026
b4e4fc3
sub limits, add gemini, rm leftover nugs
gloriousCode Feb 12, 2026
9c26628
DAMN THERES POOP EVERYWHERE
gloriousCode Feb 12, 2026
f973308
should, must fix nits
gloriousCode Feb 13, 2026
6ca9685
HEY WHY WERENT YOU INCLUDED
gloriousCode Feb 13, 2026
6e913ea
gemini test fixup
gloriousCode Feb 13, 2026
cb2c65c
fix sequence issue
gloriousCode Feb 13, 2026
dd01ee6
fix sequencing
gloriousCode Feb 13, 2026
5311c54
Fixing ai with ai.....
gloriousCode Feb 13, 2026
8745647
how dare you lint-check me
gloriousCode Feb 13, 2026
65d5f2f
SomeSamNits
gloriousCode Feb 16, 2026
5027fe2
mock conn
gloriousCode Feb 16, 2026
6e484cb
lint mint stint clint tint glint dint
gloriousCode Feb 17, 2026
3c1c117
expand coinbase ws testing, lint
gloriousCode Feb 17, 2026
016ccf9
a little nicer
gloriousCode Feb 17, 2026
e7d81dd
shazNOOT NOOT
gloriousCode Feb 19, 2026
e605bfe
minifixes
gloriousCode Feb 19, 2026
8832601
mini fixes
gloriousCode Feb 19, 2026
d0882b5
Merge remote-tracking branch 'upstream' into dom-subs
gloriousCode Feb 23, 2026
4cb4466
lint
gloriousCode Feb 23, 2026
d0383a0
deribit fix, test improvements
gloriousCode Feb 23, 2026
1557af7
bu, more like bs
gloriousCode Feb 24, 2026
e51cbf1
Merge remote-tracking branch 'upstream' into dom-subs
gloriousCode Mar 10, 2026
3c07995
merge stuff okay
gloriousCode Mar 10, 2026
49ec525
FIXING MY BUGS
gloriousCode Mar 10, 2026
a8bc068
mini bits
gloriousCode Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion contrib/spellcheck/ignore_words.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ prevend
flate
zar
insid
totalin
totalin
bu
3 changes: 0 additions & 3 deletions exchange/websocket/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,6 @@ func (m *Manager) connect(ctx context.Context) error {
}

if len(subs) == 0 {
// If no subscriptions are generated, we skip the connection
if m.verbose {
log.Warnf(log.WebsocketMgr, "%s websocket: no subscriptions generated", m.exchangeName)
}
Expand Down Expand Up @@ -614,7 +613,6 @@ func (m *Manager) createConnectAndSubscribe(ctx context.Context, ws *websocket,
}
return nil
}

if err := ws.setup.Subscriber(ctx, conn, subs); err != nil {
return fmt.Errorf("%w: %w", ErrSubscriptionFailure, err)
}
Expand Down Expand Up @@ -795,7 +793,6 @@ func (m *Manager) SetWebsocketURL(u string, auth, reconnect bool) error {
if defaultVals {
u = m.defaultURLAuth
}

err := checkWebsocketURL(u)
if err != nil {
return err
Expand Down
190 changes: 120 additions & 70 deletions exchanges/bitfinex/bitfinex_test.go

Large diffs are not rendered by default.

242 changes: 118 additions & 124 deletions exchanges/bitfinex/bitfinex_websocket.go

Large diffs are not rendered by default.

60 changes: 34 additions & 26 deletions exchanges/bitfinex/bitfinex_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ func (e *Exchange) SetDefaults() {
}
e.API.Endpoints = e.NewEndpoints()
err = e.API.Endpoints.SetDefaultEndpoints(map[exchange.URL]string{
exchange.RestSpot: bitfinexAPIURLBase,
exchange.WebsocketSpot: publicBitfinexWebsocketEndpoint,
exchange.RestSpot: bitfinexAPIURLBase,
exchange.WebsocketSpot: publicBitfinexWebsocketEndpoint,
exchange.WebsocketSpotSupplementary: authenticatedBitfinexWebsocketEndpoint,
})
if err != nil {
log.Errorln(log.ExchangeSys, err)
Expand All @@ -188,39 +189,51 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
return err
}

wsEndpoint, err := e.API.Endpoints.GetURL(exchange.WebsocketSpot)
err = e.Websocket.Setup(&websocket.ManagerSetup{
ExchangeConfig: exch,
Features: &e.Features.Supports.WebsocketCapabilities,
UseMultiConnectionManagement: true,
MaxWebsocketSubscriptionsPerConnection: 25, // https://docs.bitfinex.com/docs/requirements-and-limitations
})
if err != nil {
return err
}

err = e.Websocket.Setup(&websocket.ManagerSetup{
ExchangeConfig: exch,
DefaultURL: publicBitfinexWebsocketEndpoint,
RunningURL: wsEndpoint,
Connector: e.WsConnect,
Subscriber: e.Subscribe,
Unsubscriber: e.Unsubscribe,
GenerateSubscriptions: e.generateSubscriptions,
Features: &e.Features.Supports.WebsocketCapabilities,
})
wsPublicURL, err := e.API.Endpoints.GetURL(exchange.WebsocketSpot)
if err != nil {
return err
}
wsAuthURL, err := e.API.Endpoints.GetURL(exchange.WebsocketSpotSupplementary)
if err != nil {
return err
}

err = e.Websocket.SetupNewConnection(&websocket.ConnectionSetup{
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
URL: publicBitfinexWebsocketEndpoint,
Connector: e.wsConnect,
Subscriber: e.subscribeForConnection,
Unsubscriber: e.unsubscribeForConnection,
GenerateSubscriptions: e.generatePublicSubscriptions,
Handler: e.wsHandleData,
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
URL: wsPublicURL,
MessageFilter: wsPublicURL,
})
if err != nil {
return err
}

return e.Websocket.SetupNewConnection(&websocket.ConnectionSetup{
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
URL: authenticatedBitfinexWebsocketEndpoint,
Authenticated: true,
Connector: e.wsConnect,
Authenticate: e.wsSendAuthConn,
Subscriber: e.subscribeForConnection,
Unsubscriber: e.unsubscribeForConnection,
GenerateSubscriptions: e.generatePrivateSubscriptions,
SubscriptionsNotRequired: true,
Handler: e.wsHandleData,
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
URL: wsAuthURL,
MessageFilter: wsAuthURL,
})
}

Expand Down Expand Up @@ -961,11 +974,6 @@ func (e *Exchange) GetOrderHistory(ctx context.Context, req *order.MultiOrderReq
return req.Filter(e.Name, orders), nil
}

// AuthenticateWebsocket sends an authentication message to the websocket
func (e *Exchange) AuthenticateWebsocket(ctx context.Context) error {
return e.WsSendAuth(ctx)
}

// appendOptionalDelimiter ensures that a delimiter is present for long character currencies
func (e *Exchange) appendOptionalDelimiter(p *currency.Pair) {
if (len(p.Base.String()) > 3 && !p.Quote.IsEmpty()) ||
Expand Down
2 changes: 1 addition & 1 deletion exchanges/bitstamp/bitstamp_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
}

return e.Websocket.SetupNewConnection(&websocket.ConnectionSetup{
URL: e.Websocket.GetWebsocketURL(),
URL: wsURL,
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
})
Expand Down
13 changes: 6 additions & 7 deletions exchanges/bybit/bybit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3041,8 +3041,9 @@ func TestWSHandleData(t *testing.T) {

keys := slices.Collect(maps.Keys(pushDataMap))
slices.Sort(keys)
conn := testexch.GetMockConn(t, e, "")
for x := range keys {
err := e.wsHandleData(t.Context(), nil, asset.Spot, []byte(pushDataMap[keys[x]]))
err := e.wsHandleData(t.Context(), conn, asset.Spot, []byte(pushDataMap[keys[x]]))
if keys[x] == "unhandled" {
assert.ErrorIs(t, err, errUnhandledStreamData, "wsHandleData should error correctly for unhandled topics")
} else {
Expand Down Expand Up @@ -3254,12 +3255,10 @@ func TestWsTicker(t *testing.T) {
asset.Spot, asset.Options, asset.USDTMarginedFutures, asset.USDTMarginedFutures,
asset.USDCMarginedFutures, asset.USDCMarginedFutures, asset.CoinMarginedFutures, asset.CoinMarginedFutures,
}
routingIndex := 0
conn := testexch.GetMockConn(t, e, "")
testexch.FixtureToDataHandler(t, "testdata/wsTicker.json", func(_ context.Context, r []byte) error {
require.Less(t, routingIndex, len(assetRouting), "routingIndex must stay within ticker fixture asset routing bounds")
a := assetRouting[routingIndex]
routingIndex++
return e.wsHandleData(t.Context(), nil, a, r)
defer slices.Delete(assetRouting, 0, 1)
return e.wsHandleData(t.Context(), conn, assetRouting[0], r)
})
e.Websocket.DataHandler.Close()
expected := 8
Expand Down Expand Up @@ -3508,7 +3507,7 @@ func TestFetchTradablePairs(t *testing.T) {
func TestDeltaUpdateOrderbook(t *testing.T) {
t.Parallel()
data := []byte(`{"topic":"orderbook.50.WEMIXUSDT","ts":1697573183768,"type":"snapshot","data":{"s":"WEMIXUSDT","b":[["0.9511","260.703"],["0.9677","0"]],"a":[],"u":3119516,"seq":14126848493},"cts":1728966699481}`)
err := e.wsHandleData(t.Context(), nil, asset.Spot, data)
err := e.wsHandleData(t.Context(), testexch.GetMockConn(t, e, ""), asset.Spot, data)
require.NoError(t, err, "wsHandleData must not error")
update := []byte(`{"topic":"orderbook.50.WEMIXUSDT","ts":1697573183768,"type":"delta","data":{"s":"WEMIXUSDT","b":[["0.9511","260.703"],["0.9677","0"]],"a":[],"u":3119516,"seq":14126848493},"cts":1728966699481}`)
var wsResponse WebsocketResponse
Expand Down
19 changes: 11 additions & 8 deletions exchanges/bybit/bybit_websocket_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,10 @@ import (

"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/encoding/json"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
)

// Websocket request operation types
const (
OutboundTradeConnection = "PRIVATE_TRADE"
InboundPrivateConnection = "PRIVATE"
)

// WSCreateOrder creates an order through the websocket connection
func (e *Exchange) WSCreateOrder(ctx context.Context, r *PlaceOrderRequest) (*WebsocketOrderDetails, error) {
if err := r.Validate(); err != nil {
Expand Down Expand Up @@ -64,13 +59,21 @@ func (e *Exchange) WSCancelOrder(ctx context.Context, r *CancelOrderRequest) (*W

// sendWebsocketTradeRequest sends a trade request to the exchange through the websocket connection
func (e *Exchange) sendWebsocketTradeRequest(ctx context.Context, op, orderLinkID string, payload any, limit request.EndpointLimit) (*WebsocketOrderDetails, error) {
wsTradeURL, err := e.API.Endpoints.GetURL(exchange.WebsocketTrade)
if err != nil {
return nil, err
}
wsPrivateURL, err := e.API.Endpoints.GetURL(exchange.WebsocketPrivate)
if err != nil {
return nil, err
}
// Get the outbound and inbound connections to send and receive the request. This makes sure both are live before
// sending the request.
outbound, err := e.Websocket.GetConnection(OutboundTradeConnection)
outbound, err := e.Websocket.GetConnection(wsTradeURL)
if err != nil {
return nil, err
}
inbound, err := e.Websocket.GetConnection(InboundPrivateConnection)
inbound, err := e.Websocket.GetConnection(wsPrivateURL)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions exchanges/bybit/bybit_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
Handler: func(ctx context.Context, conn websocket.Connection, resp []byte) error {
return e.wsHandleData(ctx, conn, asset.USDTMarginedFutures, resp)
},
MessageFilter: asset.USDTMarginedFutures, // Unused but it allows us to differentiate between the two linear futures types.
MessageFilter: asset.USDTMarginedFutures, // Required to differentiate linear futures connections sharing the same endpoint URL.
}); err != nil {
return err
}
Expand Down Expand Up @@ -331,7 +331,7 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
Handler: func(ctx context.Context, conn websocket.Connection, resp []byte) error {
return e.wsHandleData(ctx, conn, asset.USDCMarginedFutures, resp)
},
MessageFilter: asset.USDCMarginedFutures, // Unused but it allows us to differentiate between the two linear futures types.
MessageFilter: asset.USDCMarginedFutures, // Required to differentiate linear futures connections sharing the same endpoint URL.
}); err != nil {
return err
}
Expand Down Expand Up @@ -372,7 +372,7 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
return e.wsHandleTradeData(conn, resp)
},
Authenticate: e.WebsocketAuthenticateTradeConnection,
MessageFilter: OutboundTradeConnection,
MessageFilter: wsTradeURL,
SubscriptionsNotRequired: true,
}); err != nil {
return err
Expand All @@ -395,7 +395,7 @@ func (e *Exchange) Setup(exch *config.Exchange) error {
Unsubscriber: e.authUnsubscribe,
Handler: e.wsHandleAuthenticatedData,
Authenticate: e.WebsocketAuthenticatePrivateConnection,
MessageFilter: InboundPrivateConnection,
MessageFilter: wsPrivateURL,
})
}

Expand Down
Loading
Loading