Skip to content

Commit 7954b55

Browse files
committed
refactor: extract KnowledgeBaseHandler to separate file
1 parent 788c259 commit 7954b55

File tree

2 files changed

+88
-75
lines changed

2 files changed

+88
-75
lines changed

src/datapilot/core/knowledge/cli.py

Lines changed: 5 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,5 @@
1-
import json
2-
import re
3-
from http.server import BaseHTTPRequestHandler
41
from http.server import HTTPServer
5-
from urllib.error import HTTPError
6-
from urllib.error import URLError
7-
from urllib.parse import urlparse
8-
from urllib.request import Request
9-
from urllib.request import urlopen
10-
2+
from .server import KnowledgeBaseHandler
113
import click
124

135

@@ -32,72 +24,10 @@ def serve(ctx, port):
3224
)
3325
ctx.exit(1)
3426

35-
class KnowledgeBaseHandler(BaseHTTPRequestHandler):
36-
def do_GET(self):
37-
"""Handle GET requests."""
38-
path = urlparse(self.path).path
39-
40-
# Match /knowledge_bases/{uuid} pattern
41-
match = re.match(r"^/knowledge_bases/([a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12})$", path)
42-
43-
if match:
44-
public_id = match.group(1)
45-
self.handle_knowledge_base(public_id)
46-
elif path == "/health":
47-
self.handle_health()
48-
else:
49-
self.send_error(404, "Not Found")
50-
51-
def handle_knowledge_base(self, public_id):
52-
"""Fetch and return knowledge base data."""
53-
url = f"{backend_url}/knowledge_bases/public/{public_id}"
54-
55-
# Validate URL scheme for security
56-
parsed_url = urlparse(url)
57-
if parsed_url.scheme not in ("http", "https"):
58-
self.send_response(400)
59-
self.send_header("Content-Type", "application/json")
60-
self.end_headers()
61-
error_msg = json.dumps({"error": "Invalid URL scheme. Only HTTP and HTTPS are allowed."})
62-
self.wfile.write(error_msg.encode("utf-8"))
63-
return
64-
65-
headers = {"Authorization": f"Bearer {token}", "X-Tenant": instance_name, "Content-Type": "application/json"}
66-
67-
req = Request(url, headers=headers) # noqa: S310
68-
69-
try:
70-
# URL scheme validated above - only HTTP/HTTPS allowed
71-
with urlopen(req, timeout=30) as response: # noqa: S310
72-
data = response.read()
73-
self.send_response(200)
74-
self.send_header("Content-Type", "application/json")
75-
self.end_headers()
76-
self.wfile.write(data)
77-
except HTTPError as e:
78-
error_body = e.read()
79-
error_data = error_body.decode("utf-8") if error_body else '{"error": "HTTP Error"}'
80-
self.send_response(e.code)
81-
self.send_header("Content-Type", "application/json")
82-
self.end_headers()
83-
self.wfile.write(error_data.encode("utf-8"))
84-
except URLError as e:
85-
self.send_response(500)
86-
self.send_header("Content-Type", "application/json")
87-
self.end_headers()
88-
error_msg = json.dumps({"error": str(e)})
89-
self.wfile.write(error_msg.encode("utf-8"))
90-
91-
def handle_health(self):
92-
"""Handle health check endpoint."""
93-
self.send_response(200)
94-
self.send_header("Content-Type", "application/json")
95-
self.end_headers()
96-
self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8"))
97-
98-
def log_message(self, format, *args):
99-
"""Override to use click.echo for logging."""
100-
click.echo(f"{self.address_string()} - {format % args}")
27+
# Set context data for the handler
28+
KnowledgeBaseHandler.token = token
29+
KnowledgeBaseHandler.instance_name = instance_name
30+
KnowledgeBaseHandler.backend_url = backend_url
10131

10232
server_address = ("", port)
10333
httpd = HTTPServer(server_address, KnowledgeBaseHandler)
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import json
2+
import re
3+
from http.server import BaseHTTPRequestHandler
4+
from urllib.error import HTTPError
5+
from urllib.error import URLError
6+
from urllib.parse import urlparse
7+
from urllib.request import Request
8+
from urllib.request import urlopen
9+
10+
import click
11+
12+
class KnowledgeBaseHandler(BaseHTTPRequestHandler):
13+
"""HTTP request handler for serving knowledge bases and health checks."""
14+
15+
token: str = ""
16+
instance_name: str = ""
17+
backend_url: str = ""
18+
19+
def do_GET(self):
20+
"""Handle GET requests."""
21+
path = urlparse(self.path).path
22+
23+
# Match /knowledge_bases/{uuid} pattern
24+
match = re.match(r"^/knowledge_bases/([a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12})$", path)
25+
26+
if match:
27+
public_id = match.group(1)
28+
self.handle_knowledge_base(public_id)
29+
elif path == "/health":
30+
self.handle_health()
31+
else:
32+
self.send_error(404, "Not Found")
33+
34+
def handle_knowledge_base(self, public_id):
35+
"""Fetch and return knowledge base data."""
36+
url = f"{self.backend_url}/knowledge_bases/public/{public_id}"
37+
38+
# Validate URL scheme for security
39+
parsed_url = urlparse(url)
40+
if parsed_url.scheme not in ("http", "https"):
41+
self.send_response(400)
42+
self.send_header("Content-Type", "application/json")
43+
self.end_headers()
44+
error_msg = json.dumps({"error": "Invalid URL scheme. Only HTTP and HTTPS are allowed."})
45+
self.wfile.write(error_msg.encode("utf-8"))
46+
return
47+
48+
headers = {"Authorization": f"Bearer {self.token}", "X-Tenant": self.instance_name, "Content-Type": "application/json"}
49+
50+
req = Request(url, headers=headers) # noqa: S310
51+
52+
try:
53+
# URL scheme validated above - only HTTP/HTTPS allowed
54+
with urlopen(req, timeout=30) as response: # noqa: S310
55+
data = response.read()
56+
self.send_response(200)
57+
self.send_header("Content-Type", "application/json")
58+
self.end_headers()
59+
self.wfile.write(data)
60+
except HTTPError as e:
61+
error_body = e.read()
62+
error_data = error_body.decode("utf-8") if error_body else '{"error": "HTTP Error"}'
63+
self.send_response(e.code)
64+
self.send_header("Content-Type", "application/json")
65+
self.end_headers()
66+
self.wfile.write(error_data.encode("utf-8"))
67+
except URLError as e:
68+
self.send_response(500)
69+
self.send_header("Content-Type", "application/json")
70+
self.end_headers()
71+
error_msg = json.dumps({"error": str(e)})
72+
self.wfile.write(error_msg.encode("utf-8"))
73+
74+
def handle_health(self):
75+
"""Handle health check endpoint."""
76+
self.send_response(200)
77+
self.send_header("Content-Type", "application/json")
78+
self.end_headers()
79+
self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8"))
80+
81+
def log_message(self, format, *args):
82+
"""Override to use click.echo for logging."""
83+
click.echo(f"{self.address_string()} - {format % args}")

0 commit comments

Comments
 (0)