a-cloud-all/.devops/monitor.py

434 lines
16 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Git 仓库监听器
监听多个 Git 仓库的指定分支,检测到新提交时触发部署
"""
import os
import sys
import time
import yaml
import subprocess
from datetime import datetime
from pathlib import Path
# 添加当前目录到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 导入自定义模块
from scripts.log import Logger
from scripts import docker, maven, npm
from scripts.init import mysql, redis, nacos
from scripts.dingtalk import DingTalkNotifier
class GitMonitor:
"""Git 仓库监听器"""
def __init__(self, config_path='.devops/config.yaml'):
"""初始化监听器"""
self.config_path = config_path
self.config = None
self.last_commits = {}
self.global_branch = 'main'
self.project_root = None
self.runtime_path = None
self.dingtalk_notifier = None
# 初始化
self._print_startup_banner()
self._load_config()
self._init_paths()
self._init_dingtalk()
def _print_startup_banner(self):
"""打印启动横幅"""
print("\n")
Logger.separator()
print(" RuoYi Cloud DevOps 自动化部署系统")
Logger.separator()
print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
Logger.separator()
print("\n")
def _load_config(self):
"""加载配置文件"""
Logger.info(f"[步骤 1/3] 读取配置文件: {self.config_path}")
try:
with open(self.config_path, 'r', encoding='utf-8') as f:
self.config = yaml.safe_load(f)
self.global_branch = self.config.get('global_branch', 'main')
# 初始化日志配置
log_config = self.config.get('logging', {})
log_file = log_config.get('file', '.devops/logs/devops.log')
max_size = log_config.get('max_size', 10485760)
Logger.init(log_file=log_file, max_size=max_size)
Logger.info(f"✓ 配置加载成功 - 全局分支: {self.global_branch}")
Logger.info(f"✓ 日志配置 - 文件: {log_file}, 最大大小: {max_size} 字节")
except Exception as e:
Logger.error(f"配置加载失败: {e}")
sys.exit(1)
def _init_paths(self):
"""初始化路径"""
Logger.info("[步骤 2/3] 初始化路径")
try:
self.project_root = Path(__file__).parent.parent.resolve()
runtime_path = self.config['main_repository']['runtime_path']
if not Path(runtime_path).is_absolute():
self.runtime_path = self.project_root / runtime_path
else:
self.runtime_path = Path(runtime_path)
Logger.info(f"✓ 路径初始化成功")
Logger.info(f" 项目根目录: {self.project_root}")
Logger.info(f" Runtime 目录: {self.runtime_path}")
except Exception as e:
Logger.error(f"路径初始化失败: {e}")
sys.exit(1)
def _init_dingtalk(self):
"""初始化钉钉通知器"""
try:
dingtalk_config = self.config.get('dingtalk', {})
if dingtalk_config.get('enabled', False):
access_token = dingtalk_config.get('access_token')
secret = dingtalk_config.get('secret')
if access_token:
self.dingtalk_notifier = DingTalkNotifier(access_token, secret)
Logger.info("✓ 钉钉通知已启用")
else:
Logger.warning("钉钉通知已启用但未配置 access_token")
else:
Logger.info("钉钉通知未启用")
except Exception as e:
Logger.warning(f"钉钉通知初始化失败: {e}")
def get_remote_commit(self, repo_url, branch):
"""获取远程仓库的最新提交 hash"""
try:
cmd = f"git ls-remote {repo_url} refs/heads/{branch}"
result = subprocess.run(
cmd, shell=True, capture_output=True, text=True, timeout=30
)
if result.returncode == 0 and result.stdout:
return result.stdout.split()[0]
return None
except Exception as e:
Logger.error(f"获取远程提交失败 {repo_url}: {e}")
return None
def check_repository(self, repo_config):
"""检查单个仓库是否有新提交"""
repo_name = repo_config['name']
repo_url = repo_config['url']
current_commit = self.get_remote_commit(repo_url, self.global_branch)
if not current_commit:
return False
last_commit = self.last_commits.get(repo_name)
if last_commit is None:
self.last_commits[repo_name] = current_commit
Logger.info(f"初始化 {repo_name} 提交记录: {current_commit[:8]}")
return False
if current_commit != last_commit:
Logger.info(f"检测到 {repo_name} 新提交: {last_commit[:8]} -> {current_commit[:8]}")
self.last_commits[repo_name] = current_commit
return True
return False
def update_main_repo(self):
"""更新主仓库和所有子模块"""
repo_path = self.runtime_path / 'a-cloud-all'
main_repo_url = self.config['main_repository']['url']
Logger.separator()
Logger.info("更新主仓库和子模块")
Logger.separator()
# 检查主仓库是否存在
if not (repo_path / '.git').exists():
Logger.info("主仓库不存在,开始克隆...")
self.runtime_path.mkdir(parents=True, exist_ok=True)
cmd = f"git clone --recurse-submodules {main_repo_url} a-cloud-all"
result = subprocess.run(cmd, shell=True, cwd=self.runtime_path, capture_output=True, text=True)
if result.returncode != 0:
Logger.error("克隆主仓库失败")
return False
Logger.info("主仓库克隆成功")
else:
Logger.info("主仓库已存在,更新代码...")
# 切换到主分支
cmd = f"git checkout {self.global_branch}"
subprocess.run(cmd, shell=True, cwd=repo_path, capture_output=True)
# 拉取最新代码
cmd = "git pull"
result = subprocess.run(cmd, shell=True, cwd=repo_path, capture_output=True, text=True)
if result.returncode != 0:
Logger.error("拉取主仓库失败")
return False
# 初始化和更新所有子模块(包括新增的子模块)
cmd = "git submodule update --init --recursive"
result = subprocess.run(cmd, shell=True, cwd=repo_path, capture_output=True, text=True)
if result.returncode != 0:
Logger.error("初始化子模块失败")
return False
# 更新所有子模块到最新代码
cmd = f"git submodule foreach 'git checkout {self.global_branch} && git pull'"
result = subprocess.run(cmd, shell=True, cwd=repo_path, capture_output=True, text=True)
if result.returncode != 0:
Logger.error("更新子模块失败")
return False
Logger.info("主仓库和子模块更新成功")
return True
def init_infrastructure(self):
"""初始化基础设施服务(动态读取配置)"""
repo_path = self.runtime_path / 'a-cloud-all'
# 从配置文件读取基础设施列表
infra_config = self.config.get('infrastructure', [])
for infra in infra_config:
service_name = infra['name']
docker_service = infra['docker_service']
wait_time = infra.get('wait_time', 10)
# 检查是否已初始化
flag_file = repo_path / '.devops' / f'.deployed_{service_name}'
if not flag_file.exists():
Logger.info(f"初始化 {service_name}...")
# 执行预部署命令(如果有)
pre_deploy_commands = infra.get('pre_deploy_commands', [])
if pre_deploy_commands:
Logger.info(f"执行 {service_name} 预部署命令...")
for cmd in pre_deploy_commands:
Logger.info(f"执行命令: {cmd}")
result = subprocess.run(
cmd,
shell=True,
cwd=repo_path,
capture_output=True,
text=True
)
if result.returncode != 0:
Logger.error(f"预部署命令执行失败: {result.stderr}")
return False
# 构建并启动服务
docker_dir = repo_path / 'docker'
# 构建镜像
Logger.info(f"构建 {service_name} 镜像...")
build_cmd = f"docker-compose build --no-cache {docker_service}"
result = subprocess.run(
build_cmd,
shell=True,
cwd=docker_dir,
capture_output=True,
text=True
)
if result.returncode != 0:
Logger.error(f"{service_name} 镜像构建失败: {result.stderr}")
return False
Logger.info(f"{service_name} 镜像构建成功")
# 启动容器
Logger.info(f"启动 {service_name} 容器...")
up_cmd = f"docker-compose up -d {docker_service}"
result = subprocess.run(
up_cmd,
shell=True,
cwd=docker_dir,
capture_output=True,
text=True
)
if result.returncode != 0:
Logger.error(f"{service_name} 容器启动失败: {result.stderr}")
return False
Logger.info(f"{service_name} 容器启动成功")
# 创建标记文件
flag_file.parent.mkdir(parents=True, exist_ok=True)
flag_file.touch()
# 等待服务启动
Logger.info(f"等待 {service_name} 启动({wait_time}秒)...")
time.sleep(wait_time)
return True
def deploy(self, repo_config):
"""执行部署流程"""
repo_path = self.runtime_path / 'a-cloud-all'
repo_name = repo_config['name']
commit_hash = self.last_commits.get(repo_name, 'unknown')
start_time = time.time()
Logger.separator()
Logger.info(f"开始部署: {repo_name}")
Logger.separator()
# 发送构建开始通知
if self.dingtalk_notifier:
self.dingtalk_notifier.send_build_start(
repo_name=repo_name,
branch=self.global_branch,
commit_hash=commit_hash
)
try:
# 1. 更新主仓库和子模块
if not self.update_main_repo():
return False
# 2. 初始化基础设施
if not self.init_infrastructure():
return False
# 3. 根据项目类型执行打包
if repo_config['type'] == 'java':
# Maven 打包
work_dir = repo_path
commands = ' && '.join(repo_config['build_commands'])
source_path = repo_config['path'] + '/' + repo_config['artifact_path']
target_dir = repo_path / repo_config['docker_path']
if not maven.run_maven(work_dir, commands, source_path, target_dir):
return False
elif repo_config['type'] == 'nodejs':
# NPM 打包
work_dir = repo_path / repo_config['path']
commands = ' && '.join(repo_config['build_commands'])
source_dir = repo_config['artifact_path']
target_dir = repo_path / repo_config['docker_path']
if not npm.run_npm(work_dir, commands, source_dir, target_dir):
return False
# 4. Docker 部署
compose_dir = repo_path / 'docker'
service_name = repo_config['docker_service']
if not docker.run_docker_compose(compose_dir, service_name):
# 发送构建失败通知
if self.dingtalk_notifier:
duration = time.time() - start_time
self.dingtalk_notifier.send_build_failure(
repo_name=repo_name,
branch=self.global_branch,
commit_hash=commit_hash,
error_msg="Docker 部署失败"
)
return False
# 计算构建耗时
duration = time.time() - start_time
# 发送构建成功通知
if self.dingtalk_notifier:
self.dingtalk_notifier.send_build_success(
repo_name=repo_name,
branch=self.global_branch,
commit_hash=commit_hash,
duration=duration
)
Logger.info(f"部署完成: {repo_config['name']}")
return True
except Exception as e:
# 计算构建耗时
duration = time.time() - start_time
# 发送构建失败通知
if self.dingtalk_notifier:
self.dingtalk_notifier.send_build_failure(
repo_name=repo_name,
branch=self.global_branch,
commit_hash=commit_hash,
error_msg=str(e),
at_all=True
)
Logger.error(f"部署异常: {e}")
return False
def run_once(self):
"""执行一次检查"""
Logger.info("[步骤 3/3] 开始监听分支变化")
repos = self.config.get('repositories', [])
for repo_config in repos:
try:
if self.check_repository(repo_config):
Logger.info(f"触发部署: {repo_config['name']}")
if self.deploy(repo_config):
Logger.info(f"✓ 部署成功: {repo_config['name']}")
else:
Logger.error(f"✗ 部署失败: {repo_config['name']}")
except Exception as e:
Logger.error(f"处理仓库异常 {repo_config['name']}: {e}")
def run(self):
"""持续监听运行"""
poll_interval = self.config['monitor']['poll_interval']
Logger.info(f"开始持续监听,轮询间隔: {poll_interval}")
Logger.info("按 Ctrl+C 停止监听\n")
try:
while True:
self.run_once()
time.sleep(poll_interval)
except KeyboardInterrupt:
Logger.info("\n收到停止信号,退出监听")
except Exception as e:
Logger.error(f"监听异常: {e}")
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='Git 仓库监听器')
parser.add_argument('--config', default='.devops/config.yaml', help='配置文件路径')
parser.add_argument('--once', action='store_true', help='只执行一次检查')
args = parser.parse_args()
monitor = GitMonitor(args.config)
if args.once:
monitor.run_once()
else:
monitor.run()
if __name__ == '__main__':
main()