From 80edaabae2ba164eeb2755275f9ded5fda004a7b Mon Sep 17 00:00:00 2001 From: unniznd Date: Sat, 26 Jul 2025 00:15:35 +0530 Subject: [PATCH 1/2] Implement encryption and shard management functionalities - Added save_shards.py to handle saving key shards to multiple nodes with CID validation and error handling. - Introduced share_to_address.py for sharing access to addresses with CID validation. - Created shared_key.py for generating key shards using Shamir's secret sharing scheme. - Developed transfer_ownership.py to manage ownership transfers of encrypted data with CID validation. - Implemented utility functions in utils.py for CID validation, object comparison, and API request handling. - Added upload_encrypted.py for uploading encrypted files with key generation and shard saving. - Created comprehensive unit tests for encryption, shard management, and ownership transfer functionalities. - Ensured robust error handling and validation across all new functionalities. --- requirements-dev.txt | 3 +- requirements.txt | 3 +- src/lighthouseweb3/__init__.py | 198 +++++++++++++++++- src/lighthouseweb3/functions/config.py | 3 + .../functions/encryption/generate.py | 43 ++++ .../encryption/get_access_condition.py | 9 + .../functions/encryption/get_auth_message.py | 10 + .../functions/encryption/get_jwt.py | 17 ++ .../functions/encryption/recover_key.py | 29 +++ .../functions/encryption/revoke_access.py | 52 +++++ .../functions/encryption/save_shards.py | 88 ++++++++ .../functions/encryption/share_to_address.py | 54 +++++ .../functions/encryption/shared_key.py | 50 +++++ .../encryption/transfer_ownership.py | 53 +++++ .../functions/encryption/utils.py | 69 ++++++ .../functions/upload_encrypted.py | 176 ++++++++++++++++ tests/test_encryption/test_generate.py | 19 ++ .../test_get_access_condition.py | 18 ++ .../test_encryption/test_get_auth_message.py | 17 ++ tests/test_encryption/test_get_jwt.py | 44 ++++ tests/test_encryption/test_recover_key.py | 20 ++ tests/test_encryption/test_revoke_access.py | 83 ++++++++ tests/test_encryption/test_save_shards.py | 114 ++++++++++ .../test_encryption/test_share_to_address.py | 59 ++++++ tests/test_encryption/test_shared_key.py | 63 ++++++ .../test_transfer_ownership.py | 58 +++++ tests/test_upload_encrypted.py | 71 +++++++ 27 files changed, 1419 insertions(+), 4 deletions(-) create mode 100644 src/lighthouseweb3/functions/encryption/generate.py create mode 100644 src/lighthouseweb3/functions/encryption/get_access_condition.py create mode 100644 src/lighthouseweb3/functions/encryption/get_auth_message.py create mode 100644 src/lighthouseweb3/functions/encryption/get_jwt.py create mode 100644 src/lighthouseweb3/functions/encryption/recover_key.py create mode 100644 src/lighthouseweb3/functions/encryption/revoke_access.py create mode 100644 src/lighthouseweb3/functions/encryption/save_shards.py create mode 100644 src/lighthouseweb3/functions/encryption/share_to_address.py create mode 100644 src/lighthouseweb3/functions/encryption/shared_key.py create mode 100644 src/lighthouseweb3/functions/encryption/transfer_ownership.py create mode 100644 src/lighthouseweb3/functions/encryption/utils.py create mode 100644 src/lighthouseweb3/functions/upload_encrypted.py create mode 100644 tests/test_encryption/test_generate.py create mode 100644 tests/test_encryption/test_get_access_condition.py create mode 100644 tests/test_encryption/test_get_auth_message.py create mode 100644 tests/test_encryption/test_get_jwt.py create mode 100644 tests/test_encryption/test_recover_key.py create mode 100644 tests/test_encryption/test_revoke_access.py create mode 100644 tests/test_encryption/test_save_shards.py create mode 100644 tests/test_encryption/test_share_to_address.py create mode 100644 tests/test_encryption/test_shared_key.py create mode 100644 tests/test_encryption/test_transfer_ownership.py create mode 100644 tests/test_upload_encrypted.py diff --git a/requirements-dev.txt b/requirements-dev.txt index 9686d58..cd466d6 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -5,4 +5,5 @@ idna==3.4 requests==2.31.0 urllib3==2.0.2 web3 -eth-accounts \ No newline at end of file +eth-accounts +py_ecc \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 13c4766..060fbfa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,5 @@ charset-normalizer==3.1.0 idna==3.4 requests==2.31.0 urllib3==2.0.2 -eth-account==0.13.7 \ No newline at end of file +eth-account==0.13.7 +py_ecc \ No newline at end of file diff --git a/src/lighthouseweb3/__init__.py b/src/lighthouseweb3/__init__.py index b1d8d7c..31c2eae 100644 --- a/src/lighthouseweb3/__init__.py +++ b/src/lighthouseweb3/__init__.py @@ -14,10 +14,24 @@ ipns_publish_record as ipnsPublishRecord, get_ipns_record as getIpnsRecord, remove_ipns_record as removeIpnsRecord, - create_wallet as createWallet + create_wallet as createWallet, + upload_encrypted as uploadEncrypted ) - +from typing import Any, Dict, List + +from .functions.encryption import ( + get_jwt as getJwt, + revoke_access as revokeAccess, + generate as generateKey, + transfer_ownership as transferOwnership, + share_to_address as shareToAddress, + save_shards as saveShards, + get_access_condition as getAccessCondition, + get_auth_message as getAuthMessage, + recover_key as recoveryKey, + shared_key as sharedKey +) class Lighthouse: def __init__(self, token: str = ""): self.token = token or os.environ.get("LIGHTHOUSE_TOKEN", "") @@ -37,6 +51,23 @@ def upload(self, source: str, tag: str = ''): return d.upload(source, self.token, tag) except Exception as e: raise e + + def uploadEncrypted(self, source_path: str, public_key: str, auth_token: str, cid_version: int = 1) -> Dict[str, List[Dict]]: + """ + Upload a file or directory to the Lighthouse. + + :param source_path: str, path to file or directory + :param public_key: str, public key of the user + :param auth_token: str, auth token of the user + :param cid_version: int, cid version of the user + :return: t.Upload, the upload result + """ + + try: + return uploadEncrypted.upload_file(source_path, self.token, public_key, auth_token, cid_version) + except Exception as e: + raise e + def uploadBlob(self, source: io.BufferedReader, filename: str, tag: str = ''): """ @@ -224,3 +255,166 @@ def getTagged(self, tag: str): except Exception as e: raise e +class Kavach: + @staticmethod + def sharedKey(key: str, threshold: int = 3, key_count: int = 5) -> Dict[str, Any]: + """ + Splits a secret key into shards using Shamir's Secret Sharing on BLS12-381 curve. + + :param key: Hex string of the master secret key + :param threshold: Minimum number of shards required to reconstruct the key + :param key_count: Total number of shards to generate + + :return: Dict containing isShardable flag and list of key shards with their indices + """ + + try: + return sharedKey.shard_key(key, threshold, key_count) + except Exception as e: + raise e + + @staticmethod + def recoverKey(shards: List[Dict[str, str]]) -> Dict[str, str]: + """ + Recovers the master key from a list of key shards using Lagrange interpolation. + + :param shards: List of dictionaries containing 'key' and 'index' as hex strings + :return: Dictionary containing the recovered master key + """ + try: + return recoveryKey.recover_key(shards) + except Exception as e: + raise e + + @staticmethod + def getAuthMessage(address: str) -> dict[str, Any]: + """ + Get Authentication message from the server + + :param address: str, The public key of the user + :return: dict, A dict with authentication message or error + """ + try: + return getAuthMessage.get_auth_message(address) + except Exception as e: + raise e + + @staticmethod + def getAccessCondition(cid: str): + """ + Get Access Condition for cid from the node + + :param cid: str, Content Identifier for the data to be downloaded + :return: conditions dict of access conditions + """ + + try: + return getAccessCondition.get_access_condition(cid) + except Exception as e: + raise e + + @staticmethod + def saveShards( + address: str, + cid: str, + auth_token: str, + key_shards: List[dict], + share_to: List[str] = None + ) -> Dict[str, Any]: + """ + Save key shards to multiple nodes. + + :param address: str, The Ethereum address of the user. + :param cid: str, The Content Identifier (CID) of the file for which key shards are being saved. + :param auth_token: str, The authentication token obtained by signing a message. + :param key_shards: List[KeyShard], A list of KeyShard objects, each containing a key and its index. + :param share_to: List[str], optional, A list of Ethereum addresses to which the key shards should be shared. Defaults to None. + :return: Dict[str, Any], A dictionary indicating the success or failure of the operation, along with any error messages. + """ + + try: + return saveShards.save_shards(address, cid, auth_token, key_shards, share_to) + except Exception as e: + raise e + + @staticmethod + def shareToAddress(address: str, cid: str, auth_token: Dict[str, Any], share_to: List[str]) -> Dict[str, Any]: + """ + Share an encrypted file with a list of addresses. + + :param address: str, The public address of the file owner. + :param cid: str, The CID of the file to share. + :param auth_token: Dict[str, Any], The authentication token. + :param share_to: List[str], A list of public addresses to share the file with. + :return: Dict[str, Any], A dictionary indicating the result of the share operation. + """ + try: + return shareToAddress.share_to_address(address, cid, auth_token, share_to) + except Exception as e: + raise e + + @staticmethod + def transferOwnership(address: str, cid: str, new_owner: str, auth_token: str, reset_shared_to: bool = True) -> dict[str, Any]: + """ + Transfer ownership of a file from the current owner to a new owner. + + :param address: str, The address of the current owner. + :param cid: str, The Content Identifier (CID) of the file to transfer. + :param new_owner: str, The address of the new owner. + :param auth_token: str, The authentication token for the current owner. + :param reset_shared_to: bool, Whether to reset the list of users the file is shared with (default: True). + :return: dict, A dictionary indicating the success or failure of the operation. + """ + try: + return transferOwnership.transfer_ownership(address, cid, new_owner, auth_token, reset_shared_to) + except Exception as e: + raise e + + @staticmethod + def generate(threshold: int = 3, key_count: int = 5) -> Dict[str, any]: + """ + Generates a set of master secret keys and corresponding key shards using BLS (Boneh-Lynn-Shacham) + threshold cryptography. + + :param threshold: int, The minimum number of key shards required to reconstruct the master key. + :param key_count: int, The total number of key shards to generate. + :return: Dict[str, any], A dictionary containing the master key and a list of key shards. + """ + + try: + return generateKey.generate(threshold, key_count) + except Exception as e: + raise e + + @staticmethod + def revokeAccess(address: str, cid: str, auth_token: str, revoke_to: List[str]) -> Dict: + """ + Revokes access to a shared file for specified recipients. + + :param address: str, The address of the user initiating the revocation. + :param cid: str, The CID of the file for which access is being revoked. + :param auth_token: str, The authentication token of the user. + :param revoke_to: List[str], A list of addresses for whom access is to be revoked. + :return: Dict, A dictionary indicating the success or failure of the revocation. + """ + try: + return revokeAccess.revoke_access(address, cid, auth_token, revoke_to) + except Exception as e: + raise e + + @staticmethod + def getJWT(address: str, payload: str, use_as_refresh_token: bool = False, chain: str = "ALL") -> Dict: + """ + Retrieves a JSON Web Token (JWT) for authentication. + + :param address: str, The blockchain address of the user. + :param payload: str, The signed message or refresh token. + :param use_as_refresh_token: bool, If True, payload is treated as a refresh token. + :param chain: str, The blockchain chain (e.g., "ALL", "ETHEREUM"). + :return: Dict, A dictionary containing the JWT and refresh token, or an error. + """ + + try: + return getJwt.get_jwt(address, payload, use_as_refresh_token, chain) + except Exception as e: + raise e \ No newline at end of file diff --git a/src/lighthouseweb3/functions/config.py b/src/lighthouseweb3/functions/config.py index 000c5ef..0458c72 100644 --- a/src/lighthouseweb3/functions/config.py +++ b/src/lighthouseweb3/functions/config.py @@ -9,3 +9,6 @@ class Config: lighthouse_node = "https://node.lighthouse.storage" lighthouse_bls_node = "https://encryption.lighthouse.storage" lighthouse_gateway = "https://gateway.lighthouse.storage/ipfs" + + is_dev = False + lighthouse_bls_node_dev = "http://enctest.lighthouse.storage" diff --git a/src/lighthouseweb3/functions/encryption/generate.py b/src/lighthouseweb3/functions/encryption/generate.py new file mode 100644 index 0000000..2a8006a --- /dev/null +++ b/src/lighthouseweb3/functions/encryption/generate.py @@ -0,0 +1,43 @@ +from py_ecc.bls import G2ProofOfPossession as BLS +from py_ecc.optimized_bls12_381 import curve_order +import random +from typing import Dict, List + +def int_to_bytes(x: int) -> bytes: + return x.to_bytes(32, byteorder="big") + +def eval_poly(poly: List[int], x: int) -> int: + """Evaluate polynomial at a given point x.""" + result = 0 + for i, coeff in enumerate(poly): + result = (result + coeff * pow(x, i, curve_order)) % curve_order + return result + +def generate(threshold: int = 3, key_count: int = 5) -> Dict[str, any]: + if threshold > key_count: + raise ValueError("threshold must be less than or equal to key_count") + + # Generate random polynomial coefficients (secret is constant term) + poly = [random.randint(1, curve_order - 1) for _ in range(threshold)] + master_sk = poly[0] # constant term is master key + + shares = [] + # Generate random vector IDs, ensuring uniqueness + id_vec = set() + while len(id_vec) < key_count: + id_vec.add(random.randint(1, curve_order - 1)) + id_vec = list(id_vec) + + for x in id_vec: + y = eval_poly(poly, x) + # Convert index to hex string without '0x' prefix to match JS output + index_hex = hex(x)[2:].zfill(64) # Ensure 64-character hex string + shares.append({ + "index": index_hex, + "key": int_to_bytes(y).hex() + }) + + return { + "masterKey": int_to_bytes(master_sk).hex(), + "keyShards": shares + } \ No newline at end of file diff --git a/src/lighthouseweb3/functions/encryption/get_access_condition.py b/src/lighthouseweb3/functions/encryption/get_access_condition.py new file mode 100644 index 0000000..bafd210 --- /dev/null +++ b/src/lighthouseweb3/functions/encryption/get_access_condition.py @@ -0,0 +1,9 @@ +from typing import Any +from .utils import api_node_handler + +def get_access_condition(cid: str) -> dict[str, Any]: + try: + conditions = api_node_handler(f"/api/fileAccessConditions/get/{cid}", "GET") + return {'data': conditions} + except Exception as error: + raise error \ No newline at end of file diff --git a/src/lighthouseweb3/functions/encryption/get_auth_message.py b/src/lighthouseweb3/functions/encryption/get_auth_message.py new file mode 100644 index 0000000..f2a500a --- /dev/null +++ b/src/lighthouseweb3/functions/encryption/get_auth_message.py @@ -0,0 +1,10 @@ +from typing import Any +from .utils import api_node_handler + + +def get_auth_message(address: str) -> dict[str, Any]: + try: + response = api_node_handler(f"/api/message/{address}", "GET") + return {'message': response[0]['message'], 'error': None} + except Exception as e: + return {'message': None, 'error':str(e)} \ No newline at end of file diff --git a/src/lighthouseweb3/functions/encryption/get_jwt.py b/src/lighthouseweb3/functions/encryption/get_jwt.py new file mode 100644 index 0000000..b5724bb --- /dev/null +++ b/src/lighthouseweb3/functions/encryption/get_jwt.py @@ -0,0 +1,17 @@ +from .utils import api_node_handler + +def get_jwt(address: str, payload: str, use_as_refresh_token: bool = False, chain: str = "ALL") -> dict: + try: + if not use_as_refresh_token: + data = api_node_handler( + "/api/message/get-jwt", "POST", "", + {"address": address, "signature": payload, "chain": chain} + ) + else: + data = api_node_handler( + "/api/message/get-jwt", "PUT", "", + {"address": address, "refreshToken": payload} + ) + return {"JWT": data["token"], "refreshToken": data["refreshToken"], "error": None} + except Exception as e: + return {"JWT": None, "error": "Invalid Signature"} \ No newline at end of file diff --git a/src/lighthouseweb3/functions/encryption/recover_key.py b/src/lighthouseweb3/functions/encryption/recover_key.py new file mode 100644 index 0000000..024494f --- /dev/null +++ b/src/lighthouseweb3/functions/encryption/recover_key.py @@ -0,0 +1,29 @@ +from typing import List, Dict +from py_ecc.optimized_bls12_381.optimized_curve import curve_order + +def recover_key(shards: List[Dict[str, str]]) -> Dict[str, str]: + + try: + # Convert hex strings to integers + x_coords = [int(shard['index'], 16) for shard in shards] + y_coords = [int(shard['key'], 16) for shard in shards] + + # Lagrange interpolation to recover the constant term (secret) + def lagrange_interpolate(x: int, x_s: List[int], y_s: List[int], p: int) -> int: + total = 0 + for i in range(len(x_s)): + numerator = denominator = 1 + for j in range(len(x_s)): + if i != j: + numerator = (numerator * (x - x_s[j])) % p + denominator = (denominator * (x_s[i] - x_s[j])) % p + l = (y_s[i] * numerator * pow(denominator, -1, p)) % p + total = (total + l) % p + return total + + # Recover the master key at x=0 + master_key = lagrange_interpolate(0, x_coords, y_coords, curve_order) + + return {"masterKey": hex(master_key)[2:].zfill(64), "error": None} + except Exception as e: + return {"masterKey": None, "error":e} \ No newline at end of file diff --git a/src/lighthouseweb3/functions/encryption/revoke_access.py b/src/lighthouseweb3/functions/encryption/revoke_access.py new file mode 100644 index 0000000..a9a36dc --- /dev/null +++ b/src/lighthouseweb3/functions/encryption/revoke_access.py @@ -0,0 +1,52 @@ +import json +import time +from typing import List, Dict +from .utils import api_node_handler, is_cid_reg, is_equal + +def revoke_access(address: str, cid: str, auth_token: str, revoke_to: List[str]) -> Dict: + if not is_cid_reg(cid): + return { + "isSuccess": False, + "error": "Invalid CID" + } + + try: + node_id = [1, 2, 3, 4, 5] + node_url = [f"/api/setSharedKey/{elem}" for elem in node_id] + + def request_data(url: str) -> Dict: + try: + response = api_node_handler(url, "DELETE", auth_token, { + "address": address, + "cid": cid, + "revokeTo": revoke_to + }) + return response + except Exception as error: + return {"error": error} + + data = [] + for url in node_url: + response = request_data(url) + if "error" in response: + try: + error_message = json.loads(response["error"].message) + except: + error_message = str(response["error"]) + return { + "isSuccess": False, + "error": error_message + } + time.sleep(1) + data.append(response) + + temp = [{**elem, "data": None} for elem in data] + return { + "isSuccess": is_equal(*temp), + "error": None + } + except Exception as err: + return { + "isSuccess": False, + "error": str(err) + } \ No newline at end of file diff --git a/src/lighthouseweb3/functions/encryption/save_shards.py b/src/lighthouseweb3/functions/encryption/save_shards.py new file mode 100644 index 0000000..0ee5c31 --- /dev/null +++ b/src/lighthouseweb3/functions/encryption/save_shards.py @@ -0,0 +1,88 @@ +import json +import time +from typing import Any, Dict, List + +from .utils import ( + api_node_handler, + is_cid_reg, is_equal +) + +def save_shards( + address: str, + cid: str, + auth_token: str, + key_shards: List[dict], + share_to: List[str] = None +) -> Dict[str, Any]: + """ + Save key shards to multiple nodes. + """ + if share_to is None: + share_to = [] + + # Validate CID + if not is_cid_reg(cid): + return { + "isSuccess": False, + "error": "Invalid CID" + } + + # Validate key_shards + if not isinstance(key_shards, list) or len(key_shards) != 5: + return { + "isSuccess": False, + "error": "keyShards must be an array of 5 objects" + } + + try: + node_ids = [1, 2, 3, 4, 5] + node_urls = [f"/api/setSharedKey/{node_id}" for node_id in node_ids] + + def request_data(url: str, index: int) -> Dict[str, Any]: + try: + body = { + "address": address, + "cid": cid, + "payload": key_shards[index] + } + if share_to: + body["sharedTo"] = share_to + + response = api_node_handler( + url, + "POST", + auth_token, + body + ) + return response + except Exception as error: + return {"error": str(error)} + + data = [] + for index, url in enumerate(node_urls): + response = request_data(url, index) + if "error" in response: + try: + error_message = json.loads(response["error"]) + except json.JSONDecodeError: + error_message = response["error"] + return { + "isSuccess": False, + "error": error_message + } + data.append(response) + time.sleep(1) + + temp = [{"data": None, **{k: v for k, v in elem.items() if k != "data"}} for elem in data] + is_success = is_equal(*temp) and data[0].get("message") == "success" + + return { + "isSuccess": is_success, + "error": None + } + + except Exception as err: + return { + "isSuccess": False, + "error": str(err) + } \ No newline at end of file diff --git a/src/lighthouseweb3/functions/encryption/share_to_address.py b/src/lighthouseweb3/functions/encryption/share_to_address.py new file mode 100644 index 0000000..cd1f335 --- /dev/null +++ b/src/lighthouseweb3/functions/encryption/share_to_address.py @@ -0,0 +1,54 @@ +import json +import time + +from typing import List, Dict, Any +from .utils import is_equal, is_cid_reg, api_node_handler + +def share_to_address(address: str, cid: str, auth_token: Dict[str, Any], share_to: List[str]) -> Dict[str, Any]: + if not is_cid_reg(cid): + return { + "isSuccess": False, + "error": "Invalid CID" + } + + try: + node_id = [1, 2, 3, 4, 5] + node_url = [f"/api/setSharedKey/{elem}" for elem in node_id] + + def request_data(url: str) -> Dict[str, Any]: + try: + response = api_node_handler(url, "PUT", auth_token, { + "address": address, + "cid": cid, + "shareTo": share_to + }) + return response + except Exception as error: + return {"error": str(error)} + + data = [] + for url in node_url: + response = request_data(url) + if "error" in response: + try: + error_message = json.loads(response.get("error", {})) + except json.JSONDecodeError: + error_message = response.get("error") + return { + "isSuccess": False, + "error": error_message + } + time.sleep(1) + data.append(response) + + temp = [{**elem, "data": None} for elem in data] + return { + "isSuccess": is_equal(*temp) and temp[0].get("message") == "success", + "error": None + } + + except Exception as err: + return { + "isSuccess": False, + "error": str(err) + } \ No newline at end of file diff --git a/src/lighthouseweb3/functions/encryption/shared_key.py b/src/lighthouseweb3/functions/encryption/shared_key.py new file mode 100644 index 0000000..5934b1d --- /dev/null +++ b/src/lighthouseweb3/functions/encryption/shared_key.py @@ -0,0 +1,50 @@ +from py_ecc.bls import G2ProofOfPossession as bls +from py_ecc.optimized_bls12_381.optimized_curve import curve_order +from secrets import randbits +from typing import List, Tuple, Dict, Any + +def shard_key(key: str, threshold: int = 3, key_count: int = 5) -> Dict[str, Any]: + + try: + # Initialize master secret key list + msk = [] + id_vec = [] + sec_vec = [] + + # Convert input hex key to integer + master_key = int(key, 16) + msk.append(master_key) + + # Generate additional random coefficients for polynomial + for _ in range(threshold - 1): + # Generate random number for polynomial coefficient + sk = randbits(256) # Using 256 bits for randomness + msk.append(sk) + + # Perform key sharing + for i in range(key_count): + # Create random ID (x-coordinate for polynomial evaluation) + id_val = randbits(256) + id_vec.append(id_val) + + # Evaluate polynomial at id_val to create shard + # Using Shamir's secret sharing polynomial evaluation + sk = 0 + for j, coef in enumerate(msk): + sk += coef * pow(id_val, j, curve_order) + sk %= curve_order + sec_vec.append(sk) + + # Convert to hex format for output + return { + "isShardable": True, + "keyShards": [ + { + "key": hex(sk)[2:].zfill(64), # Remove '0x' and pad to 64 chars + "index": hex(id_vec[i])[2:].zfill(64) + } + for i, sk in enumerate(sec_vec) + ] + } + except Exception as e: + return {"isShardable": False, "keyShards": []} \ No newline at end of file diff --git a/src/lighthouseweb3/functions/encryption/transfer_ownership.py b/src/lighthouseweb3/functions/encryption/transfer_ownership.py new file mode 100644 index 0000000..3ab10d0 --- /dev/null +++ b/src/lighthouseweb3/functions/encryption/transfer_ownership.py @@ -0,0 +1,53 @@ +import json +import time +from typing import Any +from .utils import api_node_handler, is_cid_reg, is_equal + +def transfer_ownership(address: str, cid: str, new_owner: str, auth_token: str, reset_shared_to: bool = True) -> dict[str, Any]: + if not is_cid_reg(cid): + return { + "isSuccess": False, + "error": "Invalid CID" + } + + try: + node_index_selected = [1, 2, 3, 4, 5] + node_urls = [f"/api/transferOwnership/{elem}" for elem in node_index_selected] + + def request_data(url: str) -> dict: + try: + response = api_node_handler(url, "POST", auth_token, { + "address": address, + "cid": cid, + "newOwner": new_owner, + "resetSharedTo": reset_shared_to + }) + return response + except Exception as error: + return {"error": str(error)} + + data = [] + for url in node_urls: + response = request_data(url) + if "error" in response: + try: + error_message = json.loads(response.get("error", "{}")) + except json.JSONDecodeError: + error_message = response.get("error") + return { + "isSuccess": False, + "error": error_message + } + time.sleep(1) # Delay between requests + data.append(response) + + return { + "isSuccess": is_equal(*data) and data[0].get("message") == "success", + "error": None + } + + except Exception as err: + return { + "isSuccess": False, + "error": str(err) + } \ No newline at end of file diff --git a/src/lighthouseweb3/functions/encryption/utils.py b/src/lighthouseweb3/functions/encryption/utils.py new file mode 100644 index 0000000..d5b859a --- /dev/null +++ b/src/lighthouseweb3/functions/encryption/utils.py @@ -0,0 +1,69 @@ +import re +import json +import time +import requests +from typing import Dict, Any +from dataclasses import dataclass +from src.lighthouseweb3.functions.config import Config + + +def is_cid_reg(cid: str) -> bool: + + pattern = r'Qm[1-9A-HJ-NP-Za-km-z]{44}|b[A-Za-z2-7]{58}|B[A-Z2-7]{58}|z[1-9A-HJ-NP-Za-km-z]{48}|F[0-9A-F]{50}' + return bool(re.match(pattern, cid)) + +def is_equal(*objects: Any) -> bool: + + if not objects: + return True + first = json.dumps(objects[0], sort_keys=True) + return all(json.dumps(obj, sort_keys=True) == first for obj in objects) + +def api_node_handler( + endpoint: str, + verb: str, + auth_token: str = "", + body: Any = None, + retry_count: int = 3 +) -> Dict[str, Any]: + + url = f"{Config.is_dev and Config.lighthouse_bls_node_dev or Config.lighthouse_bls_node}{endpoint}" + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {auth_token}" if auth_token else "" + } + + for attempt in range(retry_count): + try: + if verb in ["POST", "PUT", "DELETE"] and body is not None: + response = requests.request( + method=verb, + url=url, + headers=headers, + json=body + ) + else: + response = requests.request( + method=verb, + url=url, + headers=headers + ) + + if not response.ok: + if response.status_code == 404: + raise Exception(json.dumps({ + "message": "fetch Error", + "statusCode": response.status_code + })) + error_body = response.json() + raise Exception(json.dumps({ + **error_body, + "statusCode": response.status_code + })) + return response.json() + except Exception as error: + if "fetch" not in str(error): + raise + if attempt == retry_count - 1: + raise + time.sleep(1) \ No newline at end of file diff --git a/src/lighthouseweb3/functions/upload_encrypted.py b/src/lighthouseweb3/functions/upload_encrypted.py new file mode 100644 index 0000000..06c03ff --- /dev/null +++ b/src/lighthouseweb3/functions/upload_encrypted.py @@ -0,0 +1,176 @@ +import os +import requests +from pathlib import Path +from typing import List, Dict +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC +from cryptography.hazmat.primitives.ciphers.aead import AESGCM +from cryptography.hazmat.backends import default_backend +import secrets +from .config import Config +from .encryption import generate as generateKey, save_shards as saveShards + + + +def fetch_with_timeout( + endpoint_url: str, + method: str = 'GET', + files: Dict = None, + timeout: int = 8000, + headers: Dict = None +) -> requests.Response: + try: + # Convert timeout to seconds (from milliseconds) + timeout_seconds = timeout / 1000 + + # Make HTTP request + response = requests.request( + method=method, + url=endpoint_url, + headers=headers, + files=files, + timeout=timeout_seconds + ) + return response + except requests.Timeout: + raise Exception('Request timed out') + except requests.RequestException as e: + raise Exception(f'Network error: {str(e)}') + +def walk(dir: str) -> List[str]: + results = [] + for root, _, files in os.walk(dir): + for file in files: + file_path = os.path.join(root, file) + results.append(file_path) + return results + +def encrypt_file(file_data: bytes, password: str) -> bytes: + try: + # Convert password to bytes + password_bytes = password.encode('utf-8') + + # Generate random salt and IV + salt = secrets.token_bytes(16) + iv = secrets.token_bytes(12) + + # Derive key using PBKDF2 + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, # 256-bit key for AES-GCM + salt=salt, + iterations=250000, + backend=default_backend() + ) + aes_key = kdf.derive(password_bytes) + + # Encrypt the file data using AES-GCM + aesgcm = AESGCM(aes_key) + cipher_bytes = aesgcm.encrypt(iv, file_data, None) + + # Combine salt, IV, and cipher bytes + result_bytes = salt + iv + cipher_bytes + + return result_bytes + except Exception as e: + raise e + +def upload_file( + source_path: str, + api_key: str, + public_key: str, + auth_token: str, + cid_version: int = 1, +) -> Dict[str, List[Dict]]: + token = f"Bearer {api_key}" + endpoint = f"{Config.lighthouse_node}/api/v0/add?wrap-with-directory=false&cid-version={cid_version}" + + if os.path.isfile(source_path): + try: + # Generate encryption key and shards + key = generateKey.generate() + + # Read and encrypt file + with open(source_path, 'rb') as f: + file_data = f.read() + encrypted_data = encrypt_file(file_data, key['masterKey']) + + # Prepare file for upload + filename = os.path.basename(source_path) + files = {'file': (filename, encrypted_data)} + + # Make HTTP request + response = fetch_with_timeout( + endpoint, + method='POST', + files=files, + timeout=7200000, + headers={ + 'Encryption': 'true', + 'Authorization': token + } + ) + + if response.status_code != 200: + res = response.json() + raise Exception(res.get('error', 'Unknown error')) + + response_data = response.json() + + # Save encryption key shards + error = saveShards.save_shards(public_key, response_data[0]['Hash'], auth_token, key['keyShards']) + if error["error"]: + raise Exception('Error encrypting file') + + return {'data': response_data} + except Exception as e: + raise Exception(str(e)) + else: + files_list = walk(source_path) + if len(files_list) > 1 and auth_token.startswith('0x'): + raise Exception('auth_token must be a JWT') + + key_map = {} + files = {} + + for file_path in files_list: + key = generateKey.generate() + with open(file_path, 'rb') as f: + file_data = f.read() + encrypted_data = encrypt_file(file_data, key['masterKey']) + filename = str(Path(file_path).relative_to(source_path)).replace(os.sep, '-') + files[filename] = (filename, encrypted_data) + key_map[filename] = key['keyShards'] + + + # Make HTTP request with multiple files + response = fetch_with_timeout( + endpoint, + method='POST', + files=files, + timeout=7200000, + headers={ + 'Encryption': 'true', + 'Authorization': token + } + ) + + if response.status_code != 200: + res = response.json() + raise Exception(res.get('error', 'Unknown error')) + + response_text = response.text + import re + match = re.search(r'\[.*\]$', response_text, re.DOTALL) + if not match: + raise Exception('No JSON array found in response') + + json_data = response.json() + + # Save key shards for each file + for data in json_data: + result = saveShards.save_shards(public_key, data['Hash'], auth_token, key_map[data['Name']]) + if not result.get('isSuccess', False): + raise Exception(str(result)) + + return {'data': json_data} diff --git a/tests/test_encryption/test_generate.py b/tests/test_encryption/test_generate.py new file mode 100644 index 0000000..085bc02 --- /dev/null +++ b/tests/test_encryption/test_generate.py @@ -0,0 +1,19 @@ + +import unittest +from src.lighthouseweb3 import Kavach +from unittest.mock import patch + +class TestGenerateKey(unittest.TestCase): + def test_generate_key_success(self): + result = Kavach.generate() + self.assertIsInstance(result["masterKey"], str, "masterKey should be a string") + self.assertEqual(len(result["keyShards"]), 5, "keyShards should have length 5") + + def test_threshold_greater_than_key_count(self): + with self.assertRaises(ValueError) as context: + Kavach.generate(threshold=6, key_count=5) + self.assertEqual( + str(context.exception), + "threshold must be less than or equal to key_count", + "Expected ValueError for threshold > key_count" + ) \ No newline at end of file diff --git a/tests/test_encryption/test_get_access_condition.py b/tests/test_encryption/test_get_access_condition.py new file mode 100644 index 0000000..b01339d --- /dev/null +++ b/tests/test_encryption/test_get_access_condition.py @@ -0,0 +1,18 @@ +import unittest +from src.lighthouseweb3 import Kavach + + +class TestGetAccessCondition(unittest.TestCase): + def test_access_condition(self): + condition = Kavach.getAccessCondition("QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH") + self.assertIsInstance(condition, dict, "condition is a dict") + self.assertIsInstance(condition.get('data'), dict, "data is a dict") + self.assertIn("([2] and [1])", str(condition.get('data'))) + self.assertEqual("0xf0bc72fa04aea04d04b1fa80b359adb566e1c8b1", condition.get('data').get('owner')) + + def test_access_condition_invalid_cid(self): + condition = Kavach.getAccessCondition("invalid_cid") + self.assertIsInstance(condition, dict, "condition is a dict") + self.assertIsInstance(condition.get('data'), dict, "data is a dict") + self.assertEqual(condition.get('data').get('conditions'), [], "conditions is a list") + diff --git a/tests/test_encryption/test_get_auth_message.py b/tests/test_encryption/test_get_auth_message.py new file mode 100644 index 0000000..33e4e4c --- /dev/null +++ b/tests/test_encryption/test_get_auth_message.py @@ -0,0 +1,17 @@ +import unittest +import os +from src.lighthouseweb3 import Kavach + +class TestGetAuthMessage(unittest.TestCase): + + def test_get_auth_message(self): + auth_message = Kavach.getAuthMessage(address=os.environ.get("PUBLIC_KEY")) + self.assertIn("Please sign this message to prove you are owner of this account", auth_message['message'], "Owner response should come") + self.assertEqual(None, auth_message['error']) + + def test_get_auth_message_invalid_address(self): + + auth_message = Kavach.getAuthMessage(address="0x9a40b8EE3B8Fe7eB621cd142a651560Fa7") + self.assertEqual(None, auth_message['message']) + self.assertNotEqual(None, auth_message['error']) + self.assertIn("invalid address", str(auth_message["error"]).lower()) \ No newline at end of file diff --git a/tests/test_encryption/test_get_jwt.py b/tests/test_encryption/test_get_jwt.py new file mode 100644 index 0000000..eb77ae8 --- /dev/null +++ b/tests/test_encryption/test_get_jwt.py @@ -0,0 +1,44 @@ +import unittest +import os +from eth_account.messages import encode_defunct +from web3 import Web3 +from src.lighthouseweb3 import Kavach + + +class TestKavachGetJWT(unittest.TestCase): + def test_get_jwt(self): + public_key = os.environ.get("PUBLIC_KEY") + + verification_message = Kavach.getAuthMessage(public_key)['message'] + self.assertIn( + "Please sign this message to prove you are owner of this account", + verification_message, + "Owner response should come" + ) + + auth_token = Web3().eth.account.sign_message( + encode_defunct(text=verification_message), + private_key='0x8218aa5dbf4dbec243142286b93e26af521b3e91219583595a06a7765abc9c8b' + ).signature.hex() + + jwt = Kavach.getJWT( + address=public_key, + payload=f"0x{auth_token}" + ) + + self.assertIsNotNone(jwt["JWT"]) + self.assertIsNotNone(jwt["refreshToken"]) + + def test_get_jwt_invalid_signature(self): + public_key = os.environ.get("PUBLIC_KEY") + + jwt = Kavach.getJWT( + address=public_key, + payload="invalid_signature" + ) + + self.assertIsNone(jwt["JWT"]) + self.assertIsNotNone(jwt["error"]) + + + \ No newline at end of file diff --git a/tests/test_encryption/test_recover_key.py b/tests/test_encryption/test_recover_key.py new file mode 100644 index 0000000..c64caae --- /dev/null +++ b/tests/test_encryption/test_recover_key.py @@ -0,0 +1,20 @@ +import unittest +from src.lighthouseweb3 import Kavach + + +class TestKavachRecoverKey(unittest.TestCase): + def test_recover_key(self): + """test recover key with valid shards""" + known_key = "0b2ccf87909bd1215858e5c0ec99359cadfcf918a4fac53e3e88e9ec28bec678" + result = [{'key': '20e2fdc23015c6c78272174be8639b49275e0dbf0479dde60950832ef18bb661', 'index': '9d956693217a5443ba1823151f0924d5a59aaa009cef8cd0f97d3e1a6d279140'}, {'key': '508e1ddc5c06b7e2053d600c4c5da6627778b371784ef712cb8c96f7deae9ce4', 'index': '61a82e98b2c19d13f7edeb1c79c36c40378a5ec2df2e818c6cbd0ead5e548178'}, {'key': '0aca075033a6d257cbac7564c01637b111268f8d42974e5bd765004cc840d359', 'index': '8339e90fc50c9b6b4d650bd03187f4cb4e42e616d1acb6c77dae037764997adb'}, {'key': '4c8a56b19e66a4d533946fe3bb4929556258869f939769514c0abdf8d22d1fa2', 'index': '283985072fcb14bcaf20f5c393d4af5f49a6d2cae472fb28038507f174f7d22d'}, {'key': '2ab718fe611b09eae351a53969364df1bd754f345de87839d22bbcae6b8d091f', 'index': '107dd8993e24875227e024e3352ab7333a766b93cd646f51bc68a2ebc682c64a'}] + + recovered = Kavach.recoverKey(result) + self.assertEqual(recovered["masterKey"], known_key, "Recovered key should match original") + + def test_recover_key_invalid(self): + """test recover key with invalid shards""" + known_key = "1b2ccf87909bd1215858e5c0ec99359cadfcf918a4fac53e3e88e9ec28bec678" + result = [{'key': '20e2fdc23015c6c78272174be8639b49275e0dbf0479dde60950832ef18bb661', 'index': '9d956693217a5443ba1823151f0924d5a59aaa009cef8cd0f97d3e1a6d279140'}, {'key': '508e1ddc5c06b7e2053d600c4c5da6627778b371784ef712cb8c96f7deae9ce4', 'index': '61a82e98b2c19d13f7edeb1c79c36c40378a5ec2df2e818c6cbd0ead5e548178'}, {'key': '0aca075033a6d257cbac7564c01637b111268f8d42974e5bd765004cc840d359', 'index': '8339e90fc50c9b6b4d650bd03187f4cb4e42e616d1acb6c77dae037764997adb'}, {'key': '4c8a56b19e66a4d533946fe3bb4929556258869f939769514c0abdf8d22d1fa2', 'index': '283985072fcb14bcaf20f5c393d4af5f49a6d2cae472fb28038507f174f7d22d'}, {'key': '2ab718fe611b09eae351a53969364df1bd754f345de87839d22bbcae6b8d091f', 'index': '107dd8993e24875227e024e3352ab7333a766b93cd646f51bc68a2ebc682c64a'}] + + recovered = Kavach.recoverKey(result) + self.assertNotEqual(recovered["masterKey"], known_key, "Recovered key should not match original") \ No newline at end of file diff --git a/tests/test_encryption/test_revoke_access.py b/tests/test_encryption/test_revoke_access.py new file mode 100644 index 0000000..fc7f5ec --- /dev/null +++ b/tests/test_encryption/test_revoke_access.py @@ -0,0 +1,83 @@ +import unittest +import os +from eth_account.messages import encode_defunct +from web3 import Web3 +from src.lighthouseweb3 import Kavach + + +class TestKavachRevokeAcess(unittest.TestCase): + def test_revoke_access(self): + public_key = os.environ.get("PUBLIC_KEY") + + verification_message = Kavach.getAuthMessage(public_key)['message'] + self.assertIn( + "Please sign this message to prove you are owner of this account", + verification_message, + "Owner response should come" + ) + + auth_token = Web3().eth.account.sign_message( + encode_defunct(text=verification_message), + private_key=os.environ.get("PRIVATE_KEY") + ).signature.hex() + + result = Kavach.revokeAccess( + address=public_key, + cid = "QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH", + auth_token=f"0x{auth_token}", + revoke_to=["0x344b0b6C1C5b8f4519db43dFb388b65ecA667243",] + ) + + self.assertTrue(result["isSuccess"]) + self.assertIsNone(result["error"]) + + def test_revoke_access_invalid_cid(self): + public_key = os.environ.get("PUBLIC_KEY") + + verification_message = Kavach.getAuthMessage(public_key)['message'] + self.assertIn( + "Please sign this message to prove you are owner of this account", + verification_message, + "Owner response should come" + ) + + auth_token = Web3().eth.account.sign_message( + encode_defunct(text=verification_message), + private_key=os.environ.get("PRIVATE_KEY") + ).signature.hex() + + result = Kavach.revokeAccess( + address=public_key, + cid = "cid", + auth_token=f"0x{auth_token}", + revoke_to=["0x344b0b6C1C5b8f4519db43dFb388b65ecA667243",] + ) + + self.assertFalse(result["isSuccess"]) + self.assertIsNotNone(result["error"]) + self.assertIn(result["error"], "Invalid CID") + + def test_revoke_access_invalid_access(self): + public_key = "0x344b0b6C1C5b8f4519db43dFb388b65ecA667243" + + verification_message = Kavach.getAuthMessage(public_key)['message'] + self.assertIn( + "Please sign this message to prove you are owner of this account", + verification_message, + "Owner response should come" + ) + + auth_token = Web3().eth.account.sign_message( + encode_defunct(text=verification_message), + private_key="0x8218aa5dbf4dbec243142286b93e26af521b3e91219583595a06a7765abc9c8a" + ).signature.hex() + + result = Kavach.revokeAccess( + address=public_key, + cid = "QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH", + auth_token=f"0x{auth_token}", + revoke_to=["0xEaF4E24ffC1A2f53c07839a74966A6611b8Cb8A1",] + ) + + self.assertFalse(result["isSuccess"]) + self.assertIsNotNone(result["error"]) \ No newline at end of file diff --git a/tests/test_encryption/test_save_shards.py b/tests/test_encryption/test_save_shards.py new file mode 100644 index 0000000..81a2e7e --- /dev/null +++ b/tests/test_encryption/test_save_shards.py @@ -0,0 +1,114 @@ +import unittest +import requests +import os +from eth_account.messages import encode_defunct +from web3 import Web3 +from src.lighthouseweb3.functions.config import Config +from src.lighthouseweb3 import Kavach + + +class TestSaveShards(unittest.TestCase): + + def test_save_shards_successful(self): + """test save shards for successful key saving""" + public_key = os.environ.get("PUBLIC_KEY") + + verification_message = Kavach.getAuthMessage(public_key)['message'] + self.assertIn("Please sign this message to prove you are owner of this account", verification_message, "Owner response should come") + + signed_message = Web3().eth.account.sign_message( + encode_defunct(text=verification_message), + private_key=os.environ.get("PRIVATE_KEY") + ).signature.hex() + + result = Kavach.saveShards( + address=public_key, + cid="QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH", + auth_token=f"0x{signed_message}", + key_shards=[ + {"key": "1", "index": "1"}, + {"key": "2", "index": "2"}, + {"key": "3", "index": "3"}, + {"key": "4", "index": "4"}, + {"key": "5", "index": "5"}, + ] + ) + + self.assertTrue(result["isSuccess"]) + self.assertIsNone(result["error"]) + + def test_save_shards_invalid_signature(self): + """test save shards for invalid signature""" + public_key = os.environ.get("PUBLIC_KEY") + + + result = Kavach.saveShards( + address=public_key, + cid="QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH", + auth_token="signature", + key_shards=[ + {"key": "1", "index": "1"}, + {"key": "2", "index": "2"}, + {"key": "3", "index": "3"}, + {"key": "4", "index": "4"}, + {"key": "5", "index": "5"}, + ] + ) + self.assertIsInstance(result["error"], dict) + self.assertEqual(result["error"].get("message"), "Invalid Signature") + + def test_save_shards_invalid_key_shards_length(self): + """test save shards for invalid key shards length""" + + public_key = os.environ.get("PUBLIC_KEY") + + + verification_message = Kavach.getAuthMessage(public_key)['message'] + self.assertIn("Please sign this message to prove you are owner of this account", verification_message, "Owner response should come") + + signed_message = Web3().eth.account.sign_message( + encode_defunct(text=verification_message), + private_key=os.environ.get("PRIVATE_KEY") + ).signature.hex() + + result = Kavach.saveShards( + address=public_key, + cid="QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQJ", + auth_token=f"0x{signed_message}", + key_shards=[ + {"key": "1", "index": "1"}, + {"key": "2", "index": "2"}, + {"key": "3", "index": "3"}, + ] + ) + self.assertFalse(result["isSuccess"]) + self.assertRegex(result["error"], r"keyShards must be an array of 5 objects") + + def test_save_shards_invalid_cid(self): + """test save shards for invalid CID""" + + public_key = os.environ.get("PUBLIC_KEY") + + verification_message = Kavach.getAuthMessage(public_key)['message'] + self.assertIn("Please sign this message to prove you are owner of this account", verification_message, "Owner response should come") + + signed_message = Web3().eth.account.sign_message( + encode_defunct(text=verification_message), + private_key=os.environ.get("PRIVATE_KEY") + ).signature.hex() + + + result = Kavach.saveShards( + address=public_key, + cid="cid", + auth_token=f"0x{signed_message}", + key_shards=[ + {"key": "1", "index": "1"}, + {"key": "2", "index": "2"}, + {"key": "3", "index": "3"}, + {"key": "4", "index": "4"}, + {"key": "5", "index": "5"}, + ] + ) + self.assertFalse(result["isSuccess"]) + self.assertRegex(result["error"], r"Invalid CID") \ No newline at end of file diff --git a/tests/test_encryption/test_share_to_address.py b/tests/test_encryption/test_share_to_address.py new file mode 100644 index 0000000..4914547 --- /dev/null +++ b/tests/test_encryption/test_share_to_address.py @@ -0,0 +1,59 @@ +import unittest +from eth_account.messages import encode_defunct +from web3 import Web3 +from src.lighthouseweb3 import Kavach +import os + + +class TestKavach(unittest.TestCase): + def test_share_to_address(self): + public_key = os.environ.get("PUBLIC_KEY") + + + verification_message = Kavach.getAuthMessage(public_key)['message'] + self.assertIn( + "Please sign this message to prove you are owner of this account", + verification_message, + "Owner response should come" + ) + + auth_token = Web3().eth.account.sign_message( + encode_defunct(text=verification_message), + private_key=os.environ.get("PRIVATE_KEY") + ).signature.hex() + + result = Kavach.shareToAddress( + address=public_key, + cid = "QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH", + auth_token=f"0x{auth_token}", + share_to=["0xF0Bc72fA04aea04d04b1fA80B359Adb566E1c8B1"] + ) + + self.assertIsNone(result["error"]) + self.assertTrue(result["isSuccess"]) + + def test_share_to_address_invalid_address(self): + public_key = os.environ.get("PUBLIC_KEY") + + verification_message = Kavach.getAuthMessage(public_key)['message'] + self.assertIn( + "Please sign this message to prove you are owner of this account", + verification_message, + "Owner response should come" + ) + + auth_token = Web3().eth.account.sign_message( + encode_defunct(text=verification_message), + private_key=os.environ.get("PRIVATE_KEY") + ).signature.hex() + + + result = Kavach.shareToAddress( + address='0x344b0b6C1C5b8f4519db43dFb388b65ecA667243', + cid = "QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH", + auth_token=f"0x{auth_token}", + share_to=["0xF0Bc72fA04aea04d04b1fA80B359Adb566E1c8B1"] + ) + + self.assertFalse(result["isSuccess"]) + self.assertIsNotNone(result["error"]) \ No newline at end of file diff --git a/tests/test_encryption/test_shared_key.py b/tests/test_encryption/test_shared_key.py new file mode 100644 index 0000000..4adbf3a --- /dev/null +++ b/tests/test_encryption/test_shared_key.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +import unittest +from src.lighthouseweb3 import Kavach +from py_ecc.bls import G2ProofOfPossession as bls +from typing import List, Dict +from py_ecc.optimized_bls12_381.optimized_curve import curve_order + +class TestKavachSharedKey(unittest.TestCase): + + def test_key_shardable_false(self): + """Test if key is not shardable with invalid input""" + result = Kavach.sharedKey("invalid_hex") + self.assertFalse(result["isShardable"], "Key should not be shardable") + + def test_key_shardable_true(self): + """Test if key is shardable with valid input and correct number of shards""" + result = Kavach.sharedKey("0b2ccf87909bd1215858e5c0ec99359cadfcf918a4fac53e3e88e9ec28bec678") + self.assertTrue(result["isShardable"], "Key should be shardable") + self.assertEqual(len(result["keyShards"]), 5, "Should generate 5 shards") + + def test_master_key_recovery_4_of_5(self): + """Test master key recovery with 4 out of 5 shards""" + known_key = "0a16088df55283663f7fea6c5f315bde968024b7ac5d715af0325a5507700e5e" + result = Kavach.sharedKey(known_key, 3, 5) + self.assertTrue(result["isShardable"], "Key should be shardable") + + # Recover using 4 shards + recovered = Kavach.recoverKey([ + result["keyShards"][2], + result["keyShards"][0], + result["keyShards"][1], + result["keyShards"][4] + ]) + self.assertEqual(recovered["masterKey"], known_key, "Recovered key should match original") + + def test_master_key_recovery_5_of_5(self): + """Test master key recovery with all 5 shards""" + known_key = "0a16088df55283663f7fea6c5f315bde968024b7ac5d715af0325a5507700e5e" + result = Kavach.sharedKey(known_key, 3, 5) + self.assertTrue(result["isShardable"], "Key should be shardable") + + # Recover using all 5 shards + recovered = Kavach.recoverKey([ + result["keyShards"][2], + result["keyShards"][0], + result["keyShards"][1], + result["keyShards"][4], + result["keyShards"][3] + ]) + self.assertEqual(recovered["masterKey"], known_key, "Recovered key should match original") + + def test_master_key_recovery_2_of_5(self): + """Test master key recovery fails with only 2 out of 5 shards""" + known_key = "0a16088df55283663f7fea6c5f315bde968024b7ac5d715af0325a5507700e5e" + result = Kavach.sharedKey(known_key, 3, 5) + self.assertTrue(result["isShardable"], "Key should be shardable") + + # Try to recover with only 2 shards (below threshold) + recovered = Kavach.recoverKey([ + result["keyShards"][0], + result["keyShards"][1] + ]) + self.assertNotEqual(recovered["masterKey"], known_key, "Recovered key should not match with insufficient shards") \ No newline at end of file diff --git a/tests/test_encryption/test_transfer_ownership.py b/tests/test_encryption/test_transfer_ownership.py new file mode 100644 index 0000000..5397a72 --- /dev/null +++ b/tests/test_encryption/test_transfer_ownership.py @@ -0,0 +1,58 @@ +import unittest +import os +from eth_account.messages import encode_defunct +from web3 import Web3 +from src.lighthouseweb3 import Kavach + + + +class TestKavachTransferOwnership(unittest.TestCase): + def test_transfer_ownership(self): + public_key = os.environ.get("PUBLIC_KEY") + + verification_message = Kavach.getAuthMessage(public_key)['message'] + self.assertIn( + "Please sign this message to prove you are owner of this account", + verification_message, + "Owner response should come" + ) + + auth_token = Web3().eth.account.sign_message( + encode_defunct(text=verification_message), + private_key=os.environ.get("PRIVATE_KEY") + ).signature.hex() + + result = Kavach.transferOwnership( + address=public_key, + cid = "QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH", + auth_token=f"0x{auth_token}", + new_owner="0xF0Bc72fA04aea04d04b1fA80B359Adb566E1c8B1" + ) + + self.assertIsNone(result["error"]) + self.assertTrue(result["isSuccess"]) + + def test_transfer_ownership_invalid_address(self): + public_key = os.environ.get("PUBLIC_KEY") + + verification_message = Kavach.getAuthMessage(public_key)['message'] + self.assertIn( + "Please sign this message to prove you are owner of this account", + verification_message, + "Owner response should come" + ) + + auth_token = Web3().eth.account.sign_message( + encode_defunct(text=verification_message), + private_key=os.environ.get("PRIVATE_KEY") + ).signature.hex() + + result = Kavach.transferOwnership( + address='0x344b0b6C1C5b8f4519db43dFb388b65ecA667243', + cid = "QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH", + auth_token=f"0x{auth_token}", + new_owner="0xF0Bc72fA04aea04d04b1fA80B359Adb566E1c8B1" + ) + + self.assertFalse(result["isSuccess"]) + self.assertIsNotNone(result["error"]) \ No newline at end of file diff --git a/tests/test_upload_encrypted.py b/tests/test_upload_encrypted.py new file mode 100644 index 0000000..f73a820 --- /dev/null +++ b/tests/test_upload_encrypted.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +import os +import io +import unittest +from src.lighthouseweb3 import Lighthouse +from eth_account.messages import encode_defunct +from web3 import Web3 +from src.lighthouseweb3 import Kavach + +class TestUploadEncrypted(unittest.TestCase): + + def get_auth_token(self,public_key, private_key): + verification_message = Kavach.getAuthMessage(public_key)['message'] + + auth_token = Web3().eth.account.sign_message( + encode_defunct(text=verification_message), + private_key=private_key + ).signature.hex() + + jwt = Kavach.getJWT( + address=public_key, + payload=f"0x{auth_token}" + ) + + return jwt["JWT"] + + + def test_upload_encrypted(self): + public_key = os.environ.get("PUBLIC_KEY") + private_key = os.environ.get("PRIVATE_KEY") + + + l = Lighthouse(os.environ.get("LIGHTHOUSE_TOKEN")) + + r = l.uploadEncrypted("tests/testdir/testfile.txt", public_key, self.get_auth_token(public_key, private_key)) + + self.assertIsInstance(r, dict, "data is a dict") + self.assertIsInstance(r.get("data"), list, "data is a dict") + self.assertIsInstance(r.get("data")[0], dict, "data is a dict") + self.assertIsInstance(r.get("data")[0].get("Name"), str, "data is a dict") + self.assertIsInstance(r.get("data")[0].get("Hash"), str, "data is a dict") + self.assertIsInstance(r.get("data")[0].get("Size"), str, "data is a dict") + + def test_upload_encrypted_invalid_token(self): + """test upload encrypted file with invalid auth token""" + public_key = os.environ.get('PUBLIC_KEY') + + l = Lighthouse(os.environ.get("LIGHTHOUSE_TOKEN")) + + with self.assertRaises(Exception) as e: + l.uploadEncrypted( + "tests/testdir/testfile.txt", + public_key, + "invalid_auth_token" + ) + self.assertEqual(str(e.exception), 'Error encrypting file') + + def test_upload_encrypted_invalid_api_key(self): + """test upload encrypted file with invalid api key""" + public_key = os.environ.get("PUBLIC_KEY") + private_key = os.environ.get("PRIVATE_KEY") + + l = Lighthouse('invalid_api_key') + auth_token = self.get_auth_token(public_key, private_key) + + with self.assertRaises(Exception): + l.uploadEncrypted( + "tests/testdir/testfile.txt", + public_key, + auth_token + ) \ No newline at end of file From da9809f1a3ee5da829f37ebeb0bd3a39f25a415d Mon Sep 17 00:00:00 2001 From: unniznd Date: Sat, 26 Jul 2025 00:21:31 +0530 Subject: [PATCH 2/2] made private key from env only --- tests/test_encryption/test_get_jwt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_encryption/test_get_jwt.py b/tests/test_encryption/test_get_jwt.py index eb77ae8..4c8b14a 100644 --- a/tests/test_encryption/test_get_jwt.py +++ b/tests/test_encryption/test_get_jwt.py @@ -18,7 +18,7 @@ def test_get_jwt(self): auth_token = Web3().eth.account.sign_message( encode_defunct(text=verification_message), - private_key='0x8218aa5dbf4dbec243142286b93e26af521b3e91219583595a06a7765abc9c8b' + private_key=os.environ.get('PRIVATE_KEY') ).signature.hex() jwt = Kavach.getJWT(