Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions camel/benchmarks/browsecomp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========

import asyncio
import base64
import hashlib
import json
Expand Down Expand Up @@ -593,7 +593,7 @@ def process_benchmark_row(row: Dict[str, Any]) -> Dict[str, Any]:
elif isinstance(pipeline_template, Workforce):
pipeline = pipeline_template.clone() # type: ignore[assignment]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still calls Workforce.clone() synchronously

task = Task(content=input_message, id="0")
task = pipeline.process_task(task) # type: ignore[attr-defined]
task = asyncio.run(pipeline.process_task_async(task)) # type: ignore[attr-defined]
if task_json_formatter:
formatter_in_process = task_json_formatter.clone()
else:
Expand Down
82 changes: 40 additions & 42 deletions camel/societies/workforce/workforce.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the excellent contribution—the async-first design should definitely help with
throughput and allow callbacks to run I/O without blocking. One question: since the long-
standing public Workforce APIs (e.g., add_single_agent_worker, reset, clone) are now
asynchronous, do we plan to retain the former synchronous entry points (even if just as thin
wrappers) to preserve backward compatibility for existing scripts and examples?

Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,9 @@ def _initialize_callbacks(
)

for child in self._children:
self._notify_worker_created(child)
asyncio.run(self._notify_worker_created(child))

def _notify_worker_created(
async def _notify_worker_created(
self,
worker_node: BaseNode,
*,
Expand All @@ -527,7 +527,7 @@ def _notify_worker_created(
metadata=metadata,
)
for cb in self._callbacks:
cb.log_worker_created(event)
await cb.log_worker_created(event)

def _get_or_create_shared_context_utility(
self,
Expand Down Expand Up @@ -1189,7 +1189,7 @@ async def _apply_recovery_strategy(
subtask_ids=[st.id for st in subtasks],
)
for cb in self._callbacks:
cb.log_task_decomposed(task_decomposed_event)
await cb.log_task_decomposed(task_decomposed_event)
for subtask in subtasks:
task_created_event = TaskCreatedEvent(
task_id=subtask.id,
Expand All @@ -1199,7 +1199,7 @@ async def _apply_recovery_strategy(
metadata=subtask.additional_info,
)
for cb in self._callbacks:
cb.log_task_created(task_created_event)
await cb.log_task_created(task_created_event)

# Insert subtasks at the head of the queue
self._pending_tasks.extendleft(reversed(subtasks))
Expand Down Expand Up @@ -1719,7 +1719,7 @@ async def handle_decompose_append_task(
return [task]

if reset and self._state != WorkforceState.RUNNING:
self.reset()
await self.reset()
logger.info("Workforce reset before handling task.")

# Focus on the new task
Expand All @@ -1733,7 +1733,7 @@ async def handle_decompose_append_task(
metadata=task.additional_info,
)
for cb in self._callbacks:
cb.log_task_created(task_created_event)
await cb.log_task_created(task_created_event)

# The agent tend to be overconfident on the whole task, so we
# decompose the task into subtasks first
Expand All @@ -1754,7 +1754,7 @@ async def handle_decompose_append_task(
subtask_ids=[st.id for st in subtasks],
)
for cb in self._callbacks:
cb.log_task_decomposed(task_decomposed_event)
await cb.log_task_decomposed(task_decomposed_event)
for subtask in subtasks:
task_created_event = TaskCreatedEvent(
task_id=subtask.id,
Expand All @@ -1764,7 +1764,7 @@ async def handle_decompose_append_task(
metadata=subtask.additional_info,
)
for cb in self._callbacks:
cb.log_task_created(task_created_event)
await cb.log_task_created(task_created_event)

if subtasks:
# _pending_tasks will contain both undecomposed
Expand Down Expand Up @@ -2027,7 +2027,7 @@ def _start_child_node_when_paused(
# Close the coroutine to prevent RuntimeWarning
start_coroutine.close()

def add_single_agent_worker(
async def add_single_agent_worker(
self,
description: str,
worker: ChatAgent,
Expand Down Expand Up @@ -2083,13 +2083,13 @@ def add_single_agent_worker(
# If workforce is paused, start the worker's listening task
self._start_child_node_when_paused(worker_node.start())

self._notify_worker_created(
await self._notify_worker_created(
worker_node,
worker_type='SingleAgentWorker',
)
return self

def add_role_playing_worker(
async def add_role_playing_worker(
self,
description: str,
assistant_role_name: str,
Expand Down Expand Up @@ -2160,7 +2160,7 @@ def add_role_playing_worker(
# If workforce is paused, start the worker's listening task
self._start_child_node_when_paused(worker_node.start())

self._notify_worker_created(
await self._notify_worker_created(
worker_node,
worker_type='RolePlayingWorker',
)
Expand Down Expand Up @@ -2202,7 +2202,7 @@ async def _async_reset(self) -> None:
self._pause_event.set()

@check_if_running(False)
def reset(self) -> None:
async def reset(self) -> None:
r"""Reset the workforce and all the child nodes under it. Can only
be called when the workforce is not running.
"""
Expand All @@ -2229,9 +2229,7 @@ def reset(self) -> None:
if self._loop and not self._loop.is_closed():
# If we have a loop, use it to set the event safely
try:
asyncio.run_coroutine_threadsafe(
self._async_reset(), self._loop
).result()
await self._async_reset()
except RuntimeError as e:
logger.warning(f"Failed to reset via existing loop: {e}")
# Fallback to direct event manipulation
Expand All @@ -2242,7 +2240,7 @@ def reset(self) -> None:

for cb in self._callbacks:
if isinstance(cb, WorkforceMetrics):
cb.reset_task_data()
await cb.reset_task_data()

def save_workflow_memories(
self,
Expand Down Expand Up @@ -3093,7 +3091,7 @@ async def _post_task(self, task: Task, assignee_id: str) -> None:
task_id=task.id, worker_id=assignee_id
)
for cb in self._callbacks:
cb.log_task_started(task_started_event)
await cb.log_task_started(task_started_event)

try:
await self._channel.post_task(task, self.node_id, assignee_id)
Expand Down Expand Up @@ -3240,15 +3238,13 @@ async def _create_worker_node_for_task(self, task: Task) -> Worker:

self._children.append(new_node)

self._notify_worker_created(
await self._notify_worker_created(
new_node,
worker_type='SingleAgentWorker',
role=new_node_conf.role,
metadata={'description': new_node_conf.description},
)
self._child_listening_tasks.append(
asyncio.create_task(new_node.start())
)
self._child_listening_tasks.append(await new_node.start())
return new_node

async def _create_new_agent(self, role: str, sys_msg: str) -> ChatAgent:
Expand Down Expand Up @@ -3363,7 +3359,7 @@ async def _post_ready_tasks(self) -> None:
for cb in self._callbacks:
# queue_time_seconds can be derived by logger if task
# creation time is logged
cb.log_task_assigned(task_assigned_event)
await cb.log_task_assigned(task_assigned_event)

# Step 2: Iterate through all pending tasks and post those that are
# ready
Expand Down Expand Up @@ -3492,7 +3488,7 @@ async def _post_ready_tasks(self) -> None:
},
)
for cb in self._callbacks:
cb.log_task_failed(task_failed_event)
await cb.log_task_failed(task_failed_event)

self._completed_tasks.append(task)
self._cleanup_task_tracking(task.id)
Expand Down Expand Up @@ -3555,7 +3551,7 @@ async def _handle_failed_task(self, task: Task) -> bool:
},
)
for cb in self._callbacks:
cb.log_task_failed(task_failed_event)
await cb.log_task_failed(task_failed_event)

# Check for immediate halt conditions
if task.failure_count >= MAX_TASK_RETRIES:
Expand Down Expand Up @@ -3699,7 +3695,7 @@ async def _handle_completed_task(self, task: Task) -> None:
metadata={'current_state': task.state.value},
)
for cb in self._callbacks:
cb.log_task_completed(task_completed_event)
await cb.log_task_completed(task_completed_event)

# Find and remove the completed task from pending tasks
tasks_list = list(self._pending_tasks)
Expand Down Expand Up @@ -3815,7 +3811,7 @@ async def _graceful_shutdown(self, failed_task: Task) -> None:
# Wait for the full timeout period
await asyncio.sleep(self.graceful_shutdown_timeout)

def get_workforce_log_tree(self) -> str:
async def get_workforce_log_tree(self) -> str:
r"""Returns an ASCII tree representation of the task hierarchy and
worker status.
"""
Expand All @@ -3825,19 +3821,19 @@ def get_workforce_log_tree(self) -> str:
if len(metrics_cb) == 0:
return "Metrics Callback not initialized."
else:
return metrics_cb[0].get_ascii_tree_representation()
return await metrics_cb[0].get_ascii_tree_representation()

def get_workforce_kpis(self) -> Dict[str, Any]:
async def get_workforce_kpis(self) -> Dict[str, Any]:
r"""Returns a dictionary of key performance indicators."""
metrics_cb: List[WorkforceMetrics] = [
cb for cb in self._callbacks if isinstance(cb, WorkforceMetrics)
]
if len(metrics_cb) == 0:
return {"error": "Metrics Callback not initialized."}
else:
return metrics_cb[0].get_kpis()
return await metrics_cb[0].get_kpis()

def dump_workforce_logs(self, file_path: str) -> None:
async def dump_workforce_logs(self, file_path: str) -> None:
r"""Dumps all collected logs to a JSON file.

Args:
Expand All @@ -3849,7 +3845,7 @@ def dump_workforce_logs(self, file_path: str) -> None:
if len(metrics_cb) == 0:
print("Logger not initialized. Cannot dump logs.")
return
metrics_cb[0].dump_to_json(file_path)
await metrics_cb[0].dump_to_json(file_path)
# Use logger.info or print, consistent with existing style
logger.info(f"Workforce logs dumped to {file_path}")

Expand Down Expand Up @@ -4325,7 +4321,7 @@ async def _listen_to_channel(self) -> None:
logger.info("All tasks completed.")
all_tasks_completed_event = AllTasksCompletedEvent()
for cb in self._callbacks:
cb.log_all_tasks_completed(all_tasks_completed_event)
await cb.log_all_tasks_completed(all_tasks_completed_event)

# shut down the whole workforce tree
self.stop()
Expand Down Expand Up @@ -4413,7 +4409,7 @@ async def cleanup():

self._running = False

def clone(self, with_memory: bool = False) -> 'Workforce':
async def clone(self, with_memory: bool = False) -> 'Workforce':
r"""Creates a new instance of Workforce with the same configuration.

Args:
Expand Down Expand Up @@ -4444,13 +4440,13 @@ def clone(self, with_memory: bool = False) -> 'Workforce':
for child in self._children:
if isinstance(child, SingleAgentWorker):
cloned_worker = child.worker.clone(with_memory)
new_instance.add_single_agent_worker(
await new_instance.add_single_agent_worker(
child.description,
cloned_worker,
pool_max_size=10,
)
elif isinstance(child, RolePlayingWorker):
new_instance.add_role_playing_worker(
await new_instance.add_role_playing_worker(
child.description,
child.assistant_role_name,
child.user_role_name,
Expand All @@ -4460,7 +4456,7 @@ def clone(self, with_memory: bool = False) -> 'Workforce':
child.chat_turn_limit,
)
elif isinstance(child, Workforce):
new_instance.add_workforce(child.clone(with_memory))
new_instance.add_workforce(await child.clone(with_memory))
else:
logger.warning(f"{type(child)} is not being cloned.")
continue
Expand Down Expand Up @@ -4695,7 +4691,7 @@ def get_children_info():
return children_info

# Add single agent worker
def add_single_agent_worker(
async def add_single_agent_worker(
description,
system_message=None,
role_name="Assistant",
Expand Down Expand Up @@ -4759,7 +4755,9 @@ def add_single_agent_worker(
"message": str(e),
}

workforce_instance.add_single_agent_worker(description, agent)
await workforce_instance.add_single_agent_worker(
description, agent
)

return {
"status": "success",
Expand All @@ -4770,7 +4768,7 @@ def add_single_agent_worker(
return {"status": "error", "message": str(e)}

# Add role playing worker
def add_role_playing_worker(
async def add_role_playing_worker(
description,
assistant_role_name,
user_role_name,
Expand Down Expand Up @@ -4827,7 +4825,7 @@ def add_role_playing_worker(
"message": "Cannot add workers while workforce is running", # noqa: E501
}

workforce_instance.add_role_playing_worker(
await workforce_instance.add_role_playing_worker(
description=description,
assistant_role_name=assistant_role_name,
user_role_name=user_role_name,
Expand Down
20 changes: 11 additions & 9 deletions camel/societies/workforce/workforce_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,40 +35,42 @@ class WorkforceCallback(ABC):
"""

@abstractmethod
def log_task_created(
async def log_task_created(
self,
event: TaskCreatedEvent,
) -> None:
pass

@abstractmethod
def log_task_decomposed(self, event: TaskDecomposedEvent) -> None:
async def log_task_decomposed(self, event: TaskDecomposedEvent) -> None:
pass

@abstractmethod
def log_task_assigned(self, event: TaskAssignedEvent) -> None:
async def log_task_assigned(self, event: TaskAssignedEvent) -> None:
pass

@abstractmethod
def log_task_started(self, event: TaskStartedEvent) -> None:
async def log_task_started(self, event: TaskStartedEvent) -> None:
pass

@abstractmethod
def log_task_completed(self, event: TaskCompletedEvent) -> None:
async def log_task_completed(self, event: TaskCompletedEvent) -> None:
pass

@abstractmethod
def log_task_failed(self, event: TaskFailedEvent) -> None:
async def log_task_failed(self, event: TaskFailedEvent) -> None:
pass

@abstractmethod
def log_worker_created(self, event: WorkerCreatedEvent) -> None:
async def log_worker_created(self, event: WorkerCreatedEvent) -> None:
pass

@abstractmethod
def log_worker_deleted(self, event: WorkerDeletedEvent) -> None:
async def log_worker_deleted(self, event: WorkerDeletedEvent) -> None:
pass

@abstractmethod
def log_all_tasks_completed(self, event: AllTasksCompletedEvent) -> None:
async def log_all_tasks_completed(
self, event: AllTasksCompletedEvent
) -> None:
pass
Loading
Loading