1+ import atexit
12import logging
3+ import os
4+ import threading
25
6+ from concurrent .futures import ThreadPoolExecutor
37from logging .config import dictConfig
48from pathlib import Path
59from sys import stdout
610
11+ import requests
12+
713from dotenv import load_dotenv
814
915from memos import settings
16+ from memos .api .context .context import get_current_trace_id
1017
1118
1219# Load environment variables
@@ -26,27 +33,125 @@ def _setup_logfile() -> Path:
2633 return logfile
2734
2835
36+ class TraceIDFilter (logging .Filter ):
37+ """add trace_id to the log record"""
38+
39+ def filter (self , record ):
40+ try :
41+ trace_id = get_current_trace_id ()
42+ record .trace_id = trace_id if trace_id else "no-trace-id"
43+ except Exception :
44+ record .trace_id = "no-trace-id"
45+ return True
46+
47+
48+ class CustomLoggerRequestHandler (logging .Handler ):
49+ _instance = None
50+ _lock = threading .Lock ()
51+
52+ def __new__ (cls ):
53+ if cls ._instance is None :
54+ with cls ._lock :
55+ if cls ._instance is None :
56+ cls ._instance = super ().__new__ (cls )
57+ cls ._instance ._initialized = False
58+ return cls ._instance
59+
60+ def __init__ (self ):
61+ """Initialize handler with minimal setup"""
62+ if not self ._initialized :
63+ super ().__init__ ()
64+ workers = int (os .getenv ("CUSTOM_LOGGER_WORKERS" , "2" ))
65+ self ._executor = ThreadPoolExecutor (
66+ max_workers = workers , thread_name_prefix = "log_sender"
67+ )
68+ self ._is_shutting_down = threading .Event ()
69+ self ._session = requests .Session ()
70+ self ._initialized = True
71+ atexit .register (self ._cleanup )
72+
73+ def emit (self , record ):
74+ """Process log records of INFO or ERROR level (non-blocking)"""
75+ if os .getenv ("CUSTOM_LOGGER_URL" ) is None or self ._is_shutting_down .is_set ():
76+ return
77+
78+ if record .levelno in (logging .INFO , logging .ERROR ):
79+ try :
80+ trace_id = (
81+ get_current_trace_id ()
82+ ) # TODO: get trace_id from request context instead of get_current_trace_id
83+ if trace_id :
84+ self ._executor .submit (self ._send_log_sync , record .getMessage (), trace_id )
85+ except Exception as e :
86+ if not self ._is_shutting_down .is_set ():
87+ print (f"Error sending log: { e } " )
88+
89+ def _send_log_sync (self , message , trace_id ):
90+ """Send log message synchronously in a separate thread"""
91+ print (f"send_log_sync: { message } { trace_id } " )
92+ try :
93+ logger_url = os .getenv ("CUSTOM_LOGGER_URL" )
94+ token = os .getenv ("CUSTOM_LOGGER_TOKEN" )
95+
96+ headers = {"Content-Type" : "application/json" }
97+ post_content = {"message" : message , "trace_id" : trace_id }
98+
99+ # Add auth token if exists
100+ if token :
101+ headers ["Authorization" ] = f"Bearer { token } "
102+
103+ # Add traceId to headers for consistency
104+ headers ["traceId" ] = trace_id
105+
106+ # Add custom attributes from env
107+ for key , value in os .environ .items ():
108+ if key .startswith ("CUSTOM_LOGGER_ATTRIBUTE_" ):
109+ attribute_key = key [len ("CUSTOM_LOGGER_ATTRIBUTE_" ) :].lower ()
110+ post_content [attribute_key ] = value
111+
112+ self ._session .post (logger_url , headers = headers , json = post_content , timeout = 5 )
113+ except Exception :
114+ # Silently ignore errors to avoid affecting main application
115+ pass
116+
117+ def _cleanup (self ):
118+ """Clean up resources during program exit"""
119+ if not self ._initialized :
120+ return
121+
122+ self ._is_shutting_down .set ()
123+ try :
124+ self ._executor .shutdown (wait = False )
125+ self ._session .close ()
126+ except Exception as e :
127+ print (f"Error during cleanup: { e } " )
128+
129+ def close (self ):
130+ """Override close to prevent premature shutdown"""
131+
132+
29133LOGGING_CONFIG = {
30134 "version" : 1 ,
31135 "disable_existing_loggers" : False ,
32136 "formatters" : {
33137 "standard" : {
34- "format" : "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s"
138+ "format" : "%(asctime)s [%(trace_id)s] - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s"
35139 },
36140 "no_datetime" : {
37- "format" : "%(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s"
141+ "format" : "[%(trace_id)s] - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s"
38142 },
39143 },
40144 "filters" : {
41- "package_tree_filter" : {"()" : "logging.Filter" , "name" : settings .LOG_FILTER_TREE_PREFIX }
145+ "package_tree_filter" : {"()" : "logging.Filter" , "name" : settings .LOG_FILTER_TREE_PREFIX },
146+ "trace_id_filter" : {"()" : "memos.log.TraceIDFilter" },
42147 },
43148 "handlers" : {
44149 "console" : {
45150 "level" : selected_log_level ,
46151 "class" : "logging.StreamHandler" ,
47152 "stream" : stdout ,
48153 "formatter" : "no_datetime" ,
49- "filters" : ["package_tree_filter" ],
154+ "filters" : ["package_tree_filter" , "trace_id_filter" ],
50155 },
51156 "file" : {
52157 "level" : "DEBUG" ,
@@ -55,6 +160,7 @@ def _setup_logfile() -> Path:
55160 "maxBytes" : 1024 ** 2 * 10 ,
56161 "backupCount" : 10 ,
57162 "formatter" : "standard" ,
163+ "filters" : ["trace_id_filter" ],
58164 },
59165 },
60166 "root" : { # Root logger handles all logs
0 commit comments