Skip to content

Commit 79fa546

Browse files
committed
2 parents 28e6299 + de5f62b commit 79fa546

File tree

6 files changed

+89
-64
lines changed

6 files changed

+89
-64
lines changed

README.md

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -240,16 +240,19 @@ wsClient.close()
240240
### WebsocketClient + Mongodb
241241
The ```WebsocketClient``` now supports data gathering via MongoDB. Given a
242242
MongoDB collection, the ```WebsocketClient``` will stream results directly into
243-
the database collection.
243+
the database collection.
244244
```python
245245
# import PyMongo and connect to a local, running Mongo instance
246246
from pymongo import MongoClient
247+
import gdax
247248
mongo_client = MongoClient('mongodb://localhost:27017/')
249+
248250
# specify the database and collection
249251
db = mongo_client.cryptocurrency_database
250252
BTC_collection = db.BTC_collection
253+
251254
# instantiate a WebsocketClient instance, with a Mongo collection as a parameter
252-
wsClient = WebsocketClient(url="wss://ws-feed.gdax.com", products="BTC-USD",
255+
wsClient = gdax.WebsocketClient(url="wss://ws-feed.gdax.com", products="BTC-USD",
253256
mongo_collection=BTC_collection, should_print=False)
254257
wsClient.start()
255258
```
@@ -261,9 +264,9 @@ can react to the data streaming in. The current client is a template used for
261264
illustration purposes only.
262265

263266

264-
- onOpen - called once, *immediately before* the socket connection is made, this
267+
- onOpen - called once, *immediately before* the socket connection is made, this
265268
is where you want to add initial parameters.
266-
- onMessage - called once for every message that arrives and accepts one
269+
- onMessage - called once for every message that arrives and accepts one
267270
argument that contains the message of dict type.
268271
- onClose - called once after the websocket has been closed.
269272
- close - call this method to close the websocket connection (do not overwrite).

contributors.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ Leonard Lin
33
Jeff Gibson
44
David Caseria
55
Paul Mestemaker
6+
Drew Rice

gdax/order_book.py

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -36,38 +36,39 @@ def on_open(self):
3636
def on_close(self):
3737
print("\n-- OrderBook Socket Closed! --")
3838

39+
def reset_book(self):
40+
self._asks = RBTree()
41+
self._bids = RBTree()
42+
res = self._client.get_product_order_book(product_id=self.product_id, level=3)
43+
for bid in res['bids']:
44+
self.add({
45+
'id': bid[2],
46+
'side': 'buy',
47+
'price': Decimal(bid[0]),
48+
'size': Decimal(bid[1])
49+
})
50+
for ask in res['asks']:
51+
self.add({
52+
'id': ask[2],
53+
'side': 'sell',
54+
'price': Decimal(ask[0]),
55+
'size': Decimal(ask[1])
56+
})
57+
self._sequence = res['sequence']
58+
3959
def on_message(self, message):
4060
if self._log_to:
4161
pickle.dump(message, self._log_to)
4262

4363
sequence = message['sequence']
4464
if self._sequence == -1:
45-
self._asks = RBTree()
46-
self._bids = RBTree()
47-
res = self._client.get_product_order_book(product_id=self.product_id, level=3)
48-
for bid in res['bids']:
49-
self.add({
50-
'id': bid[2],
51-
'side': 'buy',
52-
'price': Decimal(bid[0]),
53-
'size': Decimal(bid[1])
54-
})
55-
for ask in res['asks']:
56-
self.add({
57-
'id': ask[2],
58-
'side': 'sell',
59-
'price': Decimal(ask[0]),
60-
'size': Decimal(ask[1])
61-
})
62-
self._sequence = res['sequence']
63-
65+
self.reset_book()
66+
return
6467
if sequence <= self._sequence:
6568
# ignore older messages (e.g. before order book initialization from getProductOrderBook)
6669
return
6770
elif sequence > self._sequence + 1:
68-
print('Error: messages missing ({} - {}). Re-initializing websocket.'.format(sequence, self._sequence))
69-
self.close()
70-
self.start()
71+
self.on_sequence_gap(self._sequence, sequence)
7172
return
7273

7374
msg_type = message['type']
@@ -83,18 +84,11 @@ def on_message(self, message):
8384

8485
self._sequence = sequence
8586

86-
# bid = self.get_bid()
87-
# bids = self.get_bids(bid)
88-
# bid_depth = sum([b['size'] for b in bids])
89-
# ask = self.get_ask()
90-
# asks = self.get_asks(ask)
91-
# ask_depth = sum([a['size'] for a in asks])
92-
# print('bid: %f @ %f - ask: %f @ %f' % (bid_depth, bid, ask_depth, ask))
87+
def on_sequence_gap(self, gap_start, gap_end):
88+
self.reset_book()
89+
print('Error: messages missing ({} - {}). Re-initializing book at sequence.'.format(
90+
gap_start, gap_end, self._sequence))
9391

94-
def on_error(self, e):
95-
self._sequence = -1
96-
self.close()
97-
self.start()
9892

9993
def add(self, order):
10094
order = {
@@ -250,6 +244,7 @@ def set_bids(self, price, bids):
250244

251245

252246
if __name__ == '__main__':
247+
import sys
253248
import time
254249
import datetime as dt
255250

@@ -286,10 +281,18 @@ def on_message(self, message):
286281
self._ask = ask
287282
self._bid_depth = bid_depth
288283
self._ask_depth = ask_depth
289-
print('{}\tbid: {:.3f} @ {:.2f}\task: {:.3f} @ {:.2f}'.format(dt.datetime.now(), bid_depth, bid,
290-
ask_depth, ask))
284+
print('{} {} bid: {:.3f} @ {:.2f}\task: {:.3f} @ {:.2f}'.format(
285+
dt.datetime.now(), self.product_id, bid_depth, bid, ask_depth, ask))
291286

292287
order_book = OrderBookConsole()
293288
order_book.start()
294-
time.sleep(10)
295-
order_book.close()
289+
try:
290+
while True:
291+
time.sleep(10)
292+
except KeyboardInterrupt:
293+
order_book.close()
294+
295+
if order_book.error:
296+
sys.exit(1)
297+
else:
298+
sys.exit(0)

gdax/websocket_client.py

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
from pymongo import MongoClient
1717

1818
class WebsocketClient(object):
19-
def __init__(self, url="wss://ws-feed.gdax.com", products=None, message_type="subscribe",
20-
mongo_collection=None, should_print=True, auth=False, api_key="", api_secret="", api_passphrase="", channels=None):
19+
def __init__(self, url="wss://ws-feed.gdax.com", products=None, message_type="subscribe", mongo_collection=None,
20+
should_print=True, auth=False, api_key="", api_secret="", api_passphrase="", channels=None):
21+
2122
self.url = url
2223
self.products = products
2324
self.channels = channels
2425
self.type = message_type
2526
self.stop = False
27+
self.error = None
2628
self.ws = None
2729
self.thread = None
2830
self.auth = auth
@@ -36,6 +38,7 @@ def start(self):
3638
def _go():
3739
self._connect()
3840
self._listen()
41+
self._disconnect()
3942

4043
self.stop = False
4144
self.on_open()
@@ -85,22 +88,29 @@ def _listen(self):
8588
if int(time.time() % 30) == 0:
8689
# Set a 30 second ping to keep connection alive
8790
self.ws.ping("keepalive")
88-
msg = json.loads(self.ws.recv())
91+
data = self.ws.recv()
92+
msg = json.loads(data)
93+
except ValueError as e:
94+
self.on_error(e)
8995
except Exception as e:
9096
self.on_error(e)
9197
else:
9298
self.on_message(msg)
9399

100+
def _disconnect(self):
101+
if self.type == "heartbeat":
102+
self.ws.send(json.dumps({"type": "heartbeat", "on": False}))
103+
try:
104+
if self.ws:
105+
self.ws.close()
106+
except WebSocketConnectionClosedException as e:
107+
pass
108+
109+
self.on_close()
110+
94111
def close(self):
95-
if not self.stop:
96-
self.on_close()
97-
self.stop = True
98-
self.thread.join()
99-
try:
100-
if self.ws:
101-
self.ws.close()
102-
except WebSocketConnectionClosedException as e:
103-
pass
112+
self.stop = True
113+
self.thread.join()
104114

105115
def on_open(self):
106116
if self.should_print:
@@ -116,10 +126,13 @@ def on_message(self, msg):
116126
if self.mongo_collection: # dump JSON to given mongo collection
117127
self.mongo_collection.insert_one(msg)
118128

119-
def on_error(self, e):
120-
print(e)
129+
def on_error(self, e, data=None):
130+
self.error = e
131+
self.stop
132+
print('{} - data: {}'.format(e, data))
121133

122134
if __name__ == "__main__":
135+
import sys
123136
import gdax
124137
import time
125138

@@ -131,8 +144,7 @@ def on_open(self):
131144
print("Let's count the messages!")
132145

133146
def on_message(self, msg):
134-
if 'price' in msg and 'type' in msg:
135-
print("Message type:", msg["type"], "\t@ %.3f" % float(msg["price"]))
147+
print(json.dumps(msg, indent=4, sort_keys=True))
136148
self.message_count += 1
137149

138150
def on_close(self):
@@ -141,9 +153,14 @@ def on_close(self):
141153
wsClient = MyWebsocketClient()
142154
wsClient.start()
143155
print(wsClient.url, wsClient.products)
144-
# Do some logic with the data
145-
while wsClient.message_count < 10000:
146-
print("\nMessageCount =", "%i \n" % wsClient.message_count)
147-
time.sleep(1)
148-
149-
wsClient.close()
156+
try:
157+
while True:
158+
print("\nMessageCount =", "%i \n" % wsClient.message_count)
159+
time.sleep(1)
160+
except KeyboardInterrupt:
161+
wsClient.close()
162+
163+
if wsClient.error:
164+
sys.exit(1)
165+
else:
166+
sys.exit(0)

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ bintrees==2.0.7
22
requests==2.13.0
33
six==1.10.0
44
websocket-client==0.40.0
5-
pymongo
5+
pymongo==3.5.1

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
'requests==2.13.0',
88
'six==1.10.0',
99
'websocket-client==0.40.0',
10+
'pymongo==3.5.1'
1011
]
1112

1213
tests_require = [

0 commit comments

Comments
 (0)