|
7 | 7 | import copy |
8 | 8 | from faker import Faker |
9 | 9 | from pathlib import Path |
10 | | -import netaddr |
| 10 | +import netaddr, re |
11 | 11 |
|
12 | 12 | from AIDojoCoordinator.game_components import GameState, Action, ActionType, IP, Network, Data, Service |
13 | 13 | from AIDojoCoordinator.coordinator import GameCoordinator |
@@ -380,6 +380,60 @@ def _create_new_network_mapping(self)->tuple: |
380 | 380 | new_self_host_to_start.append(mapping_ips[ip]) |
381 | 381 | self.hosts_to_start = new_self_host_to_start |
382 | 382 |
|
| 383 | + # map IPs and networks stored in the taskconfig file |
| 384 | + # This is a quick fix, we should find some other solution |
| 385 | + agents = self.task_config.config['coordinator']['agents'] |
| 386 | + # Fields that are dictionaries with IP keys |
| 387 | + dict_keys = ['known_data', 'blocked_ips', 'known_blocks'] |
| 388 | + # Fields that are lists of IP strings |
| 389 | + list_keys = ['known_hosts', 'controlled_hosts'] |
| 390 | + ip_regex = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b') |
| 391 | + |
| 392 | + for agent in agents.values(): |
| 393 | + for section_key in ['goal', 'start_position']: |
| 394 | + section = agent.get(section_key, {}) |
| 395 | + |
| 396 | + # Remap IP addresses in the description field of the goal section |
| 397 | + if section_key == 'goal' and 'description' in section: |
| 398 | + description = section['description'] |
| 399 | + def repl(match): |
| 400 | + ip_str = match.group(0) |
| 401 | + try: |
| 402 | + new_ip = str(mapping_ips[IP(ip_str)]) |
| 403 | + return new_ip |
| 404 | + except (ValueError, KeyError): |
| 405 | + return ip_str |
| 406 | + section['description'] = ip_regex.sub(repl, description) |
| 407 | + |
| 408 | + # Remap dictionary keys |
| 409 | + for key in dict_keys: |
| 410 | + if key in section: |
| 411 | + current_dict = section[key] |
| 412 | + for ip in list(current_dict.keys()): |
| 413 | + try: |
| 414 | + # Convert the ip string to an IP object |
| 415 | + new_ip = str(mapping_ips[IP(ip)]) |
| 416 | + except (ValueError, KeyError): |
| 417 | + # Skip if the IP is invalid or not found in mapping_ips |
| 418 | + continue |
| 419 | + current_dict[new_ip] = current_dict.pop(ip) |
| 420 | + |
| 421 | + # Remap list items |
| 422 | + for key in list_keys: |
| 423 | + if key in section: |
| 424 | + new_list = [] |
| 425 | + for ip in section[key]: |
| 426 | + try: |
| 427 | + new_ip = str(mapping_ips[IP(ip)]) |
| 428 | + except (ValueError, KeyError): |
| 429 | + # Keep the original if invalid or not in mapping_ips |
| 430 | + new_ip = ip |
| 431 | + new_list.append(new_ip) |
| 432 | + section[key] = new_list |
| 433 | + # update win conditions with the new IPs |
| 434 | + self._win_conditions_per_role = self._get_win_condition_per_role() |
| 435 | + self._goal_description_per_role = self._get_goal_description_per_role() |
| 436 | + |
383 | 437 | #update mappings stored in the environment |
384 | 438 | for net, mapping in self._network_mapping.items(): |
385 | 439 | self._network_mapping[net] = mapping_nets[mapping] |
@@ -755,7 +809,7 @@ async def reset(self)->bool: |
755 | 809 | # reset self._data to orignal state |
756 | 810 | self._data = copy.deepcopy(self._data_original) |
757 | 811 | # reset self._data_content to orignal state |
758 | | - self._firewall = copy.deepcopy(self._firewall_original) |
| 812 | + self._firewall = copy.deepcopy(self._firewall) |
759 | 813 | self._fw_blocks = {} |
760 | 814 | return True |
761 | 815 |
|
|
0 commit comments