Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/lighthouseweb3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import os
import io
from typing import Any, Dict, List

from .functions.encryption import save_shards as saveShards

from .functions import (
upload as d,
deal_status,
Expand Down Expand Up @@ -224,3 +228,29 @@ def getTagged(self, tag: str):
except Exception as e:
raise e


class Kavach:
@staticmethod
def saveShards(
address: str,
cid: str,
auth_token: str,
key_shards: List[dict],
share_to: List[str] = None
) -> Dict[str, Any]:
"""
Save key shards to multiple nodes.


:param address: str, The Ethereum address of the user.
:param cid: str, The Content Identifier (CID) of the file for which key shards are being saved.
:param auth_token: str, The authentication token obtained by signing a message.
:param key_shards: List[KeyShard], A list of KeyShard objects, each containing a key and its index.
:param share_to: List[str], optional, A list of Ethereum addresses to which the key shards should be shared. Defaults to None.
:return: Dict[str, Any], A dictionary indicating the success or failure of the operation, along with any error messages.
"""

try:
return saveShards.save_shards(address, cid, auth_token, key_shards, share_to)
except Exception as e:
raise e
3 changes: 3 additions & 0 deletions src/lighthouseweb3/functions/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ class Config:
lighthouse_node = "https://node.lighthouse.storage"
lighthouse_bls_node = "https://encryption.lighthouse.storage"
lighthouse_gateway = "https://gateway.lighthouse.storage/ipfs"

is_dev = False
lighthouse_bls_node_dev = "http://enctest.lighthouse.storage"
89 changes: 89 additions & 0 deletions src/lighthouseweb3/functions/encryption/save_shards.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import json
import time
from typing import Any, Dict, List

from .utils import (
api_node_handler,
is_cid_reg, is_equal
)


def save_shards(
address: str,
cid: str,
auth_token: str,
key_shards: List[dict],
share_to: List[str] = None
) -> Dict[str, Any]:
"""
Save key shards to multiple nodes.
"""
if share_to is None:
share_to = []

# Validate CID
if not is_cid_reg(cid):
return {
"isSuccess": False,
"error": "Invalid CID"
}

# Validate key_shards
if not isinstance(key_shards, list) or len(key_shards) != 5:
return {
"isSuccess": False,
"error": "keyShards must be an array of 5 objects"
}

try:
node_ids = [1, 2, 3, 4, 5]
node_urls = [f"/api/setSharedKey/{node_id}" for node_id in node_ids]

def request_data(url: str, index: int) -> Dict[str, Any]:
try:
body = {
"address": address,
"cid": cid,
"payload": key_shards[index]
}
if share_to:
body["sharedTo"] = share_to

response = api_node_handler(
url,
"POST",
auth_token,
body
)
return response
except Exception as error:
return {"error": str(error)}

data = []
for index, url in enumerate(node_urls):
response = request_data(url, index)
if "error" in response:
try:
error_message = json.loads(response["error"])
except json.JSONDecodeError:
error_message = response["error"]
return {
"isSuccess": False,
"error": error_message
}
data.append(response)
time.sleep(1)

temp = [{"data": None, **{k: v for k, v in elem.items() if k != "data"}} for elem in data]
is_success = is_equal(*temp) and data[0].get("message") == "success"

return {
"isSuccess": is_success,
"error": None
}

except Exception as err:
return {
"isSuccess": False,
"error": str(err)
}
69 changes: 69 additions & 0 deletions src/lighthouseweb3/functions/encryption/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import re
import json
import time
import requests
from typing import Dict, Any
from dataclasses import dataclass
from src.lighthouseweb3.functions.config import Config


def is_cid_reg(cid: str) -> bool:

pattern = r'Qm[1-9A-HJ-NP-Za-km-z]{44}|b[A-Za-z2-7]{58}|B[A-Z2-7]{58}|z[1-9A-HJ-NP-Za-km-z]{48}|F[0-9A-F]{50}'
return bool(re.match(pattern, cid))

def is_equal(*objects: Any) -> bool:

if not objects:
return True
first = json.dumps(objects[0], sort_keys=True)
return all(json.dumps(obj, sort_keys=True) == first for obj in objects)

def api_node_handler(
endpoint: str,
verb: str,
auth_token: str = "",
body: Any = None,
retry_count: int = 3
) -> Dict[str, Any]:

url = f"{Config.is_dev and Config.lighthouse_bls_node_dev or Config.lighthouse_bls_node}{endpoint}"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {auth_token}" if auth_token else ""
}

for attempt in range(retry_count):
try:
if verb in ["POST", "PUT", "DELETE"] and body is not None:
response = requests.request(
method=verb,
url=url,
headers=headers,
json=body
)
else:
response = requests.request(
method=verb,
url=url,
headers=headers
)

if not response.ok:
if response.status_code == 404:
raise Exception(json.dumps({
"message": "fetch Error",
"statusCode": response.status_code
}))
error_body = response.json()
raise Exception(json.dumps({
**error_body,
"statusCode": response.status_code
}))
return response.json()
except Exception as error:
if "fetch" not in str(error):
raise
if attempt == retry_count - 1:
raise
time.sleep(1)
118 changes: 118 additions & 0 deletions tests/test_encryption/test_save_shards.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@

import unittest
import requests
import os
from eth_account.messages import encode_defunct
from web3 import Web3
from src.lighthouseweb3.functions.config import Config
from src.lighthouseweb3 import Kavach
from src.lighthouseweb3.functions.encryption.utils import api_node_handler


def get_auth_message(public_key: str) -> dict:
response = api_node_handler(f"/api/message/{public_key}", "GET")
return response[0]['message']

class TestSaveShards(unittest.TestCase):

def test_save_shards_successful(self):
"""test save shards for successful key saving"""
public_key = os.environ.get("PUBLIC_KEY")

verification_message = get_auth_message(public_key)
self.assertIn("Please sign this message to prove you are owner of this account", verification_message, "Owner response should come")

signed_message = Web3().eth.account.sign_message(
encode_defunct(text=verification_message),
private_key=os.environ.get("PRIVATE_KEY")
).signature.hex()

result = Kavach.saveShards(
address=public_key,
cid="QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH",
auth_token=f"0x{signed_message}",
key_shards=[
{"key": "1", "index": "1"},
{"key": "2", "index": "2"},
{"key": "3", "index": "3"},
{"key": "4", "index": "4"},
{"key": "5", "index": "5"},
]
)

self.assertTrue(result["isSuccess"])
self.assertIsNone(result["error"])

def test_save_shards_invalid_signature(self):
"""test save shards for invalid signature"""
public_key = os.environ.get("PUBLIC_KEY")

result = Kavach.saveShards(
address=public_key,
cid="QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH",
auth_token="signature",
key_shards=[
{"key": "1", "index": "1"},
{"key": "2", "index": "2"},
{"key": "3", "index": "3"},
{"key": "4", "index": "4"},
{"key": "5", "index": "5"},
]
)
self.assertIsInstance(result["error"], dict)
self.assertEqual(result["error"].get("message"), "Invalid Signature")

def test_save_shards_invalid_key_shards_length(self):
"""test save shards for invalid key shards length"""

public_key = os.environ.get("PUBLIC_KEY")

verification_message = get_auth_message(public_key)
self.assertIn("Please sign this message to prove you are owner of this account", verification_message, "Owner response should come")

signed_message = Web3().eth.account.sign_message(
encode_defunct(text=verification_message),
private_key=os.environ.get("PRIVATE_KEY")
).signature.hex()

result = Kavach.saveShards(
address=public_key,
cid="QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQJ",
auth_token=f"0x{signed_message}",
key_shards=[
{"key": "1", "index": "1"},
{"key": "2", "index": "2"},
{"key": "3", "index": "3"},
]
)
self.assertFalse(result["isSuccess"])
self.assertRegex(result["error"], r"keyShards must be an array of 5 objects")

def test_save_shards_invalid_cid(self):
"""test save shards for invalid CID"""

public_key = os.environ.get("PUBLIC_KEY")

verification_message = get_auth_message(public_key)
self.assertIn("Please sign this message to prove you are owner of this account", verification_message, "Owner response should come")

signed_message = Web3().eth.account.sign_message(
encode_defunct(text=verification_message),
private_key=os.environ.get("PRIVATE_KEY")
).signature.hex()


result = Kavach.saveShards(
address=public_key,
cid="cid",
auth_token=f"0x{signed_message}",
key_shards=[
{"key": "1", "index": "1"},
{"key": "2", "index": "2"},
{"key": "3", "index": "3"},
{"key": "4", "index": "4"},
{"key": "5", "index": "5"},
]
)
self.assertFalse(result["isSuccess"])
self.assertRegex(result["error"], r"Invalid CID")