66"""
77import sys
88import json
9+ from collections import defaultdict
10+
911import stomp
1012import six
1113import logging
1214from stomp .exception import ConnectFailedException
15+
16+ from tigeropen .common .consts .push_destinations import QUOTE , QUOTE_DEPTH , QUOTE_FUTURE , QUOTE_OPTION , TRADE_ASSET , \
17+ TRADE_ORDER , TRADE_POSITION
1318from tigeropen .common .util .signature_utils import sign_with_rsa
1419from tigeropen .common .util .order_utils import get_order_status
1520from tigeropen .common .consts .push_types import RequestType , ResponseType
@@ -73,10 +78,7 @@ def __init__(self, host, port, use_ssl=True, connection_timeout=120, auto_reconn
7378 self ._private_key = None
7479 self ._sign = None
7580 self ._stomp_connection = None
76- self ._quote_counter = 0
77- self ._asset_counter = 0
78- self ._position_counter = 0
79- self ._order_counter = 0
81+ self ._destination_counter_map = defaultdict (lambda : 0 )
8082
8183 self .subscribed_symbols = None
8284 self .quote_changed = None
@@ -243,98 +245,53 @@ def on_error(self, headers, body):
243245 if self .error_callback :
244246 self .error_callback (body )
245247
246- @staticmethod
247- def _get_subscribe_id (counter ):
248- return 'sub-' + str (counter )
248+ def _update_subscribe_id (self , destination ):
249+ self ._destination_counter_map [destination ] += 1
250+
251+ def _get_subscribe_id (self , destination ):
252+ return 'sub-' + str (self ._destination_counter_map [destination ])
249253
250254 def subscribe_asset (self , account = None ):
251255 """
252256 订阅账户资产更新
253257 :return:
254258 """
255- self ._asset_counter += 1
256- sub_id = self ._get_subscribe_id (self ._asset_counter )
257- headers = dict ()
258- headers ['destination' ] = 'trade/asset'
259- headers ['subscription' ] = 'Asset'
260- headers ['id' ] = sub_id
261- if account :
262- headers ['account' ] = account
263-
264- self ._stomp_connection .subscribe ('trade/asset' , id = sub_id , headers = headers )
265- return sub_id
259+ return self ._handle_trade_subscribe (TRADE_ASSET , 'Asset' , account )
266260
267261 def unsubscribe_asset (self , id = None ):
268262 """
269263 退订账户资产更新
270264 :return:
271265 """
272- headers = dict ()
273- headers ['destination' ] = 'trade/asset'
274- headers ['subscription' ] = 'Asset'
275- sub_id = id if id else self ._get_subscribe_id (self ._asset_counter )
276- headers ['id' ] = sub_id
277- self ._stomp_connection .unsubscribe (id = sub_id , headers = headers )
266+ self ._handle_trade_unsubscribe (TRADE_ASSET , 'Asset' , sub_id = id )
278267
279268 def subscribe_position (self , account = None ):
280269 """
281270 订阅账户持仓更新
282271 :return:
283272 """
284- self ._position_counter += 1
285- sub_id = self ._get_subscribe_id (self ._position_counter )
286- headers = dict ()
287- headers ['destination' ] = 'trade/position'
288- headers ['subscription' ] = 'Position'
289- headers ['id' ] = sub_id
290- if account :
291- headers ['account' ] = account
292-
293- self ._stomp_connection .subscribe ('trade/position' , id = sub_id , headers = headers )
294- return sub_id
273+ return self ._handle_trade_subscribe (TRADE_POSITION , 'Position' , account )
295274
296275 def unsubscribe_position (self , id = None ):
297276 """
298277 退订账户持仓更新
299278 :return:
300279 """
301- headers = dict ()
302- headers ['destination' ] = 'trade/position'
303- headers ['subscription' ] = 'Position'
304- sub_id = id if id else self ._get_subscribe_id (self ._position_counter )
305- headers ['id' ] = sub_id
306-
307- self ._stomp_connection .unsubscribe (id = sub_id , headers = headers )
280+ self ._handle_trade_unsubscribe (TRADE_POSITION , 'Position' , sub_id = id )
308281
309282 def subscribe_order (self , account = None ):
310283 """
311284 订阅账户订单更新
312285 :return:
313286 """
314- self ._order_counter += 1
315- sub_id = self ._get_subscribe_id (self ._order_counter )
316- headers = dict ()
317- headers ['destination' ] = 'trade/order'
318- headers ['subscription' ] = 'OrderStatus'
319- headers ['id' ] = sub_id
320- if account :
321- headers ['account' ] = account
322-
323- self ._stomp_connection .subscribe ('trade/order' , id = sub_id , headers = headers )
324- return sub_id
287+ return self ._handle_trade_subscribe (TRADE_ORDER , 'OrderStatus' , account )
325288
326289 def unsubscribe_order (self , id = None ):
327290 """
328291 退订账户订单更新
329292 :return:
330293 """
331- headers = dict ()
332- headers ['destination' ] = 'trade/order'
333- headers ['subscription' ] = 'OrderStatus'
334- sub_id = id if id else self ._get_subscribe_id (self ._order_counter )
335- headers ['id' ] = sub_id
336-
337- self ._stomp_connection .unsubscribe (id = sub_id , headers = headers )
294+ self ._handle_trade_unsubscribe (TRADE_ORDER , 'OrderStatus' , sub_id = id )
338295
339296 def subscribe_quote (self , symbols , quote_key_type = QuoteKeyType .TRADE , focus_keys = None ):
340297 """
@@ -344,48 +301,112 @@ def subscribe_quote(self, symbols, quote_key_type=QuoteKeyType.TRADE, focus_keys
344301 :param focus_keys: 行情 key
345302 :return:
346303 """
347- self ._quote_counter += 1
348- sub_id = self ._get_subscribe_id (self ._quote_counter )
349- headers = dict ()
350- headers ['destination' ] = 'quote'
351- headers ['subscription' ] = 'Quote'
352- headers ['id' ] = sub_id
353- if symbols :
354- headers ['symbols' ] = ',' .join (symbols )
304+ extra_headers = dict ()
355305 if focus_keys :
356306 keys = list ()
357307 for key in focus_keys :
358308 if isinstance (key , six .string_types ):
359309 keys .append (key )
360310 else :
361311 keys .append (key .value )
362- headers ['keys' ] = ',' .join (keys )
312+ extra_headers ['keys' ] = ',' .join (keys )
363313 elif quote_key_type and quote_key_type .value :
364- headers ['keys' ] = quote_key_type .value
365- self ._stomp_connection .subscribe ('quote' , id = sub_id , headers = headers )
366- return sub_id
314+ extra_headers ['keys' ] = quote_key_type .value
315+ return self ._handle_quote_subscribe (destination = QUOTE , subscription = 'Quote' , symbols = symbols ,
316+ extra_headers = extra_headers )
317+
318+ def subscribe_depth_quote (self , symbols ):
319+ """
320+ 订阅深度行情
321+ :param symbols: symbol列表
322+ :return:
323+ """
324+ return self ._handle_quote_subscribe (destination = QUOTE_DEPTH , subscription = 'QuoteDepth' , symbols = symbols )
325+
326+ def subscribe_option (self , symbols ):
327+ """
328+ 订阅期权行情
329+ :param symbols: symbol列表
330+ :return:
331+ """
332+ return self ._handle_quote_subscribe (destination = QUOTE_OPTION , subscription = 'Option' , symbols = symbols )
333+
334+ def subscribe_future (self , symbols ):
335+ """
336+ 订阅期货行情
337+ :param symbols: symbol列表
338+ :return:
339+ """
340+ return self ._handle_quote_subscribe (destination = QUOTE_FUTURE , subscription = 'Future' , symbols = symbols )
367341
368342 def query_subscribed_quote (self ):
369343 """
370344 查询已订阅行情的合约
371345 :return:
372346 """
373347 headers = dict ()
374- headers ['destination' ] = 'quote'
348+ headers ['destination' ] = QUOTE
375349 headers ['req-type' ] = RequestType .REQ_SUB_SYMBOLS .value
376- self ._stomp_connection .send ('quote' , "{}" , headers = headers )
350+ self ._stomp_connection .send (QUOTE , "{}" , headers = headers )
377351
378352 def unsubscribe_quote (self , symbols = None , id = None ):
379353 """
380354 退订行情更新
381355 :return:
382356 """
357+ self ._handle_quote_unsubscribe (destination = QUOTE , subscription = 'Quote' , sub_id = id , symbols = symbols )
358+
359+ def unsubscribe_depth_quote (self , symbols = None , id = None ):
360+ """
361+ 退订深度行情更新
362+ :return:
363+ """
364+ self ._handle_quote_unsubscribe (destination = QUOTE_DEPTH , subscription = 'AskBid' , sub_id = id , symbols = symbols )
365+
366+ def _handle_trade_subscribe (self , destination , subscription , account = None , extra_headers = None ):
367+ if extra_headers is None :
368+ extra_headers = dict ()
369+ if account is not None :
370+ extra_headers ['account' ] = account
371+ return self ._handle_subscribe (destination = destination , subscription = subscription , extra_headers = extra_headers )
372+
373+ def _handle_quote_subscribe (self , destination , subscription , symbols = None , extra_headers = None ):
374+ if extra_headers is None :
375+ extra_headers = dict ()
376+ if symbols is not None :
377+ extra_headers ['symbols' ] = ',' .join (symbols )
378+ return self ._handle_subscribe (destination = destination , subscription = subscription , extra_headers = extra_headers )
379+
380+ def _handle_trade_unsubscribe (self , destination , subscription , sub_id = None ):
381+ self ._handle_unsubscribe (destination = destination , subscription = subscription , sub_id = sub_id )
382+
383+ def _handle_quote_unsubscribe (self , destination , subscription , sub_id = None , symbols = None ):
384+ extra_headers = dict ()
385+ if symbols is not None :
386+ extra_headers ['symbols' ] = ',' .join (symbols )
387+ self ._handle_unsubscribe (destination = destination , subscription = subscription , sub_id = sub_id ,
388+ extra_headers = extra_headers )
389+
390+ def _handle_subscribe (self , destination , subscription , extra_headers = None ):
383391 headers = dict ()
384- headers ['destination' ] = 'quote'
385- headers ['subscription' ] = 'Quote'
386- sub_id = id if id else self ._get_subscribe_id (self ._quote_counter )
392+ headers ['destination' ] = destination
393+ headers ['subscription' ] = subscription
394+ self ._update_subscribe_id (destination )
395+ sub_id = self ._get_subscribe_id (destination )
387396 headers ['id' ] = sub_id
388- if symbols :
389- headers ['symbols' ] = ',' .join (symbols )
390397
391- self ._stomp_connection .unsubscribe (id = sub_id , headers = headers )
398+ if extra_headers is not None :
399+ headers .update (extra_headers )
400+ self ._stomp_connection .subscribe (destination , id = sub_id , headers = headers )
401+ return sub_id
402+
403+ def _handle_unsubscribe (self , destination , subscription , sub_id = None , extra_headers = None ):
404+ headers = dict ()
405+ headers ['destination' ] = destination
406+ headers ['subscription' ] = subscription
407+ id_ = sub_id if sub_id is not None else self ._get_subscribe_id (destination )
408+ headers ['id' ] = id_
409+ if extra_headers :
410+ headers .update (extra_headers )
411+
412+ self ._stomp_connection .unsubscribe (id = id_ , headers = headers )
0 commit comments