Skip to content

Commit c3869f4

Browse files
author
Codex
committed
chore: update shell command timeout to 3600 seconds across configuration files and documentation; add per-session message coalescing and in-flight task cancellation to the MessageBus
1 parent 7b9fecd commit c3869f4

File tree

11 files changed

+195
-15
lines changed

11 files changed

+195
-15
lines changed

.env.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ OPENROUTER_API_KEY=sk-or-your-openrouter-api-key-here
3939
# Custom base URL for providers (e.g., local LLM servers)
4040
# BASE_URL=https://your-custom-endpoint/v1
4141

42+
# Shell command timeout in seconds (default: 3600 = 60 minutes)
43+
# SPOON_BOT_SHELL_TIMEOUT=3600
44+
4245
# ======= Session Persistence (optional, default: file) =======
4346
#
4447
# Backend options: file (JSONL, default), sqlite, postgres

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,7 @@ test_spoon.py
7676

7777
# Config file with credentials
7878
config.yaml
79+
80+
workspace/*
81+
sessions/*
82+
memory/*

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ ENV JWT_ACCESS_EXPIRE_MINUTES=15
106106

107107
# --- Agent Settings ---
108108
ENV SPOON_BOT_MAX_ITERATIONS=20
109-
ENV SPOON_BOT_SHELL_TIMEOUT=60
109+
ENV SPOON_BOT_SHELL_TIMEOUT=3600
110110
ENV SPOON_BOT_MAX_OUTPUT=10000
111111
ENV SPOON_BOT_LOG_LEVEL=INFO
112112

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@ The agent includes its context budget in the system prompt, allowing it to adjus
503503
│ SPOON-BOT TOOLS │
504504
├──────────────────────────────────────────────────────────────────┤
505505
│ Native OS Tools (Always Available, Priority) │
506-
│ ├── shell Execute commands (60s timeout, 10KB limit)
506+
│ ├── shell Execute commands (3600s timeout, 10KB limit) │
507507
│ ├── read_file Read file contents │
508508
│ ├── write_file Write content to file │
509509
│ ├── edit_file Edit file by replacing text │

config.example.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ agent:
2424
# filesystem:
2525
# command: "npx"
2626
# args: ["-y", "@modelcontextprotocol/server-filesystem", "."]
27-
# shell_timeout: 60
27+
# shell_timeout: 3600
2828
# max_output: 10000
2929
# context_window: 200000
3030
# auto_reload: false # Watch skill paths & config for changes, auto-reload

spoon_bot/agent/loop.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ def __init__(
319319
api_key: str | None = None,
320320
base_url: str | None = None,
321321
max_iterations: int = 50,
322-
shell_timeout: int = 90,
322+
shell_timeout: int = 3600,
323323
max_output: int = 10000,
324324
session_key: str = "default",
325325
skill_paths: list[Path | str] | None = None,
@@ -672,8 +672,9 @@ async def initialize(self) -> None:
672672
# Initialize agent
673673
await self._agent.initialize()
674674

675-
# Increase default step timeout — on-chain txs (cast send) can take 60s+
676-
self._agent._default_timeout = 300.0
675+
# Keep the agent's per-step timeout aligned with the configured shell timeout
676+
# so long-running commands are not cancelled prematurely by the outer loop.
677+
self._agent._default_timeout = max(300.0, float(self.shell_timeout))
677678

678679
self._initialized = True
679680
active_count = len(self.tools.get_active_tools())

spoon_bot/agent/tools/self_config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ def _get_defaults(self) -> dict[str, Any]:
204204
return {
205205
"model": "claude-sonnet-4.6",
206206
"max_iterations": 50,
207-
"shell_timeout": 60,
207+
"shell_timeout": 3600,
208208
"max_output": 10000,
209209
"provider": "anthropic",
210210
}

spoon_bot/agent/tools/shell.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ class ShellTool(Tool):
372372

373373
def __init__(
374374
self,
375-
timeout: int = 60,
375+
timeout: int = 3600,
376376
max_output: int = 10000,
377377
working_dir: str | None = None,
378378
whitelist_mode: bool = False,
@@ -389,7 +389,7 @@ def __init__(
389389
Initialize shell tool.
390390
391391
Args:
392-
timeout: Command timeout in seconds (default 60).
392+
timeout: Command timeout in seconds (default 3600).
393393
max_output: Maximum output characters (default 10000).
394394
working_dir: Default working directory.
395395
whitelist_mode: If True, only allow whitelisted commands.

spoon_bot/bus/events.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ class InboundMessage:
2424
timestamp: datetime = field(default_factory=datetime.now)
2525
media: list[str] = field(default_factory=list)
2626
metadata: dict[str, Any] = field(default_factory=dict)
27+
# Sequence number assigned by MessageBus for latest-wins ordering.
28+
# 0 means not yet assigned.
29+
_bus_seq: int = field(default=0, repr=False)
2730

2831
@property
2932
def has_media(self) -> bool:

spoon_bot/bus/queue.py

Lines changed: 172 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ class MessageBus:
2222
- Handler registration
2323
- Channel routing
2424
- Error handling
25+
- **Latest-wins with per-session coalescing**: when a new message arrives
26+
for a session that already has an in-flight or queued message, the
27+
in-flight task is cancelled and all pending messages for the session are
merged into one coalesced message, which is then processed.
2528
"""
2629

2730
def __init__(self, max_queue_size: int = 100, max_concurrency: int = 4):
@@ -41,6 +44,21 @@ def __init__(self, max_queue_size: int = 100, max_concurrency: int = 4):
4144
self._semaphore = asyncio.Semaphore(max_concurrency)
4245
self._active_tasks: set[asyncio.Task] = set()
4346

47+
# Message coalescing: when multiple messages arrive for the same
48+
# session before processing begins, they are merged into a single
49+
# message so the agent sees the full context (e.g. a follow-up
50+
# clarification is kept together with the original request).
51+
# If a task is already in-flight, it is cancelled and the new
52+
# (merged) message is processed instead.
53+
self._seq_counter: int = 0
54+
self._latest_seq: dict[str, int] = {}
55+
self._session_locks: dict[str, asyncio.Lock] = {}
56+
self._session_tasks: dict[str, asyncio.Task] = {}
57+
# Per-session accumulator: messages are buffered here on publish()
58+
# and drained at processing time so that all pending messages for a
59+
# session are coalesced into one.
60+
self._session_buffer: dict[str, list[InboundMessage]] = {}
61+
4462
def set_handler(self, handler: MessageHandler) -> None:
4563
"""
4664
Set the message handler (typically the agent).
@@ -74,12 +92,39 @@ async def publish(self, message: InboundMessage) -> bool:
7492
would freeze the channel's event handler (e.g. Discord gateway
7593
heartbeats), potentially causing a disconnect.
7694
95+
**Message coalescing**: the message is added to a per-session
96+
buffer *and* enqueued. When processing starts, all buffered
97+
messages for the session are merged into one so the agent sees
98+
the full context (follow-ups, corrections, etc.). If a task is
99+
already running for this session, it is cancelled and the new
100+
(coalesced) message takes over.
101+
77102
Args:
78103
message: Inbound message from a channel.
79104
80105
Returns:
81106
True if the message was enqueued, False if the queue is full.
82107
"""
108+
# Assign a sequence number for ordering
109+
self._seq_counter += 1
110+
message._bus_seq = self._seq_counter
111+
session_key = message.session_key or message.channel
112+
self._latest_seq[session_key] = self._seq_counter
113+
114+
# Accumulate in per-session buffer for coalescing at processing time
115+
self._session_buffer.setdefault(session_key, []).append(message)
116+
117+
# Cancel the currently running task for this session (if any).
118+
# The cancelled task will release its session lock, allowing the
119+
# new (coalesced) message to proceed once it is dequeued.
120+
existing_task = self._session_tasks.get(session_key)
121+
if existing_task and not existing_task.done():
122+
existing_task.cancel()
123+
logger.info(
124+
f"Cancelling in-flight task for session {session_key} — "
125+
f"newer message arrived: {message.content[:50]}..."
126+
)
127+
83128
try:
84129
self._queue.put_nowait(message)
85130
logger.debug(f"Published message from {message.channel}: {message.content[:50]}...")
@@ -110,6 +155,9 @@ async def _process_message(self, message: InboundMessage) -> None:
110155
else:
111156
logger.warning(f"No outbound handler for channel: {target_channel}")
112157

158+
except asyncio.CancelledError:
159+
# Let CancelledError propagate — the caller handles it.
160+
raise
113161
except Exception as e:
114162
logger.error(f"Error processing message: {e}")
115163
# Send error response to ensure channel cleanup (typing, reactions).
@@ -129,11 +177,132 @@ async def _process_message(self, message: InboundMessage) -> None:
129177
except Exception as send_err:
130178
logger.error(f"Failed to send error response: {send_err}")
131179

180+
def _get_session_lock(self, session_key: str) -> asyncio.Lock:
181+
"""Return (or create) a per-session lock for serialised processing."""
182+
lock = self._session_locks.get(session_key)
183+
if lock is None:
184+
lock = asyncio.Lock()
185+
self._session_locks[session_key] = lock
186+
return lock
187+
188+
@staticmethod
189+
def _coalesce_messages(messages: list[InboundMessage]) -> InboundMessage:
190+
"""Merge a list of messages into one, preserving the latest metadata.
191+
192+
The content of all messages is joined with newlines so the agent
193+
sees the full context. Media attachments are concatenated. All
194+
other fields (channel, session_key, metadata, …) are taken from
195+
the **last** message since it is the most recent user intent.
196+
"""
197+
if len(messages) == 1:
198+
return messages[0]
199+
200+
base = messages[-1] # newest message is the base
201+
merged_content = "\n".join(m.content for m in messages)
202+
203+
# Merge media from all messages (deduplicated, order preserved)
204+
seen: set[str] = set()
205+
merged_media: list[str] = []
206+
for m in messages:
207+
for path in m.media:
208+
if path not in seen:
209+
seen.add(path)
210+
merged_media.append(path)
211+
212+
# Build the coalesced message from the newest, replacing content/media
213+
coalesced = InboundMessage(
214+
content=merged_content,
215+
channel=base.channel,
216+
session_key=base.session_key,
217+
sender_id=base.sender_id,
218+
sender_name=base.sender_name,
219+
message_id=base.message_id,
220+
timestamp=base.timestamp,
221+
media=merged_media,
222+
metadata=base.metadata.copy() if base.metadata else {},
223+
)
224+
coalesced._bus_seq = base._bus_seq
225+
return coalesced
226+
132227
async def _process_with_semaphore(self, message: InboundMessage) -> None:
133-
"""Process a single message under the concurrency semaphore."""
228+
"""Process a single message under the concurrency semaphore.
229+
230+
**Per-session serialisation**: messages belonging to the same
231+
``session_key`` are processed one at a time via a per-session lock
232+
so that a fast second message cannot run concurrently with the
233+
first.
234+
235+
**Message coalescing**: before starting actual work, all pending
236+
messages for this session are drained from ``_session_buffer``
237+
and merged into one. This means follow-up messages ("also use
238+
TypeScript") are kept together with the original request.
239+
240+
If this trigger message is not the latest for its session (i.e.
241+
a newer trigger was already enqueued), it yields to the newer
242+
trigger which will perform the coalescing instead.
243+
244+
**Cancellation-safe**: if this task is cancelled (because a newer
245+
message triggered cancellation via ``publish()``), the session
246+
lock and semaphore are properly released and the queue bookkeeping
247+
is maintained.
248+
"""
249+
session_key = message.session_key or message.channel
250+
session_lock = self._get_session_lock(session_key)
251+
134252
try:
135-
async with self._semaphore:
136-
await self._process_message(message)
253+
# Acquire per-session lock first (no semaphore slot consumed
254+
# while waiting, so other sessions are not starved).
255+
async with session_lock:
256+
# Only the trigger with the highest seq should coalesce
257+
# and process. Earlier triggers for the same session
258+
# exit here — the latest trigger will pick up all
259+
# buffered messages.
260+
msg_seq = message._bus_seq
261+
latest = self._latest_seq.get(session_key, 0)
262+
if msg_seq < latest:
263+
logger.info(
264+
f"Skipping earlier trigger (seq={msg_seq}, "
265+
f"latest={latest}) for session {session_key}"
266+
)
267+
return
268+
269+
# Drain the per-session buffer and coalesce
270+
buffered = self._session_buffer.pop(session_key, [])
271+
if buffered:
272+
message = self._coalesce_messages(buffered)
273+
if len(buffered) > 1:
274+
logger.info(
275+
f"Coalesced {len(buffered)} messages for "
276+
f"session {session_key}"
277+
)
278+
279+
# Register as the active task for this session
280+
current_task = asyncio.current_task()
281+
self._session_tasks[session_key] = current_task # type: ignore[assignment]
282+
283+
try:
284+
async with self._semaphore:
285+
await self._process_message(message)
286+
except asyncio.CancelledError:
287+
logger.info(
288+
f"Task cancelled for session {session_key}: "
289+
f"{message.content[:50]}..."
290+
)
291+
# Do NOT re-raise inside the session_lock context —
292+
# we want to release the lock cleanly so the next
293+
# message can proceed.
294+
return
295+
finally:
296+
# Only clear if we are still the registered task
297+
if self._session_tasks.get(session_key) is current_task:
298+
self._session_tasks.pop(session_key, None)
299+
except asyncio.CancelledError:
300+
# Cancelled while waiting for the session lock — nothing to
301+
# clean up, just exit silently.
302+
logger.debug(
303+
f"Task cancelled while waiting for lock, "
304+
f"session {session_key}: {message.content[:50]}..."
305+
)
137306
finally:
138307
self._queue.task_done()
139308

0 commit comments

Comments
 (0)