diff --git a/chia/full_node/full_node_rpc_api.py b/chia/full_node/full_node_rpc_api.py index b22720c526fd..3330583ee67a 100644 --- a/chia/full_node/full_node_rpc_api.py +++ b/chia/full_node/full_node_rpc_api.py @@ -2,6 +2,7 @@ import asyncio import time +from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, ClassVar, Optional, cast @@ -89,6 +90,8 @@ async def get_average_block_time( class FullNodeRpcApi: + executor: ThreadPoolExecutor + if TYPE_CHECKING: from chia.rpc.rpc_server import RpcApiProtocol @@ -98,6 +101,7 @@ def __init__(self, service: FullNode) -> None: self.service = service self.service_name = "chia_full_node" self.cached_blockchain_state: Optional[dict[str, Any]] = None + self.executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="node-rpc-") def get_routes(self) -> dict[str, Endpoint]: return { @@ -485,11 +489,14 @@ async def get_block_spends(self, request: dict[str, Any]) -> EndpointResult: if block_generator is None: # if block is not a transaction block. return {"block_spends": []} - spends = get_spends_for_trusted_block( + flags = get_flags_for_height_and_constants(full_block.height, self.service.constants) + spends = await asyncio.get_running_loop().run_in_executor( + self.executor, + get_spends_for_trusted_block, self.service.constants, block_generator.program, block_generator.generator_refs, - get_flags_for_height_and_constants(full_block.height, self.service.constants), + flags, ) return spends @@ -506,11 +513,14 @@ async def get_block_spends_with_conditions(self, request: dict[str, Any]) -> End if block_generator is None: # if block is not a transaction block. return {"block_spends_with_conditions": []} - spends_with_conditions = get_spends_for_trusted_block_with_conditions( + flags = get_flags_for_height_and_constants(full_block.height, self.service.constants) + spends_with_conditions = await asyncio.get_running_loop().run_in_executor( + self.executor, + get_spends_for_trusted_block_with_conditions, self.service.constants, block_generator.program, block_generator.generator_refs, - get_flags_for_height_and_constants(full_block.height, self.service.constants), + flags, ) return {"block_spends_with_conditions": spends_with_conditions} @@ -885,7 +895,7 @@ async def create_block_generator(self, _: dict[str, Any]) -> EndpointResult: if maybe_gen is not None: # this also validates the signature err, conds = await asyncio.get_running_loop().run_in_executor( - self.service.blockchain.pool, + self.executor, run_block_generator2, bytes(gen.program), gen.generator_refs, diff --git a/tools/validate_rpcs.py b/tools/validate_rpcs.py index aa77347f458d..f83a9fa9b13d 100755 --- a/tools/validate_rpcs.py +++ b/tools/validate_rpcs.py @@ -40,7 +40,7 @@ async def get_height_to_hash_bytes(root_path: Path, config: dict[str, Any]) -> b return await f.read() -def get_block_hash_from_height(height: int, height_to_hash: bytes) -> bytes32: +def get_block_hash_for_height(height: int, height_to_hash: bytes) -> bytes32: """ Get the block header hash from the height-to-hash database. """ @@ -224,7 +224,7 @@ async def cli_async( start_time: float = cycle_start def add_tasks_for_height(height: int) -> None: - block_header_hash = get_block_hash_from_height(height, height_to_hash_bytes) + block_header_hash = get_block_hash_for_height(height, height_to_hash_bytes) # Create tasks for each RPC call based on the flags if spends_with_conditions: pipeline.add( @@ -238,7 +238,7 @@ def add_tasks_for_height(height: int) -> None: for i in range(start_height, end_height + 1): add_tasks_for_height(height=i) # Make Status Updates. - if len(pipeline) >= pipeline_depth: + while len(pipeline) >= pipeline_depth: done, pipeline = await asyncio.wait(pipeline, return_when=asyncio.FIRST_COMPLETED) completed_requests += len(done) now = time.monotonic()