forked from agiresearch/AIOS
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
388 lines (319 loc) · 13.2 KB
/
utils.py
File metadata and controls
388 lines (319 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
import json
import re
import uuid
from copy import deepcopy
import logging
from typing import List, Dict, Any
# Configure logging.
# NOTE: basicConfig() runs when this module is imported and configures the *root*
# logger; per the standard library it does nothing if the root logger already has
# handlers, so an application that set up logging earlier keeps its own settings.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger (named after this module); used by decode_litellm_tool_calls.
logger = logging.getLogger(__name__)
def merge_messages_with_tools(messages: list, tools: list) -> list:
    """
    Append the tool list and tool-calling instructions to the last message.

    Intended for LLMs without native tool calling: the available tools are
    serialized as JSON and appended, together with the expected response format,
    to the "content" of the final message.

    Args:
        messages (list): Message dictionaries. Only the last one is changed; its
            "content" is expected to be a string, None, or absent.
        tools (list): Available tool definitions. Must be JSON-serializable
            because they are embedded with ``json.dumps``.

    Returns:
        list: A deep copy of ``messages`` whose last message's "content" ends
        with the tool prompt. The input list is never modified. An empty
        ``messages`` list is returned as an empty list because there is no
        message to attach the prompt to.

    Example:
        ```python
        messages = [{"role": "user", "content": "Translate 'hello' to French."}]
        tools = [{"name": "translate", "description": "Translates text into another language."}]
        updated_messages = merge_messages_with_tools(messages, tools)
        print(updated_messages[-1]["content"])
        ```
    """
    tool_prompt = f"""
You have access to the following tools:
{json.dumps(tools)}"""
    # Plain (non-f) string: nothing is interpolated here, so the braces are literal.
    format_prompt = """
To use a tool, respond with a JSON object in the following format:
```json
[{"name": "tool_name","parameters": {"arg1": "value1","arg2": "value2"}}]
```
Make sure your response is properly formatted as a valid JSON object.
"""
    new_messages = deepcopy(messages)
    if not new_messages:
        # Nothing to annotate; indexing [-1] would raise IndexError.
        return new_messages

    last_message = new_messages[-1]
    # "content" may be None or missing (e.g. an assistant message that only
    # carries tool calls); treat that as empty text instead of crashing.
    existing_content = last_message.get("content") or ""
    last_message["content"] = existing_content + tool_prompt + format_prompt
    return new_messages
def merge_messages_with_response_format(messages: list, response_format: dict) -> list:
    """
    Append JSON response-format instructions to the last message.

    Args:
        messages (list): Message dictionaries. Only the last one is changed; its
            "content" is expected to be a string, None, or absent.
        response_format (dict): Response-format description. When it contains a
            "json_schema" entry, that schema is embedded in the instructions;
            otherwise (None, empty, or no "json_schema" key) a generic
            "respond with valid JSON" instruction is used.

    Returns:
        list: A deep copy of ``messages`` whose last message's "content" ends
        with the format instructions. The input list is never modified. An empty
        ``messages`` list is returned as an empty list.
    """
    # Guard on the key as well as truthiness: a format such as
    # {"type": "json_object"} has no schema and must not raise KeyError.
    if response_format and "json_schema" in response_format:
        schema_str = json.dumps(response_format["json_schema"], indent=2)
        format_prompt = f"""
You MUST respond with a JSON object that conforms to the following schema:
{schema_str}
Your entire response must be valid JSON without any other text, preamble, or postscript.
Do not use code blocks like ```json or ```. Just return the raw JSON.
"""
    else:
        format_prompt = """
You MUST respond with a valid JSON object.
Your entire response must be valid JSON without any other text, preamble, or postscript.
Do not use code blocks like ```json or ```. Just return the raw JSON.
"""
    new_messages = deepcopy(messages)
    if not new_messages:
        # Nothing to annotate; indexing [-1] would raise IndexError.
        return new_messages

    last_message = new_messages[-1]
    # "content" may be None or missing; treat that as empty text.
    existing_content = last_message.get("content") or ""
    last_message["content"] = existing_content + format_prompt
    return new_messages
def parse_json_format(message: str) -> str:
    """
    Extract the first JSON array of objects, or else the first JSON object, from text.

    The text is scanned with ``json.JSONDecoder.raw_decode`` rather than regular
    expressions, so nested structures and JSON spanning several lines are
    recovered intact.

    Args:
        message (str): The input string potentially containing a JSON object or array.

    Returns:
        str: The extracted value re-serialized with ``json.dumps``. An array
        whose first element is an object is preferred wherever it appears in the
        text; otherwise the first decodable object is used. Returns "[]" when
        neither is found.

    Example:
        ```python
        message = 'Here is some data: {"key": "value"}'
        parsed_json = parse_json_format(message)
        print(parsed_json)  # Output: '{"key": "value"}'
        ```
    """
    decoder = json.JSONDecoder()

    def first_decodable(opener, accept):
        # Try to decode a JSON value at every occurrence of `opener`, returning
        # the first decoded value that `accept` approves (or None).
        position = message.find(opener)
        while position != -1:
            try:
                value, _ = decoder.raw_decode(message, position)
            except json.JSONDecodeError:
                value = None
            if value is not None and accept(value):
                return value
            position = message.find(opener, position + 1)
        return None

    # Arrays of objects take priority over bare objects.
    array_value = first_decodable(
        "[",
        lambda value: isinstance(value, list) and bool(value) and isinstance(value[0], dict),
    )
    if array_value is not None:
        return json.dumps(array_value)

    object_value = first_decodable("{", lambda value: isinstance(value, dict))
    if object_value is not None:
        return json.dumps(object_value)

    return "[]"
def decode_hf_tool_calls(message):
    """
    Decode tool calls from the raw text produced by a Hugging Face model.

    Args:
        message (str): Generated text containing JSON tool calls. Every
            occurrence of the "assistant\\n\\n" marker is removed before parsing.

    Returns:
        list: The decoded tool-call dictionaries, each with an added "id" key
        holding a freshly generated identifier. A single JSON object is wrapped
        in a one-element list. The remaining keys (presumably "name" and
        "parameters") are whatever the model emitted; they are not validated here.

    Raises:
        json.JSONDecodeError: If the cleaned message is not valid JSON.

    Example:
        ```python
        text = 'assistant\\n\\n[{"name": "translate", "parameters": {"text": "hello", "lang": "fr"}}]'
        decoded_calls = decode_hf_tool_calls(text)
        print(decoded_calls)
        # Output: [{'name': 'translate', 'parameters': {'text': 'hello', 'lang': 'fr'}, 'id': '<uuid>'}]
        ```
    """
    message = message.replace("assistant\n\n", "")
    tool_calls = json.loads(message)
    # A lone object would otherwise be iterated key by key and fail on item
    # assignment to a str; normalize it to a list like parse_tool_calls does.
    if isinstance(tool_calls, dict):
        tool_calls = [tool_calls]
    for tool_call in tool_calls:
        tool_call["id"] = generator_tool_call_id()
    return tool_calls
def generator_tool_call_id():
    """
    Produce a fresh identifier for a tool call.

    Returns:
        str: The canonical string form of a newly generated random (version 4) UUID.

    Example:
        ```python
        tool_call_id = generator_tool_call_id()
        print(tool_call_id)  # e.g. '3b0c2f1e-8d4a-4c57-9f0e-2a6b7c1d9e34'
        ```
    """
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)
def decode_litellm_tool_calls(response):
    """
    Decode tool calls from a LiteLLM completion response.

    Native tool calls (``response.choices[0].message.tool_calls``) are used
    whenever they are present, even if the message also carries text content.
    Otherwise the message content is parsed as JSON describing tool calls.

    Args:
        response: The response object from LiteLLM API. Only
            ``response.choices[0].message`` is read.

    Returns:
        list: A list of dictionaries, each containing:
            - "name": The name of the function being called.
            - "parameters": The arguments passed to the function.
            - "id": The provider's tool-call id for native calls, or a freshly
              generated id for calls parsed from the content.
        An empty list is returned when no tool call can be decoded.

    Raises:
        json.JSONDecodeError: If a native tool call's ``arguments`` string is
            not valid JSON.

    Example:
        ```python
        response = <LiteLLM API response>
        decoded_calls = decode_litellm_tool_calls(response)
        print(decoded_calls)
        # Output: [{'name': 'translate', 'parameters': {'text': 'hello', 'lang': 'fr'}, 'id': 'uuid1234'}]
        ```
    """
    message = response.choices[0].message

    # Prefer structured tool calls: providers may send them together with an
    # empty or explanatory "content", which must not hide the calls.
    native_tool_calls = getattr(message, "tool_calls", None)
    if native_tool_calls:
        return _decode_native_tool_calls(native_tool_calls)

    if message.content is None:
        # Neither tool calls nor content: nothing to decode.
        return []

    return _decode_content_tool_calls(message.content)


def _decode_native_tool_calls(tool_calls):
    """Convert provider tool-call objects into name/parameters/id dictionaries."""
    decoded_tool_calls = []
    for tool_call in tool_calls:
        parameters = tool_call.function.arguments
        # Arguments may arrive as a JSON-encoded string rather than a mapping.
        if isinstance(parameters, str):
            parameters = json.loads(parameters)
        decoded_tool_calls.append(
            {
                "name": tool_call.function.name,
                "parameters": parameters,
                "id": tool_call.id,
            }
        )
    return decoded_tool_calls


def _decode_content_tool_calls(content):
    """Best-effort extraction of tool calls from message content (JSON text or parsed data)."""
    tool_calls = content
    if isinstance(tool_calls, str):
        # Some providers return a JSON string; if parsing fails, treat as "no tools".
        try:
            parsed = json.loads(tool_calls)
        except json.JSONDecodeError:
            logger.info("decode_litellm_tool_calls: non-JSON tool_calls string, treating as no tools.")
            return []
        if not isinstance(parsed, (list, dict)):
            # Unexpected JSON type (number, string, ...): be forgiving.
            logger.info("decode_litellm_tool_calls: unexpected JSON type for tool_calls: %s", type(parsed))
            return []
        tool_calls = parsed

    if not isinstance(tool_calls, list):
        tool_calls = [tool_calls]

    decoded_tool_calls = []
    for tool_call in tool_calls:
        if not isinstance(tool_call, dict):
            # Skip only the malformed entry instead of abandoning the rest.
            logger.info("decode_litellm_tool_calls: no valid attribute in tools, treating as no tools")
            continue

        # The first key that is present wins, mirroring the accepted aliases.
        name = next(
            (tool_call[key] for key in ("name", "function_name", "tool_name") if key in tool_call),
            None,
        )
        parameters = next(
            (tool_call[key] for key in ("arguments", "parameters") if key in tool_call),
            None,
        )
        if name is None or parameters is None:
            continue

        decoded_tool_calls.append(
            {
                "name": name,
                "parameters": parameters,
                "id": generator_tool_call_id(),
            }
        )
    return decoded_tool_calls
def parse_tool_calls(message):
    """
    Turn a JSON string of tool calls into a list of tool-call dictionaries.

    Used for models without native tool calling: every call receives a generated
    "id", and its name and parameters are normalized by
    ``double_underscore_to_slash``.

    Args:
        message (str): A JSON string holding one tool call (object) or several (array).

    Returns:
        list: The tool calls, each with an "id" added, "__" in the name replaced
        by "/", and string parameters decoded from JSON.

    Example:
        ```python
        message = '[{"name": "text__translate", "parameters": {"text": "hello", "lang": "fr"}}]'
        parsed_calls = parse_tool_calls(message)
        print(parsed_calls)
        # Output: [{'name': 'text/translate', 'parameters': {'text': 'hello', 'lang': 'fr'}, 'id': 'uuid1234'}]
        ```
    """
    decoded = json.loads(message)
    # A single call may arrive as a bare object; normalize to a list.
    calls = [decoded] if isinstance(decoded, dict) else decoded
    for call in calls:
        call["id"] = generator_tool_call_id()
    return double_underscore_to_slash(calls)
def slash_to_double_underscore(tools):
    """
    Rewrite tool function names so that every "/" becomes "__".

    The tool dictionaries are modified in place.

    Args:
        tools (list): Tool dictionaries, each with a ``["function"]["name"]`` entry.

    Returns:
        list: The same ``tools`` list, with the function names rewritten.

    Example:
        ```python
        tools = [{"function": {"name": "text/translate"}}]
        formatted_tools = slash_to_double_underscore(tools)
        print(formatted_tools)
        # Output: [{'function': {'name': 'text__translate'}}]
        ```
    """
    for entry in tools:
        function_spec = entry["function"]
        function_spec["name"] = function_spec["name"].replace("/", "__")
    return tools
def double_underscore_to_slash(tool_calls):
    """
    Restore tool-call names by turning "__" back into "/" and decode string parameters.

    The tool-call dictionaries are modified in place.

    Args:
        tool_calls (list): Tool-call dictionaries, each with "name" and "parameters" keys.

    Returns:
        list: The same ``tool_calls`` list, with names restored and any
        JSON-string "parameters" replaced by their decoded value.

    Example:
        ```python
        tool_calls = [{"name": "text__translate", "parameters": '{"text": "hello", "lang": "fr"}'}]
        restored_calls = double_underscore_to_slash(tool_calls)
        print(restored_calls)
        # Output: [{'name': 'text/translate', 'parameters': {'text': 'hello', 'lang': 'fr'}}]
        ```
    """
    for call in tool_calls:
        call["name"] = call["name"].replace("__", "/")
        raw_parameters = call["parameters"]
        # Parameters that are already decoded (non-str) are left untouched.
        if isinstance(raw_parameters, str):
            call["parameters"] = json.loads(raw_parameters)
    return tool_calls
def pre_process_tools(tools):
    """
    Prepare tool definitions by replacing "/" in function names with "__".

    The tool dictionaries are modified in place.

    Args:
        tools (list): Tool dictionaries, each with a ``["function"]["name"]`` entry.

    Returns:
        list: The same ``tools`` list, with the function names rewritten.

    Example:
        ```python
        tools = [{"function": {"name": "text/translate"}}]
        preprocessed_tools = pre_process_tools(tools)
        print(preprocessed_tools)
        # Output: [{'function': {'name': 'text__translate'}}]
        ```
    """
    for definition in tools:
        original_name = definition["function"]["name"]
        if "/" not in original_name:
            # Nothing to rewrite for this tool.
            continue
        name_parts = original_name.split("/")
        definition["function"]["name"] = "__".join(name_parts)
    return tools
def check_availability_for_selected_llm_lists(available_llm_names: List[str], selected_llm_lists: List[List[Dict[str, Any]]]):
    """
    Report, for each selected LLM list, whether all of its LLMs are available.

    Args:
        available_llm_names (List[str]): Names of the LLMs that are available.
        selected_llm_lists (List[List[Dict[str, Any]]]): Groups of LLM
            dictionaries, each dictionary carrying a "name" key.

    Returns:
        list: One boolean per group, in order. True when every LLM name in the
        group appears in ``available_llm_names`` (an empty group counts as True).
    """
    return [
        all(candidate["name"] in available_llm_names for candidate in group)
        for group in selected_llm_lists
    ]