介绍
较之前的版本,将合并的导入导出拆分为3个场景,并重复的导出动作进行了文档归档能力。以下将重新阐述能力
一、核心功能模块
1. 多线程数据库迁移
支持并行处理多个数据库(xargs -P实现并发)可配置线程数(THREADS参数控制并发度)导出/导入操作独立线程池管理
2. 双模式运行机制
| 模式 | 功能描述 | 关键特性 |
|---|---|---|
| both | 同时执行导出+导入 | 全自动流水线操作,迁移ID自动生成 |
| export | 仅导出数据到本地文件 | 自动创建时间戳目录 |
| import | 从指定目录导入数据 | 严格校验迁移ID一致性 |
3. 迁移安全控制
迁移ID机制:
导出模式:生成8位SHA1随机ID(date +%s%N | sha1sum | head -c 8)导入模式:从文件提取ID并验证一致性
事务保障:
导出:–single-transaction确保一致性快照导入:SET foreign_key_checks=0禁用约束检查
⚙️ 二、关键技术实现
性能优化设计
压缩加速:使用pigz并行压缩(-9最高压缩比)替代gzipMySQL参数优化:
--max-allowed-packet=1G # 提升大字段处理能力
--net-buffer-length=16384 # 优化网络传输效率
--skip-lock-tables # 避免锁表阻塞
健壮性保障
错误隔离:
每个数据库独立错误日志(${db}_${MIGRATION_ID}.export.err)失败任务记录到failed_imports.txt/failed_exports.txt
预检查机制:
命令依赖检查(mysql/mysqldump/pigz)双端数据库连接测试迁移文件存在性验证
日志系统
分级日志(info/warning/error)双输出:控制台关键信息+文件全量记录结构化日志格式:
[2023-11-04 15:30:22] [INFO] 开始导出: orders (ID:8a3d7f2b)
📂 三、配置与扩展能力
灵活输入支持
数据库来源:命令行直接指定(-d ‘db1,db2’)或文件读取(-f dblist.txt)连接参数覆盖:支持运行时动态设置源/目标数据库凭证
目录管理策略
| 变量 | 功能说明 | 默认行为 |
|---|---|---|
| EXPORT_DIR | 导出存储路径 | 按时间自动生成目录 |
| IMPORT_DIR | 导入数据源路径 | 必须显式指定 |
| MAIN_DIR | 运行时主目录 | 模式自适应(export=导出目录) |
清理机制
both模式自动清理中间压缩文件(保留日志)独立保留失败任务记录便于重试
📊 四、典型应用场景
跨环境迁移
# 生产库→测试库全迁移(双机模式)
./migrate.sh -d 'orders,users' --src-host prod-db --dest-host test-db
灾难恢复
# 从备份目录紧急恢复
./migrate.sh -m import -i /backups/20231104 -d critical_db
数据归档
# 仅导出历史数据
./migrate.sh -m export -d archive_2015..2020 -e /mnt/backups
⚠️ 五、局限性及注意事项
安全性限制
密码明文传递(需配合Vault等密钥管理工具)无SSL连接支持(需手动添加–ssl-mode参数)
超大库处理
单库>1TB时需调整–max_allowed_packet无分片导出能力(需拆分子库处理)
版本依赖
必需组件:MySQL 5.7+, pigz, GNU xargs不兼容BSD/macOS原生工具链
六、脚本内容如下:
#!/bin/bash
# 多线程MySQL迁移脚本 (v4.3 - 优化迁移ID逻辑)
# ===== 配置区 =====
export THREADS=4 # 并发线程数(默认值)
export MODE="both" # 运行模式(both/import/export)
export SRC_HOST="source-db.example.com" # 源数据库地址
export SRC_PORT=3306 # 源数据库端口
export SRC_USER="admin" # 源数据库用户
export SRC_PASS="secure_password" # 源数据库密码
export DEST_HOST="target-db.example.com" # 目标数据库地址
export DEST_PORT=3306 # 目标数据库端口
export DEST_USER="admin" # 目标数据库用户
export DEST_PASS="secure_password" # 目标数据库密码
export EXPORT_DIR="migration_data_$(date +%Y%m%d-%H%M%S)" # 导出文件夹
export IMPORT_DIR="" # 导入文件夹(默认空)
# ===== 参数解析 =====
usage() {
echo "用法: $0 -d 'db1,db2,...' [选项]"
echo "选项:"
echo " -d, --databases 要迁移的数据库列表(逗号分隔)"
echo " -f, --dbfile 包含数据库列表的文件(每行一个)"
echo " -t, --threads 并发线程数(默认: $THREADS)"
echo " -m, --mode 运行模式: both/import/export (默认: $MODE)"
echo " -e, --export-dir 导出文件夹路径(默认: 自动创建)"
echo " -i, --import-dir 导入文件夹路径(必须包含导出文件)"
echo " --src-host 源数据库主机"
echo " --src-port 源数据库端口"
echo " --src-user 源数据库用户"
echo " --src-pass 源数据库密码"
echo " --dest-host 目标数据库主机"
echo " --dest-port 目标数据库端口"
echo " --dest-user 目标数据库用户"
echo " --dest-pass 目标数据库密码"
exit 1
}
# 参数默认值
DATABASES=""
DBFILE=""
# 解析命令行参数
while [[ $# -gt 0 ]]; do
case "$1" in
-d|--databases)
DATABASES="$2"
shift 2
;;
-f|--dbfile)
DBFILE="$2"
shift 2
;;
-t|--threads)
THREADS="$2"
shift 2
;;
-m|--mode)
MODE="$2"
shift 2
;;
-e|--export-dir)
EXPORT_DIR="$2"
shift 2
;;
-i|--import-dir)
IMPORT_DIR="$2"
shift 2
;;
--src-host) SRC_HOST="$2"; shift 2 ;;
--src-port) SRC_PORT="$2"; shift 2 ;;
--src-user) SRC_USER="$2"; shift 2 ;;
--src-pass) SRC_PASS="$2"; shift 2 ;;
--dest-host) DEST_HOST="$2"; shift 2 ;;
--dest-port) DEST_PORT="$2"; shift 2 ;;
--dest-user) DEST_USER="$2"; shift 2 ;;
--dest-pass) DEST_PASS="$2"; shift 2 ;;
*)
echo "未知选项: $1"
usage
;;
esac
done
# 验证模式参数
VALID_MODES=("both" "import" "export")
if [[ ! " ${VALID_MODES[@]} " =~ " ${MODE} " ]]; then
echo "错误: 无效模式参数,可选值: both/import/export" >&2
exit 1
fi
# 验证数据库输入
if [[ -z "$DATABASES" && -z "$DBFILE" ]]; then
echo "错误:必须指定 -d 或 -f 参数" >&2
usage
fi
# 从文件读取数据库列表
if [[ -n "$DBFILE" ]]; then
if [[ ! -f "$DBFILE" ]]; then
echo "数据库文件不存在: $DBFILE" >&2
exit 1
fi
DATABASES=$(tr '
' ',' < "$DBFILE" | sed 's/,$//')
fi
# 转换为数组
IFS=',' read -ra DB_ARRAY <<< "$DATABASES"
if [[ ${#DB_ARRAY[@]} -eq 0 ]]; then
echo "错误:未找到有效数据库" >&2
exit 1
fi
# ===== MIGRATION_ID 生成逻辑优化 =====
if [[ $MODE == "import" ]]; then
# 从导入目录提取迁移ID
if [[ -z "$IMPORT_DIR" ]]; then
echo "错误:导入模式必须指定导入目录(-i)" >&2
exit 1
fi
mig_file=$(find "$IMPORT_DIR" -name "*.sql.gz" | head -n1)
if [[ -z "$mig_file" ]]; then
echo "错误:导入目录中未找到迁移文件" >&2
exit 1
fi
MIGRATION_ID=$(basename "$mig_file" | awk -F[_.] '{print $(NF-2)}')
else
# 生成新迁移ID
MIGRATION_ID=$(date +%s%N | sha1sum | head -c 8)
fi
export MIGRATION_ID
# ===== 文件夹管理 =====
# 主目录和日志文件设置
if [[ $MODE == "import" ]]; then
MAIN_DIR="$IMPORT_DIR"
[[ ! -d "$MAIN_DIR" ]] && { echo "错误:导入目录不存在" >&2; exit 1; }
elif [[ $MODE == "export" || $MODE == "both" ]]; then
if [[ -z "$EXPORT_DIR" ]]; then
EXPORT_DIR="migration_${MIGRATION_ID}"
fi
mkdir -p "$EXPORT_DIR" || exit 1
MAIN_DIR="$EXPORT_DIR"
fi
LOG_FILE="${MAIN_DIR}/migration_${MODE}.log"
# ===== 函数定义 =====
# 日志记录
log() {
local level=$1
local msg=$2
local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
local log_entry="[${timestamp}] [${level^^}] ${msg}"
# 所有日志写入文件
echo "$log_entry" >> "$LOG_FILE"
# 控制台显示关键信息
if [[ $level == "error" || $level == "warning" || $level == "info" ]]; then
echo "$log_entry"
fi
}
# 获取文件路径
get_file_path() {
local db=$1
local type=$2
echo "${MAIN_DIR}/${db}_${MIGRATION_ID}.${type}"
}
# 验证迁移ID一致性
validate_migration_id() {
local db=$1
local expected_file="${MAIN_DIR}/${db}_${MIGRATION_ID}.sql.gz"
if [[ ! -f "$expected_file" ]]; then
log "error" "迁移文件不存在: $expected_file"
return 1
fi
return 0
}
# 命令检查
check_commands() {
local missing=()
for cmd in mysql mysqldump pigz; do
if ! command -v $cmd &>/dev/null; then
missing+=("$cmd")
fi
done
if [[ ${#missing[@]} -gt 0 ]]; then
log "error" "缺失命令: ${missing[*]}"
return 1
fi
}
# 数据库连接测试
test_db_connection() {
local type=$1 host=$2 port=$3 user=$4 pass=$5
export MYSQL_PWD="$pass"
if ! mysql -h"$host" -P"$port" -u"$user" -e "SELECT 1" &>> "$LOG_FILE"; then
log "error" "$type 数据库连接失败"
return 1
fi
log "info" "$type 数据库连接成功"
}
# 导出函数
optimized_dump() {
set -o pipefail
local db=$1
local start_time=$(date +%s)
log "info" "开始导出: $db (ID:${MIGRATION_ID})"
export MYSQL_PWD="$SRC_PASS"
# 添加性能优化参数
mysqldump -h"$SRC_HOST" -P"$SRC_PORT" -u"$SRC_USER"
--single-transaction --quick --skip-lock-tables
--max-allowed-packet=1G --net-buffer-length=16384
--routines --triggers --events --hex-blob
"$db" | pigz -9 > "$(get_file_path "$db" "sql.gz")" 2>> "$(get_file_path "$db" "export.err")"
local status=$?
local end_time=$(date +%s)
local duration=$((end_time - start_time))
if [[ $status -ne 0 ]]; then
log "error" "导出失败: $db (详见 $(get_file_path "$db" "export.err"))"
echo "$db" >> "${MAIN_DIR}/failed_exports.txt"
return 1
fi
log "info" "导出成功: $db (大小: $(du -h $(get_file_path "$db" "sql.gz") | awk '{print $1}'), 耗时: ${duration}秒)"
}
# 导入函数
optimized_import() {
set -o pipefail
local db=$1
local start_time=$(date +%s)
# 验证迁移ID一致性
if ! validate_migration_id "$db"; then
log "error" "迁移ID验证失败: $db"
echo "$db" >> "${MAIN_DIR}/failed_imports.txt"
return 1
fi
log "info" "开始导入: $db (来源: $(get_file_path "$db" "sql.gz"))"
export MYSQL_PWD="$DEST_PASS"
# 创建目标数据库
mysql -h"$DEST_HOST" -P"$DEST_PORT" -u"$DEST_USER"
-e "CREATE DATABASE IF NOT EXISTS `$db`" 2>> "$(get_file_path "$db" "import.err")"
if [[ $? -ne 0 ]]; then
log "error" "数据库创建失败: $db"
return 1
fi
# 执行导入
pigz -dc "$(get_file_path "$db" "sql.gz")" |
mysql -h"$DEST_HOST" -P"$DEST_PORT" -u"$DEST_USER"
--init-command="SET SESSION foreign_key_checks=0; SET SESSION unique_checks=0;"
--max-allowed-packet=1G --net-buffer-length=16384
"$db" 2>> "$(get_file_path "$db" "import.err")"
local status=$?
local end_time=$(date +%s)
local duration=$((end_time - start_time))
if [[ $status -ne 0 ]]; then
log "error" "导入失败: $db (详见 $(get_file_path "$db" "import.err"))"
echo "$db" >> "${MAIN_DIR}/failed_imports.txt"
return 1
fi
log "info" "导入成功: $db (耗时: ${duration}秒)"
}
# 多线程处理器
parallel_process() {
local mode=$1
local dbs=("${!2}")
local func=$3
log "info" "启动多线程$mode (线程数: $THREADS, 数据库数: ${#dbs[@]})"
printf "%s
" "${dbs[@]}" | xargs -P $THREADS -I {} bash -c "
source_db='{}'
$func "$source_db"
" 2>> "${MAIN_DIR}/parallel_${mode}.err"
}
# ===== 主流程 =====
# 初始化检查
check_commands || exit 1
[[ $MODE != "import" ]] && { test_db_connection "源" "$SRC_HOST" "$SRC_PORT" "$SRC_USER" "$SRC_PASS" || exit 1 ; }
[[ $MODE != "export" ]] && { test_db_connection "目标" "$DEST_HOST" "$DEST_PORT" "$DEST_USER" "$DEST_PASS" || exit 1 ; }
log "info" "===== 开始迁移任务 (ID: ${MIGRATION_ID}) ====="
log "info" "线程数: $THREADS | 数据库: ${DB_ARRAY[*]} | 模式: $MODE"
[[ $MODE == "import" ]] && log "info" "导入目录: $MAIN_DIR"
[[ $MODE == "export" || $MODE == "both" ]] && log "info" "导出目录: $MAIN_DIR"
# 导出函数暴露
export -f log optimized_dump optimized_import validate_migration_id get_file_path
export SRC_HOST SRC_PORT SRC_USER SRC_PASS
export DEST_HOST DEST_PORT DEST_USER DEST_PASS
export MAIN_DIR MIGRATION_ID
# 根据模式执行操作
if [[ $MODE == "export" || $MODE == "both" ]]; then
# 并行导出
parallel_process "导出" DB_ARRAY[@] optimized_dump
fi
if [[ $MODE == "import" || $MODE == "both" ]]; then
# 导入前检查迁移文件
missing_dbs=()
for db in "${DB_ARRAY[@]}"; do
if ! validate_migration_id "$db"; then
missing_dbs+=("$db")
fi
done
if [[ ${#missing_dbs[@]} -gt 0 ]]; then
log "error" "缺失迁移文件: ${missing_dbs[*]}"
exit 1
fi
# 并行导入
parallel_process "导入" DB_ARRAY[@] optimized_import
fi
# 结果报告
error_files=("${MAIN_DIR}/failed_exports.txt" "${MAIN_DIR}/failed_imports.txt")
has_errors=0
for ef in "${error_files[@]}"; do
if [[ -f "$ef" ]]; then
log "warning" "失败记录: $(tr '
' ' ' < "$ef")"
has_errors=1
fi
done
# 清理策略
if [[ $MODE == "both" ]] && [[ -z "$IMPORT_DIR" ]]; then
find "$MAIN_DIR" -name "*.sql.gz" -delete
fi
if [[ $has_errors -eq 0 ]]; then
log "info" "===== 迁移成功完成 ====="
log "info" "数据目录: $MAIN_DIR"
else
log "error" "===== 迁移完成但有错误 ====="
exit 1
fi


