Skip to content

Commit d07d86c

Browse files
committed
implement bitfinex stream
1 parent ecddf04 commit d07d86c

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

pkg/exchange/bitfinex/stream.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package bitfinex
2+
3+
import (
4+
"context"
5+
"os"
6+
7+
"github.com/sirupsen/logrus"
8+
9+
"github.com/c9s/bbgo/pkg/depth"
10+
bfxapi "github.com/c9s/bbgo/pkg/exchange/bitfinex/bfxapi"
11+
"github.com/c9s/bbgo/pkg/types"
12+
)
13+
14+
// Stream represents the Bitfinex websocket stream.
15+
//
16+
//go:generate callbackgen -type Stream
17+
type Stream struct {
18+
types.StandardStream
19+
20+
depthBuffers map[string]*depth.Buffer
21+
22+
tickerEventCallbacks []func(e *bfxapi.TickerEvent)
23+
bookEventCallbacks []func(e *bfxapi.BookEvent)
24+
candleEventCallbacks []func(e *bfxapi.CandleEvent)
25+
statusEventCallbacks []func(e *bfxapi.StatusEvent)
26+
marketTradeEventCallbacks []func(e *bfxapi.MarketTradeEvent)
27+
28+
parser *bfxapi.Parser
29+
}
30+
31+
// NewStream creates a new Bitfinex Stream.
32+
func NewStream() *Stream {
33+
stream := &Stream{
34+
StandardStream: types.NewStandardStream(),
35+
depthBuffers: make(map[string]*depth.Buffer),
36+
37+
parser: bfxapi.NewParser(),
38+
}
39+
stream.SetParser(stream.parser.Parse)
40+
stream.SetDispatcher(stream.dispatchEvent)
41+
stream.SetEndpointCreator(stream.getEndpoint)
42+
return stream
43+
}
44+
45+
// getEndpoint returns the websocket endpoint URL.
46+
func (s *Stream) getEndpoint(ctx context.Context) (string, error) {
47+
url := os.Getenv("BITFINEX_API_WS_URL")
48+
if url == "" {
49+
if s.PublicOnly {
50+
url = "wss://api-pub.bitfinex.com/ws/2"
51+
} else {
52+
url = "wss://api.bitfinex.com/ws/2"
53+
}
54+
}
55+
return url, nil
56+
}
57+
58+
// dispatchEvent dispatches parsed events to corresponding callbacks.
59+
func (s *Stream) dispatchEvent(e interface{}) {
60+
switch evt := e.(type) {
61+
default:
62+
logrus.Warnf("unhandled %T event: %+v", evt, evt)
63+
}
64+
}

pkg/types/stream.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,14 +196,18 @@ func (s *StandardStream) GetPublicOnly() bool {
196196
return s.PublicOnly
197197
}
198198

199+
// SetEndpointCreator sets the endpoint creator function to create the websocket endpoint URL.
200+
// This is useful when the endpoint URL is dynamic or needs to be created based on some context.
199201
func (s *StandardStream) SetEndpointCreator(creator EndpointCreator) {
200202
s.endpointCreator = creator
201203
}
202204

205+
// SetDispatcher sets the dispatcher function to dispatch the parsed websocket message.
203206
func (s *StandardStream) SetDispatcher(dispatcher Dispatcher) {
204207
s.dispatcher = dispatcher
205208
}
206209

210+
// SetParser sets the parser function to parse the websocket message.
207211
func (s *StandardStream) SetParser(parser Parser) {
208212
s.parser = parser
209213
}

0 commit comments

Comments
 (0)