自动化单mysql多实例库的全量迁移脚本-v2版本

内容分享2小时前发布
0 0 0

介绍

较之前的版本,将合并的导入导出拆分为3个场景,并重复的导出动作进行了文档归档能力。以下将重新阐述能力

一、核心功能模块

1. 多线程数据库迁移

支持并行处理多个数据库(xargs -P实现并发)可配置线程数(THREADS参数控制并发度)导出/导入操作独立线程池管理

2. 双模式运行机制

模式 功能描述 关键特性
both 同时执行导出+导入 全自动流水线操作,迁移ID自动生成
export 仅导出数据到本地文件 自动创建时间戳目录
import 从指定目录导入数据 严格校验迁移ID一致性

3. 迁移安全控制

迁移ID机制:
导出模式:生成8位SHA1随机ID(date +%s%N | sha1sum | head -c 8)导入模式:从文件提取ID并验证一致性
事务保障:
导出:–single-transaction确保一致性快照导入:SET foreign_key_checks=0禁用约束检查

⚙️ 二、关键技术实现

性能优化设计
压缩加速:使用pigz并行压缩(-9最高压缩比)替代gzipMySQL参数优化:


--max-allowed-packet=1G       # 提升大字段处理能力
--net-buffer-length=16384     # 优化网络传输效率
--skip-lock-tables            # 避免锁表阻塞

健壮性保障
错误隔离:
每个数据库独立错误日志(${db}_${MIGRATION_ID}.export.err)失败任务记录到failed_imports.txt/failed_exports.txt
预检查机制:
命令依赖检查(mysql/mysqldump/pigz)双端数据库连接测试迁移文件存在性验证

日志系统
分级日志(info/warning/error)双输出:控制台关键信息+文件全量记录结构化日志格式:
[2023-11-04 15:30:22] [INFO] 开始导出: orders (ID:8a3d7f2b)

📂 三、配置与扩展能力

灵活输入支持
数据库来源:命令行直接指定(-d ‘db1,db2’)或文件读取(-f dblist.txt)连接参数覆盖:支持运行时动态设置源/目标数据库凭证
目录管理策略

变量 功能说明 默认行为
EXPORT_DIR 导出存储路径 按时间自动生成目录
IMPORT_DIR 导入数据源路径 必须显式指定
MAIN_DIR 运行时主目录 模式自适应(export=导出目录)

清理机制
both模式自动清理中间压缩文件(保留日志)独立保留失败任务记录便于重试

📊 四、典型应用场景

跨环境迁移


# 生产库→测试库全迁移(双机模式)
./migrate.sh -d 'orders,users' --src-host prod-db --dest-host test-db

灾难恢复


# 从备份目录紧急恢复
./migrate.sh -m import -i /backups/20231104 -d critical_db

数据归档


# 仅导出历史数据
./migrate.sh -m export -d archive_2015..2020 -e /mnt/backups

⚠️ 五、局限性及注意事项

安全性限制
密码明文传递(需配合Vault等密钥管理工具)无SSL连接支持(需手动添加–ssl-mode参数)
超大库处理
单库>1TB时需调整–max_allowed_packet无分片导出能力(需拆分子库处理)
版本依赖
必需组件:MySQL 5.7+, pigz, GNU xargs不兼容BSD/macOS原生工具链

六、脚本内容如下:


#!/bin/bash
# 多线程MySQL迁移脚本 (v4.3 - 优化迁移ID逻辑)
# ===== 配置区 =====
export THREADS=4                  # 并发线程数(默认值)
export MODE="both"                # 运行模式(both/import/export)
export SRC_HOST="source-db.example.com" # 源数据库地址
export SRC_PORT=3306              # 源数据库端口
export SRC_USER="admin"           # 源数据库用户
export SRC_PASS="secure_password" # 源数据库密码
export DEST_HOST="target-db.example.com" # 目标数据库地址
export DEST_PORT=3306             # 目标数据库端口
export DEST_USER="admin"          # 目标数据库用户
export DEST_PASS="secure_password" # 目标数据库密码
export EXPORT_DIR="migration_data_$(date +%Y%m%d-%H%M%S)" # 导出文件夹
export IMPORT_DIR=""              # 导入文件夹(默认空)
# ===== 参数解析 =====
usage() {
  echo "用法: $0 -d 'db1,db2,...' [选项]"
  echo "选项:"
  echo "  -d, --databases  要迁移的数据库列表(逗号分隔)"
  echo "  -f, --dbfile     包含数据库列表的文件(每行一个)"
  echo "  -t, --threads    并发线程数(默认: $THREADS)"
  echo "  -m, --mode       运行模式: both/import/export (默认: $MODE)"
  echo "  -e, --export-dir 导出文件夹路径(默认: 自动创建)"
  echo "  -i, --import-dir 导入文件夹路径(必须包含导出文件)"
  echo "  --src-host       源数据库主机"
  echo "  --src-port       源数据库端口"
  echo "  --src-user       源数据库用户"
  echo "  --src-pass       源数据库密码"
  echo "  --dest-host      目标数据库主机"
  echo "  --dest-port      目标数据库端口"
  echo "  --dest-user      目标数据库用户"
  echo "  --dest-pass      目标数据库密码"
  exit 1
}

# 参数默认值
DATABASES=""
DBFILE=""

# 解析命令行参数
while [[ $# -gt 0 ]]; do
  case "$1" in
    -d|--databases)
      DATABASES="$2"
      shift 2
      ;;
    -f|--dbfile)
      DBFILE="$2"
      shift 2
      ;;
    -t|--threads)
      THREADS="$2"
      shift 2
      ;;
    -m|--mode)
      MODE="$2"
      shift 2
      ;;
    -e|--export-dir)
      EXPORT_DIR="$2"
      shift 2
      ;;
    -i|--import-dir)
      IMPORT_DIR="$2"
      shift 2
      ;;
    --src-host) SRC_HOST="$2"; shift 2 ;;
    --src-port) SRC_PORT="$2"; shift 2 ;;
    --src-user) SRC_USER="$2"; shift 2 ;;
    --src-pass) SRC_PASS="$2"; shift 2 ;;
    --dest-host) DEST_HOST="$2"; shift 2 ;;
    --dest-port) DEST_PORT="$2"; shift 2 ;;
    --dest-user) DEST_USER="$2"; shift 2 ;;
    --dest-pass) DEST_PASS="$2"; shift 2 ;;
    *)
      echo "未知选项: $1"
      usage
      ;;
  esac
done

# 验证模式参数
VALID_MODES=("both" "import" "export")
if [[ ! " ${VALID_MODES[@]} " =~ " ${MODE} " ]]; then
  echo "错误: 无效模式参数,可选值: both/import/export" >&2
  exit 1
fi

# 验证数据库输入
if [[ -z "$DATABASES" && -z "$DBFILE" ]]; then
  echo "错误:必须指定 -d 或 -f 参数" >&2
  usage
fi

# 从文件读取数据库列表
if [[ -n "$DBFILE" ]]; then
  if [[ ! -f "$DBFILE" ]]; then
    echo "数据库文件不存在: $DBFILE" >&2
    exit 1
  fi
  DATABASES=$(tr '
' ',' < "$DBFILE" | sed 's/,$//')
fi

# 转换为数组
IFS=',' read -ra DB_ARRAY <<< "$DATABASES"
if [[ ${#DB_ARRAY[@]} -eq 0 ]]; then
  echo "错误:未找到有效数据库" >&2
  exit 1
fi

# ===== MIGRATION_ID 生成逻辑优化 =====
if [[ $MODE == "import" ]]; then
  # 从导入目录提取迁移ID
  if [[ -z "$IMPORT_DIR" ]]; then
    echo "错误:导入模式必须指定导入目录(-i)" >&2
    exit 1
  fi
  
  mig_file=$(find "$IMPORT_DIR" -name "*.sql.gz" | head -n1)
  if [[ -z "$mig_file" ]]; then
    echo "错误:导入目录中未找到迁移文件" >&2
    exit 1
  fi
  
  MIGRATION_ID=$(basename "$mig_file" | awk -F[_.] '{print $(NF-2)}')
else
  # 生成新迁移ID
  MIGRATION_ID=$(date +%s%N | sha1sum | head -c 8)
fi
export MIGRATION_ID

# ===== 文件夹管理 =====
# 主目录和日志文件设置
if [[ $MODE == "import" ]]; then
  MAIN_DIR="$IMPORT_DIR"
  [[ ! -d "$MAIN_DIR" ]] && { echo "错误:导入目录不存在" >&2; exit 1; }
elif [[ $MODE == "export" || $MODE == "both" ]]; then
  if [[ -z "$EXPORT_DIR" ]]; then
    EXPORT_DIR="migration_${MIGRATION_ID}"
  fi
  mkdir -p "$EXPORT_DIR" || exit 1
  MAIN_DIR="$EXPORT_DIR"
fi

LOG_FILE="${MAIN_DIR}/migration_${MODE}.log"

# ===== 函数定义 =====
# 日志记录
log() {
  local level=$1
  local msg=$2
  local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
  local log_entry="[${timestamp}] [${level^^}] ${msg}"
  
  # 所有日志写入文件
  echo "$log_entry" >> "$LOG_FILE"
  
  # 控制台显示关键信息
  if [[ $level == "error" || $level == "warning" || $level == "info" ]]; then
    echo "$log_entry"
  fi
}

# 获取文件路径
get_file_path() {
  local db=$1
  local type=$2
  echo "${MAIN_DIR}/${db}_${MIGRATION_ID}.${type}"
}

# 验证迁移ID一致性
validate_migration_id() {
  local db=$1
  local expected_file="${MAIN_DIR}/${db}_${MIGRATION_ID}.sql.gz"
  if [[ ! -f "$expected_file" ]]; then
    log "error" "迁移文件不存在: $expected_file"
    return 1
  fi
  return 0
}

# 命令检查
check_commands() {
  local missing=()
  for cmd in mysql mysqldump pigz; do
    if ! command -v $cmd &>/dev/null; then
      missing+=("$cmd")
    fi
  done
  if [[ ${#missing[@]} -gt 0 ]]; then
    log "error" "缺失命令: ${missing[*]}"
    return 1
  fi
}

# 数据库连接测试
test_db_connection() {
  local type=$1 host=$2 port=$3 user=$4 pass=$5
  export MYSQL_PWD="$pass"
  if ! mysql -h"$host" -P"$port" -u"$user" -e "SELECT 1" &>> "$LOG_FILE"; then
    log "error" "$type 数据库连接失败"
    return 1
  fi
  log "info" "$type 数据库连接成功"
}

# 导出函数
optimized_dump() {
  set -o pipefail
  local db=$1
  local start_time=$(date +%s)
  
  log "info" "开始导出: $db (ID:${MIGRATION_ID})"
  export MYSQL_PWD="$SRC_PASS"
  
  # 添加性能优化参数
  mysqldump -h"$SRC_HOST" -P"$SRC_PORT" -u"$SRC_USER" 
      --single-transaction --quick --skip-lock-tables 
      --max-allowed-packet=1G --net-buffer-length=16384 
      --routines --triggers --events --hex-blob 
      "$db" | pigz -9 > "$(get_file_path "$db" "sql.gz")" 2>> "$(get_file_path "$db" "export.err")"
  
  local status=$?
  local end_time=$(date +%s)
  local duration=$((end_time - start_time))
  
  if [[ $status -ne 0 ]]; then
    log "error" "导出失败: $db (详见 $(get_file_path "$db" "export.err"))"
    echo "$db" >> "${MAIN_DIR}/failed_exports.txt"
    return 1
  fi
  
  log "info" "导出成功: $db (大小: $(du -h $(get_file_path "$db" "sql.gz") | awk '{print $1}'), 耗时: ${duration}秒)"
}

# 导入函数
optimized_import() {
  set -o pipefail
  local db=$1
  local start_time=$(date +%s)
  
  # 验证迁移ID一致性
  if ! validate_migration_id "$db"; then
    log "error" "迁移ID验证失败: $db"
    echo "$db" >> "${MAIN_DIR}/failed_imports.txt"
    return 1
  fi
  
  log "info" "开始导入: $db (来源: $(get_file_path "$db" "sql.gz"))"
  export MYSQL_PWD="$DEST_PASS"
  
  # 创建目标数据库
  mysql -h"$DEST_HOST" -P"$DEST_PORT" -u"$DEST_USER" 
      -e "CREATE DATABASE IF NOT EXISTS `$db`" 2>> "$(get_file_path "$db" "import.err")"
  
  if [[ $? -ne 0 ]]; then
    log "error" "数据库创建失败: $db"
    return 1
  fi
  
  # 执行导入
  pigz -dc "$(get_file_path "$db" "sql.gz")" | 
      mysql -h"$DEST_HOST" -P"$DEST_PORT" -u"$DEST_USER" 
      --init-command="SET SESSION foreign_key_checks=0; SET SESSION unique_checks=0;" 
      --max-allowed-packet=1G --net-buffer-length=16384 
      "$db" 2>> "$(get_file_path "$db" "import.err")"
  
  local status=$?
  local end_time=$(date +%s)
  local duration=$((end_time - start_time))
  
  if [[ $status -ne 0 ]]; then
    log "error" "导入失败: $db (详见 $(get_file_path "$db" "import.err"))"
    echo "$db" >> "${MAIN_DIR}/failed_imports.txt"
    return 1
  fi
  
  log "info" "导入成功: $db (耗时: ${duration}秒)"
}

# 多线程处理器
parallel_process() {
  local mode=$1
  local dbs=("${!2}")
  local func=$3
  log "info" "启动多线程$mode (线程数: $THREADS, 数据库数: ${#dbs[@]})"
  printf "%s
" "${dbs[@]}" | xargs -P $THREADS -I {} bash -c "
    source_db='{}'
    $func "$source_db"
  " 2>> "${MAIN_DIR}/parallel_${mode}.err"
}

# ===== 主流程 =====
  # 初始化检查
  check_commands || exit 1
  [[ $MODE != "import" ]] && { test_db_connection "源" "$SRC_HOST" "$SRC_PORT" "$SRC_USER" "$SRC_PASS" || exit 1 ; }
  [[ $MODE != "export" ]] && { test_db_connection "目标" "$DEST_HOST" "$DEST_PORT" "$DEST_USER" "$DEST_PASS" || exit 1 ; }
  
  log "info" "===== 开始迁移任务 (ID: ${MIGRATION_ID}) ====="
  log "info" "线程数: $THREADS | 数据库: ${DB_ARRAY[*]} | 模式: $MODE"
  [[ $MODE == "import" ]] && log "info" "导入目录: $MAIN_DIR"
  [[ $MODE == "export" || $MODE == "both" ]] && log "info" "导出目录: $MAIN_DIR"
  
  # 导出函数暴露
  export -f log optimized_dump optimized_import validate_migration_id get_file_path
  export SRC_HOST SRC_PORT SRC_USER SRC_PASS
  export DEST_HOST DEST_PORT DEST_USER DEST_PASS
  export MAIN_DIR MIGRATION_ID

  # 根据模式执行操作
  if [[ $MODE == "export" || $MODE == "both" ]]; then
    # 并行导出
    parallel_process "导出" DB_ARRAY[@] optimized_dump
  fi

  if [[ $MODE == "import" || $MODE == "both" ]]; then
    # 导入前检查迁移文件
    missing_dbs=()
    for db in "${DB_ARRAY[@]}"; do
      if ! validate_migration_id "$db"; then
        missing_dbs+=("$db")
      fi
    done
    
    if [[ ${#missing_dbs[@]} -gt 0 ]]; then
      log "error" "缺失迁移文件: ${missing_dbs[*]}"
      exit 1
    fi
    
    # 并行导入
    parallel_process "导入" DB_ARRAY[@] optimized_import
  fi

  # 结果报告
  error_files=("${MAIN_DIR}/failed_exports.txt" "${MAIN_DIR}/failed_imports.txt")
  has_errors=0
  for ef in "${error_files[@]}"; do
    if [[ -f "$ef" ]]; then
      log "warning" "失败记录: $(tr '
' ' ' < "$ef")"
      has_errors=1
    fi
  done

  # 清理策略
  if [[ $MODE == "both" ]] && [[ -z "$IMPORT_DIR" ]]; then
    find "$MAIN_DIR" -name "*.sql.gz" -delete
  fi

  if [[ $has_errors -eq 0 ]]; then
    log "info" "===== 迁移成功完成 ====="
    log "info" "数据目录: $MAIN_DIR"
  else
    log "error" "===== 迁移完成但有错误 ====="
    exit 1
  fi

© 版权声明

相关文章

暂无评论

none
暂无评论...