Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ This project uses the changelog in accordance with [keepchangelog](http://keepac
## [unreleased][unreleased]
- Fix for static nested key recovery (@jekkos)
- Fix LEDs being stuck on after battery check (@suut)
- Add TCP support for the CLI (@suut)
- Fix build on Android in Termux (@suut)

## [v2.1.0][2025-09-02]
- Added UV, formatter and linter. Contribution guidelines. (@GameTec-live)
Expand Down
41 changes: 19 additions & 22 deletions software/script/chameleon_cli_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ def on_exec(self, args: argparse.Namespace):
if block_known == block_target and type_known == type_target:
print(color_string((CR, "Target key already known")))
return
print(f" - Nested recover one key running...")
print(" - Nested recover one key running...")
key = self.recover_a_key(block_known, type_known, key_known_bytes, block_target, type_target)
if key is None:
print(color_string((CY, "No key found, you can retry.")))
Expand Down Expand Up @@ -2366,7 +2366,7 @@ def on_exec(self, args: argparse.Namespace):
else:
try:
self.cmd.hf14a_raw(options=options, resp_timeout_ms=200, data=struct.pack('!BB', 0x30, args.page))
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
# we may lose the tag again here
pass
print(color_string((CR, " - Auth failed")))
Expand Down Expand Up @@ -2431,7 +2431,7 @@ def on_exec(self, args: argparse.Namespace):
# send a command just to disable the field. use read to avoid corrupting the data
try:
self.cmd.hf14a_raw(options=options, resp_timeout_ms=200, data=struct.pack('!BB', 0x30, args.page))
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
# we may lose the tag again here
pass
print(color_string((CR, " - Auth failed")))
Expand Down Expand Up @@ -2570,15 +2570,15 @@ def on_exec(self, args: argparse.Namespace):
version = self.cmd.mf0_ntag_get_version_data()

fd.write(f"# Version: {version.hex()}\n")
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
pass # slot does not have version data

try:
signature = self.cmd.mf0_ntag_get_signature_data()

if signature != b"\x00" * 32:
fd.write(f"# Signature: {signature.hex()}\n")
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
pass # slot does not have signature data

page = 0
Expand Down Expand Up @@ -2641,7 +2641,7 @@ def on_exec(self, args: argparse.Namespace):
else:
try:
self.cmd.hf14a_raw(options=options, resp_timeout_ms=200, data=struct.pack('!BB', 0x39, args.counter))
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
# we may lose the tag again here
pass
print(color_string((CR, " - Auth failed")))
Expand Down Expand Up @@ -2698,14 +2698,14 @@ def do_dump(self, args: argparse.Namespace, param, fd, save_as_eml):
version = self.cmd.hf14a_raw(options=options, resp_timeout_ms=100, data=struct.pack('!B', 0x60))
if len(version) == 0:
version = None
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
version = None

# try sending AUTHENTICATE command and observe the result
try:
supports_auth = len(self.cmd.hf14a_raw(
options=options, resp_timeout_ms=100, data=struct.pack('!B', 0x1A))) != 0
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
supports_auth = False

if version is not None and not supports_auth:
Expand Down Expand Up @@ -2744,7 +2744,7 @@ def do_dump(self, args: argparse.Namespace, param, fd, save_as_eml):

print(color_string((CY, "Tag is likely NTAG 20x, reading until first error.")))
stop_page = 256
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
# Regular Ultralight
tag_name = 'Mifare Ultralight'
stop_page = 16
Expand Down Expand Up @@ -2794,7 +2794,7 @@ def do_dump(self, args: argparse.Namespace, param, fd, save_as_eml):

try:
resp = self.cmd.hf14a_raw(options=options, resp_timeout_ms=200, data=struct.pack('!BB', 0x30, i))
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
# probably lost tag, but we still need to disable rf field
resp = None

Expand Down Expand Up @@ -3085,9 +3085,6 @@ def args_parser(self) -> ArgumentParserNoExit:

def on_exec(self, args: argparse.Namespace):
import json
import queue
import signal
import random

if not args.offline:
challenges = self.collect_challenges(args.challenges)
Expand Down Expand Up @@ -3290,7 +3287,7 @@ def signal_handler(sig, frame):
key_found = False
crack_effect.stop_event.set()
crack_effect.erase_key()
print(f"\n\n\n[-] Error: Unexpected output from mfulc_des_brute\033[?25h")
print("\n\n\n[-] Error: Unexpected output from mfulc_des_brute\033[?25h")
break

# Extract the key segment from output
Expand Down Expand Up @@ -3324,7 +3321,7 @@ def signal_handler(sig, frame):
formatted_key = f"\033[1;34m{result_key}\033[0m"
print(f"[+] Found key: {formatted_key}\033[?25h")
if offline:
print(f"You can restore found key on the card with appropriate write commands")
print("You can restore found key on the card with appropriate write commands")
else:
# Restore the key on the card
print("[+] Restoring key to card...")
Expand Down Expand Up @@ -3392,7 +3389,7 @@ def on_exec(self, args: argparse.Namespace):

try:
self.cmd.mf0_ntag_set_version_data(args.set_version)
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
print(color_string((CR, "Tag type does not support GET_VERSION command.")))
return

Expand All @@ -3406,7 +3403,7 @@ def on_exec(self, args: argparse.Namespace):

try:
self.cmd.mf0_ntag_set_signature_data(args.set_signature)
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
print(color_string((CR, "Tag type does not support READ_SIG command.")))
return

Expand Down Expand Up @@ -3468,7 +3465,7 @@ def on_exec(self, args: argparse.Namespace):
write_mode = new_write_mode
else:
print(color_string((CY, "Requested write mode already set")))
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
print(color_string((CR, "Failed to set write mode. Check if device firmware supports this feature.")))

detection = self.cmd.mf0_ntag_get_detection_enable()
Expand Down Expand Up @@ -3514,28 +3511,28 @@ def on_exec(self, args: argparse.Namespace):
try:
write_mode = MifareUltralightWriteMode(self.cmd.mf0_ntag_get_write_mode())
print(f'- {"Write mode:":40}{color_string((CY, write_mode))}')
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
# Write mode not supported in current firmware
pass

# Existing version/signature display code
try:
version = self.cmd.mf0_ntag_get_version_data().hex().upper()
print(f'- {"Version:":40}{color_string((CY, version))}')
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
pass

try:
signature = self.cmd.mf0_ntag_get_signature_data().hex().upper()
print(f'- {"Signature:":40}{color_string((CY, signature))}')
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
pass

try:
detection = color_string((CG, "enabled")) if self.cmd.mf0_ntag_get_detection_enable() else color_string((CR, "disabled"))
print(
f'- {"Log (password) mode:":40}{f"{detection}"}')
except:
except (ValueError, chameleon_com.CMDInvalidException, TimeoutError):
pass

@hf_mfu.command('edetect')
Expand Down
116 changes: 85 additions & 31 deletions software/script/chameleon_com.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
import sys
import queue
import struct
import threading
import time
import serial
import platform
from typing import Union
from enum import Enum, auto
import serial
import socket

from chameleon_utils import CR, CG, CC, CY, color_string
from chameleon_enum import Command, Status

ANDROID = 'android' in platform.release()

# each thread is waiting for its data for 100 ms before looping again
THREAD_BLOCKING_TIMEOUT = 0.1

# TODO: client settings
DEBUG = False

class TransportType(Enum):
NONE = auto()
SERIAL = auto()
SOCKET = auto()

class NotOpenException(Exception):
"""
Expand Down Expand Up @@ -57,7 +68,8 @@ def __init__(self):
"""
Create a chameleon device instance
"""
self.serial_instance: Union[serial.Serial, None] = None
self.transport: Union[serial.Serial, socket.socket, None] = None
self.transport_type = TransportType.NONE
self.send_data_queue = queue.Queue()
self.wait_response_map = {}
self.event_closing = threading.Event()
Expand All @@ -68,7 +80,7 @@ def isOpen(self) -> bool:

:return:
"""
return self.serial_instance is not None and self.serial_instance.is_open
return self.transport is not None and (self.transport_type is TransportType.SOCKET or self.transport.is_open)

def open(self, port) -> "ChameleonCom":
"""
Expand All @@ -82,19 +94,35 @@ def open(self, port) -> "ChameleonCom":
error = None
try:
# open serial port
self.serial_instance = serial.Serial(port=port, baudrate=115200)
if port.startswith('tcp:'):
host, _, port = port[4:].partition(':')
if not host or not port:
sys.exit(color_string(CR, 'Usage: tcp:127.0.0.1:4321'))
self.transport = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('Connecting to', host, int(port))
self.transport.connect((host, int(port)))
self.transport_type = TransportType.SOCKET
else:
if ANDROID:
sys.exit(color_string(CR, 'COM port is not supported on Android, make a USB-serial to TCP communication bridge'))
self.transport = serial.Serial(port=port, baudrate=115200)
self.transport_type = TransportType.SERIAL
except Exception as e:
error = e
finally:
if error is not None:
raise OpenFailException(error)
assert self.serial_instance is not None
try:
self.serial_instance.dtr = True # must make dtr enable
except Exception:
# not all serial support dtr, e.g. virtual serial over BLE
pass
self.serial_instance.timeout = THREAD_BLOCKING_TIMEOUT
assert self.transport is not None
assert self.transport_type is not TransportType.NONE
if self.transport_type is TransportType.SERIAL:
try:
self.transport.dtr = True # must make dtr enable
except Exception:
# not all serial support dtr, e.g. virtual serial over BLE
pass
self.transport.timeout = THREAD_BLOCKING_TIMEOUT
else: # SOCKET
self.transport.settimeout(THREAD_BLOCKING_TIMEOUT)
# clear variable
self.send_data_queue.queue.clear()
self.wait_response_map.clear()
Expand Down Expand Up @@ -136,12 +164,14 @@ def close(self):
"""
self.event_closing.set()
try:
assert self.serial_instance is not None
self.serial_instance.close()
assert self.transport is not None
if self.transport_type is TransportType.SOCKET:
self.transport.shutdown()
self.transport.close()
except Exception:
pass
finally:
self.serial_instance = None
self.transport = None
self.wait_response_map.clear()
self.send_data_queue.queue.clear()

Expand All @@ -159,16 +189,29 @@ def thread_data_receive(self):

while self.isOpen():
# receive
try:
assert self.serial_instance is not None
data_bytes = self.serial_instance.read()
except Exception as e:
if not self.event_closing.is_set():
print(f"Serial Error {e}, thread for receiver exit.")
self.close()
break
if len(data_bytes) > 0:
assert self.transport_type is not TransportType.NONE
if self.transport_type is TransportType.SERIAL:
try:
assert self.transport is not None
data_bytes = bytearray(self.transport.read())
except Exception as e:
if not self.event_closing.is_set():
print(f"Serial Error {e}, thread for receiver exit.")
self.close()
break
else: # SOCKET
try:
data_bytes = bytearray(self.transport.recv(1024))
except socket.timeout:
continue
except OSError:
print(color_string(CR, 'socket closed'))
self.transport = None
break

while len(data_bytes) > 0:
data_byte = data_bytes[0]
data_bytes = data_bytes[1:]
data_buffer.append(data_byte)
if data_position < struct.calcsize('!BB'): # start of frame + lrc1
if data_position == 0:
Expand Down Expand Up @@ -267,14 +310,25 @@ def thread_data_transfer(self):
self.wait_response_map[task_cmd]['start_time'] = start_time
self.wait_response_map[task_cmd]['end_time'] = start_time + task_timeout
self.wait_response_map[task_cmd]['is_timeout'] = False
try:
assert self.serial_instance is not None
# send to device
self.serial_instance.write(task['frame'])
except Exception as e:
print(f"Serial Error {e}, thread for transfer exit.")
self.close()
break
assert self.transport_type is not TransportType.NONE
if self.transport_type == TransportType.SERIAL:
try:
assert self.transport is not None
# send to device
self.transport.write(task['frame'])
except Exception as e:
print(f"Serial Error {e}, thread for transfer exit.")
self.close()
break
else: # SOCKET
try:
assert self.transport is not None
self.transport.sendall(task['frame'])
except OSError as e:
self.transport = None
print(f'Socket error {e}, thread for transfer exit.')
self.close()
break
# update queue status
self.send_data_queue.task_done()
# disconnect if DFU command has been sent
Expand Down
Loading
Loading