#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 日志管理模块 提供统一的日志输出功能,支持控制台和文件输出 """ import os from datetime import datetime from pathlib import Path class Logger: """日志管理器""" # 颜色定义 RED = '\033[0;31m' GREEN = '\033[0;32m' YELLOW = '\033[1;33m' BLUE = '\033[0;34m' NC = '\033[0m' # No Color # 日志配置 _log_file = None _max_size = 10485760 # 10MB _initialized = False @classmethod def init(cls, log_file=None, max_size=None): """ 初始化日志配置 参数: log_file: 日志文件路径 max_size: 日志文件最大大小(字节) """ if log_file: cls._log_file = Path(log_file) # 确保日志目录存在 cls._log_file.parent.mkdir(parents=True, exist_ok=True) if max_size: cls._max_size = max_size cls._initialized = True @classmethod def _rotate_log(cls): """日志轮转:如果日志文件超过最大大小,进行轮转""" if not cls._log_file or not cls._log_file.exists(): return if cls._log_file.stat().st_size >= cls._max_size: # 轮转日志文件 backup_file = cls._log_file.with_suffix('.log.1') if backup_file.exists(): backup_file.unlink() cls._log_file.rename(backup_file) @classmethod def _write_log(cls, level, message): """写入日志到控制台和文件""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 控制台输出(带颜色) color_map = { 'INFO': cls.GREEN, 'ERROR': cls.RED, 'WARN': cls.YELLOW, 'DEBUG': cls.BLUE } color = color_map.get(level, cls.NC) print(f"{color}[{level}]{cls.NC} {timestamp} - {message}") # 文件输出(不带颜色) if cls._log_file: cls._rotate_log() log_line = f"[{level}] {timestamp} - {message}\n" with open(cls._log_file, 'a', encoding='utf-8') as f: f.write(log_line) @classmethod def info(cls, message): """输出信息日志""" cls._write_log('INFO', message) @classmethod def error(cls, message): """输出错误日志""" cls._write_log('ERROR', message) @classmethod def warn(cls, message): """输出警告日志""" cls._write_log('WARN', message) @classmethod def warning(cls, message): """输出警告日志(warn 的别名)""" cls._write_log('WARN', message) @classmethod def debug(cls, message): """输出调试日志""" cls._write_log('DEBUG', message) @staticmethod def separator(): """输出分隔线""" print("=" * 60) @classmethod def run_command(cls, cmd, cwd=None): """ 执行命令并实时输出日志到控制台和文件 参数: cmd: 要执行的命令 cwd: 工作目录 返回: bool: 成功返回 True,失败返回 False """ import subprocess cls.info(f"执行命令: {cmd}") cls.info(f"工作目录: {cwd if cwd else '当前目录'}") try: process = subprocess.Popen( cmd, shell=True, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1 ) # 实时读取并输出 for line in process.stdout: line = line.rstrip() if line: # 直接打印到控制台和写入文件 print(line) if cls._log_file: cls._rotate_log() with open(cls._log_file, 'a', encoding='utf-8') as f: f.write(line + '\n') process.wait() return process.returncode == 0 except Exception as e: cls.error(f"命令执行异常: {e}") return False