diff --git a/pkg/quotes/binance.go b/pkg/quotes/binance.go index c0dc13b4..92a02086 100644 --- a/pkg/quotes/binance.go +++ b/pkg/quotes/binance.go @@ -19,38 +19,46 @@ var ( cexConfigured = atomic.Bool{} ) +type binanceStream struct { + aggTradeStop chan struct{} + midPriceStop context.CancelFunc +} + type binance struct { - once *once - usdcToUSDT bool - assetsUpdatePeriod time.Duration - idlePeriod time.Duration - binanceClient *gobinance.Client - filter Filter - history HistoricalData - batcherInbox chan<- TradeEvent - outbox chan<- TradeEvent - streams safe.Map[Market, chan struct{}] - symbolToMarket safe.Map[string, Market] - assets safe.Map[Market, gobinance.Symbol] + once *once + usdcToUSDT bool + assetsUpdatePeriod time.Duration + idlePeriod time.Duration + binanceClient *gobinance.Client + filter Filter + history HistoricalData + tradesBatcherInbox chan<- TradeEvent + midPriceBatcherInbox chan<- TradeEvent + outbox chan<- TradeEvent + streams safe.Map[Market, binanceStream] + symbolToMarket safe.Map[string, Market] + assets safe.Map[Market, gobinance.Symbol] } func newBinance(config BinanceConfig, outbox chan<- TradeEvent, history HistoricalData) (Driver, error) { - batcherInbox := make(chan TradeEvent, 1024) - go batch(config.BatchPeriod, batcherInbox, outbox) + tradesBatcherInbox := make(chan TradeEvent, 1024) + midPriceBatcherInbox := make(chan TradeEvent, 1024) + go batch(config.BatchPeriod, tradesBatcherInbox, midPriceBatcherInbox, outbox) driver := &binance{ - once: newOnce(), - usdcToUSDT: config.USDCtoUSDT, - assetsUpdatePeriod: config.AssetsUpdatePeriod, - idlePeriod: config.IdlePeriod, - binanceClient: gobinance.NewClient("", ""), - filter: NewFilter(config.Filter), - history: history, - batcherInbox: batcherInbox, - outbox: outbox, - streams: safe.NewMap[Market, chan struct{}](), - symbolToMarket: safe.NewMap[string, Market](), - assets: safe.NewMap[Market, gobinance.Symbol](), + once: newOnce(), + usdcToUSDT: config.USDCtoUSDT, + assetsUpdatePeriod: config.AssetsUpdatePeriod, + idlePeriod: config.IdlePeriod, + binanceClient: gobinance.NewClient("", ""), + filter: NewFilter(config.Filter), + history: history, + tradesBatcherInbox: tradesBatcherInbox, + midPriceBatcherInbox: midPriceBatcherInbox, + outbox: outbox, + streams: safe.NewMap[Market, binanceStream](), + symbolToMarket: safe.NewMap[string, Market](), + assets: safe.NewMap[Market, gobinance.Symbol](), } driver.updateAssets() @@ -86,12 +94,12 @@ func (b *binance) Start() error { func (b *binance) Stop() error { stopped := b.once.Stop(func() { - b.streams.Range(func(market Market, _ chan struct{}) bool { + b.streams.Range(func(market Market, _ binanceStream) bool { err := b.Unsubscribe(market) return err == nil }) - b.streams = safe.NewMap[Market, chan struct{}]() + b.streams = safe.NewMap[Market, binanceStream]() cexConfigured.CompareAndSwap(true, false) }) @@ -128,20 +136,47 @@ func (b *binance) Subscribe(market Market) error { return fmt.Errorf("market does not exist: %s", market) } - idle := time.NewTimer(b.idlePeriod) - doneCh, stopCh, err := gobinance.WsAggTradeServe(symbol, b.handleTrade(market, idle), b.handleErr(market)) + // Subscribe to trades + idleTrades := time.NewTimer(b.idlePeriod) + aggTradeDoneCh, aggTradeStopCh, err := gobinance.WsAggTradeServe(symbol, b.handleTrade(market, idleTrades), b.handleErr(market)) + if err != nil { + return fmt.Errorf("%s: %w: %w", market, ErrFailedSub, err) + } + + // Subscribe to orderbook updates + idleOrderbook := time.NewTimer(b.idlePeriod) + ctx, midPriceStop := context.WithCancel(context.Background()) + orderbookUpdatesCh := make(chan BinanceOrderBookOutboxEvent, 128) + _, err = NewBinanceOrderBook(ctx, market, 1, orderbookUpdatesCh) if err != nil { + midPriceStop() return fmt.Errorf("%s: %w: %w", market, ErrFailedSub, err) } - b.streams.Store(market, stopCh) + go func() { + h := b.handleOrderbookUpdates(market, idleOrderbook) + for update := range orderbookUpdatesCh { + h(update) + } + }() + + // Store handles to stop the streams + b.streams.Store(market, binanceStream{ + aggTradeStop: aggTradeStopCh, + midPriceStop: midPriceStop, + }) go func() { - defer idle.Stop() + defer idleTrades.Stop() + defer idleOrderbook.Stop() select { - case <-doneCh: + case <-aggTradeDoneCh: + midPriceStop() // stop the sibling stream + loggerBinance.Warnw("market stopped", "market", market) + case <-ctx.Done(): + aggTradeStopCh <- struct{}{} // stop the sibling stream loggerBinance.Warnw("market stopped", "market", market) - case <-idle.C: + case <-idleTrades.C: loggerBinance.Warnw("market inactivity detected", "market", market) } @@ -175,13 +210,14 @@ func (b *binance) Unsubscribe(market Market) error { return ErrNotStarted } - stopCh, ok := b.streams.Load(market) + stream, ok := b.streams.Load(market) if !ok { return fmt.Errorf("%s: %w", market, ErrNotSubbed) } - stopCh <- struct{}{} - close(stopCh) + stream.aggTradeStop <- struct{}{} + close(stream.aggTradeStop) + stream.midPriceStop() b.streams.Delete(market) recordUnsubscribed(DriverBinance, market) @@ -229,7 +265,35 @@ func (b *binance) handleTrade( if !b.filter.Allow(tradeEvent) { return } - b.batcherInbox <- tradeEvent + b.tradesBatcherInbox <- tradeEvent + } +} + +func (b *binance) handleOrderbookUpdates( + market Market, + idle *time.Timer, +) func(event BinanceOrderBookOutboxEvent) { + return func(event BinanceOrderBookOutboxEvent) { + idle.Reset(b.idlePeriod) + + // Compute mid price + bidPrice := event.Bids[0].Price + askPrice := event.Asks[0].Price + midPrice := bidPrice.Add(askPrice).Div(two) + + bidAmount := event.Bids[0].Amount + askAmount := event.Asks[0].Amount + midAmount := bidAmount.Add(askAmount).Div(two) + + b.outbox <- TradeEvent{ + Source: DriverBinance, + Market: market, + Price: midPrice, + Amount: midAmount, + Total: midPrice.Mul(midAmount), + TakerType: TakerTypeUnknown, + CreatedAt: time.Now(), + } } } @@ -352,16 +416,20 @@ func (b *binance) HistoricalData(ctx context.Context, market Market, window time return trades, nil } -func batch(batchPeriod time.Duration, inbox <-chan TradeEvent, outbox chan<- TradeEvent) { +func batch(batchPeriod time.Duration, tradesInbox, midPriceIndox <-chan TradeEvent, outbox chan<- TradeEvent) { marketTrades := make(map[Market][]TradeEvent) + midprices := make(map[Market][]TradeEvent) timer := time.NewTimer(batchPeriod) defer timer.Stop() for { select { - case trade := <-inbox: + case trade := <-tradesInbox: marketTrades[trade.Market] = append(marketTrades[trade.Market], trade) + case trade := <-midPriceIndox: + midprices[trade.Market] = append(midprices[trade.Market], trade) case <-timer.C: + // TODO: combine midprices for market, trades := range marketTrades { if event := combineTrades(trades); event != nil { marketTrades[market] = nil diff --git a/pkg/quotes/binance_orderbok.go b/pkg/quotes/binance_orderbok.go new file mode 100644 index 00000000..b142dd00 --- /dev/null +++ b/pkg/quotes/binance_orderbok.go @@ -0,0 +1,379 @@ +package quotes + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "sort" + "strings" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/shopspring/decimal" +) + +// gapThreshold is the maximum gap size (in update IDs) before we refetch the snapshot. +const gapThreshold = 25 + +// ErrInvalidSnapshot is returned if the snapshot from REST is empty. +var ErrInvalidSnapshot = errors.New("invalid snapshot received") + +// BinanceDepthSnapshot is the REST snapshot. +type BinanceDepthSnapshot struct { + LastUpdateID int64 `json:"lastUpdateId"` + Bids [][]string `json:"bids"` + Asks [][]string `json:"asks"` +} + +// BinanceDepthEvent is the WebSocket update event. +type BinanceDepthEvent struct { + FirstUpdateID int64 `json:"U"` + LastUpdateID int64 `json:"u"` + Bids [][]string `json:"b"` + Asks [][]string `json:"a"` +} + +// BinanceOrderBookLevel represents a single level in the order book. +type BinanceOrderBookLevel struct { + Price decimal.Decimal + Amount decimal.Decimal +} + +// BinanceOrderBookOutboxEvent is sent on each update. +type BinanceOrderBookOutboxEvent struct { + Bids []BinanceOrderBookLevel + Asks []BinanceOrderBookLevel +} + +// BinanceOrderBook holds the current order book state. +type BinanceOrderBook struct { + mu sync.Mutex + LastUpdateID int64 + // Both Bids and Asks are maintained as maps using a normalized price string as key. + // The value is a normalized string representation of the quantity. + Bids map[string]string + Asks map[string]string + + Outbox chan<- BinanceOrderBookOutboxEvent + TopLevels int +} + +// NewBinanceOrderBook creates a new order book. The Outbox channel receives the top-level updates. +// The provided context cancels the routine. +func NewBinanceOrderBook(ctx context.Context, market Market, topLevels int, outbox chan<- BinanceOrderBookOutboxEvent) (*BinanceOrderBook, error) { + if topLevels <= 0 { + return nil, errors.New("topLevels must be greater than 0") + } + if outbox == nil { + return nil, errors.New("outbox channel is required") + } + + ob := &BinanceOrderBook{ + Bids: make(map[string]string), + Asks: make(map[string]string), + Outbox: outbox, + TopLevels: topLevels, + } + + // Run the order book update loop. + go ob.run(ctx, market) + return ob, nil +} + +// run connects to the WebSocket feed and processes updates in a loop. +func (ob *BinanceOrderBook) run(ctx context.Context, market Market) { + for { + if ctx.Err() != nil { + return + } + err := ob.connectWebSocket(ctx, market) + if err != nil { + fmt.Printf("WebSocket error: %v\n", err) + } + // Pause before attempting to reconnect. + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + return + } + } +} + +// connectWebSocket fetches a snapshot and then processes WebSocket updates. +func (ob *BinanceOrderBook) connectWebSocket(ctx context.Context, market Market) error { + // Build the WebSocket URL. + symbol := strings.ReplaceAll(strings.ToLower(market.String()), "/", "") + wsURL := fmt.Sprintf("wss://stream.binance.com:9443/ws/%s@depth@100ms", symbol) + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + return fmt.Errorf("failed to dial WebSocket: %w", err) + } + defer conn.Close() + + // Simple ping/pong to keep the connection alive. + conn.SetPongHandler(func(appData string) error { + conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + return nil + }) + go func() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil { + return + } + select { + case <-ticker.C: + case <-ctx.Done(): + return + } + } + }() + + // Fetch the initial snapshot. + snapshot, err := ob.fetchSnapshot(ctx, market) + if err != nil { + return fmt.Errorf("failed to fetch snapshot: %w", err) + } + ob.applySnapshot(snapshot) + fmt.Printf("Snapshot applied: LastUpdateID=%d\n", snapshot.LastUpdateID) + nextUpdateID := snapshot.LastUpdateID + 1 + + // Process incoming updates. + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + _, message, err := conn.ReadMessage() + if err != nil { + return fmt.Errorf("read error: %w", err) + } + + var event BinanceDepthEvent + if err := json.Unmarshal(message, &event); err != nil { + fmt.Printf("Unmarshal error: %v\n", err) + continue + } + + // Skip events that are entirely outdated. + if event.LastUpdateID < nextUpdateID { + continue + } + + // According to Binance, we must have: + // event.FirstUpdateID <= nextUpdateID <= event.LastUpdateID + if event.FirstUpdateID <= nextUpdateID && nextUpdateID <= event.LastUpdateID { + ob.applyUpdate(event) + nextUpdateID = event.LastUpdateID + 1 + ob.sendUpdate() + } else { + // A gap or an unexpected range – refetch the snapshot. + fmt.Printf("Gap detected (expected %d, got [%d, %d]). Refetching snapshot...\n", + nextUpdateID, event.FirstUpdateID, event.LastUpdateID) + snapshot, err = ob.fetchSnapshot(ctx, market) + if err != nil { + return fmt.Errorf("failed to refetch snapshot: %w", err) + } + ob.applySnapshot(snapshot) + nextUpdateID = snapshot.LastUpdateID + 1 + fmt.Printf("Snapshot reapplied: LastUpdateID=%d\n", snapshot.LastUpdateID) + } + } +} + +// fetchSnapshot retrieves the current order book snapshot via REST. +func (ob *BinanceOrderBook) fetchSnapshot(ctx context.Context, market Market) (BinanceDepthSnapshot, error) { + symbol := strings.ReplaceAll(strings.ToUpper(market.String()), "/", "") + url := fmt.Sprintf("https://api.binance.com/api/v3/depth?symbol=%s&limit=1000", symbol) + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return BinanceDepthSnapshot{}, err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return BinanceDepthSnapshot{}, err + } + defer resp.Body.Close() + + var snapshot BinanceDepthSnapshot + if err := json.NewDecoder(resp.Body).Decode(&snapshot); err != nil { + return BinanceDepthSnapshot{}, err + } + if snapshot.LastUpdateID == 0 { + return BinanceDepthSnapshot{}, ErrInvalidSnapshot + } + return snapshot, nil +} + +// normalizePrice takes a price string, parses it to a decimal, and returns its normalized string. +func normalizePrice(priceStr string) (string, error) { + d, err := decimal.NewFromString(priceStr) + if err != nil { + return "", err + } + return d.String(), nil +} + +// normalizeQuantity takes a quantity string, parses it to a decimal, and returns its normalized string. +func normalizeQuantity(qtyStr string) (string, error) { + d, err := decimal.NewFromString(qtyStr) + if err != nil { + return "", err + } + return d.String(), nil +} + +// applySnapshot completely replaces the order book with the snapshot data. +// Prices and quantities are normalized so that keys match across snapshot and updates. +func (ob *BinanceOrderBook) applySnapshot(snapshot BinanceDepthSnapshot) { + ob.mu.Lock() + defer ob.mu.Unlock() + + ob.LastUpdateID = snapshot.LastUpdateID + ob.Bids = make(map[string]string) + ob.Asks = make(map[string]string) + + for _, bid := range snapshot.Bids { + if len(bid) < 2 { + continue + } + normPrice, err := normalizePrice(bid[0]) + if err != nil { + continue + } + normQty, err := normalizeQuantity(bid[1]) + if err != nil { + continue + } + // Only store if the quantity is nonzero. + qtyDec, _ := decimal.NewFromString(normQty) + if qtyDec.IsZero() { + continue + } + ob.Bids[normPrice] = normQty + } + + for _, ask := range snapshot.Asks { + if len(ask) < 2 { + continue + } + normPrice, err := normalizePrice(ask[0]) + if err != nil { + continue + } + normQty, err := normalizeQuantity(ask[1]) + if err != nil { + continue + } + qtyDec, _ := decimal.NewFromString(normQty) + if qtyDec.IsZero() { + continue + } + ob.Asks[normPrice] = normQty + } +} + +// applyUpdate applies an incremental update event to the order book. +// Prices and quantities are normalized so that update keys match those from the snapshot. +func (ob *BinanceOrderBook) applyUpdate(event BinanceDepthEvent) { + ob.mu.Lock() + defer ob.mu.Unlock() + + for _, bid := range event.Bids { + if len(bid) < 2 { + continue + } + normPrice, err := normalizePrice(bid[0]) + if err != nil { + continue + } + normQty, err := normalizeQuantity(bid[1]) + if err != nil { + continue + } + qtyDec, _ := decimal.NewFromString(normQty) + if qtyDec.IsZero() { + delete(ob.Bids, normPrice) + } else { + ob.Bids[normPrice] = normQty + } + } + + for _, ask := range event.Asks { + if len(ask) < 2 { + continue + } + normPrice, err := normalizePrice(ask[0]) + if err != nil { + continue + } + normQty, err := normalizeQuantity(ask[1]) + if err != nil { + continue + } + qtyDec, _ := decimal.NewFromString(normQty) + if qtyDec.IsZero() { + delete(ob.Asks, normPrice) + } else { + ob.Asks[normPrice] = normQty + } + } + + ob.LastUpdateID = event.LastUpdateID +} + +// sendUpdate builds the top-level order book levels and pushes an update on Outbox. +func (ob *BinanceOrderBook) sendUpdate() { + bids := ob.getSortedLevels(ob.Bids, true) + asks := ob.getSortedLevels(ob.Asks, false) + update := BinanceOrderBookOutboxEvent{ + Bids: bids, + Asks: asks, + } + // Nonblocking send. + select { + case ob.Outbox <- update: + default: + } +} + +// getSortedLevels returns the top levels from the given side (bids or asks). +// Bids are sorted descending, asks ascending. +func (ob *BinanceOrderBook) getSortedLevels(side map[string]string, isBid bool) []BinanceOrderBookLevel { + ob.mu.Lock() + defer ob.mu.Unlock() + + var levels []BinanceOrderBookLevel + for priceStr, qtyStr := range side { + price, err := decimal.NewFromString(priceStr) + if err != nil { + continue + } + qty, err := decimal.NewFromString(qtyStr) + if err != nil { + continue + } + levels = append(levels, BinanceOrderBookLevel{ + Price: price, + Amount: qty, + }) + } + + sort.Slice(levels, func(i, j int) bool { + if isBid { + return levels[i].Price.GreaterThan(levels[j].Price) + } + return levels[i].Price.LessThan(levels[j].Price) + }) + + if len(levels) > ob.TopLevels { + levels = levels[:ob.TopLevels] + } + return levels +} diff --git a/pkg/quotes/testing/orderbook/main.go b/pkg/quotes/testing/orderbook/main.go new file mode 100644 index 00000000..64b887ce --- /dev/null +++ b/pkg/quotes/testing/orderbook/main.go @@ -0,0 +1,35 @@ +package main + +import ( + "context" + "log" + "os" + "os/signal" + + "github.com/layer-3/clearsync/pkg/quotes" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + market := quotes.NewMarket("btc", "usdt") + const topLevels = 1 // Configurable number of top levels to track + outbox := make(chan quotes.BinanceOrderBookOutboxEvent, 128) + + _, err := quotes.NewBinanceOrderBook(ctx, market, topLevels, outbox) + if err != nil { + log.Fatal(err) + } + + go func() { + for update := range outbox { + log.Println("Top levels updated:", update.Asks[0], update.Bids[0]) + } + }() + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c + cancel() +} diff --git a/pkg/quotes/testing/app.go b/pkg/quotes/testing/trades/main.go similarity index 100% rename from pkg/quotes/testing/app.go rename to pkg/quotes/testing/trades/main.go