Skip to content

Commit bdf1976

Browse files
committed
Merge branch '0.38/connector-altmarkets' into 0.38-altmarkets
2 parents 0e20c23 + 355ffe7 commit bdf1976

File tree

9 files changed

+74
-35
lines changed

9 files changed

+74
-35
lines changed

hummingbot/connector/exchange/altmarkets/altmarkets_api_user_stream_data_source.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,33 +32,39 @@ def __init__(self, altmarkets_auth: AltmarketsAuth, trading_pairs: Optional[List
3232
self._current_listen_key = None
3333
self._listen_for_user_stream_task = None
3434
self._last_recv_time: float = 0
35+
self._ws: AltmarketsWebsocket = None
3536
super().__init__()
3637

3738
@property
3839
def last_recv_time(self) -> float:
3940
return self._last_recv_time
4041

42+
@property
43+
def is_connected(self):
44+
return self._ws.is_connected if self._ws is not None else False
45+
4146
async def _listen_to_orders_trades_balances(self) -> AsyncIterable[Any]:
4247
"""
4348
Subscribe to active orders via web socket
4449
"""
4550

4651
try:
47-
ws = AltmarketsWebsocket(self._altmarkets_auth)
52+
self._ws = AltmarketsWebsocket(self._altmarkets_auth)
4853

49-
await ws.connect()
54+
await self._ws.connect()
5055

51-
await ws.subscribe(Constants.WS_SUB["USER_ORDERS_TRADES"])
56+
await self._ws.subscribe(Constants.WS_SUB["USER_ORDERS_TRADES"])
5257

53-
async for msg in ws.on_message():
58+
async for msg in self._ws.on_message():
59+
# print(f"user msg: {msg}")
5460
self._last_recv_time = time.time()
5561
if msg is not None:
5662
yield msg
5763

5864
except Exception as e:
5965
raise e
6066
finally:
61-
await ws.disconnect()
67+
await self._ws.disconnect()
6268
await asyncio.sleep(5)
6369

6470
async def listen_for_user_stream(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue) -> AsyncIterable[Any]:

hummingbot/connector/exchange/altmarkets/altmarkets_auth.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import hmac
33
from datetime import datetime, timezone, timedelta
44
from typing import Dict, Any
5+
from hummingbot.connector.exchange.altmarkets.altmarkets_constants import Constants
56

67

78
class AltmarketsAuth():
@@ -40,5 +41,6 @@ def get_headers(self) -> (Dict[str, Any]):
4041
"X-Auth-Apikey": self.api_key,
4142
"X-Auth-Nonce": nonce,
4243
"X-Auth-Signature": signature,
43-
"Content-Type": "application/json"
44+
"Content-Type": "application/json",
45+
"User-Agent": Constants.USER_AGENT
4446
}

hummingbot/connector/exchange/altmarkets/altmarkets_constants.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22
class Constants:
33
EXCHANGE_NAME = "altmarkets"
44
REST_URL = "https://v2.altmarkets.io/api/v2/peatio"
5-
# WS_PRIVATE_URL = "wss://stream.crypto.com/v2/user"
65
WS_PRIVATE_URL = "wss://v2.altmarkets.io/api/v2/ranger/private"
7-
# WS_PUBLIC_URL = "wss://stream.crypto.com/v2/market"
86
WS_PUBLIC_URL = "wss://v2.altmarkets.io/api/v2/ranger/public"
97

108
HBOT_BROKER_ID = "HBOT"
119

10+
USER_AGENT = "HBOT_AMv2"
11+
1212
ENDPOINT = {
1313
# Public Endpoints
1414
"TIMESTAMP": "public/timestamp",
@@ -26,14 +26,15 @@ class Constants:
2626
WS_SUB = {
2727
"TRADES": "{trading_pair}.trades",
2828
"ORDERS": "{trading_pair}.ob-inc",
29-
"USER_ORDERS_TRADES": ['order', 'trade'],
29+
"USER_ORDERS_TRADES": ['balance', 'order', 'trade'],
3030

3131
}
3232

3333
WS_METHODS = {
3434
"ORDERS_SNAPSHOT": ".ob-snap",
3535
"ORDERS_UPDATE": ".ob-inc",
3636
"TRADES_UPDATE": ".trades",
37+
"USER_BALANCES": "balance",
3738
"USER_ORDERS": "order",
3839
"USER_TRADES": "trade",
3940
}
@@ -54,11 +55,13 @@ class Constants:
5455

5556
# Intervals
5657
# Only used when nothing is received from WS
57-
SHORT_POLL_INTERVAL = 5.0
58-
# One minute should be fine since we request balance updates on order updates
59-
LONG_POLL_INTERVAL = 60.0
58+
SHORT_POLL_INTERVAL = 10.0
59+
# Two minutes should be fine since we get balances via WS
60+
LONG_POLL_INTERVAL = 120.0
6061
# Two minutes should be fine for order status since we get these via WS
6162
UPDATE_ORDER_STATUS_INTERVAL = 120.0
63+
# We don't get many messages here if we're not updating orders so set this pretty high
64+
USER_TRACKER_MAX_AGE = 300.0
6265
# 10 minute interval to update trading rules, these would likely never change whilst running.
6366
INTERVAL_TRADING_RULES = 600
6467

hummingbot/connector/exchange/altmarkets/altmarkets_exchange.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ def start(self, clock: Clock, timestamp: float):
181181
"""
182182
This function is called automatically by the clock.
183183
"""
184+
if self._poll_notifier.is_set():
185+
self._poll_notifier.clear()
184186
super().start(clock, timestamp)
185187

186188
def stop(self, clock: Clock):
@@ -324,7 +326,7 @@ async def _api_request(self,
324326
qs_params: dict = params if method.upper() == "GET" else None
325327
req_params = ujson.dumps(params) if method.upper() == "POST" and params is not None else None
326328
# Generate auth headers if needed.
327-
headers: dict = {"Content-Type": "application/json"}
329+
headers: dict = {"Content-Type": "application/json", "User-Agent": Constants.USER_AGENT}
328330
if is_auth_required:
329331
headers: dict = self._altmarkets_auth.get_headers()
330332
# Build request coro
@@ -439,6 +441,7 @@ async def _create_order(self,
439441
"side": trade_type.name.lower(),
440442
"ord_type": order_type_str,
441443
# "price": f"{price:f}",
444+
"client_id": order_id,
442445
"volume": f"{amount:f}",
443446
}
444447
if order_type is not OrderType.MARKET:
@@ -540,8 +543,9 @@ async def _execute_cancel(self, trading_pair: str, order_id: str) -> str:
540543
raise
541544
except AltmarketsAPIError as e:
542545
errors_found = e.error_payload.get('errors', e.error_payload)
543-
order_state = errors_found.get("state", None)
544-
if order_state is None:
546+
if isinstance(errors_found, dict):
547+
order_state = errors_found.get("state", None)
548+
if order_state is None or 'market.order.invaild_id_or_uuid' in errors_found:
545549
self._order_not_found_records[order_id] = self._order_not_found_records.get(order_id, 0) + 1
546550
if order_state in Constants.ORDER_STATES['CANCEL_WAIT'] or \
547551
self._order_not_found_records.get(order_id, 0) >= self.ORDER_NOT_EXIST_CANCEL_COUNT:
@@ -683,8 +687,6 @@ def _process_order_message(self, order_msg: Dict[str, Any]):
683687
order_msg["trade_fee"] = self.estimate_fee_pct(tracked_order.order_type is OrderType.LIMIT_MAKER)
684688
try:
685689
updated = tracked_order.update_with_order_update(order_msg)
686-
# Call Update balances on every message to catch order create, fill and cancel.
687-
safe_ensure_future(self._update_balances())
688690
except Exception as e:
689691
self.logger().error(f"Error in order update for {tracked_order.exchange_order_id}. Message: {order_msg}\n{e}")
690692
traceback.print_exc()
@@ -728,14 +730,17 @@ async def _process_trade_message(self, trade_msg: Dict[str, Any]):
728730
# Estimate fee
729731
trade_msg["trade_fee"] = self.estimate_fee_pct(tracked_order.order_type is OrderType.LIMIT_MAKER)
730732
updated = tracked_order.update_with_trade_update(trade_msg)
731-
# Call Update balances on every message to catch order create, fill and cancel.
732-
safe_ensure_future(self._update_balances())
733733

734734
if not updated:
735735
return
736736

737737
await self._trigger_order_fill(tracked_order, trade_msg)
738738

739+
def _process_balance_message(self, balance_message: Dict[str, Any]):
740+
asset_name = balance_message["currency"].upper()
741+
self._account_available_balances[asset_name] = Decimal(str(balance_message["balance"]))
742+
self._account_balances[asset_name] = Decimal(str(balance_message["locked"])) + Decimal(str(balance_message["balance"]))
743+
739744
async def _trigger_order_fill(self,
740745
tracked_order: AltmarketsInFlightOrder,
741746
update_msg: Dict[str, Any]):
@@ -812,7 +817,8 @@ def tick(self, timestamp: float):
812817
"""
813818
now = time.time()
814819
poll_interval = (Constants.SHORT_POLL_INTERVAL
815-
if now - self._user_stream_tracker.last_recv_time > 120.0
820+
if not self._user_stream_tracker.is_connected
821+
or now - self._user_stream_tracker.last_recv_time > Constants.USER_TRACKER_MAX_AGE
816822
else Constants.LONG_POLL_INTERVAL)
817823
last_tick = int(self._last_timestamp / poll_interval)
818824
current_tick = int(timestamp / poll_interval)
@@ -857,6 +863,7 @@ async def _user_stream_event_listener(self):
857863
async for event_message in self._iter_user_event_queue():
858864
try:
859865
event_methods = [
866+
Constants.WS_METHODS["USER_BALANCES"],
860867
Constants.WS_METHODS["USER_ORDERS"],
861868
Constants.WS_METHODS["USER_TRADES"],
862869
]
@@ -870,6 +877,8 @@ async def _user_stream_event_listener(self):
870877
await self._process_trade_message(params)
871878
elif method == Constants.WS_METHODS["USER_ORDERS"]:
872879
self._process_order_message(params)
880+
elif method == Constants.WS_METHODS["USER_BALANCES"]:
881+
self._process_balance_message(params)
873882
except asyncio.CancelledError:
874883
raise
875884
except Exception:

hummingbot/connector/exchange/altmarkets/altmarkets_in_flight_order.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ def update_with_order_update(self, order_update: Dict[str, Any]) -> bool:
117117
if self.executed_amount_base <= s_decimal_0:
118118
# No trades executed yet.
119119
return False
120-
trade_id = order_update["updated_at"]
120+
trade_id = f"{order_update['id']}-{order_update['updated_at']}"
121121
if trade_id in self.trade_id_set:
122122
# trade already recorded
123123
return False
@@ -149,11 +149,11 @@ def update_with_trade_update(self, trade_update: Dict[str, Any]) -> bool:
149149
if self.executed_amount_base <= s_decimal_0:
150150
# No trades executed yet.
151151
return False
152-
trade_id = trade_update["id"]
152+
trade_id = f"{trade_update['order_id']}-{trade_update['created_at']}"
153153
if trade_id in self.trade_id_set:
154154
# trade already recorded
155155
return False
156-
trade_update["exchange_trade_id"] = trade_id
156+
trade_update["exchange_trade_id"] = trade_update["id"]
157157
self.trade_id_set.add(trade_id)
158158
self.fee_paid += trade_update.get("trade_fee") * self.executed_amount_base
159159
if not self.fee_asset:

hummingbot/connector/exchange/altmarkets/altmarkets_user_stream_tracker.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ def data_source(self) -> UserStreamTrackerDataSource:
5454
)
5555
return self._data_source
5656

57+
@property
58+
def is_connected(self) -> float:
59+
return self._data_source.is_connected if self._data_source is not None else False
60+
5761
@property
5862
def exchange_name(self) -> str:
5963
"""

hummingbot/connector/exchange/altmarkets/altmarkets_utils.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import asyncio
33
import random
44
import re
5+
import ujson
56
from dateutil.parser import parse as dateparse
67
from typing import (
78
Any,
@@ -78,7 +79,7 @@ def get_new_client_order_id(is_buy: bool, trading_pair: str) -> str:
7879
quote = symbols[1].upper()
7980
base_str = f"{base[0:4]}{base[-1]}"
8081
quote_str = f"{quote[0:2]}{quote[-1]}"
81-
return f"{Constants.HBOT_BROKER_ID}-{side}-{base_str}{quote_str}-{get_tracking_nonce()}"
82+
return f"{Constants.HBOT_BROKER_ID}-{side}{base_str}{quote_str}{get_tracking_nonce()}"
8283

8384

8485
def retry_sleep_time(try_count: int) -> float:
@@ -98,14 +99,19 @@ async def aiohttp_response_with_errors(request_coroutine):
9899
request_errors = True
99100
try:
100101
parsed_response = await response.text('utf-8')
101-
if len(parsed_response) < 1:
102-
parsed_response = None
103-
elif len(parsed_response) > 100:
104-
parsed_response = f"{parsed_response[:100]} ... (truncated)"
102+
try:
103+
parsed_response = ujson.loads(parsed_response)
104+
except Exception:
105+
if len(parsed_response) < 1:
106+
parsed_response = None
107+
elif len(parsed_response) > 100:
108+
parsed_response = f"{parsed_response[:100]} ... (truncated)"
105109
except Exception:
106110
pass
107111
TempFailure = (parsed_response is None or
108-
(response.status not in [200, 201] and "errors" not in parsed_response))
112+
(response.status not in [200, 201] and
113+
"errors" not in parsed_response and
114+
"error" not in parsed_response))
109115
if TempFailure:
110116
parsed_response = response.reason if parsed_response is None else parsed_response
111117
request_errors = True
@@ -120,7 +126,7 @@ async def api_call_with_retries(method,
120126
shared_client=None,
121127
try_count: int = 0) -> Dict[str, Any]:
122128
url = f"{Constants.REST_URL}/{endpoint}"
123-
headers = {"Content-Type": "application/json"}
129+
headers = {"Content-Type": "application/json", "User-Agent": Constants.USER_AGENT}
124130
http_client = shared_client if shared_client is not None else aiohttp.ClientSession()
125131
# Build request coro
126132
response_coro = http_client.request(method=method.upper(), url=url, headers=headers,

hummingbot/connector/exchange/altmarkets/altmarkets_websocket.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,17 @@ def __init__(self, auth: Optional[AltmarketsAuth] = None):
3737
self._client: Optional[websockets.WebSocketClientProtocol] = None
3838
self._is_subscribed = False
3939

40+
@property
41+
def is_connected(self):
42+
return self._client.open if self._client is not None else False
43+
4044
@property
4145
def is_subscribed(self):
4246
return self._is_subscribed
4347

4448
# connect to exchange
4549
async def connect(self):
46-
extra_headers = self._auth.get_headers() if self._isPrivate else None
50+
extra_headers = self._auth.get_headers() if self._isPrivate else {"User-Agent": Constants.USER_AGENT}
4751
self._client = await websockets.connect(self._WS_URL, extra_headers=extra_headers)
4852

4953
return self._client

test/connector/exchange/altmarkets/test_altmarkets_auth.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from hummingbot.connector.exchange.altmarkets.altmarkets_websocket import AltmarketsWebsocket
1313
from hummingbot.logger.struct_logger import METRICS_LOG_LEVEL
1414
from hummingbot.connector.exchange.altmarkets.altmarkets_constants import Constants
15+
from hummingbot.connector.exchange.altmarkets.altmarkets_utils import aiohttp_response_with_errors
1516

1617
sys.path.insert(0, realpath(join(__file__, "../../../../../")))
1718
logging.basicConfig(level=METRICS_LOG_LEVEL)
@@ -29,9 +30,9 @@ async def rest_auth(self) -> Dict[Any, Any]:
2930
endpoint = Constants.ENDPOINT['USER_BALANCES']
3031
headers = self.auth.get_headers()
3132
http_client = aiohttp.ClientSession()
32-
response = await http_client.get(f"{Constants.REST_URL}/{endpoint}", headers=headers)
33+
http_status, response, request_errors = await aiohttp_response_with_errors(http_client.request(method='GET', url=f"{Constants.REST_URL}/{endpoint}", headers=headers))
3334
await http_client.close()
34-
return await response.json()
35+
return response, request_errors
3536

3637
async def ws_auth(self) -> Dict[Any, Any]:
3738
ws = AltmarketsWebsocket(self.auth)
@@ -44,9 +45,13 @@ async def ws_auth(self) -> Dict[Any, Any]:
4445
return False
4546

4647
def test_rest_auth(self):
47-
result = self.ev_loop.run_until_complete(self.rest_auth())
48+
result, errors = self.ev_loop.run_until_complete(self.rest_auth())
49+
if errors:
50+
reason = result.get('errors', result.get('error', result)) if isinstance(result, dict) else result
51+
print(f"\nUnable to connect: {reason}")
52+
assert errors is False
4853
if len(result) == 0 or "currency" not in result[0].keys():
49-
print(f"Unexpected response for API call: {result}")
54+
print(f"\nUnexpected response for API call: {result}")
5055
assert "currency" in result[0].keys()
5156

5257
def test_ws_auth(self):

0 commit comments

Comments
 (0)