Skip to content

Commit de5f62b

Browse files
authored
Merge pull request #123 from tfgm-bud/cleanup
Clean up to fix a thead joining issue
2 parents def9723 + 71572f5 commit de5f62b

File tree

2 files changed

+76
-57
lines changed

2 files changed

+76
-57
lines changed

gdax/order_book.py

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -36,38 +36,39 @@ def on_open(self):
3636
def on_close(self):
3737
print("\n-- OrderBook Socket Closed! --")
3838

39+
def reset_book(self):
40+
self._asks = RBTree()
41+
self._bids = RBTree()
42+
res = self._client.get_product_order_book(product_id=self.product_id, level=3)
43+
for bid in res['bids']:
44+
self.add({
45+
'id': bid[2],
46+
'side': 'buy',
47+
'price': Decimal(bid[0]),
48+
'size': Decimal(bid[1])
49+
})
50+
for ask in res['asks']:
51+
self.add({
52+
'id': ask[2],
53+
'side': 'sell',
54+
'price': Decimal(ask[0]),
55+
'size': Decimal(ask[1])
56+
})
57+
self._sequence = res['sequence']
58+
3959
def on_message(self, message):
4060
if self._log_to:
4161
pickle.dump(message, self._log_to)
4262

4363
sequence = message['sequence']
4464
if self._sequence == -1:
45-
self._asks = RBTree()
46-
self._bids = RBTree()
47-
res = self._client.get_product_order_book(product_id=self.product_id, level=3)
48-
for bid in res['bids']:
49-
self.add({
50-
'id': bid[2],
51-
'side': 'buy',
52-
'price': Decimal(bid[0]),
53-
'size': Decimal(bid[1])
54-
})
55-
for ask in res['asks']:
56-
self.add({
57-
'id': ask[2],
58-
'side': 'sell',
59-
'price': Decimal(ask[0]),
60-
'size': Decimal(ask[1])
61-
})
62-
self._sequence = res['sequence']
63-
65+
self.reset_book()
66+
return
6467
if sequence <= self._sequence:
6568
# ignore older messages (e.g. before order book initialization from getProductOrderBook)
6669
return
6770
elif sequence > self._sequence + 1:
68-
print('Error: messages missing ({} - {}). Re-initializing websocket.'.format(sequence, self._sequence))
69-
self.close()
70-
self.start()
71+
self.on_sequence_gap(self._sequence, sequence)
7172
return
7273

7374
msg_type = message['type']
@@ -83,18 +84,11 @@ def on_message(self, message):
8384

8485
self._sequence = sequence
8586

86-
# bid = self.get_bid()
87-
# bids = self.get_bids(bid)
88-
# bid_depth = sum([b['size'] for b in bids])
89-
# ask = self.get_ask()
90-
# asks = self.get_asks(ask)
91-
# ask_depth = sum([a['size'] for a in asks])
92-
# print('bid: %f @ %f - ask: %f @ %f' % (bid_depth, bid, ask_depth, ask))
87+
def on_sequence_gap(self, gap_start, gap_end):
88+
self.reset_book()
89+
print('Error: messages missing ({} - {}). Re-initializing book at sequence.'.format(
90+
gap_start, gap_end, self._sequence))
9391

94-
def on_error(self, e):
95-
self._sequence = -1
96-
self.close()
97-
self.start()
9892

9993
def add(self, order):
10094
order = {
@@ -250,6 +244,7 @@ def set_bids(self, price, bids):
250244

251245

252246
if __name__ == '__main__':
247+
import sys
253248
import time
254249
import datetime as dt
255250

@@ -286,10 +281,18 @@ def on_message(self, message):
286281
self._ask = ask
287282
self._bid_depth = bid_depth
288283
self._ask_depth = ask_depth
289-
print('{}\tbid: {:.3f} @ {:.2f}\task: {:.3f} @ {:.2f}'.format(dt.datetime.now(), bid_depth, bid,
290-
ask_depth, ask))
284+
print('{} {} bid: {:.3f} @ {:.2f}\task: {:.3f} @ {:.2f}'.format(
285+
dt.datetime.now(), self.product_id, bid_depth, bid, ask_depth, ask))
291286

292287
order_book = OrderBookConsole()
293288
order_book.start()
294-
time.sleep(10)
295-
order_book.close()
289+
try:
290+
while True:
291+
time.sleep(10)
292+
except KeyboardInterrupt:
293+
order_book.close()
294+
295+
if order_book.error:
296+
sys.exit(1)
297+
else:
298+
sys.exit(0)

gdax/websocket_client.py

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ def __init__(self, url="wss://ws-feed.gdax.com", products=None,
2323
self.products = products
2424
self.type = message_type
2525
self.stop = False
26+
self.error = None
2627
self.ws = None
2728
self.thread = None
2829
self.auth = auth
@@ -36,6 +37,7 @@ def start(self):
3637
def _go():
3738
self._connect()
3839
self._listen()
40+
self._disconnect()
3941

4042
self.stop = False
4143
self.on_open()
@@ -80,22 +82,29 @@ def _listen(self):
8082
if int(time.time() % 30) == 0:
8183
# Set a 30 second ping to keep connection alive
8284
self.ws.ping("keepalive")
83-
msg = json.loads(self.ws.recv())
85+
data = self.ws.recv()
86+
msg = json.loads(data)
87+
except ValueError as e:
88+
self.on_error(e)
8489
except Exception as e:
8590
self.on_error(e)
8691
else:
8792
self.on_message(msg)
8893

94+
def _disconnect(self):
95+
if self.type == "heartbeat":
96+
self.ws.send(json.dumps({"type": "heartbeat", "on": False}))
97+
try:
98+
if self.ws:
99+
self.ws.close()
100+
except WebSocketConnectionClosedException as e:
101+
pass
102+
103+
self.on_close()
104+
89105
def close(self):
90-
if not self.stop:
91-
self.on_close()
92-
self.stop = True
93-
self.thread.join()
94-
try:
95-
if self.ws:
96-
self.ws.close()
97-
except WebSocketConnectionClosedException as e:
98-
pass
106+
self.stop = True
107+
self.thread.join()
99108

100109
def on_open(self):
101110
if self.should_print:
@@ -111,10 +120,13 @@ def on_message(self, msg):
111120
if self.mongo_collection: # dump JSON to given mongo collection
112121
self.mongo_collection.insert_one(msg)
113122

114-
def on_error(self, e):
115-
print(e)
123+
def on_error(self, e, data=None):
124+
self.error = e
125+
self.stop
126+
print('{} - data: {}'.format(e, data))
116127

117128
if __name__ == "__main__":
129+
import sys
118130
import gdax
119131
import time
120132

@@ -126,8 +138,7 @@ def on_open(self):
126138
print("Let's count the messages!")
127139

128140
def on_message(self, msg):
129-
if 'price' in msg and 'type' in msg:
130-
print("Message type:", msg["type"], "\t@ %.3f" % float(msg["price"]))
141+
print(json.dumps(msg, indent=4, sort_keys=True))
131142
self.message_count += 1
132143

133144
def on_close(self):
@@ -136,9 +147,14 @@ def on_close(self):
136147
wsClient = MyWebsocketClient()
137148
wsClient.start()
138149
print(wsClient.url, wsClient.products)
139-
# Do some logic with the data
140-
while wsClient.message_count < 10000:
141-
print("\nMessageCount =", "%i \n" % wsClient.message_count)
142-
time.sleep(1)
143-
144-
wsClient.close()
150+
try:
151+
while True:
152+
print("\nMessageCount =", "%i \n" % wsClient.message_count)
153+
time.sleep(1)
154+
except KeyboardInterrupt:
155+
wsClient.close()
156+
157+
if wsClient.error:
158+
sys.exit(1)
159+
else:
160+
sys.exit(0)

0 commit comments

Comments
 (0)