Skip to content

Commit a50aeb3

Browse files
authored
[Agent]Update the end-cloud collaboration module based on agent semantics (#631)
Use **Collaborate** instead of **Communicate** to better reflect the characteristics of end-cloud collaboration and communication storage integration. #### Agent Communication: - `send`: Publishes message to Redis channel - `listen`: Subscribes to channel with callback handler #### Agent Memory: - `record_agent_status`: Appends status to agent's short-term list - `read_agent_status`: Retrieves agent's status history - `clear_agent_status`: Clears agent's status records - `register_agent`: Registers agent in central registry - `retrieve_agent`: Gets single agent's registration data - `retrieve_all_agents`: Returns complete agent registry - `retrieve_all_agents_name`: Lists all registered agent names #### Agent Monitoring: - `agent_heartbeat`: Updates agent TTL - `update_agent_busy`: Sets agent's availability flag - `agent_is_busy`: Checks agent's current status - `wait_agents_free`: Blocks until agents become available
1 parent 25e30a1 commit a50aeb3

File tree

6 files changed

+698
-252
lines changed

6 files changed

+698
-252
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .collaborator import Collaborator
Lines changed: 348 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
import datetime
2+
import threading
3+
import time
4+
5+
from typing import Any, Callable, Dict, List, Optional, Union
6+
7+
from redis import ConnectionPool, Redis
8+
from redis.exceptions import ConnectionError, RedisError, TimeoutError
9+
10+
11+
class Collaborator:
12+
def __init__(
13+
self,
14+
host: str = "localhost",
15+
port: int = 6379,
16+
db: int = 0,
17+
clear: bool = False,
18+
password: Optional[str] = None,
19+
):
20+
"""
21+
Initialize Redis with individual parameters.
22+
23+
Args:
24+
host (str): Redis server hostname/IP. Default: "localhost".
25+
port (int): Redis server port. Default: 6379.
26+
db (int): Redis database index. Default: 0.
27+
clear (bool): If True, flushes the database on initialization. Default: False.
28+
password (Optional[str]): Redis authentication password. Default: None.
29+
"""
30+
self.host = host
31+
self.port = port
32+
self.db = db
33+
self.clear = clear
34+
self.password = password
35+
36+
# Log connection details (mask password for security)
37+
print(f"Connecting to Redis at {host}:{port}, db: {db}")
38+
39+
# Create Redis connection pool
40+
self.pool = ConnectionPool(
41+
host=host,
42+
port=port,
43+
db=db,
44+
password=password,
45+
decode_responses=True, # Automatically decode byte responses to strings
46+
)
47+
48+
# Clear database if requested
49+
if clear:
50+
self._clear_db()
51+
52+
@classmethod
53+
def from_config(cls, config: Dict[str, Union[str, int, bool]]) -> "Collaborator":
54+
"""
55+
Alternative constructor that initializes from a configuration dictionary.
56+
57+
Args:
58+
config (Dict): Dictionary containing Redis connection parameters.
59+
Supported keys:
60+
- host (str)
61+
- port (int)
62+
- db (int)
63+
- password (Optional[str])
64+
- clear (bool)
65+
66+
Returns:
67+
Collaborator: New instance configured with the provided settings.
68+
69+
Example:
70+
>>> config = {
71+
... "host": "redis.example.com",
72+
... "port": 6380,
73+
... "db": 1,
74+
... "password": "secret",
75+
... "clear": True
76+
... }
77+
>>> coll = Collaborator.from_config(config)
78+
"""
79+
return cls(
80+
host=config.get("host", "localhost"), # Fallback to default if not provided
81+
port=config.get("port", 6379),
82+
db=config.get("db", 0),
83+
password=config.get("password"), # None if not provided
84+
clear=config.get("clear", False),
85+
)
86+
87+
def _clear_db(self) -> None:
88+
"""Flushes the current Redis database."""
89+
with Redis(connection_pool=self.pool) as redis_client:
90+
redis_client.flushdb()
91+
print("Database cleared successfully.")
92+
93+
def _get_conn(self) -> Redis:
94+
"""Get a Redis connection from the pool."""
95+
return Redis(connection_pool=self.pool)
96+
97+
# ----------------- send/recive -----------------
98+
def send(self, channel: str, message: str) -> bool:
99+
"""send a message to a Redis channel.
100+
Returns True if the message was published successfully, False otherwise.
101+
"""
102+
try:
103+
redis_client = self._get_conn()
104+
return redis_client.publish(channel, message) > 0
105+
except (ConnectionError, TimeoutError, RedisError) as e:
106+
print(f"Error while publishing to Redis: {e}")
107+
finally:
108+
redis_client.close()
109+
110+
def listen(
111+
self,
112+
channel: str,
113+
callback: Callable[[Dict[str, Any]], None],
114+
stop_event: Optional[threading.Event] = None,
115+
) -> None:
116+
"""Subscribe to a Redis channel and call the callback function with the message.
117+
The callback function should accept a single argument, which is the message.
118+
If stop_event is provided, the subscription will stop when the event is set.
119+
"""
120+
121+
conn = self._get_conn()
122+
pubsub = conn.pubsub()
123+
pubsub.subscribe(channel)
124+
for message in pubsub.listen():
125+
print(f"Received message: {message}, {channel}, {datetime.datetime.now()}")
126+
if stop_event and stop_event.is_set():
127+
break
128+
if message["type"] == "message":
129+
callback(message["data"])
130+
131+
# ----------------- data -----------------
132+
def record_agent_status(self, name: str, value: str, _: Optional[float] = None) -> bool:
133+
"""Append a member to short-term status list (score parameter is ignored)."""
134+
try:
135+
redis_client = self._get_conn()
136+
return redis_client.rpush(f"SHORT_STATUS:{name}", value) > 0
137+
except (ConnectionError, TimeoutError, RedisError) as e:
138+
print(f"Error while appending to short-term status list: {e}")
139+
return False
140+
141+
def read_agent_status(self, name: str) -> List[str]:
142+
"""Get all members from short-term status list."""
143+
try:
144+
redis_client = self._get_conn()
145+
return redis_client.lrange(f"SHORT_STATUS:{name}", 0, -1)
146+
except (ConnectionError, TimeoutError, RedisError) as e:
147+
print(f"Error while reading short-term status list: {e}")
148+
return []
149+
150+
def clear_agent_status(self, name: str) -> bool:
151+
"""Delete short-term status list."""
152+
try:
153+
redis_client = self._get_conn()
154+
return redis_client.delete(f"SHORT_STATUS:{name}") == 1
155+
except (ConnectionError, TimeoutError, RedisError) as e:
156+
print(f"Error while clearing short-term status list: {e}")
157+
return False
158+
159+
def register_agent(
160+
self, agent_name: str, agent_data: Dict[str, str], expire_second: Optional[int] = None
161+
) -> bool:
162+
"""Register agent in Redis under AGENT_INFO hash.
163+
164+
Creates AGENT_INFO hash if not exists.
165+
166+
Args:
167+
agent_name (str): Key identifier for the agent
168+
agent_data (Dict[str, str]): Agent attributes
169+
expire_second (Optional[int]): TTL in seconds for the AGENT_INFO hash
170+
171+
Returns:
172+
bool: True if successful, False on failure
173+
"""
174+
try:
175+
redis_client = self._get_conn()
176+
177+
# Pipeline both operations atomically
178+
with redis_client.pipeline() as pipe:
179+
# 1. Store agent data in AGENT_INFO hash
180+
pipe.hset("AGENT_INFO", key=agent_name, value=agent_data)
181+
182+
# 2. Set expiration if specified
183+
if expire_second is not None:
184+
pipe.expire("AGENT_INFO", expire_second)
185+
186+
pipe.execute()
187+
188+
self.send("AGENT_REGISTRATION", agent_name)
189+
190+
return True
191+
192+
except (ConnectionError, TimeoutError, RedisError) as e:
193+
print(f"Failed to register agent {agent_name}: {e}")
194+
return False
195+
196+
def retrieve_agent(self, agent_name: str) -> Optional[Dict[str, str]]:
197+
"""Retrieve agent data from AGENT_INFO hash."""
198+
try:
199+
redis_client = self._get_conn()
200+
return redis_client.hget("AGENT_INFO", agent_name)
201+
except (ConnectionError, TimeoutError, RedisError) as e:
202+
print(f"Error retrieving agent {agent_name}: {e}")
203+
return None
204+
205+
def retrieve_all_agents(self) -> Dict[str, Dict[str, str]]:
206+
"""Retrieve all agents from AGENT_INFO hash."""
207+
try:
208+
redis_client = self._get_conn()
209+
return redis_client.hgetall("AGENT_INFO")
210+
except (ConnectionError, TimeoutError, RedisError) as e:
211+
print(f"Error retrieving agent registry: {e}")
212+
return {}
213+
214+
def retrieve_all_agents_name(self) -> List[str]:
215+
"""Retrieve all agent names (keys) from AGENT_INFO hash.
216+
217+
Returns:
218+
List[str]: List of all agent names/keys.
219+
Returns empty list if no agents exist or error occurs.
220+
"""
221+
try:
222+
redis_client = self._get_conn()
223+
return list(redis_client.hkeys("AGENT_INFO"))
224+
225+
except (ConnectionError, TimeoutError, RedisError) as e:
226+
print(f"Error retrieving agent names: {e}")
227+
return []
228+
229+
def agent_heartbeat(self, agent_name: str, seconds: int) -> bool:
230+
"""Set TTL for the agent's registration in AGENT_INFO hash.
231+
232+
Args:
233+
agent_name: Name of the registered agent
234+
seconds: TTL in seconds (must be > 0)
235+
236+
Returns:
237+
bool: True if TTL was set successfully, False otherwise
238+
"""
239+
try:
240+
redis_client = self._get_conn()
241+
242+
# Verify agent exists
243+
if not redis_client.hexists("AGENT_INFO", agent_name):
244+
return False
245+
246+
# Set TTL for the entire hash
247+
return bool(redis_client.expire("AGENT_INFO", seconds))
248+
249+
except (ConnectionError, TimeoutError, RedisError):
250+
return False
251+
252+
def update_agent_busy(self, agent_name: str, busy: bool) -> bool:
253+
"""Update agent's busy status in the AGENT_BUSY hash.
254+
255+
Args:
256+
agent_name (str): Name identifier for the agent
257+
busy (bool): True for busy, False for available
258+
259+
Returns:
260+
bool: True if update succeeded, False on failure
261+
262+
Example:
263+
>>> coll.update_agent_busy("robot_1", True) # Set busy
264+
>>> coll.update_agent_busy("robot_1", False) # Set available
265+
"""
266+
try:
267+
redis_client = self._get_conn()
268+
return redis_client.hset("AGENT_BUSY", agent_name, int(busy)) >= 0
269+
except (ConnectionError, TimeoutError, RedisError) as e:
270+
print(f"Error updating busy status for {agent_name}: {e}")
271+
return False
272+
273+
def agent_is_busy(self, agent_name: str) -> Optional[bool]:
274+
"""Get current busy status of an agent.
275+
276+
Args:
277+
agent_name (str): Agent name to query
278+
279+
Returns:
280+
Optional[bool]:
281+
- True if agent is busy
282+
- False if available
283+
- None if record not found or error occurred
284+
"""
285+
try:
286+
redis_client = self._get_conn()
287+
status = redis_client.hget("AGENT_BUSY", agent_name)
288+
return bool(int(status)) if status is not None else None
289+
except (ConnectionError, TimeoutError, RedisError) as e:
290+
print(f"Error getting busy status for {agent_name}: {e}")
291+
return None
292+
293+
def wait_agents_free(
294+
self, agents_name: list[str], check_interval: float = 0.5, timeout: Optional[float] = None
295+
) -> bool:
296+
"""Wait until all specified agents become free (busy=False).
297+
298+
Args:
299+
agents_name: List of agent names to monitor
300+
check_interval: Seconds between status checks (default: 0.5)
301+
timeout: Maximum wait time in seconds (None = no timeout)
302+
303+
Returns:
304+
bool:
305+
- True if all agents became free
306+
- False if timeout occurred
307+
308+
Example:
309+
>>> # Wait for robot1 and robot2 to become free
310+
>>> success = coll.wait_agent_free(["robot1", "robot2"])
311+
>>> if success:
312+
>>> print("All agents are now available")
313+
"""
314+
start_time = time.time()
315+
316+
try:
317+
redis_client = self._get_conn()
318+
319+
while True:
320+
# Check timeout
321+
if timeout is not None and (time.time() - start_time) > timeout:
322+
return False
323+
324+
# Get all statuses in one atomic operation
325+
statuses = redis_client.hmget("AGENT_BUSY", agents_name)
326+
327+
# Check if all are free (None means no record = considered free)
328+
all_free = True
329+
for status in statuses:
330+
if status is not None and bool(int(status)):
331+
all_free = False
332+
break
333+
334+
if all_free:
335+
return True
336+
337+
# Wait before next check
338+
time.sleep(check_interval)
339+
340+
except (ConnectionError, TimeoutError, RedisError) as e:
341+
print(f"Error while waiting for agent status: {e}")
342+
return False
343+
344+
# ----------------- Close Connection -----------------
345+
def _close_db(self) -> None:
346+
"""Close the Redis connection pool."""
347+
self.pool.disconnect()
348+
print("Redis connection pool closed.")
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .collaborator import Collaborator

0 commit comments

Comments
 (0)