Skip to content

Commit cfb507e

Browse files
author
abel
committed
(feat) Added low level API components for the exchange portfolio module, with unit tests. Included new functions in AsyncClient to use the low level API components and marked the old functions as deprecated. Updated the examples to use the new AsyncClient functions.
1 parent 0edbb81 commit cfb507e

File tree

9 files changed

+397
-5
lines changed

9 files changed

+397
-5
lines changed

examples/exchange_client/portfolio_rpc/1_AccountPortfolio.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ async def main() -> None:
99
network = Network.testnet()
1010
client = AsyncClient(network)
1111
account_address = "inj1clw20s2uxeyxtam6f7m84vgae92s9eh7vygagt"
12-
portfolio = await client.get_account_portfolio(account_address=account_address)
12+
portfolio = await client.fetch_account_portfolio(account_address=account_address)
1313
print(portfolio)
1414

1515

examples/exchange_client/portfolio_rpc/2_StreamAccountPortfolio.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,40 @@
11
import asyncio
2+
from typing import Any, Dict
3+
4+
from grpc import RpcError
25

36
from pyinjective.async_client import AsyncClient
47
from pyinjective.core.network import Network
58

69

10+
async def account_portfolio_event_processor(event: Dict[str, Any]):
11+
print(event)
12+
13+
14+
def stream_error_processor(exception: RpcError):
15+
print(f"There was an error listening to account portfolio updates ({exception})")
16+
17+
18+
def stream_closed_processor():
19+
print("The account portfolio updates stream has been closed")
20+
21+
722
async def main() -> None:
823
network = Network.testnet()
924
client = AsyncClient(network)
1025
account_address = "inj1clw20s2uxeyxtam6f7m84vgae92s9eh7vygagt"
11-
updates = await client.stream_account_portfolio(account_address=account_address)
12-
async for update in updates:
13-
print("Account portfolio Update:\n")
14-
print(update)
26+
27+
task = asyncio.get_event_loop().create_task(
28+
client.listen_account_portfolio_updates(
29+
account_address=account_address,
30+
callback=account_portfolio_event_processor,
31+
on_end_callback=stream_closed_processor,
32+
on_status_callback=stream_error_processor,
33+
)
34+
)
35+
36+
await asyncio.sleep(delay=60)
37+
task.cancel()
1538

1639

1740
if __name__ == "__main__":

pyinjective/async_client.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
from pyinjective.client.indexer.grpc.indexer_grpc_insurance_api import IndexerGrpcInsuranceApi
1919
from pyinjective.client.indexer.grpc.indexer_grpc_meta_api import IndexerGrpcMetaApi
2020
from pyinjective.client.indexer.grpc.indexer_grpc_oracle_api import IndexerGrpcOracleApi
21+
from pyinjective.client.indexer.grpc.indexer_grpc_portfolio_api import IndexerGrpcPortfolioApi
2122
from pyinjective.client.indexer.grpc.indexer_grpc_spot_api import IndexerGrpcSpotApi
2223
from pyinjective.client.indexer.grpc_stream.indexer_grpc_account_stream import IndexerGrpcAccountStream
2324
from pyinjective.client.indexer.grpc_stream.indexer_grpc_auction_stream import IndexerGrpcAuctionStream
2425
from pyinjective.client.indexer.grpc_stream.indexer_grpc_derivative_stream import IndexerGrpcDerivativeStream
2526
from pyinjective.client.indexer.grpc_stream.indexer_grpc_meta_stream import IndexerGrpcMetaStream
2627
from pyinjective.client.indexer.grpc_stream.indexer_grpc_oracle_stream import IndexerGrpcOracleStream
28+
from pyinjective.client.indexer.grpc_stream.indexer_grpc_portfolio_stream import IndexerGrpcPortfolioStream
2729
from pyinjective.client.indexer.grpc_stream.indexer_grpc_spot_stream import IndexerGrpcSpotStream
2830
from pyinjective.client.model.pagination import PaginationOption
2931
from pyinjective.composer import Composer
@@ -213,6 +215,12 @@ def __init__(
213215
metadata_query_provider=self._exchange_cookie_metadata_requestor
214216
),
215217
)
218+
self.exchange_portfolio_api = IndexerGrpcPortfolioApi(
219+
channel=self.exchange_channel,
220+
metadata_provider=lambda: self.network.exchange_metadata(
221+
metadata_query_provider=self._exchange_cookie_metadata_requestor
222+
),
223+
)
216224
self.exchange_spot_api = IndexerGrpcSpotApi(
217225
channel=self.exchange_channel,
218226
metadata_provider=lambda: self.network.exchange_metadata(
@@ -250,6 +258,12 @@ def __init__(
250258
metadata_query_provider=self._exchange_cookie_metadata_requestor
251259
),
252260
)
261+
self.exchange_portfolio_stream_api = IndexerGrpcPortfolioStream(
262+
channel=self.exchange_channel,
263+
metadata_provider=lambda: self.network.exchange_metadata(
264+
metadata_query_provider=self._exchange_cookie_metadata_requestor
265+
),
266+
)
253267
self.exchange_spot_stream_api = IndexerGrpcSpotStream(
254268
channel=self.exchange_channel,
255269
metadata_provider=lambda: self.network.exchange_metadata(
@@ -2137,10 +2151,23 @@ async def fetch_binary_options_market(self, market_id: str) -> Dict[str, Any]:
21372151
# PortfolioRPC
21382152

21392153
async def get_account_portfolio(self, account_address: str):
2154+
"""
2155+
This method is deprecated and will be removed soon. Please use `fetch_account_portfolio` instead
2156+
"""
2157+
warn("This method is deprecated. Use fetch_account_portfolio instead", DeprecationWarning, stacklevel=2)
21402158
req = portfolio_rpc_pb.AccountPortfolioRequest(account_address=account_address)
21412159
return await self.stubPortfolio.AccountPortfolio(req)
21422160

2161+
async def fetch_account_portfolio(self, account_address: str) -> Dict[str, Any]:
2162+
return await self.exchange_portfolio_api.fetch_account_portfolio(account_address=account_address)
2163+
21432164
async def stream_account_portfolio(self, account_address: str, **kwargs):
2165+
"""
2166+
This method is deprecated and will be removed soon. Please use `listen_account_portfolio_updates` instead
2167+
"""
2168+
warn(
2169+
"This method is deprecated. Use listen_account_portfolio_updates instead", DeprecationWarning, stacklevel=2
2170+
)
21442171
req = portfolio_rpc_pb.StreamAccountPortfolioRequest(
21452172
account_address=account_address, subaccount_id=kwargs.get("subaccount_id"), type=kwargs.get("type")
21462173
)
@@ -2149,6 +2176,24 @@ async def stream_account_portfolio(self, account_address: str, **kwargs):
21492176
)
21502177
return self.stubPortfolio.StreamAccountPortfolio(request=req, metadata=metadata)
21512178

2179+
async def listen_account_portfolio_updates(
2180+
self,
2181+
account_address: str,
2182+
callback: Callable,
2183+
on_end_callback: Optional[Callable] = None,
2184+
on_status_callback: Optional[Callable] = None,
2185+
subaccount_id: Optional[str] = None,
2186+
update_type: Optional[str] = None,
2187+
):
2188+
await self.exchange_portfolio_stream_api.stream_account_portfolio(
2189+
account_address=account_address,
2190+
callback=callback,
2191+
on_end_callback=on_end_callback,
2192+
on_status_callback=on_status_callback,
2193+
subaccount_id=subaccount_id,
2194+
update_type=update_type,
2195+
)
2196+
21522197
async def chain_stream(
21532198
self,
21542199
bank_balances_filter: Optional[chain_stream_query.BankBalancesFilter] = None,
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from typing import Any, Callable, Dict
2+
3+
from grpc.aio import Channel
4+
5+
from pyinjective.proto.exchange import (
6+
injective_portfolio_rpc_pb2 as exchange_portfolio_pb,
7+
injective_portfolio_rpc_pb2_grpc as exchange_portfolio_grpc,
8+
)
9+
from pyinjective.utils.grpc_api_request_assistant import GrpcApiRequestAssistant
10+
11+
12+
class IndexerGrpcPortfolioApi:
13+
def __init__(self, channel: Channel, metadata_provider: Callable):
14+
self._stub = self._stub = exchange_portfolio_grpc.InjectivePortfolioRPCStub(channel)
15+
self._assistant = GrpcApiRequestAssistant(metadata_provider=metadata_provider)
16+
17+
async def fetch_account_portfolio(self, account_address: str) -> Dict[str, Any]:
18+
request = exchange_portfolio_pb.AccountPortfolioRequest(account_address=account_address)
19+
response = await self._execute_call(call=self._stub.AccountPortfolio, request=request)
20+
21+
return response
22+
23+
async def _execute_call(self, call: Callable, request) -> Dict[str, Any]:
24+
return await self._assistant.execute_call(call=call, request=request)
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from typing import Callable, Optional
2+
3+
from grpc.aio import Channel
4+
5+
from pyinjective.proto.exchange import (
6+
injective_portfolio_rpc_pb2 as exchange_portfolio_pb,
7+
injective_portfolio_rpc_pb2_grpc as exchange_portfolio_grpc,
8+
)
9+
from pyinjective.utils.grpc_api_stream_assistant import GrpcApiStreamAssistant
10+
11+
12+
class IndexerGrpcPortfolioStream:
13+
def __init__(self, channel: Channel, metadata_provider: Callable):
14+
self._stub = self._stub = exchange_portfolio_grpc.InjectivePortfolioRPCStub(channel)
15+
self._assistant = GrpcApiStreamAssistant(metadata_provider=metadata_provider)
16+
17+
async def stream_account_portfolio(
18+
self,
19+
account_address: str,
20+
callback: Callable,
21+
on_end_callback: Optional[Callable] = None,
22+
on_status_callback: Optional[Callable] = None,
23+
subaccount_id: Optional[str] = None,
24+
update_type: Optional[str] = None,
25+
):
26+
request = exchange_portfolio_pb.StreamAccountPortfolioRequest(
27+
account_address=account_address,
28+
subaccount_id=subaccount_id,
29+
type=update_type,
30+
)
31+
32+
await self._assistant.listen_stream(
33+
call=self._stub.StreamAccountPortfolio,
34+
request=request,
35+
callback=callback,
36+
on_end_callback=on_end_callback,
37+
on_status_callback=on_status_callback,
38+
)
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from collections import deque
2+
3+
from pyinjective.proto.exchange import (
4+
injective_portfolio_rpc_pb2 as exchange_portfolio_pb,
5+
injective_portfolio_rpc_pb2_grpc as exchange_portfolio_grpc,
6+
)
7+
8+
9+
class ConfigurablePortfolioQueryServicer(exchange_portfolio_grpc.InjectivePortfolioRPCServicer):
10+
def __init__(self):
11+
super().__init__()
12+
self.account_portfolio_responses = deque()
13+
self.stream_account_portfolio_responses = deque()
14+
15+
async def AccountPortfolio(
16+
self, request: exchange_portfolio_pb.AccountPortfolioRequest, context=None, metadata=None
17+
):
18+
return self.account_portfolio_responses.pop()
19+
20+
async def StreamAccountPortfolio(
21+
self, request: exchange_portfolio_pb.StreamAccountPortfolioRequest, context=None, metadata=None
22+
):
23+
for event in self.stream_account_portfolio_responses:
24+
yield event
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import grpc
2+
import pytest
3+
4+
from pyinjective.client.indexer.grpc.indexer_grpc_portfolio_api import IndexerGrpcPortfolioApi
5+
from pyinjective.core.network import Network
6+
from pyinjective.proto.exchange import injective_portfolio_rpc_pb2 as exchange_portfolio_pb
7+
from tests.client.indexer.configurable_portfolio_query_servicer import ConfigurablePortfolioQueryServicer
8+
9+
10+
@pytest.fixture
11+
def portfolio_servicer():
12+
return ConfigurablePortfolioQueryServicer()
13+
14+
15+
class TestIndexerGrpcPortfolioApi:
16+
@pytest.mark.asyncio
17+
async def test_fetch_account_portfolio(
18+
self,
19+
portfolio_servicer,
20+
):
21+
coin = exchange_portfolio_pb.Coin(
22+
denom="peggy0x87aB3B4C8661e07D6372361211B96ed4Dc36B1B5",
23+
amount="2322098",
24+
)
25+
subaccount_deposit = exchange_portfolio_pb.SubaccountDeposit(
26+
total_balance="0.170858923182467801",
27+
available_balance="0.170858923182467801",
28+
)
29+
subaccount_balance = exchange_portfolio_pb.SubaccountBalanceV2(
30+
subaccount_id="0xc7dca7c15c364865f77a4fb67ab11dc95502e6fe000000000000000000000000",
31+
denom="peggy0x87aB3B4C8661e07D6372361211B96ed4Dc36B1B5",
32+
deposit=subaccount_deposit,
33+
)
34+
position = exchange_portfolio_pb.DerivativePosition(
35+
ticker="INJ/USDT PERP",
36+
market_id="0x17ef48032cb24375ba7c2e39f384e56433bcab20cbee9a7357e4cba2eb00abe6",
37+
subaccount_id="0x1383dabde57e5aed55960ee43e158ae7118057d3000000000000000000000000",
38+
direction="short",
39+
quantity="0.070294765766186502",
40+
entry_price="15980281.340438795311756847",
41+
margin="561065.540974",
42+
liquidation_price="23492052.224777",
43+
mark_price="16197000",
44+
aggregate_reduce_only_quantity="0",
45+
updated_at=1700161202147,
46+
created_at=-62135596800000,
47+
)
48+
positions_with_upnl = exchange_portfolio_pb.PositionsWithUPNL(
49+
position=position,
50+
unrealized_pnl="-364.479654577777780880",
51+
)
52+
53+
portfolio = exchange_portfolio_pb.Portfolio(
54+
account_address="inj1clw20s2uxeyxtam6f7m84vgae92s9eh7vygagt",
55+
bank_balances=[coin],
56+
subaccounts=[subaccount_balance],
57+
positions_with_upnl=[positions_with_upnl],
58+
)
59+
60+
portfolio_servicer.account_portfolio_responses.append(
61+
exchange_portfolio_pb.AccountPortfolioResponse(
62+
portfolio=portfolio,
63+
)
64+
)
65+
66+
network = Network.devnet()
67+
channel = grpc.aio.insecure_channel(network.grpc_exchange_endpoint)
68+
69+
api = IndexerGrpcPortfolioApi(channel=channel, metadata_provider=lambda: self._dummy_metadata_provider())
70+
api._stub = portfolio_servicer
71+
72+
result_auction = await api.fetch_account_portfolio(account_address=portfolio.account_address)
73+
expected_auction = {
74+
"portfolio": {
75+
"accountAddress": portfolio.account_address,
76+
"bankBalances": [
77+
{
78+
"denom": coin.denom,
79+
"amount": coin.amount,
80+
}
81+
],
82+
"subaccounts": [
83+
{
84+
"subaccountId": subaccount_balance.subaccount_id,
85+
"denom": subaccount_balance.denom,
86+
"deposit": {
87+
"totalBalance": subaccount_deposit.total_balance,
88+
"availableBalance": subaccount_deposit.available_balance,
89+
},
90+
}
91+
],
92+
"positionsWithUpnl": [
93+
{
94+
"position": {
95+
"ticker": position.ticker,
96+
"marketId": position.market_id,
97+
"subaccountId": position.subaccount_id,
98+
"direction": position.direction,
99+
"quantity": position.quantity,
100+
"entryPrice": position.entry_price,
101+
"margin": position.margin,
102+
"liquidationPrice": position.liquidation_price,
103+
"markPrice": position.mark_price,
104+
"aggregateReduceOnlyQuantity": position.aggregate_reduce_only_quantity,
105+
"createdAt": str(position.created_at),
106+
"updatedAt": str(position.updated_at),
107+
},
108+
"unrealizedPnl": positions_with_upnl.unrealized_pnl,
109+
},
110+
],
111+
}
112+
}
113+
114+
assert result_auction == expected_auction
115+
116+
async def _dummy_metadata_provider(self):
117+
return None

0 commit comments

Comments
 (0)