Skip to content

Commit 08d378a

Browse files
committed
Refactor AsyncSubtensor and Dendrite to offload signing to ProcessPoolExecutor (Fixes #3211)
1 parent 34fa24a commit 08d378a

File tree

2 files changed

+105
-6
lines changed

2 files changed

+105
-6
lines changed

bittensor/core/async_subtensor.py

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
import asyncstdlib as a
88
import scalecodec
9+
from concurrent.futures import ProcessPoolExecutor
10+
from functools import partial
911
from async_substrate_interface import AsyncSubstrateInterface
1012
from async_substrate_interface.substrate_addons import RetryAsyncSubstrate
1113
from async_substrate_interface.utils.storage import StorageKey
@@ -163,6 +165,16 @@
163165
tick_to_price,
164166
)
165167

168+
169+
170+
171+
def sign_extrinsic_worker(private_key: str, payload: bytes) -> str:
172+
from bittensor_wallet import Keypair
173+
# We use the private key to recreate the keypair for signing
174+
kp = Keypair(private_key=private_key)
175+
return f"0x{kp.sign(payload).hex()}"
176+
177+
166178
if TYPE_CHECKING:
167179
from async_substrate_interface import AsyncQueryMapResult
168180
from async_substrate_interface.types import ScaleObj
@@ -253,6 +265,8 @@ def __init__(
253265
logging.info(
254266
f"Connected to {self.network} network and {self.chain_endpoint}."
255267
)
268+
269+
self._executor = ProcessPoolExecutor(max_workers=1)
256270

257271
async def close(self):
258272
"""Closes the connection to the blockchain.
@@ -279,6 +293,10 @@ async def close(self):
279293
if self.substrate:
280294
await self.substrate.close()
281295

296+
if self._executor:
297+
self._executor.shutdown(wait=True)
298+
self._executor = None
299+
282300
async def initialize(self):
283301
"""Establishes connection to the blockchain.
284302
@@ -328,7 +346,12 @@ async def __aenter__(self):
328346
return await self.initialize()
329347

330348
async def __aexit__(self, exc_type, exc_val, exc_tb):
331-
await self.substrate.close()
349+
if self.substrate:
350+
await self.substrate.close()
351+
352+
if self._executor:
353+
self._executor.shutdown(wait=True)
354+
self._executor = None
332355

333356
# Helpers ==========================================================================================================
334357

@@ -5961,6 +5984,51 @@ async def compose_call(
59615984
block_hash=block_hash,
59625985
)
59635986

5987+
async def create_signed_extrinsic(
5988+
self,
5989+
call: "GenericCall",
5990+
keypair: "Keypair",
5991+
nonce: Optional[int] = None,
5992+
era: Optional[dict] = None,
5993+
tip: int = 0,
5994+
tip_asset_id: Optional[int] = None,
5995+
):
5996+
"""
5997+
Creates a signed extrinsic, offloading the signing process to a separate process to avoid blocking the event loop.
5998+
"""
5999+
if nonce is None:
6000+
nonce = await self.substrate.get_account_next_index(keypair.ss58_address)
6001+
6002+
# Generate the signature payload.
6003+
# This uses the underlying substrate interface to prepare the bytes to be signed.
6004+
signature_payload = await self.substrate.generate_signature_payload(
6005+
call=call,
6006+
era=era,
6007+
nonce=nonce,
6008+
tip=tip,
6009+
tip_asset_id=tip_asset_id
6010+
)
6011+
6012+
# Offload the signing of the payload to an executor.
6013+
loop = asyncio.get_running_loop()
6014+
# signature_payload.data should contain the bytes to sign.
6015+
signature = await loop.run_in_executor(
6016+
self._executor,
6017+
partial(sign_extrinsic_worker, keypair.private_key, signature_payload.data)
6018+
)
6019+
6020+
# Create the extrinsic with the attached signature.
6021+
extrinsic = await self.substrate.create_extrinsic(
6022+
call=call,
6023+
nonce=nonce,
6024+
era=era,
6025+
tip=tip,
6026+
tip_asset_id=tip_asset_id,
6027+
signature=signature,
6028+
keypair=keypair, # Keypair may be needed for address/type info
6029+
)
6030+
return extrinsic
6031+
59646032
async def sign_and_send_extrinsic(
59656033
self,
59666034
call: "GenericCall",
@@ -6029,7 +6097,7 @@ async def sign_and_send_extrinsic(
60296097
if period is not None:
60306098
extrinsic_data["era"] = {"period": period}
60316099

6032-
extrinsic_response.extrinsic = await self.substrate.create_signed_extrinsic(
6100+
extrinsic_response.extrinsic = await self.create_signed_extrinsic(
60336101
**extrinsic_data
60346102
)
60356103
try:

bittensor/core/dendrite.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import time
55
import uuid
66
import warnings
7+
from concurrent.futures import ProcessPoolExecutor
8+
from functools import partial
79
from typing import Any, AsyncGenerator, Optional, Union, Type
810

911
import aiohttp
@@ -39,6 +41,19 @@ def event_loop_is_running():
3941
return False
4042

4143

44+
def sign_message_worker(private_key_hex: str, message: str) -> str:
45+
from bittensor_wallet import Keypair
46+
47+
# Re-instantiate the keypair from the private key
48+
# ss58_address is required but can be dummy if only signing?
49+
# Actually bittensor_wallet Keypair usually takes ss58_address.
50+
# If private_key is provided, it might derive it.
51+
# Let's try passing None or empty string if allowed, or check Keypair usage.
52+
# Safe bet: pass private key. Keypair usually derives public from private.
53+
kp = Keypair(private_key=private_key_hex)
54+
return f"0x{kp.sign(message).hex()}"
55+
56+
4257
class DendriteMixin:
4358
"""
4459
The Dendrite class represents the abstracted implementation of a network client module.
@@ -121,6 +136,7 @@ def __init__(self, wallet: Optional[Union["Wallet", "Keypair"]] = None):
121136
self.synapse_history: list = []
122137

123138
self._session: Optional[aiohttp.ClientSession] = None
139+
self._executor = ProcessPoolExecutor(max_workers=1)
124140

125141
@property
126142
async def session(self) -> aiohttp.ClientSession:
@@ -192,6 +208,10 @@ def close_session(self, using_new_loop: bool = False):
192208
if using_new_loop:
193209
loop.close()
194210
self._session = None
211+
212+
if self._executor:
213+
self._executor.shutdown(wait=True)
214+
self._executor = None
195215

196216
async def aclose_session(self):
197217
"""
@@ -217,6 +237,10 @@ async def aclose_session(self):
217237
if self._session:
218238
await self._session.close()
219239
self._session = None
240+
241+
if self._executor:
242+
self._executor.shutdown(wait=True)
243+
self._executor = None
220244

221245
def _get_endpoint_url(self, target_axon, request_name):
222246
"""
@@ -568,7 +592,8 @@ async def call(
568592
url = self._get_endpoint_url(target_axon, request_name=request_name)
569593

570594
# Preprocess synapse for making a request
571-
synapse = self.preprocess_synapse_for_request(target_axon, synapse, timeout)
595+
synapse = await self.preprocess_synapse_for_request(target_axon, synapse, timeout)
596+
572597

573598
try:
574599
# Log outgoing request
@@ -643,7 +668,8 @@ async def call_stream(
643668
url = f"http://{endpoint}/{request_name}"
644669

645670
# Preprocess synapse for making a request
646-
synapse = self.preprocess_synapse_for_request(target_axon, synapse, timeout) # type: ignore
671+
synapse = await self.preprocess_synapse_for_request(target_axon, synapse, timeout) # type: ignore
672+
647673

648674
try:
649675
# Log outgoing request
@@ -682,7 +708,7 @@ async def call_stream(
682708
else:
683709
yield synapse
684710

685-
def preprocess_synapse_for_request(
711+
async def preprocess_synapse_for_request(
686712
self,
687713
target_axon_info: "AxonInfo",
688714
synapse: "Synapse",
@@ -719,7 +745,12 @@ def preprocess_synapse_for_request(
719745

720746
# Sign the request using the dendrite, axon info, and the synapse body hash
721747
message = f"{synapse.dendrite.nonce}.{synapse.dendrite.hotkey}.{synapse.axon.hotkey}.{synapse.dendrite.uuid}.{synapse.body_hash}"
722-
synapse.dendrite.signature = f"0x{self.keypair.sign(message).hex()}"
748+
749+
loop = asyncio.get_running_loop()
750+
synapse.dendrite.signature = await loop.run_in_executor(
751+
self._executor,
752+
partial(sign_message_worker, self.keypair.private_key, message)
753+
)
723754

724755
return synapse
725756

0 commit comments

Comments
 (0)