From e1df988a9c8c906d3b724ae65626b3cd313932d7 Mon Sep 17 00:00:00 2001 From: Freddie Honohan Date: Wed, 28 Jan 2026 07:52:38 +0000 Subject: [PATCH 1/5] Patching issue: https://github.com/bsv-blockchain/py-sdk/issues/135 --- .gitignore | 4 +- TEST_SETUP.md | 89 ++++++++++++++++++++++++++++++++++++++++++++ bsv/script/script.py | 78 ++++++++++++++++++++++++++------------ 3 files changed, 147 insertions(+), 24 deletions(-) create mode 100644 TEST_SETUP.md diff --git a/.gitignore b/.gitignore index 55d132b..fe453a5 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,6 @@ dist/ htmlcov/ .coverage build/ -.venv/ \ No newline at end of file +.venv/ +.env* +.wallet diff --git a/TEST_SETUP.md b/TEST_SETUP.md new file mode 100644 index 0000000..e4c7051 --- /dev/null +++ b/TEST_SETUP.md @@ -0,0 +1,89 @@ +# Test Setup Commands for py-sdk + +## Setup Virtual Environment + +```bash +# Navigate to py-sdk directory +cd /home/sneakyfox/py-lib/py-sdk + +# Create virtual environment +python3 -m venv venv + +# Activate virtual environment +source venv/bin/activate + +# Upgrade pip +pip install --upgrade pip + +# Install dependencies (includes pytest and test dependencies) +pip install -r requirements.txt + +# Install the package in development mode (editable install) +pip install -e . +``` + +## Run Tests + +### Run all tests +```bash +# Make sure venv is activated +source venv/bin/activate + +# Run all tests +pytest + +# Run with verbose output +pytest -v + +# Run with coverage +pytest --cov=bsv --cov-report=html +``` + +### Run specific test files +```bash +# Run script-related tests +pytest tests/test_scripts.py -v + +# Run all script tests +pytest tests/test_script*.py -v + +# Run a specific test function +pytest tests/test_scripts.py::test_function_name -v +``` + +### Run tests matching a pattern +```bash +# Run tests matching "op_return" or "OP_RETURN" +pytest -k "op_return" -v + +# Run tests matching "chunk" +pytest -k "chunk" -v +``` + +## Quick Setup Script + +You can also create a simple setup script: + +```bash +#!/bin/bash +cd /home/sneakyfox/py-lib/py-sdk +python3 -m venv venv +source venv/bin/activate +pip install --upgrade pip +pip install -r requirements.txt +pip install -e . +echo "Setup complete! Activate with: source venv/bin/activate" +``` + +## Verify Installation + +```bash +# Check pytest is installed +pytest --version + +# Check bsv package is installed +python -c "import bsv; print(bsv.__version__)" + +# Run a quick test +pytest tests/test_scripts.py -v -k "test" --tb=short +``` diff --git a/bsv/script/script.py b/bsv/script/script.py index ae14ebf..cb9e34a 100644 --- a/bsv/script/script.py +++ b/bsv/script/script.py @@ -6,10 +6,10 @@ # BRC-106 compliance: Opcode aliases for parsing # Build a comprehensive mapping of all opcode names (including aliases) to their byte values OPCODE_ALIASES = { - 'OP_FALSE': b'\x00', - 'OP_0': b'\x00', - 'OP_TRUE': b'\x51', - 'OP_1': b'\x51' + "OP_FALSE": b"\x00", + "OP_0": b"\x00", + "OP_TRUE": b"\x51", + "OP_1": b"\x51", } # Build name->value mapping for all OpCodes @@ -17,6 +17,7 @@ # Merge with aliases OPCODE_NAME_VALUE_DICT.update(OPCODE_ALIASES) + class ScriptChunk: """ A representation of a chunk of a script, which includes an opcode. @@ -43,7 +44,7 @@ def __init__(self, script: Union[str, bytes, None] = None): Create script from hex string or bytes """ if script is None: - self.script: bytes = b'' + self.script: bytes = b"" elif isinstance(script, str): # script in hex string self.script: bytes = bytes.fromhex(script) @@ -51,7 +52,7 @@ def __init__(self, script: Union[str, bytes, None] = None): # script in bytes self.script: bytes = script else: - raise TypeError('unsupported script type') + raise TypeError("unsupported script type") # An array of script chunks that make up the script. self.chunks: List[ScriptChunk] = [] self._build_chunks() @@ -59,12 +60,40 @@ def __init__(self, script: Union[str, bytes, None] = None): def _build_chunks(self): self.chunks = [] reader = Reader(self.script) + in_conditional_block = ( + 0 # Track conditional depth for proper OP_RETURN handling + ) + while not reader.eof(): op = reader.read_bytes(1) chunk = ScriptChunk(op) data = None - if b'\x01' <= op <= b'\x4b': - data = reader.read_bytes(int.from_bytes(op, 'big')) + + # Track conditional blocks (OP_IF, OP_NOTIF, OP_VERIF, OP_VERNOTIF) + if ( + op == OpCode.OP_IF + or op == OpCode.OP_NOTIF + or op == OpCode.OP_VERIF + or op == OpCode.OP_VERNOTIF + ): + in_conditional_block += 1 + elif op == OpCode.OP_ENDIF: + in_conditional_block = max(0, in_conditional_block - 1) + + # Handle OP_RETURN: terminate script, treat remaining as data + # Only terminate if not inside a conditional block + if op == OpCode.OP_RETURN and in_conditional_block == 0: + # Read all remaining bytes as data (from position after 0x6a) + remaining_length = len(reader.getvalue()) - reader.tell() + if remaining_length > 0: + data = reader.read_bytes(remaining_length) + chunk.data = data + self.chunks.append(chunk) + break # Terminate parsing + + # Handle push data operations + if b"\x01" <= op <= b"\x4b": + data = reader.read_bytes(int.from_bytes(op, "big")) elif op == OpCode.OP_PUSHDATA1: # 0x4c length = reader.read_uint8() if length is not None: @@ -77,6 +106,7 @@ def _build_chunks(self): length = reader.read_uint32_le() if length is not None: data = reader.read_bytes(length) + chunk.data = data self.chunks.append(chunk) @@ -118,20 +148,22 @@ def __repr__(self) -> str: return self.__str__() @classmethod - def from_chunks(cls, chunks: List[ScriptChunk]) -> 'Script': - script = b'' + def from_chunks(cls, chunks: List[ScriptChunk]) -> "Script": + script = b"" for chunk in chunks: - script += encode_pushdata(chunk.data) if chunk.data is not None else chunk.op + script += ( + encode_pushdata(chunk.data) if chunk.data is not None else chunk.op + ) s = Script(script) s.chunks = chunks return s @classmethod - def from_asm(cls, asm: str) -> 'Script': + def from_asm(cls, asm: str) -> "Script": chunks: [ScriptChunk] = [] if not asm: # Handle empty string return Script.from_chunks(chunks) - tokens = asm.split(' ') + tokens = asm.split(" ") i = 0 while i < len(tokens): token = tokens[i] @@ -141,12 +173,12 @@ def from_asm(cls, asm: str) -> 'Script': opcode_value = OPCODE_NAME_VALUE_DICT[token] chunks.append(ScriptChunk(opcode_value)) i += 1 - elif token == '0': + elif token == "0": # Numeric literal 0 - opcode_value = b'\x00' + opcode_value = b"\x00" chunks.append(ScriptChunk(opcode_value)) i += 1 - elif token == '-1': + elif token == "-1": # Numeric literal -1 opcode_value = OpCode.OP_1NEGATE chunks.append(ScriptChunk(opcode_value)) @@ -155,13 +187,13 @@ def from_asm(cls, asm: str) -> 'Script': # Assume it's hex data to push hex_string = token if len(hex_string) % 2 != 0: - hex_string = '0' + hex_string + hex_string = "0" + hex_string hex_bytes = bytes.fromhex(hex_string) if hex_bytes.hex() != hex_string.lower(): - raise ValueError('invalid hex string in script') + raise ValueError("invalid hex string in script") hex_len = len(hex_bytes) - if 0 <= hex_len < int.from_bytes(OpCode.OP_PUSHDATA1, 'big'): - opcode_value = int.to_bytes(hex_len, 1, 'big') + if 0 <= hex_len < int.from_bytes(OpCode.OP_PUSHDATA1, "big"): + opcode_value = int.to_bytes(hex_len, 1, "big") elif hex_len < pow(2, 8): opcode_value = OpCode.OP_PUSHDATA1 elif hex_len < pow(2, 16): @@ -173,10 +205,10 @@ def from_asm(cls, asm: str) -> 'Script': return Script.from_chunks(chunks) def to_asm(self) -> str: - return ' '.join(str(chunk) for chunk in self.chunks) + return " ".join(str(chunk) for chunk in self.chunks) @classmethod - def find_and_delete(cls, source: 'Script', pattern: 'Script') -> 'Script': + def find_and_delete(cls, source: "Script", pattern: "Script") -> "Script": chunks = [] for chunk in source.chunks: if Script.from_chunks([chunk]).hex() != pattern.hex(): @@ -184,5 +216,5 @@ def find_and_delete(cls, source: 'Script', pattern: 'Script') -> 'Script': return Script.from_chunks(chunks) @classmethod - def write_bin(cls, octets: bytes) -> 'Script': + def write_bin(cls, octets: bytes) -> "Script": return Script(encode_pushdata(octets)) From a2302a986541704b048799e3e219596d6f6322a6 Mon Sep 17 00:00:00 2001 From: Freddie Honohan Date: Wed, 28 Jan 2026 08:38:04 +0000 Subject: [PATCH 2/5] Removing file --- TEST_SETUP.md | 89 --------------------------------------------------- 1 file changed, 89 deletions(-) delete mode 100644 TEST_SETUP.md diff --git a/TEST_SETUP.md b/TEST_SETUP.md deleted file mode 100644 index e4c7051..0000000 --- a/TEST_SETUP.md +++ /dev/null @@ -1,89 +0,0 @@ -# Test Setup Commands for py-sdk - -## Setup Virtual Environment - -```bash -# Navigate to py-sdk directory -cd /home/sneakyfox/py-lib/py-sdk - -# Create virtual environment -python3 -m venv venv - -# Activate virtual environment -source venv/bin/activate - -# Upgrade pip -pip install --upgrade pip - -# Install dependencies (includes pytest and test dependencies) -pip install -r requirements.txt - -# Install the package in development mode (editable install) -pip install -e . -``` - -## Run Tests - -### Run all tests -```bash -# Make sure venv is activated -source venv/bin/activate - -# Run all tests -pytest - -# Run with verbose output -pytest -v - -# Run with coverage -pytest --cov=bsv --cov-report=html -``` - -### Run specific test files -```bash -# Run script-related tests -pytest tests/test_scripts.py -v - -# Run all script tests -pytest tests/test_script*.py -v - -# Run a specific test function -pytest tests/test_scripts.py::test_function_name -v -``` - -### Run tests matching a pattern -```bash -# Run tests matching "op_return" or "OP_RETURN" -pytest -k "op_return" -v - -# Run tests matching "chunk" -pytest -k "chunk" -v -``` - -## Quick Setup Script - -You can also create a simple setup script: - -```bash -#!/bin/bash -cd /home/sneakyfox/py-lib/py-sdk -python3 -m venv venv -source venv/bin/activate -pip install --upgrade pip -pip install -r requirements.txt -pip install -e . -echo "Setup complete! Activate with: source venv/bin/activate" -``` - -## Verify Installation - -```bash -# Check pytest is installed -pytest --version - -# Check bsv package is installed -python -c "import bsv; print(bsv.__version__)" - -# Run a quick test -pytest tests/test_scripts.py -v -k "test" --tb=short -``` From f40e6c91c852914be7cbb92102617ed13897c67b Mon Sep 17 00:00:00 2001 From: Freddie Honohan Date: Wed, 28 Jan 2026 11:39:57 +0000 Subject: [PATCH 3/5] Addressed Qube issues --- bsv/script/script.py | 179 +++++++++++++++++++++++-------------------- 1 file changed, 97 insertions(+), 82 deletions(-) diff --git a/bsv/script/script.py b/bsv/script/script.py index cb9e34a..a34d949 100644 --- a/bsv/script/script.py +++ b/bsv/script/script.py @@ -44,80 +44,87 @@ def __init__(self, script: Union[str, bytes, None] = None): Create script from hex string or bytes """ if script is None: - self.script: bytes = b"" + self._bytes: bytes = b"" elif isinstance(script, str): # script in hex string - self.script: bytes = bytes.fromhex(script) + self._bytes: bytes = bytes.fromhex(script) elif isinstance(script, bytes): # script in bytes - self.script: bytes = script + self._bytes: bytes = script else: raise TypeError("unsupported script type") # An array of script chunks that make up the script. self.chunks: List[ScriptChunk] = [] self._build_chunks() + def _update_conditional_depth(self, op: bytes, depth: int) -> int: + """Update conditional block depth based on opcode.""" + if ( + op == OpCode.OP_IF + or op == OpCode.OP_NOTIF + or op == OpCode.OP_VERIF + or op == OpCode.OP_VERNOTIF + ): + return depth + 1 + if op == OpCode.OP_ENDIF: + return max(0, depth - 1) + return depth + + def _handle_op_return(self, reader: Reader, chunk: ScriptChunk) -> bool: + """Handle OP_RETURN opcode. Returns True if parsing should terminate.""" + remaining_length = len(reader.getvalue()) - reader.tell() + if remaining_length > 0: + chunk.data = reader.read_bytes(remaining_length) + else: + chunk.data = None + self.chunks.append(chunk) + return True # Terminate parsing + + def _read_push_data(self, reader: Reader, op: bytes) -> Optional[bytes]: + """Read push data based on opcode. Returns data bytes or None.""" + if b"\x01" <= op <= b"\x4b": + return reader.read_bytes(int.from_bytes(op, "big")) + if op == OpCode.OP_PUSHDATA1: + length = reader.read_uint8() + return reader.read_bytes(length) if length is not None else None + if op == OpCode.OP_PUSHDATA2: + length = reader.read_uint16_le() + return reader.read_bytes(length) if length is not None else None + if op == OpCode.OP_PUSHDATA4: + length = reader.read_uint32_le() + return reader.read_bytes(length) if length is not None else None + return None + def _build_chunks(self): self.chunks = [] - reader = Reader(self.script) - in_conditional_block = ( - 0 # Track conditional depth for proper OP_RETURN handling - ) + reader = Reader(self._bytes) + in_conditional_block = 0 while not reader.eof(): op = reader.read_bytes(1) chunk = ScriptChunk(op) - data = None - - # Track conditional blocks (OP_IF, OP_NOTIF, OP_VERIF, OP_VERNOTIF) - if ( - op == OpCode.OP_IF - or op == OpCode.OP_NOTIF - or op == OpCode.OP_VERIF - or op == OpCode.OP_VERNOTIF - ): - in_conditional_block += 1 - elif op == OpCode.OP_ENDIF: - in_conditional_block = max(0, in_conditional_block - 1) - - # Handle OP_RETURN: terminate script, treat remaining as data - # Only terminate if not inside a conditional block + + in_conditional_block = self._update_conditional_depth( + op, in_conditional_block + ) + if op == OpCode.OP_RETURN and in_conditional_block == 0: - # Read all remaining bytes as data (from position after 0x6a) - remaining_length = len(reader.getvalue()) - reader.tell() - if remaining_length > 0: - data = reader.read_bytes(remaining_length) - chunk.data = data - self.chunks.append(chunk) - break # Terminate parsing - - # Handle push data operations - if b"\x01" <= op <= b"\x4b": - data = reader.read_bytes(int.from_bytes(op, "big")) - elif op == OpCode.OP_PUSHDATA1: # 0x4c - length = reader.read_uint8() - if length is not None: - data = reader.read_bytes(length) - elif op == OpCode.OP_PUSHDATA2: - length = reader.read_uint16_le() - if length is not None: - data = reader.read_bytes(length) - elif op == OpCode.OP_PUSHDATA4: - length = reader.read_uint32_le() - if length is not None: - data = reader.read_bytes(length) + if self._handle_op_return(reader, chunk): + break + continue + data = self._read_push_data(reader, op) chunk.data = data self.chunks.append(chunk) def serialize(self) -> bytes: - return self.script + return self._bytes def hex(self) -> str: - return self.script.hex() + return self._bytes.hex() def byte_length(self) -> int: - return len(self.script) + return len(self._bytes) size = byte_length @@ -138,11 +145,11 @@ def is_push_only(self) -> bool: def __eq__(self, o: object) -> bool: if isinstance(o, Script): - return self.script == o.script + return self._bytes == o._bytes return super().__eq__(o) def __str__(self) -> str: - return self.script.hex() + return self._bytes.hex() def __repr__(self) -> str: return self.__str__() @@ -158,50 +165,58 @@ def from_chunks(cls, chunks: List[ScriptChunk]) -> "Script": s.chunks = chunks return s + @classmethod + def _parse_opcode_token(cls, token: str) -> Optional[bytes]: + """Parse a single token as an opcode. Returns opcode bytes or None.""" + if token in OPCODE_NAME_VALUE_DICT: + return OPCODE_NAME_VALUE_DICT[token] + if token == "0": + return b"\x00" + if token == "-1": + return OpCode.OP_1NEGATE + return None + + @classmethod + def _parse_data_token(cls, token: str) -> tuple[bytes, bytes]: + """Parse a token as hex data. Returns (opcode, data) tuple.""" + hex_string = token + if len(hex_string) % 2 != 0: + hex_string = "0" + hex_string + hex_bytes = bytes.fromhex(hex_string) + if hex_bytes.hex() != hex_string.lower(): + raise ValueError("invalid hex string in script") + + hex_len = len(hex_bytes) + if 0 <= hex_len < int.from_bytes(OpCode.OP_PUSHDATA1, "big"): + opcode_value = int.to_bytes(hex_len, 1, "big") + elif hex_len < pow(2, 8): + opcode_value = OpCode.OP_PUSHDATA1 + elif hex_len < pow(2, 16): + opcode_value = OpCode.OP_PUSHDATA2 + else: + opcode_value = OpCode.OP_PUSHDATA4 + return (opcode_value, hex_bytes) + @classmethod def from_asm(cls, asm: str) -> "Script": chunks: [ScriptChunk] = [] - if not asm: # Handle empty string + if not asm: return Script.from_chunks(chunks) + tokens = asm.split(" ") i = 0 while i < len(tokens): token = tokens[i] - opcode_value: Optional[bytes] = None - # BRC-106: Check if token is a recognized opcode (including aliases) - if token in OPCODE_NAME_VALUE_DICT: - opcode_value = OPCODE_NAME_VALUE_DICT[token] - chunks.append(ScriptChunk(opcode_value)) - i += 1 - elif token == "0": - # Numeric literal 0 - opcode_value = b"\x00" - chunks.append(ScriptChunk(opcode_value)) - i += 1 - elif token == "-1": - # Numeric literal -1 - opcode_value = OpCode.OP_1NEGATE + opcode_value = cls._parse_opcode_token(token) + + if opcode_value is not None: chunks.append(ScriptChunk(opcode_value)) i += 1 else: - # Assume it's hex data to push - hex_string = token - if len(hex_string) % 2 != 0: - hex_string = "0" + hex_string - hex_bytes = bytes.fromhex(hex_string) - if hex_bytes.hex() != hex_string.lower(): - raise ValueError("invalid hex string in script") - hex_len = len(hex_bytes) - if 0 <= hex_len < int.from_bytes(OpCode.OP_PUSHDATA1, "big"): - opcode_value = int.to_bytes(hex_len, 1, "big") - elif hex_len < pow(2, 8): - opcode_value = OpCode.OP_PUSHDATA1 - elif hex_len < pow(2, 16): - opcode_value = OpCode.OP_PUSHDATA2 - elif hex_len < pow(2, 32): - opcode_value = OpCode.OP_PUSHDATA4 + opcode_value, hex_bytes = cls._parse_data_token(token) chunks.append(ScriptChunk(opcode_value, hex_bytes)) - i = i + 1 + i += 1 + return Script.from_chunks(chunks) def to_asm(self) -> str: From 6e683a7ffc493c27ca445b229ca9b520b1c36457 Mon Sep 17 00:00:00 2001 From: Freddie Honohan Date: Wed, 28 Jan 2026 12:52:53 +0000 Subject: [PATCH 4/5] Committing test --- tests/test_scripts.py | 476 +++++++++++++++++++++++------------------- 1 file changed, 267 insertions(+), 209 deletions(-) diff --git a/tests/test_scripts.py b/tests/test_scripts.py index f1c7fb3..62e9619 100644 --- a/tests/test_scripts.py +++ b/tests/test_scripts.py @@ -11,99 +11,148 @@ def test_script(): - locking_script = '76a9146a176cd51593e00542b8e1958b7da2be97452d0588ac' + locking_script = "76a9146a176cd51593e00542b8e1958b7da2be97452d0588ac" assert Script(locking_script) == Script(bytes.fromhex(locking_script)) assert Script(locking_script).hex() == locking_script - assert Script(locking_script).size_varint() == b'\x19' + assert Script(locking_script).size_varint() == b"\x19" - assert Script().serialize() == b'' - assert Script().hex() == '' + assert Script().serialize() == b"" + assert Script().hex() == "" assert Script().byte_length() == 0 - with pytest.raises(TypeError, match=r'unsupported script type'): + with pytest.raises(TypeError, match=r"unsupported script type"): # noinspection PyTypeChecker Script(1) def test_p2pkh(): - address = '1AfxgwYJrBgriZDLryfyKuSdBsi59jeBX9' - locking_script = '76a9146a176cd51593e00542b8e1958b7da2be97452d0588ac' + address = "1AfxgwYJrBgriZDLryfyKuSdBsi59jeBX9" + locking_script = "76a9146a176cd51593e00542b8e1958b7da2be97452d0588ac" assert P2PKH().lock(address) == Script(locking_script) assert P2PKH().lock(address_to_public_key_hash(address)) == Script(locking_script) - with pytest.raises(TypeError, match=r"unsupported type to parse P2PKH locking script"): + with pytest.raises( + TypeError, match=r"unsupported type to parse P2PKH locking script" + ): # noinspection PyTypeChecker P2PKH().lock(1) - key_compressed = PrivateKey('L5agPjZKceSTkhqZF2dmFptT5LFrbr6ZGPvP7u4A6dvhTrr71WZ9') - key_uncompressed = PrivateKey('5KiANv9EHEU4o9oLzZ6A7z4xJJ3uvfK2RLEubBtTz1fSwAbpJ2U') + key_compressed = PrivateKey("L5agPjZKceSTkhqZF2dmFptT5LFrbr6ZGPvP7u4A6dvhTrr71WZ9") + key_uncompressed = PrivateKey("5KiANv9EHEU4o9oLzZ6A7z4xJJ3uvfK2RLEubBtTz1fSwAbpJ2U") assert P2PKH().unlock(key_compressed).estimated_unlocking_byte_length() == 107 assert P2PKH().unlock(key_uncompressed).estimated_unlocking_byte_length() == 139 source_tx = Transaction( - [], + [], [TransactionOutput(locking_script=Script(locking_script), satoshis=1000)] + ) + tx = Transaction( [ - TransactionOutput( - locking_script=Script(locking_script), - satoshis=1000 + TransactionInput( + source_transaction=source_tx, + source_output_index=0, + unlocking_script_template=P2PKH().unlock(key_compressed), ) - ] + ], + [TransactionOutput(locking_script=P2PKH().lock(address), change=True)], ) - tx = Transaction([ - TransactionInput( - source_transaction=source_tx, - source_output_index=0, - unlocking_script_template=P2PKH().unlock(key_compressed) - ) - ], - [ - TransactionOutput( - locking_script=P2PKH().lock(address), - change=True - ) - ]) - + tx.fee() tx.sign() - + unlocking_script = P2PKH().unlock(key_compressed).sign(tx, 0) assert isinstance(unlocking_script, Script) assert unlocking_script.byte_length() in [106, 107] - - spend = Spend({ - 'sourceTXID': tx.inputs[0].source_txid, - 'sourceOutputIndex': tx.inputs[0].source_output_index, - 'sourceSatoshis': source_tx.outputs[0].satoshis, - 'lockingScript': source_tx.outputs[0].locking_script, - 'transactionVersion': tx.version, - 'otherInputs': [], - 'inputIndex': 0, - 'unlockingScript': tx.inputs[0].unlocking_script, - 'outputs': tx.outputs, - 'inputSequence': tx.inputs[0].sequence, - 'lockTime': tx.locktime, - }) + + spend = Spend( + { + "sourceTXID": tx.inputs[0].source_txid, + "sourceOutputIndex": tx.inputs[0].source_output_index, + "sourceSatoshis": source_tx.outputs[0].satoshis, + "lockingScript": source_tx.outputs[0].locking_script, + "transactionVersion": tx.version, + "otherInputs": [], + "inputIndex": 0, + "unlockingScript": tx.inputs[0].unlocking_script, + "outputs": tx.outputs, + "inputSequence": tx.inputs[0].sequence, + "lockTime": tx.locktime, + } + ) assert spend.validate() def test_op_return(): - assert OpReturn().lock(['0']) == Script('006a0130') - assert OpReturn().lock(['0' * 0x4b]) == Script('006a' + '4b' + '30' * 0x4b) - assert OpReturn().lock(['0' * 0x4c]) == Script('006a' + '4c4c' + '30' * 0x4c) - assert OpReturn().lock(['0' * 0x0100]) == Script('006a' + '4d0001' + '30' * 0x0100) - assert OpReturn().lock([b'\x31\x32', '345']) == Script('006a' + '023132' + '03333435') + assert OpReturn().lock(["0"]) == Script("006a0130") + assert OpReturn().lock(["0" * 0x4B]) == Script("006a" + "4b" + "30" * 0x4B) + assert OpReturn().lock(["0" * 0x4C]) == Script("006a" + "4c4c" + "30" * 0x4C) + assert OpReturn().lock(["0" * 0x0100]) == Script("006a" + "4d0001" + "30" * 0x0100) + assert OpReturn().lock([b"\x31\x32", "345"]) == Script( + "006a" + "023132" + "03333435" + ) - with pytest.raises(TypeError, match=r"unsupported type to parse OP_RETURN locking script"): + with pytest.raises( + TypeError, match=r"unsupported type to parse OP_RETURN locking script" + ): # noinspection PyTypeChecker OpReturn().lock([1]) +def test_op_return_chunk_parsing(): + """ + Test that OP_RETURN correctly terminates script parsing and treats remaining bytes as data. + This verifies the fix for issue where scripts starting with 0x00 6a were incorrectly parsed. + """ + # Test case: OP_FALSE OP_RETURN with data (the bug case) + # Script: 00 (OP_FALSE) 6a (OP_RETURN) 04 (push 4 bytes) 54657374 ("Test") + script = Script("006a0454657374") + chunks = list(script.chunks) + + # Should parse as 2 chunks, not 3 + assert len(chunks) == 2, f"Expected 2 chunks, got {len(chunks)}" + + # First chunk: OP_FALSE + assert chunks[0].op == b"\x00" + assert chunks[0].data is None + + # Second chunk: OP_RETURN with all remaining data + assert chunks[1].op == b"\x6a" + assert chunks[1].data == b"\x04Test" + + # Test case: OP_RETURN with data (no OP_FALSE prefix) + script2 = Script("6a0454657374") + chunks2 = list(script2.chunks) + + assert len(chunks2) == 1, f"Expected 1 chunk, got {len(chunks2)}" + assert chunks2[0].op == b"\x6a" + assert chunks2[0].data == b"\x04Test" + + # Test case: OP_RETURN with no data + script3 = Script("6a") + chunks3 = list(script3.chunks) + + assert len(chunks3) == 1 + assert chunks3[0].op == b"\x6a" + assert chunks3[0].data is None + + # Test case: OP_FALSE OP_RETURN with no data + script4 = Script("006a") + chunks4 = list(script4.chunks) + + assert len(chunks4) == 2 + assert chunks4[0].op == b"\x00" + assert chunks4[0].data is None + assert chunks4[1].op == b"\x6a" + assert chunks4[1].data is None + + def test_p2pk(): - private_key = PrivateKey('L5agPjZKceSTkhqZF2dmFptT5LFrbr6ZGPvP7u4A6dvhTrr71WZ9') + private_key = PrivateKey("L5agPjZKceSTkhqZF2dmFptT5LFrbr6ZGPvP7u4A6dvhTrr71WZ9") public_key = private_key.public_key() assert P2PK().lock(public_key.hex()) == P2PK().lock(public_key.serialize()) - with pytest.raises(TypeError, match=r"unsupported type to parse P2PK locking script"): + with pytest.raises( + TypeError, match=r"unsupported type to parse P2PK locking script" + ): # noinspection PyTypeChecker P2PK().lock(1) @@ -111,24 +160,24 @@ def test_p2pk(): [], [ TransactionOutput( - locking_script=P2PK().lock(public_key.hex()), - satoshis=1000 + locking_script=P2PK().lock(public_key.hex()), satoshis=1000 ) - ] + ], + ) + tx = Transaction( + [ + TransactionInput( + source_transaction=source_tx, + source_output_index=0, + unlocking_script_template=P2PK().unlock(private_key), + ) + ], + [ + TransactionOutput( + locking_script=P2PKH().lock(public_key.address()), change=True + ) + ], ) - tx = Transaction([ - TransactionInput( - source_transaction=source_tx, - source_output_index=0, - unlocking_script_template=P2PK().unlock(private_key) - ) - ], - [ - TransactionOutput( - locking_script=P2PKH().lock(public_key.address()), - change=True - ) - ]) tx.fee() tx.sign() @@ -136,103 +185,114 @@ def test_p2pk(): unlocking_script = P2PK().unlock(private_key).sign(tx, 0) assert isinstance(unlocking_script, Script) assert unlocking_script.byte_length() in [72, 73] - - spend = Spend({ - 'sourceTXID': tx.inputs[0].source_txid, - 'sourceOutputIndex': tx.inputs[0].source_output_index, - 'sourceSatoshis': source_tx.outputs[0].satoshis, - 'lockingScript': source_tx.outputs[0].locking_script, - 'transactionVersion': tx.version, - 'otherInputs': [], - 'inputIndex': 0, - 'unlockingScript': tx.inputs[0].unlocking_script, - 'outputs': tx.outputs, - 'inputSequence': tx.inputs[0].sequence, - 'lockTime': tx.locktime, - }) + + spend = Spend( + { + "sourceTXID": tx.inputs[0].source_txid, + "sourceOutputIndex": tx.inputs[0].source_output_index, + "sourceSatoshis": source_tx.outputs[0].satoshis, + "lockingScript": source_tx.outputs[0].locking_script, + "transactionVersion": tx.version, + "otherInputs": [], + "inputIndex": 0, + "unlockingScript": tx.inputs[0].unlocking_script, + "outputs": tx.outputs, + "inputSequence": tx.inputs[0].sequence, + "lockTime": tx.locktime, + } + ) assert spend.validate() def test_bare_multisig(): privs = [PrivateKey(), PrivateKey(), PrivateKey()] pubs = [ - privs[0].public_key().serialize(), + privs[0].public_key().serialize(), privs[1].public_key().serialize(), - privs[2].public_key().serialize() + privs[2].public_key().serialize(), ] - encoded_pks = b''.join([encode_pushdata(pk if isinstance(pk, bytes) else bytes.fromhex(pk)) for pk in pubs]) + encoded_pks = b"".join( + [ + encode_pushdata(pk if isinstance(pk, bytes) else bytes.fromhex(pk)) + for pk in pubs + ] + ) - expected_locking = encode_int(2) + encoded_pks + encode_int(3) + OpCode.OP_CHECKMULTISIG + expected_locking = ( + encode_int(2) + encoded_pks + encode_int(3) + OpCode.OP_CHECKMULTISIG + ) assert BareMultisig().lock(pubs, 2).serialize() == expected_locking - + source_tx = Transaction( [], + [TransactionOutput(locking_script=BareMultisig().lock(pubs, 2), satoshis=1000)], + ) + tx = Transaction( + [ + TransactionInput( + source_transaction=source_tx, + source_output_index=0, + unlocking_script_template=BareMultisig().unlock(privs[:2]), + ) + ], [ TransactionOutput( - locking_script=BareMultisig().lock(pubs, 2), - satoshis=1000 + locking_script=P2PKH().lock("1AfxgwYJrBgriZDLryfyKuSdBsi59jeBX9"), + change=True, ) - ] + ], ) - tx = Transaction([ - TransactionInput( - source_transaction=source_tx, - source_output_index=0, - unlocking_script_template=BareMultisig().unlock(privs[:2]) - ) - ], [ - TransactionOutput( - locking_script=P2PKH().lock('1AfxgwYJrBgriZDLryfyKuSdBsi59jeBX9'), - change=True - ) - ]) - + tx.fee() tx.sign() unlocking_script = BareMultisig().unlock(privs[:2]).sign(tx, 0) assert isinstance(unlocking_script, Script) assert unlocking_script.byte_length() >= 144 - - spend = Spend({ - 'sourceTXID': tx.inputs[0].source_txid, - 'sourceOutputIndex': tx.inputs[0].source_output_index, - 'sourceSatoshis': source_tx.outputs[0].satoshis, - 'lockingScript': source_tx.outputs[0].locking_script, - 'transactionVersion': tx.version, - 'otherInputs': [], - 'inputIndex': 0, - 'unlockingScript': tx.inputs[0].unlocking_script, - 'outputs': tx.outputs, - 'inputSequence': tx.inputs[0].sequence, - 'lockTime': tx.locktime, - }) + + spend = Spend( + { + "sourceTXID": tx.inputs[0].source_txid, + "sourceOutputIndex": tx.inputs[0].source_output_index, + "sourceSatoshis": source_tx.outputs[0].satoshis, + "lockingScript": source_tx.outputs[0].locking_script, + "transactionVersion": tx.version, + "otherInputs": [], + "inputIndex": 0, + "unlockingScript": tx.inputs[0].unlocking_script, + "outputs": tx.outputs, + "inputSequence": tx.inputs[0].sequence, + "lockTime": tx.locktime, + } + ) assert spend.validate() def test_is_push_only(): - assert Script('00').is_push_only() # OP_0 - assert not Script('006a').is_push_only() # OP_0 OP_RETURN - assert Script('4c051010101010').is_push_only() + assert Script("00").is_push_only() # OP_0 + assert not Script("006a").is_push_only() # OP_0 OP_RETURN + assert Script("4c051010101010").is_push_only() # like bitcoind, we regard OP_RESERVED as being "push only" - assert Script('50').is_push_only() # OP_RESERVED + assert Script("50").is_push_only() # OP_RESERVED def test_to_asm(): - assert Script('000301020300').to_asm() == 'OP_FALSE 010203 OP_FALSE' + assert Script("000301020300").to_asm() == "OP_FALSE 010203 OP_FALSE" - asm = 'OP_DUP OP_HASH160 f4c03610e60ad15100929cc23da2f3a799af1725 OP_EQUALVERIFY OP_CHECKSIG' - assert Script('76a914f4c03610e60ad15100929cc23da2f3a799af172588ac').to_asm() == asm + asm = "OP_DUP OP_HASH160 f4c03610e60ad15100929cc23da2f3a799af1725 OP_EQUALVERIFY OP_CHECKSIG" + assert Script("76a914f4c03610e60ad15100929cc23da2f3a799af172588ac").to_asm() == asm def test_from_asm(): - assert Script.from_asm('OP_0 3 010203 OP_0').to_asm() == 'OP_FALSE 03 010203 OP_FALSE' + assert ( + Script.from_asm("OP_0 3 010203 OP_0").to_asm() == "OP_FALSE 03 010203 OP_FALSE" + ) asms = [ - '', - 'OP_FALSE 010203 OP_FALSE', - 'OP_SHA256 8cc17e2a2b10e1da145488458a6edec4a1fdb1921c2d5ccbc96aa0ed31b4d5f8 OP_EQUALVERIFY', + "", + "OP_FALSE 010203 OP_FALSE", + "OP_SHA256 8cc17e2a2b10e1da145488458a6edec4a1fdb1921c2d5ccbc96aa0ed31b4d5f8 OP_EQUALVERIFY", ] for asm in asms: assert Script.from_asm(asm).to_asm() == asm @@ -242,58 +302,58 @@ def test_from_asm(): _asm_pushdata(pow(2, 17)) asms = [ - 'OP_FALSE', - 'OP_0', - '0', + "OP_FALSE", + "OP_0", + "0", ] for asm in asms: - assert Script.from_asm(asm).to_asm() == 'OP_FALSE' + assert Script.from_asm(asm).to_asm() == "OP_FALSE" asms = [ - 'OP_1NEGATE', - '-1', + "OP_1NEGATE", + "-1", ] for asm in asms: - assert Script.from_asm(asm).to_asm() == 'OP_1NEGATE' + assert Script.from_asm(asm).to_asm() == "OP_1NEGATE" def _asm_pushdata(byte_length: int): - octets = b'\x00' * byte_length - asm = 'OP_RETURN ' + octets.hex() + octets = b"\x00" * byte_length + asm = "OP_RETURN " + octets.hex() assert Script.from_asm(asm).to_asm() == asm def test_find_and_delete(): - source = Script.from_asm('OP_RETURN f0f0') - assert Script.find_and_delete(source, Script.from_asm('f0f0')).to_asm() == 'OP_RETURN' - + source = Script.from_asm("OP_RETURN f0f0") + assert ( + Script.find_and_delete(source, Script.from_asm("f0f0")).to_asm() == "OP_RETURN" + ) + + def test_r_puzzle(): private_key = PrivateKey() - public_key = private_key.public_key() - + k = PrivateKey().int() G: Point = curve.g r = curve_multiply(k, G).x % curve.n - - r_bytes = r.to_bytes(32, byteorder='big') - if r_bytes[0] > 0x7f: - r_bytes = b'\x00' + r_bytes - + + r_bytes = r.to_bytes(32, byteorder="big") + if r_bytes[0] > 0x7F: + r_bytes = b"\x00" + r_bytes + source_tx = Transaction( [], [ - TransactionOutput( - locking_script=RPuzzle().lock(r_bytes), satoshis=100 - ), + TransactionOutput(locking_script=RPuzzle().lock(r_bytes), satoshis=100), TransactionOutput( locking_script=P2PKH().lock(private_key.address()), change=True - ) - ] + ), + ], ) - + source_tx.fee() source_tx.sign() - + tx = Transaction( [ TransactionInput( @@ -307,29 +367,32 @@ def test_r_puzzle(): TransactionOutput( locking_script=P2PKH().lock(private_key.address()), change=True ) - ] + ], ) - + tx.fee() tx.sign() - assert(len(tx.inputs[0].unlocking_script.serialize()) >= 106) - - spend = Spend({ - 'sourceTXID': tx.inputs[0].source_txid, - 'sourceOutputIndex': tx.inputs[0].source_output_index, - 'sourceSatoshis': source_tx.outputs[0].satoshis, - 'lockingScript': source_tx.outputs[0].locking_script, - 'transactionVersion': tx.version, - 'otherInputs': [], - 'inputIndex': 0, - 'unlockingScript': tx.inputs[0].unlocking_script, - 'outputs': tx.outputs, - 'inputSequence': tx.inputs[0].sequence, - 'lockTime': tx.locktime, - }) + assert len(tx.inputs[0].unlocking_script.serialize()) >= 106 + + spend = Spend( + { + "sourceTXID": tx.inputs[0].source_txid, + "sourceOutputIndex": tx.inputs[0].source_output_index, + "sourceSatoshis": source_tx.outputs[0].satoshis, + "lockingScript": source_tx.outputs[0].locking_script, + "transactionVersion": tx.version, + "otherInputs": [], + "inputIndex": 0, + "unlockingScript": tx.inputs[0].unlocking_script, + "outputs": tx.outputs, + "inputSequence": tx.inputs[0].sequence, + "lockTime": tx.locktime, + } + ) assert spend.validate() + def test_p2pkh_sighash_acp(): key = PrivateKey() @@ -337,53 +400,48 @@ def test_p2pkh_sighash_acp(): [], [ TransactionOutput( - locking_script=P2PKH().lock(key.address()), - satoshis=1000 + locking_script=P2PKH().lock(key.address()), satoshis=1000 ), - TransactionOutput( - locking_script=P2PKH().lock(key.address()), - satoshis=245 + TransactionOutput(locking_script=P2PKH().lock(key.address()), satoshis=245), + ], + ) + tx = Transaction( + [ + TransactionInput( + source_transaction=source_tx, + source_output_index=0, + unlocking_script_template=P2PKH().unlock(key), + sighash=SIGHASH.ALL_ANYONECANPAY_FORKID, ) - ] + ], + [TransactionOutput(locking_script=P2PKH().lock(key.address()), change=True)], ) - tx = Transaction([ - TransactionInput( - source_transaction=source_tx, - source_output_index=0, - unlocking_script_template=P2PKH().unlock(key), - sighash=SIGHASH.ALL_ANYONECANPAY_FORKID - ) - ], - [ - TransactionOutput( - locking_script=P2PKH().lock(key.address()), - change=True - ) - ]) - + tx.fee() tx.sign() - + # Add another input that shouldn't break signature. tx.add_input( TransactionInput( source_transaction=source_tx, source_output_index=1, - unlocking_script_template=P2PKH().unlock(key) + unlocking_script_template=P2PKH().unlock(key), ) ) - - spend = Spend({ - 'sourceTXID': tx.inputs[0].source_txid, - 'sourceOutputIndex': tx.inputs[0].source_output_index, - 'sourceSatoshis': source_tx.outputs[0].satoshis, - 'lockingScript': source_tx.outputs[0].locking_script, - 'transactionVersion': tx.version, - 'otherInputs': [tx.inputs[1]], - 'inputIndex': 0, - 'unlockingScript': tx.inputs[0].unlocking_script, - 'outputs': tx.outputs, - 'inputSequence': tx.inputs[0].sequence, - 'lockTime': tx.locktime, - }) - assert spend.validate() \ No newline at end of file + + spend = Spend( + { + "sourceTXID": tx.inputs[0].source_txid, + "sourceOutputIndex": tx.inputs[0].source_output_index, + "sourceSatoshis": source_tx.outputs[0].satoshis, + "lockingScript": source_tx.outputs[0].locking_script, + "transactionVersion": tx.version, + "otherInputs": [tx.inputs[1]], + "inputIndex": 0, + "unlockingScript": tx.inputs[0].unlocking_script, + "outputs": tx.outputs, + "inputSequence": tx.inputs[0].sequence, + "lockTime": tx.locktime, + } + ) + assert spend.validate() From ec6a58bcfc8430d73af134d0958d4b29c193abeb Mon Sep 17 00:00:00 2001 From: Freddie Honohan Date: Wed, 28 Jan 2026 12:58:49 +0000 Subject: [PATCH 5/5] SONAR --- tests/test_scripts.py | 115 ++++++++++++------------------------------ 1 file changed, 31 insertions(+), 84 deletions(-) diff --git a/tests/test_scripts.py b/tests/test_scripts.py index 62e9619..d44a3b1 100644 --- a/tests/test_scripts.py +++ b/tests/test_scripts.py @@ -10,6 +10,32 @@ from bsv.curve import curve_multiply, curve, Point +def _create_spend_validator(tx, source_tx, input_index=0, other_inputs=None): + """Helper function to create and validate a Spend object from transaction data.""" + if other_inputs is None: + other_inputs = [] + spend = Spend( + { + "sourceTXID": tx.inputs[input_index].source_txid, + "sourceOutputIndex": tx.inputs[input_index].source_output_index, + "sourceSatoshis": source_tx.outputs[ + tx.inputs[input_index].source_output_index + ].satoshis, + "lockingScript": source_tx.outputs[ + tx.inputs[input_index].source_output_index + ].locking_script, + "transactionVersion": tx.version, + "otherInputs": other_inputs, + "inputIndex": input_index, + "unlockingScript": tx.inputs[input_index].unlocking_script, + "outputs": tx.outputs, + "inputSequence": tx.inputs[input_index].sequence, + "lockTime": tx.locktime, + } + ) + assert spend.validate() + + def test_script(): locking_script = "76a9146a176cd51593e00542b8e1958b7da2be97452d0588ac" assert Script(locking_script) == Script(bytes.fromhex(locking_script)) @@ -63,22 +89,7 @@ def test_p2pkh(): assert isinstance(unlocking_script, Script) assert unlocking_script.byte_length() in [106, 107] - spend = Spend( - { - "sourceTXID": tx.inputs[0].source_txid, - "sourceOutputIndex": tx.inputs[0].source_output_index, - "sourceSatoshis": source_tx.outputs[0].satoshis, - "lockingScript": source_tx.outputs[0].locking_script, - "transactionVersion": tx.version, - "otherInputs": [], - "inputIndex": 0, - "unlockingScript": tx.inputs[0].unlocking_script, - "outputs": tx.outputs, - "inputSequence": tx.inputs[0].sequence, - "lockTime": tx.locktime, - } - ) - assert spend.validate() + _create_spend_validator(tx, source_tx) def test_op_return(): @@ -185,23 +196,7 @@ def test_p2pk(): unlocking_script = P2PK().unlock(private_key).sign(tx, 0) assert isinstance(unlocking_script, Script) assert unlocking_script.byte_length() in [72, 73] - - spend = Spend( - { - "sourceTXID": tx.inputs[0].source_txid, - "sourceOutputIndex": tx.inputs[0].source_output_index, - "sourceSatoshis": source_tx.outputs[0].satoshis, - "lockingScript": source_tx.outputs[0].locking_script, - "transactionVersion": tx.version, - "otherInputs": [], - "inputIndex": 0, - "unlockingScript": tx.inputs[0].unlocking_script, - "outputs": tx.outputs, - "inputSequence": tx.inputs[0].sequence, - "lockTime": tx.locktime, - } - ) - assert spend.validate() + _create_spend_validator(tx, source_tx) def test_bare_multisig(): @@ -249,23 +244,7 @@ def test_bare_multisig(): unlocking_script = BareMultisig().unlock(privs[:2]).sign(tx, 0) assert isinstance(unlocking_script, Script) assert unlocking_script.byte_length() >= 144 - - spend = Spend( - { - "sourceTXID": tx.inputs[0].source_txid, - "sourceOutputIndex": tx.inputs[0].source_output_index, - "sourceSatoshis": source_tx.outputs[0].satoshis, - "lockingScript": source_tx.outputs[0].locking_script, - "transactionVersion": tx.version, - "otherInputs": [], - "inputIndex": 0, - "unlockingScript": tx.inputs[0].unlocking_script, - "outputs": tx.outputs, - "inputSequence": tx.inputs[0].sequence, - "lockTime": tx.locktime, - } - ) - assert spend.validate() + _create_spend_validator(tx, source_tx) def test_is_push_only(): @@ -374,23 +353,7 @@ def test_r_puzzle(): tx.sign() assert len(tx.inputs[0].unlocking_script.serialize()) >= 106 - - spend = Spend( - { - "sourceTXID": tx.inputs[0].source_txid, - "sourceOutputIndex": tx.inputs[0].source_output_index, - "sourceSatoshis": source_tx.outputs[0].satoshis, - "lockingScript": source_tx.outputs[0].locking_script, - "transactionVersion": tx.version, - "otherInputs": [], - "inputIndex": 0, - "unlockingScript": tx.inputs[0].unlocking_script, - "outputs": tx.outputs, - "inputSequence": tx.inputs[0].sequence, - "lockTime": tx.locktime, - } - ) - assert spend.validate() + _create_spend_validator(tx, source_tx) def test_p2pkh_sighash_acp(): @@ -428,20 +391,4 @@ def test_p2pkh_sighash_acp(): unlocking_script_template=P2PKH().unlock(key), ) ) - - spend = Spend( - { - "sourceTXID": tx.inputs[0].source_txid, - "sourceOutputIndex": tx.inputs[0].source_output_index, - "sourceSatoshis": source_tx.outputs[0].satoshis, - "lockingScript": source_tx.outputs[0].locking_script, - "transactionVersion": tx.version, - "otherInputs": [tx.inputs[1]], - "inputIndex": 0, - "unlockingScript": tx.inputs[0].unlocking_script, - "outputs": tx.outputs, - "inputSequence": tx.inputs[0].sequence, - "lockTime": tx.locktime, - } - ) - assert spend.validate() + _create_spend_validator(tx, source_tx, other_inputs=[tx.inputs[1]])