a-cloud-all/.devops/monitor.py

190 lines
6.0 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Git 仓库监听器
监听多个 Git 仓库的指定分支,检测到新提交时触发部署
"""
import os
import sys
import time
import yaml
import logging
import subprocess
from datetime import datetime
from pathlib import Path
# 添加当前目录到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from deployer import Deployer
class GitMonitor:
"""Git 仓库监听器"""
def __init__(self, config_path='.devops/config.yaml'):
"""初始化监听器"""
self.config_path = config_path
self.config = self._load_config()
self._setup_logging()
self.deployer = Deployer(self.config)
self.last_commits = {} # 存储每个仓库的最后一次提交 hash
self.logger.info("Git 监听器初始化完成")
def _load_config(self):
"""加载配置文件"""
with open(self.config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
def _setup_logging(self):
"""设置日志"""
log_config = self.config.get('logging', {})
log_level = getattr(logging, log_config.get('level', 'INFO'))
log_file = log_config.get('file', '.devops/logs/devops.log')
# 确保日志目录存在
os.makedirs(os.path.dirname(log_file), exist_ok=True)
# 配置日志格式
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 文件处理器
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setFormatter(formatter)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
# 配置 logger
self.logger = logging.getLogger('GitMonitor')
self.logger.setLevel(log_level)
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def get_remote_commit(self, repo_url, branch):
"""获取远程仓库的最新提交 hash"""
try:
cmd = f"git ls-remote {repo_url} refs/heads/{branch}"
result = subprocess.run(
cmd, shell=True, capture_output=True, text=True, timeout=30
)
if result.returncode == 0 and result.stdout:
commit_hash = result.stdout.split()[0]
return commit_hash
return None
except Exception as e:
self.logger.error(f"获取远程提交失败 {repo_url}: {e}")
return None
def check_repository(self, repo_config):
"""检查单个仓库是否有新提交"""
repo_name = repo_config['name']
repo_url = repo_config['url']
branch = repo_config['branch']
self.logger.debug(f"检查仓库: {repo_name}")
# 获取最新提交
current_commit = self.get_remote_commit(repo_url, branch)
if not current_commit:
self.logger.warning(f"无法获取 {repo_name} 的最新提交")
return False
# 检查是否有新提交
last_commit = self.last_commits.get(repo_name)
if last_commit is None:
# 首次检查,记录当前提交
self.last_commits[repo_name] = current_commit
self.logger.info(f"初始化 {repo_name} 提交记录: {current_commit[:8]}")
return False
if current_commit != last_commit:
self.logger.info(
f"检测到 {repo_name} 有新提交: {last_commit[:8]} -> {current_commit[:8]}"
)
self.last_commits[repo_name] = current_commit
return True
return False
def get_enabled_repos(self):
"""获取需要监听的仓库列表"""
enabled = self.config['monitor'].get('enabled_repos', [])
all_repos = self.config['repositories']
if not enabled:
# 空列表表示监听所有仓库
return all_repos
# 只返回启用的仓库
return [repo for repo in all_repos if repo['name'] in enabled]
def run_once(self):
"""执行一次检查"""
repos = self.get_enabled_repos()
self.logger.info(f"开始检查 {len(repos)} 个仓库...")
for repo_config in repos:
try:
if self.check_repository(repo_config):
# 检测到新提交,触发部署
self.logger.info(f"触发部署: {repo_config['name']}")
success = self.deployer.deploy(repo_config)
if success:
self.logger.info(f"部署成功: {repo_config['name']}")
else:
self.logger.error(f"部署失败: {repo_config['name']}")
except Exception as e:
self.logger.error(f"处理仓库 {repo_config['name']} 时出错: {e}", exc_info=True)
def run(self):
"""持续监听运行"""
poll_interval = self.config['monitor']['poll_interval']
self.logger.info(f"开始监听 Git 仓库,轮询间隔: {poll_interval}")
self.logger.info("按 Ctrl+C 停止监听")
try:
while True:
self.run_once()
time.sleep(poll_interval)
except KeyboardInterrupt:
self.logger.info("收到停止信号,退出监听")
except Exception as e:
self.logger.error(f"监听过程中发生错误: {e}", exc_info=True)
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='Git 仓库监听器')
parser.add_argument(
'--config',
default='.devops/config.yaml',
help='配置文件路径'
)
parser.add_argument(
'--once',
action='store_true',
help='只执行一次检查,不持续监听'
)
args = parser.parse_args()
monitor = GitMonitor(args.config)
if args.once:
monitor.run_once()
else:
monitor.run()
if __name__ == '__main__':
main()