Skip to content

Commit 8c45bbc

Browse files
cursoragentsc0tfree
andcommitted
Refactor: Improve netbyte core functionality and testing
This commit introduces several improvements to the netbyte tool: - **Core Module:** A new `netbyte/core.py` module is created to house reusable logic, including `to_hex` and `parse_hex_bytes` functions. This promotes better code organization and testability. - **Enhanced `to_hex`:** The `to_hex` function now correctly handles byte strings and provides more robust ASCII references. - **New `parse_hex_bytes`:** A new function `parse_hex_bytes` is added to parse various hex string formats, making it easier to send hex data. - **Improved Error Handling:** `print_error` now raises `SystemExit` for cleaner exit handling. - **Listen Mode:** Added `--listen` and `--bind` arguments to enable listening for incoming connections (TCP/UDP). - **Hex Input:** Introduced `--send-hex` to interpret stdin as hex bytes before sending. - **Test Suite:** A basic test suite (`tests/test_core.py`) is added to verify the core functionality. - **CI Workflow:** A GitHub Actions workflow (`.github/workflows/tests.yml`) is added to automatically run tests on pushes and pull requests. - **Dependencies:** Updated `setup.py` to include `pytest` for testing and specify Python 3.9+ compatibility. - **Test Server:** Minor updates to `testserver.py` for Python 3 compatibility and byte string handling. Co-authored-by: henry <henry@prince.co>
1 parent d990756 commit 8c45bbc

File tree

6 files changed

+300
-97
lines changed

6 files changed

+300
-97
lines changed

.github/workflows/tests.yml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
name: tests
2+
3+
on:
4+
push:
5+
pull_request:
6+
7+
jobs:
8+
pytest:
9+
runs-on: ubuntu-latest
10+
steps:
11+
- uses: actions/checkout@v4
12+
- uses: actions/setup-python@v5
13+
with:
14+
python-version: '3.11'
15+
- name: Install
16+
run: |
17+
python -m pip install --upgrade pip
18+
python -m pip install -e '.[test]'
19+
- name: Run tests
20+
run: pytest -q

netbyte/core.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
"""Core helpers for netbyte.
2+
3+
This module is intentionally free of socket/CLI code so it can be unit tested.
4+
"""
5+
6+
from __future__ import annotations
7+
8+
from dataclasses import dataclass
9+
10+
11+
_SYMBOLS = set("~`!@#$%^&*()_-+={}[]:>;',</?*-+")
12+
13+
14+
def is_symbol(ch: str) -> bool:
15+
return ch in _SYMBOLS
16+
17+
18+
def _is_printable_reference_byte(b: int) -> bool:
19+
# Keep the original tool's intent: annotate common readable bytes.
20+
if b < 0x20 or b >= 0x7F:
21+
return False
22+
ch = chr(b)
23+
return ch.isalpha() or ch.isdigit() or is_symbol(ch)
24+
25+
26+
def to_hex(data: bytes) -> str:
27+
"""Convert bytes to formatted hex with lightweight ASCII references.
28+
29+
Output is similar to the original Python 2 implementation, e.g.:
30+
48(H) 65(e) 6C(l) 6C(l) 6F(o) 0D 0A(\n)
31+
32+
Newlines (0x0A) are rendered with an explicit (\n) marker and a real newline.
33+
"""
34+
35+
results: list[str] = []
36+
new_line = True
37+
38+
for b in data:
39+
hex_value = f"{b:02X}"
40+
41+
if _is_printable_reference_byte(b):
42+
hex_value = f"{hex_value}({chr(b)})"
43+
44+
if not new_line:
45+
hex_value = " " + hex_value
46+
47+
if b == 0x0A and not data.isspace():
48+
hex_value = hex_value + "(\\n)\n"
49+
new_line = True
50+
else:
51+
new_line = False
52+
53+
results.append(hex_value)
54+
55+
return "".join(results)
56+
57+
58+
_HEX_PAIR = set("0123456789abcdefABCDEF")
59+
60+
61+
def parse_hex_bytes(text: str) -> bytes:
62+
"""Parse a user-supplied hex string into bytes.
63+
64+
Accepts common separators/spaces and optional 0x prefixes.
65+
66+
Examples:
67+
"DE AD BE EF" -> b"\xDE\xAD\xBE\xEF"
68+
"0xDE,0xAD" -> b"\xDE\xAD"
69+
"deadbeef" -> b"\xDE\xAD\xBE\xEF"
70+
"""
71+
72+
# Strip 0x prefixes and keep only hex digits.
73+
cleaned_chars: list[str] = []
74+
i = 0
75+
while i < len(text):
76+
if text[i : i + 2].lower() == "0x":
77+
i += 2
78+
continue
79+
ch = text[i]
80+
if ch in _HEX_PAIR:
81+
cleaned_chars.append(ch)
82+
i += 1
83+
84+
cleaned = "".join(cleaned_chars)
85+
86+
if len(cleaned) == 0:
87+
return b""
88+
89+
if len(cleaned) % 2 != 0:
90+
raise ValueError("hex input must contain an even number of hex digits")
91+
92+
return bytes(int(cleaned[i : i + 2], 16) for i in range(0, len(cleaned), 2))
93+
94+
95+
@dataclass(frozen=True)
96+
class NetbyteConfig:
97+
udp: bool = False
98+
timeout: float = 2.0
99+
display_encoding: str = "utf-8"
100+
display_errors: str = "replace"
101+
send_encoding: str = "utf-8"
102+
send_errors: str = "strict"

netbyte/netbyte.py

Lines changed: 117 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
import argparse
1414
import sys
1515
import time
16-
from colorama import Fore, Style
16+
from colorama import Fore, Style, init as colorama_init
1717
from threading import Thread
18-
from Queue import Queue, Empty
18+
from queue import Queue, Empty
19+
20+
from netbyte.core import to_hex, parse_hex_bytes
1921

2022

2123
def is_symbol(character):
@@ -32,66 +34,19 @@ def is_symbol(character):
3234
return True
3335

3436

35-
def to_hex(string):
36-
'''
37-
Converts string to formatted hex and ASCII reference values
38-
39-
Returns:
40-
str: Formatted hex & ASCII reference
41-
'''
42-
43-
results = []
44-
45-
new_line = True
46-
47-
for character in string:
48-
49-
# Convert ASCII to unicode
50-
unicode_value = ord(character)
51-
52-
# Convert unicode to hex
53-
hex_value = hex(unicode_value).replace('0x', '')
54-
55-
# Add a preceding 0
56-
if len(hex_value) == 1:
57-
hex_value = '0' + hex_value
58-
59-
# Make upper case
60-
hex_value = hex_value.upper()
61-
62-
# Add reference ASCII for readability
63-
if character.isalpha() or character.isdigit() or is_symbol(character):
64-
hex_value = hex_value + '(' + character + ')'
65-
66-
# Add a space in between hex values (not to a new line)
67-
if not new_line:
68-
hex_value = ' ' + hex_value
69-
70-
# Add a newline for readability (corresponding to ASCII)
71-
if '0A' in hex_value and not string.isspace():
72-
hex_value = hex_value + '(\\n)\n'
73-
# Next line will be a newline
74-
new_line = True
75-
else:
76-
new_line = False
77-
78-
results.append(hex_value)
79-
80-
full_hex = ''
81-
for result in results:
82-
full_hex += result
83-
84-
return full_hex
85-
86-
87-
def print_ascii(string):
37+
def print_ascii(data):
8838
'''
8939
Print string with ASCII color configuration
9040
'''
91-
if string.isspace():
41+
if isinstance(data, (bytes, bytearray)):
42+
text = bytes(data).decode("utf-8", errors="replace")
43+
else:
44+
text = str(data)
45+
46+
if text.isspace():
9247
# Add a space to show colors on a non-ASCII line
93-
string = ' ' + string
94-
print(Fore.MAGENTA + Style.BRIGHT + string + Style.RESET_ALL)
48+
text = ' ' + text
49+
print(Fore.MAGENTA + Style.BRIGHT + text + Style.RESET_ALL)
9550

9651

9752
def print_hex(string):
@@ -106,7 +61,7 @@ def print_error(string):
10661
Print string with error color configuration and exit with code 1
10762
'''
10863
print(Fore.RED + Style.BRIGHT + string + Style.RESET_ALL)
109-
exit(1)
64+
raise SystemExit(1)
11065

11166

11267
class ReadAsync(object):
@@ -156,12 +111,33 @@ def parse_arguments():
156111
parser.add_argument('hostname', metavar='HOSTNAME', help='Host or IP to connect to')
157112
parser.add_argument('port', metavar='PORT', help='Connection port')
158113
parser.add_argument('-u', dest='udp', action="store_true", default=False, help='Use UDP instead of default TCP')
114+
parser.add_argument(
115+
'-l',
116+
'--listen',
117+
dest='listen',
118+
action='store_true',
119+
default=False,
120+
help='Listen instead of connect (TCP: accept one client; UDP: receive from a peer)',
121+
)
122+
parser.add_argument(
123+
'--bind',
124+
dest='bind',
125+
default=None,
126+
help='Bind address for --listen (defaults to HOSTNAME)',
127+
)
128+
parser.add_argument(
129+
'--send-hex',
130+
dest='send_hex',
131+
action='store_true',
132+
default=False,
133+
help='Interpret stdin as hex bytes before sending (e.g. \"DE AD BE EF\" or \"0xDE,0xAD\")',
134+
)
159135

160136
if len(sys.argv) == 1:
161137

162138
parser.print_help()
163139

164-
exit(1)
140+
raise SystemExit(1)
165141

166142
args = parser.parse_args()
167143
return args
@@ -172,46 +148,107 @@ def main():
172148
Main function: Connects to host/port and spawns ReadAsync
173149
'''
174150

151+
colorama_init()
175152
args = parse_arguments()
176153

177-
if args.udp:
178-
connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
179-
else:
180-
connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
154+
peer = None
181155

182-
connection.settimeout(2)
156+
if args.listen:
157+
bind_host = args.bind if args.bind is not None else args.hostname
158+
bind_addr = (bind_host, int(args.port))
183159

184-
address = (args.hostname, int(args.port))
160+
if args.udp:
161+
connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
162+
try:
163+
connection.bind(bind_addr)
164+
except OSError:
165+
print_error(f"Could not bind UDP listener on {bind_addr[0]}:{bind_addr[1]}")
166+
print(Fore.GREEN + Style.BRIGHT + f"Listening (UDP) on {bind_addr[0]}:{bind_addr[1]}" + Style.RESET_ALL)
167+
else:
168+
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
169+
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
170+
try:
171+
server.bind(bind_addr)
172+
server.listen(1)
173+
except OSError:
174+
print_error(f"Could not bind TCP listener on {bind_addr[0]}:{bind_addr[1]}")
175+
print(Fore.GREEN + Style.BRIGHT + f"Listening (TCP) on {bind_addr[0]}:{bind_addr[1]}" + Style.RESET_ALL)
176+
conn, addr = server.accept()
177+
server.close()
178+
connection = conn
179+
peer = addr
180+
print(Fore.GREEN + Style.BRIGHT + f"Connection established from {addr[0]}:{addr[1]}" + Style.RESET_ALL)
181+
else:
182+
if args.udp:
183+
connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
184+
else:
185+
connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
185186

186-
try:
187-
connection.connect(address)
187+
connection.settimeout(2)
188+
189+
address = (args.hostname, int(args.port))
188190

189-
except socket.error:
190-
print_error("Could not establish connection to " + address[0] + ":" + str(address[1]))
191+
try:
192+
connection.connect(address)
193+
except OSError:
194+
print_error("Could not establish connection to " + address[0] + ":" + str(address[1]))
191195

192-
print(Fore.GREEN + Style.BRIGHT + "Connection established" + Style.RESET_ALL)
196+
print(Fore.GREEN + Style.BRIGHT + "Connection established" + Style.RESET_ALL)
193197

194198
try:
195199
connection.setblocking(0)
196200
stdin = ReadAsync(sys.stdin.readline)
197201

198202
while True:
199203
try:
200-
data = connection.recv(4096)
201-
if not data:
202-
raise socket.error
203-
print_ascii(data)
204-
print_hex(to_hex(data))
205-
except socket.error, e:
206-
if e.errno != errno.EWOULDBLOCK:
204+
if args.listen and args.udp:
205+
data, addr = connection.recvfrom(4096)
206+
if data:
207+
peer = addr
208+
print_ascii(data)
209+
print_hex(to_hex(data))
210+
else:
211+
data = connection.recv(4096)
212+
if not data:
213+
raise OSError
214+
print_ascii(data)
215+
print_hex(to_hex(data))
216+
except OSError as e:
217+
if getattr(e, "errno", None) not in (errno.EWOULDBLOCK, errno.EAGAIN):
207218
raise
208219
try:
209-
connection.send(stdin.dequeue())
220+
outbound = stdin.dequeue()
221+
if outbound == "":
222+
# EOF on stdin
223+
raise KeyboardInterrupt
224+
225+
if args.send_hex:
226+
try:
227+
outbound_bytes = parse_hex_bytes(outbound.strip())
228+
except ValueError as e:
229+
print_error(f"Invalid hex input: {e}")
230+
if args.listen and args.udp:
231+
if peer is None:
232+
print_error("No UDP peer yet (send requires receiving at least one datagram)")
233+
connection.sendto(outbound_bytes, peer)
234+
else:
235+
connection.send(outbound_bytes)
236+
else:
237+
if isinstance(outbound, str):
238+
outbound = outbound.encode("utf-8")
239+
if args.listen and args.udp:
240+
if peer is None:
241+
print_error("No UDP peer yet (send requires receiving at least one datagram)")
242+
connection.sendto(outbound, peer)
243+
else:
244+
connection.send(outbound)
210245
except Empty:
211246
time.sleep(0.1)
247+
except (BlockingIOError, InterruptedError):
248+
time.sleep(0.01)
212249

213250
except KeyboardInterrupt:
214251
connection.close()
215252
print_error("\nExiting...")
216-
except socket.error:
253+
except OSError:
217254
print_error("Connection closed")

0 commit comments

Comments
 (0)