diff --git a/AIDojoCoordinator/worlds/NSEGameCoordinator.py b/AIDojoCoordinator/worlds/NSEGameCoordinator.py index f5ab0c5a..c1f53d13 100644 --- a/AIDojoCoordinator/worlds/NSEGameCoordinator.py +++ b/AIDojoCoordinator/worlds/NSEGameCoordinator.py @@ -7,7 +7,7 @@ import copy from faker import Faker from pathlib import Path -import netaddr +import netaddr, re from AIDojoCoordinator.game_components import GameState, Action, ActionType, IP, Network, Data, Service from AIDojoCoordinator.coordinator import GameCoordinator @@ -380,6 +380,60 @@ def _create_new_network_mapping(self)->tuple: new_self_host_to_start.append(mapping_ips[ip]) self.hosts_to_start = new_self_host_to_start + # map IPs and networks stored in the taskconfig file + # This is a quick fix, we should find some other solution + agents = self.task_config.config['coordinator']['agents'] + # Fields that are dictionaries with IP keys + dict_keys = ['known_data', 'blocked_ips', 'known_blocks'] + # Fields that are lists of IP strings + list_keys = ['known_hosts', 'controlled_hosts'] + ip_regex = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b') + + for agent in agents.values(): + for section_key in ['goal', 'start_position']: + section = agent.get(section_key, {}) + + # Remap IP addresses in the description field of the goal section + if section_key == 'goal' and 'description' in section: + description = section['description'] + def repl(match): + ip_str = match.group(0) + try: + new_ip = str(mapping_ips[IP(ip_str)]) + return new_ip + except (ValueError, KeyError): + return ip_str + section['description'] = ip_regex.sub(repl, description) + + # Remap dictionary keys + for key in dict_keys: + if key in section: + current_dict = section[key] + for ip in list(current_dict.keys()): + try: + # Convert the ip string to an IP object + new_ip = str(mapping_ips[IP(ip)]) + except (ValueError, KeyError): + # Skip if the IP is invalid or not found in mapping_ips + continue + current_dict[new_ip] = current_dict.pop(ip) + + # Remap list items + for key in list_keys: + if key in section: + new_list = [] + for ip in section[key]: + try: + new_ip = str(mapping_ips[IP(ip)]) + except (ValueError, KeyError): + # Keep the original if invalid or not in mapping_ips + new_ip = ip + new_list.append(new_ip) + section[key] = new_list + # update win conditions with the new IPs + self._win_conditions_per_role = self._get_win_condition_per_role() + self._goal_description_per_role = self._get_goal_description_per_role() + #update mappings stored in the environment for net, mapping in self._network_mapping.items(): self._network_mapping[net] = mapping_nets[mapping] @@ -755,7 +809,7 @@ async def reset(self)->bool: # reset self._data to orignal state self._data = copy.deepcopy(self._data_original) # reset self._data_content to orignal state - self._firewall = copy.deepcopy(self._firewall_original) + self._firewall = copy.deepcopy(self._firewall) self._fw_blocks = {} return True