 import pickle
 import uuid

+from lightllm.server.reasoning_parser import ReasoningParser
+
 from .function_call_parser import TOOLS_TAG_LIST, FunctionCallParser, ToolCallItem
 from .build_prompt import build_prompt, init_tokenizer

 from http import HTTPStatus
 from PIL import Image
 import multiprocessing as mp
-from typing import AsyncGenerator, Union, List, Dict
+from typing import Any, AsyncGenerator, Optional, Union, List, Dict
 from typing import Callable
 from lightllm.server import TokenLoad
 from fastapi import BackgroundTasks, FastAPI, Request, WebSocket, WebSocketDisconnect
@@ -109,6 +111,39 @@ def _get_history_tool_calls_cnt(request: ChatCompletionRequest) -> int:
     return idx


+def _get_reasoning_from_request(request: ChatCompletionRequest) -> bool:
+    """Judge whether the request needs reasoning."""
+    reasoning_parser = get_env_start_args().get("reasoning_parser", None)
+    if not reasoning_parser:
+        return False
+    if reasoning_parser in ["deepseek-v3"]:
+        return request.chat_template_kwargs is not None and request.chat_template_kwargs.get("thinking") is True
+    if reasoning_parser in ["qwen3", "glm45", "nano_v3", "interns1"]:
+        # qwen3, glm45, nano_v3, and interns1 enable reasoning by default
+        return not request.chat_template_kwargs or request.chat_template_kwargs.get("enable_thinking", True) is True
+    return True  # default
+
+
+def _process_reasoning_stream(
+    index: int,
+    delta: str,
+    reasoning_parser_dict: Dict[int, ReasoningParser],
+    request: ChatCompletionRequest,
+) -> tuple[Optional[str], str]:
+    """Process reasoning content in a streaming response."""
+    # Module-level helper: look the parser name up from the start args
+    # (there is no `self` here) and cache one parser per sub-request index.
+    if index not in reasoning_parser_dict:
+        is_force_reasoning = _get_reasoning_from_request(request)
+        reasoning_parser_dict[index] = ReasoningParser(
+            get_env_start_args().get("reasoning_parser", None),
+            request.stream_reasoning,
+            is_force_reasoning,
+        )
+    reasoning_parser = reasoning_parser_dict[index]
+    return reasoning_parser.parse_stream_chunk(delta)
+
+
 async def chat_completions_impl(request: ChatCompletionRequest, raw_request: Request) -> Response:
     from .api_http import g_objs

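Reading the new gating helper against its inputs: deepseek-v3 treats reasoning as opt-in through chat_template_kwargs["thinking"], while the qwen3/glm45/nano_v3/interns1 family treats it as opt-out via enable_thinking. A self-contained sketch of that truth table (the helper below takes the parser name as a parameter instead of reading start args, purely for illustration):

def needs_reasoning(reasoning_parser, chat_template_kwargs):
    # Mirrors _get_reasoning_from_request, minus the env lookup.
    if not reasoning_parser:
        return False
    if reasoning_parser in ["deepseek-v3"]:
        return chat_template_kwargs is not None and chat_template_kwargs.get("thinking") is True
    if reasoning_parser in ["qwen3", "glm45", "nano_v3", "interns1"]:
        return not chat_template_kwargs or chat_template_kwargs.get("enable_thinking", True) is True
    return True

assert needs_reasoning("deepseek-v3", None) is False                  # opt-in only
assert needs_reasoning("deepseek-v3", {"thinking": True}) is True
assert needs_reasoning("qwen3", None) is True                         # on by default
assert needs_reasoning("qwen3", {"enable_thinking": False}) is False  # explicit opt-out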
@@ -226,10 +261,30 @@ async def chat_completions_impl(request: ChatCompletionRequest, raw_request: Req

         finish_reason = finish_reason_dict[sub_req_id]
         text = "".join(final_output_dict[sub_req_id])
+
+        # Handle reasoning content
+        reasoning_text = None
+        reasoning_parser = get_env_start_args().get("reasoning_parser", None)
+        if reasoning_parser and request.separate_reasoning:
+            is_force_reasoning = _get_reasoning_from_request(request)
+            try:
+                parser = ReasoningParser(
+                    model_type=reasoning_parser,
+                    stream_reasoning=False,
+                    force_reasoning=is_force_reasoning,
+                )
+                reasoning_text, text = parser.parse_non_stream(text)
+            except Exception as e:
+                logger.error(f"Reasoning parsing error: {e}")
+                return create_error_response(
+                    HTTPStatus.BAD_REQUEST,
+                    "Failed to parse reasoning content!",
+                )
+
+        # Handle tool_calls parsing
         tool_calls = None
         tool_choice = request.tool_choice
         tools = request.tools
-
         if tool_choice != "none" and any([i in text for i in TOOLS_TAG_LIST]):
             if finish_reason == "stop":
                 finish_reason = "tool_calls"
@@ -257,7 +312,12 @@ async def chat_completions_impl(request: ChatCompletionRequest, raw_request: Req
            )
         if finish_reason == "tool_calls":
             text = ""
-        chat_message = ChatMessage(role="assistant", content=text, tool_calls=tool_calls)
+        chat_message = ChatMessage(
+            role="assistant",
+            content=text if text else None,
+            tool_calls=tool_calls,
+            reasoning_content=reasoning_text if reasoning_text else None,
+        )
         choice = ChatCompletionResponseChoice(
             index=i,
             message=chat_message,
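With content and reasoning_content both nullable on ChatMessage, a non-stream reply from a reasoning model now carries the two channels separately; schematically (illustrative values, abridged to the relevant fields):

{
  "choices": [{
    "index": 0,
    "message": {
      "role": "assistant",
      "content": "The answer is 4.",
      "reasoning_content": "2 + 2 = 4",
      "tool_calls": null
    },
    "finish_reason": "stop"
  }]
}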
@@ -273,6 +333,7 @@ async def chat_completions_impl(request: ChatCompletionRequest, raw_request: Req
         return create_error_response(HTTPStatus.BAD_REQUEST, "stream api only support n = 1")

     parser_dict = {}
+    reasoning_parser_dict = {}

     # Streaming case
     async def stream_results() -> AsyncGenerator[bytes, None]:
@@ -284,12 +345,29 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
         async for sub_req_id, request_output, metadata, finish_status in results_generator:
             prompt_tokens = metadata["prompt_tokens"]
             completion_tokens += 1
-            if request.tool_choice != "none" and request.tools:
-                delta = request_output
-                group_request_id = convert_sub_id_to_group_id(sub_req_id)
-                index = sub_req_id
-                finish_reason = finish_status.get_finish_reason()
+            group_request_id = convert_sub_id_to_group_id(sub_req_id)
+            index = sub_req_id
+            delta = request_output
+            finish_reason = finish_status.get_finish_reason()

+            # Handle reasoning content
+            if get_env_start_args().get("reasoning_parser", None) and request.separate_reasoning:
+                reasoning_text, delta = _process_reasoning_stream(index, delta, reasoning_parser_dict, request)
+                if reasoning_text:
+                    choice_data = ChatCompletionStreamResponseChoice(
+                        index=0,
+                        delta=DeltaMessage(reasoning_content=reasoning_text),
+                        finish_reason=None,
+                    )
+                    chunk = ChatCompletionStreamResponse(
+                        id=group_request_id,
+                        created=created_time,
+                        choices=[choice_data],
+                        model=request.model,
+                    )
+                    yield f"data: {chunk.model_dump_json()}\n\n"
+
+            if request.tool_choice != "none" and request.tools:
                 if index not in parser_dict:
                     # Provide a default value for tool_call_parser
                     tool_parser = getattr(g_objs.args, "tool_call_parser", None) or "llama3"
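On the wire, a stream with separate_reasoning enabled emits reasoning deltas as their own chunks ahead of the content deltas; schematically (ids, timestamps, and usage fields elided):

data: {"choices": [{"index": 0, "delta": {"reasoning_content": "2 + 2"}, "finish_reason": null}]}

data: {"choices": [{"index": 0, "delta": {"reasoning_content": " = 4"}, "finish_reason": null}]}

data: {"choices": [{"index": 0, "delta": {"content": "The answer is 4."}, "finish_reason": null}]}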