a-cloud-all/.devops/deployer.py

418 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
部署执行器
负责执行具体的部署任务
"""
import os
import sys
import logging
import subprocess
import shutil
import glob
from pathlib import Path
class Deployer:
"""部署执行器"""
def __init__(self, config):
"""初始化部署器"""
self.config = config
self.logger = logging.getLogger('Deployer')
# 获取项目根目录(.devops 的父目录)
project_root = Path(__file__).parent.parent.resolve()
# 将 runtime_path 转换为绝对路径
runtime_path = config['main_repository']['runtime_path']
if not Path(runtime_path).is_absolute():
self.runtime_path = project_root / runtime_path
else:
self.runtime_path = Path(runtime_path)
self.main_repo_url = config['main_repository']['url']
self.main_repo_branch = config['main_repository']['branch']
self.logger.info(f"项目根目录: {project_root}")
self.logger.info(f"Runtime 目录: {self.runtime_path}")
def run_command(self, cmd, cwd=None, timeout=600):
"""执行命令"""
cwd_str = str(cwd) if cwd else "当前目录"
self.logger.info(f"执行目录: {cwd_str}")
self.logger.info(f"执行命令: {cmd}")
try:
result = subprocess.run(
cmd,
shell=True,
cwd=cwd,
capture_output=True,
text=True,
timeout=timeout
)
# 始终输出标准输出(如果有)
if result.stdout:
# 限制输出长度,避免日志过大
stdout_lines = result.stdout.strip().split('\n')
if len(stdout_lines) > 50:
self.logger.info(f"标准输出 (前30行):\n" + '\n'.join(stdout_lines[:30]))
self.logger.info(f"... (省略 {len(stdout_lines) - 50} 行)")
self.logger.info(f"标准输出 (后20行):\n" + '\n'.join(stdout_lines[-20:]))
else:
self.logger.info(f"标准输出:\n{result.stdout.strip()}")
if result.returncode != 0:
self.logger.error(f"命令执行失败 (退出码: {result.returncode})")
if result.stderr:
# 限制错误输出长度
stderr_lines = result.stderr.strip().split('\n')
if len(stderr_lines) > 50:
self.logger.error(f"错误输出 (前30行):\n" + '\n'.join(stderr_lines[:30]))
self.logger.error(f"... (省略 {len(stderr_lines) - 50} 行)")
self.logger.error(f"错误输出 (后20行):\n" + '\n'.join(stderr_lines[-20:]))
else:
self.logger.error(f"错误输出:\n{result.stderr.strip()}")
return False
self.logger.info("命令执行成功")
return True
except subprocess.TimeoutExpired:
self.logger.error(f"命令执行超时 (超时时间: {timeout}秒)")
return False
except Exception as e:
self.logger.error(f"命令执行异常: {e}")
return False
def ensure_main_repo(self):
"""确保主仓库存在并是最新的"""
# 克隆到 runtime/a-cloud-all 目录
repo_path = self.runtime_path / 'a-cloud-all'
# 检查是否是有效的 Git 仓库
if not (repo_path / '.git').exists():
self.logger.info("主仓库不存在,开始克隆...")
# 确保 runtime 目录存在
self.runtime_path.mkdir(parents=True, exist_ok=True)
# 克隆到 runtime/a-cloud-all 目录
cmd = f"git clone --recurse-submodules {self.main_repo_url} a-cloud-all"
if not self.run_command(cmd, cwd=self.runtime_path):
self.logger.error("克隆主仓库失败")
return False
self.logger.info("主仓库克隆成功")
else:
self.logger.info("主仓库已存在,更新代码...")
# 切换到指定分支
if not self.run_command(f"git checkout {self.main_repo_branch}", cwd=repo_path):
return False
# 拉取最新代码
if not self.run_command("git pull", cwd=repo_path):
return False
# 初始化子模块(如果还没初始化)
if not self.run_command("git submodule update --init --recursive", cwd=repo_path):
return False
# 更新所有子模块到各自配置的分支
self.logger.info("更新所有子模块到最新代码...")
if not self.update_all_submodules(repo_path):
return False
self.logger.info("主仓库更新成功")
return True
def update_all_submodules(self, repo_path):
"""更新所有子模块到各自配置的分支"""
self.logger.info("更新所有子模块到各自配置的分支...")
# 构建分支映射:子模块路径 -> 分支名
# 注意:这里假设所有子模块使用相同的分支,如果需要不同分支,需要逐个处理
# 先尝试使用 git submodule foreach 批量更新
# 检查是否所有子模块使用相同的分支
branches = set(repo['branch'] for repo in self.config['repositories'])
if len(branches) == 1:
# 所有子模块使用相同分支,可以使用 foreach
branch = branches.pop()
self.logger.info(f"所有子模块使用相同分支 {branch},使用 git submodule foreach 批量更新")
cmd = f"git submodule foreach 'git checkout {branch} && git pull origin {branch}'"
if not self.run_command(cmd, cwd=repo_path, timeout=600):
self.logger.warning("批量更新子模块失败")
return False
else:
# 不同子模块使用不同分支,需要逐个更新
self.logger.info("子模块使用不同分支,逐个更新...")
for repo_config in self.config['repositories']:
branch = repo_config['branch']
submodule_path = repo_config['path']
self.logger.info(f"更新子模块 {repo_config['name']} 到分支 {branch}")
# 构建命令:进入子模块,切换分支并拉取
cmd = f"cd {submodule_path} && git checkout {branch} && git pull origin {branch}"
if not self.run_command(cmd, cwd=repo_path, timeout=300):
self.logger.warning(f"子模块 {repo_config['name']} 更新失败,继续处理其他子模块")
continue
return True
def update_submodule(self, repo_config):
"""更新指定的子模块"""
repo_path = self.runtime_path / 'a-cloud-all'
submodule_path = repo_path / repo_config['path']
self.logger.info(f"更新子模块: {repo_config['name']}")
# 进入子模块目录
if not submodule_path.exists():
self.logger.error(f"子模块目录不存在: {submodule_path}")
return False
# 切换到指定分支
branch = repo_config['branch']
if not self.run_command(f"git checkout {branch}", cwd=submodule_path):
return False
# 拉取最新代码
if not self.run_command("git pull origin " + branch, cwd=submodule_path):
return False
self.logger.info(f"子模块更新成功: {repo_config['name']}")
return True
def build_project(self, repo_config):
"""构建项目"""
repo_path = self.runtime_path / 'a-cloud-all'
self.logger.info(f"开始构建: {repo_config['name']}")
# 根据项目类型选择执行目录
if repo_config['type'] == 'nodejs':
# Node.js 项目在子模块目录执行
build_dir = repo_path / repo_config['path']
self.logger.info(f"Node.js 项目,在子模块目录执行构建")
else:
# Java 项目在主仓库根目录执行
build_dir = repo_path
self.logger.info(f"Java 项目,在主仓库根目录执行构建")
# 执行构建命令
for cmd in repo_config['build_commands']:
self.logger.info(f"执行构建命令: {cmd}")
if not self.run_command(cmd, cwd=build_dir, timeout=1800):
self.logger.error(f"构建失败: {cmd}")
return False
self.logger.info(f"构建成功: {repo_config['name']}")
return True
def copy_artifacts(self, repo_config):
"""复制构建产物到 docker 目录"""
repo_path = self.runtime_path / 'a-cloud-all'
submodule_path = repo_path / repo_config['path']
self.logger.info(f"复制构建产物: {repo_config['name']}")
# 获取构建产物路径
artifact_pattern = submodule_path / repo_config['artifact_path']
artifacts = glob.glob(str(artifact_pattern))
if not artifacts:
self.logger.error(f"未找到构建产物: {artifact_pattern}")
return False
# 目标目录
docker_path = repo_path / repo_config['docker_path']
docker_path.mkdir(parents=True, exist_ok=True)
# 复制文件
for artifact in artifacts:
artifact_path = Path(artifact)
if artifact_path.is_file():
dest = docker_path / artifact_path.name
shutil.copy2(artifact, dest)
self.logger.info(f"复制文件: {artifact_path.name}")
elif artifact_path.is_dir():
# 如果是目录(如 dist清空目标目录后复制
if docker_path.exists():
for item in docker_path.iterdir():
if item.name != '.gitkeep':
if item.is_dir():
shutil.rmtree(item)
else:
item.unlink()
shutil.copytree(artifact, docker_path, dirs_exist_ok=True)
self.logger.info(f"复制目录: {artifact_path.name}")
self.logger.info("构建产物复制完成")
return True
def run_deploy_script(self, repo_config):
"""执行部署脚本"""
repo_path = self.runtime_path / 'a-cloud-all'
script_name = repo_config['deploy_script']
script_path = repo_path / '.devops' / 'scripts' / script_name
if not script_path.exists():
self.logger.error(f"部署脚本不存在: {script_path}")
return False
self.logger.info(f"执行部署脚本: {script_name}")
# 准备脚本参数
docker_service = repo_config.get('docker_service', '')
docker_compose_path = self.config['deploy']['docker_compose_path']
# 执行脚本
cmd = f"bash {script_path} {repo_config['name']} {docker_service} {docker_compose_path}"
if not self.run_command(cmd, cwd=repo_path, timeout=600):
self.logger.error("部署脚本执行失败")
return False
self.logger.info("部署脚本执行成功")
return True
def commit_submodule_update(self, repo_config):
"""提交子模块更新到主仓库"""
if not self.config['deploy'].get('auto_commit', False):
self.logger.info("自动提交已禁用,跳过")
return True
repo_path = self.runtime_path / 'a-cloud-all'
self.logger.info("提交子模块更新到主仓库")
# 添加子模块更改
submodule_path = repo_config['path']
if not self.run_command(f"git add {submodule_path}", cwd=repo_path):
return False
# 检查是否有更改
result = subprocess.run(
"git diff --cached --quiet",
shell=True,
cwd=repo_path
)
if result.returncode == 0:
self.logger.info("没有需要提交的更改")
return True
# 提交更改
commit_msg = self.config['deploy']['commit_message'].format(
repo_name=repo_config['name']
)
if not self.run_command(f'git commit -m "{commit_msg}"', cwd=repo_path):
return False
# 推送前先拉取远程最新代码
self.logger.info("推送前先拉取远程最新代码...")
if not self.run_command(f"git pull --rebase origin {self.main_repo_branch}", cwd=repo_path):
self.logger.warning("拉取远程代码失败,尝试直接推送")
# 推送到远程
if not self.run_command(f"git push origin {self.main_repo_branch}", cwd=repo_path):
self.logger.warning("推送失败,但部署已完成")
return True
self.logger.info("子模块更新已提交并推送")
return True
def deploy(self, repo_config):
"""执行完整的部署流程"""
self.logger.info(f"=" * 60)
self.logger.info(f"开始部署: {repo_config['name']}")
self.logger.info(f"=" * 60)
try:
# 1. 确保主仓库存在
if not self.ensure_main_repo():
return False
# 2. 部署基础设施(首次部署)
if not self.deploy_infrastructure():
return False
# 3. 更新子模块
if not self.update_submodule(repo_config):
return False
# 3. 构建项目
if not self.build_project(repo_config):
return False
# 4. 复制构建产物
if not self.copy_artifacts(repo_config):
return False
# 5. 执行部署脚本
if not self.run_deploy_script(repo_config):
return False
self.logger.info(f"部署完成: {repo_config['name']}")
return True
except Exception as e:
self.logger.error(f"部署过程中发生异常: {e}", exc_info=True)
return False
def deploy_infrastructure(self):
"""部署基础设施服务(只部署一次)"""
if 'infrastructure' not in self.config:
return True
repo_path = self.runtime_path / 'a-cloud-all'
for infra in self.config['infrastructure']:
name = infra['name']
deployed_flag = repo_path / infra['deployed_flag']
# 检查是否已部署
if deployed_flag.exists():
self.logger.info(f"基础设施 {name} 已部署,跳过")
continue
self.logger.info(f"部署基础设施: {name}")
# 执行预部署命令
if 'pre_deploy_commands' in infra:
for cmd in infra['pre_deploy_commands']:
if not self.run_command(cmd, cwd=repo_path):
self.logger.error(f"预部署命令失败: {cmd}")
return False
# 部署服务
docker_service = infra['docker_service']
docker_dir = repo_path / 'docker'
cmd = f"docker-compose build --no-cache {docker_service} && docker-compose up -d {docker_service}"
if not self.run_command(cmd, cwd=docker_dir, timeout=1800):
self.logger.error(f"部署失败: {name}")
return False
# 等待服务启动(特别是 MySQL 和 Redis
wait_time = infra.get('wait_time', 10)
self.logger.info(f"等待 {name} 启动完成 ({wait_time} 秒)...")
import time
time.sleep(wait_time)
# 创建部署标记
deployed_flag.parent.mkdir(parents=True, exist_ok=True)
deployed_flag.touch()
self.logger.info(f"基础设施部署完成: {name}")
return True