Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions examples/crypto.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
from rubix.crypto.secp256k1 import Secp256k1Keypair
from rubix.client import RubixClient
from rubix.signer import Signer
from rubix.crypto.secp256k1 import Secp256k1Keypair, secp256k1_verify

def sign_arbitrary_data(data: bytes):
def sign_and_verify_arbitrary_data(data: bytes):
# Generate a new Secp256k1 keypair
keypair = Secp256k1Keypair.from_private_key(bytes.fromhex("<private key in hex>"))
client = RubixClient("<Rubix Node URL>")

print("Public Key (hex):", keypair.public_key)
signer = Signer(
rubixClient=client,
mnemonic="<Enter 24-word long BIP-39 mnemonic>"
)

print("Public Key (hex): ", signer.get_keypair().public_key)
keypair = signer.get_keypair()

signature_bytes = keypair.sign(data)

is_valid = secp256k1_verify(bytes.fromhex(keypair.public_key), data, signature_bytes)
print("Is the signature valid?: ", is_valid)
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "rubix-py"
version = "0.2.0"
version = "0.3.0"
description = "Rubix Client SDK for Python"
requires-python = ">=3.10"
license = { text = "MIT" }
Expand All @@ -22,7 +22,8 @@ dependencies = [
"bip32utils>=0.3.post4",
"mnemonic>=0.21",
"ECPy>=1.2.5",
"py-cid>=0.3.1"
"py-cid>=0.3.1",
"coincurve>=21.0.0"
]

[project.urls]
Expand Down
94 changes: 59 additions & 35 deletions rubix/crypto/secp256k1.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,64 @@
import bip32utils
import hashlib
import base64

from ecpy.curves import Curve
from ecpy.keys import ECPrivateKey
from ecpy.curves import Curve, Point
from ecpy.keys import ECPrivateKey, ECPublicKey
from ecpy.ecdsa import ECDSA
from ecdsa import SigningKey, SECP256k1, util
from ecdsa import SigningKey, SECP256k1
from coincurve import PublicKey as CoincurvePublicKey

def secp256k1_sign(private_key: bytes, message: bytes) -> bytes:
"""Signs a message using secp256k1 private key.

Args:
private_key (bytes): Secp256k1 private key.
message (bytes): The message to sign.

Returns:
bytes: The generated signature in bytes.
"""
cv = Curve.get_curve('secp256k1')
pv_key = ECPrivateKey(int.from_bytes(private_key, 'big'), cv)
signer = ECDSA()

sig = signer.sign(message, pv_key)
if sig is None:
raise ValueError("Failed to sign the message.")

return bytes(sig)

def secp256k1_verify(public_key: bytes, message: bytes, signature: bytes) -> bool:
"""Verifies secp256k1 signature.

Args:
public_key (bytes): Compressed public key.
message (bytes): The original message that was signed.
signature (bytes): The signature to verify.

Returns:
bool: True if signature is valid, False otherwise.
"""
cv = Curve.get_curve('secp256k1')

# check if the public key is in compressed format
uncompressed_public_key = None

if len(public_key) == 33:
coincurve_pub_key = CoincurvePublicKey(public_key)
uncompressed_public_key = coincurve_pub_key.format(compressed=False)
elif len(public_key) == 65:
uncompressed_public_key = public_key
else:
raise ValueError(f"Invalid public key of length {len(public_key)}.")

# Form public key object
x = int.from_bytes(uncompressed_public_key[1:33], 'big')
y = int.from_bytes(uncompressed_public_key[33:], 'big')
ec_point = Point(x, y, cv)
pub_key_obj = ECPublicKey(ec_point)

# Verify signature
verifier = ECDSA()
return verifier.verify(message, signature, pub_key_obj)

class Secp256k1Keypair:
def __init__(self, private_key: str, public_key: str):
Expand Down Expand Up @@ -68,33 +121,4 @@ def sign(self, message: bytes) -> bytes:
Returns:
bytes: The generated signature in bytes.
"""
cv = Curve.get_curve('secp256k1')
pv_key = ECPrivateKey(int(self.__private_key, 16), cv)
signer = ECDSA()

sig = signer.sign(message, pv_key)
if sig is None:
raise ValueError("Failed to sign the message.")

return sig

def verify(self, message: bytes, signature: bytes) -> bool:
"""Verifies secp256k1 signature.

Args:
message (bytes): The original message that was signed.
signature (bytes): The signature to verify.

Returns:
bool: True if signature is valid, False otherwise.
"""
cv = Curve.get_curve('secp256k1')
priv_key = ECPrivateKey(int(self.__private_key, 16), cv)

pub_key = priv_key.get_public_key()
signer = ECDSA()

try:
return signer.verify(message, signature, pub_key)
except Exception:
return False
return secp256k1_sign(bytes.fromhex(self.__private_key), message)
41 changes: 41 additions & 0 deletions rubix/did.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,44 @@ def create_did(keypair: Secp256k1Keypair, rubixNodeBaseUrl: str) -> None:

return user_did

def online_signature_verify(rubixNodeBaseUrl: str, did: str, message: bytes, signature: bytes) -> bool:
"""
Verifies a signature using Rubix node's online verification service.

Args:
rubixNodeBaseUrl (str): Base URL of the Rubix node.
did (str): The DID of the signer.
message (bytes): The original message that was signed.
signature (bytes): The signature to verify.

Returns:
bool: True if signature is valid, False otherwise.
"""

verify_signature_url = urljoin(rubixNodeBaseUrl, "/api/verify-signature")

verify_signature_body = {
"signer_did": did,
"signed_msg": message.decode('utf-8'),
"signature": signature.hex()
}

try:
response = requests.get(
verify_signature_url,
params=verify_signature_body,
timeout=300
)

response.raise_for_status()

response_body = response.json()
return response_body.get("status", False)
except requests.exceptions.Timeout:
raise signatureResponseError("Request to Rubix node timed out")
except requests.exceptions.ConnectionError:
raise signatureResponseError(f"Failed to connect to Rubix node at {rubixNodeBaseUrl}")
except requests.exceptions.HTTPError as e:
raise signatureResponseError(f"HTTP error from Rubix node: {e}")
except requests.exceptions.RequestException as e:
raise signatureResponseError(f"Request failed: {e}")
2 changes: 0 additions & 2 deletions rubix/signer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import base64
import os
from urllib.parse import urljoin

from .client import RubixClient
from .crypto.bip39 import generate_bip39_mnemonic, get_seed_from_mnemonic
from .crypto.secp256k1 import Secp256k1Keypair
from .did import create_did
from .models.result import Response

class Signer:
"""
Expand Down
30 changes: 30 additions & 0 deletions tests/crypto/test_online_verify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import pytest

from rubix.did import online_signature_verify
from rubix.signer import Signer
from rubix.client import RubixClient

@pytest.mark.skip(reason="requires online Rubix node")
def test_online_verify_valid_signature():
"""Test verifying a valid signature using an online verification service."""
node_url = "http://localhost:20000"
client = RubixClient(node_url)

signer = Signer(
rubixClient=client,
mnemonic="buffalo tumble defy laundry call almost little pig lift party property pool frame erosion mind library sample floor ring enemy word enemy foster ill"
)

signer_did = signer.did

message = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit."

signature = signer.get_keypair().sign(message)

is_valid = online_signature_verify(
rubixNodeBaseUrl=node_url,
did=signer_did,
message=message,
signature=signature
)
assert is_valid is True
10 changes: 5 additions & 5 deletions tests/crypto/test_secp256k1.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from rubix.crypto.secp256k1 import Secp256k1Keypair

from rubix.crypto.secp256k1 import Secp256k1Keypair, secp256k1_verify

def test_secp256k1_keypair_from_private_key():
"""Test the generation of a Secp256k1 keypair from a private key."""
Expand Down Expand Up @@ -37,8 +36,9 @@ def test_secp256k1_verify_valid_message():
keypair = Secp256k1Keypair.from_private_key(private_key_bytes)
signature = keypair.sign(message)
assert isinstance(signature, bytes)

is_valid = keypair.verify(message, signature)

# Verify the signature
is_valid = secp256k1_verify(bytes.fromhex(keypair.public_key), message, signature)
assert is_valid is True

def test_secp256k1_verify_invalid_message():
Expand All @@ -55,6 +55,6 @@ def test_secp256k1_verify_invalid_message():
signature = keypair.sign(message_1)
assert isinstance(signature, bytes)

is_valid = keypair.verify(message_2, signature)
is_valid = secp256k1_verify(bytes.fromhex(keypair.public_key), message_2, signature)
assert is_valid is False