Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
cfa7b0c
Add queues for coord - world communication
ondrej-lukas Nov 18, 2024
3bcd43b
Add coommunication queues to AIDojo World
ondrej-lukas Nov 18, 2024
8f36b13
Add AgentStatus enum
ondrej-lukas Nov 18, 2024
fa3b27e
Add additional queues per agent
ondrej-lukas Nov 19, 2024
10f3951
Handle remove player
ondrej-lukas Nov 19, 2024
5e36d62
handle reset per agent
ondrej-lukas Nov 19, 2024
f899f82
correctly remove queue when agent disconnects
ondrej-lukas Nov 19, 2024
89fc169
Do not send empty messages
ondrej-lukas Nov 19, 2024
a2faa5c
Split long method
ondrej-lukas Nov 19, 2024
8206d1d
Move response to reset to separate method
ondrej-lukas Nov 19, 2024
f17e477
update processing of general actions
ondrej-lukas Nov 19, 2024
83f87c8
Add ResetDone Game state
ondrej-lukas Nov 19, 2024
bbb5772
Add getter for world name
ondrej-lukas Nov 19, 2024
be58e0e
add await
ondrej-lukas Nov 19, 2024
b8b6cce
record last action per agent
ondrej-lukas Nov 19, 2024
9ab41e3
Add compatibility with updated coordinator
ondrej-lukas Nov 19, 2024
50f09d3
Fix ruff errors
ondrej-lukas Nov 19, 2024
cd57eca
Ad additonal params to the constructor of NSE
ondrej-lukas Nov 19, 2024
7c1f2ce
remove unused queue
ondrej-lukas Nov 19, 2024
dddaf48
skip coordinator tests
ondrej-lukas Nov 19, 2024
33a9037
Skipp coordinator tests for now
ondrej-lukas Nov 19, 2024
4a825cb
Move world queues to coordinator
ondrej-lukas Nov 19, 2024
b5ac060
Better comments
ondrej-lukas Nov 19, 2024
d2f9bd1
Fix typo
ondrej-lukas Nov 19, 2024
df196ab
Add from string repr for RESET_DONE
ondrej-lukas Nov 19, 2024
ba7ca57
reduce sleeping time
ondrej-lukas Nov 19, 2024
a7bd253
reduce sleeping time
ondrej-lukas Nov 19, 2024
9c2c1a0
Add diagram to readme
ondrej-lukas Nov 25, 2024
d3029a8
Remove unused import - RUFF error
ondrej-lukas Nov 25, 2024
60e18fc
Update docs
ondrej-lukas Nov 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
522 changes: 344 additions & 178 deletions coordinator.py

Large diffs are not rendered by default.

49 changes: 20 additions & 29 deletions docs/Coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,29 @@
Coordinator is the centerpiece of the game orchestration. It provides an interface between the agents and the AIDojo world.

1. Registration of new agents in the game
2. Verification of agents' actionf format
2. Verification of agents' action format
3. Recording (and storing) trajectories of agents
4. Detection of episode ends (either by reaching timout or agents reaching their respective goals)
5. Assigning rewards for each action and at the end of each episode
6. Removing agents from the game
7. Registering the GameReset requests and handelling the game resets.

## Connction to other game components
Coordinator, having the role of the middle man in all communication between the agent and the world uses several queues for massing passing and handelling.

1. `Actions queue` is a queue in which the agents submit their actions. It provides N:1 communication channel in which the coordinator receives the inputs.
2. `Answer queue` is a separeate queue **per agent** in which the results of the actions are send to the agent.
3. `World action queue` is a queue used for sending the acions from coordinator to the AI Dojo world
4. `World response queue` is a channel used for wolrd -> coordinator communicaiton (responses to the agents' action)
<img src="/docs/figures/message_passing_coordinator.jpg" alt="Message passing overview" width="30%"/>


## Main components of the coordinator
`self._actions_queue`: asycnio queue for agent -> aidojo_world communication
`self._answers_queue`: asycnio queue for aidojo_world -> agent communication
`self._actions_queue`: asycnio queue for agents -> coordinator communication
`self._answer_queues`: dictionary of asycnio queues for coordinator -> agent communication (1 queue per agent)
`self._world_action_queue`: asycnio queue for coordinator -> world queue communication
`self._world_response_queue`: asycnio queue for world -> coordinator queue communication
`self.task_config`: Object with the configuration of the scenario
`self.ALLOWED_ROLES`: list of allowed agent roles [`Attacker`, `Defender`, `Benign`]
`self._world`: Instance of `AIDojoWorld`. Implements the dynamics of the world
`self._CONFIG_FILE_HASH`: hash of the configuration file used in the interaction (scenario, topology, etc.). Used for better reproducibility of results
Expand All @@ -24,33 +37,11 @@ Coordinator is the centerpiece of the game orchestration. It provides an interfa
### Agent information components
`self.agents`: information about connected agents {`agent address`: (`agent_name`,`agent_role`)}
`self._agent_steps`: step counter for each agent in the current episode
`self._reset_requests`: dictionary where requests for episode reset are collected (the world resets only if ALL agents request reset)
`self._reset_requests`: dictionary where requests for episode reset are collected (the world resets only if **all** active agents request reset)
`self._agent_observations`: current observation per agent
`self._agent_starting_position`: starting position (with wildcards, see [configuration](../README.md#task-configuration)) per agent
`self._agent_states`: current GameState per agent
`self._agent_statuses`: status of each agent. One of following options:
- `playing`: agent is registered and can participate in current episode. Can't influence the episode termination
- `playing_active`: agent is registered and can participate in current episode. It has `goal` and `max_steps` defined and can influence the termination of the episode
- `goal_reached`: agent has reached it's goal in this episode. It can't perform any more actions until the interaction is resetted.
- `blocked`: agent has been blocked. It can't perform any more actions until the interaction is resetted.
- `max_steps`: agent has reached it's maximum allowed steps. It can't perform any more actions until the interaction is resetted.


`self._agent_last_action`: last Action per agent
`self._agent_statuses`: status of each agent. One of AgentStatus
`self._agent_rewards`: dictionary of final reward of each agent in the current episod. Only agent's which can't participate in the ongoing episode are listed.
`self._agent_trajectories`: complete trajectories for each agent in the ongoing episode

## The format of the messages to the agents is
{
"to_agent": address of client,
"status": {
"#players": number of players,
"running": true or false,
"time": time in game,
} ,
"message": Generic text messages (optional),
"state": (optional) {
"observation": observation_object,
"ended": if the game ended or not,
"reason": reason for ending
}
}
`self._agent_trajectories`: complete trajectories for each agent in the ongoing episode
Binary file added docs/figures/message_passing_coordinator.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions env/game_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,9 @@ def from_json(cls, json_string):
@enum.unique
class GameStatus(enum.Enum):
OK = 200

CREATED = 201
RESET_DONE = 202
BAD_REQUEST = 400
FORBIDDEN = 403

Expand All @@ -475,6 +477,8 @@ def from_string(cls, string:str):
return GameStatus.BAD_REQUEST
case "GameStatus.FORBIDDEN":
return GameStatus.FORBIDDEN
case "GameStatus.RESET_DONE":
return GameStatus.RESET_DONE
def __repr__(self) -> str:
return str(self)
if __name__ == "__main__":
Expand Down
45 changes: 40 additions & 5 deletions env/worlds/aidojo_world.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,37 @@
# Template of world to be used in AI Dojo
import sys
import os
import asyncio

sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
import env.game_components as components
import logging
from utils.utils import ConfigParser
from env.game_components import GameState, Action, GameStatus, ActionType

"""
Basic class for worlds to be used in the AI Dojo.
Every world (environment) used in AI Dojo should extend this class and implement
all its methods to be compatible with the game server and game coordinator.
"""
class AIDojoWorld(object):
def __init__(self, task_config_file:str, world_name:str="BasicAIDojoWorld")->None:
def __init__(self, task_config_file:str,action_queue:asyncio.Queue, response_queue:asyncio.Queue, world_name:str="BasicAIDojoWorld")->None:
self.task_config = ConfigParser(task_config_file)
self.logger = logging.getLogger(world_name)
self._action_queue = action_queue
self._response_queue = response_queue
self._world_name = world_name

@property
def world_name(self)->str:
return self._world_name

def step(self, current_state:components.GameState, action:components.Action, agent_id:tuple)-> components.GameState:
def step(self, current_state:GameState, action:Action, agent_id:tuple)-> GameState:
"""
Executes given action in a current state of the environment and produces new GameState.
"""
raise NotImplementedError

def create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->components.GameState:
def create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->GameState:
"""
Produces a GameState based on the view of the world.
"""
Expand All @@ -46,4 +54,31 @@ def update_goal_dict(self, goal_dict:dict)->dict:
"""
Takes the existing goal dict and updates it with respect to the world.
"""
raise NotImplementedError
raise NotImplementedError

async def handle_incoming_action(self)->None:
try:
self.logger.info(f"\tStaring {self.world_name} task.")
while True:
agent_id, action, game_state = await self._action_queue.get()
self.logger.debug(f"Received from{agent_id}: {action}, {game_state}.")
match action.type:
case ActionType.JoinGame:
msg = (agent_id, (self.create_state_from_view(game_state), GameStatus.CREATED))
case ActionType.QuitGame:
msg = (agent_id, (GameState(),GameStatus.OK))
case ActionType.ResetGame:
if agent_id == "world": #reset the world
self.reset()
continue
else:
msg = (agent_id, (self.create_state_from_view(game_state), GameStatus.RESET_DONE))
case _:
new_state = self.step(game_state, action,agent_id)
msg = (agent_id, (new_state, GameStatus.OK))
# new_state = self.step(state, action, agent_id)
self.logger.debug(f"Sending to{agent_id}: {msg}")
await self._response_queue.put(msg)
await asyncio.sleep(0)
except asyncio.CancelledError:
self.logger.info(f"\t{self.world_name} Terminating by CancelledError")
Loading
Loading