a-cloud-all/.devops/monitor.py

503 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Git 仓库监听器
监听多个 Git 仓库的指定分支,检测到新提交时触发部署
"""
import os
import shlex
import socket
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path

import yaml

# Make the directory containing this file importable before loading the
# project-local ``scripts`` package.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Project-local modules
from scripts.log import Logger
from scripts import docker, maven, npm
from scripts.init import mysql, redis, nacos
from scripts.dingtalk import DingTalkNotifier
class GitMonitor:
    """Poll Git repositories and deploy when a new commit shows up.

    The monitor reads a YAML configuration file, remembers the last commit
    hash seen on ``global_branch`` for every configured repository and, when
    that hash changes, updates the main repository with its submodules,
    builds the changed project and redeploys it through docker-compose.
    Optional DingTalk notifications report build start, success and failure.
    """

    def __init__(self, config_path='.devops/config.yaml'):
        """Create the monitor and run the start-up steps.

        Args:
            config_path: Path of the YAML configuration file.

        Note:
            The process exits with status 1 when the configuration cannot be
            loaded or the paths cannot be resolved.
        """
        self.config_path = config_path
        self.config = None
        # Last commit hash seen so far, keyed by repository name.
        self.last_commits = {}
        self.global_branch = 'main'
        self.project_root = None
        self.runtime_path = None
        self.dingtalk_notifier = None

        # Start-up sequence
        self._print_startup_banner()
        self._load_config()
        self._init_paths()
        self._init_dingtalk()

    def _print_startup_banner(self):
        """Print the start-up banner together with the current local time."""
        print("\n")
        Logger.separator()
        print(" RuoYi Cloud DevOps 自动化部署系统")
        Logger.separator()
        print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        Logger.separator()
        print("\n")

    def _load_config(self):
        """Load the YAML configuration and initialise logging from it.

        Sets ``self.config`` and ``self.global_branch``.  Any failure is
        logged and terminates the process with exit status 1.
        """
        Logger.info(f"[步骤 1/3] 读取配置文件: {self.config_path}")
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                self.config = yaml.safe_load(f)
            # An empty or non-mapping file would otherwise only surface as an
            # obscure AttributeError on the first ``.get`` call.
            if not isinstance(self.config, dict):
                raise ValueError("配置文件为空或格式不正确")
            self.global_branch = self.config.get('global_branch', 'main')

            # Logging settings
            log_config = self.config.get('logging', {})
            log_file = log_config.get('file', '.devops/logs/devops.log')
            max_size = log_config.get('max_size', 10485760)
            Logger.init(log_file=log_file, max_size=max_size)

            Logger.info(f"✓ 配置加载成功 - 全局分支: {self.global_branch}")
            Logger.info(f"✓ 日志配置 - 文件: {log_file}, 最大大小: {max_size} 字节")
        except Exception as e:
            Logger.error(f"配置加载失败: {e}")
            sys.exit(1)

    def _init_paths(self):
        """Resolve the project root and the runtime directory.

        The project root is the parent of the directory holding this file.
        A relative ``main_repository.runtime_path`` is resolved against it.
        Any failure is logged and terminates the process with exit status 1.
        """
        Logger.info("[步骤 2/3] 初始化路径")
        try:
            self.project_root = Path(__file__).parent.parent.resolve()
            runtime_path = Path(self.config['main_repository']['runtime_path'])
            if not runtime_path.is_absolute():
                runtime_path = self.project_root / runtime_path
            self.runtime_path = runtime_path

            Logger.info("✓ 路径初始化成功")
            Logger.info(f" 项目根目录: {self.project_root}")
            Logger.info(f" Runtime 目录: {self.runtime_path}")
        except Exception as e:
            Logger.error(f"路径初始化失败: {e}")
            sys.exit(1)

    def _init_dingtalk(self):
        """Create the DingTalk notifier when it is enabled in the config.

        Failures here are only logged as warnings; the monitor keeps running
        without notifications.
        """
        try:
            dingtalk_config = self.config.get('dingtalk', {})
            if not dingtalk_config.get('enabled', False):
                Logger.info("钉钉通知未启用")
                return

            access_token = dingtalk_config.get('access_token')
            secret = dingtalk_config.get('secret')
            if not access_token:
                Logger.warning("钉钉通知已启用但未配置 access_token")
                return

            self.dingtalk_notifier = DingTalkNotifier(access_token, secret)
            Logger.info("✓ 钉钉通知已启用")
        except Exception as e:
            Logger.warning(f"钉钉通知初始化失败: {e}")

    def get_server_ip(self):
        """Return this server's IP address as a string.

        Returns:
            The local address of a UDP socket connected to 8.8.8.8, else the
            address the host name resolves to, else ``"unknown"``.
        """
        try:
            # ``with`` closes the socket even when connect/getsockname raises.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                sock.connect(("8.8.8.8", 80))
                return sock.getsockname()[0]
        except Exception:
            try:
                # Fallback: the address the host name resolves to.
                return socket.gethostbyname(socket.gethostname())
            except Exception:
                return "unknown"

    def get_remote_commit(self, repo_url, branch):
        """Return the newest commit hash of ``branch`` on the remote.

        Args:
            repo_url: URL of the remote repository.
            branch: Branch name; queried as ``refs/heads/<branch>``.

        Returns:
            The commit hash, or ``None`` when git fails, prints nothing, or
            the call raises (the exception is logged).
        """
        try:
            # Argument list instead of a shell string: URL and branch are
            # passed to git verbatim and never interpreted by a shell.
            result = subprocess.run(
                ["git", "ls-remote", repo_url, f"refs/heads/{branch}"],
                capture_output=True, text=True, timeout=30,
            )
            fields = result.stdout.split() if result.stdout else []
            if result.returncode == 0 and fields:
                return fields[0]
            return None
        except Exception as e:
            Logger.error(f"获取远程提交失败 {repo_url}: {e}")
            return None

    def get_commit_message(self, repo_url, commit_hash):
        """Return a short label for ``commit_hash``.

        ``git ls-remote`` cannot provide commit messages, so the label is
        built from the first eight characters of the hash.

        Args:
            repo_url: Unused; kept so existing callers keep working.
            commit_hash: Full commit hash.

        Returns:
            A string of the form ``"提交 <hash[:8]>"``.
        """
        return f"提交 {commit_hash[:8]}"

    def check_repository(self, repo_config):
        """Report whether a repository has a commit not seen before.

        Args:
            repo_config: Mapping with at least ``name`` and ``url``.

        Returns:
            ``True`` when the remote hash differs from the recorded one.
            ``False`` when the remote cannot be read, on the first
            observation (which only records the hash), or when nothing
            changed.
        """
        repo_name = repo_config['name']
        repo_url = repo_config['url']

        current_commit = self.get_remote_commit(repo_url, self.global_branch)
        if not current_commit:
            return False

        last_commit = self.last_commits.get(repo_name)
        if last_commit is None:
            # First observation: remember the hash without deploying.
            self.last_commits[repo_name] = current_commit
            Logger.info(f"初始化 {repo_name} 提交记录: {current_commit[:8]}")
            return False

        if current_commit != last_commit:
            Logger.info(f"检测到 {repo_name} 新提交: {last_commit[:8]} -> {current_commit[:8]}")
            # The hash is recorded before the deployment runs, so a failed
            # deployment is not retried until another commit arrives.
            self.last_commits[repo_name] = current_commit
            return True
        return False

    def update_main_repo(self):
        """Clone or update the main repository and all of its submodules.

        Returns:
            ``True`` on success, ``False`` as soon as a required git command
            fails (its stderr is logged).
        """
        repo_path = self.runtime_path / 'a-cloud-all'
        main_repo_url = self.config['main_repository']['url']

        Logger.separator()
        Logger.info("更新主仓库和子模块")
        Logger.separator()

        if not (repo_path / '.git').exists():
            Logger.info("主仓库不存在,开始克隆...")
            self.runtime_path.mkdir(parents=True, exist_ok=True)
            result = subprocess.run(
                ["git", "clone", "--recurse-submodules", main_repo_url, "a-cloud-all"],
                cwd=self.runtime_path, capture_output=True, text=True,
            )
            if result.returncode != 0:
                Logger.error(f"克隆主仓库失败: {result.stderr}")
                return False
            Logger.info("主仓库克隆成功")
        else:
            Logger.info("主仓库已存在,更新代码...")
            # Switch to the monitored branch.  A failure is reported but not
            # fatal: the following pull decides whether the update succeeds.
            result = subprocess.run(
                ["git", "checkout", self.global_branch],
                cwd=repo_path, capture_output=True, text=True,
            )
            if result.returncode != 0:
                Logger.warning(f"切换到分支 {self.global_branch} 失败: {result.stderr}")

            # Fetch and merge the newest code.
            result = subprocess.run(
                ["git", "pull"], cwd=repo_path, capture_output=True, text=True,
            )
            if result.returncode != 0:
                Logger.error(f"拉取主仓库失败: {result.stderr}")
                return False

        # Runs for fresh clones and existing checkouts alike: initialise any
        # (new) submodule, then move every submodule to the branch head.
        # ``git submodule foreach`` hands its argument to a shell, hence the
        # quoting of the branch name.
        branch = shlex.quote(str(self.global_branch))
        submodule_steps = (
            (["git", "submodule", "update", "--init", "--recursive"],
             "初始化子模块失败"),
            (["git", "submodule", "foreach", f"git checkout {branch} && git pull"],
             "更新子模块失败"),
        )
        for argv, failure_message in submodule_steps:
            result = subprocess.run(argv, cwd=repo_path, capture_output=True, text=True)
            if result.returncode != 0:
                Logger.error(f"{failure_message}: {result.stderr}")
                return False

        Logger.info("主仓库和子模块更新成功")
        return True

    def init_infrastructure(self):
        """Build and start every configured infrastructure service once.

        Services come from the ``infrastructure`` list of the configuration.
        A service whose flag file ``.devops/.deployed_<name>`` already exists
        in the checkout is skipped.

        Returns:
            ``True`` when every service is initialised or was skipped,
            ``False`` on the first failing command.
        """
        repo_path = self.runtime_path / 'a-cloud-all'
        docker_dir = repo_path / 'docker'

        for infra in self.config.get('infrastructure', []) or []:
            service_name = infra['name']
            docker_service = infra['docker_service']
            wait_time = infra.get('wait_time', 10)

            # Already initialised on an earlier run?
            flag_file = repo_path / '.devops' / f'.deployed_{service_name}'
            if flag_file.exists():
                continue

            Logger.info(f"初始化 {service_name}...")

            # Optional pre-deploy commands.  They are shell command lines
            # taken from the configuration, so they run through the shell.
            pre_deploy_commands = infra.get('pre_deploy_commands', []) or []
            if pre_deploy_commands:
                Logger.info(f"执行 {service_name} 预部署命令...")
                for cmd in pre_deploy_commands:
                    Logger.info(f"执行命令: {cmd}")
                    result = subprocess.run(
                        cmd, shell=True, cwd=repo_path,
                        capture_output=True, text=True,
                    )
                    if result.returncode != 0:
                        Logger.error(f"预部署命令执行失败: {result.stderr}")
                        return False

            # Split like a shell would so a value listing several services
            # still yields one argument per service.
            service_args = shlex.split(str(docker_service))

            # Build the image.
            Logger.info(f"构建 {service_name} 镜像...")
            result = subprocess.run(
                ["docker-compose", "build", "--no-cache", *service_args],
                cwd=docker_dir, capture_output=True, text=True,
            )
            if result.returncode != 0:
                Logger.error(f"{service_name} 镜像构建失败: {result.stderr}")
                return False
            Logger.info(f"{service_name} 镜像构建成功")

            # Start the container.
            Logger.info(f"启动 {service_name} 容器...")
            result = subprocess.run(
                ["docker-compose", "up", "-d", *service_args],
                cwd=docker_dir, capture_output=True, text=True,
            )
            if result.returncode != 0:
                Logger.error(f"{service_name} 容器启动失败: {result.stderr}")
                return False
            Logger.info(f"{service_name} 容器启动成功")

            # Mark the service as initialised.
            flag_file.parent.mkdir(parents=True, exist_ok=True)
            flag_file.touch()

            # Give the service time to come up.
            Logger.info(f"等待 {service_name} 启动({wait_time}秒)...")
            time.sleep(wait_time)

        return True

    def _read_commit_message(self, submodule_path, commit_hash):
        """Return the subject line of ``commit_hash``, or ``None`` if unavailable."""
        if not submodule_path.exists():
            return None
        try:
            result = subprocess.run(
                ["git", "log", "-1", "--pretty=format:%s", commit_hash],
                cwd=submodule_path, capture_output=True, text=True, timeout=10,
            )
            if result.returncode == 0 and result.stdout:
                commit_message = result.stdout.strip()
                Logger.info(f"提交消息: {commit_message}")
                return commit_message
        except Exception as e:
            Logger.warning(f"获取提交消息失败: {e}")
        return None

    def _notify_failure(self, repo_name, commit_hash, error_msg, **extra):
        """Send a build-failure notification when a notifier is configured."""
        if self.dingtalk_notifier:
            self.dingtalk_notifier.send_build_failure(
                repo_name=repo_name,
                branch=self.global_branch,
                commit_hash=commit_hash,
                error_msg=error_msg,
                **extra
            )

    def deploy(self, repo_config):
        """Run the whole deployment pipeline for one repository.

        Steps: update the checkout, announce the build, make sure the
        infrastructure services run, build the project according to its
        ``type`` (``java`` or ``nodejs``; other types are not built) and
        redeploy its docker-compose service.

        Args:
            repo_config: Repository entry from the configuration.

        Returns:
            ``True`` when the deployment succeeded, ``False`` otherwise.
            Exceptions raised by the steps are logged, reported as a
            failure notification and turned into ``False``.
        """
        repo_path = self.runtime_path / 'a-cloud-all'
        repo_name = repo_config['name']
        commit_hash = self.last_commits.get(repo_name, 'unknown')
        start_time = time.time()

        Logger.separator()
        Logger.info(f"开始部署: {repo_name}")
        Logger.separator()

        try:
            # 1. Update the main repository and its submodules.
            if not self.update_main_repo():
                return False

            # Commit subject of the submodule and server IP for the message.
            commit_message = self._read_commit_message(
                repo_path / repo_config['path'], commit_hash
            )
            server_ip = self.get_server_ip()

            # Announce the build start.
            if self.dingtalk_notifier:
                self.dingtalk_notifier.send_build_start(
                    repo_name=repo_name,
                    branch=self.global_branch,
                    commit_hash=commit_hash,
                    commit_message=commit_message,
                    server_ip=server_ip
                )

            # 2. Infrastructure services.  The build start was announced
            # above, so a failure here is announced as well.
            if not self.init_infrastructure():
                self._notify_failure(repo_name, commit_hash, "基础设施初始化失败")
                return False

            # 3. Build according to the project type.
            project_type = repo_config['type']
            if project_type == 'java':
                # Maven build, run from the checkout root.
                commands = ' && '.join(repo_config['build_commands'])
                source_path = repo_config['path'] + '/' + repo_config['artifact_path']
                target_dir = repo_path / repo_config['docker_path']
                if not maven.run_maven(repo_path, commands, source_path, target_dir):
                    self._notify_failure(repo_name, commit_hash, "Maven 打包失败")
                    return False
            elif project_type == 'nodejs':
                # NPM build, run from the project directory.
                work_dir = repo_path / repo_config['path']
                commands = ' && '.join(repo_config['build_commands'])
                source_dir = repo_config['artifact_path']
                target_dir = repo_path / repo_config['docker_path']
                if not npm.run_npm(work_dir, commands, source_dir, target_dir):
                    self._notify_failure(repo_name, commit_hash, "NPM/PNPM 打包失败")
                    return False

            # 4. Docker deployment.
            compose_dir = repo_path / 'docker'
            service_name = repo_config['docker_service']
            if not docker.run_docker_compose(compose_dir, service_name):
                self._notify_failure(repo_name, commit_hash, "Docker 部署失败")
                return False

            # Announce success together with the elapsed time.
            duration = time.time() - start_time
            if self.dingtalk_notifier:
                self.dingtalk_notifier.send_build_success(
                    repo_name=repo_name,
                    branch=self.global_branch,
                    commit_hash=commit_hash,
                    duration=duration
                )
            Logger.info(f"部署完成: {repo_config['name']}")
            return True
        except Exception as e:
            # Log first so the cause is recorded even when the notification
            # itself fails.
            Logger.error(f"部署异常: {e}")
            self._notify_failure(repo_name, commit_hash, str(e), at_all=True)
            return False

    def run_once(self):
        """Check every configured repository once and deploy those that changed.

        An exception raised while handling one repository is logged and does
        not stop the remaining repositories from being checked.
        """
        Logger.info("[步骤 3/3] 开始监听分支变化")
        for repo_config in self.config.get('repositories', []) or []:
            try:
                if not self.check_repository(repo_config):
                    continue
                Logger.info(f"触发部署: {repo_config['name']}")
                if self.deploy(repo_config):
                    Logger.info(f"✓ 部署成功: {repo_config['name']}")
                else:
                    Logger.error(f"✗ 部署失败: {repo_config['name']}")
            except Exception as e:
                Logger.error(f"处理仓库异常 {repo_config['name']}: {e}")

    def run(self):
        """Poll forever, sleeping ``monitor.poll_interval`` between rounds.

        Returns when interrupted with Ctrl+C or when an unexpected exception
        escapes a polling round (the exception is logged).
        """
        poll_interval = self.config['monitor']['poll_interval']
        Logger.info(f"开始持续监听,轮询间隔: {poll_interval}")
        Logger.info("按 Ctrl+C 停止监听\n")
        try:
            while True:
                self.run_once()
                time.sleep(poll_interval)
        except KeyboardInterrupt:
            Logger.info("\n收到停止信号,退出监听")
        except Exception as e:
            Logger.error(f"监听异常: {e}")
def main():
    """Command-line entry point.

    Parses ``--config`` (path of the configuration file) and ``--once``
    (perform a single check instead of polling forever), builds the monitor
    and starts it in the requested mode.
    """
    import argparse

    arg_parser = argparse.ArgumentParser(description='Git 仓库监听器')
    arg_parser.add_argument('--config', default='.devops/config.yaml', help='配置文件路径')
    arg_parser.add_argument('--once', action='store_true', help='只执行一次检查')
    options = arg_parser.parse_args()

    watcher = GitMonitor(options.config)
    # Pick the mode first, then start it.
    start = watcher.run_once if options.once else watcher.run
    start()


if __name__ == '__main__':
    main()