-
Notifications
You must be signed in to change notification settings - Fork 252
Expand file tree
/
Copy pathconnection_manager.py
More file actions
236 lines (210 loc) · 9.31 KB
/
connection_manager.py
File metadata and controls
236 lines (210 loc) · 9.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import logging
from typing import Any, List, Optional, Type, Dict
from src.connections.base_connection import BaseConnection
from src.connections.anthropic_connection import AnthropicConnection
from src.connections.eternalai_connection import EternalAIConnection
from src.connections.goat_connection import GoatConnection
from src.connections.groq_connection import GroqConnection
from src.connections.openai_connection import OpenAIConnection
from src.connections.twitter_connection import TwitterConnection
from src.connections.farcaster_connection import FarcasterConnection
from src.connections.ollama_connection import OllamaConnection
from src.connections.echochambers_connection import EchochambersConnection
from src.connections.solana_connection import SolanaConnection
from src.connections.hyperbolic_connection import HyperbolicConnection
from src.connections.galadriel_connection import GaladrielConnection
from src.connections.sonic_connection import SonicConnection
from src.connections.discord_connection import DiscordConnection
from src.connections.allora_connection import AlloraConnection
from src.connections.xai_connection import XAIConnection
from src.connections.ethereum_connection import EthereumConnection
from src.connections.together_connection import TogetherAIConnection
from src.connections.evm_connection import EVMConnection
from src.connections.perplexity_connection import PerplexityConnection
from src.connections.debridge_connection import DebridgeConnection
from src.connections.monad_connection import MonadConnection
logger = logging.getLogger("connection_manager")
class ConnectionManager:
def __init__(self, agent_config):
self.connections: Dict[str, BaseConnection] = {}
for config in agent_config:
self._register_connection(config)
@staticmethod
def _class_name_to_type(class_name: str) -> Type[BaseConnection]:
if class_name == "twitter":
return TwitterConnection
elif class_name == "anthropic":
return AnthropicConnection
elif class_name == "openai":
return OpenAIConnection
elif class_name == "farcaster":
return FarcasterConnection
elif class_name == "groq":
return GroqConnection
elif class_name == "eternalai":
return EternalAIConnection
elif class_name == "ollama":
return OllamaConnection
elif class_name == "echochambers":
return EchochambersConnection
elif class_name == "goat":
return GoatConnection
elif class_name == "solana":
return SolanaConnection
elif class_name == "hyperbolic":
return HyperbolicConnection
elif class_name == "galadriel":
return GaladrielConnection
elif class_name == "sonic":
return SonicConnection
elif class_name == "discord":
return DiscordConnection
elif class_name == "allora":
return AlloraConnection
elif class_name == "xai":
return XAIConnection
elif class_name == "ethereum":
return EthereumConnection
elif class_name == "together":
return TogetherAIConnection
elif class_name == "evm":
return EVMConnection
elif class_name == "perplexity":
return PerplexityConnection
elif class_name == "debridge":
return DebridgeConnection
elif class_name == "monad":
return MonadConnection
return None
def _register_connection(self, config_dic: Dict[str, Any]) -> None:
"""
Create and register a new connection with configuration
Args:
name: Identifier for the connection
connection_class: The connection class to instantiate
config: Configuration dictionary for the connection
"""
try:
name = config_dic["name"]
connection_class = self._class_name_to_type(name)
if name == "debridge":
connection = DebridgeConnection(config_dic, self.connections)
else:
connection = connection_class(config_dic)
self.connections[name] = connection
except Exception as e:
logging.error(f"Failed to initialize connection {name}: {e}")
def _check_connection(self, connection_string: str) -> bool:
try:
connection = self.connections[connection_string]
return connection.is_configured(verbose=True)
except KeyError:
logging.error(
"\nUnknown connection. Try 'list-connections' to see all supported connections."
)
return False
except Exception as e:
logging.error(f"\nAn error occurred: {e}")
return False
def configure_connection(self, connection_name: str) -> bool:
"""Configure a specific connection"""
try:
connection = self.connections[connection_name]
success = connection.configure()
if success:
logging.info(
f"\n✅ SUCCESSFULLY CONFIGURED CONNECTION: {connection_name}"
)
else:
logging.error(f"\n❌ ERROR CONFIGURING CONNECTION: {connection_name}")
return success
except KeyError:
logging.error(
"\nUnknown connection. Try 'list-connections' to see all supported connections."
)
return False
except Exception as e:
logging.error(f"\nAn error occurred: {e}")
return False
def list_connections(self) -> None:
"""List all available connections and their status"""
logging.info("\nAVAILABLE CONNECTIONS:")
for name, connection in self.connections.items():
status = (
"✅ Configured" if connection.is_configured() else "❌ Not Configured"
)
logging.info(f"- {name}: {status}")
def list_actions(self, connection_name: str) -> None:
"""List all available actions for a specific connection"""
try:
connection = self.connections[connection_name]
if connection.is_configured():
logging.info(
f"\n✅ {connection_name} is configured. You can use any of its actions."
)
else:
logging.info(
f"\n❌ {connection_name} is not configured. You must configure a connection to use its actions."
)
logging.info("\nAVAILABLE ACTIONS:")
for action_name, action in connection.actions.items():
logging.info(f"- {action_name}: {action.description}")
logging.info(" Parameters:")
for param in action.parameters:
req = "required" if param.required else "optional"
logging.info(f" - {param.name} ({req}): {param.description}")
except KeyError:
logging.error(
"\nUnknown connection. Try 'list-connections' to see all supported connections."
)
except Exception as e:
logging.error(f"\nAn error occurred: {e}")
def perform_action(
self, connection_name: str, action_name: str, params: List[Any]
) -> Optional[Any]:
"""Perform an action on a specific connection with given parameters"""
try:
connection = self.connections[connection_name]
if not connection.is_configured():
logging.error(
f"\nError: Connection '{connection_name}' is not configured"
)
return None
if action_name not in connection.actions:
logging.error(
f"\nError: Unknown action '{action_name}' for connection '{connection_name}'"
)
return None
action = connection.actions[action_name]
# Convert list of params to kwargs dictionary, handling both required and optional params
kwargs = {}
param_index = 0
# Add provided parameters up to the number provided
for i, param in enumerate(action.parameters):
if param_index < len(params):
kwargs[param.name] = params[param_index]
param_index += 1
# Validate all required parameters are present
missing_required = [
param.name
for param in action.parameters
if param.required and param.name not in kwargs
]
if missing_required:
logging.error(
f"\nError: Missing required parameters: {', '.join(missing_required)}"
)
return None
return connection.perform_action(action_name, kwargs)
except Exception as e:
logging.error(
f"\nAn error occurred while trying action {action_name} for {connection_name} connection: {e}"
)
return None
def get_model_providers(self) -> List[str]:
"""Get a list of all LLM provider connections"""
return [
name
for name, conn in self.connections.items()
if conn.is_configured() and getattr(conn, "is_llm_provider", lambda: False)
]