Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 108 additions & 40 deletions pkg/quotes/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,46 @@ var (
cexConfigured = atomic.Bool{}
)

type binanceStream struct {
aggTradeStop chan struct{}
midPriceStop context.CancelFunc
}

type binance struct {
once *once
usdcToUSDT bool
assetsUpdatePeriod time.Duration
idlePeriod time.Duration
binanceClient *gobinance.Client
filter Filter
history HistoricalData
batcherInbox chan<- TradeEvent
outbox chan<- TradeEvent
streams safe.Map[Market, chan struct{}]
symbolToMarket safe.Map[string, Market]
assets safe.Map[Market, gobinance.Symbol]
once *once
usdcToUSDT bool
assetsUpdatePeriod time.Duration
idlePeriod time.Duration
binanceClient *gobinance.Client
filter Filter
history HistoricalData
tradesBatcherInbox chan<- TradeEvent
midPriceBatcherInbox chan<- TradeEvent
outbox chan<- TradeEvent
streams safe.Map[Market, binanceStream]
symbolToMarket safe.Map[string, Market]
assets safe.Map[Market, gobinance.Symbol]
}

func newBinance(config BinanceConfig, outbox chan<- TradeEvent, history HistoricalData) (Driver, error) {
batcherInbox := make(chan TradeEvent, 1024)
go batch(config.BatchPeriod, batcherInbox, outbox)
tradesBatcherInbox := make(chan TradeEvent, 1024)
midPriceBatcherInbox := make(chan TradeEvent, 1024)
go batch(config.BatchPeriod, tradesBatcherInbox, midPriceBatcherInbox, outbox)

driver := &binance{
once: newOnce(),
usdcToUSDT: config.USDCtoUSDT,
assetsUpdatePeriod: config.AssetsUpdatePeriod,
idlePeriod: config.IdlePeriod,
binanceClient: gobinance.NewClient("", ""),
filter: NewFilter(config.Filter),
history: history,
batcherInbox: batcherInbox,
outbox: outbox,
streams: safe.NewMap[Market, chan struct{}](),
symbolToMarket: safe.NewMap[string, Market](),
assets: safe.NewMap[Market, gobinance.Symbol](),
once: newOnce(),
usdcToUSDT: config.USDCtoUSDT,
assetsUpdatePeriod: config.AssetsUpdatePeriod,
idlePeriod: config.IdlePeriod,
binanceClient: gobinance.NewClient("", ""),
filter: NewFilter(config.Filter),
history: history,
tradesBatcherInbox: tradesBatcherInbox,
midPriceBatcherInbox: midPriceBatcherInbox,
outbox: outbox,
streams: safe.NewMap[Market, binanceStream](),
symbolToMarket: safe.NewMap[string, Market](),
assets: safe.NewMap[Market, gobinance.Symbol](),
}

driver.updateAssets()
Expand Down Expand Up @@ -86,12 +94,12 @@ func (b *binance) Start() error {

func (b *binance) Stop() error {
stopped := b.once.Stop(func() {
b.streams.Range(func(market Market, _ chan struct{}) bool {
b.streams.Range(func(market Market, _ binanceStream) bool {
err := b.Unsubscribe(market)
return err == nil
})

b.streams = safe.NewMap[Market, chan struct{}]()
b.streams = safe.NewMap[Market, binanceStream]()
cexConfigured.CompareAndSwap(true, false)
})

Expand Down Expand Up @@ -128,20 +136,47 @@ func (b *binance) Subscribe(market Market) error {
return fmt.Errorf("market does not exist: %s", market)
}

idle := time.NewTimer(b.idlePeriod)
doneCh, stopCh, err := gobinance.WsAggTradeServe(symbol, b.handleTrade(market, idle), b.handleErr(market))
// Subscribe to trades
idleTrades := time.NewTimer(b.idlePeriod)
aggTradeDoneCh, aggTradeStopCh, err := gobinance.WsAggTradeServe(symbol, b.handleTrade(market, idleTrades), b.handleErr(market))
if err != nil {
return fmt.Errorf("%s: %w: %w", market, ErrFailedSub, err)
}

// Subscribe to orderbook updates
idleOrderbook := time.NewTimer(b.idlePeriod)
ctx, midPriceStop := context.WithCancel(context.Background())
orderbookUpdatesCh := make(chan BinanceOrderBookOutboxEvent, 128)
_, err = NewBinanceOrderBook(ctx, market, 1, orderbookUpdatesCh)
if err != nil {
midPriceStop()
return fmt.Errorf("%s: %w: %w", market, ErrFailedSub, err)
}
b.streams.Store(market, stopCh)
go func() {
h := b.handleOrderbookUpdates(market, idleOrderbook)
for update := range orderbookUpdatesCh {
h(update)
}
}()

// Store handles to stop the streams
b.streams.Store(market, binanceStream{
aggTradeStop: aggTradeStopCh,
midPriceStop: midPriceStop,
})

go func() {
defer idle.Stop()
defer idleTrades.Stop()
defer idleOrderbook.Stop()

select {
case <-doneCh:
case <-aggTradeDoneCh:
midPriceStop() // stop the sibling stream
loggerBinance.Warnw("market stopped", "market", market)
case <-ctx.Done():
aggTradeStopCh <- struct{}{} // stop the sibling stream
loggerBinance.Warnw("market stopped", "market", market)
case <-idle.C:
case <-idleTrades.C:
loggerBinance.Warnw("market inactivity detected", "market", market)
}

Expand Down Expand Up @@ -175,13 +210,14 @@ func (b *binance) Unsubscribe(market Market) error {
return ErrNotStarted
}

stopCh, ok := b.streams.Load(market)
stream, ok := b.streams.Load(market)
if !ok {
return fmt.Errorf("%s: %w", market, ErrNotSubbed)
}

stopCh <- struct{}{}
close(stopCh)
stream.aggTradeStop <- struct{}{}
close(stream.aggTradeStop)
stream.midPriceStop()

b.streams.Delete(market)
recordUnsubscribed(DriverBinance, market)
Expand Down Expand Up @@ -229,7 +265,35 @@ func (b *binance) handleTrade(
if !b.filter.Allow(tradeEvent) {
return
}
b.batcherInbox <- tradeEvent
b.tradesBatcherInbox <- tradeEvent
}
}

func (b *binance) handleOrderbookUpdates(
market Market,
idle *time.Timer,
) func(event BinanceOrderBookOutboxEvent) {
return func(event BinanceOrderBookOutboxEvent) {
idle.Reset(b.idlePeriod)

// Compute mid price
bidPrice := event.Bids[0].Price
askPrice := event.Asks[0].Price
midPrice := bidPrice.Add(askPrice).Div(two)

bidAmount := event.Bids[0].Amount
askAmount := event.Asks[0].Amount
midAmount := bidAmount.Add(askAmount).Div(two)

b.outbox <- TradeEvent{
Source: DriverBinance,
Market: market,
Price: midPrice,
Amount: midAmount,
Total: midPrice.Mul(midAmount),
TakerType: TakerTypeUnknown,
CreatedAt: time.Now(),
}
}
}

Expand Down Expand Up @@ -352,16 +416,20 @@ func (b *binance) HistoricalData(ctx context.Context, market Market, window time
return trades, nil
}

func batch(batchPeriod time.Duration, inbox <-chan TradeEvent, outbox chan<- TradeEvent) {
func batch(batchPeriod time.Duration, tradesInbox, midPriceIndox <-chan TradeEvent, outbox chan<- TradeEvent) {
marketTrades := make(map[Market][]TradeEvent)
midprices := make(map[Market][]TradeEvent)
timer := time.NewTimer(batchPeriod)
defer timer.Stop()

for {
select {
case trade := <-inbox:
case trade := <-tradesInbox:
marketTrades[trade.Market] = append(marketTrades[trade.Market], trade)
case trade := <-midPriceIndox:
midprices[trade.Market] = append(midprices[trade.Market], trade)
case <-timer.C:
// TODO: combine midprices
for market, trades := range marketTrades {
if event := combineTrades(trades); event != nil {
marketTrades[market] = nil
Expand Down
Loading