5 changes: 5 additions & 0 deletions examples/agentscope_frozenlake/README.md
@@ -0,0 +1,5 @@
# Frozen Lake Agent

This example implements a Frozen Lake agent on the AgentScope framework. The agent navigates a frozen lake environment by interpreting text observations and selecting movement actions.

The data preparation and environment setup are the same as those in the [GRPO Frozen Lake example](../grpo_frozen_lake/README.md). Please follow the instructions there to set up the environment and prepare the dataset.
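Once the dataset is ready, point `TRINITY_MODEL_PATH` and `TRINITY_TASKSET_PATH` at your model and prepared taskset (both are read via `oc.env` in `frozenlake_agent.yaml`) and launch training with that config.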
Empty file.
67 changes: 67 additions & 0 deletions examples/agentscope_frozenlake/agent.py
@@ -0,0 +1,67 @@
from agentscope.agent import ReActAgent
from agentscope.formatter import OpenAIChatFormatter
from agentscope.message import Msg
from agentscope.model import OpenAIChatModel

from examples.agentscope_frozenlake.utils import SYSTEM_PROMPT, FrozenLakeAction

INVALID_ACTION = "still"
VALID_ACTIONS = {
"left": 1,
"down": 2,
"right": 3,
"up": 4,
}


class FrozenLakeAgent:
def __init__(self, model: OpenAIChatModel, max_steps: int = 20):
self.model = model
self.agent = ReActAgent(
name="frozenlake_agent",
sys_prompt=SYSTEM_PROMPT,
model=model,
formatter=OpenAIChatFormatter(),
)
self.response_structure = FrozenLakeAction
self.current_step = 0
self.last_action = None
self.last_observation = None
self.max_steps = max_steps

def get_prompt(self, observation: str) -> str:
prompt = (
f"Current Observation ({self.current_step}): \n"
+ observation
+ "\n"
+ "You have not achieved the goal, P has not reached G yet. Please give the next action."
)
        if self.current_step > 0 and self.last_action is not None:
            if self.last_observation == observation:
                prompt += (
                    "\nYour last response was invalid: your position did not change at all. "
                    "Recheck your reasoning, the action you output, and the response format."
                )

if self.max_steps is not None and self.max_steps - self.current_step > 0:
prompt += (
f"\nThe maximum number of steps remaining is {self.max_steps - self.current_step}."
)

return prompt

    def get_action(self, response: Msg) -> str:
        """Extract and validate the action from the agent's structured response."""
        if not response.metadata or "action" not in response.metadata:
            return INVALID_ACTION
        action = response.metadata["action"].lower()
        if action not in VALID_ACTIONS:
            return INVALID_ACTION
        return action

async def step(self, current_observation: str) -> str:
prompt = self.get_prompt(current_observation)
response = await self.agent.reply(
Msg("user", prompt, role="user"), structured_model=self.response_structure
)
action = self.get_action(response)
self.last_observation = current_observation
self.last_action = action
return action
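
A minimal sketch of how this class might be driven against the `FrozenLakeEnv` added in this PR. The `OpenAIChatModel` arguments are placeholders for illustration (in a real run the model client presumably comes from Trinity-RFT's rollout model), and the loop advances `current_step` itself since the class does not:

```python
import asyncio

from agentscope.model import OpenAIChatModel

from examples.agentscope_frozenlake.agent import FrozenLakeAgent
from examples.agentscope_frozenlake.env import FrozenLakeEnv


async def rollout() -> float:
    # Placeholder model client; these arguments are assumptions for this sketch.
    model = OpenAIChatModel(model_name="Qwen/Qwen3-1.7B", api_key="EMPTY")
    agent = FrozenLakeAgent(model, max_steps=10)
    env = FrozenLakeEnv(max_steps=8, size=8, seed=42)

    observation, _ = env.reset()
    total_reward = 0.0
    for step in range(agent.max_steps):
        action = await agent.step(observation)
        observation, reward, done, _ = env.step(action)
        total_reward += float(reward)
        # FrozenLakeAgent never increments current_step on its own;
        # the caller (here, this loop) is expected to advance it.
        agent.current_step = step + 1
        if done:
            break
    return total_reward


if __name__ == "__main__":
    print(asyncio.run(rollout()))
```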
210 changes: 210 additions & 0 deletions examples/agentscope_frozenlake/env.py
@@ -0,0 +1,210 @@
import copy
from typing import Dict, Optional, Tuple

import numpy as np

from examples.agentscope_frozenlake.utils import generate_random_map, get_goal_position
from trinity.utils.log import get_logger

try:
from gymnasium.envs.toy_text.frozen_lake import FrozenLakeEnv as GymFrozenLakeEnv
except ImportError:
GymFrozenLakeEnv = object


class FrozenLakeEnv(GymFrozenLakeEnv):
    # Map grid characters to integer states
MAP_LOOKUP = {
b"P": 0,
b"F": 1,
b"H": 2,
b"G": 3,
}

    # Rules for rendering the environment state as a text observation
GRID_LOOKUP = {
0: " P \t", # player
1: " _ \t", # frozen
2: " O \t", # hole
3: " G \t", # goal
4: " X \t", # player fall into hole
5: " √ \t", # player on goal
}

ACTION_LOOKUP = {
"still": 0,
"left": 1,
"down": 2,
"right": 3,
"up": 4,
}

INVALID_ACTION = 0
PENALTY_FOR_INVALID = -1

def __init__(
self,
max_steps: int = 8,
desc: Optional[str] = None,
is_slippery: bool = False,
size: int = 8,
p: float = 0.8,
seed: int = 42,
):
self.logger = get_logger()
self.max_steps = max_steps or 8
self.desc = desc
self.is_slippery = is_slippery
self.size = size
self.p = p
self.seed = seed
try:
import gymnasium as gym
from gymnasium.envs.toy_text.frozen_lake import (
FrozenLakeEnv as GymFrozenLakeEnv,
)
except ImportError as e:
            error_message = (
                f"Gymnasium is not installed. Please install gymnasium before "
                f"running the frozen_lake workflow. Error: {str(e)}"
            )
self.logger.error(error_message)
raise ImportError(error_message)

if self.desc is None:
random_map, goal_position = generate_random_map(
size=self.size, p=self.p, seed=self.seed, max_steps=self.max_steps
)
else:
random_map = np.asarray(copy.deepcopy(self.desc), dtype="c")
goal_position = get_goal_position(random_map)

self.goal_position = goal_position

GymFrozenLakeEnv.__init__(self, desc=random_map[:], is_slippery=self.is_slippery)
self.action_space = gym.spaces.Discrete(4, start=1)

self.map_kwargs = {
"size": size,
"p": p,
}
self.env_kwargs = {
"is_slippery": is_slippery,
"desc": copy.deepcopy(desc),
"seed": seed,
}

self.action_map = {
1: 0, # left
2: 1, # down
3: 2, # right
4: 3, # up
}

def _get_player_position(self) -> Tuple[int, int]:
return (self.s // self.ncol, self.s % self.ncol) # (row, col)

def step(self, action: str) -> Tuple[str, float, bool, Dict]:
"""Execute a step in the environment.

Maps custom action to gymnasium FrozenLakeEnv action and takes the step.
Checks if the action is effective (whether player moves in the env).

Args:
action: The action to take.

Returns:
Tuple of (observation, reward, done, info).
"""
if self.success():
return self.render(), 1, True, {"action_is_effective": False}

        action_id: int = self.ACTION_LOOKUP.get(action.lower(), self.INVALID_ACTION)

        if action_id == self.INVALID_ACTION or action_id not in self.action_map:
            return self.render(), 0, False, {"action_is_effective": False}

prev_player_position = int(self.s)

player_pos, reward, done, _, _ = GymFrozenLakeEnv.step(self, self.action_map[action_id])

obs = self.render()
return obs, reward, done, {"action_is_effective": prev_player_position != int(player_pos)}

def render(self, mode="tiny_rgb_array"):
"""Render the environment.

Args:
mode: Rendering mode. Options: "tiny_rgb_array", "list", "state", "rgb_array", "ansi".

Returns:
Rendered observation based on the mode.
"""
assert mode in ["tiny_rgb_array", "list", "state", "rgb_array", "ansi"]
if mode in ["rgb_array", "ansi"]:
prev_render_mode = self.render_mode
self.render_mode = mode
obs = self.render()
self.render_mode = prev_render_mode
return obs
room_state = copy.deepcopy(self.desc)

# replace the position of start 'S' with 'F'
position_S = np.where(room_state == b"S")
room_state[position_S] = b"F"

# replace the position of the player with 'P'
position_P = self._get_player_position()
room_state[position_P] = b"P"

if mode == "state":
# transform 'S', 'F', 'H', 'G' to numpy integer array
room_state = np.vectorize(lambda x: self.MAP_LOOKUP[x])(room_state)
# add player in hole or player on goal
if self.desc[position_P] == b"H":
room_state[position_P] = 4
elif self.desc[position_P] == b"G":
room_state[position_P] = 5
return room_state

room_state = self.render(mode="state").tolist()

if mode == "list":

def lookup(cell):
return self.GRID_LOOKUP.get(cell, "?").strip("\t").strip()

return [" ".join(lookup(cell) for cell in row) for row in room_state]

if mode == "tiny_rgb_array":

def lookup(cell):
return self.GRID_LOOKUP.get(cell, "?")

result = "\n".join("".join(lookup(cell) for cell in row) for row in room_state)
return result

def reset(self, task: Optional[Dict] = None):
task = task or {}
self.__init__( # type: ignore [misc]
size=task.get("size", self.map_kwargs["size"]),
p=task.get("p", self.map_kwargs["p"]),
seed=task.get("seed", self.env_kwargs["seed"]),
is_slippery=task.get("is_slippery", self.env_kwargs["is_slippery"]),
)
GymFrozenLakeEnv.reset(self, seed=self.seed)
return self.render(mode="tiny_rgb_array"), {}

    def finished(self) -> bool:
        """Check whether the episode has ended (the player is on G or H)."""
        player_pos = self._get_player_position()
        return self.desc[player_pos] in b"GH"  # type: ignore [index,operator]

    def success(self):
        """Check whether the agent has reached the goal (G)."""
        player_pos = self._get_player_position()
        return self.desc[player_pos] in b"G"
73 changes: 73 additions & 0 deletions examples/agentscope_frozenlake/frozenlake_agent.yaml
@@ -0,0 +1,73 @@
project: "FrozenLake"
name: "Qwen3-1.7B-agent"
checkpoint_root_dir: ${oc.env:TRINITY_CHECKPOINT_ROOT_DIR,./checkpoints}
algorithm:
algorithm_type: multi_step_grpo
repeat_times: 8
kl_loss_fn: "low_var_kl"
optimizer:
lr: 1e-6
model:
model_path: ${oc.env:TRINITY_MODEL_PATH,Qwen/Qwen3-1.7B}
max_response_tokens: 1024
max_model_len: 25600
temperature: 0.7
cluster:
node_num: 1
gpu_per_node: 8
buffer:
total_epochs: 1
batch_size: 32
train_batch_size: 2048
explorer_input:
taskset:
name: frozenlake
storage_type: file
path: ${oc.env:TRINITY_TASKSET_PATH}
split: train
workflow_args:
env_max_steps: 8
agent_max_steps: 10
is_slippery: false
eval_tasksets:
- name: frozenlake
storage_type: file
path: ${oc.env:TRINITY_TASKSET_PATH}
split: test
workflow_args:
env_max_steps: 8
agent_max_steps: 10
is_slippery: false
repeat_times: 4
rollout_args:
temperature: 0.1
default_workflow_type: 'examples.agentscope_frozenlake.workflow.FrozenLakeWorkflow'
explorer:
eval_on_startup: true
eval_interval: 20
runner_per_model: 8
rollout_model:
engine_num: 6
tensor_parallel_size: 1
enable_chunked_prefill: true
enforce_eager: false
enable_openai_api: true
enable_history: true
enable_auto_tool_choice: true
tool_call_parser: hermes
reasoning_parser: deepseek_r1
enable_thinking: true
dtype: bfloat16
seed: 42
gpu_memory_utilization: 0.85
trainer:
save_interval: 100
use_dynamic_bsz: true
grad_clip: 1.0
ulysses_sequence_parallel_size: 2

synchronizer:
sync_method: nccl
sync_style: dynamic_by_explorer
sync_interval: 1
sync_timeout: 1200
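
Note on `workflow_args`: the keys appear to map directly onto the constructors shown earlier in this diff, with `env_max_steps` and `is_slippery` going to `FrozenLakeEnv(max_steps=..., is_slippery=...)` and `agent_max_steps` to `FrozenLakeAgent(max_steps=...)`; the wiring itself lives in `examples.agentscope_frozenlake.workflow.FrozenLakeWorkflow`, which this commit view does not show.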