Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 154 additions & 14 deletions spoon_toolkits/crypto/evm/signers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@

This module provides a clean abstraction for transaction signing, allowing EVM tools
to work with either local private keys (via web3.py) or Turnkey's secure API.

Priority order for auto-detection:
1. Plain private key from environment variable (not encrypted)
2. Encrypted private key from SecretVault (ENC:v2 decrypted)
3. Turnkey remote signing
"""

import os
Expand All @@ -17,6 +22,105 @@

logger = logging.getLogger(__name__)

# Environment variable keys
ENV_PRIVATE_KEY = "EVM_PRIVATE_KEY"
ENV_TURNKEY_SIGN_WITH = "TURNKEY_SIGN_WITH"
ENV_TURNKEY_ADDRESS = "TURNKEY_ADDRESS"


def _is_encrypted(value: str) -> bool:
"""Check if a value is an encrypted secret (ENC: prefix)."""
return value.startswith("ENC:")


def _get_from_vault(key: str) -> Optional[str]:
"""Try to retrieve a decrypted secret from SecretVault."""
try:
from spoon_ai.wallet.vault import get_vault
vault = get_vault()
if vault.exists(key):
raw = vault.get_raw(key)
if raw:
return raw.decode("utf-8")
except ImportError:
pass
except Exception as e:
logger.debug("Failed to get %s from vault: %s", key, e)
return None


def _auto_decrypt_to_vault(env_key: str) -> bool:
"""
Auto-decrypt an encrypted env var and store in vault.

Returns True if decryption succeeded, False otherwise.
"""
enc_value = os.getenv(env_key)
if not enc_value or not _is_encrypted(enc_value):
return False

try:
from spoon_ai.wallet.vault import get_vault
from spoon_ai.wallet.security import decrypt_and_store

vault = get_vault()

# Already decrypted?
if vault.exists(env_key):
return True

# Get master password
password = os.getenv("SPOON_MASTER_PWD")
if not password:
import sys
import getpass
try:
if sys.stdin.isatty():
password = getpass.getpass(
f"Enter password to decrypt {env_key}: "
)
except Exception:
pass

if not password:
logger.warning(
f"Encrypted {env_key} found but no password available. "
f"Set SPOON_MASTER_PWD or run interactively."
)
return False

# Decrypt and store
decrypt_and_store(enc_value, password, env_key, vault=vault)
logger.info(f"Decrypted {env_key} and stored in vault.")
return True

except ImportError as e:
logger.warning(f"Cannot decrypt {env_key}: {e}")
return False
except Exception as e:
logger.error(f"Failed to decrypt {env_key}: {e}")
return False


def _get_private_key_from_vault() -> Optional[str]:
"""
Get decrypted private key from SecretVault.

If an encrypted key exists in env but not in vault, auto-decrypt it first.
"""
# Check if already in vault
value = _get_from_vault(ENV_PRIVATE_KEY)
if value:
return value

# Try auto-decrypt if encrypted in env
env_value = os.getenv(ENV_PRIVATE_KEY)
if env_value and _is_encrypted(env_value):
if _auto_decrypt_to_vault(ENV_PRIVATE_KEY):
return _get_from_vault(ENV_PRIVATE_KEY)

return None


class SignerError(Exception):
"""Exception raised for signing-related errors."""
Expand Down Expand Up @@ -257,6 +361,11 @@ def create_signer(
"""
Create a signer based on configuration.

Priority order for auto-detection:
1. Plain private key from env (not encrypted)
2. Encrypted private key from SecretVault
3. Turnkey remote signing

Args:
signer_type: 'local', 'turnkey', or 'auto'
private_key: Private key for local signing
Expand All @@ -268,39 +377,70 @@ def create_signer(
"""
# Auto-detect signer type
if signer_type == "auto":
if turnkey_sign_with:
signer_type = "turnkey"
elif private_key:
# Check explicit parameters first
if private_key:
signer_type = "local"
elif turnkey_sign_with:
signer_type = "turnkey"
else:
# Check environment variables
if os.getenv("TURNKEY_SIGN_WITH"):
signer_type = "turnkey"
elif os.getenv("EVM_PRIVATE_KEY"):
# Priority: plain env -> vault -> turnkey
env_key = os.getenv(ENV_PRIVATE_KEY)

# 1. Plain private key from env (not encrypted)
if env_key and not _is_encrypted(env_key):
signer_type = "local"

# 2. Encrypted private key from SecretVault (auto-decrypt if needed)
elif _get_private_key_from_vault():
signer_type = "local"

# 3. Turnkey remote signing
elif os.getenv(ENV_TURNKEY_SIGN_WITH):
signer_type = "turnkey"

else:
raise ValueError("Cannot auto-detect signer type, please specify signer_type or provide credentials")
raise ValueError(
"Cannot auto-detect signer type. Options:\n"
f"1. Set {ENV_PRIVATE_KEY} with plain private key\n"
f"2. Set {ENV_PRIVATE_KEY} with ENC:v2 encrypted key and decrypt to vault\n"
f"3. Set {ENV_TURNKEY_SIGN_WITH} for Turnkey signing"
)

if signer_type == "local":
key = private_key or os.getenv("EVM_PRIVATE_KEY")
# Try sources in priority order: param -> plain env -> vault (auto-decrypt)
key = private_key
if not key:
env_key = os.getenv(ENV_PRIVATE_KEY)
if env_key and not _is_encrypted(env_key):
key = env_key
if not key:
raise ValueError("Private key required for local signing")
key = _get_private_key_from_vault()

if not key:
raise ValueError(
f"Private key required for local signing. "
f"Set {ENV_PRIVATE_KEY} or decrypt encrypted key to vault."
)

# Ensure private key has 0x prefix
key = key.strip()
if not key.startswith("0x"):
key = "0x" + key
return LocalSigner(key)

elif signer_type == "turnkey":
sign_with = turnkey_sign_with or os.getenv("TURNKEY_SIGN_WITH")
sign_with = turnkey_sign_with or os.getenv(ENV_TURNKEY_SIGN_WITH)
if not sign_with:
raise ValueError("turnkey_sign_with required for Turnkey signing")
raise ValueError(
f"turnkey_sign_with required for Turnkey signing. "
f"Set {ENV_TURNKEY_SIGN_WITH} env var."
)

signer = TurnkeySigner(sign_with)
if turnkey_address:
signer._cached_address = turnkey_address
elif os.getenv("TURNKEY_ADDRESS"):
signer._cached_address = os.getenv("TURNKEY_ADDRESS")
elif os.getenv(ENV_TURNKEY_ADDRESS):
signer._cached_address = os.getenv(ENV_TURNKEY_ADDRESS)

return signer

Expand Down
Loading