Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 90 additions & 51 deletions openmanus_rl/agentgym/agentenv-gaia/agentenv_gaia/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@

# List of default tools to include in every environment
DEFAULT_TOOLS = ["web_search", "bash", "python_execute", "browser_use", "terminate"]

# Define valid action types
VALID_ACTIONS = {
"web_search": {"param": "query"},
"bash": {"param": "command"},
"python_execute": {"param": "code"},
"browser_use": {"param": "input"},
"terminate": {"param": "answer"}
}


# GaiaEnvServer class to manage environment instances
Expand Down Expand Up @@ -239,32 +246,45 @@ def step(self, env_id: str, action: str):
with self.env_locks[env_id]:
env = self.env_instances[env_id]

# Parse and process the action
action_type, action_input = self._parse_action(action)
try:
# Parse and process the action
action_type, action_input = self._parse_action(action)

# Update state
env["state"]["steps_taken"] += 1
# Update state
env["state"]["steps_taken"] += 1

# Process the action using available tools
observation, reward, done = self._process_action(env_id, action_type, action_input)
# Process the action using available tools
observation, reward, done = self._process_action(env_id, action_type, action_input)

# Store action and result in memory
env["state"]["memory"].append({
"action": action,
"result": observation,
"step": env["state"]["steps_taken"]
})
# Store action and result in memory
env["state"]["memory"].append({
"action": action,
"result": observation,
"step": env["state"]["steps_taken"]
})

# Update state with results
env["state"]["reward"] += reward
env["state"]["done"] = done
# Update state with results
env["state"]["reward"] += reward
env["state"]["done"] = done

info = {
"steps_taken": env["state"]["steps_taken"],
"action_processed": action,
}
info = {
"steps_taken": env["state"]["steps_taken"],
"action_processed": action,
}

return observation, reward, done, info
return observation, reward, done, info

except ValueError as e:
# Handle validation errors
error_msg = str(e)
observation = f"Error: {error_msg}"
reward = -0.1 # Penalty for invalid action
done = False
info = {
"steps_taken": env["state"]["steps_taken"],
"error": error_msg
}
return observation, reward, done, info

def _parse_action(self, action: str) -> Tuple[str, Dict[str, Any]]:
"""
Expand All @@ -275,53 +295,71 @@ def _parse_action(self, action: str) -> Tuple[str, Dict[str, Any]]:

Returns:
tuple: (action_type, action_parameters)

Raises:
ValueError: If action format is invalid
"""
try:
# Split by newlines and find the action line
lines = action.split('\n')
action_line = None
for line in lines:
if line.strip().startswith('Action:'):
action_line = line.strip()
break

if not action_line:
raise ValueError("No valid action found in the input")

# Parse actions in format "Action: action_type with Action Input: action_input"
if "Action:" in action and "Action Input:" in action:
action_parts = action.split("Action Input:", 1)
if "Action:" in action_line and "Action Input:" in action_line:
action_parts = action_line.split("Action Input:", 1)
action_type = action_parts[0].replace("Action:", "").strip()
action_input = action_parts[1].strip()
return action_type, {"query": action_input} if action_type == "web_search" else {
"command": action_input} if action_type == "bash" else {
"code": action_input} if action_type == "python_execute" else {"input": action_input}

# Validate action type
if action_type not in VALID_ACTIONS:
raise ValueError(f"Invalid action type: {action_type}. Valid types are: {', '.join(VALID_ACTIONS.keys())}")

# Return with appropriate parameter name
param_name = VALID_ACTIONS[action_type]["param"]
return action_type, {param_name: action_input}

# Parse JSON format with tool_name and parameters
elif action.strip().startswith("{") and action.strip().endswith("}"):
elif action_line.strip().startswith("{") and action_line.strip().endswith("}"):
try:
action_json = json.loads(action)
if "tool_name" in action_json:
tool_name = action_json.pop("tool_name")
return tool_name, action_json
action_json = json.loads(action_line)
if "tool_name" not in action_json:
raise ValueError("JSON format must include 'tool_name' field")

tool_name = action_json.pop("tool_name")
if tool_name not in VALID_ACTIONS:
raise ValueError(f"Invalid tool name: {tool_name}. Valid tools are: {', '.join(VALID_ACTIONS.keys())}")

return tool_name, action_json
except json.JSONDecodeError:
pass
raise ValueError("Invalid JSON format")

# Parse direct tool calls (e.g., "web_search: query")
elif ":" in action:
tool_name, input_text = action.split(":", 1)
elif ":" in action_line:
tool_name, input_text = action_line.split(":", 1)
tool_name = tool_name.strip()
input_text = input_text.strip()

# Map to appropriate parameter name based on tool
if tool_name.lower() in ["web_search", "search_web"]:
return tool_name, {"query": input_text}
elif tool_name.lower() == "bash":
return tool_name, {"command": input_text}
elif tool_name.lower() == "python_execute":
return tool_name, {"code": input_text}
elif tool_name.lower() == "browser_use":
return tool_name, {"url": input_text}
elif tool_name.lower() == "terminate":
return "terminate", {"answer": input_text}
else:
return tool_name, {"input": input_text}
if tool_name not in VALID_ACTIONS:
raise ValueError(f"Invalid tool name: {tool_name}. Valid tools are: {', '.join(VALID_ACTIONS.keys())}")

# Handle plain text as a default case
return "unknown", {"input": action}
param_name = VALID_ACTIONS[tool_name]["param"]
return tool_name, {param_name: input_text}

else:
raise ValueError("Invalid action format. Must be one of:\n" +
"1. 'Action: tool_name with Action Input: input'\n" +
"2. JSON format with tool_name and parameters\n" +
"3. 'tool_name: input'")

except Exception as e:
print(f"Error parsing action: {e}")
return "unknown", {"input": action}
raise ValueError(f"Error parsing action: {str(e)}")

def _process_action(self, env_id: str, action_type: str, action_input: Dict[str, Any]) -> Tuple[str, float, bool]:
"""
Expand Down Expand Up @@ -495,6 +533,7 @@ def _format_new_task(self, env_id: str):
observation += "\nUse tools in the format: Action: tool_name with Action Input: your_input\n"
observation += "\nGive me one action."


return observation

def _format_observation(self, env_id: str):
Expand Down
87 changes: 87 additions & 0 deletions openmanus_rl/agentgym/agentenv-gaia/test_gaia_react.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import requests
import json

def test_gaia_react_format():
# 服务器地址
base_url = "http://127.0.0.1:8081"

# 1. 创建新环境
create_response = requests.post(
f"{base_url}/create",
json={
"data_dir": "data/",
"level": "level1",
"dataset": "validation"
}
)
print(f"Create response: {create_response.text}")
env_idx = create_response.text.strip('"') # 移除引号
print(f"Created environment with ID: {env_idx}")

if not env_idx:
print("Failed to create environment")
return

# 2. 获取初始观察
obs_response = requests.get(
f"{base_url}/observation",
params={"env_idx": env_idx}
)
print(f"Observation response: {obs_response.text}")
try:
initial_observation = obs_response.json().get("observation")
except:
initial_observation = obs_response.text
print(f"\nInitial observation:\n{initial_observation}")

# 3. 测试ReAct格式的响应
# 使用完整的ReAct格式,包含Thought和Action
react_response = """Thought: I need to search for information about the capital of France.
Action: web_search Action Input: What is the capital of France?"""

# 4. 执行动作
step_response = requests.post(
f"{base_url}/step",
json={
"env_idx": env_idx,
"action": react_response
}
)
print(f"Step response: {step_response.text}")
try:
result = step_response.json()
except:
result = {"response": step_response.text}
print(f"\nStep result:\n{json.dumps(result, indent=2)}")

# 5. 验证输出格式
print("\nValidating ReAct format:")
print("1. Response contains 'Thought:' section:", "Thought:" in react_response)
print("2. Response contains 'Action:' section:", "Action:" in react_response)
print("3. Action format is correct:", "Action Input:" in react_response)

# 提取并验证action部分
action_line = None
for line in react_response.split('\n'):
if line.strip().startswith('Action:'):
action_line = line.strip()
break

if action_line:
print("4. Action line format is valid:", action_line.startswith('Action:'))
print("5. Tool name is valid:", action_line.split("Action:")[1].split("Action Input:")[0].strip() in ["web_search", "bash", "python_execute", "browser_use", "terminate"])

# 6. 获取可用动作
actions_response = requests.get(
f"{base_url}/available_actions",
params={"env_idx": env_idx}
)
print(f"Available actions response: {actions_response.text}")
try:
available_actions = actions_response.json()
except:
available_actions = {"response": actions_response.text}
print(f"\nAvailable actions:\n{json.dumps(available_actions, indent=2)}")

if __name__ == "__main__":
test_gaia_react_format()
Loading