Skip to content

Commit ee4b14e

Browse files
committed
add ask/bid depth quote subscribe; refactoring PushClient
1 parent e5e5cb1 commit ee4b14e

File tree

4 files changed

+126
-81
lines changed

4 files changed

+126
-81
lines changed

tigeropen/common/consts/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ class BarPeriod(Enum):
105105
SIX_HOURS = '6hour' # 6小时
106106

107107

108+
@unique
109+
class ExchangeQuote(Enum):
110+
ARCA = 'ARCA'
111+
TOTAL_VIEW = 'TOTAL_VIEW'
112+
113+
108114
class OrderStatus(Enum):
109115
PENDING_NEW = 'PendingNew'
110116
NEW = 'Initial' # 订单初始状态
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# -*- coding: utf-8 -*-
2+
3+
QUOTE = 'quote'
4+
QUOTE_DEPTH = 'askbid'
5+
QUOTE_FUTURE = 'future'
6+
QUOTE_OPTION = 'option'
7+
8+
TRADE_ASSET = 'trade/asset'
9+
TRADE_POSITION = 'trade/position'
10+
TRADE_ORDER = 'trade/order'

tigeropen/examples/push_client_demo.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"""
77
import time
88
# from tigeropen.common.consts import QuoteKeyType
9+
from tigeropen.common.consts import ExchangeQuote
910
from tigeropen.push.push_client import PushClient
1011
from tigeropen.examples.client_config import get_client_config
1112

@@ -158,6 +159,10 @@ def unsubscribe_callback(destination, content):
158159
push_client.subscribe_quote(['AAPL', 'GOOG'])
159160
# 可以指定关注的行情key的类型, QuoteKeyType.TRADE 为成交数据, QuoteKeyType.QUOTE 为盘口数据
160161
# push_client.subscribe_quote(['MSFT', 'AMD'], quote_key_type=QuoteKeyType.TRADE)
162+
163+
# 订阅深度行情
164+
push_client.subscribe_depth_quote(['AMD', 'BABA'], exchange_quote=ExchangeQuote.TOTAL_VIEW)
165+
161166
# 订阅资产变动
162167
push_client.subscribe_asset()
163168
# 订阅订单变动

tigeropen/push/push_client.py

Lines changed: 105 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,20 @@
66
"""
77
import sys
88
import json
9+
from collections import defaultdict
10+
911
import stomp
1012
import six
1113
import logging
1214
from stomp.exception import ConnectFailedException
15+
16+
from tigeropen.common.consts.push_destinations import QUOTE, QUOTE_DEPTH, QUOTE_FUTURE, QUOTE_OPTION, TRADE_ASSET, \
17+
TRADE_ORDER, TRADE_POSITION
1318
from tigeropen.common.util.signature_utils import sign_with_rsa
1419
from tigeropen.common.util.order_utils import get_order_status
1520
from tigeropen.common.consts.push_types import RequestType, ResponseType
1621
from tigeropen.common.consts.quote_keys import QuoteChangeKey, QuoteKeyType
17-
from tigeropen.common.consts import OrderStatus
22+
from tigeropen.common.consts import OrderStatus, ExchangeQuote
1823

1924
HOUR_TRADING_QUOTE_KEYS_MAPPINGS = {'hourTradingLatestPrice': 'latest_price', 'hourTradingPreClose': 'pre_close',
2025
'hourTradingLatestTime': 'latest_time', 'hourTradingVolume': 'volume',
@@ -73,10 +78,7 @@ def __init__(self, host, port, use_ssl=True, connection_timeout=120, auto_reconn
7378
self._private_key = None
7479
self._sign = None
7580
self._stomp_connection = None
76-
self._quote_counter = 0
77-
self._asset_counter = 0
78-
self._position_counter = 0
79-
self._order_counter = 0
81+
self._destination_counter_map = defaultdict(lambda: 0)
8082

8183
self.subscribed_symbols = None
8284
self.quote_changed = None
@@ -243,98 +245,53 @@ def on_error(self, headers, body):
243245
if self.error_callback:
244246
self.error_callback(body)
245247

246-
@staticmethod
247-
def _get_subscribe_id(counter):
248-
return 'sub-' + str(counter)
248+
def _update_subscribe_id(self, destination):
249+
self._destination_counter_map[destination] += 1
250+
251+
def _get_subscribe_id(self, destination):
252+
return 'sub-' + str(self._destination_counter_map[destination])
249253

250254
def subscribe_asset(self, account=None):
251255
"""
252256
订阅账户资产更新
253257
:return:
254258
"""
255-
self._asset_counter += 1
256-
sub_id = self._get_subscribe_id(self._asset_counter)
257-
headers = dict()
258-
headers['destination'] = 'trade/asset'
259-
headers['subscription'] = 'Asset'
260-
headers['id'] = sub_id
261-
if account:
262-
headers['account'] = account
263-
264-
self._stomp_connection.subscribe('trade/asset', id=sub_id, headers=headers)
265-
return sub_id
259+
return self._handle_trade_subscribe(TRADE_ASSET, 'Asset', account)
266260

267261
def unsubscribe_asset(self, id=None):
268262
"""
269263
退订账户资产更新
270264
:return:
271265
"""
272-
headers = dict()
273-
headers['destination'] = 'trade/asset'
274-
headers['subscription'] = 'Asset'
275-
sub_id = id if id else self._get_subscribe_id(self._asset_counter)
276-
headers['id'] = sub_id
277-
self._stomp_connection.unsubscribe(id=sub_id, headers=headers)
266+
self._handle_trade_unsubscribe(TRADE_ASSET, 'Asset', sub_id=id)
278267

279268
def subscribe_position(self, account=None):
280269
"""
281270
订阅账户持仓更新
282271
:return:
283272
"""
284-
self._position_counter += 1
285-
sub_id = self._get_subscribe_id(self._position_counter)
286-
headers = dict()
287-
headers['destination'] = 'trade/position'
288-
headers['subscription'] = 'Position'
289-
headers['id'] = sub_id
290-
if account:
291-
headers['account'] = account
292-
293-
self._stomp_connection.subscribe('trade/position', id=sub_id, headers=headers)
294-
return sub_id
273+
return self._handle_trade_subscribe(TRADE_POSITION, 'Position', account)
295274

296275
def unsubscribe_position(self, id=None):
297276
"""
298277
退订账户持仓更新
299278
:return:
300279
"""
301-
headers = dict()
302-
headers['destination'] = 'trade/position'
303-
headers['subscription'] = 'Position'
304-
sub_id = id if id else self._get_subscribe_id(self._position_counter)
305-
headers['id'] = sub_id
306-
307-
self._stomp_connection.unsubscribe(id=sub_id, headers=headers)
280+
self._handle_trade_unsubscribe(TRADE_POSITION, 'Position', sub_id=id)
308281

309282
def subscribe_order(self, account=None):
310283
"""
311284
订阅账户订单更新
312285
:return:
313286
"""
314-
self._order_counter += 1
315-
sub_id = self._get_subscribe_id(self._order_counter)
316-
headers = dict()
317-
headers['destination'] = 'trade/order'
318-
headers['subscription'] = 'OrderStatus'
319-
headers['id'] = sub_id
320-
if account:
321-
headers['account'] = account
322-
323-
self._stomp_connection.subscribe('trade/order', id=sub_id, headers=headers)
324-
return sub_id
287+
return self._handle_trade_subscribe(TRADE_ORDER, 'OrderStatus', account)
325288

326289
def unsubscribe_order(self, id=None):
327290
"""
328291
退订账户订单更新
329292
:return:
330293
"""
331-
headers = dict()
332-
headers['destination'] = 'trade/order'
333-
headers['subscription'] = 'OrderStatus'
334-
sub_id = id if id else self._get_subscribe_id(self._order_counter)
335-
headers['id'] = sub_id
336-
337-
self._stomp_connection.unsubscribe(id=sub_id, headers=headers)
294+
self._handle_trade_unsubscribe(TRADE_ORDER, 'OrderStatus', sub_id=id)
338295

339296
def subscribe_quote(self, symbols, quote_key_type=QuoteKeyType.TRADE, focus_keys=None):
340297
"""
@@ -344,48 +301,115 @@ def subscribe_quote(self, symbols, quote_key_type=QuoteKeyType.TRADE, focus_keys
344301
:param focus_keys: 行情 key
345302
:return:
346303
"""
347-
self._quote_counter += 1
348-
sub_id = self._get_subscribe_id(self._quote_counter)
349-
headers = dict()
350-
headers['destination'] = 'quote'
351-
headers['subscription'] = 'Quote'
352-
headers['id'] = sub_id
353-
if symbols:
354-
headers['symbols'] = ','.join(symbols)
304+
extra_headers = dict()
355305
if focus_keys:
356306
keys = list()
357307
for key in focus_keys:
358308
if isinstance(key, six.string_types):
359309
keys.append(key)
360310
else:
361311
keys.append(key.value)
362-
headers['keys'] = ','.join(keys)
312+
extra_headers['keys'] = ','.join(keys)
363313
elif quote_key_type and quote_key_type.value:
364-
headers['keys'] = quote_key_type.value
365-
self._stomp_connection.subscribe('quote', id=sub_id, headers=headers)
366-
return sub_id
314+
extra_headers['keys'] = quote_key_type.value
315+
return self._handle_quote_subscribe(destination=QUOTE, subscription='Quote', symbols=symbols,
316+
extra_headers=extra_headers)
317+
318+
def subscribe_depth_quote(self, symbols, exchange_quote=ExchangeQuote.TOTAL_VIEW):
319+
"""
320+
订阅深度行情
321+
:param symbols: symbol列表
322+
:param exchange_quote: tigeropen.common.consts.ExchangeQuote 交易所行情枚举类型
323+
:return:
324+
"""
325+
extra_headers = {'exchange': exchange_quote.value}
326+
return self._handle_quote_subscribe(destination=QUOTE_DEPTH, subscription='AskBid', symbols=symbols,
327+
extra_headers=extra_headers)
328+
329+
def subscribe_option(self, symbols):
330+
"""
331+
订阅期权行情
332+
:param symbols: symbol列表
333+
:return:
334+
"""
335+
return self._handle_quote_subscribe(destination=QUOTE_OPTION, subscription='Option', symbols=symbols)
336+
337+
def subscribe_future(self, symbols):
338+
"""
339+
订阅期货行情
340+
:param symbols: symbol列表
341+
:return:
342+
"""
343+
return self._handle_quote_subscribe(destination=QUOTE_FUTURE, subscription='Future', symbols=symbols)
367344

368345
def query_subscribed_quote(self):
369346
"""
370347
查询已订阅行情的合约
371348
:return:
372349
"""
373350
headers = dict()
374-
headers['destination'] = 'quote'
351+
headers['destination'] = QUOTE
375352
headers['req-type'] = RequestType.REQ_SUB_SYMBOLS.value
376-
self._stomp_connection.send('quote', "{}", headers=headers)
353+
self._stomp_connection.send(QUOTE, "{}", headers=headers)
377354

378355
def unsubscribe_quote(self, symbols=None, id=None):
379356
"""
380357
退订行情更新
381358
:return:
382359
"""
360+
self._handle_quote_unsubscribe(destination=QUOTE, subscription='Quote', sub_id=id, symbols=symbols)
361+
362+
def unsubscribe_depth_quote(self, symbols=None, id=None):
363+
"""
364+
退订深度行情更新
365+
:return:
366+
"""
367+
self._handle_quote_unsubscribe(destination=QUOTE_DEPTH, subscription='AskBid', sub_id=id, symbols=symbols)
368+
369+
def _handle_trade_subscribe(self, destination, subscription, account=None, extra_headers=None):
370+
if extra_headers is None:
371+
extra_headers = dict()
372+
if account is not None:
373+
extra_headers['account'] = account
374+
return self._handle_subscribe(destination=destination, subscription=subscription, extra_headers=extra_headers)
375+
376+
def _handle_quote_subscribe(self, destination, subscription, symbols=None, extra_headers=None):
377+
if extra_headers is None:
378+
extra_headers = dict()
379+
if symbols is not None:
380+
extra_headers['symbols'] = ','.join(symbols)
381+
return self._handle_subscribe(destination=destination, subscription=subscription, extra_headers=extra_headers)
382+
383+
def _handle_trade_unsubscribe(self, destination, subscription, sub_id=None):
384+
self._handle_unsubscribe(destination=destination, subscription=subscription, sub_id=sub_id)
385+
386+
def _handle_quote_unsubscribe(self, destination, subscription, sub_id=None, symbols=None):
387+
extra_headers = dict()
388+
if symbols is not None:
389+
extra_headers['symbols'] = ','.join(symbols)
390+
self._handle_unsubscribe(destination=destination, subscription=subscription, sub_id=sub_id,
391+
extra_headers=extra_headers)
392+
393+
def _handle_subscribe(self, destination, subscription, extra_headers=None):
383394
headers = dict()
384-
headers['destination'] = 'quote'
385-
headers['subscription'] = 'Quote'
386-
sub_id = id if id else self._get_subscribe_id(self._quote_counter)
395+
headers['destination'] = destination
396+
headers['subscription'] = subscription
397+
self._update_subscribe_id(destination)
398+
sub_id = self._get_subscribe_id(destination)
387399
headers['id'] = sub_id
388-
if symbols:
389-
headers['symbols'] = ','.join(symbols)
390400

391-
self._stomp_connection.unsubscribe(id=sub_id, headers=headers)
401+
if extra_headers is not None:
402+
headers.update(extra_headers)
403+
self._stomp_connection.subscribe(destination, id=sub_id, headers=headers)
404+
return sub_id
405+
406+
def _handle_unsubscribe(self, destination, subscription, sub_id=None, extra_headers=None):
407+
headers = dict()
408+
headers['destination'] = destination
409+
headers['subscription'] = subscription
410+
id_ = sub_id if sub_id is not None else self._get_subscribe_id(destination)
411+
headers['id'] = id_
412+
if extra_headers:
413+
headers.update(extra_headers)
414+
415+
self._stomp_connection.unsubscribe(id=id_, headers=headers)

0 commit comments

Comments
 (0)