Skip to content

Commit 1b13143

Browse files
author
Ryan O'Hara-Reid
committed
kucoin_ws_ob_updoo: thrasher-corp#2123 includes thrasher-corp#2119
1 parent 1c2e9cb commit 1b13143

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1564
-1584
lines changed

cmd/exchange_template/websocket.tmpl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func (e *Exchange) WsConnect() error {
3030
// Add WebSocket authentication logic here.
3131
}
3232

33-
if err := e.Websocket.Conn.Dial(ctx, &dialer, http.Header{}); err != nil {
33+
if err := e.Websocket.Conn.Dial(ctx, &dialer, http.Header{}, nil); err != nil {
3434
return fmt.Errorf("%v - Unable to connect to Websocket. Error: %s", e.Name, err)
3535
}
3636

docs/ADD_NEW_EXCHANGE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -663,7 +663,7 @@ func (e *Exchange) WsConnect() error {
663663
}
664664
}
665665

666-
if err := e.Websocket.Conn.Dial(ctx, &dialer, http.Header{}); err != nil {
666+
if err := e.Websocket.Conn.Dial(ctx, &dialer, http.Header{}, nil); err != nil {
667667
return fmt.Errorf("%v - Unable to connect to Websocket. Error: %s", e.Name, err)
668668
}
669669

engine/rpcserver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2030,7 +2030,7 @@ func (s *RPCServer) GetOrderbookStream(r *gctrpc.GetOrderbookStreamRequest, stre
20302030
return err
20312031
}
20322032

2033-
depth, err := orderbook.GetDepth(r.Exchange, p, a)
2033+
depth, err := orderbook.GetDepth(exch.GetName(), p, a)
20342034
if err != nil {
20352035
return err
20362036
}
Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
package buffer
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"sync"
8+
"time"
9+
10+
"github.com/thrasher-corp/gocryptotrader/common"
11+
"github.com/thrasher-corp/gocryptotrader/common/key"
12+
"github.com/thrasher-corp/gocryptotrader/currency"
13+
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
14+
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
15+
"github.com/thrasher-corp/gocryptotrader/log"
16+
)
17+
18+
// public errors
19+
var (
20+
ErrOrderbookSnapshotOutdated = errors.New("orderbook snapshot is outdated")
21+
)
22+
23+
// time settings
24+
var (
25+
DefaultWSOrderbookUpdateTimeDelay = time.Second * 2
26+
DefaultWSOrderbookUpdateDeadline = time.Minute * 2
27+
)
28+
29+
var (
30+
errPendingUpdatesNotApplied = errors.New("pending updates not applied")
31+
errUnhandledCacheState = errors.New("unhandled cache state")
32+
)
33+
34+
// UpdateManager manages orderbook updates for websocket connections
35+
// TODO: Directly couple with orderbook struct and optimise locking paths.
36+
type UpdateManager struct {
37+
lookup map[key.PairAsset]*updateCache
38+
deadline time.Duration
39+
delay time.Duration
40+
fetchOrderbook func(ctx context.Context, p currency.Pair, a asset.Item) (*orderbook.Book, error)
41+
checkPendingUpdate func(lastUpdateID int64, firstUpdateID int64, update *orderbook.Update) (skip bool, err error)
42+
ob *Orderbook
43+
m sync.RWMutex
44+
}
45+
46+
type updateCache struct {
47+
updates []pendingUpdate
48+
ch chan int64
49+
m sync.Mutex
50+
state cacheState
51+
}
52+
53+
type cacheState uint32
54+
55+
const (
56+
cacheStateUninitialised cacheState = iota
57+
cacheStateInitialised
58+
cacheStateQueuing
59+
cacheStateSynced
60+
)
61+
62+
type pendingUpdate struct {
63+
update *orderbook.Update
64+
firstUpdateID int64
65+
}
66+
67+
// UpdateParams contains parameters used to create a new UpdateManager
68+
type UpdateParams struct {
69+
// FetchDelay defines the delay before the REST orderbook is retrieved. In some cases REST requests can be behind
70+
// websocket updates by a large margin, this allows the cache to fill with updates before we fetch the orderbook so
71+
// they can be correctly applied.
72+
FetchDelay time.Duration
73+
// FetchDeadline defines the maximum time to wait for the REST orderbook to be retrieved. This prevents excessive
74+
// backlogs of pending updates building up while waiting for rate limiter delays.
75+
FetchDeadline time.Duration
76+
FetchOrderbook func(ctx context.Context, p currency.Pair, a asset.Item) (*orderbook.Book, error)
77+
// CheckPendingUpdate allows custom logic to determine if a pending update added to cache should be skipped or if an
78+
// error has occurred.
79+
CheckPendingUpdate func(lastUpdateID int64, firstUpdateID int64, update *orderbook.Update) (skip bool, err error)
80+
BufferInstance *Orderbook // TODO: Integrate directly with orderbook struct
81+
}
82+
83+
// NewUpdateManager creates a new websocket orderbook update manager
84+
func NewUpdateManager(params *UpdateParams) *UpdateManager {
85+
if params.FetchDeadline <= 0 {
86+
panic("fetch deadline must be greater than zero")
87+
}
88+
if params.FetchDelay < 0 {
89+
panic("fetch delay must be greater than or equal to zero")
90+
}
91+
if err := common.NilGuard(params.FetchOrderbook, params.CheckPendingUpdate, params.BufferInstance); err != nil {
92+
panic(err)
93+
}
94+
return &UpdateManager{
95+
lookup: make(map[key.PairAsset]*updateCache),
96+
deadline: params.FetchDeadline,
97+
delay: params.FetchDelay,
98+
fetchOrderbook: params.FetchOrderbook,
99+
checkPendingUpdate: params.CheckPendingUpdate,
100+
ob: params.BufferInstance,
101+
}
102+
}
103+
104+
// ProcessOrderbookUpdate processes an orderbook update by syncing snapshot, caching updates and applying them
105+
func (m *UpdateManager) ProcessOrderbookUpdate(ctx context.Context, firstUpdateID int64, update *orderbook.Update) error {
106+
cache, err := m.loadCache(update.Pair, update.Asset)
107+
if err != nil {
108+
return err
109+
}
110+
111+
cache.m.Lock()
112+
defer cache.m.Unlock()
113+
switch cache.state {
114+
case cacheStateSynced:
115+
return m.applyUpdate(ctx, cache, firstUpdateID, update)
116+
case cacheStateInitialised:
117+
m.initialiseOrderbookCache(ctx, firstUpdateID, update, cache)
118+
case cacheStateQueuing:
119+
cache.updates = append(cache.updates, pendingUpdate{update: update, firstUpdateID: firstUpdateID})
120+
select {
121+
case cache.ch <- update.UpdateID: // Notify syncOrderbook of most recent update ID for inspection
122+
default:
123+
}
124+
default:
125+
return fmt.Errorf("%w: %d for %v %v", errUnhandledCacheState, cache.state, update.Pair, update.Asset)
126+
}
127+
return nil
128+
}
129+
130+
// loadCache loads the cache for the given pair and asset. If the cache does not exist, it creates a new one.
131+
func (m *UpdateManager) loadCache(p currency.Pair, a asset.Item) (*updateCache, error) {
132+
if p.IsEmpty() {
133+
return nil, currency.ErrCurrencyPairEmpty
134+
}
135+
if !a.IsValid() {
136+
return nil, fmt.Errorf("%w: %q", asset.ErrInvalidAsset, a)
137+
}
138+
m.m.RLock()
139+
cache, ok := m.lookup[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}]
140+
m.m.RUnlock()
141+
if !ok {
142+
cache = &updateCache{ch: make(chan int64), state: cacheStateInitialised}
143+
m.m.Lock()
144+
m.lookup[key.PairAsset{Base: p.Base.Item, Quote: p.Quote.Item, Asset: a}] = cache
145+
m.m.Unlock()
146+
}
147+
return cache, nil
148+
}
149+
150+
// applyUpdate verifies and applies an orderbook update
151+
// Invalidates the cache on error
152+
// Does not benefit from concurrent lock protection
153+
func (m *UpdateManager) applyUpdate(ctx context.Context, cache *updateCache, firstUpdateID int64, update *orderbook.Update) error {
154+
lastUpdateID, err := m.ob.LastUpdateID(update.Pair, update.Asset)
155+
if err != nil {
156+
log.Errorf(log.ExchangeSys, "%s websocket orderbook manager: failed to sync orderbook for %v %v: %v", m.ob.exchangeName, update.Pair, update.Asset, err)
157+
return m.invalidateCache(ctx, firstUpdateID, update, cache)
158+
}
159+
if isOutOfSequence(lastUpdateID, firstUpdateID) {
160+
if m.ob.verbose { // disconnection will pollute logs
161+
log.Warnf(log.ExchangeSys, "%s websocket orderbook manager: failed to sync orderbook for %v %v: desync detected", m.ob.exchangeName, update.Pair, update.Asset)
162+
}
163+
return m.invalidateCache(ctx, firstUpdateID, update, cache)
164+
}
165+
if err := m.ob.Update(update); err != nil {
166+
log.Errorf(log.ExchangeSys, "%s websocket orderbook manager: failed to sync orderbook for %v %v: %v", m.ob.exchangeName, update.Pair, update.Asset, err)
167+
return m.invalidateCache(ctx, firstUpdateID, update, cache)
168+
}
169+
return nil
170+
}
171+
172+
// initialiseOrderbookCache sets the cache state to queuing, appends the update to the cache and spawns a goroutine
173+
// to fetch and synchronise the orderbook snapshot
174+
// assumes lock already active on cache
175+
func (m *UpdateManager) initialiseOrderbookCache(ctx context.Context, firstUpdateID int64, update *orderbook.Update, cache *updateCache) {
176+
cache.state = cacheStateQueuing
177+
cache.updates = append(cache.updates, pendingUpdate{update: update, firstUpdateID: firstUpdateID})
178+
go func() {
179+
if err := m.syncOrderbook(ctx, cache, update.Pair, update.Asset); err != nil {
180+
log.Errorf(log.ExchangeSys, "%s websocket orderbook manager: failed to sync orderbook for %v %v: %v", m.ob.exchangeName, update.Pair, update.Asset, err)
181+
}
182+
}()
183+
}
184+
185+
// invalidateCache invalidates the existing orderbook, clears the update queue and reinitialises the orderbook cache
186+
// assumes lock already active on cache
187+
func (m *UpdateManager) invalidateCache(ctx context.Context, firstUpdateID int64, update *orderbook.Update, cache *updateCache) error {
188+
err := m.ob.InvalidateOrderbook(update.Pair, update.Asset)
189+
m.initialiseOrderbookCache(ctx, firstUpdateID, update, cache)
190+
return err
191+
}
192+
193+
// syncOrderbook fetches and synchronises an orderbook snapshot so that pending updates can be applied to the orderbook.
194+
func (m *UpdateManager) syncOrderbook(ctx context.Context, cache *updateCache, pair currency.Pair, a asset.Item) error {
195+
// REST requests can be behind websocket updates by a large margin, so we wait here to allow the cache to fill with
196+
// updates before we fetch the orderbook snapshot.
197+
select {
198+
case <-ctx.Done():
199+
cache.clearPreserveStateWithLock()
200+
return ctx.Err()
201+
case <-time.After(m.delay):
202+
}
203+
204+
// Setting deadline to error out instead of waiting for rate limiter delay which excessively builds a backlog of
205+
// pending updates.
206+
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(m.deadline))
207+
defer cancel()
208+
209+
book, err := m.fetchOrderbook(ctx, pair, a)
210+
if err != nil {
211+
cache.clearWithLock()
212+
return err
213+
}
214+
215+
if err := cache.waitForUpdate(ctx, book.LastUpdateID+1); err != nil {
216+
cache.clearWithLock()
217+
return err
218+
}
219+
220+
if err := m.ob.LoadSnapshot(book); err != nil {
221+
cache.clearWithLock()
222+
return err
223+
}
224+
225+
cache.m.Lock() // Lock here to prevent ws handle data interference with REST request above.
226+
defer func() {
227+
cache.clearNoLock()
228+
cache.m.Unlock()
229+
}()
230+
231+
if err := m.applyPendingUpdates(cache); err != nil {
232+
cache.resetStateNoLock()
233+
return common.AppendError(err, m.ob.InvalidateOrderbook(pair, a))
234+
}
235+
236+
return nil
237+
}
238+
239+
// applyPendingUpdates applies all pending updates to the orderbook
240+
// assumes lock already active on cache
241+
func (m *UpdateManager) applyPendingUpdates(cache *updateCache) error {
242+
var updated bool
243+
for _, data := range cache.updates {
244+
bookLastUpdateID, err := m.ob.LastUpdateID(data.update.Pair, data.update.Asset)
245+
if err != nil {
246+
return err
247+
}
248+
249+
if !updated {
250+
skip, err := m.checkPendingUpdate(bookLastUpdateID, data.firstUpdateID, data.update)
251+
if err != nil {
252+
return err
253+
}
254+
if skip {
255+
continue
256+
}
257+
} else if isOutOfSequence(bookLastUpdateID, data.firstUpdateID) {
258+
return fmt.Errorf("apply pending updates %w: last update ID %d, first update ID %d", ErrOrderbookSnapshotOutdated, bookLastUpdateID, data.firstUpdateID)
259+
}
260+
261+
if err := m.ob.Update(data.update); err != nil {
262+
return err
263+
}
264+
265+
updated = true
266+
}
267+
268+
if !updated {
269+
return errPendingUpdatesNotApplied
270+
}
271+
cache.state = cacheStateSynced
272+
return nil
273+
}
274+
275+
// isOutOfSequence checks if the update is out of sequence
276+
func isOutOfSequence(lastUpdateID int64, firstUpdateID int64) bool {
277+
return lastUpdateID+1 != firstUpdateID
278+
}
279+
280+
// waitForUpdate waits for an update with an ID >= nextUpdateID
281+
func (c *updateCache) waitForUpdate(ctx context.Context, nextUpdateID int64) error {
282+
c.m.Lock()
283+
updateListLastUpdateID := c.updates[len(c.updates)-1].update.UpdateID
284+
c.m.Unlock()
285+
if updateListLastUpdateID >= nextUpdateID {
286+
return nil
287+
}
288+
289+
for {
290+
select {
291+
case <-ctx.Done():
292+
return ctx.Err()
293+
case recentPendingUpdateID := <-c.ch:
294+
if recentPendingUpdateID >= nextUpdateID {
295+
return nil
296+
}
297+
}
298+
}
299+
}
300+
301+
func (c *updateCache) clearWithLock() {
302+
c.m.Lock()
303+
defer c.m.Unlock()
304+
c.resetStateNoLock()
305+
c.clearNoLock()
306+
}
307+
308+
func (c *updateCache) clearPreserveStateWithLock() {
309+
c.m.Lock()
310+
defer c.m.Unlock()
311+
c.clearNoLock()
312+
}
313+
314+
func (c *updateCache) clearNoLock() {
315+
c.updates = nil
316+
}
317+
318+
func (c *updateCache) resetStateNoLock() {
319+
c.state = cacheStateInitialised
320+
}

0 commit comments

Comments
 (0)