Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 157 additions & 14 deletions master/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import yaml
from agents.planner import GlobalTaskPlanner

# Import flagscale last to avoid path conflicts
from flag_scale.flagscale.agent.collaboration import Collaborator


Expand Down Expand Up @@ -71,7 +73,7 @@ def _init_scene(self, scene_config):
if scene_name:
self.collaborator.record_environment(scene_name, json.dumps(scene_info))
else:
print("Warning: Missing 'name' in scene_info:", scene_info)
self.logger.warning("Warning: Missing 'name' in scene_info: %s", scene_info)

def _handle_register(self, robot_name: Dict) -> None:
"""Listen for robot registrations."""
Expand Down Expand Up @@ -100,7 +102,6 @@ def _handle_result(self, data: str):
subtask_handle = data.get("subtask_handle")
subtask_result = data.get("subtask_result")

# TODO: Task result should be refered to the next step determination.
if robot_name and subtask_handle and subtask_result:
self.logger.info(
f"================ Received result from {robot_name} ================"
Expand Down Expand Up @@ -198,9 +199,131 @@ def reasoning_and_subtasks_is_right(self, reasoning_and_subtasks: dict) -> bool:
except (TypeError, KeyError):
return False

def _save_task_data_to_json(self, task_id: str, task: str, reasoning_and_subtasks: dict):
"""Save task data to JSON file - single file stores all tasks"""
import os
from datetime import datetime

log_dir = os.path.join(os.path.dirname(__file__), '..', '..', '.log')
os.makedirs(log_dir, exist_ok=True)

json_file = os.path.join(log_dir, f"master_data_{task_id}.json")
current_task_data = {
"task_id": task_id,
"task": task,
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
"reasoning_explanation": reasoning_and_subtasks.get("reasoning_explanation", ""),
"subtask_list": reasoning_and_subtasks.get("subtask_list", []),
"prompt_content": self._get_last_prompt_content()
}

if os.path.exists(json_file):
try:
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
except:
data = {"tasks": []}
else:
data = {"tasks": []}

data["tasks"].append(current_task_data)
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)

def _get_last_prompt_content(self) -> str:
"""Get the last prompt content"""
if hasattr(self.planner, 'last_prompt_content'):
return self.planner.last_prompt_content
return ""

def _store_task_to_long_term_memory(self, task_id: str, task: str, reasoning_and_subtasks: dict):
"""Store task decomposition results to long-term memory

Args:
task_id: Task ID
task: Original task description
reasoning_and_subtasks: Task decomposition results
"""
if not hasattr(self.planner, 'long_term_memory'):
self.logger.warning(f"Planner does not have long_term_memory attribute, cannot store task {task_id}")
return
if not self.planner.long_term_memory:
self.logger.warning(f"Planner's long_term_memory is None, cannot store task {task_id}")
return

self.logger.info(f"[LongTermMemory] Storing task {task_id} to long-term memory: {task[:50]}")

try:
import time
import sys
import os
import importlib.util

# Import TaskContext and CompactActionStep from slaver memory module
_slaver_path = os.path.join(os.path.dirname(__file__), '..', '..', 'slaver')
sys.path.insert(0, _slaver_path)
try:
from tools.memory import TaskContext, CompactActionStep
except ImportError:
# Fallback to direct file loading
_memory_file = os.path.join(_slaver_path, 'tools', 'memory.py')
spec = importlib.util.spec_from_file_location('memory_module', _memory_file)
memory_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(memory_module)
TaskContext = memory_module.TaskContext
CompactActionStep = memory_module.CompactActionStep

subtask_list = reasoning_and_subtasks.get("subtask_list", [])

tool_sequence = []
for subtask in subtask_list:
subtask_desc = subtask.get("subtask", "")
if "Navigate" in subtask_desc:
tool_sequence.append("Navigate")
elif "Grasp" in subtask_desc:
tool_sequence.append("Grasp")
elif "Place" in subtask_desc:
tool_sequence.append("Place")

start_time = time.time()

actions = []
for i, subtask in enumerate(subtask_list, 1):
subtask_desc = subtask.get("subtask", "")
action = CompactActionStep(
step_number=i,
timestamp=start_time + i,
tool_name=subtask_desc.split()[0] if subtask_desc else "unknown",
tool_arguments={},
tool_result_summary=f"Subtask: {subtask_desc}",
success=True,
duration=1.0,
error_msg=None
)
actions.append(action)

task_context = TaskContext(
task_id=task_id,
task_text=task,
start_time=start_time,
actions=actions,
end_time=start_time + len(subtask_list),
success=True
)
stored_id = self.planner.long_term_memory.store_task_episode(task_context)
self.logger.info(f"[LongTermMemory] ✅ Task {task_id} stored to long-term memory as {stored_id}")
self.logger.info(f"[LongTermMemory] ✅ Task {task_id} stored to long-term memory")

except Exception as e:
error_msg = f"Failed to store task {task_id} to long-term memory: {e}"
self.logger.warning(error_msg)
self.logger.warning(f"[LongTermMemory] ❌ {error_msg}")
import traceback
self.logger.warning(traceback.format_exc())

def publish_global_task(self, task: str, refresh: bool, task_id: str) -> Dict:
"""Publish a global task to all Agents"""
self.logger.info(f"Publishing global task: {task}")
self.logger.info(f"[TASK_START:{task_id}] {task}")

response = self.planner.forward(task)
reasoning_and_subtasks = self._extract_json(response)
Expand All @@ -210,25 +333,45 @@ def publish_global_task(self, task: str, refresh: bool, task_id: str) -> Dict:
while (not self.reasoning_and_subtasks_is_right(reasoning_and_subtasks)) and (
attempt < self.config["model"]["model_retry_planning"]
):
self.logger.warning(
f"[WARNING] JSON extraction failed after {self.config['model']['model_retry_planning']} attempts."
)
self.logger.error(
f"[ERROR] Task ({task}) failed to be decomposed into subtasks, it will be ignored."
)
self.logger.warning(
f"Attempt {attempt + 1} to extract JSON failed. Retrying..."
)
response = self.planner.forward(task)
reasoning_and_subtasks = self._extract_json(response)
attempt += 1

self.logger.info(f"Received reasoning and subtasks:\n{reasoning_and_subtasks}")
if reasoning_and_subtasks is None:
reasoning_and_subtasks = {"error": "Failed to extract valid task decomposition"}
self.logger.info(f"[MASTER_RESPONSE:{task_id}] {json.dumps(reasoning_and_subtasks, ensure_ascii=False)}")

self._save_task_data_to_json(task_id, task, reasoning_and_subtasks)
if reasoning_and_subtasks and "error" not in reasoning_and_subtasks:
self._store_task_to_long_term_memory(task_id, task, reasoning_and_subtasks)

subtask_list = reasoning_and_subtasks.get("subtask_list", [])
grouped_tasks = self._group_tasks_by_order(subtask_list)

task_id = task_id or str(uuid.uuid4()).replace("-", "")

try:
from subtask_analyzer import SubtaskAnalyzer
import os
log_dir = os.path.join(os.path.dirname(__file__), '..', '..', '.log')
analyzer = SubtaskAnalyzer(log_dir=log_dir)
if isinstance(task, list):
task_str = task[0] if task else str(task)
else:
task_str = str(task)

decomposition_record = analyzer.record_decomposition(
task_id=task_id,
original_task=task_str,
reasoning_and_subtasks=reasoning_and_subtasks
)
self.logger.info(f"Subtask decomposition recorded: {decomposition_record.decomposition_quality}")
self.logger.info(f"Decomposition details: {len(subtask_list)} subtasks")
for i, subtask in enumerate(subtask_list, 1):
self.logger.info(f" {i}. [{subtask.get('robot_name', 'unknown')}] {subtask.get('subtask', '')}")
except Exception as e:
self.logger.warning(f"Failed to record subtask: {e}")

threading.Thread(
target=asyncio.run,
args=(self._dispath_subtasks_async(task, task_id, grouped_tasks, refresh),),
Expand Down Expand Up @@ -258,5 +401,5 @@ async def _dispath_subtasks_async(
)
working_robots.append(robot_name)
self.collaborator.update_agent_busy(robot_name, True)
self.collaborator.wait_agents_free(working_robots)
self.logger.info(f"Tasks sent to {len(working_robots)} agents, executing asynchronously...")
self.logger.info(f"Task_id ({task_id}) [{task}] has been sent to all agents.")
Loading