Skip to content

Commit 393c0af

Browse files
[MERGE]
- [x] Closes PR/MR #248 - [x] Closes PR/MR #249 - [x] Closes PR/MR #350
3 parents a5b4548 + 5885d93 + 034d846 commit 393c0af

File tree

2 files changed

+140
-10
lines changed

2 files changed

+140
-10
lines changed

tests/context.py

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,117 @@ def checkCovCommand(*args): # skipcq: PYL-W0102 - [] != [None]
380380
return [*args]
381381

382382

383+
def taint_command_args(args: (list, tuple)) -> list:
384+
"""Validate and sanitize command arguments for security.
385+
386+
This function validates the command (first argument) against a whitelist
387+
and sanitizes all arguments to prevent command injection attacks.
388+
389+
Args:
390+
args (list): Command arguments to validate
391+
392+
Returns:
393+
list: Sanitized command arguments
394+
395+
Raises:
396+
CommandExecutionError: If validation fails
397+
398+
Meta Testing:
399+
400+
>>> import tests.context as _context
401+
>>> import sys as _sys
402+
>>>
403+
404+
Testcase 1: Function should validate and return unmodified Python command.
405+
406+
>>> test_fixture = ['python', '-m', 'pytest']
407+
>>> _context.taint_command_args(test_fixture)
408+
['python', '-m', 'pytest']
409+
>>>
410+
411+
Testcase 2: Function should handle sys.executable path.
412+
413+
>>> test_fixture = [str(_sys.executable), '-m', 'coverage', 'run']
414+
>>> result = _context.taint_command_args(test_fixture) #doctest: +ELLIPSIS
415+
>>> str('python') in str(result[0]) or str('coverage') in str(result[0])
416+
True
417+
>>> result[1:] == ['-m', 'coverage', 'run']
418+
True
419+
>>>
420+
421+
Testcase 3: Function should reject disallowed commands.
422+
423+
>>> test_fixture = ['rm', '-rf', '/']
424+
>>> _context.taint_command_args(test_fixture) #doctest: +IGNORE_EXCEPTION_DETAIL
425+
Traceback (most recent call last):
426+
multicast.exceptions.CommandExecutionError: Command 'rm' is not allowed...
427+
>>>
428+
429+
Testcase 4: Function should validate input types.
430+
431+
>>> test_fixture = None
432+
>>> _context.taint_command_args(test_fixture) #doctest: +IGNORE_EXCEPTION_DETAIL
433+
Traceback (most recent call last):
434+
multicast.exceptions.CommandExecutionError: Invalid command arguments
435+
>>>
436+
>>> test_fixture = "python -m pytest" # String instead of list
437+
>>> _context.taint_command_args(test_fixture) #doctest: +IGNORE_EXCEPTION_DETAIL
438+
Traceback (most recent call last):
439+
multicast.exceptions.CommandExecutionError: Invalid command arguments
440+
>>>
441+
442+
Testcase 5: Function should handle coverage command variations.
443+
444+
>>> test_fixture = [str(_sys.executable), 'coverage', 'run', '--source=multicast']
445+
>>> _context.taint_command_args(test_fixture) #doctest: +ELLIPSIS
446+
[...'coverage', 'run', '--source=multicast']
447+
>>>
448+
>>> test_fixture = ['coverage', 'run', '--source=multicast']
449+
>>> _context.taint_command_args(test_fixture) #doctest: +ELLIPSIS
450+
['exit 1 ; #', 'run',...'run', '--source=multicast']
451+
>>>
452+
>>> test_fixture = ['coverage3', 'run', '--source=.']
453+
>>> _context.taint_command_args(test_fixture) #doctest: +ELLIPSIS
454+
['exit 1 ; #', 'run',...'--source=.']
455+
>>>
456+
457+
Testcase 6: Function should handle case-insensitive command validation.
458+
459+
>>> test_fixture = ['Python3', '-m', 'pytest']
460+
>>> _context.taint_command_args(test_fixture)
461+
['Python3', '-m', 'pytest']
462+
>>>
463+
>>> test_fixture = ['COVERAGE', 'run']
464+
>>> _context.taint_command_args(test_fixture) #doctest: +ELLIPSIS
465+
[...'COVERAGE', 'run'...]
466+
>>>
467+
"""
468+
if not args or not isinstance(args, (list, tuple)):
469+
raise CommandExecutionError("Invalid command arguments", exit_code=66)
470+
# Validate the command (first argument)
471+
allowed_commands = {
472+
"python", "python3", "coverage", "coverage3",
473+
sys.executable # Allow the current Python interpreter
474+
}
475+
command = str(args[0]).lower()
476+
# Extract base command name for exact matching
477+
# Handle both path separators (/ for Unix, \ for Windows)
478+
command_base = command.split("/")[-1].split("\\")[-1]
479+
# Check if command is allowed (exact match on base name or full path match with sys.executable)
480+
if command_base not in allowed_commands and command != str(sys.executable).lower():
481+
raise CommandExecutionError(
482+
f"Command '{command}' is not allowed. Allowed commands: {allowed_commands}",
483+
exit_code=77
484+
)
485+
# Sanitize all arguments to prevent injection
486+
tainted_args = [str(arg) for arg in args]
487+
# Special handling for coverage commands
488+
if "coverage" in command:
489+
tainted_args = checkCovCommand(*tainted_args)
490+
# Sanitize all arguments to prevent injection
491+
return tainted_args
492+
493+
383494
def validateCommandArgs(args: list) -> None:
384495
"""
385496
Validates command arguments to ensure they do not contain null characters.
@@ -541,7 +652,9 @@ def checkPythonCommand(args, stderr=None):
541652
validateCommandArgs(args)
542653
if str("coverage") in args[0]:
543654
args = checkCovCommand(*args)
544-
theOutput = subprocess.check_output(args, stderr=stderr)
655+
# Validate and sanitize command arguments
656+
safe_args = taint_command_args(args)
657+
theOutput = subprocess.check_output(safe_args, stderr=stderr)
545658
except Exception as err: # pragma: no branch
546659
theOutput = None
547660
try:
@@ -641,7 +754,9 @@ def checkPythonFuzzing(args, stderr=None): # skipcq: PYL-W0102 - [] != [None]
641754
else:
642755
if str("coverage") in args[0]:
643756
args = checkCovCommand(*args)
644-
theOutput = subprocess.check_output(args, stderr=stderr)
757+
# Validate and sanitize command arguments
758+
safe_args = taint_command_args(args)
759+
theOutput = subprocess.check_output(safe_args, stderr=stderr)
645760
except BaseException as err: # pragma: no branch
646761
theOutput = None
647762
raise CommandExecutionError(str(err), exit_code=2) from err # do not suppress errors

tests/test_hear_data_processing.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,11 @@
3939
@context.markWithMetaTag("mat", "hear")
4040
class RecvDataProcessingTestSuite(context.BasicUsageTestSuite):
4141
"""
42-
A test suite that checks empty data with the multicast sender and receiver.
42+
A test suite that validates the multicast sender and receiver's handling of empty data.
4343
44+
Test cases:
45+
- Sending empty binary data.
46+
- Sending empty data followed by a stop command.
4447
"""
4548

4649
__module__ = "tests.test_hear_data_processing"
@@ -55,16 +58,18 @@ def test_multicast_sender_with_no_data(self) -> None:
5558
theResult = False
5659
fail_fixture = "SAY -X] RECV? != error"
5760
_fixture_port_num = self._always_generate_random_port_WHEN_called()
61+
_fixture_mcast_addr = "224.0.0.1"
5862
try:
5963
self.assertIsNotNone(_fixture_port_num)
6064
self.assertIsInstance(_fixture_port_num, int)
65+
self.assertIsNotNone(_fixture_mcast_addr)
6166
_fixture_HEAR_args = [
6267
"--port",
6368
str(_fixture_port_num),
6469
"--groups",
65-
"'224.0.0.1'",
70+
f"'{_fixture_mcast_addr}'",
6671
"--group",
67-
"'224.0.0.1'",
72+
f"'{_fixture_mcast_addr}'",
6873
]
6974
p = Process(
7075
target=multicast.__main__.main, name="RECV", args=(
@@ -77,7 +82,7 @@ def test_multicast_sender_with_no_data(self) -> None:
7782
try:
7883
sender = multicast.send.McastSAY()
7984
self.assertIsNotNone(sender)
80-
sender(group='224.0.0.1', port=_fixture_port_num, ttl=1, data=b'')
85+
sender(group=_fixture_mcast_addr, port=_fixture_port_num, ttl=1, data=b'')
8186
self.assertIsNotNone(p)
8287
self.assertTrue(p.is_alive(), fail_fixture)
8388
except Exception as _cause:
@@ -105,16 +110,17 @@ def test_multicast_sender_with_no_data_before_follow_by_stop(self) -> None:
105110
theResult = False
106111
fail_fixture = "SAY -X] HEAR? != error"
107112
_fixture_port_num = self._always_generate_random_port_WHEN_called()
113+
_fixture_mcast_addr = "224.0.0.1"
108114
try:
109115
self.assertIsNotNone(_fixture_port_num)
110116
self.assertIsInstance(_fixture_port_num, int)
111117
_fixture_HEAR_args = [
112118
"--port",
113119
str(_fixture_port_num),
114120
"--groups",
115-
"'224.0.0.1'",
121+
f"'{_fixture_mcast_addr}'",
116122
"--group",
117-
"'224.0.0.1'",
123+
f"'{_fixture_mcast_addr}'",
118124
]
119125
p = Process(
120126
target=multicast.__main__.main,
@@ -130,11 +136,11 @@ def test_multicast_sender_with_no_data_before_follow_by_stop(self) -> None:
130136
try:
131137
sender = multicast.send.McastSAY()
132138
self.assertIsNotNone(sender)
133-
sender(group='224.0.0.1', port=_fixture_port_num, ttl=1, data=b'')
139+
sender(group=_fixture_mcast_addr, port=_fixture_port_num, ttl=1, data=b'')
134140
self.assertIsNotNone(p)
135141
self.assertTrue(p.is_alive(), fail_fixture)
136142
while p.is_alive():
137-
sender(group="224.0.0.1", port=_fixture_port_num, data=["STOP"])
143+
sender(group=_fixture_mcast_addr, port=_fixture_port_num, data=["STOP"])
138144
p.join(1)
139145
self.assertFalse(p.is_alive(), "HEAR ignored STOP")
140146
except Exception as _cause:
@@ -208,11 +214,20 @@ def test_handle_with_invalid_utf8_data(self) -> None:
208214
request=(data, sock), client_address=_fixture_client_addr, server=None
209215
)
210216
try:
217+
# Mock the processing method
218+
handler._process = MagicMock()
211219
# Should silently ignore invalid UTF-8 data
212220
handler.handle() # If no exception is raised, the test passes
213221
# Verify handler state after processing invalid data
214222
self.assertIsNone(handler.server) # Server should remain None
215223
self.assertEqual(handler.client_address, _fixture_client_addr)
224+
# Verify no data was processed
225+
handler._process.assert_not_called()
226+
# Test with different invalid UTF-8 sequences
227+
for invalid_data in [b'\xff', b'\xfe\xff', b'\xff\xff\xff']:
228+
handler.request = (invalid_data, sock)
229+
handler.handle()
230+
handler._process.assert_not_called()
216231
except Exception as e:
217232
self.fail(f"Handler raised an unexpected exception: {e}")
218233
finally:

0 commit comments

Comments
 (0)