diff --git a/.vscode/settings.json b/.vscode/settings.json index a3123f8..8b42354 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -17,4 +17,4 @@ "python.autoComplete.extraPaths": [ "/home/manjairo/tcheco/torustrate-interface" ] -} +} \ No newline at end of file diff --git a/README.md b/README.md index ae1f181..1274a9f 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ poetry add torusdk ## Installation with Nix To install `torus` the torus cli with Nix + ```sh nix profile install . ``` @@ -61,7 +62,6 @@ The `torus` offers a variety of features for token management and agent interact - Agent management for registration, curation and updates - Participation in governance processes - ## CLI Usage The CLI commands are structured as follows: @@ -140,6 +140,14 @@ torus network params torus misc circulating-supply ``` +### Acquiring Tokens from a Testnet Faucet + +```sh +# This command will send 15 TOR to the key-name address. +# This command is only available on testnet. +torus --testnet balance run-faucet +``` + ## Completions You can enable completions for your shell by running: diff --git a/src/torusdk/cli/balance.py b/src/torusdk/cli/balance.py index 071fe37..69e6643 100644 --- a/src/torusdk/cli/balance.py +++ b/src/torusdk/cli/balance.py @@ -1,3 +1,4 @@ +import time from typing import Optional import typer @@ -5,7 +6,6 @@ from torusdk.balance import BalanceUnit, format_balance, to_rems from torusdk.cli._common import ( - NOT_IMPLEMENTED_MESSAGE, make_custom_context, print_table_from_plain_dict, ) @@ -229,14 +229,17 @@ def unstake(ctx: Context, key: str, amount: float, dest: str): raise ChainTransactionError(response.error_message) # type: ignore +# Ammount of seconds to wait between faucet executions +SLEEP_BETWEEN_FAUCET_EXECUTIONS = 8 + + @balance_app.command() def run_faucet( ctx: Context, key: str, - num_processes: Optional[int] = None, - num_executions: int = 1, + jobs: Optional[int] = None, + repeat: int = 1, ): - raise NotImplementedError("Faucet " + NOT_IMPLEMENTED_MESSAGE) context = make_custom_context(ctx) use_testnet = ctx.obj.use_testnet @@ -247,13 +250,13 @@ def run_faucet( resolved_key = context.load_key(key, None) client = context.com_client() - for _ in range(num_executions): + for _i in range(repeat): with context.progress_status("Solving PoW..."): solution = solve_for_difficulty_fast( client, resolved_key, client.url, - num_processes=num_processes, + num_processes=jobs, ) with context.progress_status("Sending solution to blockchain"): params = { @@ -262,10 +265,17 @@ def run_faucet( "work": solution.seal, "key": resolved_key.ss58_address, } + client.compose_call( "faucet", params=params, unsigned=True, - module="FaucetModule", - key=resolved_key.ss58_address, # type: ignore + module="Faucet", + key=resolved_key, + wait_for_inclusion=False, ) + + context.info( + f"Waiting {SLEEP_BETWEEN_FAUCET_EXECUTIONS} seconds before next execution..." + ) + time.sleep(SLEEP_BETWEEN_FAUCET_EXECUTIONS) diff --git a/src/torusdk/faucet/powv2.py b/src/torusdk/faucet/powv2.py index f814c93..ce15a24 100644 --- a/src/torusdk/faucet/powv2.py +++ b/src/torusdk/faucet/powv2.py @@ -1,17 +1,14 @@ -import binascii import hashlib import math import multiprocessing import multiprocessing.queues import multiprocessing.synchronize import os -import random import threading from abc import abstractmethod from dataclasses import dataclass from queue import Empty -from time import sleep -from typing import Generic, Optional, TypeVar, cast +from typing import Any, Dict, Generic, Optional, TypeVar, cast from Crypto.Hash import keccak from torustrateinterface import Keypair @@ -49,8 +46,10 @@ class GenericQueue(Generic[T]): Generic[T]: The type parameter specifying the type of items in the queue. """ - def __init__(self): - self._queue = multiprocessing.Queue() # type: ignore + def __init__(self, mp_context: Any = None): + if mp_context is None: + mp_context = multiprocessing + self._queue = mp_context.Queue() # type: ignore def put( self, item: T, block: bool = True, timeout: float | None = None @@ -68,7 +67,7 @@ def put_nowait(self, item: T): def _terminate_workers_and_wait_for_exit( - workers: list[multiprocessing.Process], + workers: list["_Solver"], ) -> None: """Terminates the worker processes and waits for them to exit. @@ -103,7 +102,7 @@ def is_stale(self, current_block: int) -> bool: return self.block_number < current_block - 3 -class _SolverBase(multiprocessing.Process): +class _SolverBase: """ Base class for solver processes. @@ -144,6 +143,7 @@ def __init__( limit: int, key: Keypair, node_url: str, + mp_context: Any = None, ): """ Initializes a new instance of the _SolverBase class. @@ -157,13 +157,22 @@ def __init__( block_info_box: A synchronization primitive to access block information. limit: The maximum number of solutions to find. key: The keypair used for generating solutions. + mp_context: The multiprocessing context to use. """ - multiprocessing.Process.__init__(self, daemon=True) + if mp_context is None: + mp_context = multiprocessing + + # Create the process using the correct context + self._mp_context = mp_context + self._process = mp_context.Process( + target=self._run_wrapper, daemon=True + ) + self.proc_num = proc_num self.num_proc = num_proc self.update_interval = update_interval self.solution_queue = solution_queue - self.newBlockEvent = multiprocessing.Event() # type: ignore + self.newBlockEvent = mp_context.Event() # type: ignore self.newBlockEvent.clear() self.block_info_box = block_info_box self.stopEvent = stopEvent @@ -171,6 +180,22 @@ def __init__( self.key = key self.node_url = node_url + def _run_wrapper(self): + """Wrapper method to call the actual run method.""" + self.run() + + def start(self): + """Start the solver process.""" + self._process.start() + + def terminate(self): + """Terminate the solver process.""" + self._process.terminate() + + def join(self, timeout: float | None = None): + """Wait for the solver process to finish.""" + self._process.join(timeout) + @abstractmethod def run(self) -> None: """ @@ -238,8 +263,8 @@ def run(self): target=_update_curr_block_worker, args=(block_info_box, self.c_client, self.key.public_key), ).start() - # Start at random nonce - nonce_start = random.randint(0, nonce_limit) + # Start at random nonce using os.urandom + nonce_start = int.from_bytes(os.urandom(8), "little") % nonce_limit nonce_end = nonce_start + self.update_interval while not self.stopEvent.is_set(): # Do a block of nonces @@ -262,8 +287,8 @@ def run(self): self.solution_queue.put(solution) solution = None - nonce_start = random.randint(0, nonce_limit) - nonce_start = nonce_start % nonce_limit + # Use os.urandom for next nonce + nonce_start = int.from_bytes(os.urandom(8), "little") % nonce_limit nonce_end = nonce_start + self.update_interval @@ -289,46 +314,47 @@ def _update_curr_block_worker( block_info_box: MutexBox[BlockInfo], c_client: TorusClient, key_bytes: bytes, - sleep_time: int = 16, ): """ - Updates the current block information in a separate thread. + Updates the current block information using WebSocket subscription. - This function continuously retrieves the latest block information from the - Commune client and updates the block_info_box with the new block number, - block hash, and block bytes hashed with the key. + This function subscribes to new block headers and updates the block_info_box + with the new block information when a new block is received. Args: block_info_box: A MutexBox containing the block information. - c_client: The CommuneClient instance used to retrieve block information. + c_client: The TorusClient instance used to retrieve block information. key_bytes: The key bytes to be hashed with the block. - sleep_time: The time (in seconds) to sleep between block updates. """ - while True: - new_block = c_client.get_block() - new_block_number = cast(int, new_block["header"]["number"]) # type: ignore - new_block_hash = new_block["header"]["hash"] # type: ignore - new_block_bytes = bytes.fromhex(new_block_hash[2:]) # type: ignore - - with block_info_box as block_info: - old_block = block_info.block_number - if new_block_number == old_block: - pass - else: - with block_info_box as block_info: - block_info.block_number = new_block_number - # Hash the block with the key - block_and_key_hash_bytes = _hash_block_with_key( - new_block_bytes, key_bytes - ) - byte_list = [] - for i in range(32): - byte_list: list[int] = [] - byte = block_and_key_hash_bytes[i] - byte_list.append(byte) - block_info.curr_block = byte_list # type: ignore - block_info.new_info = True - sleep(sleep_time) + with c_client.get_conn() as substrate: + + def on_block_header( + obj: Dict[str, Any], update_nr: int, subscription_id: int + ) -> None: + header = obj["header"] + new_block_number = cast(int, header["number"]) + # Get block hash from the block itself since it's not in the header + block = c_client.get_block( + str(new_block_number) + ) # Convert to string for block_hash parameter + if block and "header" in block and "hash" in block["header"]: + new_block_hash = block["header"]["hash"] + new_block_bytes = bytes.fromhex(new_block_hash[2:]) + + with block_info_box as block_info: + old_block = block_info.block_number + if new_block_number != old_block: + block_info.block_number = new_block_number + block_and_key_hash_bytes = _hash_block_with_key( + new_block_bytes, key_bytes + ) + block_info.curr_block = bytes( + block_and_key_hash_bytes[:32] + ) # Convert to bytes + block_info.block_hash = new_block_hash + block_info.new_info = True + + substrate.subscribe_block_headers(on_block_header) # type: ignore def _update_curr_block( @@ -372,22 +398,6 @@ def _update_curr_block( return True, new_block_number -def _hex_bytes_to_u8_list(hex_bytes: bytes): - """ - Converts hex bytes to a list of unsigned 8-bit integers. - - Args: - hex_bytes: The hex bytes to be converted. - - Returns: - A list of unsigned 8-bit integers. - """ - hex_chunks = [ - int(hex_bytes[i : i + 2], 16) for i in range(0, len(hex_bytes), 2) - ] - return hex_chunks - - def _create_seal_hash(block_and_key_hash_bytes: bytes, nonce: int) -> bytes: """ Creates the seal hash using the block and key hash bytes and the nonce. @@ -399,12 +409,11 @@ def _create_seal_hash(block_and_key_hash_bytes: bytes, nonce: int) -> bytes: Returns: The seal hash as bytes. """ - - nonce_bytes = binascii.hexlify(nonce.to_bytes(8, "little")) - pre_seal = nonce_bytes + binascii.hexlify(block_and_key_hash_bytes)[:64] - seal_sh256 = hashlib.sha256( - bytearray(_hex_bytes_to_u8_list(pre_seal)) - ).digest() + # Convert nonce to bytes directly + nonce_bytes = nonce.to_bytes(8, "little") + # Concatenate nonce bytes with block hash bytes + pre_seal = nonce_bytes + block_and_key_hash_bytes[:32] + seal_sh256 = hashlib.sha256(pre_seal).digest() kec = keccak.new(digest_bits=256) seal = kec.update(seal_sh256).digest() return seal @@ -495,25 +504,28 @@ def solve_for_difficulty_fast( c_client: The CommuneClient instance used to retrieve block information. key: The Keypair used for signing. num_processes: The number of solver processes to create (default: number of CPU cores). - update_interval: The interval at which the solvers update their progress (default: 50,000). + update_interval: The interval at which the solvers update their progress (default: 500,000). Returns: A POWSolution object if a solution is found, None otherwise. """ - if num_processes is None: - # get the number of allowed processes for this process num_processes = max(1, get_cpu_count()) print(f"Running with {num_processes} cores.") if update_interval is None: - update_interval = 50_000 + update_interval = 500_000 limit = int(math.pow(2, 256)) - 1 - stopEvent = multiprocessing.Event() + # Use fork method to avoid pickle issues with threading.Lock in MutexBox + # This works on both x86 and Apple Silicon + mp_context = multiprocessing.get_context("fork") + stopEvent = mp_context.Event() stopEvent.clear() - solution_queue: GenericQueue[POWSolution] = GenericQueue[POWSolution]() + solution_queue: GenericQueue[POWSolution] = GenericQueue[POWSolution]( + mp_context + ) block_info = MutexBox(BlockInfo(-1, b"", None)) key_bytes = key.public_key @@ -535,30 +547,38 @@ def solve_for_difficulty_fast( limit, key, node_url, + mp_context, ) for i in range(num_processes) ] for worker in solvers: - worker.start() # start the solver processes + worker.start() solution = None + current_block = None while True: - # Wait until a solver finds a solution try: solution = solution_queue.get(block=True, timeout=0.25) if solution is not None: - break + # Get current block to validate solution + current_block = c_client.get_block() + if current_block: + current_block_number = cast( + int, current_block["header"]["number"] + ) + # Only accept solution if block is not too old (within 3 blocks) + if not solution.is_stale(current_block_number): + break + else: + solution = None except Empty: - # No solution found, try again pass - # exited while, solution contains the nonce or wallet is registered - stopEvent.set() # stop all other processes + stopEvent.set() print("Finished") - # terminate and wait for all solvers to exit - _terminate_workers_and_wait_for_exit(solvers) # type: ignore + _terminate_workers_and_wait_for_exit(solvers) return solution @@ -567,12 +587,12 @@ def solve_for_difficulty_fast( import time from torusdk._common import get_node_url - from torusdk.compat.key import classic_load_key + from torusdk.key import load_keypair node = get_node_url(use_testnet=True) print(node) client = TorusClient(node) - key = classic_load_key("dev01") + key = load_keypair("dev01") start_time = time.time() solution: POWSolution = solve_for_difficulty_fast(client, key, node) @@ -582,5 +602,12 @@ def solve_for_difficulty_fast( "block_number": solution.block_number, "nonce": solution.nonce, "work": solution.seal, + "key": key.ss58_address, } - client.compose_call("faucet", params=params, key=key) + client.compose_call( + "faucet", + params=params, + unsigned=True, + module="Faucet", + key=key, + ) diff --git a/src/torusdk/key.py b/src/torusdk/key.py index 9fbc38f..c49b249 100644 --- a/src/torusdk/key.py +++ b/src/torusdk/key.py @@ -5,10 +5,10 @@ from time import time from typing import TypeGuard +import torustrateinterface.utils.ss58 as ss58 from nacl.exceptions import CryptoError from pydantic import BaseModel from torustrateinterface import Keypair -from torustrateinterface.utils import ss58 from torusdk._common import SS58_FORMAT from torusdk.encryption import ( @@ -115,7 +115,8 @@ def is_ss58_address( True if the address is valid, False otherwise. """ - return ss58.is_valid_ss58_address(address, valid_ss58_format=ss58_format) + # ? Weird type error + return ss58.is_valid_ss58_address(address, valid_ss58_format=ss58_format) # type: ignore def check_ss58_address(