Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 56 additions & 2 deletions AIDojoCoordinator/worlds/NSEGameCoordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import copy
from faker import Faker
from pathlib import Path
import netaddr
import netaddr, re

from AIDojoCoordinator.game_components import GameState, Action, ActionType, IP, Network, Data, Service
from AIDojoCoordinator.coordinator import GameCoordinator
Expand Down Expand Up @@ -380,6 +380,60 @@ def _create_new_network_mapping(self)->tuple:
new_self_host_to_start.append(mapping_ips[ip])
self.hosts_to_start = new_self_host_to_start

# map IPs and networks stored in the taskconfig file
# This is a quick fix, we should find some other solution
agents = self.task_config.config['coordinator']['agents']
# Fields that are dictionaries with IP keys
dict_keys = ['known_data', 'blocked_ips', 'known_blocks']
# Fields that are lists of IP strings
list_keys = ['known_hosts', 'controlled_hosts']
ip_regex = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')

for agent in agents.values():
for section_key in ['goal', 'start_position']:
section = agent.get(section_key, {})

# Remap IP addresses in the description field of the goal section
if section_key == 'goal' and 'description' in section:
description = section['description']
def repl(match):
ip_str = match.group(0)
try:
new_ip = str(mapping_ips[IP(ip_str)])
return new_ip
except (ValueError, KeyError):
return ip_str
section['description'] = ip_regex.sub(repl, description)

# Remap dictionary keys
for key in dict_keys:
if key in section:
current_dict = section[key]
for ip in list(current_dict.keys()):
try:
# Convert the ip string to an IP object
new_ip = str(mapping_ips[IP(ip)])
except (ValueError, KeyError):
# Skip if the IP is invalid or not found in mapping_ips
continue
current_dict[new_ip] = current_dict.pop(ip)

# Remap list items
for key in list_keys:
if key in section:
new_list = []
for ip in section[key]:
try:
new_ip = str(mapping_ips[IP(ip)])
except (ValueError, KeyError):
# Keep the original if invalid or not in mapping_ips
new_ip = ip
new_list.append(new_ip)
section[key] = new_list
# update win conditions with the new IPs
self._win_conditions_per_role = self._get_win_condition_per_role()
self._goal_description_per_role = self._get_goal_description_per_role()

#update mappings stored in the environment
for net, mapping in self._network_mapping.items():
self._network_mapping[net] = mapping_nets[mapping]
Expand Down Expand Up @@ -755,7 +809,7 @@ async def reset(self)->bool:
# reset self._data to orignal state
self._data = copy.deepcopy(self._data_original)
# reset self._data_content to orignal state
self._firewall = copy.deepcopy(self._firewall_original)
self._firewall = copy.deepcopy(self._firewall)
self._fw_blocks = {}
return True

Expand Down