Skip to content

Commit 36fb5e0

Browse files
authored
Merge pull request #257 from stratosphereips/support-parallelized-world
Support parallelized world
2 parents 696c8fe + 60e18fc commit 36fb5e0

File tree

9 files changed

+839
-534
lines changed

9 files changed

+839
-534
lines changed

coordinator.py

Lines changed: 344 additions & 178 deletions
Large diffs are not rendered by default.

docs/Coordinator.md

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,29 @@
22
Coordinator is the centerpiece of the game orchestration. It provides an interface between the agents and the AIDojo world.
33

44
1. Registration of new agents in the game
5-
2. Verification of agents' actionf format
5+
2. Verification of agents' action format
66
3. Recording (and storing) trajectories of agents
77
4. Detection of episode ends (either by reaching timout or agents reaching their respective goals)
88
5. Assigning rewards for each action and at the end of each episode
99
6. Removing agents from the game
1010
7. Registering the GameReset requests and handelling the game resets.
1111

12+
## Connction to other game components
13+
Coordinator, having the role of the middle man in all communication between the agent and the world uses several queues for massing passing and handelling.
14+
15+
1. `Actions queue` is a queue in which the agents submit their actions. It provides N:1 communication channel in which the coordinator receives the inputs.
16+
2. `Answer queue` is a separeate queue **per agent** in which the results of the actions are send to the agent.
17+
3. `World action queue` is a queue used for sending the acions from coordinator to the AI Dojo world
18+
4. `World response queue` is a channel used for wolrd -> coordinator communicaiton (responses to the agents' action)
19+
<img src="/docs/figures/message_passing_coordinator.jpg" alt="Message passing overview" width="30%"/>
20+
21+
1222
## Main components of the coordinator
13-
`self._actions_queue`: asycnio queue for agent -> aidojo_world communication
14-
`self._answers_queue`: asycnio queue for aidojo_world -> agent communication
23+
`self._actions_queue`: asycnio queue for agents -> coordinator communication
24+
`self._answer_queues`: dictionary of asycnio queues for coordinator -> agent communication (1 queue per agent)
25+
`self._world_action_queue`: asycnio queue for coordinator -> world queue communication
26+
`self._world_response_queue`: asycnio queue for world -> coordinator queue communication
27+
`self.task_config`: Object with the configuration of the scenario
1528
`self.ALLOWED_ROLES`: list of allowed agent roles [`Attacker`, `Defender`, `Benign`]
1629
`self._world`: Instance of `AIDojoWorld`. Implements the dynamics of the world
1730
`self._CONFIG_FILE_HASH`: hash of the configuration file used in the interaction (scenario, topology, etc.). Used for better reproducibility of results
@@ -24,33 +37,11 @@ Coordinator is the centerpiece of the game orchestration. It provides an interfa
2437
### Agent information components
2538
`self.agents`: information about connected agents {`agent address`: (`agent_name`,`agent_role`)}
2639
`self._agent_steps`: step counter for each agent in the current episode
27-
`self._reset_requests`: dictionary where requests for episode reset are collected (the world resets only if ALL agents request reset)
40+
`self._reset_requests`: dictionary where requests for episode reset are collected (the world resets only if **all** active agents request reset)
2841
`self._agent_observations`: current observation per agent
2942
`self._agent_starting_position`: starting position (with wildcards, see [configuration](../README.md#task-configuration)) per agent
3043
`self._agent_states`: current GameState per agent
31-
`self._agent_statuses`: status of each agent. One of following options:
32-
- `playing`: agent is registered and can participate in current episode. Can't influence the episode termination
33-
- `playing_active`: agent is registered and can participate in current episode. It has `goal` and `max_steps` defined and can influence the termination of the episode
34-
- `goal_reached`: agent has reached it's goal in this episode. It can't perform any more actions until the interaction is resetted.
35-
- `blocked`: agent has been blocked. It can't perform any more actions until the interaction is resetted.
36-
- `max_steps`: agent has reached it's maximum allowed steps. It can't perform any more actions until the interaction is resetted.
37-
38-
44+
`self._agent_last_action`: last Action per agent
45+
`self._agent_statuses`: status of each agent. One of AgentStatus
3946
`self._agent_rewards`: dictionary of final reward of each agent in the current episod. Only agent's which can't participate in the ongoing episode are listed.
40-
`self._agent_trajectories`: complete trajectories for each agent in the ongoing episode
41-
42-
## The format of the messages to the agents is
43-
{
44-
"to_agent": address of client,
45-
"status": {
46-
"#players": number of players,
47-
"running": true or false,
48-
"time": time in game,
49-
} ,
50-
"message": Generic text messages (optional),
51-
"state": (optional) {
52-
"observation": observation_object,
53-
"ended": if the game ended or not,
54-
"reason": reason for ending
55-
}
56-
}
47+
`self._agent_trajectories`: complete trajectories for each agent in the ongoing episode
506 KB
Loading

env/game_components.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,9 @@ def from_json(cls, json_string):
460460
@enum.unique
461461
class GameStatus(enum.Enum):
462462
OK = 200
463+
463464
CREATED = 201
465+
RESET_DONE = 202
464466
BAD_REQUEST = 400
465467
FORBIDDEN = 403
466468

@@ -475,6 +477,8 @@ def from_string(cls, string:str):
475477
return GameStatus.BAD_REQUEST
476478
case "GameStatus.FORBIDDEN":
477479
return GameStatus.FORBIDDEN
480+
case "GameStatus.RESET_DONE":
481+
return GameStatus.RESET_DONE
478482
def __repr__(self) -> str:
479483
return str(self)
480484
if __name__ == "__main__":

env/worlds/aidojo_world.py

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,37 @@
22
# Template of world to be used in AI Dojo
33
import sys
44
import os
5+
import asyncio
56

67
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
7-
import env.game_components as components
88
import logging
99
from utils.utils import ConfigParser
10+
from env.game_components import GameState, Action, GameStatus, ActionType
1011

1112
"""
1213
Basic class for worlds to be used in the AI Dojo.
1314
Every world (environment) used in AI Dojo should extend this class and implement
1415
all its methods to be compatible with the game server and game coordinator.
1516
"""
1617
class AIDojoWorld(object):
17-
def __init__(self, task_config_file:str, world_name:str="BasicAIDojoWorld")->None:
18+
def __init__(self, task_config_file:str,action_queue:asyncio.Queue, response_queue:asyncio.Queue, world_name:str="BasicAIDojoWorld")->None:
1819
self.task_config = ConfigParser(task_config_file)
1920
self.logger = logging.getLogger(world_name)
21+
self._action_queue = action_queue
22+
self._response_queue = response_queue
23+
self._world_name = world_name
24+
25+
@property
26+
def world_name(self)->str:
27+
return self._world_name
2028

21-
def step(self, current_state:components.GameState, action:components.Action, agent_id:tuple)-> components.GameState:
29+
def step(self, current_state:GameState, action:Action, agent_id:tuple)-> GameState:
2230
"""
2331
Executes given action in a current state of the environment and produces new GameState.
2432
"""
2533
raise NotImplementedError
2634

27-
def create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->components.GameState:
35+
def create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->GameState:
2836
"""
2937
Produces a GameState based on the view of the world.
3038
"""
@@ -46,4 +54,31 @@ def update_goal_dict(self, goal_dict:dict)->dict:
4654
"""
4755
Takes the existing goal dict and updates it with respect to the world.
4856
"""
49-
raise NotImplementedError
57+
raise NotImplementedError
58+
59+
async def handle_incoming_action(self)->None:
60+
try:
61+
self.logger.info(f"\tStaring {self.world_name} task.")
62+
while True:
63+
agent_id, action, game_state = await self._action_queue.get()
64+
self.logger.debug(f"Received from{agent_id}: {action}, {game_state}.")
65+
match action.type:
66+
case ActionType.JoinGame:
67+
msg = (agent_id, (self.create_state_from_view(game_state), GameStatus.CREATED))
68+
case ActionType.QuitGame:
69+
msg = (agent_id, (GameState(),GameStatus.OK))
70+
case ActionType.ResetGame:
71+
if agent_id == "world": #reset the world
72+
self.reset()
73+
continue
74+
else:
75+
msg = (agent_id, (self.create_state_from_view(game_state), GameStatus.RESET_DONE))
76+
case _:
77+
new_state = self.step(game_state, action,agent_id)
78+
msg = (agent_id, (new_state, GameStatus.OK))
79+
# new_state = self.step(state, action, agent_id)
80+
self.logger.debug(f"Sending to{agent_id}: {msg}")
81+
await self._response_queue.put(msg)
82+
await asyncio.sleep(0)
83+
except asyncio.CancelledError:
84+
self.logger.info(f"\t{self.world_name} Terminating by CancelledError")

0 commit comments

Comments
 (0)