Skip to content

Commit 605986d

Browse files
author
abel
committed
(feat) Finished implementation to support chain streams. Added more details to the example module that shows how to use them
1 parent 80c97ff commit 605986d

File tree

4 files changed

+125
-4
lines changed

4 files changed

+125
-4
lines changed

examples/chain_client/48_WithdrawValidatorCommission_Rewards renamed to examples/chain_client/48_WithdrawValidatorCommission_Rewards.py

File renamed without changes.
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import asyncio
2+
3+
from google.protobuf import json_format
4+
5+
from pyinjective.composer import Composer
6+
from pyinjective.async_client import AsyncClient
7+
from pyinjective.core.network import Network
8+
9+
10+
async def main() -> None:
11+
# select network: local, testnet, mainnet
12+
# network = Network.devnet()
13+
network = Network.custom(
14+
lcd_endpoint="https://staging.lcd.injective.network:443",
15+
tm_websocket_endpoint="wss://staging.tm.injective.network:443/websocket",
16+
grpc_endpoint="staging.chain.grpc.injective.network:443",
17+
grpc_exchange_endpoint="staging.exchange.grpc.injective.network:443",
18+
grpc_explorer_endpoint="staging.explorer.grpc.injective.network:443",
19+
chain_stream_endpoint="staging.stream.injective.network:443",
20+
chain_id="injective-1",
21+
env='mainnet',
22+
use_secure_connection=True,
23+
)
24+
25+
client = AsyncClient(network)
26+
composer = Composer(network=network.string())
27+
28+
inj_usdt_market = "0xfbc729e93b05b4c48916c1433c9f9c2ddb24605a73483303ea0f87a8886b52af"
29+
30+
bank_balances_filter = composer.chain_stream_bank_balances_filter()
31+
subaccount_deposits_filter = composer.chain_stream_subaccount_deposits_filter()
32+
spot_trades_filter = composer.chain_stream_trades_filter()
33+
derivative_trades_filter = composer.chain_stream_trades_filter()
34+
spot_orders_filter = composer.chain_stream_orders_filter()
35+
derivative_orders_filter = composer.chain_stream_orders_filter()
36+
spot_orderbooks_filter = composer.chain_stream_orderbooks_filter()
37+
derivative_orderbooks_filter = composer.chain_stream_orderbooks_filter()
38+
positions_filter = composer.chain_stream_positions_filter()
39+
oracle_price_filter = composer.chain_stream_oracle_price_filter()
40+
stream = await client.chain_stream(
41+
bank_balances_filter=bank_balances_filter,
42+
subaccount_deposits_filter=subaccount_deposits_filter,
43+
spot_trades_filter=spot_trades_filter,
44+
derivative_trades_filter=derivative_trades_filter,
45+
spot_orders_filter=spot_orders_filter,
46+
derivative_orders_filter=derivative_orders_filter,
47+
spot_orderbooks_filter=spot_orderbooks_filter,
48+
derivative_orderbooks_filter=derivative_orderbooks_filter,
49+
positions_filter=positions_filter,
50+
oracle_price_filter=oracle_price_filter
51+
)
52+
async for event in stream:
53+
print(json_format.MessageToJson(
54+
message=event,
55+
including_default_value_fields=True,
56+
preserving_proto_field_name=True)
57+
)
58+
59+
60+
if __name__ == '__main__':
61+
asyncio.get_event_loop().run_until_complete(main())

pyinjective/async_client.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -939,12 +939,29 @@ async def stream_account_portfolio(self, account_address: str, **kwargs):
939939
async def chain_stream(
940940
self,
941941
bank_balances_filter: Optional[chain_stream_query.BankBalancesFilter] = None,
942-
subaccount_deposits_filter: Optional[chain_stream_query.SubaccountDepositsFilter] = None
942+
subaccount_deposits_filter: Optional[chain_stream_query.SubaccountDepositsFilter] = None,
943+
spot_trades_filter: Optional[chain_stream_query.TradesFilter] = None,
944+
derivative_trades_filter: Optional[chain_stream_query.TradesFilter] = None,
945+
spot_orders_filter: Optional[chain_stream_query.OrdersFilter] = None,
946+
derivative_orders_filter: Optional[chain_stream_query.OrdersFilter] = None,
947+
spot_orderbooks_filter: Optional[chain_stream_query.OrderbookFilter] = None,
948+
derivative_orderbooks_filter: Optional[chain_stream_query.OrderbookFilter] = None,
949+
positions_filter: Optional[chain_stream_query.PositionsFilter] = None,
950+
oracle_price_filter: Optional[chain_stream_query.OraclePriceFilter] = None,
943951
):
944952

945953
request = chain_stream_query.StreamRequest(
946954
bank_balances_filter=bank_balances_filter,
947-
subaccount_deposits_filter=subaccount_deposits_filter)
955+
subaccount_deposits_filter=subaccount_deposits_filter,
956+
spot_trades_filter=spot_trades_filter,
957+
derivative_trades_filter=derivative_trades_filter,
958+
spot_orders_filter=spot_orders_filter,
959+
derivative_orders_filter=derivative_orders_filter,
960+
spot_orderbooks_filter=spot_orderbooks_filter,
961+
derivative_orderbooks_filter=derivative_orderbooks_filter,
962+
positions_filter=positions_filter,
963+
oracle_price_filter=oracle_price_filter,
964+
)
948965
metadata = await self.network.chain_metadata(
949966
metadata_query_provider=self._chain_cookie_metadata_requestor
950967
)

pyinjective/composer.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -902,14 +902,57 @@ def MsgVote(
902902
proposal_id=proposal_id, voter=voter, option=option
903903
)
904904

905-
def chain_stream_bank_balances_filter(self, accounts: List[str]) -> chain_stream_query.BankBalancesFilter:
905+
def chain_stream_bank_balances_filter(self, accounts: Optional[List[str]] = None) -> chain_stream_query.BankBalancesFilter:
906+
accounts = accounts or ["*"]
906907
return chain_stream_query.BankBalancesFilter(accounts=accounts)
907908

908909
def chain_stream_subaccount_deposits_filter(
909-
self, subaccount_ids: List[str]
910+
self, subaccount_ids: Optional[List[str]] = None,
910911
) -> chain_stream_query.SubaccountDepositsFilter:
912+
subaccount_ids = ["*"]
911913
return chain_stream_query.SubaccountDepositsFilter(subaccount_ids=subaccount_ids)
912914

915+
def chain_stream_trades_filter(
916+
self,
917+
subaccount_ids: Optional[List[str]] = None,
918+
market_ids: Optional[List[str]] = None,
919+
) -> chain_stream_query.TradesFilter:
920+
subaccount_ids = subaccount_ids or ["*"]
921+
market_ids = market_ids or ["*"]
922+
return chain_stream_query.TradesFilter(subaccount_ids=subaccount_ids, market_ids=market_ids)
923+
924+
def chain_stream_orders_filter(
925+
self,
926+
subaccount_ids: Optional[List[str]] = None,
927+
market_ids: Optional[List[str]] = None,
928+
) -> chain_stream_query.OrdersFilter:
929+
subaccount_ids = subaccount_ids or ["*"]
930+
market_ids = market_ids or ["*"]
931+
return chain_stream_query.OrdersFilter(subaccount_ids=subaccount_ids, market_ids=market_ids)
932+
933+
def chain_stream_orderbooks_filter(
934+
self,
935+
market_ids: Optional[List[str]] = None,
936+
) -> chain_stream_query.OrderbookFilter:
937+
market_ids = market_ids or ["*"]
938+
return chain_stream_query.OrderbookFilter(market_ids=market_ids)
939+
940+
def chain_stream_positions_filter(
941+
self,
942+
subaccount_ids: Optional[List[str]] = None,
943+
market_ids: Optional[List[str]] = None,
944+
) -> chain_stream_query.PositionsFilter:
945+
subaccount_ids = subaccount_ids or ["*"]
946+
market_ids = market_ids or ["*"]
947+
return chain_stream_query.PositionsFilter(subaccount_ids=subaccount_ids, market_ids=market_ids)
948+
949+
def chain_stream_oracle_price_filter(
950+
self,
951+
symbols: Optional[List[str]] = None,
952+
) -> chain_stream_query.PositionsFilter:
953+
symbols = symbols or ["*"]
954+
return chain_stream_query.OraclePriceFilter(symbol=symbols)
955+
913956
# data field format: [request-msg-header][raw-byte-msg-response]
914957
# you need to figure out this magic prefix number to trim request-msg-header off the data
915958
# this method handles only exchange responses

0 commit comments

Comments
 (0)