Skip to content

Commit d65cb2e

Browse files
author
abel
committed
(feat) Added low level API component for exchange explorer streams. Added unit tests for the new functionality. Included new functions in AsyncClient using the low level API components, and marked the old functions as deprecated. Updated the example scripts.
1 parent 25adca0 commit d65cb2e

File tree

7 files changed

+323
-6
lines changed

7 files changed

+323
-6
lines changed

examples/exchange_client/explorer_rpc/6_StreamTxs.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,39 @@
11
import asyncio
2+
from typing import Any, Dict
3+
4+
from grpc import RpcError
25

36
from pyinjective.async_client import AsyncClient
47
from pyinjective.core.network import Network
58

69

10+
async def tx_event_processor(event: Dict[str, Any]):
11+
print(event)
12+
13+
14+
def stream_error_processor(exception: RpcError):
15+
print(f"There was an error listening to txs updates ({exception})")
16+
17+
18+
def stream_closed_processor():
19+
print("The txs updates stream has been closed")
20+
21+
722
async def main() -> None:
823
# select network: local, testnet, mainnet
924
network = Network.testnet()
1025
client = AsyncClient(network)
11-
stream_txs = await client.stream_txs()
12-
async for tx in stream_txs:
13-
print(tx)
26+
27+
task = asyncio.get_event_loop().create_task(
28+
client.listen_txs_updates(
29+
callback=tx_event_processor,
30+
on_end_callback=stream_closed_processor,
31+
on_status_callback=stream_error_processor,
32+
)
33+
)
34+
35+
await asyncio.sleep(delay=60)
36+
task.cancel()
1437

1538

1639
if __name__ == "__main__":

examples/exchange_client/explorer_rpc/7_StreamBlocks.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,39 @@
11
import asyncio
2+
from typing import Any, Dict
3+
4+
from grpc import RpcError
25

36
from pyinjective.async_client import AsyncClient
47
from pyinjective.core.network import Network
58

69

10+
async def block_event_processor(event: Dict[str, Any]):
11+
print(event)
12+
13+
14+
def stream_error_processor(exception: RpcError):
15+
print(f"There was an error listening to blocks updates ({exception})")
16+
17+
18+
def stream_closed_processor():
19+
print("The blocks updates stream has been closed")
20+
21+
722
async def main() -> None:
823
# select network: local, testnet, mainnet
924
network = Network.testnet()
1025
client = AsyncClient(network)
11-
stream_blocks = await client.stream_blocks()
12-
async for block in stream_blocks:
13-
print(block)
26+
27+
task = asyncio.get_event_loop().create_task(
28+
client.listen_blocks_updates(
29+
callback=block_event_processor,
30+
on_end_callback=stream_closed_processor,
31+
on_status_callback=stream_error_processor,
32+
)
33+
)
34+
35+
await asyncio.sleep(delay=60)
36+
task.cancel()
1437

1538

1639
if __name__ == "__main__":

pyinjective/async_client.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from pyinjective.client.indexer.grpc_stream.indexer_grpc_account_stream import IndexerGrpcAccountStream
2525
from pyinjective.client.indexer.grpc_stream.indexer_grpc_auction_stream import IndexerGrpcAuctionStream
2626
from pyinjective.client.indexer.grpc_stream.indexer_grpc_derivative_stream import IndexerGrpcDerivativeStream
27+
from pyinjective.client.indexer.grpc_stream.indexer_grpc_explorer_stream import IndexerGrpcExplorerStream
2728
from pyinjective.client.indexer.grpc_stream.indexer_grpc_meta_stream import IndexerGrpcMetaStream
2829
from pyinjective.client.indexer.grpc_stream.indexer_grpc_oracle_stream import IndexerGrpcOracleStream
2930
from pyinjective.client.indexer.grpc_stream.indexer_grpc_portfolio_stream import IndexerGrpcPortfolioStream
@@ -278,6 +279,12 @@ def __init__(
278279
metadata_query_provider=self._explorer_cookie_metadata_requestor
279280
),
280281
)
282+
self.exchange_explorer_stream_api = IndexerGrpcExplorerStream(
283+
channel=self.explorer_channel,
284+
metadata_provider=lambda: self.network.exchange_metadata(
285+
metadata_query_provider=self._explorer_cookie_metadata_requestor
286+
),
287+
)
281288

282289
async def all_tokens(self) -> Dict[str, Token]:
283290
if self._tokens is None:
@@ -729,13 +736,45 @@ async def fetch_txs(
729736
)
730737

731738
async def stream_txs(self):
739+
"""
740+
This method is deprecated and will be removed soon. Please use `listen_txs_updates` instead
741+
"""
742+
warn("This method is deprecated. Use listen_txs_updates instead", DeprecationWarning, stacklevel=2)
732743
req = explorer_rpc_pb.StreamTxsRequest()
733744
return self.stubExplorer.StreamTxs(req)
734745

746+
async def listen_txs_updates(
747+
self,
748+
callback: Callable,
749+
on_end_callback: Optional[Callable] = None,
750+
on_status_callback: Optional[Callable] = None,
751+
):
752+
await self.exchange_explorer_stream_api.stream_txs(
753+
callback=callback,
754+
on_end_callback=on_end_callback,
755+
on_status_callback=on_status_callback,
756+
)
757+
735758
async def stream_blocks(self):
759+
"""
760+
This method is deprecated and will be removed soon. Please use `listen_blocks_updates` instead
761+
"""
762+
warn("This method is deprecated. Use listen_blocks_updates instead", DeprecationWarning, stacklevel=2)
736763
req = explorer_rpc_pb.StreamBlocksRequest()
737764
return self.stubExplorer.StreamBlocks(req)
738765

766+
async def listen_blocks_updates(
767+
self,
768+
callback: Callable,
769+
on_end_callback: Optional[Callable] = None,
770+
on_status_callback: Optional[Callable] = None,
771+
):
772+
await self.exchange_explorer_stream_api.stream_blocks(
773+
callback=callback,
774+
on_end_callback=on_end_callback,
775+
on_status_callback=on_status_callback,
776+
)
777+
739778
async def get_peggy_deposits(self, **kwargs):
740779
"""
741780
This method is deprecated and will be removed soon. Please use `fetch_peggy_deposit_txs` instead
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from typing import Callable, Optional
2+
3+
from grpc.aio import Channel
4+
5+
from pyinjective.proto.exchange import (
6+
injective_explorer_rpc_pb2 as exchange_explorer_pb,
7+
injective_explorer_rpc_pb2_grpc as exchange_explorer_grpc,
8+
)
9+
from pyinjective.utils.grpc_api_stream_assistant import GrpcApiStreamAssistant
10+
11+
12+
class IndexerGrpcExplorerStream:
13+
def __init__(self, channel: Channel, metadata_provider: Callable):
14+
self._stub = self._stub = exchange_explorer_grpc.InjectiveExplorerRPCStub(channel)
15+
self._assistant = GrpcApiStreamAssistant(metadata_provider=metadata_provider)
16+
17+
async def stream_txs(
18+
self,
19+
callback: Callable,
20+
on_end_callback: Optional[Callable] = None,
21+
on_status_callback: Optional[Callable] = None,
22+
):
23+
request = exchange_explorer_pb.StreamTxsRequest()
24+
25+
await self._assistant.listen_stream(
26+
call=self._stub.StreamTxs,
27+
request=request,
28+
callback=callback,
29+
on_end_callback=on_end_callback,
30+
on_status_callback=on_status_callback,
31+
)
32+
33+
async def stream_blocks(
34+
self,
35+
callback: Callable,
36+
on_end_callback: Optional[Callable] = None,
37+
on_status_callback: Optional[Callable] = None,
38+
):
39+
request = exchange_explorer_pb.StreamBlocksRequest()
40+
41+
await self._assistant.listen_stream(
42+
call=self._stub.StreamBlocks,
43+
request=request,
44+
callback=callback,
45+
on_end_callback=on_end_callback,
46+
on_status_callback=on_status_callback,
47+
)

tests/client/indexer/configurable_explorer_query_servicer.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ def __init__(self):
2929
self.relayers_responses = deque()
3030
self.bank_transfers_responses = deque()
3131

32+
self.stream_txs_responses = deque()
33+
self.stream_blocks_responses = deque()
34+
3235
async def GetAccountTxs(self, request: exchange_explorer_pb.GetAccountTxsRequest, context=None, metadata=None):
3336
return self.account_txs_responses.pop()
3437

@@ -99,3 +102,11 @@ async def GetBankTransfers(
99102
self, request: exchange_explorer_pb.GetBankTransfersRequest, context=None, metadata=None
100103
):
101104
return self.bank_transfers_responses.pop()
105+
106+
async def StreamTxs(self, request: exchange_explorer_pb.StreamTxsRequest, context=None, metadata=None):
107+
for event in self.stream_txs_responses:
108+
yield event
109+
110+
async def StreamBlocks(self, request: exchange_explorer_pb.StreamBlocksRequest, context=None, metadata=None):
111+
for event in self.stream_blocks_responses:
112+
yield event
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
import asyncio
2+
3+
import grpc
4+
import pytest
5+
6+
from pyinjective.client.indexer.grpc_stream.indexer_grpc_explorer_stream import IndexerGrpcExplorerStream
7+
from pyinjective.core.network import Network
8+
from pyinjective.proto.exchange import injective_explorer_rpc_pb2 as exchange_explorer_pb
9+
from tests.client.indexer.configurable_explorer_query_servicer import ConfigurableExplorerQueryServicer
10+
11+
12+
@pytest.fixture
13+
def explorer_servicer():
14+
return ConfigurableExplorerQueryServicer()
15+
16+
17+
class TestIndexerGrpcAuctionStream:
18+
@pytest.mark.asyncio
19+
async def test_stream_txs(
20+
self,
21+
explorer_servicer,
22+
):
23+
code = 5
24+
claim_id = 100
25+
tx_data = exchange_explorer_pb.StreamTxsResponse(
26+
id="test id",
27+
block_number=18138926,
28+
block_timestamp="2023-11-07 23:19:55.371 +0000 UTC",
29+
hash="0x3790ade2bea6c8605851ec89fa968adf2a2037a5ecac11ca95e99260508a3b7e",
30+
codespace="test codespace",
31+
messages='[{"type":"/cosmos.bank.v1beta1.MsgSend",'
32+
'"value":{"from_address":"inj1phd706jqzd9wznkk5hgsfkrc8jqxv0kmlj0kex",'
33+
'"to_address":"inj1d6qx83nhx3a3gx7e654x4su8hur5s83u84h2xc",'
34+
'"amount":[{"denom":"factory/inj17vytdwqczqz72j65saukplrktd4gyfme5agf6c/weth",'
35+
'"amount":"100000000000000000"}]}}]',
36+
tx_number=221429,
37+
error_log="",
38+
code=code,
39+
claim_ids=[claim_id],
40+
)
41+
42+
explorer_servicer.stream_txs_responses.append(tx_data)
43+
44+
network = Network.devnet()
45+
channel = grpc.aio.insecure_channel(network.grpc_exchange_endpoint)
46+
47+
api = IndexerGrpcExplorerStream(channel=channel, metadata_provider=lambda: self._dummy_metadata_provider())
48+
api._stub = explorer_servicer
49+
50+
txs_updates = asyncio.Queue()
51+
end_event = asyncio.Event()
52+
53+
callback = lambda update: txs_updates.put_nowait(update)
54+
error_callback = lambda exception: pytest.fail(str(exception))
55+
end_callback = lambda: end_event.set()
56+
57+
asyncio.get_event_loop().create_task(
58+
api.stream_txs(
59+
callback=callback,
60+
on_end_callback=end_callback,
61+
on_status_callback=error_callback,
62+
)
63+
)
64+
expected_update = {
65+
"id": tx_data.id,
66+
"blockNumber": str(tx_data.block_number),
67+
"blockTimestamp": tx_data.block_timestamp,
68+
"hash": tx_data.hash,
69+
"codespace": tx_data.codespace,
70+
"messages": tx_data.messages,
71+
"txNumber": str(tx_data.tx_number),
72+
"errorLog": tx_data.error_log,
73+
"code": tx_data.code,
74+
"claimIds": [str(claim_id)],
75+
}
76+
77+
first_update = await asyncio.wait_for(txs_updates.get(), timeout=1)
78+
79+
assert first_update == expected_update
80+
assert end_event.is_set()
81+
82+
@pytest.mark.asyncio
83+
async def test_stream_blocks(
84+
self,
85+
explorer_servicer,
86+
):
87+
block_info = exchange_explorer_pb.StreamBlocksResponse(
88+
height=19034578,
89+
proposer="injvalcons18x63wcw5hjxlf535lgn4qy20yer7mm0qedu0la",
90+
moniker="InjectiveNode1",
91+
block_hash="0x7f7bfe8caaa0eed042315d1447ef1ed726a80f5da23fdbe6831fc66775197db1",
92+
parent_hash="0x44287ba5fad21d0109a3ec6f19d447580763e5a709e5a5ceb767174e99ae3bd8",
93+
num_pre_commits=20,
94+
num_txs=4,
95+
timestamp="2023-11-29 20:23:33.842 +0000 UTC",
96+
)
97+
98+
explorer_servicer.stream_blocks_responses.append(block_info)
99+
100+
network = Network.devnet()
101+
channel = grpc.aio.insecure_channel(network.grpc_exchange_endpoint)
102+
103+
api = IndexerGrpcExplorerStream(channel=channel, metadata_provider=lambda: self._dummy_metadata_provider())
104+
api._stub = explorer_servicer
105+
106+
blocks_updates = asyncio.Queue()
107+
end_event = asyncio.Event()
108+
109+
callback = lambda update: blocks_updates.put_nowait(update)
110+
error_callback = lambda exception: pytest.fail(str(exception))
111+
end_callback = lambda: end_event.set()
112+
113+
asyncio.get_event_loop().create_task(
114+
api.stream_blocks(
115+
callback=callback,
116+
on_end_callback=end_callback,
117+
on_status_callback=error_callback,
118+
)
119+
)
120+
expected_update = {
121+
"height": str(block_info.height),
122+
"proposer": block_info.proposer,
123+
"moniker": block_info.moniker,
124+
"blockHash": block_info.block_hash,
125+
"parentHash": block_info.parent_hash,
126+
"numPreCommits": str(block_info.num_pre_commits),
127+
"numTxs": str(block_info.num_txs),
128+
"txs": [],
129+
"timestamp": block_info.timestamp,
130+
}
131+
132+
first_update = await asyncio.wait_for(blocks_updates.get(), timeout=1)
133+
134+
assert first_update == expected_update
135+
assert end_event.is_set()
136+
137+
async def _dummy_metadata_provider(self):
138+
return None

0 commit comments

Comments
 (0)