-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_api.py
More file actions
304 lines (258 loc) · 10.5 KB
/
test_api.py
File metadata and controls
304 lines (258 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
图片信息提取API测试脚本
用于测试API接口的各种功能
"""
import os
import sys
import json
import requests
import time
from datetime import datetime
# API基础URL
BASE_URL = "http://localhost:5000"
def test_health_check():
"""测试健康检查接口"""
print("🔍 测试健康检查接口...")
try:
response = requests.get(f"{BASE_URL}/health", timeout=10)
if response.status_code == 200:
data = response.json()
print(f"✅ 健康检查通过: {data.get('message', 'N/A')}")
print(f" 状态: {data.get('data', {}).get('status', 'N/A')}")
print(f" 版本: {data.get('data', {}).get('version', 'N/A')}")
return True
else:
print(f"❌ 健康检查失败: HTTP {response.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"❌ 健康检查请求失败: {str(e)}")
return False
def test_api_info():
"""测试API信息接口"""
print("\n🔍 测试API信息接口...")
try:
response = requests.get(f"{BASE_URL}/info", timeout=10)
if response.status_code == 200:
data = response.json()
print(f"✅ API信息获取成功: {data.get('message', 'N/A')}")
api_data = data.get('data', {})
print(f" API名称: {api_data.get('name', 'N/A')}")
print(f" 版本: {api_data.get('version', 'N/A')}")
print(f" 支持格式: {', '.join(api_data.get('supported_formats', []))}")
print(f" 最大文件大小: {api_data.get('max_file_size', 'N/A')}")
return True
else:
print(f"❌ API信息获取失败: HTTP {response.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"❌ API信息请求失败: {str(e)}")
return False
def test_extract_with_invalid_file():
"""测试无效文件上传"""
print("\n🔍 测试无效文件上传...")
# 创建一个无效的"图片"文件
invalid_content = b"This is not an image file"
files = {'image': ('invalid.txt', invalid_content, 'text/plain')}
try:
response = requests.post(f"{BASE_URL}/extract", files=files, timeout=10)
if response.status_code == 400:
data = response.json()
print(f"✅ 无效文件正确拒绝: {data.get('message', 'N/A')}")
print(f" 错误代码: {data.get('error', 'N/A')}")
return True
else:
print(f"❌ 无效文件处理异常: HTTP {response.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"❌ 无效文件测试请求失败: {str(e)}")
return False
def test_extract_no_file():
"""测试无文件上传"""
print("\n🔍 测试无文件上传...")
try:
response = requests.post(f"{BASE_URL}/extract", timeout=10)
if response.status_code == 400:
data = response.json()
print(f"✅ 无文件上传正确拒绝: {data.get('message', 'N/A')}")
print(f" 错误代码: {data.get('error', 'N/A')}")
return True
else:
print(f"❌ 无文件上传处理异常: HTTP {response.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"❌ 无文件上传测试请求失败: {str(e)}")
return False
def create_test_image():
"""创建一个测试图片文件"""
try:
from PIL import Image, ImageDraw
# 创建一个简单的测试图片
img = Image.new('RGB', (100, 100), color='red')
draw = ImageDraw.Draw(img)
draw.text((10, 10), "Test Image", fill='white')
# 保存到临时文件
temp_path = "test_image.jpg"
img.save(temp_path, "JPEG")
# 添加一些EXIF信息
from PIL.ExifTags import TAGS
# 简单的EXIF数据
exif_dict = {
36864: b'0230', # ExifVersion
36867: '2023:12:01 14:30:25', # DateTimeOriginal
271: 'Test Camera', # Make
272: 'Test Model', # Model
305: 'Test Software 1.0', # Software
}
# 重新保存带EXIF的图片
img.save(temp_path, "JPEG", exif=exif_dict)
return temp_path
except ImportError:
print("⚠️ PIL库未安装,无法创建测试图片")
return None
except Exception as e:
print(f"⚠️ 创建测试图片失败: {str(e)}")
return None
def test_extract_with_valid_image():
"""测试有效图片上传"""
print("\n🔍 测试有效图片上传...")
# 创建测试图片
test_image_path = create_test_image()
if not test_image_path:
print("❌ 无法创建测试图片,跳过此测试")
return False
try:
with open(test_image_path, 'rb') as f:
files = {'image': (test_image_path, f, 'image/jpeg')}
response = requests.post(f"{BASE_URL}/extract", files=files, timeout=30)
if response.status_code == 200:
data = response.json()
if data.get('success'):
print(f"✅ 图片信息提取成功: {data.get('message', 'N/A')}")
# 显示提取的信息
result_data = data.get('data', {})
print(f" 文件名: {result_data.get('filename', 'N/A')}")
print(f" 文件大小: {result_data.get('file_size', 'N/A')} 字节")
print(f" 拍摄时间: {result_data.get('capture_time', 'N/A')}")
# 相机信息
camera_info = result_data.get('camera_info', {})
if camera_info:
print(f" 相机品牌: {camera_info.get('make', 'N/A')}")
print(f" 相机型号: {camera_info.get('model', 'N/A')}")
print(f" 软件: {camera_info.get('software', 'N/A')}")
# 图片尺寸
dimensions = result_data.get('dimensions', {})
if dimensions:
print(f" 图片尺寸: {dimensions.get('width', 'N/A')} x {dimensions.get('height', 'N/A')}")
print(f" 图片格式: {dimensions.get('format', 'N/A')}")
return True
else:
print(f"❌ 图片处理失败: {data.get('message', 'N/A')}")
return False
else:
print(f"❌ 图片上传失败: HTTP {response.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"❌ 图片上传测试请求失败: {str(e)}")
return False
finally:
# 清理测试文件
if os.path.exists(test_image_path):
try:
os.remove(test_image_path)
except:
pass
def test_performance():
"""测试API性能"""
print("\n🔍 测试API性能...")
test_times = []
num_requests = 5
for i in range(num_requests):
start_time = time.time()
try:
response = requests.get(f"{BASE_URL}/health", timeout=10)
if response.status_code == 200:
end_time = time.time()
request_time = (end_time - start_time) * 1000 # 转换为毫秒
test_times.append(request_time)
print(f" 请求 {i+1}: {request_time:.2f}ms")
else:
print(f" 请求 {i+1}: 失败 (HTTP {response.status_code})")
except requests.exceptions.RequestException as e:
print(f" 请求 {i+1}: 失败 ({str(e)})")
if test_times:
avg_time = sum(test_times) / len(test_times)
min_time = min(test_times)
max_time = max(test_times)
print(f"✅ 性能测试完成 ({len(test_times)}/{num_requests} 成功):")
print(f" 平均响应时间: {avg_time:.2f}ms")
print(f" 最快响应时间: {min_time:.2f}ms")
print(f" 最慢响应时间: {max_time:.2f}ms")
# 性能评估
if avg_time < 100:
print(" 🚀 性能优秀")
elif avg_time < 500:
print(" ✅ 性能良好")
elif avg_time < 1000:
print(" ⚠️ 性能一般")
else:
print(" ❌ 性能较差")
return True
else:
print("❌ 性能测试失败,所有请求都失败")
return False
def run_all_tests():
"""运行所有测试"""
print("=" * 60)
print("🧪 图片信息提取API测试套件")
print("=" * 60)
print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"测试目标: {BASE_URL}")
print()
tests = [
("健康检查", test_health_check),
("API信息", test_api_info),
("无文件上传", test_extract_no_file),
("无效文件上传", test_extract_with_invalid_file),
("有效图片上传", test_extract_with_valid_image),
("性能测试", test_performance),
]
results = {}
for test_name, test_func in tests:
try:
result = test_func()
results[test_name] = result
except Exception as e:
print(f"❌ 测试 '{test_name}' 发生异常: {str(e)}")
results[test_name] = False
# 显示测试总结
print("\n" + "=" * 60)
print("📊 测试结果总结")
print("=" * 60)
passed = 0
total = len(results)
for test_name, result in results.items():
status = "✅ 通过" if result else "❌ 失败"
print(f"{test_name:20} {status}")
if result:
passed += 1
print("-" * 60)
print(f"总计: {passed}/{total} 测试通过")
if passed == total:
print("🎉 所有测试通过!API服务运行正常。")
return 0
else:
print("⚠️ 部分测试失败,请检查API服务状态。")
return 1
def main():
"""主函数"""
# 检查命令行参数
if len(sys.argv) > 1:
BASE_URL = sys.argv[1]
print(f"使用自定义API地址: {BASE_URL}")
# 运行测试
exit_code = run_all_tests()
sys.exit(exit_code)
if __name__ == "__main__":
main()