Skip to content

Commit e760505

Browse files
committed
Feat: Wallet generation ennabled
1 parent af369be commit e760505

File tree

7 files changed

+295
-129
lines changed

7 files changed

+295
-129
lines changed

cngn_manager/CryptoWallet.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
from ecdsa import SigningKey, SECP256k1
2+
from mnemonic import Mnemonic
3+
from bip32utils import BIP32Key, BIP32_HARDEN
4+
from tronpy import Tron
5+
from tronpy.keys import PrivateKey, PublicKey
6+
from hashlib import sha3_256
7+
from stellar_sdk import Keypair, StrKey
8+
import coinaddrvalidator
9+
import nacl.signing
10+
import nacl.encoding
11+
from .constants import Network # Assuming you have a Network class or Enum in constants
12+
13+
class CryptoWallet:
14+
15+
DERIVATION_PATHS = {
16+
Network.ETH: "m/44'/60'/0'/0/0",
17+
Network.BSC: "m/44'/60'/0'/0/0",
18+
Network.ATC: "m/44'/60'/0'/0/0",
19+
Network.MATIC: "m/44'/60'/0'/0/0",
20+
Network.TRX: "m/44'/195'/0'/0/0", # TRON's derivation path
21+
Network.XBN: "m/44'/703'/0'/0" # XBN's derivation path
22+
}
23+
24+
@staticmethod
25+
def generate_wallet_with_mnemonic_details(network: str):
26+
mnemo = Mnemonic("english")
27+
mnemonic = mnemo.generate(strength=128)
28+
return CryptoWallet.generate_wallet_from_mnemonic(mnemonic, network)
29+
30+
@staticmethod
31+
def generate_wallet_from_mnemonic(mnemonic: str, network: str):
32+
if network == Network.XBN:
33+
return CryptoWallet.generate_xbn_wallet(mnemonic)
34+
elif network == Network.TRX:
35+
return CryptoWallet.generate_trx_wallet(mnemonic)
36+
37+
private_key = CryptoWallet.get_private_key_from_mnemonic(mnemonic, network)
38+
public_key = CryptoWallet.get_public_key(private_key, network)
39+
address = CryptoWallet.get_address_from_public_key(public_key, network)
40+
41+
return {'mnemonic': mnemonic, 'privateKey': private_key, 'address': address, 'network': network}
42+
43+
@staticmethod
44+
def get_public_key(private_key: str, network: str):
45+
if network == Network.XBN:
46+
seed = nacl.signing.SigningKey(bytes.fromhex(private_key))
47+
return seed.verify_key.encode().hex()
48+
sk = SigningKey.from_string(bytes.fromhex(private_key), curve=SECP256k1)
49+
return sk.verifying_key.to_string().hex()
50+
51+
@staticmethod
52+
def get_private_key_from_mnemonic(mnemonic: str, network: str):
53+
seed = Mnemonic.to_seed(mnemonic)
54+
derivation_path = CryptoWallet.DERIVATION_PATHS[network]
55+
# Use BIP32Key derivation according to the provided network path
56+
key = BIP32Key.fromEntropy(seed)
57+
for index in derivation_path.split("/")[1:]:
58+
if index.endswith("'"):
59+
index = int(index[:-1]) + 0x80000000
60+
else:
61+
index = int(index)
62+
key = key.ChildKey(index)
63+
return key.PrivateKey().hex()
64+
65+
@staticmethod
66+
def get_address_from_public_key(public_key: str, network: str):
67+
if network in [Network.ETH, Network.BSC, Network.MATIC, Network.ATC]:
68+
return CryptoWallet.get_ethereum_style_address(public_key)
69+
elif network == Network.TRX:
70+
return CryptoWallet.get_tron_address_from_public_key(public_key)
71+
raise ValueError(f'Unsupported network: {network}')
72+
73+
@staticmethod
74+
def get_ethereum_style_address(public_key: str):
75+
clean_public_key = public_key[2:] if public_key.startswith('04') else public_key
76+
hash = sha3_256(bytes.fromhex(clean_public_key)).digest()
77+
return '0x' + hash[-20:].hex()
78+
79+
@staticmethod
80+
def generate_trx_wallet(mnemonic: str):
81+
private_key = CryptoWallet.get_private_key_from_mnemonic(mnemonic, Network.TRX)
82+
tron_private_key = PrivateKey(bytes.fromhex(private_key))
83+
address = tron_private_key.public_key.to_base58check_address()
84+
85+
return {
86+
'mnemonic': mnemonic,
87+
'privateKey': tron_private_key.hex(),
88+
'address': address,
89+
'network': Network.TRX
90+
}
91+
92+
@staticmethod
93+
def get_tron_address_from_public_key(public_key: str):
94+
public_key_bytes = bytes.fromhex(public_key)
95+
if len(public_key_bytes) != 64:
96+
raise ValueError("Invalid public key length")
97+
98+
tron_address = PublicKey(public_key_bytes).to_base58check_address()
99+
return tron_address
100+
101+
@staticmethod
102+
def generate_xbn_wallet(mnemonic: str):
103+
# Derive BIP32 seed from mnemonic
104+
seed = Mnemonic.to_seed(mnemonic)
105+
derivation_path = CryptoWallet.DERIVATION_PATHS[Network.XBN]
106+
master_key = BIP32Key.fromEntropy(seed)
107+
108+
for index in derivation_path.split("/")[1:]:
109+
if index.endswith("'"):
110+
index = int(index[:-1]) + BIP32_HARDEN # Hardened key
111+
else:
112+
index = int(index)
113+
key = master_key.ChildKey(index)
114+
115+
raw_secret_key = key.PrivateKey()
116+
secret_key = StrKey.encode_ed25519_secret_seed(raw_secret_key)
117+
keypair = Keypair.from_secret(secret_key)
118+
119+
return {
120+
'mnemonic': mnemonic,
121+
'privateKey': keypair.secret,
122+
'address': keypair.public_key,
123+
'network': Network.XBN
124+
}
125+
126+
@staticmethod
127+
def validate_address(address: str, network: str):
128+
if network in [Network.ETH, Network.BSC, Network.MATIC, Network.ATC]:
129+
result = coinaddrvalidator.validate(network, address)
130+
print(f'Validation result for {network}: {result}')
131+
return result
132+
elif network == Network.TRX:
133+
tron = Tron()
134+
return tron.is_address(address)
135+
elif network == Network.XBN:
136+
result = coinaddrvalidator.validate(network, address)
137+
print(f'Validation result for {network}: {result}')
138+
return result
139+
raise ValueError(f'Unsupported network: {network}')

cngn_manager/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
__title__ = "cngn_manager"
1111
__description__ = "Python HTTP for Humans."
1212
__url__ = "https://docs.cngn.co"
13-
__version__ = "0.0.4"
13+
__version__ = "0.0.5"
1414
__author__ = "Emmanuel Onyo"
1515
__author_email__ = "[email protected]"
1616
__license__ = "MIT License"

cngn_manager/constants.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
class Network:
22
BSC = "bsc"
33
ETH = "eth"
4+
ATC = "atc"
5+
MATIC = "matic"
6+
TRX = "trx"
7+
XBN = "xbn"
8+
49
# Add other network options if necessary
510

611
class ProviderType:

cngn_manager/main.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@
1111
from typing import Optional, Dict, Any
1212
import requests
1313
from requests.exceptions import RequestException, HTTPError
14-
from cngn_manager.AESCrypto import AESCrypto
15-
from cngn_manager.Ed25519Crypto import Ed25519Crypto
14+
from .AESCrypto import AESCrypto
15+
from .Ed25519Crypto import Ed25519Crypto
16+
from .CryptoWallet import CryptoWallet
1617

1718

1819
"""
@@ -106,3 +107,13 @@ def create_virtual_account(self, data: dict) -> str:
106107

107108
def whitelist_address(self, data: dict) -> str:
108109
return self.__make_calls("POST", "/api/whiteListAddress", data)
110+
111+
def generate_wallet_address(self, network: str) -> str:
112+
response = CryptoWallet.generate_wallet_with_mnemonic_details(network)
113+
return {
114+
"success": True,
115+
"data": response
116+
}
117+
118+
def validate_address(self, address, network):
119+
return CryptoWallet.validate_address(address, network)

setup.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,9 @@
44

55
# Dictionary to hold package metadata
66
about = {}
7-
8-
# Set the current directory as the base directory
97
here = os.path.abspath(os.path.dirname(__file__))
10-
11-
# Read the __version__.py file to get version info
128
with open(os.path.join(here, "cngn_manager", "__version__.py"), "r", encoding="utf-8") as f:
139
exec(f.read(), about)
14-
15-
# Read the README.md file for long description
1610
with open(os.path.join(here, "README.md"), "r", encoding="utf-8") as f:
1711
readme = f.read()
1812

@@ -21,6 +15,11 @@
2115
"requests",
2216
"cryptography==43.0.1",
2317
"pynacl==1.5.0",
18+
"mnemonic==0.20",
19+
"bip32utils==0.3.post4",
20+
"hashlib",
21+
"tronpy",
22+
"stellar-sdk==11.1.0",
2423
]
2524

2625
# Define test dependencies
@@ -31,6 +30,7 @@
3130
"pytest-xdist",
3231
"PySocks>=1.5.6, !=1.5.7",
3332
"pytest>=3",
33+
3434
]
3535

3636
# Setup function to handle packaging

tests/main.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import unittest
2+
from unittest.mock import patch, MagicMock
3+
from cngn_manager import CNGnManager
4+
import json
5+
from cngn_manager.AESCrypto import AESCrypto
6+
from cngn_manager.Ed25519Crypto import Ed25519Crypto
7+
8+
from nacl.public import PrivateKey, PublicKey, Box
9+
from requests.exceptions import RequestException, HTTPError
10+
11+
class TestCNGnManager(unittest.TestCase):
12+
13+
def setUp(self):
14+
# Initialize CNGnManager with test data
15+
self.api_key = "test_api_key"
16+
self.private_key = "test_private_key"
17+
self.encryption_key = "test_encryption_key"
18+
self.manager = CNGnManager(self.api_key, self.private_key, self.encryption_key)
19+
20+
@patch('requests.Session')
21+
def test_init(self, mock_session):
22+
# Test the initialization of CNGnManager
23+
manager = CNGnManager(self.api_key, self.private_key, self.encryption_key)
24+
self.assertEqual(manager.api_key, self.api_key)
25+
self.assertEqual(manager.api_url, CNGnManager.API_URL)
26+
mock_session.assert_called_once()
27+
28+
@patch.object(AESCrypto, 'encrypt', return_value="encrypted_data")
29+
@patch.object(Ed25519Crypto, 'decrypt_with_private_key', return_value='{"decrypted":"data"}')
30+
@patch('requests.Session.request')
31+
def test_make_calls_success(self, mock_request, mock_decrypt, mock_encrypt):
32+
# Mock the response from the API
33+
mock_response = MagicMock()
34+
mock_response.json.return_value = {"data": "encrypted_response_data"}
35+
mock_request.return_value = mock_response
36+
37+
# Test a successful GET call
38+
result = self.manager.get_balance()
39+
40+
# Verify that encryption, request, and decryption were called
41+
mock_encrypt.assert_called_once_with(json.dumps(None), self.encryption_key)
42+
mock_request.assert_called_once_with('GET', f'{self.manager.api_url}/v1/api/balance', json="encrypted_data")
43+
mock_decrypt.assert_called_once_with(self.private_key, "encrypted_response_data")
44+
45+
# Verify the response is properly decrypted
46+
self.assertEqual(result, {"data": {"decrypted": "data"}})
47+
48+
@patch.object(AESCrypto, 'encrypt', return_value="encrypted_data")
49+
@patch.object(Ed25519Crypto, 'decrypt_with_private_key')
50+
@patch('requests.Session.request', side_effect=RequestException("API request failed"))
51+
def test_make_calls_request_error(self, mock_request, mock_decrypt, mock_encrypt):
52+
# Test handling of request exceptions
53+
result = self.manager.get_balance()
54+
55+
# Verify that the request error is handled properly
56+
self.assertEqual(result['success'], False)
57+
self.assertEqual(result['error'], 'API request failed')
58+
self.assertEqual(result['message'], 'API request failed')
59+
60+
@patch.object(AESCrypto, 'encrypt', return_value="encrypted_data")
61+
@patch('requests.Session.request', side_effect=HTTPError(response=MagicMock(status_code=500)))
62+
def test_make_calls_http_error(self, mock_request, mock_encrypt):
63+
# Test handling of HTTP errors
64+
result = self.manager.get_balance()
65+
66+
# Verify that HTTPError is handled properly
67+
self.assertEqual(result['success'], False)
68+
self.assertEqual(result['error'], 'API request failed')
69+
self.assertEqual(result['status_code'], 500)
70+
71+
@patch.object(AESCrypto, 'encrypt', side_effect=Exception("Unexpected error"))
72+
def test_make_calls_unexpected_error(self, mock_encrypt):
73+
# Test handling of unexpected errors
74+
result = self.manager.get_balance()
75+
76+
# Verify that unexpected errors are handled properly
77+
self.assertEqual(result['success'], False)
78+
self.assertEqual(result['error'], 'An unexpected error occurred')
79+
self.assertEqual(result['message'], 'Unexpected error')
80+
self.assertEqual(result['status_code'], 500)
81+
82+
@patch.object(AESCrypto, 'encrypt', return_value="encrypted_data")
83+
@patch.object(Ed25519Crypto, 'decrypt_with_private_key', return_value='{"decrypted":"data"}')
84+
@patch('requests.Session.request')
85+
def test_post_calls(self, mock_request, mock_decrypt, mock_encrypt):
86+
# Mock the response from the API
87+
mock_response = MagicMock()
88+
mock_response.json.return_value = {"data": "encrypted_response_data"}
89+
mock_request.return_value = mock_response
90+
91+
# Test a POST call
92+
data = {"test": "data"}
93+
result = self.manager.swap_between_chains(data)
94+
95+
# Verify that encryption, request, and decryption were called
96+
mock_encrypt.assert_called_once_with(json.dumps(data), self.encryption_key)
97+
mock_request.assert_called_once_with('POST', f'{self.manager.api_url}/v1/api/swap', json="encrypted_data")
98+
mock_decrypt.assert_called_once_with(self.private_key, "encrypted_response_data")
99+
100+
# Verify the response is properly decrypted
101+
self.assertEqual(result, {"data": {"decrypted": "data"}})
102+
103+
def test_prepare_request_data(self):
104+
aes_crypto = AESCrypto()
105+
106+
# Test with data
107+
data = {"test": "data"}
108+
encrypted_data = self.manager._prepare_request_data(data, aes_crypto)
109+
self.assertIsNotNone(encrypted_data)
110+
111+
# Test with no data
112+
encrypted_data = self.manager._prepare_request_data(None, aes_crypto)
113+
self.assertIsNone(encrypted_data)
114+
115+
def test_handle_request_error(self):
116+
error = RequestException("Request failed")
117+
result = self.manager._handle_request_error(error)
118+
self.assertEqual(result['success'], False)
119+
self.assertEqual(result['error'], 'API request failed')
120+
self.assertEqual(result['message'], 'Request failed')
121+
122+
def test_handle_unexpected_error(self):
123+
error = Exception("Something went wrong")
124+
result = self.manager._handle_unexpected_error(error)
125+
self.assertEqual(result['success'], False)
126+
self.assertEqual(result['error'], 'An unexpected error occurred')
127+
self.assertEqual(result['message'], 'Something went wrong')
128+
self.assertEqual(result['status_code'], 500)
129+
130+
if __name__ == '__main__':
131+
unittest.main()

0 commit comments

Comments
 (0)