#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Git repository monitor.

Polls the configured branch (and, optionally, the tags) of several Git
repositories and triggers a build + deployment when a new commit or a new
tag shows up.
"""

import argparse
import fnmatch
import os
import re
import shutil
import socket
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path

import yaml

# Make the sibling ``scripts`` package importable regardless of the CWD.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# Project-local modules
from scripts.log import Logger
from scripts import docker, maven, npm
from scripts.init import mysql, redis, nacos
from scripts.dingtalk import DingTalkNotifier


# Directory (inside the runtime path) that holds the main repository clone.
MAIN_REPO_DIRNAME = 'a-cloud-all'

# Polling interval (seconds) used when ``monitor.poll_interval`` is absent.
DEFAULT_POLL_INTERVAL = 60

# Entries never copied when deploying a Python project's sources.
_PYTHON_COPY_EXCLUDES = frozenset(
    {'.git', '__pycache__', '.pytest_cache', '.venv', 'venv'}
)

_TAG_NUMBER = re.compile(r'([0-9]+)')


def _tag_sort_key(tag_name):
    """Return a sort key that orders tag names numerically where possible.

    Digit runs are compared as integers so that ``v1.10.0`` sorts after
    ``v1.9.0``; a plain string comparison would order them the other way.
    Text chunks always sort before numeric chunks, which keeps the key
    comparable for arbitrary tag names.
    """
    key = []
    # With a capturing group, re.split() puts the digit runs at odd indices.
    for index, chunk in enumerate(_TAG_NUMBER.split(tag_name)):
        if index % 2:
            key.append((1, int(chunk), ''))
        elif chunk:
            key.append((0, 0, chunk))
    return key


class GitMonitor:
    """Watches Git repositories and deploys them when they change."""

    def __init__(self, config_path='.devops/config.yaml'):
        """Load the configuration and prepare paths and the notifier.

        Args:
            config_path: Path of the YAML configuration file.

        The process exits with status 1 when the configuration cannot be
        loaded or the paths cannot be initialised.
        """
        self.config_path = config_path
        self.config = None
        self.last_commits = {}  # repo name -> last seen branch head
        self.last_tags = {}  # repo name -> {tag name: hash} from last poll
        self.global_branch = 'main'
        self.project_root = None
        self.runtime_path = None
        self.dingtalk_notifier = None
        self.watch_tags = False
        self.tag_pattern = "v*"

        self._print_startup_banner()
        self._load_config()
        self._init_paths()
        self._init_dingtalk()

    # ------------------------------------------------------------------
    # Initialisation
    # ------------------------------------------------------------------
    def _print_startup_banner(self):
        """Print the start-up banner with the current local time."""
        print("\n")
        Logger.separator()
        print(" RuoYi Cloud DevOps 自动化部署系统")
        Logger.separator()
        print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        Logger.separator()
        print("\n")

    def _load_config(self):
        """Read the YAML configuration and initialise logging from it.

        Exits the process with status 1 on any failure.
        """
        Logger.info(f"[步骤 1/3] 读取配置文件: {self.config_path}")
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                self.config = yaml.safe_load(f)

            self.global_branch = self.config.get('global_branch', 'main')

            # ``or {}`` also covers a section that is present but null.
            monitor_config = self.config.get('monitor') or {}
            self.watch_tags = monitor_config.get('watch_tags', False)
            self.tag_pattern = monitor_config.get('tag_pattern', 'v*')

            log_config = self.config.get('logging') or {}
            log_file = log_config.get('file', '.devops/logs/devops.log')
            max_size = log_config.get('max_size', 10485760)
            Logger.init(log_file=log_file, max_size=max_size)

            Logger.info(f"✓ 配置加载成功 - 全局分支: {self.global_branch}")
            Logger.info(f"✓ Tag 监听: {'已启用' if self.watch_tags else '未启用'} (模式: {self.tag_pattern})")
            Logger.info(f"✓ 日志配置 - 文件: {log_file}, 最大大小: {max_size} 字节")
        except Exception as e:
            Logger.error(f"配置加载失败: {e}")
            sys.exit(1)

    def _init_paths(self):
        """Resolve the project root and the runtime directory.

        A relative ``main_repository.runtime_path`` is resolved against the
        project root (the parent of this file's directory). Exits the
        process with status 1 on any failure.
        """
        Logger.info("[步骤 2/3] 初始化路径")
        try:
            self.project_root = Path(__file__).parent.parent.resolve()
            runtime_path = Path(self.config['main_repository']['runtime_path'])
            if runtime_path.is_absolute():
                self.runtime_path = runtime_path
            else:
                self.runtime_path = self.project_root / runtime_path

            Logger.info("✓ 路径初始化成功")
            Logger.info(f" 项目根目录: {self.project_root}")
            Logger.info(f" Runtime 目录: {self.runtime_path}")
        except Exception as e:
            Logger.error(f"路径初始化失败: {e}")
            sys.exit(1)

    def _init_dingtalk(self):
        """Create the DingTalk notifier when it is enabled and configured.

        Failures are logged as warnings; the monitor keeps running without
        notifications.
        """
        try:
            dingtalk_config = self.config.get('dingtalk') or {}
            if not dingtalk_config.get('enabled', False):
                Logger.info("钉钉通知未启用")
                return

            access_token = dingtalk_config.get('access_token')
            secret = dingtalk_config.get('secret')
            if access_token:
                self.dingtalk_notifier = DingTalkNotifier(access_token, secret)
                Logger.info("✓ 钉钉通知已启用")
            else:
                Logger.warning("钉钉通知已启用但未配置 access_token")
        except Exception as e:
            Logger.warning(f"钉钉通知初始化失败: {e}")

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    def _notify_failure(self, repo_name, branch, commit_hash, error_msg, **extra):
        """Send a build-failure notification if a notifier is configured.

        ``extra`` is forwarded verbatim (e.g. ``at_all=True``).
        """
        if self.dingtalk_notifier:
            self.dingtalk_notifier.send_build_failure(
                repo_name=repo_name,
                branch=branch,
                commit_hash=commit_hash,
                error_msg=error_msg,
                **extra
            )

    @staticmethod
    def _run_shell(command, cwd):
        """Run a shell command string in ``cwd`` and capture its output."""
        return subprocess.run(
            command, shell=True, cwd=cwd, capture_output=True, text=True
        )

    @staticmethod
    def _run_git(args, cwd):
        """Run ``git`` with an argument list (no shell) in ``cwd``."""
        return subprocess.run(
            ['git', *args], cwd=cwd, capture_output=True, text=True
        )

    def get_server_ip(self):
        """Return this host's outbound IPv4 address, or ``"unknown"``.

        Connecting a UDP socket sends no packets; it only makes the OS pick
        the local address it would use to reach the given peer.
        """
        try:
            # The context manager closes the socket even if connect() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(("8.8.8.8", 80))
                return s.getsockname()[0]
        except OSError:
            try:
                # Fallback: address the local host name resolves to.
                return socket.gethostbyname(socket.gethostname())
            except (OSError, UnicodeError):
                return "unknown"

    # ------------------------------------------------------------------
    # Remote inspection
    # ------------------------------------------------------------------
    def get_remote_commit(self, repo_url, branch):
        """Return the head commit hash of ``branch`` on the remote.

        Returns ``None`` when the branch is missing or the lookup fails.
        """
        try:
            # Argument list instead of a shell string: URLs containing
            # characters such as '&' or '?' are passed through untouched.
            result = subprocess.run(
                ['git', 'ls-remote', repo_url, f'refs/heads/{branch}'],
                capture_output=True,
                text=True,
                timeout=30
            )
            if result.returncode == 0 and result.stdout:
                return result.stdout.split()[0]
            return None
        except Exception as e:
            Logger.error(f"获取远程提交失败 {repo_url}: {e}")
            return None

    def get_commit_message(self, repo_url, commit_hash):
        """Return a short label for ``commit_hash``.

        ``git ls-remote`` cannot provide commit messages, so only the
        abbreviated hash is returned. ``repo_url`` is accepted for
        interface compatibility and is not used.
        """
        return f"提交 {commit_hash[:8]}"

    @staticmethod
    def _parse_ls_remote_tags(output):
        """Parse ``git ls-remote --tags`` output into ``{tag: commit}``.

        For annotated tags the remote lists both ``refs/tags/X`` (the tag
        object) and ``refs/tags/X^{}`` (the commit it points at); the
        peeled commit wins so every value is a real commit hash.
        """
        prefix = 'refs/tags/'
        peeled_suffix = '^{}'
        tags = {}
        for line in output.splitlines():
            parts = line.split()
            if len(parts) < 2:
                continue
            commit_hash, ref = parts[0], parts[1]
            if not ref.startswith(prefix):
                continue
            tag_name = ref[len(prefix):]
            if tag_name.endswith(peeled_suffix):
                tags[tag_name[:-len(peeled_suffix)]] = commit_hash
            else:
                # Do not overwrite a peeled entry that was seen earlier.
                tags.setdefault(tag_name, commit_hash)
        return tags

    def _fetch_remote_tags(self, repo_url):
        """Return ``{tag: commit}`` for the remote, or ``None`` on failure.

        Unlike :meth:`get_remote_tags` this distinguishes "the remote has
        no tags" (empty dict) from "the lookup failed" (``None``).
        """
        try:
            result = subprocess.run(
                ['git', 'ls-remote', '--tags', repo_url],
                capture_output=True,
                text=True,
                timeout=30
            )
        except Exception as e:
            Logger.error(f"获取远程 tags 失败 {repo_url}: {e}")
            return None
        if result.returncode != 0:
            return None
        return self._parse_ls_remote_tags(result.stdout)

    def get_remote_tags(self, repo_url):
        """Return ``{tag name: commit hash}`` for all tags on the remote.

        Returns an empty dict when the remote has no tags or the lookup
        fails.
        """
        tags = self._fetch_remote_tags(repo_url)
        return {} if tags is None else tags

    def check_repository(self, repo_config):
        """Return True when the watched branch has a new head commit.

        The first successful poll only records the current head and
        returns False, so starting the monitor does not trigger a deploy.
        """
        repo_name = repo_config['name']
        repo_url = repo_config['url']

        current_commit = self.get_remote_commit(repo_url, self.global_branch)
        if not current_commit:
            return False

        last_commit = self.last_commits.get(repo_name)
        if last_commit is None:
            self.last_commits[repo_name] = current_commit
            Logger.info(f"初始化 {repo_name} 提交记录: {current_commit[:8]}")
            return False

        if current_commit != last_commit:
            Logger.info(f"检测到 {repo_name} 新提交: {last_commit[:8]} -> {current_commit[:8]}")
            self.last_commits[repo_name] = current_commit
            return True

        return False

    def check_repository_tags(self, repo_config):
        """Detect tags matching ``tag_pattern`` that appeared since last poll.

        Returns:
            ``(True, (tag_name, commit_hash))`` for the highest new tag
            (numeric-aware ordering), otherwise ``(False, None)``.

        A failed lookup leaves the recorded tags untouched, so a transient
        network error cannot make every existing tag look new on the next
        poll. A failure notification is sent only when an earlier poll had
        succeeded for this repository.
        """
        repo_name = repo_config['name']
        repo_url = repo_config['url']

        current_tags = self._fetch_remote_tags(repo_url)
        if current_tags is None:
            error_msg = f"获取 {repo_name} 远程 tags 失败"
            Logger.error(error_msg)
            if repo_name in self.last_tags:
                self._notify_failure(
                    repo_name, self.global_branch, 'unknown', error_msg
                )
            return False, None

        # NOTE(review): on the very first successful poll nothing is known
        # yet, so all matching tags count as new and the highest one is
        # deployed (unlike branch commits, which only record a baseline).
        # Kept as in the original - confirm this is intended.
        known_tags = self.last_tags.get(repo_name, {})
        new_tags = [
            (tag_name, commit_hash)
            for tag_name, commit_hash in current_tags.items()
            if tag_name not in known_tags
            and fnmatch.fnmatch(tag_name, self.tag_pattern)
        ]

        self.last_tags[repo_name] = current_tags

        if not new_tags:
            return False, None

        latest_tag = max(new_tags, key=lambda item: _tag_sort_key(item[0]))
        Logger.info(f"检测到 {repo_name} 新 tag: {latest_tag[0]} ({latest_tag[1][:8]})")
        return True, latest_tag

    # ------------------------------------------------------------------
    # Working copy and infrastructure
    # ------------------------------------------------------------------
    def update_main_repo(self):
        """Clone or update the main repository and all of its submodules.

        Returns:
            True on success, False when any required git step fails.
        """
        repo_path = self.runtime_path / MAIN_REPO_DIRNAME
        main_repo_url = self.config['main_repository']['url']

        Logger.separator()
        Logger.info("更新主仓库和子模块")
        Logger.separator()

        if not (repo_path / '.git').exists():
            Logger.info("主仓库不存在,开始克隆...")
            self.runtime_path.mkdir(parents=True, exist_ok=True)
            result = self._run_git(
                ['clone', '--recurse-submodules', main_repo_url, MAIN_REPO_DIRNAME],
                cwd=self.runtime_path,
            )
            if result.returncode != 0:
                Logger.error("克隆主仓库失败")
                return False
            Logger.info("主仓库克隆成功")
        else:
            Logger.info("主仓库已存在,更新代码...")

            # Best effort, as before: a failed checkout does not abort the
            # update, but it is no longer silent.
            result = self._run_git(['checkout', self.global_branch], cwd=repo_path)
            if result.returncode != 0:
                Logger.warning(f"切换分支 {self.global_branch} 失败: {result.stderr.strip()}")

            result = self._run_git(['pull'], cwd=repo_path)
            if result.returncode != 0:
                Logger.error("拉取主仓库失败")
                return False

        # Also initialises submodules that were added upstream.
        result = self._run_git(
            ['submodule', 'update', '--init', '--recursive'], cwd=repo_path
        )
        if result.returncode != 0:
            Logger.error("初始化子模块失败")
            return False

        # Move every submodule to the head of the watched branch.
        result = self._run_git(
            ['submodule', 'foreach', f'git checkout {self.global_branch} && git pull'],
            cwd=repo_path,
        )
        if result.returncode != 0:
            Logger.error("更新子模块失败")
            return False

        Logger.info("主仓库和子模块更新成功")
        return True

    def init_infrastructure(self):
        """Build and start each configured infrastructure service once.

        A marker file ``.devops/.deployed_<name>`` inside the main
        repository records that a service has been set up; services whose
        marker exists are skipped.

        Returns:
            True when every service is ready, False on the first failure.
        """
        repo_path = self.runtime_path / MAIN_REPO_DIRNAME
        docker_dir = repo_path / 'docker'

        for infra in self.config.get('infrastructure') or []:
            service_name = infra['name']
            docker_service = infra['docker_service']
            wait_time = infra.get('wait_time', 10)

            flag_file = repo_path / '.devops' / f'.deployed_{service_name}'
            if flag_file.exists():
                continue

            Logger.info(f"初始化 {service_name}...")

            # Pre-deploy commands are arbitrary shell snippets from the
            # configuration, hence executed through the shell.
            pre_deploy_commands = infra.get('pre_deploy_commands', [])
            if pre_deploy_commands:
                Logger.info(f"执行 {service_name} 预部署命令...")
                for cmd in pre_deploy_commands:
                    Logger.info(f"执行命令: {cmd}")
                    result = self._run_shell(cmd, cwd=repo_path)
                    if result.returncode != 0:
                        Logger.error(f"预部署命令执行失败: {result.stderr}")
                        return False

            # The docker-compose invocations stay shell strings so that
            # the configured value is word-split exactly as before.
            Logger.info(f"构建 {service_name} 镜像...")
            result = self._run_shell(
                f"docker-compose build --no-cache {docker_service}", cwd=docker_dir
            )
            if result.returncode != 0:
                Logger.error(f"{service_name} 镜像构建失败: {result.stderr}")
                return False
            Logger.info(f"{service_name} 镜像构建成功")

            Logger.info(f"启动 {service_name} 容器...")
            result = self._run_shell(
                f"docker-compose up -d {docker_service}", cwd=docker_dir
            )
            if result.returncode != 0:
                Logger.error(f"{service_name} 容器启动失败: {result.stderr}")
                return False
            Logger.info(f"{service_name} 容器启动成功")

            flag_file.parent.mkdir(parents=True, exist_ok=True)
            flag_file.touch()

            # Give the service time to come up before the next step.
            Logger.info(f"等待 {service_name} 启动({wait_time}秒)...")
            time.sleep(wait_time)

        return True

    # ------------------------------------------------------------------
    # Deployment
    # ------------------------------------------------------------------
    def _read_commit_message(self, submodule_path, commit_hash):
        """Return the subject line of ``commit_hash``, or ``None``.

        Returns ``None`` when the submodule directory is missing or git
        cannot resolve the commit; failures to run git are logged as
        warnings.
        """
        if not submodule_path.exists():
            return None
        try:
            result = subprocess.run(
                ['git', 'log', '-1', '--pretty=format:%s', commit_hash],
                cwd=submodule_path,
                capture_output=True,
                text=True,
                timeout=10
            )
        except Exception as e:
            Logger.warning(f"获取提交消息失败: {e}")
            return None

        if result.returncode == 0 and result.stdout:
            commit_message = result.stdout.strip()
            Logger.info(f"提交消息: {commit_message}")
            return commit_message
        return None

    def _copy_python_sources(self, source_path, target_dir):
        """Replace the contents of ``target_dir`` with the Python sources.

        Hidden entries already in ``target_dir`` (e.g. ``.gitkeep``) are
        preserved; VCS, cache and virtualenv directories in the source are
        not copied.
        """
        if target_dir.exists():
            for item in target_dir.iterdir():
                if item.name.startswith('.'):
                    continue
                if item.is_dir():
                    shutil.rmtree(item)
                else:
                    item.unlink()
            Logger.info("清空目标目录完成")
        else:
            target_dir.mkdir(parents=True, exist_ok=True)
            Logger.info("创建目标目录完成")

        for item in source_path.iterdir():
            if item.name in _PYTHON_COPY_EXCLUDES:
                continue
            target_item = target_dir / item.name
            if item.is_dir():
                if target_item.exists():
                    shutil.rmtree(target_item)
                shutil.copytree(item, target_item)
            else:
                shutil.copy2(item, target_item)
        Logger.info("源码复制完成")

    def _build_artifacts(self, repo_config, repo_path):
        """Run the build step that matches ``repo_config['type']``.

        Returns:
            ``None`` on success (or for an unrecognised type, which has no
            build step), otherwise the error message to report.
        """
        project_type = repo_config['type']

        if project_type == 'java':
            commands = ' && '.join(repo_config['build_commands'])
            source_path = repo_config['path'] + '/' + repo_config['artifact_path']
            target_dir = repo_path / repo_config['docker_path']
            success, error_msg = maven.run_maven(
                repo_path, commands, source_path, target_dir
            )
            return None if success else f"Maven 打包失败: {error_msg}"

        if project_type == 'nodejs':
            work_dir = repo_path / repo_config['path']
            commands = ' && '.join(repo_config['build_commands'])
            target_dir = repo_path / repo_config['docker_path']
            success, error_msg = npm.run_npm(
                work_dir, commands, repo_config['artifact_path'], target_dir
            )
            return None if success else f"NPM/PNPM 打包失败: {error_msg}"

        if project_type == 'python':
            # Python projects are not built; sources are copied verbatim
            # into the docker build context.
            Logger.separator()
            Logger.info("开始 Python 项目部署")
            Logger.separator()

            source_path = repo_path / repo_config['path']
            target_dir = repo_path / repo_config['docker_path']
            Logger.info(f"源码目录: {source_path}")
            Logger.info(f"目标目录: {target_dir}")
            try:
                self._copy_python_sources(source_path, target_dir)
            except Exception as e:
                error_msg = f"Python 项目源码复制失败: {str(e)}"
                Logger.error(error_msg)
                return error_msg

        return None

    def deploy(self, repo_config, tag_name=None, commit_hash=None):
        """Run the full deployment pipeline for one repository.

        Steps: update the working copy, ensure infrastructure is running,
        build the project according to its type, then deploy it with
        docker-compose. Notifications are sent at start, on success and on
        every failure when a notifier is configured.

        Args:
            repo_config: Repository entry from the configuration.
            tag_name: Name of the tag that triggered the deployment, if any.
            commit_hash: Hash to report in logs and notifications. Defaults
                to the last recorded branch head of the repository.

        Returns:
            True when the deployment succeeded, False otherwise.
        """
        repo_path = self.runtime_path / MAIN_REPO_DIRNAME
        repo_name = repo_config['name']
        if commit_hash is None:
            commit_hash = self.last_commits.get(repo_name, 'unknown')
        # One label for every notification of this run, so start, success
        # and failure messages agree on what was deployed.
        branch_label = f"tag/{tag_name}" if tag_name else self.global_branch
        start_time = time.time()

        Logger.separator()
        Logger.info(f"开始部署: {repo_name}")
        if tag_name:
            Logger.info(f"触发方式: Tag ({tag_name})")
        else:
            Logger.info("触发方式: 分支提交")
        Logger.separator()

        def fail(error_msg, **extra):
            self._notify_failure(
                repo_name, branch_label, commit_hash, error_msg, **extra
            )
            return False

        try:
            # 1. Update the main repository and its submodules.
            # NOTE(review): update_main_repo() always checks out
            # ``global_branch``; a tag-triggered deployment therefore
            # builds the branch head, not the tagged commit.
            if not self.update_main_repo():
                return fail("Git 仓库更新失败(主仓库或子模块)")

            commit_message = self._read_commit_message(
                repo_path / repo_config['path'], commit_hash
            )
            server_ip = self.get_server_ip()

            if self.dingtalk_notifier:
                display_message = commit_message
                if tag_name:
                    display_message = f"Tag: {tag_name}" + (
                        f" - {commit_message}" if commit_message else ""
                    )
                self.dingtalk_notifier.send_build_start(
                    repo_name=repo_name,
                    branch=branch_label,
                    commit_hash=commit_hash,
                    commit_message=display_message,
                    server_ip=server_ip
                )

            # 2. Make sure the infrastructure services are running.
            if not self.init_infrastructure():
                return fail("基础设施初始化失败(MySQL/Redis/Nacos等)")

            # 3. Build according to the project type.
            build_error = self._build_artifacts(repo_config, repo_path)
            if build_error is not None:
                return fail(build_error)

            # 4. Deploy with docker-compose.
            compose_dir = repo_path / 'docker'
            service_name = repo_config['docker_service']
            if not docker.run_docker_compose(compose_dir, service_name):
                return fail("Docker 部署失败")

            duration = time.time() - start_time
            if self.dingtalk_notifier:
                self.dingtalk_notifier.send_build_success(
                    repo_name=repo_name,
                    branch=branch_label,
                    commit_hash=commit_hash,
                    duration=duration
                )

            Logger.info(f"部署完成: {repo_config['name']}")
            return True

        except Exception as e:
            # Log first so the error is recorded even if notifying fails.
            Logger.error(f"部署异常: {e}")
            return fail(str(e), at_all=True)

    # ------------------------------------------------------------------
    # Polling loop
    # ------------------------------------------------------------------
    def run_once(self):
        """Poll every configured repository once and deploy what changed.

        A repository deployed because of a branch commit is not checked
        for tags in the same pass. Errors in one repository are logged
        and reported, and do not stop the others.
        """
        Logger.info("[步骤 3/3] 开始监听分支变化")

        for repo_config in self.config.get('repositories') or []:
            # Resolved inside the try block; the fallback keeps the error
            # handler usable when the entry has no usable name.
            repo_name = 'unknown'
            try:
                repo_name = repo_config['name']

                if self.check_repository(repo_config):
                    Logger.info(f"触发部署: {repo_config['name']} (分支提交)")
                    if self.deploy(repo_config):
                        Logger.info(f"✓ 部署成功: {repo_config['name']}")
                    else:
                        Logger.error(f"✗ 部署失败: {repo_config['name']}")
                    continue  # already deployed; skip the tag check

                if not self.watch_tags:
                    continue

                has_new_tag, tag_info = self.check_repository_tags(repo_config)
                if has_new_tag and tag_info:
                    tag_name, commit_hash = tag_info
                    Logger.info(f"触发部署: {repo_config['name']} (新 tag: {tag_name})")
                    # The tag's hash is passed explicitly rather than
                    # written into last_commits: overwriting the recorded
                    # branch head would make the next poll report a
                    # spurious "new commit" and deploy again.
                    if self.deploy(repo_config, tag_name=tag_name, commit_hash=commit_hash):
                        Logger.info(f"✓ 部署成功: {repo_config['name']}")
                    else:
                        Logger.error(f"✗ 部署失败: {repo_config['name']}")

            except Exception as e:
                Logger.error(f"处理仓库异常 {repo_name}: {e}")
                self._notify_failure(
                    repo_name,
                    self.global_branch,
                    self.last_commits.get(repo_name, 'unknown'),
                    f"处理仓库时发生异常: {str(e)}",
                    at_all=True
                )

    def run(self):
        """Poll forever, sleeping ``monitor.poll_interval`` seconds between passes.

        Falls back to ``DEFAULT_POLL_INTERVAL`` when the setting is absent.
        Stops on Ctrl+C or on an unexpected error in the loop.
        """
        monitor_config = self.config.get('monitor') or {}
        poll_interval = monitor_config.get('poll_interval', DEFAULT_POLL_INTERVAL)
        Logger.info(f"开始持续监听,轮询间隔: {poll_interval} 秒")
        Logger.info("按 Ctrl+C 停止监听\n")

        try:
            while True:
                self.run_once()
                time.sleep(poll_interval)
        except KeyboardInterrupt:
            Logger.info("\n收到停止信号,退出监听")
        except Exception as e:
            Logger.error(f"监听异常: {e}")


def main():
    """Parse the command line and run the monitor once or continuously."""
    parser = argparse.ArgumentParser(description='Git 仓库监听器')
    parser.add_argument('--config', default='.devops/config.yaml', help='配置文件路径')
    parser.add_argument('--once', action='store_true', help='只执行一次检查')

    args = parser.parse_args()

    monitor = GitMonitor(args.config)

    if args.once:
        monitor.run_once()
    else:
        monitor.run()


if __name__ == '__main__':
    main()