Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions src/attackmate/executors/common/loopexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,27 @@ def break_condition_met(self, command: LoopCommand) -> bool:
return True
return False

def substitute_variables_in_command(self, command_obj, placeholders: dict):
"""
Replaces all occurrences of placeholders in *any* string attribute
of the command_obj with the values from placeholders.
E.g. if placeholders = {'LOOP_ITEM': 'https://example.com'},
and command_obj.url = '$LOOP_ITEM', then it becomes 'https://example.com'.
"""
for attr_name, attr_val in vars(command_obj).items():
if isinstance(attr_val, str) and "$" in attr_val:
new_val = Template(attr_val).safe_substitute(placeholders)
setattr(command_obj, attr_name, new_val)

def loop_range(self, command: LoopCommand, start: int, end: int) -> None:
for x in range(start, end):
for cmd in command.commands:
template_cmd: Command = copy.deepcopy(cmd)
tpl = Template(template_cmd.cmd)
template_cmd.cmd = tpl.substitute(LOOP_INDEX=x, **self.varstore.variables)
placeholders = {
'LOOP_INDEX': x,
**self.varstore.variables,
}
self.substitute_variables_in_command(template_cmd, placeholders)
if self.break_condition_met(command):
return
self.runfunc([template_cmd])
Expand All @@ -59,8 +74,11 @@ def loop_items(self, command: LoopCommand, varname: str, iterable: list[str]) ->
for x in iterable:
for cmd in command.commands:
template_cmd: Command = copy.deepcopy(cmd)
tpl = Template(template_cmd.cmd)
template_cmd.cmd = tpl.substitute(LOOP_ITEM=x, **self.varstore.variables)
placeholders = {
'LOOP_ITEM': x,
**self.varstore.variables,
}
self.substitute_variables_in_command(template_cmd, placeholders)
if self.break_condition_met(command):
return
self.runfunc([template_cmd])
Expand Down
53 changes: 53 additions & 0 deletions test/units/test_loopexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from attackmate.schemas.playbook import Commands
from attackmate.schemas.debug import DebugCommand
from attackmate.schemas.loop import LoopCommand
from attackmate.schemas.sleep import SleepCommand
from attackmate.executors.common.debugexecutor import DebugExecutor
from attackmate.executors.common.loopexecutor import LoopExecutor
from attackmate.executors.common.sleepexecutor import SleepExecutor
from attackmate.variablestore import VariableStore
from attackmate.processmanager import ProcessManager

Expand All @@ -12,13 +14,16 @@ class TestLoopExecutor:
def setup_method(self, method):
self.varstore = VariableStore()
self.process_manager = ProcessManager()
self.sleep_executor = SleepExecutor(self.process_manager, varstore=self.varstore)
self.debug_executor = DebugExecutor(self.process_manager, self.varstore)
self.loop_executor = LoopExecutor(self.process_manager, varstore=self.varstore, runfunc=self.run_func)

def run_func(self, commands: Commands):
for command in commands:
if command.type == 'debug':
self.debug_executor.run(command)
elif command.type == 'sleep':
self.sleep_executor.run(command)

def test_items(self, caplog):
caplog.set_level(logging.INFO)
Expand Down Expand Up @@ -55,3 +60,51 @@ def test_until(self, caplog):
assert 'Debug: \'0\'' in [rec.message for rec in caplog.records]
assert 'Debug: \'1\'' in [rec.message for rec in caplog.records]
assert 'Debug: \'2\'' not in [rec.message for rec in caplog.records]

def test_range_with_sleep(self, caplog):
"""
Test that $LOOP_INDEX is substituted correctly for a range-based loop.
The sleep command ensures substitution works in fields like "seconds," beyond just "cmd".
"""
caplog.set_level(logging.INFO)
self.varstore.clear()
lc = LoopCommand(
type='loop',
cmd='range(0,3)',
commands=[
SleepCommand(type='sleep', cmd='sleep', seconds='$LOOP_INDEX'),
],
)
self.loop_executor.run(lc)
expected_logs = [
'Sleeping 0 seconds',
'Sleeping 1 seconds',
'Sleeping 2 seconds',
]
# Verify the expected log messages
for log in expected_logs:
assert log in [rec.message for rec in caplog.records]

def test_items_with_sleep(self, caplog):
"""
Test that $LOOP_ITEM is substituted correctly for a list-based loop.
The sleep command ensures substitution works in fields like "seconds," beyond just "cmd".
"""
caplog.set_level(logging.INFO)
self.varstore.clear()
self.varstore.set_variable('LISTA', [1, 2])
lc = LoopCommand(
type='loop',
cmd='items(LISTA)',
commands=[
SleepCommand(type='sleep', cmd='sleep', seconds='$LOOP_ITEM'),
],
)
self.loop_executor.run(lc)
expected_logs = [
'Sleeping 1 seconds',
'Sleeping 2 seconds',
]
# Verify the expected log messages
for log in expected_logs:
assert log in [rec.message for rec in caplog.records]