Commit 2ce034a

Optimize: API testing

1 parent 16a2d8c commit 2ce034a

2 files changed: +187 -99 lines changed

backend/service/log_service.py

Lines changed: 51 additions & 32 deletions
@@ -4,7 +4,6 @@
 """

 import os.path
-from collections import deque

 from starlette.responses import JSONResponse

@@ -26,38 +25,58 @@ def get_last_n_lines(file_path: str, n: int = 100) -> str:
         A string containing the last N lines. Returns an empty string on failure.
     """
     try:
-        with open(file_path, "rb") as f:
-            # Move to the end of the file
+        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
+            # For small files, just read all and return last n lines
             f.seek(0, os.SEEK_END)
-            block_size = 1024
-            lines_found: deque[str] = deque()
-
-            while f.tell() > 0 and len(lines_found) <= n:
-                # Calculate the position and size of the next block to read
-                seek_step = min(block_size, f.tell())
-                f.seek(-seek_step, os.SEEK_CUR)
-                chunk = f.read(seek_step)
-                f.seek(-seek_step, os.SEEK_CUR)
-
-                # Prepend to any partial line from previous chunk
-                if lines_found:
-                    lines_found[0] = chunk.decode("utf-8", "ignore") + lines_found[0]
-                else:
-                    lines_found.append(chunk.decode("utf-8", "ignore"))
-
-                # Split into lines
-                split_lines = lines_found[0].splitlines()
-
-                # If we have more than one line, the first one is partial
-                if len(split_lines) > 1:
-                    lines_found[0] = split_lines.pop(0)
-                    for line in reversed(split_lines):
-                        lines_found.insert(1, line)
-
-                if f.tell() == 0:
-                    break
-
-            return "\n".join(list(lines_found)[-n:])
+            file_size = f.tell()
+
+            # If file is small (< 50KB), read all lines and return last n
+            if file_size < 50 * 1024:
+                f.seek(0)
+                all_lines = f.readlines()
+                return "".join(all_lines[-n:]) if all_lines else ""
+
+            # For larger files, use a more reliable approach
+            # Start from end and read backwards in larger chunks
+            buffer_size = 8192
+            lines: list[str] = []
+            buffer = ""
+            position = file_size
+
+            while position > 0 and len(lines) < n:
+                # Calculate chunk size to read
+                chunk_size = min(buffer_size, position)
+                position -= chunk_size
+
+                # Read chunk from current position
+                f.seek(position)
+                chunk = f.read(chunk_size)
+
+                # Prepend chunk to buffer
+                buffer = chunk + buffer
+
+                # Split buffer into lines
+                lines_in_buffer = buffer.split("\n")
+
+                # Keep the first part (might be incomplete line) in buffer
+                buffer = lines_in_buffer[0]

+                # Add complete lines to our lines list (in reverse order since we're reading backwards)
+                for line in reversed(lines_in_buffer[1:]):
+                    lines.insert(0, line)
+                    if len(lines) >= n:
+                        break
+
+            # If we've reached the beginning of file, add the remaining buffer as a line
+            if position == 0 and buffer:
+                lines.insert(0, buffer)
+
+            # Take last n lines and join them with newlines
+            result_lines = lines[-n:] if len(lines) > n else lines
+            return "\n".join(result_lines) + (
+                "\n" if result_lines and not result_lines[-1].endswith("\n") else ""
+            )
+
     except Exception as e:
         logger.error(f"Failed to read log file: {str(e)}")
         return ""

backend/service/task_service.py

Lines changed: 136 additions & 67 deletions
@@ -771,67 +771,6 @@ def _prepare_client_cert(body: TaskCreateReq):
     return client_cert


-async def _handle_streaming_response(response, full_url: str) -> Dict:
-    """Handle streaming response from API endpoint."""
-    stream_data = []
-    try:
-        # If status code is not 200, return error response immediately
-        if response.status_code != 200:
-            # Try to get response text for error details
-            try:
-                error_text = await response.aread()
-                error_content = error_text.decode("utf-8")
-            except Exception:
-                error_content = "Unable to read response content"
-
-            # Try to parse as JSON for better error information
-            try:
-                error_data = json.loads(error_content)
-            except (json.JSONDecodeError, ValueError):
-                error_data = error_content
-
-            return {
-                "status": "error",
-                "response": {
-                    "status_code": response.status_code,
-                    "headers": dict(response.headers),
-                    "data": error_data,
-                    "is_stream": False,
-                },
-                "error": f"HTTP {response.status_code}. {error_content}",
-            }
-
-        # Process streaming data for successful responses
-        async for chunk in response.aiter_lines():
-            if chunk:
-                chunk_str = chunk.strip()
-                if chunk_str:
-                    stream_data.append(chunk_str)
-                    # Limit the number of chunks to prevent memory issues
-                    if len(stream_data) >= 1000:
-                        stream_data.append("... (truncated, too many chunks)")
-                        break
-
-        return {
-            "status": "success",
-            "response": {
-                "status_code": response.status_code,
-                "headers": dict(response.headers),
-                "data": stream_data,
-                "is_stream": True,
-            },
-            "error": None,
-        }
-
-    except Exception as stream_error:
-        logger.error(f"Error processing stream: {stream_error}")
-        return {
-            "status": "error",
-            "error": f"Stream processing error: {str(stream_error)}",
-            "response": None,
-        }
-
-
 async def _handle_non_streaming_response(response) -> Dict:
     """Handle non-streaming response from API endpoint."""
     # Try to parse response as JSON
@@ -867,6 +806,8 @@ async def test_api_endpoint_svc(request: Request, body: TaskCreateReq):
     Returns:
         A dictionary containing the test result.
     """
+    import asyncio
+
     import httpx

     logger.info(f"Testing API endpoint: {body.target_host}{body.api_path}")
@@ -898,12 +839,23 @@ async def test_api_endpoint_svc(request: Request, body: TaskCreateReq):
         # Prepare certificate configuration
         client_cert = _prepare_client_cert(body)

-        # Test with httpx client - increased timeout for slow APIs
+        # Optimized timeout settings
+        timeout_config = httpx.Timeout(
+            connect=10.0,  # connect timeout: 10s
+            read=30.0,  # read timeout: 30s (for testing purposes, not too long)
+            write=10.0,  # write timeout: 10s
+            pool=5.0,  # pool timeout: 5s
+        )
+
+        # Use connection limits for better performance
+        limits = httpx.Limits(max_keepalive_connections=20, max_connections=100)
+
+        # Test with httpx client - optimized configuration
         async with httpx.AsyncClient(
-            timeout=180.0, verify=False, cert=client_cert
+            timeout=timeout_config, verify=False, cert=client_cert, limits=limits
         ) as client:
             if body.stream_mode:
-                # Handle streaming response
+                # Handle streaming response with early termination
                 async with client.stream(
                     "POST", full_url, json=payload, headers=headers, cookies=cookies
                 ) as response:
@@ -915,11 +867,11 @@ async def test_api_endpoint_svc(request: Request, body: TaskCreateReq):
                 )
                 return await _handle_non_streaming_response(response)

-    except httpx.TimeoutException:
-        logger.error("Request timeout when testing API endpoint")
+    except httpx.TimeoutException as e:
+        logger.error(f"Request timeout when testing API endpoint: {e}")
         return {
             "status": "error",
-            "error": "Request timeout (180 seconds)",
+            "error": f"Request timeout: {str(e)}",
             "response": None,
         }
     except httpx.ConnectError as e:
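
The two hunks above replace the single 180-second blanket timeout with per-phase budgets, so a server that accepts the connection but stalls the body now fails on the 30 s read budget instead of tying up the test for three minutes. A standalone sketch of the same httpx configuration applied to a plain GET, assuming only that httpx is installed; the probe URL is a placeholder, not from this repo:

    import asyncio

    import httpx

    # Mirrors the settings added above: connect/write/pool fail fast,
    # while read allows 30s for a slow first byte.
    timeout_config = httpx.Timeout(connect=10.0, read=30.0, write=10.0, pool=5.0)
    limits = httpx.Limits(max_keepalive_connections=20, max_connections=100)


    async def probe(url: str) -> int:
        async with httpx.AsyncClient(timeout=timeout_config, limits=limits) as client:
            response = await client.get(url)
            return response.status_code


    if __name__ == "__main__":
        print(asyncio.run(probe("https://example.com/health")))  # placeholder URL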
@@ -929,10 +881,127 @@
             "error": f"Connection error: {str(e)}",
             "response": None,
         }
+    except asyncio.TimeoutError:
+        logger.error("Asyncio timeout when testing API endpoint")
+        return {
+            "status": "error",
+            "error": "Operation timeout, please check network connection and target server status",
+            "response": None,
+        }
     except Exception as e:
         logger.error(f"Error testing API endpoint: {e}", exc_info=True)
         return {
             "status": "error",
             "error": f"Unexpected error: {str(e)}",
             "response": None,
         }
+
+
+async def _handle_streaming_response(response, full_url: str) -> Dict:
+    """
+    Handle streaming response from API endpoint with optimized performance.
+    For testing purposes, we only need to verify connectivity and get initial response.
+    """
+    import asyncio
+
+    stream_data = []
+    try:
+        # If status code is not 200, return error response immediately
+        if response.status_code != 200:
+            try:
+                error_text = await response.aread()
+                error_content = error_text.decode("utf-8")
+            except Exception:
+                error_content = "Unable to read response content"
+
+            try:
+                error_data = json.loads(error_content)
+            except (json.JSONDecodeError, ValueError):
+                error_data = error_content
+
+            return {
+                "status": "error",
+                "response": {
+                    "status_code": response.status_code,
+                    "headers": dict(response.headers),
+                    "data": error_data,
+                    "is_stream": False,
+                },
+                "error": f"HTTP {response.status_code}. {error_content}",
+            }
+
+        # For testing purposes, we limit the time and data we collect
+        max_chunks = 50  # max chunks to collect for testing
+        max_duration = 15  # max duration to wait for testing
+
+        start_time = asyncio.get_event_loop().time()
+
+        # Process streaming data with time and chunk limits
+        async for chunk in response.aiter_lines():
+            if chunk:
+                chunk_str = chunk.strip()
+                if chunk_str:
+                    stream_data.append(chunk_str)
+
+                    # For testing, we can return early after getting a few valid chunks
+                    if len(stream_data) >= max_chunks:
+                        stream_data.append(
+                            f"... (testing completed, collected {len(stream_data)} chunks, connection is normal)"
+                        )
+                        break
+
+            # Check if we've spent too much time
+            current_time = asyncio.get_event_loop().time()
+            if current_time - start_time > max_duration:
+                stream_data.append(
+                    f"... (testing time reached {max_duration} seconds, connection is normal)"
+                )
+                break
+
+        # If we got at least one chunk, the connection is working
+        test_successful = len(stream_data) > 0
+
+        return {
+            "status": "success" if test_successful else "error",
+            "response": {
+                "status_code": response.status_code,
+                "headers": dict(response.headers),
+                "data": stream_data,
+                "is_stream": True,
+                "test_note": "Streaming connection test completed, only collected partial data for verification",
+            },
+            "error": None if test_successful else "No streaming data received",
+        }
+
+    except asyncio.TimeoutError:
+        logger.error("Stream processing timeout")
+        return {
+            "status": "error",
+            "error": "Streaming data processing timeout",
+            "response": {
+                "status_code": (
+                    response.status_code if hasattr(response, "status_code") else None
+                ),
+                "headers": (
+                    dict(response.headers) if hasattr(response, "headers") else {}
+                ),
+                "data": stream_data,
+                "is_stream": True,
+            },
+        }
+    except Exception as stream_error:
+        logger.error(f"Error processing stream: {stream_error}")
+        return {
+            "status": "error",
+            "error": f"Streaming data processing error: {str(stream_error)}",
+            "response": {
+                "status_code": (
+                    response.status_code if hasattr(response, "status_code") else None
+                ),
+                "headers": (
+                    dict(response.headers) if hasattr(response, "headers") else {}
+                ),
+                "data": stream_data,
+                "is_stream": True,
+            },
+        }
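
The relocated _handle_streaming_response keeps the old error handling but bounds collection at 50 chunks or 15 seconds, treating any received chunk as proof the stream works. The same pattern in isolation, as a rough sketch; the endpoint URL is a placeholder, and time.monotonic() stands in for the handler's event-loop clock:

    import asyncio
    import time

    import httpx


    async def sample_stream(
        url: str, max_chunks: int = 50, max_duration: float = 15.0
    ) -> list[str]:
        """Collect at most max_chunks non-empty lines or max_duration seconds of a stream."""
        collected: list[str] = []
        start = time.monotonic()
        async with httpx.AsyncClient() as client:
            async with client.stream("GET", url) as response:
                async for line in response.aiter_lines():
                    if line.strip():
                        collected.append(line.strip())
                        if len(collected) >= max_chunks:
                            break  # enough data to call the connection healthy
                    if time.monotonic() - start > max_duration:
                        break  # don't let a slow stream stall the test
        return collected


    if __name__ == "__main__":
        # Placeholder endpoint; any line-oriented streaming URL works.
        print(asyncio.run(sample_stream("https://example.com/stream")))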
