forked from reata/sqllineage
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_web.py
More file actions
58 lines (41 loc) · 1.47 KB
/
test_web.py
File metadata and controls
58 lines (41 loc) · 1.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#!/usr/bin/env python3
"""
测试 Web 服务
"""
import json
from sqllineage.drawing import app
def test_directory_api():
"""测试目录 API"""
print("🧪 测试目录 API...")
# 模拟请求
environ = {"REQUEST_METHOD": "POST", "PATH_INFO": "/directory", "CONTENT_LENGTH": "2", "wsgi.input": None}
def start_response(status, headers):
print(f"状态: {status}")
# 创建模拟的请求体
import io
environ["wsgi.input"] = io.BytesIO(b"{}")
# 调用应用
try:
response = app(environ, start_response)
data = json.loads(response[0].decode("utf-8"))
print("API 响应:")
print(json.dumps(data, indent=2, ensure_ascii=False))
# 检查 tpcds 目录
for child in data.get("children", []):
if child["name"] == "tpcds" and child["is_dir"]:
print("\n✅ 找到 tpcds 目录")
print(f"tpcds children 数量: {len(child.get('children', []))}")
if child.get("children"):
print("tpcds 前5个文件:")
for i, file in enumerate(child["children"][:5]):
print(f" {i + 1}. {file['name']} (is_dir: {file['is_dir']})")
break
else:
print("\n❌ 未找到 tpcds 目录")
except Exception as e:
print(f"❌ 测试失败: {e}")
def main():
print("🚀 测试目录 API...")
test_directory_api()
if __name__ == "__main__":
main()