Skip to content

Commit ce6d2fd

Browse files
committed
Added a HTTPS-based log handler to post logs in batches to a specified URL endpoint
1 parent a5b2ee8 commit ce6d2fd

File tree

1 file changed

+121
-1
lines changed

1 file changed

+121
-1
lines changed

src/murfey/client/customlogging.py

Lines changed: 121 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22

33
import json
44
import logging
5+
import threading
6+
import time
7+
from queue import Empty, Queue
58

6-
logger = logging.getLogger("murfey.client.customlogging")
9+
import requests
710

811

912
class CustomHandler(logging.Handler):
@@ -26,3 +29,120 @@ def emit(self, record):
2629
self._callback(self.prepare(record))
2730
except Exception:
2831
self.handleError(record)
32+
33+
34+
class HTTPSHandler(logging.Handler):
35+
"""
36+
A log handler collects log messages and posts them in batches to the backend
37+
FastAPI server using HTTPS POST.
38+
"""
39+
40+
def __init__(
41+
self,
42+
endpoint_url: str,
43+
min_batch: int = 5,
44+
max_batch: int = 50,
45+
min_interval: float = 0.5,
46+
max_interval: float = 2.0,
47+
max_retry: int = 5,
48+
timeout: int = 3,
49+
token: str = "",
50+
):
51+
super().__init__()
52+
self.endpoint_url = endpoint_url
53+
self.queue: Queue = Queue()
54+
self._stop_event = threading.Event()
55+
self.min_batch = min_batch
56+
self.max_batch = max_batch
57+
self.min_interval = min_interval
58+
self.max_interval = max_interval
59+
self.max_retry = max_retry
60+
self.timeout = timeout
61+
self.token = token
62+
63+
self.log_times: list[
64+
float
65+
] = [] # Timestamps of recent logs for rate estimation
66+
self.thread = threading.Thread(target=self._worker, daemon=True)
67+
self.thread.start()
68+
69+
def emit(self, record: logging.LogRecord):
70+
"""
71+
Formats the log and puts it on a queue for submission to the backend server
72+
"""
73+
try:
74+
log_entry = self.format_record(record)
75+
self.queue.put(log_entry)
76+
self.log_times.append(time.time())
77+
except Exception:
78+
self.handleError(record)
79+
pass
80+
81+
def format_record(self, record: logging.LogRecord):
82+
"""
83+
Packages the log record as a JSON-formatted string
84+
"""
85+
self.format(record)
86+
log_data = record.__dict__.copy()
87+
log_data["type"] = "log"
88+
return json.dumps(log_data)
89+
90+
def _worker(self):
91+
"""
92+
Worker function that sends batches of logs to the URL endpoint specified,
93+
with logic to adjust how frequently it should batch and send logs depending
94+
on rate of incoming traffic
95+
"""
96+
97+
batch: list[str] = []
98+
last_flush = time.time()
99+
100+
while not self._stop_event.is_set():
101+
try:
102+
log_entry = self.queue.get(timeout=0.05)
103+
batch.append(log_entry)
104+
except Empty:
105+
pass
106+
107+
# Calculate logging rate
108+
now = time.time()
109+
self.log_times = [
110+
t for t in self.log_times if now - t <= 1.0
111+
] # Number of logs in last second
112+
log_rate = len(self.log_times)
113+
114+
# Adjust batch size and flush interval
115+
batch_size = min(max(self.min_batch, log_rate), self.max_batch)
116+
flush_interval = max(
117+
self.min_interval, min(self.max_interval, 1 / max(log_rate, 1))
118+
)
119+
120+
# Flush if batch is ready
121+
if batch and (
122+
len(batch) >= batch_size or now - last_flush >= flush_interval
123+
):
124+
self._send_batch(batch)
125+
batch = []
126+
last_flush = now
127+
128+
# Flush remaining logs on shutdown
129+
if batch:
130+
self._send_batch(batch)
131+
132+
def _send_batch(self, batch: list[str]):
133+
"""
134+
Submits the list of stringified log records to the URL endpoint specified.
135+
"""
136+
for attempt in range(1, self.max_retry + 1):
137+
try:
138+
response = requests.post(self.endpoint_url, json=batch, timeout=5)
139+
if response.status_code == 200:
140+
return
141+
except requests.RequestException:
142+
pass
143+
time.sleep(2**attempt * 0.1) # Exponential backoff
144+
145+
def close(self):
146+
self._stop_event.set()
147+
self.thread.join()
148+
super().close()

0 commit comments

Comments
 (0)