1111import uuid
1212from typing import Any , Dict , List , Optional , Sequence , Tuple , Union
1313
14+ import httpx
1415from fastapi import Query , Request
1516from sqlalchemy import func , or_ , select , text
1617from starlette .responses import JSONResponse
3738from utils .logger import logger
3839
3940# Testing configuration
40- MAX_CHUNKS_FOR_TESTING = 5000 # max chunks to collect for testing
4141MAX_DURATION_FOR_TESTING = 120 # max duration to wait for testing
4242
4343MAX_HEADER_ITEMS = 20
@@ -943,8 +943,6 @@ async def test_api_endpoint_svc(request: Request, body: TaskCreateReq):
943943 Returns:
944944 A dictionary containing the test result.
945945 """
946- import httpx
947-
948946 try :
949947 # Prepare headers
950948 headers = {
@@ -1041,7 +1039,6 @@ async def test_api_endpoint_svc(request: Request, body: TaskCreateReq):
10411039async def _handle_streaming_response (
10421040 response ,
10431041 full_url : str ,
1044- max_chunks : int = MAX_CHUNKS_FOR_TESTING ,
10451042 max_duration : int = MAX_DURATION_FOR_TESTING ,
10461043) -> Dict :
10471044 """
@@ -1051,12 +1048,33 @@ async def _handle_streaming_response(
10511048 Args:
10521049 response: The response object from the API endpoint.
10531050 full_url: The full URL of the API endpoint.
1054- max_chunks: The maximum number of chunks to collect for testing.
10551051 max_duration: The maximum duration to wait for testing.
10561052 Returns:
10571053 A dictionary containing the streaming response.
10581054 """
1059- stream_data = []
1055+ stream_data : List [str ] = []
1056+ append_chunk = stream_data .append
1057+ test_note = "Streaming connection test completed, only collected partial data for verification"
1058+
1059+ def _build_stream_result (
1060+ note : str , warning : Optional [str ] = None
1061+ ) -> Dict [str , Any ]:
1062+ test_successful = len (stream_data ) > 0
1063+ response_payload : Dict [str , Any ] = {
1064+ "status_code" : response .status_code ,
1065+ "headers" : dict (response .headers ),
1066+ "data" : stream_data ,
1067+ "is_stream" : True ,
1068+ "test_note" : note ,
1069+ }
1070+ if warning :
1071+ response_payload ["warning" ] = warning
1072+ return {
1073+ "status" : "success" if test_successful else "error" ,
1074+ "response" : response_payload ,
1075+ "error" : None if test_successful else "No streaming data received" ,
1076+ }
1077+
10601078 try :
10611079 # If status code is not 200, return error response immediately
10621080 if response .status_code != 200 :
@@ -1082,54 +1100,44 @@ async def _handle_streaming_response(
10821100 "error" : f"HTTP { response .status_code } . { error_content } " ,
10831101 }
10841102
1085- # For testing purposes, we limit the time and data we collect
1103+ # For testing purposes, we limit the time we spend collecting data
1104+ loop = asyncio .get_running_loop ()
1105+ deadline = loop .time () + max_duration
10861106
1087- start_time = asyncio .get_event_loop ().time ()
1088-
1089- # Process streaming data with time and chunk limits
1107+ # Process streaming data with only time limit
10901108 async for chunk in response .aiter_lines ():
10911109 if chunk :
10921110 chunk_str = chunk .strip ()
10931111 if chunk_str :
1094- stream_data .append (chunk_str )
1095-
1096- # For testing, we can return early after getting a few valid chunks
1097- if len (stream_data ) >= max_chunks :
1098- stream_data .append (
1099- f"... (testing completed, collected { len (stream_data )} chunks, connection is normal)"
1100- )
1101- break
1112+ append_chunk (chunk_str )
11021113
11031114 # Check if we've spent too much time
1104- current_time = asyncio .get_event_loop ().time ()
1105- if current_time - start_time > max_duration :
1106- stream_data .append (
1107- f"... (testing time reached { max_duration } seconds, connection is normal)"
1115+ if loop .time () >= deadline :
1116+ test_note = (
1117+ "Streaming connection test completed after reaching the "
1118+ f"{ max_duration } -second limit"
1119+ )
1120+ logger .info (
1121+ f"Stopped streaming test after { max_duration } seconds" ,
11081122 )
11091123 break
11101124
1111- # If we got at least one chunk, the connection is working
1112- test_successful = len (stream_data ) > 0
1113-
1114- return {
1115- "status" : "success" if test_successful else "error" ,
1116- "response" : {
1117- "status_code" : response .status_code ,
1118- "headers" : dict (response .headers ),
1119- "data" : stream_data ,
1120- "is_stream" : True ,
1121- "test_note" : "Streaming connection test completed, only collected partial data for verification" ,
1122- },
1123- "error" : None if test_successful else "No streaming data received" ,
1124- }
1125+ return _build_stream_result (test_note )
11251126
11261127 except asyncio .TimeoutError :
11271128 logger .error ("Stream processing timeout" )
11281129 raise ErrorResponse .internal_server_error (
11291130 ErrorMessages .STREAM_PROCESSING_TIMEOUT
11301131 )
11311132 except Exception as stream_error :
1132- logger .error (
1133- f"Error processing stream: { stream_error } . stream data: { stream_data } "
1134- )
1133+ if stream_data :
1134+ warning_text = str (stream_error ).strip () or "Stream closed unexpectedly."
1135+ logger .error (
1136+ f"Stream test ended with error: { warning_text } " ,
1137+ )
1138+ return _build_stream_result (
1139+ test_note , warning = f"Stream ended early: { warning_text } "
1140+ )
1141+
1142+ logger .error (f"Error processing stream: { stream_error } " )
11351143 raise ErrorResponse .internal_server_error (ErrorMessages .STREAM_PROCESSING_ERROR )
0 commit comments