Skip to content

Commit a38583f

Browse files
authored
Merge pull request #51 from tigerfintech/feature_upgrade_stomp
upgrade stomp.py
2 parents cc4e70a + 78ab2b4 commit a38583f

File tree

16 files changed

+228
-45
lines changed

16 files changed

+228
-45
lines changed

CHANGELOG.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
## 2.1.2 (2022-06-14)
2+
### Modify
3+
- 升级 stomp.py 版本, 将之前的 4.1.24 升级到 8.0.1
4+
### Breaking
5+
- PushClient 去除 `auto_reconnect` 参数,如需自定义重连方式,可自定义方法并绑定 `disconnect_callback` 进行重连
6+
- 处理连接被踢的情况,如果有多台设备使用相同 tiger_id 连接, 新连接会将较早的连接踢掉,较早连接会抛出异常,停止接收消息
7+
8+
## 2.1.1 (2022-05-25)
9+
### New
10+
- 新增批量分页获取k线接口
11+
股票:`QuoteClient.get_bars_by_page`
12+
期货:`QuoteClient.get_future_bars_by_page`
13+
- `QuoteClient.get_future_bars`, `QuoteClient.get_bars` 增加 `page_token` 参数,可用于分页请求定位下一页位置
14+
- `tigeropen.trade.domain.order.Order` 新增 `user_mark` 属性,用户下单时可传入一定长度的备注信息,该属性值在查询订单时会返回。(需用户提前向平台申请配置)
15+
116
## 2.1.0 (2022-05-07)
217
### New
318
- 动态获取服务域名;更改默认域名

requirements.txt

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
simplejson==3.17.3
1+
simplejson==3.17.6
22
delorean==1.0.0
33
pandas
44
python-dateutil==2.8.2
5-
pytz==2021.1
5+
pytz==2022.1
66
pyasn1==0.4.8
7-
rsa==4.7.2
8-
stomp.py==4.1.24
9-
getmac==0.8.2
7+
rsa==4.8
8+
stomp.py==8.0.1
9+
getmac==0.8.3
10+
cryptography==37.0.2

tigeropen/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44
55
@author: gaoan
66
"""
7-
__VERSION__ = '2.1.0'
7+
__VERSION__ = '2.1.2'

tigeropen/examples/option_helpers/helpers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ def __init__(self,
268268
else:
269269
helper_class = FDAmericanDividendOptionHelper
270270

271+
ql.Settings.instance().evaluationDate = settlement_date
271272
helper = helper_class(option_type=ql_option_type,
272273
underlying=args.underlying,
273274
strike=args.strike,

tigeropen/examples/push_client_demo.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,12 @@ def unsubscribe_callback(destination, content):
120120
print('subscribe:{}, callback content:{}'.format(destination, content))
121121

122122

123-
def error_callback(content):
123+
def error_callback(frame):
124124
"""错误回调"""
125-
print(content)
125+
print(frame)
126126

127127

128-
def connect_callback():
128+
def connect_callback(frame):
129129
"""连接建立回调"""
130130
print('connected')
131131

tigeropen/examples/quote_client_demo.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
@author: gaoan
66
"""
77
import logging
8+
import time
9+
810
import pandas as pd
911
from tigeropen.common.consts import Market, QuoteRight, FinancialReportPeriodType, Valuation, \
10-
Income, Balance, CashFlow, BalanceSheetRatio, Growth, Leverage, Profitability, IndustryLevel
12+
Income, Balance, CashFlow, BalanceSheetRatio, Growth, Leverage, Profitability, IndustryLevel, BarPeriod
1113
from tigeropen.quote.domain.filter import OptionFilter
1214

1315
from tigeropen.quote.quote_client import QuoteClient
@@ -61,6 +63,17 @@ def get_quote():
6163
print(delay_brief)
6264

6365

66+
def test_gat_bars_by_page():
67+
bars = openapi_client.get_bars_by_page(['AAPL'], period=BarPeriod.DAY,
68+
end_time='2022-05-01',
69+
total=10,
70+
page_size=4,
71+
)
72+
bars['cn_date'] = pd.to_datetime(bars['time'], unit='ms').dt.tz_localize('UTC').dt.tz_convert('Asia/Shanghai')
73+
bars['us_date'] = pd.to_datetime(bars['time'], unit='ms').dt.tz_localize('UTC').dt.tz_convert('US/Eastern')
74+
print(bars)
75+
76+
6477
def get_option_quote():
6578
symbol = 'AAPL'
6679
expirations = openapi_client.get_option_expirations(symbols=[symbol])
@@ -85,11 +98,11 @@ def get_option_quote():
8598
chains = openapi_client.get_option_chain('AAPL', '2023-01-20', implied_volatility_min=0.5, open_interest_min=200,
8699
vega_min=0.1, rho_max=0.9)
87100
# convert expiry date to US/Eastern
88-
chains['expiry_date'] = pd.to_datetime(chains['expiry'], unit='ms').dt.tz_localize('UTC').dt.tz_convert('US/Eastern')
101+
chains['expiry_date'] = pd.to_datetime(chains['expiry'], unit='ms').dt.tz_localize('UTC').dt.tz_convert(
102+
'US/Eastern')
89103
print(chains)
90104

91105

92-
93106
def get_future_quote():
94107
exchanges = openapi_client.get_future_exchanges()
95108
print(exchanges)
@@ -107,18 +120,27 @@ def get_future_quote():
107120
print(briefs)
108121

109122

123+
def test_get_future_bars_by_page():
124+
bars = openapi_client.get_future_bars_by_page('CLmain',
125+
end_time=1648526400000,
126+
total=10,
127+
page_size=4)
128+
bars['cn_date'] = pd.to_datetime(bars['time'], unit='ms').dt.tz_localize('UTC').dt.tz_convert('Asia/Shanghai')
129+
bars['us_date'] = pd.to_datetime(bars['time'], unit='ms').dt.tz_localize('UTC').dt.tz_convert('US/Eastern')
130+
print(bars)
131+
110132

111133
def get_fundamental():
112134
"""获取基础数据"""
113-
135+
114136
# 日级财务数据
115137
financial_daily = openapi_client.get_financial_daily(symbols=['AAPL', 'MSFT'],
116138
market=Market.US,
117139
fields=[Valuation.shares_outstanding],
118140
begin_date='2019-01-01',
119141
end_date='2019-01-10')
120142
print(financial_daily)
121-
143+
122144
# 财报数据(季报或年报)
123145
financial_report = openapi_client.get_financial_report(symbols=['AAPL', 'GOOG'],
124146
market=Market.US,

tigeropen/push/__init__.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,26 @@
33
Created on 2018/10/29
44
55
@author: gaoan
6-
"""
6+
"""
7+
import ssl
8+
import time
9+
10+
11+
def _patch_ssl(wait=0.01):
12+
def new_wrap_socket(self, sock, server_side=False,
13+
do_handshake_on_connect=True,
14+
suppress_ragged_eofs=True,
15+
server_hostname=None, session=None):
16+
time.sleep(wait)
17+
return self.sslsocket_class._create(
18+
sock=sock,
19+
server_side=server_side,
20+
do_handshake_on_connect=do_handshake_on_connect,
21+
suppress_ragged_eofs=suppress_ragged_eofs,
22+
server_hostname=server_hostname,
23+
context=self,
24+
session=session
25+
)
26+
ssl.SSLContext.old_wrap_socket = ssl.SSLContext.wrap_socket
27+
ssl.SSLContext.wrap_socket = new_wrap_socket
28+

tigeropen/push/push_client.py

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import json
88
import logging
99
import sys
10+
import os
1011
from collections import defaultdict
1112

1213
import stomp
@@ -22,10 +23,12 @@
2223
SUBSCRIPTION_TRADE_ORDER
2324
from tigeropen.common.consts.push_types import RequestType, ResponseType
2425
from tigeropen.common.consts.quote_keys import QuoteChangeKey, QuoteKeyType
26+
from tigeropen.common.exceptions import ApiException
2527
from tigeropen.common.util.string_utils import camel_to_underline
2628
from tigeropen.common.util.common_utils import get_enum_value
2729
from tigeropen.common.util.order_utils import get_order_status
2830
from tigeropen.common.util.signature_utils import sign_with_rsa
31+
from tigeropen.push import _patch_ssl
2932

3033
HOUR_TRADING_QUOTE_KEYS_MAPPINGS = {'hourTradingLatestPrice': 'latest_price', 'hourTradingPreClose': 'pre_close',
3134
'hourTradingLatestTime': 'latest_time', 'hourTradingVolume': 'volume',
@@ -67,14 +70,12 @@
6770

6871

6972
class PushClient(stomp.ConnectionListener):
70-
def __init__(self, host, port, use_ssl=True, connection_timeout=120, auto_reconnect=True,
71-
heartbeats=(30 * 1000, 30 * 1000)):
73+
def __init__(self, host, port, use_ssl=True, connection_timeout=120, heartbeats=(30 * 1000, 30 * 1000)):
7274
"""
7375
:param host:
7476
:param port:
7577
:param use_ssl:
76-
:param connection_timeout: second
77-
:param auto_reconnect:
78+
:param connection_timeout: unit: second. The timeout value should be greater the heartbeats interval
7879
:param heartbeats: tuple of millisecond
7980
"""
8081
self.host = host
@@ -97,8 +98,8 @@ def __init__(self, host, port, use_ssl=True, connection_timeout=120, auto_reconn
9798
self.unsubscribe_callback = None
9899
self.error_callback = None
99100
self._connection_timeout = connection_timeout
100-
self._auto_reconnect = auto_reconnect
101101
self._heartbeats = heartbeats
102+
_patch_ssl()
102103

103104
def _connect(self):
104105
try:
@@ -108,12 +109,13 @@ def _connect(self):
108109
except:
109110
pass
110111

111-
self._stomp_connection = stomp.Connection12(host_and_ports=[(self.host, self.port), ], use_ssl=self.use_ssl,
112+
self._stomp_connection = stomp.Connection12(host_and_ports=[(self.host, self.port)],
112113
keepalive=KEEPALIVE, timeout=self._connection_timeout,
113114
heartbeats=self._heartbeats)
114115
self._stomp_connection.set_listener('push', self)
115-
self._stomp_connection.start()
116116
try:
117+
if self.use_ssl:
118+
self._stomp_connection.set_ssl([(self.host, self.port)])
117119
self._stomp_connection.connect(self._tiger_id, self._sign, wait=True, headers=self._generate_headers())
118120
except ConnectFailedException as e:
119121
raise e
@@ -128,23 +130,26 @@ def disconnect(self):
128130
if self._stomp_connection:
129131
self._stomp_connection.disconnect()
130132

131-
def on_connected(self, headers, body):
133+
def on_connected(self, frame):
132134
if self.connect_callback:
133-
self.connect_callback()
135+
self.connect_callback(frame)
134136

135137
def on_disconnected(self):
136138
if self.disconnect_callback:
137139
self.disconnect_callback()
138-
elif self._auto_reconnect:
139-
self._connect()
140140

141-
def on_message(self, headers, body):
141+
def on_message(self, frame):
142142
"""
143143
Called by the STOMP connection when a MESSAGE frame is received.
144144
145-
:param dict headers: a dictionary containing all headers sent by the server as key/value pairs.
146-
:param body: the frame's payload - the message body.
145+
:param Frame frame: the stomp frame. stomp.utils.Frame
146+
A STOMP frame's attributes:
147+
cmd: the protocol command
148+
headers: a map of headers for the frame
149+
body: the content of the frame.
147150
"""
151+
headers = frame.headers
152+
body = frame.body
148153
try:
149154
response_type = headers.get('ret-type')
150155
if response_type == str(ResponseType.GET_SUB_SYMBOLS_END.value):
@@ -255,11 +260,17 @@ def on_message(self, headers, body):
255260
except Exception as e:
256261
logging.error(e, exc_info=True)
257262

258-
def on_error(self, headers, body):
263+
def on_error(self, frame):
264+
body = json.loads(frame.body)
265+
if body.get('code') == 4001:
266+
logging.error(body)
267+
self.disconnect_callback = None
268+
raise ApiException(4001, body.get('message'))
269+
259270
if self.error_callback:
260-
self.error_callback(body)
271+
self.error_callback(frame)
261272
else:
262-
logging.error(body)
273+
logging.error(frame.body)
263274

264275
def _update_subscribe_id(self, destination):
265276
self._destination_counter_map[destination] += 1

0 commit comments

Comments
 (0)