forked from yoheinakajima/babyagi3
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent.py
More file actions
2703 lines (2266 loc) · 107 KB
/
agent.py
File metadata and controls
2703 lines (2266 loc) · 107 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Unified Message Abstraction Agent with Background Objectives
Core insight: Everything is still a message, but some messages trigger background work.
- User input → message
- Assistant response → message
- Tool execution → message
- Background objective → messages in its own thread
The elegance: objectives are just agent runs in separate threads.
Chat continues while objectives work. Objectives can spawn sub-objectives.
"""
import asyncio
import heapq
import inspect
import json
import logging
import threading
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable, Protocol, runtime_checkable
logger = logging.getLogger(__name__)
import anthropic
from metrics import (
InstrumentedAsyncAnthropic,
AsyncLiteLLMAnthropicAdapter,
MetricsCollector,
set_event_emitter,
track_source,
get_model_for_use_case,
)
from llm_config import get_llm_config, init_llm_config
from scheduler import (
Scheduler, ScheduledTask, Schedule, SchedulerStore,
create_task, parse_schedule, RunRecord
)
from utils.events import EventEmitter
from utils.console import console, VerboseLevel
from utils.collections import ThreadSafeList
from context_budget import (
ContextBudget,
truncate_tool_result,
summarize_tool_result,
SUMMARIZE_THRESHOLD_CHARS,
)
def json_serialize(obj):
    """Serialize values the standard JSON encoder cannot handle.

    Only ``datetime`` instances are supported; they are rendered with
    ``isoformat()``. Every other type is rejected.

    Raises:
        TypeError: If ``obj`` is not a ``datetime``.
    """
    if not isinstance(obj, datetime):
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
    return obj.isoformat()
# =============================================================================
# Tool Validation
# =============================================================================
class ToolValidationError(Exception):
    """Signals that a tool was rejected by validation while being registered."""
class BudgetExceededException(Exception):
    """Signals that an objective went over the budget allocated to it."""
class ObjectiveCancelledException(Exception):
    """Signals that an objective was cancelled while it was executing."""
@runtime_checkable
class ToolLike(Protocol):
    """Structural (duck-typed) shape of a tool.

    An object that exposes ``name``, ``fn`` and ``schema`` attributes
    satisfies this protocol and can be converted to a Tool.
    """

    name: str
    fn: Callable
    schema: dict
# =============================================================================
# Core Data Model
# =============================================================================
@dataclass
class Objective:
"""
An objective is background work with its own conversation thread.
Simple objectives: "search for X and summarize"
Complex objectives: "research Y, create a report, email it to Z"
The agent handles complexity naturally through its loop.
"""
id: str
goal: str
status: str = "pending" # pending, running, completed, failed, cancelled
thread_id: str = ""
schedule: str | None = None # cron expression for recurring
result: str | None = None
error: str | None = None
created: str = ""
completed: str | None = None
# Priority: 1=highest, 10=lowest (default 5)
priority: int = 5
# Retry configuration
retry_count: int = 0
max_retries: int = 3
last_error: str | None = None
# Error history for retry awareness
error_history: list = field(default_factory=list)
# Budget tracking
budget_usd: float | None = None # Max cost allowed (None = unlimited)
spent_usd: float = 0.0
token_limit: int | None = None # Max tokens allowed (None = unlimited)
tokens_used: int = 0
def __post_init__(self):
if not self.thread_id:
self.thread_id = f"objective_{self.id}"
if not self.created:
self.created = datetime.now().isoformat()
class Tool:
"""A capability the agent can use."""
def __init__(
self,
name: str,
description: str,
parameters: dict,
fn: Callable,
packages: list[str] = None,
env: list[str] = None
):
self.name = name
self.fn = fn
self.packages = packages or []
self.env = env or []
self.schema = {
"name": name,
"description": description,
"input_schema": parameters
}
def execute(self, params: dict, agent: "Agent"):
return self.fn(params, agent)
def check_health(self) -> dict:
"""
Check if this tool's requirements are satisfied.
Returns:
{
"ready": True/False,
"missing_packages": [...],
"missing_env": [...],
}
"""
from tools import check_requirements
return check_requirements(self.packages, self.env)
class Agent(EventEmitter):
"""
The agent: a loop that processes messages, with support for background objectives.
┌─────────────────────────────────────────┐
│ Agent │
│ ┌─────────┐ ┌────────────┐ │
│ │ Threads │ │ Objectives │ │
│ │ (chat) │ │ (bg work) │ │
│ └────┬────┘ └─────┬──────┘ │
│ └──────┬──────┘ │
│ ▼ │
│ ┌────────┐ │
│ │ Tools │ │
│ └────────┘ │
└─────────────────────────────────────────┘
Multi-Channel Architecture:
- Listeners (input): Receive messages from CLI, email, voice, etc.
- Senders (output): Send messages via any channel
- Context: Each message carries channel info and owner status
Events emitted:
- tool_start: {"name": str, "input": dict}
- tool_end: {"name": str, "result": any, "duration_ms": int}
- objective_start: {"id": str, "goal": str}
- objective_end: {"id": str, "status": str, "result": str}
- task_start: {"id": str, "name": str, "goal": str}
- task_end: {"id": str, "status": str, "duration_ms": int}
"""
def __init__(self, model: str = None, load_tools: bool = True, config: dict = None, use_litellm: bool = True):
"""Build the agent: LLM client, state containers, scheduler, memory, metrics and tools.
Args:
model: Model id to use; when falsy, llm_config.agent_model.model_id is used.
load_tools: When True, also load tools via _load_external_tools().
config: Optional configuration dict; passed to init_llm_config() when truthy
and stored as self.config (an empty dict when not given).
use_litellm: True selects AsyncLiteLLMAnthropicAdapter, False selects
InstrumentedAsyncAnthropic.
Note: statement order matters here - self.config must be set before
_initialize_memory() reads it, and self.memory must exist before the
metrics collector and the tool-loading steps that consult it.
"""
self.__init_events__() # Initialize event system
# Initialize LLM configuration from config
if config:
init_llm_config(config)
# Get model from config if not provided
llm_config = get_llm_config()
self.model = model or llm_config.agent_model.model_id
# Select client based on configuration
# use_litellm=True (default): Use LiteLLM for multi-provider support (OpenAI, Anthropic, etc.)
# use_litellm=False: Use direct Anthropic client (requires ANTHROPIC_API_KEY)
if use_litellm:
self.client = AsyncLiteLLMAnthropicAdapter()
else:
self.client = InstrumentedAsyncAnthropic()
set_event_emitter(self) # Agent is an EventEmitter, metrics emit through it
# Registered tools by name; conversation threads by thread id ("main" always exists)
self.tools: dict[str, Tool] = {}
self.threads: dict[str, list] = {"main": []}
self.objectives: dict[str, Objective] = {}
self._running_objectives: set[str] = set()
self._lock = asyncio.Lock()
# Concurrency control for objectives
self.MAX_CONCURRENT_OBJECTIVES = 5
self._concurrency_semaphore = asyncio.Semaphore(self.MAX_CONCURRENT_OBJECTIVES)
# Cancellation tokens for stopping running objectives
self._cancellation_tokens: dict[str, asyncio.Event] = {}
# Priority queue for objectives: (priority, timestamp, obj_id)
self._objective_queue: list[tuple[int, float, str]] = []
# Reference to main event loop for tools running in thread pool
# Set when first async operation runs
self._main_loop: asyncio.AbstractEventLoop | None = None
# Per-thread locks for serializing operations on same thread_id
# Prevents race conditions when multiple sources (scheduler, email, etc.)
# access the same conversation thread concurrently
self._thread_locks: dict[str, asyncio.Lock] = {}
# Activity tracking for priority-aware background tasks.
# Updated on every interactive message. Background extraction checks this
# to yield to interactive traffic (prevents extraction from starving chat).
self._last_interactive_activity: float | None = None
# Multi-channel support
self.senders: dict[str, "Sender"] = {} # Channel senders for output
self.config = config or {} # Agent configuration
self._current_context: dict = {} # Context for current message
# Scheduler for recurring and one-time tasks
self.scheduler = Scheduler(executor=self._execute_scheduled_task)
# Memory system - try SQLite, gracefully fall back to in-memory
# (_initialize_memory returns None when memory is disabled or setup fails)
self.memory = self._initialize_memory()
# Metrics collector for tracking costs and performance
self.metrics_collector = MetricsCollector(
store=self.memory.store if self.memory else None
)
self.metrics_collector.attach(self)
_set_metrics_collector(self.metrics_collector) # Set global for tool access
# Register core tools
self._register_core_tools()
# Load persisted dynamic tools from database (self-improvement)
self._load_persisted_tools()
# Load external tools from /tools directory
if load_tools:
self._load_external_tools()
# Tool context builder for intelligent tool selection
self._tool_context_builder = self._initialize_tool_context()
self._current_tool_selection = None # Cached selection for current turn
# Context window budget management — prevents overflow
self._context_budget = ContextBudget()
# Cache of full (un-truncated) tool results, keyed by tool_use_id.
# Allows retrieval if the LLM needs details that were summarized away.
self._full_tool_results: dict[str, str] = {}
# Fast model for summarizing large tool results
self._summarizer_model = get_model_for_use_case("fast")
def register_sender(self, channel: str, sender):
    """Attach an outbound sender to a channel name.

    A sender previously stored under the same channel is replaced.

    Args:
        channel: Channel name (e.g., "email", "sms", "whatsapp")
        sender: Sender instance implementing the Sender protocol
    """
    self.senders.update({channel: sender})
def register(
    self,
    tool: Tool | ToolLike,
    emit_event: bool = False,
    source_code: str | None = None,
    tool_var_name: str | None = None,
    category: str = "custom",
    is_dynamic: bool = True,
):
    """Validate a tool and add it to the agent's tool table.

    Both Tool instances and objects matching the ToolLike protocol are
    accepted; the latter are converted to Tool instances first. A tool
    already stored under the same name is replaced.

    Args:
        tool: The tool to register.
        emit_event: If True, emit a tool_registered event for persistence.
        source_code: Source code for dynamic tools (enables persistence).
        tool_var_name: Variable name in source code.
        category: Tool category for organization.
        is_dynamic: Whether this is a dynamically created tool.

    Raises:
        ToolValidationError: If tool is malformed or missing required attributes.
    """
    checked = self._validate_and_convert(tool)
    self.tools[checked.name] = checked

    if not emit_event:
        return

    # Payload consumed by persistence hooks listening for tool_registered.
    schema = checked.schema
    payload = {
        "name": checked.name,
        "description": schema.get("description", ""),
        "parameters": schema.get("input_schema", {}),
        "source_code": source_code,
        "packages": getattr(checked, "packages", []),
        "env": getattr(checked, "env", []),
        "tool_var_name": tool_var_name,
        "category": category,
        "is_dynamic": is_dynamic,
    }
    self.emit("tool_registered", payload)
def _validate_and_convert(self, obj) -> Tool:
    """Validate and convert an object to a proper Tool instance.

    This provides defense against:
    - Custom classes missing required attributes
    - Malformed schema dictionaries
    - Missing or invalid tool functions

    Args:
        obj: A Tool, or any object exposing ``name``, ``fn`` and ``schema``.

    Returns:
        ``obj`` itself when it already is a Tool, otherwise a new Tool built
        from its attributes (optional ``packages``/``env`` are carried over).

    Raises:
        ToolValidationError: If attributes are missing, have the wrong type,
            or the schema lacks the required keys.
    """
    # Already a proper Tool instance
    if isinstance(obj, Tool):
        return obj

    # Check for ToolLike protocol (duck typing)
    if isinstance(obj, ToolLike):
        schema = obj.schema
        if not isinstance(schema, dict):
            raise ToolValidationError(
                f"Tool '{getattr(obj, 'name', '?')}' has invalid schema: expected dict, got {type(schema).__name__}"
            )

        # A runtime-checkable protocol only verifies that the attributes
        # exist, never their types, so check name/fn explicitly here.
        if not isinstance(obj.name, str) or not callable(obj.fn):
            raise ToolValidationError(
                f"Tool '{getattr(obj, 'name', '?')}' has attributes but failed protocol check. "
                f"Verify 'name' is str, 'fn' is callable, and 'schema' is dict."
            )

        if "name" not in schema or "input_schema" not in schema:
            raise ToolValidationError(
                f"Tool '{obj.name}' schema missing required keys. "
                f"Expected 'name', 'description', 'input_schema'. Got: {list(schema.keys())}"
            )

        # Convert to proper Tool, keeping declared requirements if present
        return Tool(
            name=obj.name,
            description=schema.get("description", f"Tool: {obj.name}"),
            parameters=schema["input_schema"],
            fn=obj.fn,
            packages=getattr(obj, "packages", None),
            env=getattr(obj, "env", None),
        )

    # Not a Tool and doesn't match protocol - provide helpful error
    missing = [attr for attr in ("name", "fn", "schema") if not hasattr(obj, attr)]
    if missing:
        raise ToolValidationError(
            f"Invalid tool object (type: {type(obj).__name__}). "
            f"Missing required attributes: {missing}. "
            f"Use the Tool class or ensure your class has 'name', 'fn', and 'schema' attributes."
        )

    # Has attributes but the protocol check still rejected the object
    raise ToolValidationError(
        f"Tool '{getattr(obj, 'name', '?')}' has attributes but failed protocol check. "
        f"Verify 'name' is str, 'fn' is callable, and 'schema' is dict."
    )
def _initialize_memory(self):
    """Set up the SQLite-backed memory system from ``self.config["memory"]``.

    Any failure during setup is reported through _log_memory_fallback()
    instead of being raised.

    Returns:
        The Memory instance on success; None when memory is disabled in
        config (``enabled`` is False) or when setup fails.
    """
    from pathlib import Path

    settings = self.config.get("memory", {})
    if settings.get("enabled") is False:
        console.system("Memory: disabled (config)")
        return None

    try:
        from memory import Memory
        from memory.integration import setup_memory_hooks

        store_path = settings.get("path", "~/.babyagi/memory")
        db_file = Path(store_path).expanduser() / "memory.db"
        # Checked before Memory() is constructed, to tell create from load.
        fresh = not db_file.exists()

        verb = "initializing" if fresh else "loading"
        console.system(f"Memory: {verb} SQLite database...")

        instance = Memory(store_path=store_path)
        setup_memory_hooks(self, instance)
        self._log_memory_success(instance, db_file.parent, fresh)
    except Exception as exc:
        self._log_memory_fallback(exc)
        return None
    return instance
def _log_memory_success(self, memory, path, is_new: bool):
    """Report that memory is ready.

    For a new database the location is printed. For an existing one the
    number of rows in ``events`` is printed, falling back to a generic
    message when that count cannot be obtained.
    """
    if is_new:
        console.success(f"Memory: created at {path}")
        return

    try:
        row = memory.store._conn.execute("SELECT COUNT(*) FROM events").fetchone()
        count = row[0]
        console.success(f"Memory: loaded ({count} events)")
    except Exception as exc:
        logger.debug("Could not count memory events: %s", exc)
        console.success("Memory: loaded (SQLite persistent)")
def _log_memory_fallback(self, error: Exception):
    """Report why persistent memory is unavailable, then announce the fallback.

    ImportError is reported as a warning; PermissionError and every other
    exception are reported as errors.
    """
    if isinstance(error, ImportError):
        report, text = console.warning, f"SQLite memory unavailable: {error}"
    elif isinstance(error, PermissionError):
        report, text = console.error, f"Cannot access memory directory: {error}"
    else:
        report, text = console.error, f"Memory initialization failed: {error}"
    report(text)
    console.system("Memory: using in-memory storage (session only)")
def _register_core_tools(self):
    """Register the built-in tools.

    The memory tool comes from ``memory.integration`` when ``self.memory``
    is set, otherwise from ``_memory_tool``. Skill and composio management
    tools are optional: an ImportError while loading them is logged at
    debug level and otherwise ignored.
    """
    # Use enhanced memory tool if SQLite memory is available
    if self.memory is not None:
        from memory.integration import create_enhanced_memory_tool
        self.register(create_enhanced_memory_tool(self.memory))
    else:
        self.register(_memory_tool(self))

    self.register(_objective_tool(self))
    self.register(_notes_tool(self))
    self.register(_schedule_tool(self))
    self.register(_register_tool_tool(self))
    self.register(_send_message_tool(self))

    # Register skill and composio management tools
    try:
        from tools.skills import get_skill_tools
        for tool in get_skill_tools(self, Tool):
            self.register(tool)
    except ImportError as e:
        # Best effort: leave a trace instead of swallowing the error silently.
        logger.debug("Skill tools not available: %s", e)
def _load_persisted_tools(self):
    """Load dynamically-created tools from the database on startup.

    This enables self-improvement: tools the agent creates persist
    across restarts and are automatically reloaded.

    Handles three tool types:
    - "executable": Python code that runs directly
    - "skill": Returns behavioral instructions when called
    - "composio": Thin wrapper that calls Composio library

    A tool whose loading raises is disabled in the store rather than
    crashing startup. A definition that produces no tool is skipped
    without being counted. Does nothing when ``self.memory`` is None.
    """
    if self.memory is None:
        return

    try:
        tool_defs = self.memory.store.get_dynamic_tool_definitions(enabled_only=True)
        if not tool_defs:
            return

        # Try to import skill helpers
        try:
            from tools.skills import create_skill_tool, create_composio_tool
            skills_available = True
        except ImportError:
            skills_available = False

        # Try to get composio client for composio tools
        composio_client = None
        try:
            from composio import Composio
            composio_client = Composio()
        except Exception as composio_err:
            logger.debug("Composio not available: %s", composio_err)

        # Per-kind counters, bumped only once a tool is actually registered
        # so the summary below can never over-count.
        loaded_by_kind = {"skill": 0, "composio": 0, "executable": 0}
        failed = 0

        for tool_def in tool_defs:
            try:
                tool_type = getattr(tool_def, 'tool_type', 'executable') or 'executable'

                if tool_type == "skill" and skills_available:
                    # Skill tool - returns instructions
                    kind = "skill"
                    tool = create_skill_tool(tool_def, Tool)
                elif tool_type == "composio" and skills_available and composio_client:
                    # Composio tool - wraps Composio API. The helper comes from
                    # tools.skills, so that import must have succeeded too.
                    kind = "composio"
                    tool = create_composio_tool(tool_def, Tool, composio_client)
                else:
                    # Executable tool - reconstruct from source code
                    kind = "executable"
                    tool = self._reconstruct_tool(tool_def)

                if tool:
                    # Register without emitting event (already persisted)
                    self.tools[tool.name] = tool
                    loaded_by_kind[kind] += 1
            except Exception as load_err:
                # Disable broken tool rather than crash
                failed += 1
                try:
                    self.memory.store.disable_tool(
                        tool_def.name,
                        reason=f"Failed to load on startup: {str(load_err)[:200]}"
                    )
                except Exception as disable_err:
                    logger.warning(
                        "Failed to disable broken tool '%s' in database: %s",
                        tool_def.name, disable_err,
                    )

        loaded = sum(loaded_by_kind.values())
        if loaded > 0 or failed > 0:
            details = []
            if loaded_by_kind["skill"] > 0:
                details.append(f"{loaded_by_kind['skill']} skills")
            if loaded_by_kind["composio"] > 0:
                details.append(f"{loaded_by_kind['composio']} composio")
            if loaded_by_kind["executable"] > 0:
                details.append(f"{loaded_by_kind['executable']} executable")

            detail_str = f" ({', '.join(details)})" if details else ""

            if failed == 0:
                console.success(f"Tools: loaded {loaded} persisted tool(s){detail_str}")
            else:
                console.warning(
                    f"Tools: loaded {loaded}{detail_str}, disabled {failed} broken tool(s)"
                )
    except Exception as e:
        # Don't crash startup if tool loading fails
        console.warning(f"Could not load persisted tools: {e}")
def _reconstruct_tool(self, tool_def) -> Tool | None:
"""
Reconstruct a Tool from its persisted ToolDefinition.
For tools with external packages, creates a sandboxed executor.
For local tools, executes source code to get the function.
Returns:
Tool instance if successful, None if reconstruction fails.
"""
# No source code - can't reconstruct
if not tool_def.source_code:
return None
# Has external packages - needs sandbox execution
if tool_def.packages:
return self._create_sandboxed_tool_from_definition(tool_def)
# Local tool - execute source to get function
try:
# Set up namespace with required imports
namespace = {
"Tool": Tool,
"datetime": datetime,
"json": json,
}
# Execute the source code
exec(tool_def.source_code, namespace)
# Get the tool variable
if tool_def.tool_var_name and tool_def.tool_var_name in namespace:
return namespace[tool_def.tool_var_name]
# Try to find a Tool instance in namespace
for name, value in namespace.items():
if isinstance(value, Tool):
return value
return None
except Exception as e:
raise RuntimeError(f"Failed to reconstruct tool '{tool_def.name}': {e}")
def _create_sandboxed_tool_from_definition(self, tool_def) -> Tool:
    """
    Create a sandboxed tool from a ToolDefinition.

    The returned Tool wraps a function that, on every call, builds a script
    from the persisted source plus the call parameters and hands it to
    ``tools.sandbox.run_in_sandbox`` with the definition's packages and a
    timeout of 120.

    Returns:
        A Tool carrying the definition's name, description, parameters,
        packages and env. Its function returns the sandbox result, or an
        ``{"error": ...}`` dict when anything raises.
    """
    def sandboxed_fn(params: dict, agent) -> dict:
        try:
            from tools.sandbox import run_in_sandbox

            # Embed the JSON text as a Python string literal via repr().
            # Pasting raw JSON between triple quotes lets Python re-interpret
            # backslash escapes (\" \n \\) and corrupts the payload.
            params_literal = repr(json.dumps(params))
            var_name = tool_def.tool_var_name

            # Build the execution code
            exec_code = "\n".join([
                "",
                tool_def.source_code,
                "# Execute the tool function",
                "import json",
                f"params = json.loads({params_literal})",
                f'if "{var_name}" in dir():',
                f"    tool = {var_name}",
                "    result = tool.fn(params, None)",
                "else:",
                '    result = {"error": "Tool variable not found"}',
                'print("RESULT:", json.dumps(result))',
                "",
            ])

            return run_in_sandbox(
                exec_code,
                packages=tool_def.packages,
                timeout=120
            )
        except Exception as e:
            return {"error": f"Sandbox execution failed: {str(e)}"}

    return Tool(
        name=tool_def.name,
        description=tool_def.description,
        parameters=tool_def.parameters,
        fn=sandboxed_fn,
        packages=tool_def.packages,
        env=tool_def.env,
    )
async def _execute_scheduled_task(self, task: ScheduledTask) -> str:
    """Execute a scheduled task - runs as the agent with its own thread.

    Emits ``task_start`` before running and ``task_end`` afterwards, with
    status "completed" or "failed" and the elapsed time in milliseconds.

    Args:
        task: The scheduled task; its goal, name and schedule go into the
            prompt and its ``thread_id`` selects the conversation thread.

    Returns:
        The text returned by run_async(), or "Task failed: <error>" when an
        exception occurs. Exceptions are not re-raised, so other tasks can
        continue.
    """
    logger.info("[Scheduler] Starting task execution: %s (id=%s)", task.name, task.id)

    # Emit task_start event
    self.emit("task_start", {
        "id": task.id,
        "name": task.name,
        "goal": task.goal
    })

    # Monotonic clock: the measured duration is immune to wall-clock jumps.
    started = time.monotonic()
    prompt = f"""Execute this scheduled task: {task.goal}
Task: {task.name}
Schedule: {task.schedule.human_readable()}
Work autonomously. Use tools as needed. When done, provide a brief summary of what you accomplished."""

    try:
        logger.info("[Scheduler] Calling run_async for task: %s", task.name)
        result = await self.run_async(prompt, task.thread_id)
        logger.info("[Scheduler] Task completed: %s, result length: %s", task.name, len(result))

        duration_ms = int((time.monotonic() - started) * 1000)

        # Emit task_end event (success)
        self.emit("task_end", {
            "id": task.id,
            "name": task.name,
            "status": "completed",
            "duration_ms": duration_ms
        })
        return result
    except Exception as e:
        logger.exception("[Scheduler] Task failed: %s, error: %s", task.name, e)

        duration_ms = int((time.monotonic() - started) * 1000)

        # Emit task_end event (failure)
        self.emit("task_end", {
            "id": task.id,
            "name": task.name,
            "status": "failed",
            "error": str(e),
            "duration_ms": duration_ms
        })
        # Don't re-raise - let other tasks continue
        return f"Task failed: {e}"
def _load_external_tools(self):
    """Load tools from the tools/ folder.

    Every tool returned by ``tools.get_all_tools(Tool)`` is registered.
    An ImportError is logged at debug level and otherwise ignored, so a
    missing tools package does not prevent startup.
    """
    try:
        from tools import get_all_tools
        for tool in get_all_tools(Tool):
            self.register(tool)
    except ImportError as e:
        # Best effort: leave a trace instead of swallowing the error silently.
        logger.debug("External tools not available: %s", e)
def _initialize_tool_context(self):
"""Initialize the tool context builder for intelligent tool selection.
The builder selects a subset of relevant tools for each API call instead
of sending all tools. This manages context window usage as the agent
creates more tools over time.
Returns:
ToolContextBuilder if memory is available, None otherwise.
"""
if self.memory is None:
return None
try:
from memory.tool_context import create_tool_context_builder
return create_tool_context_builder(self.memory.store)
except ImportError:
return None
except Exception as e:
logger.debug("Could not create tool context builder, continuing without smart tool selection: %s", e)
return None
def _refresh_tool_selection(self, current_query: str = None, context: dict = None):
"""Refresh the tool selection for the current turn.
Called before making API calls to select which tools to include.
Caches the selection in _current_tool_selection for use by
_tool_schemas() and _system_prompt().
Args:
current_query: The current user query for relevance matching.
context: Channel context with metadata.
"""
if self._tool_context_builder is None:
self._current_tool_selection = None
return
# Extract topics from agent state if available
current_topics = None
if self.memory and hasattr(self.memory, 'store'):
try:
agent_state = self.memory.store.get_agent_state()
if agent_state:
current_topics = agent_state.current_topics
except Exception as e:
logger.debug("Could not retrieve agent state for tool selection: %s", e)
current_channel = context.get("channel") if context else None
self._current_tool_selection = self._tool_context_builder.select_tools(
all_tools=self.tools,
current_query=current_query,
current_topics=current_topics,
current_channel=current_channel,
)
# -------------------------------------------------------------------------
# Message Processing
# -------------------------------------------------------------------------
def run(self, user_input: str, thread_id: str = "main") -> str:
    """
    Process user input synchronously.

    For async with background objectives, use run_async().

    The calling thread's event loop is reused across calls. A new loop is
    created and installed when the thread has none, or when its loop has
    been closed.

    Raises:
        RuntimeError: If called while an event loop is already running in
            this thread (await run_async() there instead).
    """
    # Fail before the coroutine is created, so none is left un-awaited.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass  # No loop is running in this thread: safe to block.
    else:
        raise RuntimeError(
            "run() cannot be called from a running event loop; use run_async() instead"
        )

    # get_event_loop() raises RuntimeError when the thread has no loop
    # (always in non-main threads), so fall back to creating one.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    return loop.run_until_complete(self.run_async(user_input, thread_id))
def _get_thread_lock(self, thread_id: str) -> asyncio.Lock:
"""Get or create a lock for a thread_id.
Ensures serialized access to each conversation thread,
preventing race conditions when multiple sources (scheduler,
email, CLI) access the same thread concurrently.
"""
if thread_id not in self._thread_locks:
self._thread_locks[thread_id] = asyncio.Lock()
return self._thread_locks[thread_id]
def _spawn_background_task(self, coro) -> bool:
"""Schedule an async coroutine as a background task from any thread.
This method safely schedules a coroutine on the main event loop,
even when called from a thread pool worker (e.g., during tool execution).
Args:
coro: The coroutine to run as a background task
Returns:
True if the task was scheduled, False if no event loop is available.
The task runs asynchronously; errors should be handled by the coroutine.
"""
if self._main_loop is None:
return False
asyncio.run_coroutine_threadsafe(coro, self._main_loop)
return True
async def run_async(self, user_input: str, thread_id: str = "main", context: dict = None) -> str:
"""Process user input and return response. Objectives run in background.
Args:
user_input: The message to process
thread_id: Conversation thread ID (e.g., "main", "email:123", "voice:session")
context: Channel context dict with:
- channel: Source channel ("cli", "email", "voice", etc.)
- is_owner: Whether message is from the agent's owner
- sender: Sender identifier (email address, phone, etc.)
- Additional channel-specific metadata
Thread Safety:
Operations on each thread_id are serialized via per-thread locks.
This prevents message interleaving when concurrent requests
(e.g., scheduled task + email) target the same thread.
"""
# Capture main event loop for tools that need it
if self._main_loop is None:
self._main_loop = asyncio.get_running_loop()
# Acquire per-thread lock to serialize operations on this thread
async with self._get_thread_lock(thread_id):
context = context or {"channel": "cli", "is_owner": True}
self._current_context = context
# Track interactive activity so background tasks can yield
if not thread_id.startswith("objective_"):
self._last_interactive_activity = time.time()
thread = self.threads.setdefault(thread_id, [])
# Auto-repair corrupted threads (orphaned tool_use without tool_result)
# This prevents "tool_use ids were found without tool_result blocks" errors
if thread:
repair_result = self.repair_thread(thread_id)
if repair_result.get("repaired", 0) > 0:
self.emit("thread_repaired", {
"thread_id": thread_id,
"repaired": repair_result["repaired"]
})
thread.append({"role": "user", "content": user_input})
# Log inbound message to memory for context assembly
self._current_memory_event = None
if self.memory:
try:
from memory.integration import log_message
self._current_memory_event = log_message(
self.memory, user_input, context, "inbound"
)
except Exception as e:
logger.warning("Failed to log inbound message to memory: %s", e)
# Refresh tool selection for this turn
# This selects relevant tools based on query, context, and usage patterns
self._refresh_tool_selection(user_input, context)
# Generate context-aware system prompt
is_owner = context.get("is_owner", True)
system_prompt = self._system_prompt(thread_id, is_owner, context)
while True:
# --- Pre-flight: trim thread to fit context budget ---
tool_schemas = self._tool_schemas()
thread = self._context_budget.trim_thread(thread, system_prompt, tool_schemas)
self.threads[thread_id] = thread # Persist trimmed thread
# If thread budget is still tight, reduce tool schemas
if not self._context_budget.fits(system_prompt, tool_schemas, thread):
tool_schemas = self._context_budget.reduce_tool_schemas(tool_schemas)
thread = self._context_budget.trim_thread(thread, system_prompt, tool_schemas)
self.threads[thread_id] = thread
# --- LLM call with context-overflow recovery ---
try:
response = await self.client.messages.create(
model=self.model,
max_tokens=8096,
system=system_prompt,
tools=tool_schemas,
messages=thread
)
except Exception as e:
if not self._is_context_overflow(e):
raise
# Stage 1 recovery: aggressive trim + reduced tools
logger.warning("Context overflow detected, attempting recovery (stage 1)...")
thread = self._context_budget.emergency_trim(thread)
self.threads[thread_id] = thread
tool_schemas = self._context_budget.reduce_tool_schemas(tool_schemas, max_count=15)
try:
response = await self.client.messages.create(
model=self.model,
max_tokens=8096,
system=system_prompt,
tools=tool_schemas,
messages=thread
)
except Exception as e2:
if not self._is_context_overflow(e2):
raise
# Stage 2 recovery: minimal thread + core tools only
logger.warning("Context overflow persists, attempting recovery (stage 2)...")
thread = thread[-2:] if len(thread) >= 2 else thread[-1:]
self.threads[thread_id] = thread
tool_schemas = self._context_budget.reduce_tool_schemas(tool_schemas, max_count=8)
try:
response = await self.client.messages.create(
model=self.model,
max_tokens=8096,
system=system_prompt,
tools=tool_schemas,
messages=thread
)
except Exception as e3:
if not self._is_context_overflow(e3):
raise
# Stage 3: give up gracefully — clear thread and inform user
logger.error("Context overflow unrecoverable after 3 stages. Clearing thread.")
self.threads[thread_id] = []
return (
"I ran into a context limit and couldn't recover automatically. "
"I've cleared the conversation history so we can continue. "
"Please repeat your request and I'll start fresh."
)
# Track budget and tokens for objectives
if thread_id.startswith("objective_"):
obj_id = thread_id.replace("objective_", "")
await self._track_objective_usage(obj_id, response)
# Check cancellation for objectives
if thread_id.startswith("objective_"):
obj_id = thread_id.replace("objective_", "")
if obj_id in self._cancellation_tokens and self._cancellation_tokens[obj_id].is_set():
raise ObjectiveCancelledException(f"Objective {obj_id} was cancelled")
thread.append({"role": "assistant", "content": response.content})
if response.stop_reason == "end_turn":
return self._extract_text(response)
# Execute tools (in thread pool to avoid blocking event loop)
# CRITICAL: We must ensure a tool_result is appended for every tool_use,
# even if execution fails. Otherwise the thread becomes corrupted and
# the API will reject subsequent messages.
tool_results = []
for block in response.content:
if block.type == "tool_use":
# Emit tool_start event
self.emit("tool_start", {
"name": block.name,
"input": block.input
})
start_time = time.time()
result = None
error_msg = None
try:
# Check if tool exists
if block.name not in self.tools:
error_msg = f"Tool '{block.name}' not found"
result = {"error": error_msg}
else:
# Check if tool function is async
tool_fn = self.tools[block.name].fn
if inspect.iscoroutinefunction(tool_fn):
# Async tools: await directly (they're I/O bound, not CPU bound)
result = await tool_fn(block.input, self)
else:
# Sync tools: run in thread pool to prevent blocking the event loop
result = await asyncio.to_thread(
self.tools[block.name].execute, block.input, self