Skip to content

Commit 0edbb81

Browse files
author
abel
committed
(feat) Added low level API component for exchange derivative streams. Added unit tests for the new functionality. Included new functions in AsyncClient using the low level API components, and marked the old functions as deprecated
1 parent a549e62 commit 0edbb81

File tree

12 files changed

+1521
-83
lines changed

12 files changed

+1521
-83
lines changed

examples/exchange_client/derivative_exchange_rpc/10_StreamHistoricalOrders.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,41 @@
11
import asyncio
2+
from typing import Any, Dict
3+
4+
from grpc import RpcError
25

36
from pyinjective.async_client import AsyncClient
47
from pyinjective.core.network import Network
58

69

10+
async def order_event_processor(event: Dict[str, Any]):
11+
print(event)
12+
13+
14+
def stream_error_processor(exception: RpcError):
15+
print(f"There was an error listening to derivative orders history updates ({exception})")
16+
17+
18+
def stream_closed_processor():
19+
print("The derivative orders history updates stream has been closed")
20+
21+
722
async def main() -> None:
823
# select network: local, testnet, mainnet
924
network = Network.testnet()
1025
client = AsyncClient(network)
1126
market_id = "0x17ef48032cb24375ba7c2e39f384e56433bcab20cbee9a7357e4cba2eb00abe6"
12-
orders = await client.stream_historical_derivative_orders(market_id=market_id)
13-
async for order in orders:
14-
print(order)
27+
28+
task = asyncio.get_event_loop().create_task(
29+
client.listen_derivative_orders_history_updates(
30+
callback=order_event_processor,
31+
on_end_callback=stream_closed_processor,
32+
on_status_callback=stream_error_processor,
33+
market_id=market_id,
34+
)
35+
)
36+
37+
await asyncio.sleep(delay=60)
38+
task.cancel()
1539

1640

1741
if __name__ == "__main__":

examples/exchange_client/derivative_exchange_rpc/12_StreamTrades.py

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,46 @@
11
import asyncio
2+
from typing import Any, Dict
3+
4+
from grpc import RpcError
25

36
from pyinjective.async_client import AsyncClient
47
from pyinjective.core.network import Network
58

69

10+
async def market_event_processor(event: Dict[str, Any]):
11+
print(event)
12+
13+
14+
def stream_error_processor(exception: RpcError):
15+
print(f"There was an error listening to derivative trades updates ({exception})")
16+
17+
18+
def stream_closed_processor():
19+
print("The derivative trades updates stream has been closed")
20+
21+
722
async def main() -> None:
823
# select network: local, testnet, mainnet
924
network = Network.testnet()
1025
client = AsyncClient(network)
1126
market_ids = [
1227
"0x17ef48032cb24375ba7c2e39f384e56433bcab20cbee9a7357e4cba2eb00abe6",
13-
"0xd5e4b12b19ecf176e4e14b42944731c27677819d2ed93be4104ad7025529c7ff",
28+
"0x70bc8d7feab38b23d5fdfb12b9c3726e400c265edbcbf449b6c80c31d63d3a02",
1429
]
15-
subaccount_id = "0xc6fe5d33615a1c52c08018c47e8bc53646a0e101000000000000000000000000"
16-
trades = await client.stream_derivative_trades(market_id=market_ids[0], subaccount_id=subaccount_id)
17-
async for trade in trades:
18-
print(trade)
30+
subaccount_ids = ["0xc6fe5d33615a1c52c08018c47e8bc53646a0e101000000000000000000000000"]
31+
32+
task = asyncio.get_event_loop().create_task(
33+
client.listen_derivative_trades_updates(
34+
callback=market_event_processor,
35+
on_end_callback=stream_closed_processor,
36+
on_status_callback=stream_error_processor,
37+
market_ids=market_ids,
38+
subaccount_ids=subaccount_ids,
39+
)
40+
)
41+
42+
await asyncio.sleep(delay=60)
43+
task.cancel()
1944

2045

2146
if __name__ == "__main__":

examples/exchange_client/derivative_exchange_rpc/14_SubaccountTradesList.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ async def main() -> None:
1616
skip = 10
1717
limit = 2
1818
pagination = PaginationOption(skip=skip, limit=limit)
19-
trades = await client.fetch_subaccount_trades_list(
19+
trades = await client.fetch_derivative_subaccount_trades_list(
2020
subaccount_id=subaccount_id,
2121
market_id=market_id,
2222
execution_type=execution_type,

examples/exchange_client/derivative_exchange_rpc/3_StreamMarket.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,39 @@
11
import asyncio
2+
from typing import Any, Dict
3+
4+
from grpc import RpcError
25

36
from pyinjective.async_client import AsyncClient
47
from pyinjective.core.network import Network
58

69

10+
async def market_event_processor(event: Dict[str, Any]):
11+
print(event)
12+
13+
14+
def stream_error_processor(exception: RpcError):
15+
print(f"There was an error listening to derivative markets updates ({exception})")
16+
17+
18+
def stream_closed_processor():
19+
print("The derivative markets updates stream has been closed")
20+
21+
722
async def main() -> None:
823
# select network: local, testnet, mainnet
924
network = Network.testnet()
1025
client = AsyncClient(network)
11-
markets = await client.stream_derivative_markets()
12-
async for market in markets:
13-
print(market)
26+
27+
task = asyncio.get_event_loop().create_task(
28+
client.listen_derivative_market_updates(
29+
callback=market_event_processor,
30+
on_end_callback=stream_closed_processor,
31+
on_status_callback=stream_error_processor,
32+
)
33+
)
34+
35+
await asyncio.sleep(delay=60)
36+
task.cancel()
1437

1538

1639
if __name__ == "__main__":

examples/exchange_client/derivative_exchange_rpc/5_StreamOrderbooks.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,41 @@
11
import asyncio
2+
from typing import Any, Dict
3+
4+
from grpc import RpcError
25

36
from pyinjective.async_client import AsyncClient
47
from pyinjective.core.network import Network
58

69

10+
async def orderbook_event_processor(event: Dict[str, Any]):
11+
print(event)
12+
13+
14+
def stream_error_processor(exception: RpcError):
15+
print(f"There was an error listening to derivative orderbook snapshots ({exception})")
16+
17+
18+
def stream_closed_processor():
19+
print("The derivative orderbook snapshots stream has been closed")
20+
21+
722
async def main() -> None:
823
# select network: local, testnet, mainnet
924
network = Network.testnet()
1025
client = AsyncClient(network)
11-
market_id = "0x17ef48032cb24375ba7c2e39f384e56433bcab20cbee9a7357e4cba2eb00abe6"
12-
markets = await client.stream_derivative_orderbook_snapshot(market_ids=[market_id])
13-
async for market in markets:
14-
print(market)
26+
market_ids = ["0x17ef48032cb24375ba7c2e39f384e56433bcab20cbee9a7357e4cba2eb00abe6"]
27+
28+
task = asyncio.get_event_loop().create_task(
29+
client.listen_derivative_orderbook_snapshots(
30+
market_ids=market_ids,
31+
callback=orderbook_event_processor,
32+
on_end_callback=stream_closed_processor,
33+
on_status_callback=stream_error_processor,
34+
)
35+
)
36+
37+
await asyncio.sleep(delay=60)
38+
task.cancel()
1539

1640

1741
if __name__ == "__main__":

examples/exchange_client/derivative_exchange_rpc/6_StreamOrderbookUpdate.py

Lines changed: 83 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,21 @@
11
import asyncio
22
from decimal import Decimal
3+
from typing import Any, Dict
4+
5+
from grpc import RpcError
36

47
from pyinjective.async_client import AsyncClient
58
from pyinjective.core.network import Network
69

710

11+
def stream_error_processor(exception: RpcError):
12+
print(f"There was an error listening to derivative orderbook updates ({exception})")
13+
14+
15+
def stream_closed_processor():
16+
print("The derivative orderbook updates stream has been closed")
17+
18+
819
class PriceLevel:
920
def __init__(self, price: Decimal, quantity: Decimal, timestamp: int):
1021
self.price = price
@@ -53,74 +64,91 @@ async def main() -> None:
5364

5465
market_id = "0x17ef48032cb24375ba7c2e39f384e56433bcab20cbee9a7357e4cba2eb00abe6"
5566
orderbook = Orderbook(market_id=market_id)
67+
updates_queue = asyncio.Queue()
68+
tasks = []
69+
70+
async def queue_event(event: Dict[str, Any]):
71+
await updates_queue.put(event)
5672

5773
# start getting price levels updates
58-
stream = await async_client.stream_derivative_orderbook_update(market_ids=[market_id])
59-
first_update = None
60-
async for update in stream:
61-
first_update = update.orderbook_level_updates
62-
break
74+
task = asyncio.get_event_loop().create_task(
75+
async_client.listen_derivative_orderbook_updates(
76+
market_ids=[market_id],
77+
callback=queue_event,
78+
on_end_callback=stream_closed_processor,
79+
on_status_callback=stream_error_processor,
80+
)
81+
)
82+
tasks.append(task)
6383

6484
# load the snapshot once we are already receiving updates, so we don't miss any
6585
await load_orderbook_snapshot(async_client=async_client, orderbook=orderbook)
6686

67-
# start consuming updates again to process them
68-
apply_orderbook_update(orderbook, first_update)
69-
async for update in stream:
70-
apply_orderbook_update(orderbook, update.orderbook_level_updates)
87+
task = asyncio.get_event_loop().create_task(
88+
apply_orderbook_update(orderbook=orderbook, updates_queue=updates_queue)
89+
)
90+
tasks.append(task)
7191

92+
await asyncio.sleep(delay=60)
93+
for task in tasks:
94+
task.cancel()
7295

73-
def apply_orderbook_update(orderbook: Orderbook, updates):
74-
# discard old updates
75-
if updates.sequence <= orderbook.sequence:
76-
return
7796

78-
print(" * * * * * * * * * * * * * * * * * * *")
97+
async def apply_orderbook_update(orderbook: Orderbook, updates_queue: asyncio.Queue):
98+
while True:
99+
updates = await updates_queue.get()
100+
update = updates["orderbookLevelUpdates"]
79101

80-
# ensure we have not missed any update
81-
if updates.sequence > (orderbook.sequence + 1):
82-
raise Exception(
83-
"missing orderbook update events from stream, must restart: {} vs {}".format(
84-
updates.sequence, (orderbook.sequence + 1)
85-
)
86-
)
102+
# discard updates older than the snapshot
103+
if int(update["sequence"]) <= orderbook.sequence:
104+
return
87105

88-
print("updating orderbook with updates at sequence {}".format(updates.sequence))
106+
print(" * * * * * * * * * * * * * * * * * * *")
89107

90-
# update orderbook
91-
orderbook.sequence = updates.sequence
92-
for direction, levels in {"buys": updates.buys, "sells": updates.sells}.items():
93-
for level in levels:
94-
if level.is_active:
95-
# upsert level
96-
orderbook.levels[direction][level.price] = PriceLevel(
97-
price=Decimal(level.price), quantity=Decimal(level.quantity), timestamp=level.timestamp
108+
# ensure we have not missed any update
109+
if int(update["sequence"]) > (orderbook.sequence + 1):
110+
raise Exception(
111+
"missing orderbook update events from stream, must restart: {} vs {}".format(
112+
update["sequence"], (orderbook.sequence + 1)
98113
)
99-
else:
100-
if level.price in orderbook.levels[direction]:
101-
del orderbook.levels[direction][level.price]
102-
103-
# sort the level numerically
104-
buys = sorted(orderbook.levels["buys"].values(), key=lambda x: x.price, reverse=True)
105-
sells = sorted(orderbook.levels["sells"].values(), key=lambda x: x.price, reverse=True)
106-
107-
# lowest sell price should be higher than the highest buy price
108-
if len(buys) > 0 and len(sells) > 0:
109-
highest_buy = buys[0].price
110-
lowest_sell = sells[-1].price
111-
print("Max buy: {} - Min sell: {}".format(highest_buy, lowest_sell))
112-
if highest_buy >= lowest_sell:
113-
raise Exception("crossed orderbook, must restart")
114-
115-
# for the example, print the list of buys and sells orders.
116-
print("sells")
117-
for k in sells:
118-
print(k)
119-
print("=========")
120-
print("buys")
121-
for k in buys:
122-
print(k)
123-
print("====================================")
114+
)
115+
116+
print("updating orderbook with updates at sequence {}".format(update["sequence"]))
117+
118+
# update orderbook
119+
orderbook.sequence = int(update["sequence"])
120+
for direction, levels in {"buys": update["buys"], "sells": update["sells"]}.items():
121+
for level in levels:
122+
if level["isActive"]:
123+
# upsert level
124+
orderbook.levels[direction][level["price"]] = PriceLevel(
125+
price=Decimal(level["price"]), quantity=Decimal(level["quantity"]), timestamp=level["timestamp"]
126+
)
127+
else:
128+
if level["price"] in orderbook.levels[direction]:
129+
del orderbook.levels[direction][level["price"]]
130+
131+
# sort the level numerically
132+
buys = sorted(orderbook.levels["buys"].values(), key=lambda x: x.price, reverse=True)
133+
sells = sorted(orderbook.levels["sells"].values(), key=lambda x: x.price, reverse=True)
134+
135+
# lowest sell price should be higher than the highest buy price
136+
if len(buys) > 0 and len(sells) > 0:
137+
highest_buy = buys[0].price
138+
lowest_sell = sells[-1].price
139+
print("Max buy: {} - Min sell: {}".format(highest_buy, lowest_sell))
140+
if highest_buy >= lowest_sell:
141+
raise Exception("crossed orderbook, must restart")
142+
143+
# for the example, print the list of buys and sells orders.
144+
print("sells")
145+
for k in sells:
146+
print(k)
147+
print("=========")
148+
print("buys")
149+
for k in buys:
150+
print(k)
151+
print("====================================")
124152

125153

126154
if __name__ == "__main__":

0 commit comments

Comments
 (0)