Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .DS_Store
Binary file not shown.
53 changes: 53 additions & 0 deletions src/lighthouseweb3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import os
import io
from typing import Optional, Dict, Any
from .functions import (
upload as d,
deal_status,
Expand All @@ -17,6 +18,15 @@
create_wallet as createWallet
)

from .functions.kavach import(
get_auth_message as getAuthMessage,
save_shards as saveShards,
recover_shards as recoverShards
)

from typing import List, Dict, Any, Union
from .functions.kavach.types import AuthToken, KeyShard


class Lighthouse:
def __init__(self, token: str = ""):
Expand Down Expand Up @@ -224,3 +234,46 @@ def getTagged(self, tag: str):
except Exception as e:
raise e

class Kavach:

@staticmethod
def getAuthMessage(address: str):
"""
Retrieves an authentication message for a given address.

:param address: str, The address for which to retrieve the authentication message.
:return: dict, A dictionary containing the authentication message.
"""
try:
return getAuthMessage.get_auth_message(address)
except Exception as e:
raise e

@staticmethod
def recoverShards(address: str, cid: str, auth_token: AuthToken, num_of_shards: int = 3, dynamic_data: Optional[Dict[str, Any]] = None):
try:
return recoverShards.recover_shards(address, cid, auth_token, num_of_shards, dynamic_data)
except Exception as e:
raise e
@staticmethod
def saveShards(
address: str,
cid: str,
auth_token: AuthToken,
key_shards: List[KeyShard],
share_to: List[str] = []
) -> Dict[str, Union[bool, str, None]]:
"""
Save shards for a given address and CID.

:param address: str, The address for which to save the shards.
:param cid: str, The content identifier for the data.
:param auth_token: AuthToken, The authentication token.
:param key_shards: List[KeyShard], The list of key shards to save.
:param share_to: List[str], The list of addresses to share the shards with (optional).
:return: dict, A dictionary containing the result of the operation.
"""
try:
return saveShards.save_shards(address, cid, auth_token, key_shards, share_to)
except Exception as e:
raise e
4 changes: 4 additions & 0 deletions src/lighthouseweb3/functions/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ class Config:
lighthouse_node = "https://node.lighthouse.storage"
lighthouse_bls_node = "https://encryption.lighthouse.storage"
lighthouse_gateway = "https://gateway.lighthouse.storage/ipfs"


is_dev = False
lighthouse_bls_node_dev = "http://enctest.lighthouse.storage"
10 changes: 10 additions & 0 deletions src/lighthouseweb3/functions/kavach/get_auth_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import Any
from .util import api_node_handler


async def get_auth_message(address: str) -> dict[str, Any]:
try:
response = await api_node_handler(f"/api/message/{address}", "GET")
return {'message': response[0]['message'], 'error': None}
except Exception as e:
return {'message': None, 'error':str(e)}
107 changes: 107 additions & 0 deletions src/lighthouseweb3/functions/kavach/recover_shards.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import random
import asyncio
import json
from typing import List, Dict, Any, Optional
from .types import AuthToken, RecoverShards


def shuffle_array(array: List[int]) -> List[int]:
"""Shuffle the input array in place using Fisher-Yates algorithm.

Args:
array: List of integers to be shuffled

Returns:
List[int]: The shuffled array (same reference as input)
"""
for i in range(len(array) - 1, 0, -1):
j = random.randint(0, i)
array[i], array[j] = array[j], array[i]
return array


def rand_select(k: int, n: int) -> List[int]:
"""Randomly select k unique numbers from range 1 to n (inclusive).

Args:
k: Number of unique numbers to select
n: Upper bound of the range (inclusive)

Returns:
List[int]: Sorted list of k unique numbers

Raises:
ValueError: If k is greater than n
"""
if k > n:
raise ValueError("k cannot be greater than n")

numbers = list(range(1, n + 1))
shuffled_numbers = shuffle_array(numbers)
return sorted(shuffled_numbers[:k])


async def recover_shards(
address: str,
cid: str,
auth_token: AuthToken,
num_of_shards: int = 3,
dynamic_data: Optional[Dict[str, Any]] = None,
) -> RecoverShards:
"""Recover key shards from the Lighthouse network.

Args:
address: User's wallet address
cid: Content ID for the encrypted content
auth_token: Authentication token for API access
num_of_shards: Number of shards to recover (default: 3)
dynamic_data: Additional dynamic data for the request (default: {})

Returns:
RecoverShards: Object containing recovered shards or error information
"""
if dynamic_data is None:
dynamic_data = {}

try:
from .util import api_node_handler

node_indices = rand_select(num_of_shards, 5)
node_urls = [f"/api/retrieveSharedKey/{index}" for index in node_indices]

async def request_data(url: str, index: int) -> Dict[str, Any]:
"""Helper function to make API requests to node URLs."""
try:
response = await api_node_handler(
url,
"POST",
auth_token,
{"address": address, "cid": cid, "dynamicData": dynamic_data}
)
return response
except Exception as e:
raise e

recovered_shards = []

for index, url in enumerate(node_urls):
response = await request_data(url, index)
await asyncio.sleep(1)
recovered_shards.append(response.get('payload'))

return RecoverShards(shards=recovered_shards, error=None)

except Exception as err:
error_msg = str(err)

if "null" in error_msg:
return RecoverShards(shards=[], error="cid not found")

# Try to parse the error message as JSON, exactly like TypeScript
try:
error_data = json.loads(error_msg)
return RecoverShards(shards=[], error=error_data)
except (json.JSONDecodeError, TypeError):
# If JSON parsing fails, return the original error message as string
# This matches TypeScript behavior when JSON.parse fails
return RecoverShards(shards=[], error=error_msg)
75 changes: 75 additions & 0 deletions src/lighthouseweb3/functions/kavach/save_shards.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import asyncio
from typing import List, Dict, Any, Union
from .util import api_node_handler, is_cid_reg, is_equal
from .types import AuthToken, KeyShard

async def save_shards(
address: str,
cid: str,
auth_token: AuthToken,
key_shards: List[KeyShard],
share_to: List[str] = []
) -> Dict[str, Union[bool, str, None]]:

if not is_cid_reg(cid):
return {
"isSuccess": False,
"error": "Invalid CID"
}

if not isinstance(key_shards, list) or len(key_shards) != 5:
return {
"isSuccess": False,
"error": "keyShards must be an array of 5 objects"
}

try:
node_ids = [1, 2, 3, 4, 5]
node_urls = [f"/api/setSharedKey/{i}" for i in node_ids]

async def request_data(url: str, index: int) -> Dict[str, Any]:
try:
payload = {
"address": address,
"cid": cid,
"payload": key_shards[index]
}
if share_to:
payload["sharedTo"] = share_to

response = await api_node_handler(url, "POST", auth_token, payload)
return response

except Exception as error:
return {
"error": error
}

data = []
for index, url in enumerate(node_urls):
response = await request_data(url, index)
if "error" in response:
try:
return {
"isSuccess": False,
"error": str(response["error"])
}
except Exception:
return {
"isSuccess": False,
"error": "Unknown error"
}
await asyncio.sleep(1)
data.append(response)

temp = [{**elem, "data": None} for elem in data]
return {
"isSuccess": is_equal(*temp) and data[0].get("message") == "success",
"error": None
}

except Exception as err:
return {
"isSuccess": False,
"error": str(err)
}
Loading