@@ -53,6 +53,7 @@ def __init__(self,
5353 openai_extra_kwargs : Dict | None = None ,
5454 stream : bool = True ,
5555 stream_chunk_size : int = 1 ,
56+ timeout : int = 3600 ,
5657 max_workers : Optional [int ] = None ):
5758
5859 super ().__init__ (
@@ -82,6 +83,7 @@ def __init__(self,
8283 self .stream = stream
8384 self .stream_chunk_size = stream_chunk_size
8485 self .openai_extra_kwargs = openai_extra_kwargs
86+ self .timeout = timeout
8587
8688 def _create_fresh_client (self ):
8789 """Create a fresh OpenAI client for each request to avoid
@@ -116,18 +118,17 @@ def _create_fresh_client(self):
116118 return OpenAI (
117119 base_url = self .openai_api_base ,
118120 api_key = current_key ,
119- http_client = httpx .Client (
120- ** http_client_cfg ,
121- timeout = httpx .Timeout (3600.0 ) # 1 hour timeout
122- ) if http_client_cfg or True else None ,
121+ http_client = httpx .Client (** http_client_cfg ,
122+ timeout = httpx .Timeout (self .timeout ))
123+ if http_client_cfg or True else None ,
123124 )
124125
125126 def _generate (
126- self ,
127- input : PromptList | str ,
128- max_out_len : int ,
129- temperature : float ,
130- timeout : int = 3600 , # Set timeout to 1 hour
127+ self ,
128+ input : PromptList | str ,
129+ max_out_len : int ,
130+ temperature : float ,
131+ # timeout: int = 3600, # Set timeout to 1 hour
131132 ) -> str :
132133 """Generate results with streaming support.
133134
@@ -196,7 +197,7 @@ def _generate(
196197
197198 # Handle streaming response with shorter timeout
198199 response_stream = fresh_client .chat .completions .create (
199- ** query_data , timeout = timeout )
200+ ** query_data , timeout = self . timeout )
200201
201202 result = self ._handle_stream_response (
202203 response_stream , thread_id if self .verbose else None )
@@ -210,7 +211,7 @@ def _generate(
210211 else :
211212 # Fallback to non-streaming (use parent method)
212213 return super ()._generate (input , max_out_len , temperature ,
213- timeout )
214+ self . timeout )
214215
215216 except (BadRequestError , APIStatusError ) as e :
216217 status_code = e .status_code
@@ -250,6 +251,7 @@ def _handle_stream_response(self, response_stream, thread_id=None) -> str:
250251 Returns:
251252 str: Complete generated text from all chunks
252253 """
254+ finish_reason = None
253255 completion_chunks = []
254256 reasoning_content = ''
255257 chunk_count = 0
@@ -269,8 +271,7 @@ def log_with_thread(message, level='info'):
269271 current_time = time .time ()
270272
271273 # Add timeout check for stuck streams
272- # 1 hour timeout for streaming
273- if current_time - start_time > 3600 :
274+ if current_time - start_time > self .timeout :
274275 log_with_thread (
275276 f'Streaming timeout after '
276277 f'{ current_time - start_time :.1f} s, '
@@ -301,6 +302,7 @@ def log_with_thread(message, level='info'):
301302
302303 # Check if streaming is finished
303304 if chunk .choices [0 ].finish_reason is not None :
305+ finish_reason = chunk .choices [0 ].finish_reason
304306 if self .verbose :
305307 print () # Add newline after streaming complete
306308 elapsed = current_time - start_time
@@ -334,6 +336,18 @@ def log_with_thread(message, level='info'):
334336 # Combine reasoning content and regular content
335337 complete_content = '' .join (completion_chunks )
336338
339+ if finish_reason is None :
340+ elapsed = time .time () - start_time
341+ log_with_thread (
342+ f'Stream ended without finish_reason (possible truncation). '
343+ f'elapsed={ elapsed :.1f} s chunks={ chunk_count } '
344+ f'content_len={ sum (len (x ) for x in completion_chunks )} '
345+ f'reasoning_len={ len (reasoning_content )} ' ,
346+ 'error' ,
347+ )
348+ raise RuntimeError (
349+ 'Streaming ended without finish_reason (truncated).' )
350+
337351 if self .verbose :
338352 log_with_thread (f'Stream processing complete. Content length: '
339353 f'{ len (complete_content )} , '