a-cloud-all/.devops/deployer.py

360 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
部署执行器
负责执行具体的部署任务
"""
import os
import sys
import logging
import subprocess
import shutil
import glob
from pathlib import Path
class Deployer:
"""部署执行器"""
def __init__(self, config):
"""初始化部署器"""
self.config = config
self.logger = logging.getLogger('Deployer')
# 获取项目根目录(.devops 的父目录)
project_root = Path(__file__).parent.parent.resolve()
# 将 runtime_path 转换为绝对路径
runtime_path = config['main_repository']['runtime_path']
if not Path(runtime_path).is_absolute():
self.runtime_path = project_root / runtime_path
else:
self.runtime_path = Path(runtime_path)
self.main_repo_url = config['main_repository']['url']
self.main_repo_branch = config['main_repository']['branch']
self.logger.info(f"项目根目录: {project_root}")
self.logger.info(f"Runtime 目录: {self.runtime_path}")
def run_command(self, cmd, cwd=None, timeout=600):
"""执行命令"""
cwd_str = str(cwd) if cwd else "当前目录"
self.logger.info(f"执行目录: {cwd_str}")
self.logger.info(f"执行命令: {cmd}")
try:
result = subprocess.run(
cmd,
shell=True,
cwd=cwd,
capture_output=True,
text=True,
timeout=timeout
)
# 始终输出标准输出(如果有)
if result.stdout:
# 限制输出长度,避免日志过大
stdout_lines = result.stdout.strip().split('\n')
if len(stdout_lines) > 50:
self.logger.info(f"标准输出 (前30行):\n" + '\n'.join(stdout_lines[:30]))
self.logger.info(f"... (省略 {len(stdout_lines) - 50} 行)")
self.logger.info(f"标准输出 (后20行):\n" + '\n'.join(stdout_lines[-20:]))
else:
self.logger.info(f"标准输出:\n{result.stdout.strip()}")
if result.returncode != 0:
self.logger.error(f"命令执行失败 (退出码: {result.returncode})")
if result.stderr:
# 限制错误输出长度
stderr_lines = result.stderr.strip().split('\n')
if len(stderr_lines) > 50:
self.logger.error(f"错误输出 (前30行):\n" + '\n'.join(stderr_lines[:30]))
self.logger.error(f"... (省略 {len(stderr_lines) - 50} 行)")
self.logger.error(f"错误输出 (后20行):\n" + '\n'.join(stderr_lines[-20:]))
else:
self.logger.error(f"错误输出:\n{result.stderr.strip()}")
return False
self.logger.info("命令执行成功")
return True
except subprocess.TimeoutExpired:
self.logger.error(f"命令执行超时 (超时时间: {timeout}秒)")
return False
except Exception as e:
self.logger.error(f"命令执行异常: {e}")
return False
def ensure_main_repo(self):
"""确保主仓库存在并是最新的"""
# 克隆到 runtime/a-cloud-all 目录
repo_path = self.runtime_path / 'a-cloud-all'
# 检查是否是有效的 Git 仓库
if not (repo_path / '.git').exists():
self.logger.info("主仓库不存在,开始克隆...")
# 确保 runtime 目录存在
self.runtime_path.mkdir(parents=True, exist_ok=True)
# 克隆到 runtime/a-cloud-all 目录
cmd = f"git clone --recurse-submodules {self.main_repo_url} a-cloud-all"
if not self.run_command(cmd, cwd=self.runtime_path):
self.logger.error("克隆主仓库失败")
return False
self.logger.info("主仓库克隆成功")
else:
self.logger.info("主仓库已存在,更新代码...")
# 切换到指定分支
if not self.run_command(f"git checkout {self.main_repo_branch}", cwd=repo_path):
return False
# 拉取最新代码
if not self.run_command("git pull", cwd=repo_path):
return False
# 更新所有子模块
if not self.run_command("git submodule update --init --recursive", cwd=repo_path):
return False
self.logger.info("主仓库更新成功")
return True
def update_submodule(self, repo_config):
"""更新指定的子模块"""
repo_path = self.runtime_path / 'a-cloud-all'
submodule_path = repo_path / repo_config['path']
self.logger.info(f"更新子模块: {repo_config['name']}")
# 进入子模块目录
if not submodule_path.exists():
self.logger.error(f"子模块目录不存在: {submodule_path}")
return False
# 切换到指定分支
branch = repo_config['branch']
if not self.run_command(f"git checkout {branch}", cwd=submodule_path):
return False
# 拉取最新代码
if not self.run_command("git pull origin " + branch, cwd=submodule_path):
return False
self.logger.info(f"子模块更新成功: {repo_config['name']}")
return True
def build_project(self, repo_config):
"""构建项目"""
repo_path = self.runtime_path / 'a-cloud-all'
self.logger.info(f"开始构建: {repo_config['name']}")
# 执行构建命令(在主仓库根目录执行)
for cmd in repo_config['build_commands']:
self.logger.info(f"执行构建命令: {cmd}")
if not self.run_command(cmd, cwd=repo_path, timeout=1800):
self.logger.error(f"构建失败: {cmd}")
return False
self.logger.info(f"构建成功: {repo_config['name']}")
return True
def copy_artifacts(self, repo_config):
"""复制构建产物到 docker 目录"""
repo_path = self.runtime_path / 'a-cloud-all'
submodule_path = repo_path / repo_config['path']
self.logger.info(f"复制构建产物: {repo_config['name']}")
# 获取构建产物路径
artifact_pattern = submodule_path / repo_config['artifact_path']
artifacts = glob.glob(str(artifact_pattern))
if not artifacts:
self.logger.error(f"未找到构建产物: {artifact_pattern}")
return False
# 目标目录
docker_path = repo_path / repo_config['docker_path']
docker_path.mkdir(parents=True, exist_ok=True)
# 复制文件
for artifact in artifacts:
artifact_path = Path(artifact)
if artifact_path.is_file():
dest = docker_path / artifact_path.name
shutil.copy2(artifact, dest)
self.logger.info(f"复制文件: {artifact_path.name}")
elif artifact_path.is_dir():
# 如果是目录(如 dist清空目标目录后复制
if docker_path.exists():
for item in docker_path.iterdir():
if item.name != '.gitkeep':
if item.is_dir():
shutil.rmtree(item)
else:
item.unlink()
shutil.copytree(artifact, docker_path, dirs_exist_ok=True)
self.logger.info(f"复制目录: {artifact_path.name}")
self.logger.info("构建产物复制完成")
return True
def run_deploy_script(self, repo_config):
"""执行部署脚本"""
repo_path = self.runtime_path / 'a-cloud-all'
script_name = repo_config['deploy_script']
script_path = repo_path / '.devops' / 'scripts' / script_name
if not script_path.exists():
self.logger.error(f"部署脚本不存在: {script_path}")
return False
self.logger.info(f"执行部署脚本: {script_name}")
# 准备脚本参数
docker_service = repo_config.get('docker_service', '')
docker_compose_path = self.config['deploy']['docker_compose_path']
# 执行脚本
cmd = f"bash {script_path} {repo_config['name']} {docker_service} {docker_compose_path}"
if not self.run_command(cmd, cwd=repo_path, timeout=600):
self.logger.error("部署脚本执行失败")
return False
self.logger.info("部署脚本执行成功")
return True
def commit_submodule_update(self, repo_config):
"""提交子模块更新到主仓库"""
if not self.config['deploy'].get('auto_commit', False):
self.logger.info("自动提交已禁用,跳过")
return True
repo_path = self.runtime_path / 'a-cloud-all'
self.logger.info("提交子模块更新到主仓库")
# 添加子模块更改
submodule_path = repo_config['path']
if not self.run_command(f"git add {submodule_path}", cwd=repo_path):
return False
# 检查是否有更改
result = subprocess.run(
"git diff --cached --quiet",
shell=True,
cwd=repo_path
)
if result.returncode == 0:
self.logger.info("没有需要提交的更改")
return True
# 提交更改
commit_msg = self.config['deploy']['commit_message'].format(
repo_name=repo_config['name']
)
if not self.run_command(f'git commit -m "{commit_msg}"', cwd=repo_path):
return False
# 推送到远程
if not self.run_command(f"git push origin {self.main_repo_branch}", cwd=repo_path):
self.logger.warning("推送失败,但部署已完成")
return True
self.logger.info("子模块更新已提交并推送")
return True
def deploy(self, repo_config):
"""执行完整的部署流程"""
self.logger.info(f"=" * 60)
self.logger.info(f"开始部署: {repo_config['name']}")
self.logger.info(f"=" * 60)
try:
# 1. 确保主仓库存在
if not self.ensure_main_repo():
return False
# 2. 部署基础设施(首次部署)
if not self.deploy_infrastructure():
return False
# 3. 更新子模块
if not self.update_submodule(repo_config):
return False
# 3. 构建项目
if not self.build_project(repo_config):
return False
# 4. 复制构建产物
if not self.copy_artifacts(repo_config):
return False
# 5. 执行部署脚本
if not self.run_deploy_script(repo_config):
return False
# 6. 提交子模块更新
if not self.commit_submodule_update(repo_config):
return False
self.logger.info(f"部署完成: {repo_config['name']}")
return True
except Exception as e:
self.logger.error(f"部署过程中发生异常: {e}", exc_info=True)
return False
def deploy_infrastructure(self):
"""部署基础设施服务(只部署一次)"""
if 'infrastructure' not in self.config:
return True
repo_path = self.runtime_path / 'a-cloud-all'
for infra in self.config['infrastructure']:
name = infra['name']
deployed_flag = repo_path / infra['deployed_flag']
# 检查是否已部署
if deployed_flag.exists():
self.logger.info(f"基础设施 {name} 已部署,跳过")
continue
self.logger.info(f"部署基础设施: {name}")
# 执行预部署命令
if 'pre_deploy_commands' in infra:
for cmd in infra['pre_deploy_commands']:
if not self.run_command(cmd, cwd=repo_path):
self.logger.error(f"预部署命令失败: {cmd}")
return False
# 部署服务
docker_service = infra['docker_service']
docker_dir = repo_path / 'docker'
cmd = f"docker-compose build --no-cache {docker_service} && docker-compose up -d {docker_service}"
if not self.run_command(cmd, cwd=docker_dir, timeout=1800):
self.logger.error(f"部署失败: {name}")
return False
# 创建部署标记
deployed_flag.parent.mkdir(parents=True, exist_ok=True)
deployed_flag.touch()
self.logger.info(f"基础设施部署完成: {name}")
return True