Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions exchange/websocket/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,99 @@ func TestCreateConnectAndSubscribe(t *testing.T) {
mgr.Wg.Wait()
}

func TestSetSubscriptionsNotRequired(t *testing.T) {
t.Parallel()

singleConn := NewManager()
singleConn.GenerateSubs = func() (subscription.List, error) {
return subscription.List{{Channel: "single"}}, nil
}

singleConn.SetSubscriptionsNotRequired()

subs, err := singleConn.GenerateSubs()
require.NoError(t, err, "GenerateSubs must not error after subscriptions are disabled")
assert.Empty(t, subs, "GenerateSubs should return no subscriptions after subscriptions are disabled")

multiConn := NewManager()
multiConn.useMultiConnectionManagement = true
multiConn.connectionManager = []*websocket{
{setup: nil},
{setup: &ConnectionSetup{}},
{setup: &ConnectionSetup{SubscriptionsNotRequired: true}},
}

multiConn.SetSubscriptionsNotRequired()

for i := range multiConn.connectionManager {
require.NotNil(t,
multiConn.connectionManager[i].setup,
"connection setup should be initialised when missing")
assert.True(t,
multiConn.connectionManager[i].setup.SubscriptionsNotRequired,
"connection setup should not require subscriptions after override")
}
}

func TestSetAllConnectionURLs(t *testing.T) {
t.Parallel()

singleConn := NewManager()
singleConn.Conn = &connection{URL: "ws://old-public.example.com"}
singleConn.AuthConn = &connection{URL: "ws://old-auth.example.com"}

err := singleConn.SetAllConnectionURLs("ws://mock.example.com/ws")
require.NoError(t, err, "SetAllConnectionURLs must not error for single-connection managers")
assert.Equal(t, "ws://mock.example.com/ws", singleConn.runningURL, "runningURL should be updated for single-connection managers")
assert.Equal(t, "ws://mock.example.com/ws", singleConn.runningURLAuth, "runningURLAuth should be updated for single-connection managers")
assert.Equal(t, "ws://mock.example.com/ws", singleConn.Conn.GetURL(), "Conn URL should be updated for single-connection managers")
assert.Equal(t, "ws://mock.example.com/ws", singleConn.AuthConn.GetURL(), "AuthConn URL should be updated for single-connection managers")

multiConn := NewManager()
multiConn.useMultiConnectionManagement = true
multiConn.connectionManager = []*websocket{
{setup: nil},
{setup: &ConnectionSetup{URL: "ws://first.example.com"}},
{setup: &ConnectionSetup{URL: "ws://second.example.com"}, connections: []Connection{&connection{URL: "ws://live.example.com"}}},
}

err = multiConn.SetAllConnectionURLs("ws://mock.example.com/ws")
require.NoError(t, err, "SetAllConnectionURLs must not error for multi-connection managers")

for i := range multiConn.connectionManager {
require.NotNil(t,
multiConn.connectionManager[i].setup,
"connection setup should be initialised when missing")
assert.Equal(t,
"ws://mock.example.com/ws",
multiConn.connectionManager[i].setup.URL,
"connection setup URL should be updated for each multi-connection setup")
}
assert.Equal(t,
"ws://live.example.com",
multiConn.connectionManager[2].connections[0].GetURL(),
"existing live connection URL should not be mutated by the pre-connect helper")
}

func TestSetAllConnectionURLsErrorsAfterConnect(t *testing.T) {
t.Parallel()

ws := NewManager()

err := ws.SetAllConnectionURLs("ws://mock.example.com/ws")
require.NoError(t, err, "SetAllConnectionURLs must allow pre-connect configuration")

ws.setState(connectingState)
err = ws.SetAllConnectionURLs("ws://mock.example.com/ws")
require.ErrorIs(t, err, errAlreadyReconnecting, "SetAllConnectionURLs must error once Connect has started")
require.ErrorContains(t, err, "SetAllConnectionURLs must be called before Connect")

ws.setState(connectedState)
err = ws.SetAllConnectionURLs("ws://mock.example.com/ws")
require.ErrorIs(t, err, errAlreadyConnected, "SetAllConnectionURLs must error after connect")
require.ErrorContains(t, err, "SetAllConnectionURLs must be called before Connect")
}

func TestManager(t *testing.T) {
t.Parallel()

Expand Down
82 changes: 82 additions & 0 deletions exchange/websocket/manager_testhelpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package websocket

import (
"fmt"

"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/log"
)

// SetSubscriptionsNotRequired configures the manager to connect without
// generating default subscriptions.
//
// This exported helper exists for cross-package test harnesses only. It is a
// pre-connect test-mode mutation used by websocket unit-test helpers that need
// request-response connectivity without normal stream bootstraps. Tests that
// later need the manager's default subscription generation should use a fresh
// exchange/manager instance.
func (m *Manager) SetSubscriptionsNotRequired() {
m.m.Lock()
defer m.m.Unlock()

if !m.useMultiConnectionManagement {
m.GenerateSubs = func() (subscription.List, error) { return subscription.List{}, nil }
return
}

for _, ws := range m.connectionManager {
if ws.setup == nil {
log.Warnf(log.WebsocketMgr, "%s websocket: missing connection setup while disabling required subscriptions; creating empty setup", m.exchangeName)
ws.setup = &ConnectionSetup{}
}
ws.setup.SubscriptionsNotRequired = true
}
}

// SetAllConnectionURLs configures every managed websocket connection to use the
// same URL.
//
// This exported helper exists for cross-package test harnesses only. It is a
// pre-connect test-mode mutation used by websocket unit-test helpers to
// redirect all websocket traffic through a single mock server before
// connecting. Calling this after Connect has started returns an error.
func (m *Manager) SetAllConnectionURLs(u string) error {
if err := common.NilGuard(m); err != nil {
return err
}
if err := checkWebsocketURL(u); err != nil {
return err
}

m.m.Lock()
defer m.m.Unlock()

if m.IsConnecting() {
return fmt.Errorf("%v %w: SetAllConnectionURLs must be called before Connect", m.exchangeName, errAlreadyReconnecting)
}
if m.IsConnected() {
return fmt.Errorf("%v %w: SetAllConnectionURLs must be called before Connect", m.exchangeName, errAlreadyConnected)
}

if !m.useMultiConnectionManagement {
m.runningURL = u
m.runningURLAuth = u
if m.Conn != nil {
m.Conn.SetURL(u)
}
if m.AuthConn != nil {
m.AuthConn.SetURL(u)
}
return nil
}

for _, ws := range m.connectionManager {
if ws.setup == nil {
log.Warnf(log.WebsocketMgr, "%s websocket: missing connection setup while updating connection URLs; creating empty setup", m.exchangeName)
ws.setup = &ConnectionSetup{}
}
ws.setup.URL = u
}
return nil
}
44 changes: 44 additions & 0 deletions exchanges/poloniex/poloniex_websocket_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package poloniex

import (
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
mockws "github.com/thrasher-corp/gocryptotrader/internal/testing/websocket"
)

func TestSetupWsSupportsMultiConnectionManagement(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mockws.WsMockUpgrader(t, w, r, mockws.EchoHandler)
}))
t.Cleanup(server.Close)

e := new(Exchange)
require.NoError(t, testexch.Setup(e), "Test instance Setup must not error")

wsURL := "ws" + strings.TrimPrefix(server.URL, "http")
err := e.Websocket.SetAllConnectionURLs(wsURL)
require.NoError(t, err, "SetAllConnectionURLs must not error for Poloniex")

testexch.SetupWs(t, e)
t.Cleanup(func() {
if e.Websocket.IsConnected() {
assert.NoError(t, e.Websocket.Shutdown(), "Websocket shutdown should not error")
}
})

assert.Empty(t, e.Features.Subscriptions, "Features.Subscriptions should be cleared by SetupWs")
assert.True(t, e.Websocket.IsConnected(), "Websocket manager should be connected after SetupWs")

for _, messageFilter := range []string{connSpotPublic, connSpotPrivate, connFuturesPublic, connFuturesPrivate} {
conn, connErr := e.Websocket.GetConnection(messageFilter)
require.NoErrorf(t, connErr, "GetConnection must not error for message filter %s", messageFilter)
assert.Equalf(t, wsURL, conn.GetURL(), "Connection URL should be redirected for message filter %s", messageFilter)
assert.Emptyf(t, conn.Subscriptions().List(), "Connection subscriptions should remain empty for message filter %s", messageFilter)
}
}
14 changes: 6 additions & 8 deletions internal/testing/exchange/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,14 @@ func MockWsInstance[T any, PT interface {
b.API.AuthenticatedWebsocketSupport = true
err := b.API.Endpoints.SetRunningURL("RestSpotURL", s.URL)
require.NoError(tb, err, "Endpoints.SetRunningURL must not error for RestSpotURL")
for _, auth := range []bool{true, false} {
err = b.Websocket.SetWebsocketURL("ws"+strings.TrimPrefix(s.URL, "http"), auth, true)
require.NoErrorf(tb, err, "SetWebsocketURL must not error for auth: %v", auth)
}

wsURL := "ws" + strings.TrimPrefix(s.URL, "http")
err = b.Websocket.SetAllConnectionURLs(wsURL)
require.NoError(tb, err, "SetAllConnectionURLs must not error")

// For testing we never want to use the default subscriptions; Tests of GenerateSubscriptions should be exercising it directly
b.Features.Subscriptions = subscription.List{}
// Exchanges which don't support subscription conf; Can be removed when all exchanges support sub conf
b.Websocket.GenerateSubs = func() (subscription.List, error) { return subscription.List{}, nil }
b.Websocket.SetSubscriptionsNotRequired()

err = b.Websocket.Connect(context.TODO())
require.NoError(tb, err, "Connect must not error")
Expand Down Expand Up @@ -217,8 +216,7 @@ func SetupWs(tb testing.TB, e exchange.IBotExchange) {

// For testing we never want to use the default subscriptions; Tests of GenerateSubscriptions should be exercising it directly
b.Features.Subscriptions = subscription.List{}
// Exchanges which don't support subscription conf; Can be removed when all exchanges support sub conf
w.GenerateSubs = func() (subscription.List, error) { return subscription.List{}, nil }
w.SetSubscriptionsNotRequired()

err = w.Connect(context.TODO())
require.NoError(tb, err, "Connect must not error")
Expand Down
93 changes: 93 additions & 0 deletions internal/testing/exchange/exchange_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,75 @@
package exchange

import (
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

gws "github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/exchange/websocket"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/binance"
"github.com/thrasher-corp/gocryptotrader/exchanges/bybit"
"github.com/thrasher-corp/gocryptotrader/exchanges/protocol"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
mockws "github.com/thrasher-corp/gocryptotrader/internal/testing/websocket"
)

const multiConnectionFilter = "multi-connection-test"

type multiConnectionSetupExchange struct {
sharedtestvalues.CustomEx
}

func (e *multiConnectionSetupExchange) GetBase() *exchange.Base {
return &e.Base
}

func newMultiConnectionSetupExchange(tb testing.TB, websocketURL string) *multiConnectionSetupExchange {
tb.Helper()

e := &multiConnectionSetupExchange{}
e.Base.Name = "MultiConnectionSetupExchange"
e.Base.Features.Subscriptions = subscription.List{{Channel: "ticker"}}
e.Base.Websocket = websocket.NewManager()

err := e.Base.Websocket.Setup(&websocket.ManagerSetup{
ExchangeConfig: &config.Exchange{
Name: "MultiConnectionSetupExchange",
Features: &config.FeaturesConfig{
Enabled: config.FeaturesEnabledConfig{Websocket: true},
},
WebsocketTrafficTimeout: 5 * time.Second,
},
Features: &protocol.Features{},
UseMultiConnectionManagement: true,
})
require.NoError(tb, err, "Setup must not error for the multi-connection manager")

err = e.Base.Websocket.SetupNewConnection(&websocket.ConnectionSetup{
URL: websocketURL,
Connector: func(ctx context.Context, conn websocket.Connection) error {
return conn.Dial(ctx, gws.DefaultDialer, nil, nil)
},
GenerateSubscriptions: func() (subscription.List, error) {
return e.Base.Features.Subscriptions.Clone(), nil
},
Subscriber: func(context.Context, websocket.Connection, subscription.List) error { return nil },
Handler: func(context.Context, websocket.Connection, []byte) error { return nil },
MessageFilter: multiConnectionFilter,
})
require.NoError(tb, err, "SetupNewConnection must not error for the multi-connection manager")

return e
}

// TestSetup exercises Setup
func TestSetup(t *testing.T) {
b := new(binance.Exchange)
Expand All @@ -35,3 +93,38 @@ func TestMockWsInstance(t *testing.T) {
b := MockWsInstance[binance.Exchange](t, mockws.CurryWsMockUpgrader(t, func(_ testing.TB, _ []byte, _ *gws.Conn) error { return nil }))
require.NotNil(t, b, "MockWsInstance must not be nil")
}

func TestMockWsInstanceSupportsMultiConnectionManagement(t *testing.T) {
b := MockWsInstance[bybit.Exchange](t, mockws.CurryWsMockUpgrader(t, func(_ testing.TB, _ []byte, _ *gws.Conn) error { return nil }))
require.NotNil(t, b, "MockWsInstance must not be nil for multi-connection websocket exchanges")
t.Cleanup(func() {
if b.GetBase().Websocket.IsConnected() {
assert.NoError(t, b.GetBase().Websocket.Shutdown(), "Websocket shutdown should not error for multi-connection websocket exchanges")
}
})
assert.True(t, b.GetBase().Websocket.IsConnected(), "Websocket manager should be connected for multi-connection websocket exchanges")
}

func TestSetupWsSupportsMultiConnectionManagement(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mockws.WsMockUpgrader(t, w, r, mockws.EchoHandler)
}))
t.Cleanup(server.Close)

e := newMultiConnectionSetupExchange(t, "ws"+strings.TrimPrefix(server.URL, "http"))
t.Cleanup(func() {
if e.Base.Websocket.IsConnected() {
assert.NoError(t, e.Base.Websocket.Shutdown(), "Websocket shutdown should not error after SetupWs")
}
})

SetupWs(t, e)

assert.Empty(t, e.Base.Features.Subscriptions, "Features.Subscriptions should be cleared by SetupWs")
assert.True(t, e.Base.Websocket.IsConnected(), "Websocket manager should be connected after SetupWs")

conn, err := e.Base.Websocket.GetConnection(multiConnectionFilter)
require.NoError(t, err, "GetConnection must not error after SetupWs on a multi-connection manager")
assert.NotNil(t, conn, "GetConnection should return a connection after SetupWs on a multi-connection manager")
assert.Empty(t, conn.Subscriptions().List(), "Connection subscriptions should remain empty when subscriptions are not required")
}
Loading