# ===========================================================================
# Patch payload reconstructed as formatted Python.  Each new/changed file is
# delimited by a "# --- file: <path> ---" marker.
# ===========================================================================

# --- file: fastdeploy/logger.py ---
"""Logging entry point: initialise and fetch FastDeploy loggers.

``get_logger`` is the single accessor used by all sub-modules; it keeps the
legacy file-per-logger interface working while routing plain lookups through
the namespaced configuration installed by ``setup_logging``.
"""

import logging
import os

from fastdeploy import envs
from fastdeploy.util.formatters import ColoredFormatter
from fastdeploy.util.handlers import DailyRotatingFileHandler
from fastdeploy.util.setup_logging import setup_logging

# Configure the logging system exactly once, at first import.
setup_logging()


def get_logger(name, file_name=None, without_formater=False, print_to_console=False):
    """Return a logger, staying compatible with the historical signature.

    Args:
        name: Logger name.
        file_name: Log file name (legacy interface only).
        without_formater: Skip attaching a formatter (legacy interface only).
        print_to_console: Also emit records to the console (legacy interface only).
    """
    # A bare name selects the new unified, namespaced behaviour.
    if file_name is None and not without_formater and not print_to_console:
        return _get_unified_logger(name)
    # Anything else goes through the legacy per-file path.
    return _get_legacy_logger(name, file_name, without_formater, print_to_console)


def _get_unified_logger(name):
    """Map *name* into the ``fastdeploy`` logger namespace."""
    if name is None:
        return logging.getLogger("fastdeploy")
    if name == "__main__":
        # Scripts run directly still log under the package hierarchy.
        return logging.getLogger("fastdeploy.main")
    if name == "fastdeploy" or name.startswith("fastdeploy."):
        return logging.getLogger(name)
    # Everything else is nested under the "fastdeploy" prefix.
    return logging.getLogger(f"fastdeploy.{name}")


def _get_legacy_logger(name, file_name, without_formater=False, print_to_console=False):
    """Build a legacy logger: one daily file plus a dedicated ERROR file."""
    log_dir = envs.FD_LOG_DIR
    os.makedirs(log_dir, exist_ok=True)

    is_debug = int(envs.FD_DEBUG)
    # Prefix with "legacy." so these loggers never collide with the
    # "fastdeploy.*" hierarchy configured by setup_logging().
    logger = logging.getLogger(f"legacy.{name}")
    logger.setLevel(logging.DEBUG if is_debug else logging.INFO)

    # Re-invocation with the same name must not stack duplicate handlers.
    for old_handler in logger.handlers[:]:
        logger.removeHandler(old_handler)

    backup_count = int(envs.FD_LOG_BACKUP_COUNT)
    handler = DailyRotatingFileHandler(f"{log_dir}/{file_name}", backupCount=backup_count)

    # ERROR and above additionally land in an "error_"-prefixed file.
    error_handler = DailyRotatingFileHandler(f"{log_dir}/error_{file_name}", backupCount=backup_count)
    error_handler.setLevel(logging.ERROR)

    formatter = ColoredFormatter(
        "%(levelname)-8s %(asctime)s %(process)-5s %(filename)s[line:%(lineno)d] %(message)s"
    )
    if not without_formater:
        handler.setFormatter(formatter)
        error_handler.setFormatter(formatter)

    logger.addHandler(handler)
    logger.addHandler(error_handler)

    if print_to_console:
        console_handler = logging.StreamHandler()
        if not without_formater:
            console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

    # Keep legacy records out of the root/"fastdeploy" handlers.  (The old
    # code also assigned ``propagate`` on the *handlers*; that attribute
    # only exists on loggers, so those assignments were dead and dropped.)
    logger.propagate = False
    return logger


# --- file: fastdeploy/util/__init__.py ---
# (intentionally empty package marker)


# --- file: fastdeploy/util/formatters.py ---
"""Custom log formatters.

``ColoredFormatter`` wraps console records in ANSI colour codes so the
severity is visible at a glance in a terminal.
"""

import logging


class ColoredFormatter(logging.Formatter):
    """Formatter that colours WARNING (yellow) and ERROR/CRITICAL (red).

    Records of any other level are emitted with the terminal's default
    colour (no escape codes at all).
    """

    # levelno -> ANSI SGR colour code; levels absent here get no colour.
    COLOR_CODES = {
        logging.WARNING: 33,   # yellow
        logging.ERROR: 31,     # red
        logging.CRITICAL: 31,  # red
    }

    def format(self, record):
        """Format *record*, adding an ANSI prefix/suffix when coloured.

        Args:
            record (logging.LogRecord): the record to render.

        Returns:
            str: the rendered message, possibly wrapped in colour codes.
        """
        color_code = self.COLOR_CODES.get(record.levelno, 0)
        message = super().format(record)
        if color_code:
            message = f"\033[{color_code}m{message}\033[0m"
        return message


# --- file: fastdeploy/util/handlers.py ---
"""Custom log handlers used by FastDeploy.

These handlers control where rotated log files are written and how old
files are pruned.  (The module docstring previously sat *below* the
imports, where Python does not treat it as a docstring; moved to the top.)
"""

import codecs
import os
import re
import time
from datetime import datetime
from logging.handlers import BaseRotatingHandler, TimedRotatingFileHandler
from pathlib import Path


class DailyFolderTimedRotatingFileHandler(TimedRotatingFileHandler):
    """One directory per day, one file per hour.

    Layout::

        logs/
        └── 2025-08-05/
            ├── fastdeploy_error_10.log
            └── fastdeploy_debug_10.log
    """

    def __init__(self, filename, when="H", interval=1, backupCount=48, encoding=None, utc=False, **kwargs):
        # ``filename`` arrives from dictConfig as "<base_log_dir>/<name>";
        # split it into the directory and the stem used for hourly files.
        base_log_dir, base_name = os.path.split(filename)
        self.base_log_dir = base_log_dir
        self.base_filename = os.path.splitext(base_name)[0]
        self.current_day = datetime.now().strftime("%Y-%m-%d")
        self._update_baseFilename()

        super().__init__(
            filename=self.baseFilename,
            when=when,
            interval=interval,
            backupCount=backupCount,
            encoding=encoding,
            utc=utc,
        )

    def _update_baseFilename(self):
        """Point ``baseFilename`` at <dir>/<day>/<stem>_<hour>.log, creating the day dir."""
        dated_dir = os.path.join(self.base_log_dir, self.current_day)
        os.makedirs(dated_dir, exist_ok=True)
        self.baseFilename = os.path.abspath(
            os.path.join(dated_dir, f"{self.base_filename}_{datetime.now().strftime('%H')}.log")
        )

    def shouldRollover(self, record):
        """Force a rollover on a date change; otherwise defer to the parent."""
        new_day = datetime.now().strftime("%Y-%m-%d")
        if new_day != self.current_day:
            self.current_day = new_day
            return 1
        return super().shouldRollover(record)

    def doRollover(self):
        """Reopen the stream on the new dated/hourly path."""
        if self.stream:
            self.stream.close()
            self.stream = None
        self._update_baseFilename()
        self.stream = self._open()
        # BUG FIX: advance ``rolloverAt``.  The original override never
        # recomputed it, so once the first interval boundary passed, the
        # inherited shouldRollover() returned True for *every* record and
        # the stream was reopened on each emit.
        current_time = int(time.time())
        new_rollover_at = self.computeRollover(current_time)
        while new_rollover_at <= current_time:
            new_rollover_at += self.interval
        self.rolloverAt = new_rollover_at


class DailyRotatingFileHandler(BaseRotatingHandler):
    """Like ``logging.TimedRotatingFileHandler`` but multi-process friendly.

    The handler always writes to ``<filename>.<YYYY-MM-DD>`` and maintains
    ``<filename>`` as a symlink to the current day's file, so concurrent
    processes agree on the target without coordinating rollovers.
    """

    def __init__(self, filename, backupCount=0, encoding="utf-8", delay=False, utc=False, **kwargs):
        """Initialise the handler.

        Args:
            filename (str): base log path, relative or absolute.
            backupCount (int): dated backups to keep; 0 keeps everything.
            encoding (str): file encoding, defaults to 'utf-8'.
            delay (bool): postpone opening the file until the first emit.
            utc (bool): retained for interface compatibility; unused here.
            kwargs: forwarded-compatible extras (ignored).
        """
        self.backup_count = backupCount
        self.utc = utc
        self.suffix = "%Y-%m-%d"
        self.base_log_path = Path(filename)
        self.base_filename = self.base_log_path.name
        self.current_filename = self._compute_fn()
        self.current_log_path = self.base_log_path.with_name(self.current_filename)
        BaseRotatingHandler.__init__(self, filename, "a", encoding, delay)

    def shouldRollover(self, record):
        """Roll over whenever the dated file name for "now" has changed."""
        return self.current_filename != self._compute_fn()

    def doRollover(self):
        """Switch to the new day's file and prune expired backups."""
        if self.stream:
            self.stream.close()
            self.stream = None

        self.current_filename = self._compute_fn()
        self.current_log_path = self.base_log_path.with_name(self.current_filename)

        if not self.delay:
            self.stream = self._open()

        self.delete_expired_files()

    def _compute_fn(self):
        """Return the dated file name for the current local time."""
        return self.base_filename + "." + time.strftime(self.suffix, time.localtime())

    def _open(self):
        """Open the dated file and refresh the convenience symlink."""
        if self.encoding is None:
            stream = open(str(self.current_log_path), self.mode)
        else:
            stream = codecs.open(str(self.current_log_path), self.mode, self.encoding)

        # Re-point <base> -> <base>.<date>; failures (e.g. filesystems
        # without symlink support) are deliberately non-fatal.
        if self.base_log_path.exists():
            try:
                if not self.base_log_path.is_symlink() or os.readlink(self.base_log_path) != self.current_filename:
                    os.remove(self.base_log_path)
            except OSError:
                pass
        try:
            os.symlink(self.current_filename, str(self.base_log_path))
        except OSError:
            pass
        return stream

    def delete_expired_files(self):
        """Delete dated backups beyond ``backup_count`` (oldest first)."""
        if self.backup_count <= 0:
            return

        prefix = self.base_filename + "."
        plen = len(prefix)
        candidates = []
        for file_name in os.listdir(str(self.base_log_path.parent)):
            if file_name[:plen] == prefix:
                suffix = file_name[plen:]
                # Only dated siblings of this log (optionally with a
                # sub-suffix) are ever eligible for deletion.
                if re.match(r"^\d{4}-\d{2}-\d{2}(\.\w+)?$", suffix):
                    candidates.append(file_name)

        if len(candidates) < self.backup_count:
            expired = []
        else:
            candidates.sort()
            expired = candidates[: len(candidates) - self.backup_count]

        for file_name in expired:
            os.remove(str(self.base_log_path.with_name(file_name)))


# --- file: fastdeploy/util/setup_logging.py ---
"""One-shot configuration of the FastDeploy logging system."""

import json
import logging
import logging.config
import os
from pathlib import Path

from fastdeploy import envs


def setup_logging(log_dir=None, config_file=None):
    """Configure FastDeploy logging (idempotent per process).

    Args:
        log_dir: directory for log files; defaults to ``envs.FD_LOG_DIR``.
        config_file: optional JSON dictConfig file overriding the defaults.

    Returns:
        logging.Logger: the configured ``fastdeploy`` logger.
    """
    # Guard: configure at most once per process.
    if getattr(setup_logging, "_configured", False):
        return logging.getLogger("fastdeploy")

    # Environment variable wins; fall back to the argument / "logs".
    if log_dir is None:
        log_dir = getattr(envs, "FD_LOG_DIR", "logs")
    Path(log_dir).mkdir(parents=True, exist_ok=True)

    # Level and retention come from the environment.
    is_debug = int(getattr(envs, "FD_DEBUG", 0))
    FASTDEPLOY_LOGGING_LEVEL = "DEBUG" if is_debug else "INFO"
    backup_count = int(getattr(envs, "FD_LOG_BACKUP_COUNT", 7))

    _FORMAT = "%(levelname)-8s %(asctime)s %(process)-5s %(filename)s[line:%(lineno)d] %(message)s"

    default_config = {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "standard": {
                "class": "logging.Formatter",
                "format": _FORMAT,
                "datefmt": "%Y-%m-%d %H:%M:%S",
            },
            "colored": {
                "class": "fastdeploy.util.formatters.ColoredFormatter",
                "format": _FORMAT,
                "datefmt": "%Y-%m-%d %H:%M:%S",
            },
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "level": FASTDEPLOY_LOGGING_LEVEL,
                "formatter": "colored",
                "stream": "ext://sys.stdout",
            },
            # Daily-rotated flat files...
            "error_file": {
                "class": "fastdeploy.util.handlers.DailyRotatingFileHandler",
                "level": "ERROR",
                "formatter": "standard",
                "filename": os.path.join(log_dir, "error.log"),
                "backupCount": backup_count,
                "encoding": "utf-8",
            },
            "default_file": {
                "class": "fastdeploy.util.handlers.DailyRotatingFileHandler",
                "level": FASTDEPLOY_LOGGING_LEVEL,
                "formatter": "standard",
                "filename": os.path.join(log_dir, "default.log"),
                "backupCount": backup_count,
                "encoding": "utf-8",
            },
            # ...plus hourly files inside per-day folders.
            "error_file2": {
                "class": "fastdeploy.util.handlers.DailyFolderTimedRotatingFileHandler",
                "level": "ERROR",
                "formatter": "standard",
                "filename": os.path.join(log_dir, "error.log"),
                "when": "H",
                "interval": 1,
                "backupCount": 48,
                "encoding": "utf-8",
            },
            "default_file2": {
                "class": "fastdeploy.util.handlers.DailyFolderTimedRotatingFileHandler",
                "level": FASTDEPLOY_LOGGING_LEVEL,
                "formatter": "standard",
                "filename": os.path.join(log_dir, "default.log"),
                "when": "H",
                "interval": 1,
                "backupCount": 48,
                "encoding": "utf-8",
            },
        },
        "loggers": {
            "fastdeploy": {
                "level": "DEBUG",
                "handlers": ["console", "error_file", "default_file", "error_file2", "default_file2"],
                "propagate": False,
            }
        },
        "root": {"level": "WARNING", "handlers": ["console"]},
    }

    if config_file and os.path.exists(config_file):
        with open(config_file, "r", encoding="utf-8") as f:
            config = json.load(f)

        # Fold environment-derived settings into the user-provided config.
        if "handlers" in config:
            for handler_name, handler_config in config["handlers"].items():
                if "backupCount" not in handler_config and "DailyRotating" in handler_config.get("class", ""):
                    handler_config["backupCount"] = backup_count
                if handler_config.get("level") == "INFO" and is_debug:
                    handler_config["level"] = "DEBUG"
    else:
        config = default_config

    logging.config.dictConfig(config)

    # Mark as done so repeated calls are no-ops.
    setup_logging._configured = True
    return logging.getLogger("fastdeploy")


# --- file: fastdeploy/utils.py (changed region) ---
# ``get_logger`` now lives in fastdeploy.logger; it is imported here so the
# many existing ``from fastdeploy.utils import get_logger`` call sites keep
# working unchanged.
from fastdeploy import envs
from fastdeploy.logger import get_logger

# NOTE: the previous module-local get_logger()/handler implementation has
# been removed outright (the patch merely commented it out, leaving ~30
# lines of dead code); the replacement is fastdeploy.logger.get_logger with
# an identical signature and fastdeploy.util.handlers for the handlers.


# --- file: test/test_logger.py ---
"""unittest suite for fastdeploy.logger.

Run with: python -m unittest tests.test_logger -v
"""

import logging
import os
import shutil
import tempfile
import unittest
from unittest.mock import patch

from fastdeploy.logger import _get_legacy_logger, _get_unified_logger, get_logger


class LoggerTests(unittest.TestCase):
    """Unit tests for the logger module."""

    def setUp(self):
        # Fixture: every test gets an exclusive temporary log directory.
        self.tmp_dir = tempfile.mkdtemp(prefix="fd_unittest_")
        self.patchers = [
            patch("fastdeploy.envs.FD_LOG_DIR", self.tmp_dir),
            patch("fastdeploy.envs.FD_DEBUG", "0"),
            patch("fastdeploy.envs.FD_LOG_BACKUP_COUNT", "1"),
        ]
        for p in self.patchers:
            p.start()

    def tearDown(self):
        for p in self.patchers:
            p.stop()
        shutil.rmtree(self.tmp_dir, ignore_errors=True)

    def test_unified_namespace(self):
        """_get_unified_logger maps names into the fastdeploy namespace."""
        cases = [
            (None, "fastdeploy"),
            ("__main__", "fastdeploy.main"),
            ("fastdeploy.xxx", "fastdeploy.xxx"),
            ("foo", "fastdeploy.foo"),
        ]
        for inp, expected in cases:
            with self.subTest(inp=inp):
                self.assertEqual(_get_unified_logger(inp).name, expected)

    @patch("sys.stderr", new_callable=lambda: open(os.devnull, "w"))
    def test_legacy_console(self, mock_stderr):
        """Console output wiring: just assert the handler count; colours are visual."""
        logger = _get_legacy_logger("console_test", "console_test.log", without_formater=False, print_to_console=True)
        handlers = [h for h in logger.handlers if isinstance(h, logging.StreamHandler)]
        self.assertTrue(len(handlers) >= 1)

    def test_legacy_without_formatter(self):
        """With the formatter disabled, the raw message is written verbatim."""
        logger = _get_legacy_logger("no_fmt", "no_fmt.log", without_formater=True)
        logger.info("no fmt")
        with open(os.path.join(self.tmp_dir, "no_fmt.log")) as f:
            self.assertEqual(f.read().strip(), "no fmt")

    def test_legacy_debug_level(self):
        """FD_DEBUG=1 switches the legacy logger to DEBUG."""
        with patch("fastdeploy.envs.FD_DEBUG", "1"):
            logger = _get_legacy_logger("debug", "debug.log")
            self.assertEqual(logger.level, logging.DEBUG)
            logger.debug("debug msg")
            with open(os.path.join(self.tmp_dir, "debug.log")) as f:
                self.assertIn("debug msg", f.read())

    def test_get_logger_branch(self):
        """get_logger dispatches: name only -> unified; with file_name -> legacy."""
        unified_logger = get_logger("foo")
        self.assertEqual(unified_logger.name, "fastdeploy.foo")

        legacy_logger = get_logger("foo", "foo.log")
        self.assertTrue(legacy_logger.name.startswith("legacy."))


if __name__ == "__main__":
    unittest.main()


# --- file: test/util/test_formatters.py ---
"""Unit tests for the custom log formatter."""

import logging
import unittest

from fastdeploy.util.formatters import ColoredFormatter


class TestColoredFormatter(unittest.TestCase):
    """Tests for ColoredFormatter."""

    def setUp(self):
        self.formatter = ColoredFormatter("%(levelname)s - %(message)s")

    @staticmethod
    def _record(level, msg):
        # Minimal LogRecord for formatting tests (dedupes the old per-test
        # LogRecord boilerplate; behaviour under test is unchanged).
        return logging.LogRecord(
            name="test", level=level, pathname="", lineno=0, msg=msg, args=(), exc_info=None
        )

    def test_color_codes_definition(self):
        """The colour table maps WARNING to yellow, ERROR/CRITICAL to red."""
        expected_colors = {
            logging.WARNING: 33,
            logging.ERROR: 31,
            logging.CRITICAL: 31,
        }
        self.assertEqual(self.formatter.COLOR_CODES, expected_colors)

    def test_format_warning_message(self):
        out = self.formatter.format(self._record(logging.WARNING, "This is a warning"))
        self.assertEqual(out, "\033[33mWARNING - This is a warning\033[0m")

    def test_format_error_message(self):
        out = self.formatter.format(self._record(logging.ERROR, "This is an error"))
        self.assertEqual(out, "\033[31mERROR - This is an error\033[0m")

    def test_format_critical_message(self):
        out = self.formatter.format(self._record(logging.CRITICAL, "This is critical"))
        self.assertEqual(out, "\033[31mCRITICAL - This is critical\033[0m")

    def test_format_info_message(self):
        """INFO records carry no colour codes."""
        out = self.formatter.format(self._record(logging.INFO, "This is info"))
        self.assertEqual(out, "INFO - This is info")

    def test_format_debug_message(self):
        """DEBUG records carry no colour codes."""
        out = self.formatter.format(self._record(logging.DEBUG, "This is debug"))
        self.assertEqual(out, "DEBUG - This is debug")

    def test_format_custom_level(self):
        """Levels without a COLOR_CODES entry stay uncoloured."""
        custom_level = 25  # between INFO (20) and WARNING (30)
        record = self._record(custom_level, "This is custom level")
        record.levelname = "CUSTOM"
        self.assertEqual(self.formatter.format(record), "CUSTOM - This is custom level")


if __name__ == "__main__":
    unittest.main(verbosity=2)


# --- file: test/util/test_handlers.py ---
"""Unit tests for the custom log handlers."""

import os
import shutil
import tempfile
import unittest
from datetime import datetime, timedelta
from logging import INFO, getLogger
from unittest.mock import MagicMock, patch

from fastdeploy.util.handlers import (
    DailyFolderTimedRotatingFileHandler,
    DailyRotatingFileHandler,
)


class TestDailyFolderTimedRotatingFileHandler(unittest.TestCase):
    """Tests for DailyFolderTimedRotatingFileHandler."""

    def setUp(self):
        self.temp_dir = tempfile.mkdtemp(prefix="fd_handler_test_")

    def tearDown(self):
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_daily_folder_structure(self):
        """One directory per day, one file per hour."""
        handler = DailyFolderTimedRotatingFileHandler(
            os.path.join(self.temp_dir, "test.log"), when="H", interval=1, backupCount=3
        )
        logger = getLogger("test_daily_folder")
        logger.addHandler(handler)
        logger.setLevel(INFO)

        logger.info("Test log message")
        handler.flush()  # make sure the record hits disk
        handler.close()

        # Verify the <day>/<stem>_<hour>.log layout.
        today = datetime.now().strftime("%Y-%m-%d")
        log_dir = os.path.join(self.temp_dir, today)
        self.assertTrue(os.path.isdir(log_dir))

        log_file = os.path.join(log_dir, f"test_{datetime.now().strftime('%H')}.log")
        self.assertTrue(os.path.isfile(log_file))

    def test_rollover(self):
        """Rolling over to the next day switches directories."""
        handler = DailyFolderTimedRotatingFileHandler(
            os.path.join(self.temp_dir, "test.log"), when="H", interval=1, backupCount=3
        )
        logger = getLogger("test_rollover")
        logger.addHandler(handler)
        logger.setLevel(INFO)

        logger.info("Test log message before rollover")
        handler.flush()

        # Today's directory must exist after the first record.
        today = datetime.now().strftime("%Y-%m-%d")
        today_dir = os.path.join(self.temp_dir, today)
        self.assertTrue(os.path.isdir(today_dir))

        # Simulate the day change by bumping the handler's current_day.
        tomorrow = datetime.now() + timedelta(days=1)
        tomorrow_str = tomorrow.strftime("%Y-%m-%d")
        mock_record = MagicMock()
        handler.current_day = tomorrow_str

        with patch.object(handler, "_update_baseFilename") as mock_update:
            tomorrow_dir = os.path.join(self.temp_dir, tomorrow_str)
            os.makedirs(tomorrow_dir, exist_ok=True)
            new_filename = os.path.join(tomorrow_dir, f"test_{tomorrow.strftime('%H')}.log")

            def update_side_effect():
                handler.baseFilename = new_filename

            mock_update.side_effect = update_side_effect

            if handler.shouldRollover(mock_record):
                handler.doRollover()

            logger.info("Test log message after rollover")
            handler.flush()
            handler.close()

        self.assertTrue(os.path.isdir(tomorrow_dir))


class TestDailyRotatingFileHandler(unittest.TestCase):
    """Tests for DailyRotatingFileHandler."""

    def setUp(self):
        self.temp_dir = tempfile.mkdtemp(prefix="fd_handler_test_")

    def tearDown(self):
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_daily_rotation(self):
        """Rolling over once a day produces dated files."""
        log_file = os.path.join(self.temp_dir, "test.log")
        handler = DailyRotatingFileHandler(log_file, backupCount=3)
        logger = getLogger("test_daily_rotation")
        logger.addHandler(handler)
        logger.setLevel(INFO)

        logger.info("Test log message day 1")
        handler.flush()

        # Pretend the date has advanced by stubbing _compute_fn.
        with patch.object(handler, "_compute_fn") as mock_compute:
            tomorrow = (datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d")
            mock_compute.return_value = f"test.log.{tomorrow}"

            mock_record = MagicMock()
            if handler.shouldRollover(mock_record):
                handler.doRollover()

            logger.info("Test log message day 2")
            handler.flush()
            handler.close()

        today = datetime.now().strftime("%Y-%m-%d")
        tomorrow = (datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d")

        base_file = os.path.join(self.temp_dir, "test.log")
        today_file = os.path.join(self.temp_dir, f"test.log.{today}")
        tomorrow_file = os.path.join(self.temp_dir, f"test.log.{tomorrow}")

        # At least one of the expected files must exist.
        files_exist = any(
            [os.path.isfile(base_file), os.path.isfile(today_file), os.path.isfile(tomorrow_file)]
        )
        self.assertTrue(files_exist, f"No log files found in {self.temp_dir}")

    def test_backup_count(self):
        """delete_expired_files keeps at most backupCount dated backups."""
        log_file = os.path.join(self.temp_dir, "test.log")
        handler = DailyRotatingFileHandler(log_file, backupCount=2)
        logger = getLogger("test_backup_count")
        logger.addHandler(handler)
        logger.setLevel(INFO)

        # Fabricate five days of dated log files.
        base_date = datetime.now()
        for i in range(5):
            date_str = (base_date - timedelta(days=i)).strftime("%Y-%m-%d")
            with open(os.path.join(self.temp_dir, f"test.log.{date_str}"), "w") as f:
                f.write(f"Test log for {date_str}\n")

        handler.delete_expired_files()
        handler.close()

        log_files = [f for f in os.listdir(self.temp_dir) if f.startswith("test.log.")]
        # backupCount=2 -> at most two backups plus the current dated file.
        self.assertLessEqual(len(log_files), 3)


if __name__ == "__main__":
    unittest.main()


# --- file: test/util/test_setup_logging.py ---
"""Unit tests for setup_logging."""

import json
import logging
import os
import shutil
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch

from fastdeploy.util.setup_logging import setup_logging


class TestSetupLogging(unittest.TestCase):

    def setUp(self):
        # Fixture: every test gets its own temporary directory and env.
        self.temp_dir = tempfile.mkdtemp(prefix="fd_setup_test_")
        self.patches = [
            patch("fastdeploy.envs.FD_LOG_DIR", self.temp_dir),
            patch("fastdeploy.envs.FD_DEBUG", "0"),
            patch("fastdeploy.envs.FD_LOG_BACKUP_COUNT", "3"),
        ]
        for p in self.patches:
            p.start()

    def tearDown(self):
        for p in self.patches:
            p.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)
        # Reset the once-only flag so other tests can reconfigure.
        if hasattr(setup_logging, "_configured"):
            delattr(setup_logging, "_configured")

    def test_log_dir_created(self):
        """Nested log directories are created on demand."""
        nested = os.path.join(self.temp_dir, "a", "b", "c")
        setup_logging(log_dir=nested)
        self.assertTrue(Path(nested).is_dir())

    def test_default_config_file_no_ansi(self):
        """File handlers use the plain formatter: no ANSI escapes on disk."""
        setup_logging()
        logger = logging.getLogger("fastdeploy")
        logger.error("test ansi")

        default_file = Path(self.temp_dir) / "default.log"
        self.assertTrue(default_file.exists())
        with default_file.open() as f:
            self.assertNotIn("\033[", f.read())

    def test_debug_level(self):
        """FD_DEBUG=1 drops the level to DEBUG and debug records reach the file."""
        with patch("fastdeploy.envs.FD_DEBUG", "1"):
            setup_logging()
            logger = logging.getLogger("fastdeploy")
            self.assertEqual(logger.level, logging.DEBUG)
            logger.debug("debug msg")
            default_file = Path(self.temp_dir) / "default.log"
            self.assertIn("debug msg", default_file.read_text())

    def test_custom_config_file(self):
        """A user-supplied JSON dictConfig file replaces the defaults."""
        custom_cfg = {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {"plain": {"format": "%(message)s"}},
            "handlers": {
                "custom": {
                    "class": "logging.FileHandler",
                    "filename": os.path.join(self.temp_dir, "custom.log"),
                    "formatter": "plain",
                }
            },
            "loggers": {"fastdeploy": {"handlers": ["custom"], "level": "INFO"}},
        }
        cfg_path = Path(self.temp_dir) / "cfg.json"
        cfg_path.write_text(json.dumps(custom_cfg))

        setup_logging(config_file=str(cfg_path))
        logger = logging.getLogger("fastdeploy")
        logger.info("from custom cfg")

        custom_file = Path(self.temp_dir) / "custom.log"
        self.assertEqual(custom_file.read_text().strip(), "from custom cfg")

    def test_configure_once(self):
        """Repeated calls do not reconfigure: same logger returned."""
        logger1 = setup_logging()
        logger2 = setup_logging()
        self.assertIs(logger1, logger2)

    @patch("logging.StreamHandler.emit")
    def test_console_colored(self, mock_emit):
        """The console handler is wired up and emits records."""
        setup_logging()
        logger = logging.getLogger("fastdeploy")
        logger.error("color test")
        self.assertTrue(mock_emit.called)


if __name__ == "__main__":
    unittest.main()