Skip to content
69 changes: 69 additions & 0 deletions trinity/cli/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,75 @@ def convert(
converter.convert(dir_path)


@app.command()
def log(
log_dir: Annotated[
str,
typer.Option(
"--log-dir",
"-d",
help="Path to the log directory. If provided, it will be used directly and ignore --config.",
),
] = "",
config: Annotated[
str,
typer.Option(
"--config",
help="Path to the config file. If provided, it will automatically locate the log directory based on the config.",
),
] = "",
keyword: Annotated[
Optional[str],
typer.Option("--keyword", "-k", help="The keyword to filter log files."),
] = None,
level: Annotated[
str,
typer.Option("--level", "-l", help="The minimum log level to display."),
] = "INFO",
last_n_lines: Annotated[
int,
typer.Option("--last-n-lines", "-n", help="Number of last lines to display when starting."),
] = 0,
search_pattern: Annotated[
Optional[str],
typer.Option(
"--search-pattern",
"-p",
help="The pattern to search in log files. Only search for history logs and display all lines containing the pattern.",
),
] = None,
no_color: Annotated[
bool,
typer.Option("--no-color", help="Disable colored output."),
] = False,
) -> None:
"""Monitor log files in real-time."""
from trinity.manager.log_manager import LogManager

if not config and not log_dir:
raise typer.BadParameter("Either --config or --log-dir must be provided.")
if not log_dir:
cfg = load_config(config)
checkpoint_job_dir = cfg.get_checkpoint_job_dir()
# we do not use check_and_update here because user may use this command
# in another environment
log_dir = os.path.join(checkpoint_job_dir, "log")
if not os.path.exists(log_dir):
raise FileNotFoundError(f"Log directory not found: {log_dir}")
if not os.path.exists(log_dir):
raise FileNotFoundError(f"Log directory not found: {log_dir}")

log_manager = LogManager(
log_dir,
keyword=keyword,
min_level=level,
color_output=not no_color,
last_n_lines=last_n_lines,
search_pattern=search_pattern,
)
log_manager.monitor()


def main() -> None:
"""The main entrypoint."""
app()
Expand Down
4 changes: 4 additions & 0 deletions trinity/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,10 @@ def get_envs(self) -> Dict[str, str]:
envs["TINKER_BASE_URL"] = self.model.tinker.base_url
return envs

def get_checkpoint_job_dir(self) -> str:
"""Get the checkpoint job dir."""
return os.path.join(self.checkpoint_root_dir, self.project, self.group, self.name)


def load_config(config_path: str) -> Config:
"""Load the configuration from the given path."""
Expand Down
4 changes: 1 addition & 3 deletions trinity/common/config_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ def validate(self, config: Config) -> None:
if not os.path.isabs(config.checkpoint_root_dir):
config.checkpoint_root_dir = os.path.join(os.getcwd(), config.checkpoint_root_dir)
# create a job dir at checkpoint_root_dir/project/name
config.checkpoint_job_dir = os.path.join(
config.checkpoint_root_dir, config.project, config.group, config.name
)
config.checkpoint_job_dir = config.get_checkpoint_job_dir()
# rename the experiment when necessary
if not config.continue_from_checkpoint and (
os.path.exists(config.checkpoint_job_dir) and os.listdir(config.checkpoint_job_dir)
Expand Down
279 changes: 279 additions & 0 deletions trinity/manager/log_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
"""logger manager"""

import os
import re
import threading
import time
from pathlib import Path
from typing import Dict, List, Set


class Colors:
RESET = "\033[0m"
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
MAGENTA = "\033[95m"
CYAN = "\033[96m"
WHITE = "\033[97m"
GRAY = "\033[90m"


LOG_LEVELS = {
"DEBUG": 0,
"INFO": 1,
"WARNING": 2,
"WARN": 2,
"ERROR": 3,
"CRITICAL": 4,
"FATAL": 4,
}


class LogFileTracker:
"""Single log file tracker"""

def __init__(
self,
filepath: str,
min_level: int = 0,
color_output: bool = True,
last_n_lines: int = 0,
search_pattern: str | None = None,
):
self.filepath = filepath
self.min_level = min_level
self.color_output = color_output
self.last_n_lines = last_n_lines
self.search_pattern = search_pattern
self.file = None
self.file_size = 0
self.inode = None

def open_file(self):
"""Open file and optionally read last N lines"""
try:
self.file = open(self.filepath, "r", encoding="utf-8", errors="ignore")
if self.search_pattern:
print(
f"{Colors.CYAN}[INFO] Searching for pattern '{self.search_pattern}' in {self.filepath}{Colors.RESET}"
)
self.file.seek(0)
lines = self.file.readlines()
match_indices = [i for i, line in enumerate(lines) if self.search_pattern in line]
for idx in match_indices:
start = max(0, idx - 5)
end = min(len(lines), idx + 6)
print(f"{Colors.MAGENTA}[{self.filepath}:{idx + 1}]{Colors.RESET}")
for i in range(start, end):
prefix = f"{Colors.MAGENTA}>> {Colors.RESET}" if i == idx else " "
print(prefix + self.format_output(lines[i].rstrip("\n")))
print(
f"{Colors.CYAN}[INFO] Finished searching in {self.filepath}, now monitoring for new lines...{Colors.RESET}"
)
stat = os.stat(self.filepath)
self.inode = stat.st_ino
if self.last_n_lines > 0:
# Read last N lines
self.file.seek(0, 2)
file_size = self.file.tell()
block_size = 4096
blocks = []
lines_found = 0
pos = file_size
while pos > 0 and lines_found < self.last_n_lines:
read_size = min(block_size, pos)
pos -= read_size
self.file.seek(pos)
block = self.file.read(read_size)
blocks.insert(0, block)
lines_found = sum(b.count("\n") for b in blocks)
all_data = "".join(blocks)
last_lines = all_data.splitlines()[-self.last_n_lines :]
for line in last_lines:
print(self.format_output(line))
self.file.seek(0, 2)
else:
self.file.seek(0, 2)
self.file_size = self.file.tell()
return True
except Exception as e:
print(f"{Colors.RED}[ERROR] Failed to open {self.filepath}: {e}{Colors.RESET}")
return False

def check_rotation(self):
"""Check if file has been rotated"""
try:
stat = os.stat(self.filepath)
# detect rotation: inode changed or file size decreased
if stat.st_ino != self.inode or stat.st_size < self.file_size:
print(f"{Colors.CYAN}[INFO] Detected file rotation: {self.filepath}{Colors.RESET}")
if self.file:
self.file.close()
return True
return False
except FileNotFoundError:
return True
except Exception:
return False

def read_new_lines(self) -> List[str]:
"""Read newly added lines"""
lines = []
try:
if self.check_rotation():
if self.open_file():
return []

while True:
line = self.file.readline()
if not line:
break
lines.append(line.rstrip("\n"))

self.file_size = self.file.tell()
except Exception as e:
print(f"{Colors.RED}[ERROR] Error reading file {self.filepath}: {e}{Colors.RESET}")

return lines

def parse_log_level(self, line: str) -> int:
"""Parse log level"""
match = re.match(r"^(DEBUG|INFO|WARNING|WARN|ERROR|CRITICAL|FATAL)\b", line)
if match:
level = match.group(1).upper()
return LOG_LEVELS.get(level, -1)
return -1

def should_display(self, line: str) -> bool:
"""Determine if this log line should be displayed"""
priority = self.parse_log_level(line)

if priority >= self.min_level:
return True
return False

def format_output(self, line: str) -> str:
"""Format output"""
filename = Path(self.filepath).name

if self.color_output:
return f"{Colors.BLUE}[{filename}]{Colors.RESET} {line}"
else:
return f"[{filename}] {line}"

def close(self):
"""Close file"""
if self.file:
self.file.close()


class LogManager:
"""A manager to track multiple log files in real-time."""

def __init__(
self,
log_dir: str,
keyword: str | None = None,
min_level: str = "DEBUG",
scan_interval: float = 0.5,
last_n_lines: int = 0,
search_pattern: str | None = None,
color_output: bool = True,
):
self.log_dir = Path(log_dir)
self.keyword = keyword
self.min_level_name = min_level.upper()
self.min_level_priority = LOG_LEVELS.get(self.min_level_name, 0)
self.scan_interval = scan_interval
self.color_output = color_output
self.search_pattern = search_pattern
self.trackers: Dict[str, LogFileTracker] = {}
self.running = False
self.last_n_lines = last_n_lines
self.lock = threading.Lock()

def find_log_files(self) -> Set[str]:
"""Find matching log files"""
log_files = set()
try:
for file in self.log_dir.iterdir():
if file.is_file() and (self.keyword is None or self.keyword in file.name):
log_files.add(str(file.resolve()))
except Exception as e:
print(f"{Colors.RED}[ERROR] Failed to scan directory {self.log_dir}: {e}{Colors.RESET}")

return log_files

def scan_new_files(self):
"""Scan for newly added log files"""
current_files = self.find_log_files()

with self.lock:
for filepath in current_files:
if filepath not in self.trackers:
tracker = LogFileTracker(
filepath,
self.min_level_priority,
self.color_output,
self.last_n_lines,
self.search_pattern,
)
if tracker.open_file():
self.trackers[filepath] = tracker
print(f"{Colors.GREEN}[INFO] Started tracking: {filepath}{Colors.RESET}")

removed_files = set(self.trackers.keys()) - current_files
for filepath in removed_files:
self.trackers[filepath].close()
del self.trackers[filepath]
print(f"{Colors.YELLOW}[INFO] Stopped tracking: {filepath}{Colors.RESET}")

def monitor(self):
"""Main monitoring loop"""
self.running = True
last_scan = 0

print(f"{Colors.CYAN}{'=' * 60}{Colors.RESET}")
print(f"{Colors.CYAN}Log monitoring started{Colors.RESET}")
print(f"{Colors.CYAN}Monitoring directory: {self.log_dir}{Colors.RESET}")
print(f"{Colors.CYAN}Keyword: {self.keyword}{Colors.RESET}")
print(f"{Colors.CYAN}Minimum level: {self.min_level_name}{Colors.RESET}")
print(f"{Colors.CYAN}{'=' * 60}{Colors.RESET}\n")

# Initial scan
self.scan_new_files()

try:
while self.running:
current_time = time.time()

# Periodically scan for new files (every 5 seconds)
if current_time - last_scan > 5:
self.scan_new_files()
last_scan = current_time

# Read new content from all files
with self.lock:
for tracker in list(self.trackers.values()):
lines = tracker.read_new_lines()
for line in lines:
if tracker.should_display(line):
output = tracker.format_output(line)
print(output)

time.sleep(self.scan_interval)

except KeyboardInterrupt:
print(f"\n{Colors.YELLOW}[INFO] Received stop signal, exiting...{Colors.RESET}")
finally:
self.stop()

def stop(self):
"""Stop monitoring"""
self.running = False
with self.lock:
for tracker in self.trackers.values():
tracker.close()
print(f"{Colors.GREEN}[INFO] Monitoring stopped{Colors.RESET}")