diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..fbd27fd Binary files /dev/null and b/.DS_Store differ diff --git a/src/lighthouseweb3/__init__.py b/src/lighthouseweb3/__init__.py index b1d8d7c..2af6767 100644 --- a/src/lighthouseweb3/__init__.py +++ b/src/lighthouseweb3/__init__.py @@ -2,6 +2,7 @@ import os import io +from typing import Optional, Dict, Any from .functions import ( upload as d, deal_status, @@ -17,6 +18,15 @@ create_wallet as createWallet ) +from .functions.kavach import( + get_auth_message as getAuthMessage, + save_shards as saveShards, + recover_shards as recoverShards +) + +from typing import List, Dict, Any, Union +from .functions.kavach.types import AuthToken, KeyShard + class Lighthouse: def __init__(self, token: str = ""): @@ -224,3 +234,46 @@ def getTagged(self, tag: str): except Exception as e: raise e +class Kavach: + + @staticmethod + def getAuthMessage(address: str): + """ + Retrieves an authentication message for a given address. + + :param address: str, The address for which to retrieve the authentication message. + :return: dict, A dictionary containing the authentication message. + """ + try: + return getAuthMessage.get_auth_message(address) + except Exception as e: + raise e + + @staticmethod + def recoverShards(address: str, cid: str, auth_token: AuthToken, num_of_shards: int = 3, dynamic_data: Optional[Dict[str, Any]] = None): + try: + return recoverShards.recover_shards(address, cid, auth_token, num_of_shards, dynamic_data) + except Exception as e: + raise e + @staticmethod + def saveShards( + address: str, + cid: str, + auth_token: AuthToken, + key_shards: List[KeyShard], + share_to: List[str] = [] + ) -> Dict[str, Union[bool, str, None]]: + """ + Save shards for a given address and CID. + + :param address: str, The address for which to save the shards. + :param cid: str, The content identifier for the data. + :param auth_token: AuthToken, The authentication token. + :param key_shards: List[KeyShard], The list of key shards to save. + :param share_to: List[str], The list of addresses to share the shards with (optional). + :return: dict, A dictionary containing the result of the operation. + """ + try: + return saveShards.save_shards(address, cid, auth_token, key_shards, share_to) + except Exception as e: + raise e diff --git a/src/lighthouseweb3/functions/config.py b/src/lighthouseweb3/functions/config.py index 000c5ef..70988eb 100644 --- a/src/lighthouseweb3/functions/config.py +++ b/src/lighthouseweb3/functions/config.py @@ -9,3 +9,7 @@ class Config: lighthouse_node = "https://node.lighthouse.storage" lighthouse_bls_node = "https://encryption.lighthouse.storage" lighthouse_gateway = "https://gateway.lighthouse.storage/ipfs" + + + is_dev = False + lighthouse_bls_node_dev = "http://enctest.lighthouse.storage" \ No newline at end of file diff --git a/src/lighthouseweb3/functions/kavach/get_auth_message.py b/src/lighthouseweb3/functions/kavach/get_auth_message.py new file mode 100644 index 0000000..3f330f9 --- /dev/null +++ b/src/lighthouseweb3/functions/kavach/get_auth_message.py @@ -0,0 +1,10 @@ +from typing import Any +from .util import api_node_handler + + +async def get_auth_message(address: str) -> dict[str, Any]: + try: + response = await api_node_handler(f"/api/message/{address}", "GET") + return {'message': response[0]['message'], 'error': None} + except Exception as e: + return {'message': None, 'error':str(e)} diff --git a/src/lighthouseweb3/functions/kavach/recover_shards.py b/src/lighthouseweb3/functions/kavach/recover_shards.py new file mode 100644 index 0000000..5ff3592 --- /dev/null +++ b/src/lighthouseweb3/functions/kavach/recover_shards.py @@ -0,0 +1,107 @@ +import random +import asyncio +import json +from typing import List, Dict, Any, Optional +from .types import AuthToken, RecoverShards + + +def shuffle_array(array: List[int]) -> List[int]: + """Shuffle the input array in place using Fisher-Yates algorithm. + + Args: + array: List of integers to be shuffled + + Returns: + List[int]: The shuffled array (same reference as input) + """ + for i in range(len(array) - 1, 0, -1): + j = random.randint(0, i) + array[i], array[j] = array[j], array[i] + return array + + +def rand_select(k: int, n: int) -> List[int]: + """Randomly select k unique numbers from range 1 to n (inclusive). + + Args: + k: Number of unique numbers to select + n: Upper bound of the range (inclusive) + + Returns: + List[int]: Sorted list of k unique numbers + + Raises: + ValueError: If k is greater than n + """ + if k > n: + raise ValueError("k cannot be greater than n") + + numbers = list(range(1, n + 1)) + shuffled_numbers = shuffle_array(numbers) + return sorted(shuffled_numbers[:k]) + + +async def recover_shards( + address: str, + cid: str, + auth_token: AuthToken, + num_of_shards: int = 3, + dynamic_data: Optional[Dict[str, Any]] = None, +) -> RecoverShards: + """Recover key shards from the Lighthouse network. + + Args: + address: User's wallet address + cid: Content ID for the encrypted content + auth_token: Authentication token for API access + num_of_shards: Number of shards to recover (default: 3) + dynamic_data: Additional dynamic data for the request (default: {}) + + Returns: + RecoverShards: Object containing recovered shards or error information + """ + if dynamic_data is None: + dynamic_data = {} + + try: + from .util import api_node_handler + + node_indices = rand_select(num_of_shards, 5) + node_urls = [f"/api/retrieveSharedKey/{index}" for index in node_indices] + + async def request_data(url: str, index: int) -> Dict[str, Any]: + """Helper function to make API requests to node URLs.""" + try: + response = await api_node_handler( + url, + "POST", + auth_token, + {"address": address, "cid": cid, "dynamicData": dynamic_data} + ) + return response + except Exception as e: + raise e + + recovered_shards = [] + + for index, url in enumerate(node_urls): + response = await request_data(url, index) + await asyncio.sleep(1) + recovered_shards.append(response.get('payload')) + + return RecoverShards(shards=recovered_shards, error=None) + + except Exception as err: + error_msg = str(err) + + if "null" in error_msg: + return RecoverShards(shards=[], error="cid not found") + + # Try to parse the error message as JSON, exactly like TypeScript + try: + error_data = json.loads(error_msg) + return RecoverShards(shards=[], error=error_data) + except (json.JSONDecodeError, TypeError): + # If JSON parsing fails, return the original error message as string + # This matches TypeScript behavior when JSON.parse fails + return RecoverShards(shards=[], error=error_msg) \ No newline at end of file diff --git a/src/lighthouseweb3/functions/kavach/save_shards.py b/src/lighthouseweb3/functions/kavach/save_shards.py new file mode 100644 index 0000000..c587125 --- /dev/null +++ b/src/lighthouseweb3/functions/kavach/save_shards.py @@ -0,0 +1,75 @@ +import asyncio +from typing import List, Dict, Any, Union +from .util import api_node_handler, is_cid_reg, is_equal +from .types import AuthToken, KeyShard + +async def save_shards( + address: str, + cid: str, + auth_token: AuthToken, + key_shards: List[KeyShard], + share_to: List[str] = [] +) -> Dict[str, Union[bool, str, None]]: + + if not is_cid_reg(cid): + return { + "isSuccess": False, + "error": "Invalid CID" + } + + if not isinstance(key_shards, list) or len(key_shards) != 5: + return { + "isSuccess": False, + "error": "keyShards must be an array of 5 objects" + } + + try: + node_ids = [1, 2, 3, 4, 5] + node_urls = [f"/api/setSharedKey/{i}" for i in node_ids] + + async def request_data(url: str, index: int) -> Dict[str, Any]: + try: + payload = { + "address": address, + "cid": cid, + "payload": key_shards[index] + } + if share_to: + payload["sharedTo"] = share_to + + response = await api_node_handler(url, "POST", auth_token, payload) + return response + + except Exception as error: + return { + "error": error + } + + data = [] + for index, url in enumerate(node_urls): + response = await request_data(url, index) + if "error" in response: + try: + return { + "isSuccess": False, + "error": str(response["error"]) + } + except Exception: + return { + "isSuccess": False, + "error": "Unknown error" + } + await asyncio.sleep(1) + data.append(response) + + temp = [{**elem, "data": None} for elem in data] + return { + "isSuccess": is_equal(*temp) and data[0].get("message") == "success", + "error": None + } + + except Exception as err: + return { + "isSuccess": False, + "error": str(err) + } diff --git a/src/lighthouseweb3/functions/kavach/types.py b/src/lighthouseweb3/functions/kavach/types.py new file mode 100644 index 0000000..55aaf87 --- /dev/null +++ b/src/lighthouseweb3/functions/kavach/types.py @@ -0,0 +1,156 @@ +from typing import List, Dict, Union, Optional, Any, Literal +from dataclasses import dataclass +from enum import Enum + +ErrorValue = Union[str, List[str], int, bool, None, Dict[str, Any], Any] +SignedMessage = str +JWT = str +AuthToken = Union[SignedMessage, JWT] + +class ChainType(str, Enum): + EVM = "EVM" + EVM_LOWER = "evm" + SOLANA = "SOLANA" + SOLANA_LOWER = "solana" + +class DecryptionType(str, Enum): + ADDRESS = "ADDRESS" + ACCESS_CONDITIONS = "ACCESS_CONDITIONS" + +class StandardContractType(str, Enum): + ERC20 = "ERC20" + ERC721 = "ERC721" + ERC1155 = "ERC1155" + CUSTOM = "Custom" + EMPTY = "" + +class SolanaContractType(str, Enum): + SPL_TOKEN = "spl-token" + EMPTY = "" + +class Comparator(str, Enum): + EQUAL = "==" + GREATER_EQUAL = ">=" + LESS_EQUAL = "<=" + NOT_EQUAL = "!=" + GREATER = ">" + LESS = "<" + +# Data Classes +@dataclass +class KeyShard: + key: str + index: str + +@dataclass +class GeneratedKey: + master_key: Optional[str] + key_shards: List[KeyShard] + +@dataclass +class GenerateInput: + threshold: Optional[int] = None + key_count: Optional[int] = None + +@dataclass +class AuthMessage: + message: Optional[str] + error: Optional[ErrorValue] + +@dataclass +class RecoveredKey: + master_key: Optional[str] + error: Optional[ErrorValue] + +@dataclass +class RecoverShards: + shards: List[KeyShard] + error: ErrorValue + +@dataclass +class LightHouseSDKResponse: + is_success: bool + error: ErrorValue + +@dataclass +class ReturnValueTest: + comparator: Comparator + value: Union[int, str, List[Any]] + +@dataclass +class PDAInterface: + offset: Optional[int] = None + selector: Optional[str] = None + +@dataclass +class EVMCondition: + id: int + standard_contract_type: StandardContractType + chain: str + method: str + return_value_test: ReturnValueTest + contract_address: Optional[str] = None + parameters: Optional[List[Any]] = None + input_array_type: Optional[List[str]] = None + output_type: Optional[str] = None + +@dataclass +class SolanaCondition: + id: int + chain: str + method: str + standard_contract_type: SolanaContractType + pda_interface: PDAInterface + return_value_test: ReturnValueTest + contract_address: Optional[str] = None + parameters: Optional[List[Any]] = None + +# Union Type for Conditions +Condition = Union[EVMCondition, SolanaCondition] + +@dataclass +class UpdateConditionSchema: + chain_type: Literal["EVM", "SOLANA"] + conditions: List[Condition] + decryption_type: Literal["ADDRESS", "ACCESS_CONDITIONS"] + address: str + cid: str + aggregator: Optional[str] = None + +@dataclass +class AccessConditionSchema: + chain_type: Literal["EVM", "SOLANA"] + conditions: List[Condition] + decryption_type: Literal["ADDRESS", "ACCESS_CONDITIONS"] + address: str + cid: str + key_shards: List[Any] + aggregator: Optional[str] = None + +@dataclass +class IGetAccessCondition: + aggregator: str + owner: str + cid: str + conditions: Optional[List[Condition]] = None + conditions_solana: Optional[List[Any]] = None + shared_to: Optional[List[Any]] = None + +def is_jwt(token: str) -> bool: + """Check if token is a JWT (starts with 'jwt:')""" + return token.startswith('jwt:') + +def create_jwt(token: str) -> JWT: + """Create a JWT token with proper prefix""" + if not token.startswith('jwt:'): + return f'jwt:{token}' + return token + +# Type Guards +def is_evm_condition(condition: Condition) -> bool: + """Check if condition is an EVM condition""" + return isinstance(condition, EVMCondition) + +def is_solana_condition(condition: Condition) -> bool: + """Check if condition is a Solana condition""" + return isinstance(condition, SolanaCondition) diff --git a/src/lighthouseweb3/functions/kavach/util.py b/src/lighthouseweb3/functions/kavach/util.py new file mode 100644 index 0000000..f9cadd2 --- /dev/null +++ b/src/lighthouseweb3/functions/kavach/util.py @@ -0,0 +1,95 @@ +import re +import json +import asyncio +import httpx +from typing import Any, Optional, Union +from src.lighthouseweb3.functions.config import Config + +def is_cid_reg(cid: str) -> bool: + """Check if string is a valid CID (Content Identifier)""" + pattern = r'Qm[1-9A-HJ-NP-Za-km-z]{44}|b[A-Za-z2-7]{58}|B[A-Z2-7]{58}|z[1-9A-HJ-NP-Za-km-z]{48}|F[0-9A-F]{50}' + return bool(re.match(pattern, cid)) + +def is_equal(*objects: Any) -> bool: + """Check if all objects are equal by comparing their JSON representations""" + if not objects: + return True + + first_obj_json = json.dumps(objects[0], sort_keys=True) + return all(json.dumps(obj, sort_keys=True) == first_obj_json for obj in objects) + +async def api_node_handler( + endpoint: str, + verb: str, + auth_token: str = "", + body: Any = None, + retry_count: int = 3 +) -> Any: + """ + Handle API requests to node with retry logic + + Args: + endpoint: API endpoint path + verb: HTTP method (GET, POST, DELETE, PUT) + auth_token: Bearer token for authentication + body: Request body for POST/PUT/DELETE requests + retry_count: Number of retry attempts + + Returns: + JSON response from API + + Raises: + Exception: If request fails after all retries + """ + verb = verb.upper() + url = Config.lighthouse_bls_node if not Config.is_dev else Config.lighthouse_bls_node_dev + url += endpoint + + headers = { + "Content-Type": "application/json" + } + + if auth_token: + headers["Authorization"] = f"Bearer {auth_token}" + + json_data = body if verb in ["POST", "PUT", "DELETE"] and body is not None else None + + async with httpx.AsyncClient() as client: + for i in range(retry_count): + try: + response = await client.request( + method=verb, + url=url, + headers=headers, + json=json_data + ) + + if not response.is_success: + if response.status_code == 404: + raise Exception(json.dumps({ + "message": "fetch Error", + "statusCode": response.status_code + })) + + try: + error_body = response.json() + except: + error_body = {"message": "Unknown error"} + + raise Exception(json.dumps({ + **error_body, + "statusCode": response.status_code + })) + + return response.json() + + except Exception as error: + error_str = str(error) + if "fetch" not in error_str: + raise error + + if i == retry_count - 1: # Last attempt + raise error + + # Wait 1 second before retry + await asyncio.sleep(1) \ No newline at end of file diff --git a/tests/setup.py b/tests/setup.py index bd7c127..4854219 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -2,7 +2,6 @@ import os - def parse_env(): """parse .env file""" try: diff --git a/tests/tests_kavach/test_get_auth_message.py b/tests/tests_kavach/test_get_auth_message.py new file mode 100644 index 0000000..80bc573 --- /dev/null +++ b/tests/tests_kavach/test_get_auth_message.py @@ -0,0 +1,31 @@ +import unittest +import logging +from src.lighthouseweb3 import Kavach + +logger = logging.getLogger(__name__) + +class TestGetAuthMessage(unittest.IsolatedAsyncioTestCase): + """Test cases for the getAuthMessage function.""" + + async def test_get_auth_message_valid_address(self): + """Test getting auth message with a valid address.""" + address = 'h6gar47c9GxYda8Kkg5J9So3R9K3jhcWKbgrjKhqfst' + auth_message = await Kavach.getAuthMessage(address=address) + + self.assertIn( + "Please sign this message to prove you are owner of this account", + auth_message['message'], + "Should return a valid auth message" + ) + self.assertIsNone(auth_message['error']) + + async def test_get_auth_message_invalid_address(self): + """Test getting auth message with an invalid address.""" + auth_message = await Kavach.getAuthMessage(address="0x9a40b8EE3B8Fe7eB621cd142a651560Fa7") + + self.assertIsNone(auth_message['message']) + self.assertIsNotNone(auth_message['error']) + self.assertIn("invalid address", str(auth_message["error"]).lower()) + +if __name__ == '__main__': + unittest.main(verbosity=2) \ No newline at end of file diff --git a/tests/tests_kavach/test_recover_shards.py b/tests/tests_kavach/test_recover_shards.py new file mode 100644 index 0000000..48cea0d --- /dev/null +++ b/tests/tests_kavach/test_recover_shards.py @@ -0,0 +1,145 @@ +import unittest +import logging +from eth_account import Account +from eth_account.messages import encode_defunct +from web3 import Web3 +from src.lighthouseweb3 import Kavach +import os + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +class TestRecoverShards(unittest.IsolatedAsyncioTestCase): + """Test cases for the recoverShards function.""" + + def setUp(self): + self.private_key=os.environ.get("PRIVATE_KEY") + self.cid=os.environ.get("CID") + + + async def asyncSetUp(self): + """Set up test fixtures before each test method.""" + private_key = self.private_key + self.signer = Account.from_key(private_key) + self.kavach = Kavach() + + async def test_invalid_signature(self): + """Test recovery with invalid signature.""" + result = await self.kavach.recoverShards( + self.signer.address, + self.cid, + "signature", + 5 + ) + + self.assertIsNotNone(result.error) + self.assertIsInstance(result.error, dict) + self.assertEqual(result.error.get('message'), "Invalid Signature") + + async def test_save_key(self): + """Test saving shards successfully.""" + # Test CID validation + from src.lighthouseweb3.functions.kavach.util import is_cid_reg + auth_message = await self.kavach.getAuthMessage(self.signer.address) + + message = auth_message['message'] + signature = f"0x{Web3().eth.account.sign_message( + encode_defunct(text=message), + private_key=self.private_key + ).signature.hex()}" + + result = await self.kavach.saveShards( + self.signer.address, + self.cid, + signature, + [ + {"key": "1", "index": "1"}, + {"key": "2", "index": "2"}, + {"key": "3", "index": "3"}, + {"key": "4", "index": "4"}, + {"key": "5", "index": "5"}, + ] + ) + + + + self.assertTrue(result['isSuccess']) + self.assertIsNone(result['error']) + + async def test_recover_key_authorized(self): + """Test authorized shard recovery.""" + auth_message = await self.kavach.getAuthMessage(self.signer.address) + + message = auth_message['message'] + signature = f"0x{Web3().eth.account.sign_message( + encode_defunct(text=message), + private_key=self.private_key + ).signature.hex()}" + + result = await self.kavach.recoverShards( + self.signer.address, + self.cid, + signature, + 5 + ) + + self.assertIsNone(result.error) + expected_shards = [ + {"index": "1", "key": "1"}, + {"index": "2", "key": "2"}, + {"index": "3", "key": "3"}, + {"index": "4", "key": "4"}, + {"index": "5", "key": "5"}, + ] + self.assertEqual(result.shards, expected_shards) + + async def test_missing_cid(self): + """Test recovery with missing CID.""" + auth_message = await self.kavach.getAuthMessage(self.signer.address) + + message = auth_message['message'] + signature = f"0x{Web3().eth.account.sign_message( + encode_defunct(text=message), + private_key=self.private_key + ).signature.hex()}" + + result = await self.kavach.recoverShards( + self.signer.address, + "bafkreiebj4d3f6e6abuxhsrcevhgkypr54335hjucryj3zvivq7hv2nwiqZ", # Non-existent CID + signature, + 3 + ) + + self.assertIsNotNone(result.error) + self.assertIsInstance(result.error, dict) + self.assertIn("message", result.error) + self.assertEqual(result.shards, []) + + async def test_recover_key_unauthorized(self): + """Test unauthorized shard recovery.""" + private_key2 = "0xbca24fceb5f6c412e401b9ba68b351d811cc0735b059771dfc4e878adb0373ef" + + auth_message = await self.kavach.getAuthMessage(self.signer.address) + + message = auth_message['message'] + signature2 = f"0x{Web3().eth.account.sign_message( + encode_defunct(text=message), + private_key=private_key2 + ).signature.hex()}" + + result = await self.kavach.recoverShards( + self.signer.address, + self.cid, + signature2, + 5 + ) + + + self.assertEqual(result.shards, []) + self.assertIsInstance(result.error, dict) + self.assertIn("===", result.error.get('message', '')) + self.assertEqual(result.error.get('statusCode'), 406) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + unittest.main() \ No newline at end of file diff --git a/tests/tests_kavach/test_save_shards.py b/tests/tests_kavach/test_save_shards.py new file mode 100644 index 0000000..6d94dee --- /dev/null +++ b/tests/tests_kavach/test_save_shards.py @@ -0,0 +1,116 @@ +import unittest +import logging +from src.lighthouseweb3 import Kavach +from web3 import Web3 +from eth_account.messages import encode_defunct +import os + + +logger = logging.getLogger(__name__) + + +class TestSaveShards(unittest.IsolatedAsyncioTestCase): + """Test cases for the saveShards function.""" + + + def setUp(self): + self.cid = os.environ.get("CID") + self.private_key = os.environ.get("PRIVATE_KEY") + self.signer_address = Web3().eth.account.from_key(self.private_key).address + + + async def test_invalid_signature(self): + """Test saveShards with invalid signature.""" + result = await Kavach.saveShards( + address=self.signer_address, + cid=self.cid, + auth_token="signature", + key_shards=[ + {"key": "1", "index": "1"}, + {"key": "2", "index": "2"}, + {"key": "3", "index": "3"}, + {"key": "4", "index": "4"}, + {"key": "5", "index": "5"}, + ] + ) + + self.assertFalse(result['isSuccess']) + self.assertTrue(result['error'] is None or isinstance(result['error'], str)) + + + async def test_save_key_success(self): + """Test successful key saving.""" + auth_message_result = await Kavach.getAuthMessage(address=self.signer_address) + self.assertIsNone(auth_message_result['error']) + message = auth_message_result['message'] + signature = f"0x{Web3().eth.account.sign_message( + encode_defunct(text=message), + private_key=self.private_key + ).signature.hex()}" + print(signature) + result = await Kavach.saveShards( + address=self.signer_address, + cid=self.cid, + auth_token=signature, + key_shards=[ + {"key": "1", "index": "1"}, + {"key": "2", "index": "2"}, + {"key": "3", "index": "3"}, + {"key": "4", "index": "4"}, + {"key": "5", "index": "5"}, + ] + ) + print(result) + self.assertTrue(result['isSuccess']) + self.assertIsNone(result['error']) + + + async def test_save_key_insufficient_shards(self): + """Test saving key with insufficient shards (should fail).""" + auth_message_result = await Kavach.getAuthMessage(address=self.signer_address) + self.assertIsNone(auth_message_result['error']) + auth_message = auth_message_result['message'] + signature = "0x" + Web3().eth.account.sign_message(encode_defunct(text=auth_message), private_key=self.private_key).signature.hex() + + result = await Kavach.saveShards( + address=self.signer_address, + cid=self.cid, + auth_token=signature, + key_shards=[ + {"key": "1", "index": "1"}, + {"key": "2", "index": "2"}, + {"key": "3", "index": "3"}, + ] + ) + + self.assertFalse(result['isSuccess']) + self.assertRegex(str(result['error']).lower(), r'keyshards must be an array of 5 objects') + + + async def test_invalid_cid(self): + """Test saving key with invalid CID.""" + auth_message_result = await Kavach.getAuthMessage(address=self.signer_address) + self.assertIsNone(auth_message_result['error']) + auth_message = auth_message_result['message'] + signature = "0x" + Web3().eth.account.sign_message(encode_defunct(text=auth_message), private_key=self.private_key).signature.hex() + + result = await Kavach.saveShards( + address=self.signer_address, + cid=self.cid, + auth_token=signature, + key_shards=[ + {"key": "1", "index": "1"}, + {"key": "2", "index": "2"}, + {"key": "3", "index": "3"}, + {"key": "4", "index": "4"}, + {"key": "5", "index": "5"}, + ] + ) + + self.assertFalse(result['isSuccess']) + self.assertRegex(str(result['error']).lower(), r'invalid cid') + + +if __name__ == '__main__': + unittest.main(verbosity=2) +