-
Notifications
You must be signed in to change notification settings - Fork 676
Expand file tree
/
Copy pathllm_adapters.py
More file actions
428 lines (388 loc) · 16 KB
/
llm_adapters.py
File metadata and controls
428 lines (388 loc) · 16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
# llm_adapters.py
# -*- coding: utf-8 -*-
import logging
from typing import Optional
from langchain_openai import ChatOpenAI, AzureChatOpenAI
# from google import genai
import google.generativeai as genai
# from google.genai import types
from google.generativeai import types
from azure.ai.inference import ChatCompletionsClient
from azure.core.credentials import AzureKeyCredential
from azure.ai.inference.models import SystemMessage, UserMessage
from openai import OpenAI
import requests
def check_base_url(url: str) -> str:
    """
    Normalize a user-supplied base URL for OpenAI-compatible endpoints.

    Rules:
    1. Surrounding whitespace is stripped; an empty result is returned as is.
    2. If the URL ends with '#', the trailing '#' characters are removed and
       the URL is otherwise used exactly as the user provided it.
    3. Otherwise '/v1' is appended, unless the URL already ends with a version
       segment ('/v<digits>', optionally followed by a single '/') or already
       contains '/v1' somewhere.

    Args:
        url: The raw base URL entered by the user.

    Returns:
        The normalized base URL.
    """
    import re

    url = url.strip()
    if not url:
        return url
    if url.endswith('#'):
        return url.rstrip('#')
    # Allow an optional trailing slash after the version segment so that a URL
    # such as ".../api/v3/" is not rewritten to ".../api/v3/v1".
    if re.search(r'/v\d+/?$', url) or '/v1' in url:
        return url
    return url.rstrip('/') + '/v1'
class BaseLLMAdapter:
    """
    Common base class for LLM back ends (OpenAI, Ollama, ML Studio, Gemini,
    etc.), giving every adapter the same method signature.
    """

    def invoke(self, prompt: str) -> str:
        """Send ``prompt`` to the model and return its text reply.

        The base class has no back end, so it always raises
        ``NotImplementedError``; concrete adapters override this method.
        """
        message = "Subclasses must implement .invoke(prompt) method."
        raise NotImplementedError(message)
class DeepSeekAdapter(BaseLLMAdapter):
    """
    Adapter for the official / OpenAI-compatible interface
    (implemented with langchain's ChatOpenAI).
    """

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model_name: str,
        max_tokens: int,
        temperature: float = 0.7,
        timeout: Optional[int] = 600,
    ):
        # Normalize the URL first; everything below uses the normalized value.
        self.base_url = check_base_url(base_url)
        self.api_key = api_key
        self.model_name = model_name
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.timeout = timeout

        client_options = {
            "model": self.model_name,
            "api_key": self.api_key,
            "base_url": self.base_url,
            "max_tokens": self.max_tokens,
            "temperature": self.temperature,
            "timeout": self.timeout,
        }
        self._client = ChatOpenAI(**client_options)

    def invoke(self, prompt: str) -> str:
        """Return the model's reply text, or "" when the client yields a falsy response."""
        reply = self._client.invoke(prompt)
        if reply:
            return reply.content
        logging.warning("No response from DeepSeekAdapter.")
        return ""
class OpenAIAdapter(BaseLLMAdapter):
    """
    Adapter for the official / OpenAI-compatible interface
    (implemented with langchain's ChatOpenAI).
    """

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model_name: str,
        max_tokens: int,
        temperature: float = 0.7,
        timeout: Optional[int] = 600,
    ):
        normalized_url = check_base_url(base_url)

        # Keep the effective settings on the instance for later inspection.
        self.base_url = normalized_url
        self.api_key = api_key
        self.model_name = model_name
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.timeout = timeout

        self._client = ChatOpenAI(
            model=model_name,
            api_key=api_key,
            base_url=normalized_url,
            max_tokens=max_tokens,
            temperature=temperature,
            timeout=timeout,
        )

    def invoke(self, prompt: str) -> str:
        """Return the model's reply text, or "" when the client yields a falsy response."""
        answer = self._client.invoke(prompt)
        if answer:
            return answer.content
        logging.warning("No response from OpenAIAdapter.")
        return ""
class GeminiAdapter(BaseLLMAdapter):
    """
    Adapter for the Google Gemini (Google Generative AI) interface.
    """

    # PenBo: fix for newer google-generativeai releases, which do not provide a
    # Client class; the API is accessed through the GenerativeModel class instead.
    def __init__(self, api_key: str, base_url: str, model_name: str, max_tokens: int, temperature: float = 0.7, timeout: Optional[int] = 600):
        """
        Args:
            api_key: Gemini API key, applied via ``genai.configure``.
            base_url: Accepted for signature parity with the other adapters;
                this adapter does not use it.
            model_name: Name of the generative model to instantiate.
            max_tokens: Passed as ``max_output_tokens`` in the generation config.
            temperature: Sampling temperature.
            timeout: Per-request timeout in seconds; ``None`` leaves the
                library default in place.
        """
        self.api_key = api_key
        self.model_name = model_name
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.timeout = timeout

        # Configure the API key.
        genai.configure(api_key=self.api_key)

        # Create the generative model instance.
        self._model = genai.GenerativeModel(model_name=self.model_name)

    def invoke(self, prompt: str) -> str:
        """Generate a reply for ``prompt``.

        Returns the response text, or "" when the response carries no text or
        the call raises (the error is logged instead of propagated).
        """
        try:
            # Build the generation config.
            generation_config = genai.types.GenerationConfig(
                max_output_tokens=self.max_tokens,
                temperature=self.temperature,
            )

            call_kwargs = {"generation_config": generation_config}
            # Honour the configured timeout; previously it was stored but
            # never applied to the request.
            if self.timeout is not None:
                call_kwargs["request_options"] = {"timeout": self.timeout}

            # Generate the content.
            response = self._model.generate_content(prompt, **call_kwargs)

            if response and response.text:
                return response.text
            else:
                logging.warning("No text response from Gemini API.")
                return ""
        except Exception as e:
            logging.error(f"Gemini API 调用失败: {e}")
            return ""
class AzureOpenAIAdapter(BaseLLMAdapter):
    """
    Adapter for the Azure OpenAI interface (implemented with langchain's
    AzureChatOpenAI).
    """

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model_name: str,
        max_tokens: int,
        temperature: float = 0.7,
        timeout: Optional[int] = 600,
    ):
        import re

        # The full chat-completions URL is split into host, deployment name
        # and API version.
        parsed = re.match(r'https://(.+?)/openai/deployments/(.+?)/chat/completions\?api-version=(.+)', base_url)
        if parsed is None:
            raise ValueError("Invalid Azure OpenAI base_url format")

        host, deployment, version = parsed.groups()
        self.azure_endpoint = f"https://{host}"
        self.azure_deployment = deployment
        self.api_version = version

        self.api_key = api_key
        # The deployment name from the URL is used as the model name; the
        # model_name argument is not used here.
        self.model_name = self.azure_deployment
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.timeout = timeout

        client_options = dict(
            azure_endpoint=self.azure_endpoint,
            azure_deployment=self.azure_deployment,
            api_version=self.api_version,
            api_key=self.api_key,
            max_tokens=self.max_tokens,
            temperature=self.temperature,
            timeout=self.timeout,
        )
        self._client = AzureChatOpenAI(**client_options)

    def invoke(self, prompt: str) -> str:
        """Return the model's reply text, or "" when the client yields a falsy response."""
        reply = self._client.invoke(prompt)
        if reply:
            return reply.content
        logging.warning("No response from AzureOpenAIAdapter.")
        return ""
class OllamaAdapter(BaseLLMAdapter):
    """
    Ollama also offers an OpenAI-like /v1/chat interface, so ChatOpenAI can be
    used directly.
    """

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model_name: str,
        max_tokens: int,
        temperature: float = 0.7,
        timeout: Optional[int] = 600,
    ):
        self.base_url = check_base_url(base_url)
        # An empty key is replaced with the placeholder 'ollama'.
        self.api_key = 'ollama' if api_key == '' else api_key
        self.model_name = model_name
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.timeout = timeout

        client_options = {
            "model": self.model_name,
            "api_key": self.api_key,
            "base_url": self.base_url,
            "max_tokens": self.max_tokens,
            "temperature": self.temperature,
            "timeout": self.timeout,
        }
        self._client = ChatOpenAI(**client_options)

    def invoke(self, prompt: str) -> str:
        """Return the model's reply text, or "" when the client yields a falsy response."""
        result = self._client.invoke(prompt)
        if result:
            return result.content
        logging.warning("No response from OllamaAdapter.")
        return ""
class MLStudioAdapter(BaseLLMAdapter):
    """Adapter for the "ML Studio" back end, accessed through langchain's ChatOpenAI."""

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model_name: str,
        max_tokens: int,
        temperature: float = 0.7,
        timeout: Optional[int] = 600,
    ):
        normalized_url = check_base_url(base_url)

        self.base_url = normalized_url
        self.api_key = api_key
        self.model_name = model_name
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.timeout = timeout

        self._client = ChatOpenAI(
            model=model_name,
            api_key=api_key,
            base_url=normalized_url,
            max_tokens=max_tokens,
            temperature=temperature,
            timeout=timeout,
        )

    def invoke(self, prompt: str) -> str:
        """Return the model's reply text.

        Returns "" when the client yields a falsy response or raises; a raised
        error is logged rather than propagated.
        """
        try:
            reply = self._client.invoke(prompt)
            if reply:
                return reply.content
            logging.warning("No response from MLStudioAdapter.")
            return ""
        except Exception as e:
            logging.error(f"ML Studio API 调用超时或失败: {e}")
            return ""
class AzureAIAdapter(BaseLLMAdapter):
    """
    Adapter for the Azure AI Inference interface, used to reach models deployed
    on Azure AI services. API calls go through the azure-ai-inference library.
    """

    def __init__(self, api_key: str, base_url: str, model_name: str, max_tokens: int, temperature: float = 0.7, timeout: Optional[int] = 600):
        """
        Args:
            api_key: Key wrapped in an ``AzureKeyCredential``.
            base_url: URL of the form
                https://<endpoint>.services.ai.azure.com/models/chat/completions?api-version=xxx
                (the path and query parts are optional).
            model_name: Model name passed to the client.
            max_tokens: Maximum number of tokens passed to the client.
            temperature: Sampling temperature passed to the client.
            timeout: Timeout value passed to the client.

        Raises:
            ValueError: If ``base_url`` does not match the expected format.
        """
        import re

        # Match URLs such as
        # https://xxx.services.ai.azure.com/models/chat/completions?api-version=xxx
        match = re.match(r'https://(.+?)\.services\.ai\.azure\.com(?:/models)?(?:/chat/completions)?(?:\?api-version=(.+))?', base_url)
        if match:
            # The endpoint must look like https://xxx.services.ai.azure.com/models
            self.endpoint = f"https://{match.group(1)}.services.ai.azure.com/models"
            # Use the api-version from the URL when present, else the default.
            self.api_version = match.group(2) if match.group(2) else "2024-05-01-preview"
        else:
            raise ValueError("Invalid Azure AI base_url format. Expected format: https://<endpoint>.services.ai.azure.com/models/chat/completions?api-version=xxx")

        self.base_url = self.endpoint  # store the processed endpoint URL
        self.api_key = api_key
        self.model_name = model_name
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.timeout = timeout

        self._client = ChatCompletionsClient(
            endpoint=self.endpoint,
            credential=AzureKeyCredential(self.api_key),
            # Forward the parsed version; previously it was extracted from the
            # URL but never handed to the client.
            api_version=self.api_version,
            model=self.model_name,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
            timeout=self.timeout
        )

    def invoke(self, prompt: str) -> str:
        """Send ``prompt`` as the user message and return the first choice's text.

        Returns "" when there are no choices or the call raises (the error is
        logged instead of propagated).
        """
        try:
            response = self._client.complete(
                messages=[
                    SystemMessage("You are a helpful assistant."),
                    UserMessage(prompt)
                ]
            )
            if response and response.choices:
                return response.choices[0].message.content
            else:
                logging.warning("No response from AzureAIAdapter.")
                return ""
        except Exception as e:
            logging.error(f"Azure AI Inference API 调用失败: {e}")
            return ""
# Volcano Engine implementation
class VolcanoEngineAIAdapter(BaseLLMAdapter):
    """Adapter for Volcano Engine, accessed through the OpenAI client."""

    def __init__(self, api_key: str, base_url: str, model_name: str, max_tokens: int, temperature: float = 0.7, timeout: Optional[int] = 600):
        self.base_url = check_base_url(base_url)
        self.api_key = api_key
        self.model_name = model_name
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.timeout = timeout

        # Build the client from the normalized URL (not the raw argument) so
        # the '#' suffix and '/v1' rules of check_base_url take effect here too.
        self._client = OpenAI(
            base_url=self.base_url,
            api_key=self.api_key,
            timeout=self.timeout  # client-level timeout
        )

    def invoke(self, prompt: str) -> str:
        """Send ``prompt`` as the user message and return the first choice's text.

        Returns "" when the response is empty or the call raises (the error is
        logged instead of propagated).
        """
        try:
            response = self._client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {"role": "system", "content": "你是DeepSeek,是一个 AI 人工智能助手"},
                    {"role": "user", "content": prompt},
                ],
                # Forward the configured generation settings; they were
                # previously stored on the instance but never sent.
                max_tokens=self.max_tokens,
                temperature=self.temperature,
                timeout=self.timeout  # per-request timeout
            )
            if not response or not response.choices:
                logging.warning("No response from VolcanoEngineAIAdapter.")
                return ""
            return response.choices[0].message.content
        except Exception as e:
            logging.error(f"火山引擎API调用超时或失败: {e}")
            return ""
class SiliconFlowAdapter(BaseLLMAdapter):
    """Adapter for SiliconFlow, accessed through the OpenAI client."""

    def __init__(self, api_key: str, base_url: str, model_name: str, max_tokens: int, temperature: float = 0.7, timeout: Optional[int] = 600):
        self.base_url = check_base_url(base_url)
        self.api_key = api_key
        self.model_name = model_name
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.timeout = timeout

        # Build the client from the normalized URL (not the raw argument) so
        # the '#' suffix and '/v1' rules of check_base_url take effect here too.
        self._client = OpenAI(
            base_url=self.base_url,
            api_key=self.api_key,
            timeout=self.timeout  # client-level timeout
        )

    def invoke(self, prompt: str) -> str:
        """Send ``prompt`` as the user message and return the first choice's text.

        Returns "" when the response is empty or the call raises (the error is
        logged instead of propagated).
        """
        try:
            response = self._client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {"role": "system", "content": "你是DeepSeek,是一个 AI 人工智能助手"},
                    {"role": "user", "content": prompt},
                ],
                # Forward the configured generation settings; they were
                # previously stored on the instance but never sent.
                max_tokens=self.max_tokens,
                temperature=self.temperature,
                timeout=self.timeout  # per-request timeout
            )
            if not response or not response.choices:
                logging.warning("No response from SiliconFlowAdapter.")
                return ""
            return response.choices[0].message.content
        except Exception as e:
            logging.error(f"硅基流动API调用超时或失败: {e}")
            return ""
# Grok implementation
class GrokAdapter(BaseLLMAdapter):
    """
    Adapter for the xAI Grok API.
    """

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model_name: str,
        max_tokens: int,
        temperature: float = 0.7,
        timeout: Optional[int] = 600,
    ):
        self.base_url = check_base_url(base_url)
        self.api_key = api_key
        self.model_name = model_name
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.timeout = timeout

        client_options = {
            "base_url": self.base_url,
            "api_key": self.api_key,
            "timeout": self.timeout,
        }
        self._client = OpenAI(**client_options)

    def invoke(self, prompt: str) -> str:
        """Send ``prompt`` as the user message and return the first choice's text.

        Returns "" when the response has no choices or the call raises (the
        error is logged instead of propagated).
        """
        conversation = [
            {"role": "system", "content": "You are Grok, created by xAI."},
            {"role": "user", "content": prompt},
        ]
        try:
            completion = self._client.chat.completions.create(
                model=self.model_name,
                messages=conversation,
                max_tokens=self.max_tokens,
                temperature=self.temperature,
                timeout=self.timeout,
            )
            if not completion or not completion.choices:
                logging.warning("No response from GrokAdapter.")
                return ""
            return completion.choices[0].message.content
        except Exception as e:
            logging.error(f"Grok API 调用失败: {e}")
            return ""
def create_llm_adapter(
    interface_format: str,
    base_url: str,
    model_name: str,
    api_key: str,
    temperature: float,
    max_tokens: int,
    timeout: int
) -> BaseLLMAdapter:
    """
    Factory function: return the adapter instance that matches
    ``interface_format``.

    The format name is matched after stripping surrounding whitespace and
    lower-casing it. An unrecognized name raises ``ValueError``.
    """
    # Normalized interface name -> adapter class.
    adapter_classes = {
        "deepseek": DeepSeekAdapter,
        "openai": OpenAIAdapter,
        "azure openai": AzureOpenAIAdapter,
        "azure ai": AzureAIAdapter,
        "ollama": OllamaAdapter,
        "ml studio": MLStudioAdapter,
        "gemini": GeminiAdapter,
        "阿里云百炼": OpenAIAdapter,
        "火山引擎": VolcanoEngineAIAdapter,
        "硅基流动": SiliconFlowAdapter,
        "grok": GrokAdapter,
    }

    fmt = interface_format.strip().lower()
    adapter_cls = adapter_classes.get(fmt)
    if adapter_cls is None:
        raise ValueError(f"Unknown interface_format: {interface_format}")
    return adapter_cls(api_key, base_url, model_name, max_tokens, temperature, timeout)