a-cloud-all/.devops/monitor.py

196 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Git 仓库监听器
监听多个 Git 仓库的指定分支,检测到新提交时触发部署
"""
import os
import sys
import time
import yaml
import logging
import subprocess
from datetime import datetime
from pathlib import Path
# 添加当前目录到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from deployer import Deployer
class GitMonitor:
"""Git 仓库监听器"""
def __init__(self, config_path='.devops/config.yaml'):
"""初始化监听器"""
self.config_path = config_path
self.config = self._load_config()
self._setup_logging()
self.deployer = Deployer(self.config)
self.last_commits = {} # 存储每个仓库的最后一次提交 hash
self.logger.info("Git 监听器初始化完成")
def _load_config(self):
"""加载配置文件"""
with open(self.config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
def _setup_logging(self):
"""设置日志"""
log_config = self.config.get('logging', {})
log_level = getattr(logging, log_config.get('level', 'INFO'))
log_file = log_config.get('file', '.devops/logs/devops.log')
# 确保日志目录存在
os.makedirs(os.path.dirname(log_file), exist_ok=True)
# 配置日志格式
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 文件处理器
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setFormatter(formatter)
file_handler.setLevel(log_level)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(log_level)
# 配置根 logger让所有子 logger 都能输出
root_logger = logging.getLogger()
root_logger.setLevel(log_level)
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
# 配置当前 logger
self.logger = logging.getLogger('GitMonitor')
self.logger.setLevel(log_level)
def get_remote_commit(self, repo_url, branch):
"""获取远程仓库的最新提交 hash"""
try:
cmd = f"git ls-remote {repo_url} refs/heads/{branch}"
result = subprocess.run(
cmd, shell=True, capture_output=True, text=True, timeout=30
)
if result.returncode == 0 and result.stdout:
commit_hash = result.stdout.split()[0]
return commit_hash
return None
except Exception as e:
self.logger.error(f"获取远程提交失败 {repo_url}: {e}")
return None
def check_repository(self, repo_config):
"""检查单个仓库是否有新提交"""
repo_name = repo_config['name']
repo_url = repo_config['url']
branch = repo_config['branch']
self.logger.debug(f"检查仓库: {repo_name}")
# 获取最新提交
current_commit = self.get_remote_commit(repo_url, branch)
if not current_commit:
self.logger.warning(f"无法获取 {repo_name} 的最新提交")
return False
# 检查是否有新提交
last_commit = self.last_commits.get(repo_name)
if last_commit is None:
# 首次检查,记录当前提交
self.last_commits[repo_name] = current_commit
self.logger.info(f"初始化 {repo_name} 提交记录: {current_commit[:8]}")
return False
if current_commit != last_commit:
self.logger.info(
f"检测到 {repo_name} 有新提交: {last_commit[:8]} -> {current_commit[:8]}"
)
self.last_commits[repo_name] = current_commit
return True
return False
def get_enabled_repos(self):
"""获取需要监听的仓库列表"""
enabled = self.config['monitor'].get('enabled_repos', [])
all_repos = self.config['repositories']
if not enabled:
# 空列表表示监听所有仓库
return all_repos
# 只返回启用的仓库
return [repo for repo in all_repos if repo['name'] in enabled]
def run_once(self):
"""执行一次检查"""
repos = self.get_enabled_repos()
self.logger.info(f"开始检查 {len(repos)} 个仓库...")
for repo_config in repos:
try:
if self.check_repository(repo_config):
# 检测到新提交,触发部署
self.logger.info(f"触发部署: {repo_config['name']}")
success = self.deployer.deploy(repo_config)
if success:
self.logger.info(f"部署成功: {repo_config['name']}")
else:
self.logger.error(f"部署失败: {repo_config['name']}")
except Exception as e:
self.logger.error(f"处理仓库 {repo_config['name']} 时出错: {e}", exc_info=True)
def run(self):
"""持续监听运行"""
poll_interval = self.config['monitor']['poll_interval']
self.logger.info(f"开始监听 Git 仓库,轮询间隔: {poll_interval}")
self.logger.info("按 Ctrl+C 停止监听")
try:
while True:
self.run_once()
time.sleep(poll_interval)
except KeyboardInterrupt:
self.logger.info("收到停止信号,退出监听")
except Exception as e:
self.logger.error(f"监听过程中发生错误: {e}", exc_info=True)
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='Git 仓库监听器')
parser.add_argument(
'--config',
default='.devops/config.yaml',
help='配置文件路径'
)
parser.add_argument(
'--once',
action='store_true',
help='只执行一次检查,不持续监听'
)
args = parser.parse_args()
monitor = GitMonitor(args.config)
if args.once:
monitor.run_once()
else:
monitor.run()
if __name__ == '__main__':
main()