
IT Systems Operations Engineer Interview Guide: Disaster Recovery Management
2024/2/29 · About 18 minutes
IT Systems Operations Engineer Interview Guide 07 - Disaster Recovery Management
Job Responsibilities
Develop and implement the enterprise disaster recovery (DR) and business continuity plans, ensuring that under any disaster scenario the critical business systems can be restored to normal operation quickly and that losses from business interruption are kept to a minimum.
Core Skill Requirements
1. DR Strategy Planning
- Business impact analysis (BIA)
- Risk assessment and analysis
- RTO/RPO target setting
- DR tier classification
2. Backup Implementation
- Backup strategy design
- Backup system deployment and management
- Backup data verification
- Off-site backup implementation
3. Disaster Recovery Techniques
- High-availability architecture design
- Active-standby (hot standby) clustering
- Data synchronization
- Failover mechanisms
4. Business Continuity Management
- Contingency plan development
- Drill/exercise planning
- Staff training programs
- Standardized recovery procedures
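To make "RTO/RPO target setting" concrete, here is a minimal sketch of a criticality-tier lookup; the tier names and threshold values are illustrative assumptions, not values mandated by any standard:

```python
# Illustrative tier table: the names and thresholds are assumptions,
# not industry-mandated values.
DR_TIERS = {
    "critical":  {"rto_hours": 1,  "rpo_hours": 0.25},
    "important": {"rto_hours": 4,  "rpo_hours": 1.0},
    "normal":    {"rto_hours": 24, "rpo_hours": 4.0},
}

def backup_interval_minutes(tier: str) -> int:
    """Backing up at least once per RPO window keeps worst-case
    data loss within the RPO target."""
    return int(DR_TIERS[tier]["rpo_hours"] * 60)
```

A "critical" tier with a 0.25-hour RPO therefore implies a backup (or log ship) at least every 15 minutes.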
Hands-On Skills in Detail
I. Business Impact Analysis and Risk Assessment
1.1 A Business Impact Analysis Tool
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Business impact analysis (BIA) tool
"""
from datetime import datetime
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class BusinessProcess:
    """A business process and its continuity requirements"""
    process_id: str
    process_name: str
    department: str
    criticality: str  # "critical", "important", or "normal"
    max_downtime_hours: int  # maximum tolerable downtime
    recovery_priority: int  # recovery priority, 1-5 (1 = restore first)
    dependencies: List[str]  # systems/services this process depends on
    financial_impact_per_hour: float  # financial loss per hour of downtime


class BIAAnalyzer:
    """Business impact analyzer"""

    def __init__(self):
        self.processes = []
        self.systems = []

    def add_business_process(self, process: BusinessProcess):
        """Register a business process"""
        self.processes.append(process)

    def calculate_rto_rpo(self, process_id: str) -> Dict:
        """Derive RTO/RPO targets from criticality and tolerable downtime"""
        process = next((p for p in self.processes if p.process_id == process_id), None)
        if not process:
            return {"error": "business process not found"}
        # Map business impact to RTO/RPO targets
        if process.criticality == "critical":
            rto_hours = min(1, process.max_downtime_hours)
            rpo_hours = 0.25  # 15 minutes
        elif process.criticality == "important":
            rto_hours = min(4, process.max_downtime_hours)
            rpo_hours = 1
        else:
            rto_hours = min(24, process.max_downtime_hours)
            rpo_hours = 4
        return {
            "process_name": process.process_name,
            "rto_hours": rto_hours,
            "rpo_hours": rpo_hours,
            "max_financial_loss": process.financial_impact_per_hour * process.max_downtime_hours,
            "recommended_backup_frequency": f"every {rpo_hours} h" if rpo_hours >= 1 else "every 15 min"
        }

    def generate_bia_report(self) -> str:
        """Generate an HTML BIA report and return its filename"""
        html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>Business Impact Analysis Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
table {{ width: 100%; border-collapse: collapse; margin: 20px 0; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
.critical {{ background-color: #ffebee; }}
.important {{ background-color: #fff3e0; }}
.normal {{ background-color: #e8f5e8; }}
</style>
</head>
<body>
<h1>Business Impact Analysis Report</h1>
<p>Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
<h2>Business Process Risk Assessment</h2>
<table>
<tr>
<th>Process ID</th>
<th>Process Name</th>
<th>Department</th>
<th>Criticality</th>
<th>RTO (hours)</th>
<th>RPO (hours)</th>
<th>Loss per Hour (10k CNY)</th>
<th>Maximum Loss (10k CNY)</th>
</tr>
"""
        total_risk = 0
        for process in self.processes:
            rto_rpo = self.calculate_rto_rpo(process.process_id)
            max_loss = process.financial_impact_per_hour * process.max_downtime_hours
            total_risk += max_loss
            criticality_class = {
                "critical": "critical",
                "important": "important",
                "normal": "normal",
            }.get(process.criticality, "normal")
            html_content += f"""
<tr class="{criticality_class}">
<td>{process.process_id}</td>
<td>{process.process_name}</td>
<td>{process.department}</td>
<td>{process.criticality}</td>
<td>{rto_rpo['rto_hours']}</td>
<td>{rto_rpo['rpo_hours']}</td>
<td>{process.financial_impact_per_hour}</td>
<td>{max_loss}</td>
</tr>
"""
        html_content += f"""
</table>
<h2>Summary</h2>
<p><strong>Total risk exposure:</strong> {total_risk} (10k CNY)</p>
<p><strong>Critical business processes:</strong> {len([p for p in self.processes if p.criticality == 'critical'])}</p>
<p><strong>Important business processes:</strong> {len([p for p in self.processes if p.criticality == 'important'])}</p>
<h2>Recommended Measures</h2>
<ul>
<li>Deploy hot-standby pairs for critical business processes</li>
<li>Build an off-site disaster recovery center</li>
<li>Develop detailed contingency plans</li>
<li>Run DR drills on a regular schedule</li>
</ul>
</body>
</html>
"""
        filename = f"bia_report_{datetime.now().strftime('%Y%m%d')}.html"
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(html_content)
        return filename


# Sample data
def create_sample_bia():
    """Build a BIAAnalyzer populated with sample processes"""
    analyzer = BIAAnalyzer()
    processes = [
        BusinessProcess("BP001", "Online trading system", "Business", "critical", 1, 1, ["database", "payment gateway"], 50.0),
        BusinessProcess("BP002", "Customer service system", "Customer Service", "important", 4, 2, ["CRM database"], 20.0),
        BusinessProcess("BP003", "Financial management system", "Finance", "important", 8, 3, ["ERP database"], 30.0),
        BusinessProcess("BP004", "HR management system", "Human Resources", "normal", 24, 4, ["HR database"], 5.0),
        BusinessProcess("BP005", "Email system", "IT", "normal", 12, 5, ["mail server"], 2.0),
    ]
    for process in processes:
        analyzer.add_business_process(process)
    return analyzer


if __name__ == "__main__":
    bia = create_sample_bia()
    report_file = bia.generate_bia_report()
    print(f"BIA report generated: {report_file}")
II. Data Backup System Implementation
2.1 Backup Strategy Configuration Script
#!/bin/bash
# Enterprise backup strategy implementation script

BACKUP_ROOT="/backup"
LOG_DIR="/var/log/backup"
CONFIG_FILE="/etc/backup/backup.conf"

# Create the backup directory layout
mkdir -p $BACKUP_ROOT/{daily,weekly,monthly,yearly}
mkdir -p $LOG_DIR
mkdir -p /etc/backup

# Write the backup configuration file (only if it does not exist yet,
# so local edits survive reruns)
[ -f $CONFIG_FILE ] || cat > $CONFIG_FILE << 'EOF'
# Backup configuration
MYSQL_USER="backup_user"
MYSQL_PASSWORD="backup_pass"
MYSQL_HOST="localhost"
# Retention policy
DAILY_RETENTION=7      # keep 7 days of daily backups
WEEKLY_RETENTION=4     # keep 4 weeks of weekly backups
MONTHLY_RETENTION=12   # keep 12 months of monthly backups
YEARLY_RETENTION=3     # keep 3 years of yearly backups
# Compression / encryption
COMPRESSION_LEVEL=6
ENCRYPTION_KEY="your_encryption_key_here"
# Remote (off-site) backup target
REMOTE_HOST="backup.company.com"
REMOTE_USER="backup"
REMOTE_PATH="/remote_backup"
EOF

# Default exclude list for filesystem backups (skip pseudo-filesystems
# and the backup tree itself; tar fails if this file is missing)
[ -f /etc/backup/exclude.list ] || cat > /etc/backup/exclude.list << 'EOF'
/proc
/sys
/dev
/run
/tmp
/backup
EOF

# Back up a MySQL database
backup_mysql() {
    local db_name=$1
    local backup_type=$2
    local timestamp=$(date +"%Y%m%d_%H%M%S")
    local backup_dir="$BACKUP_ROOT/$backup_type"
    local backup_file="$backup_dir/${db_name}_${timestamp}.sql"

    echo "$(date): starting backup of database $db_name ($backup_type)" >> $LOG_DIR/backup.log
    # Dump the database
    mysqldump --single-transaction --routines --triggers \
        --user=$MYSQL_USER --password=$MYSQL_PASSWORD \
        --host=$MYSQL_HOST $db_name > $backup_file
    if [ $? -eq 0 ]; then
        # Compress the dump
        gzip -$COMPRESSION_LEVEL $backup_file
        # Record a checksum for later verification
        md5sum $backup_file.gz > $backup_file.gz.md5
        echo "$(date): database $db_name backed up: $backup_file.gz" >> $LOG_DIR/backup.log
        # Replicate to the remote site
        rsync -avz $backup_file.gz $backup_file.gz.md5 \
            $REMOTE_USER@$REMOTE_HOST:$REMOTE_PATH/$backup_type/
        return 0
    else
        echo "$(date): backup of database $db_name FAILED" >> $LOG_DIR/backup.log
        return 1
    fi
}

# Back up a filesystem tree
backup_filesystem() {
    local source_dir=$1
    local backup_type=$2
    local timestamp=$(date +"%Y%m%d_%H%M%S")
    local backup_dir="$BACKUP_ROOT/$backup_type"
    local backup_name=$(basename $source_dir)
    local backup_file="$backup_dir/${backup_name}_${timestamp}.tar.gz"

    echo "$(date): starting backup of filesystem $source_dir ($backup_type)" >> $LOG_DIR/backup.log
    # Snapshot file that makes the tar archive incremental
    local snapshot_file="$backup_dir/snapshot_${backup_name}.snar"
    # Create the (incremental) archive
    tar --create --gzip --file=$backup_file \
        --listed-incremental=$snapshot_file \
        --exclude-from=/etc/backup/exclude.list \
        $source_dir
    if [ $? -eq 0 ]; then
        # Record a checksum
        md5sum $backup_file > $backup_file.md5
        echo "$(date): filesystem $source_dir backed up: $backup_file" >> $LOG_DIR/backup.log
        # Replicate to the remote site
        rsync -avz $backup_file $backup_file.md5 \
            $REMOTE_USER@$REMOTE_HOST:$REMOTE_PATH/$backup_type/
        return 0
    else
        echo "$(date): backup of filesystem $source_dir FAILED" >> $LOG_DIR/backup.log
        return 1
    fi
}

# Delete expired backups
cleanup_backups() {
    local backup_type=$1
    local retention_days=$2
    local backup_dir="$BACKUP_ROOT/$backup_type"

    echo "$(date): cleaning $backup_type backups, keeping $retention_days days" >> $LOG_DIR/backup.log
    find $backup_dir -name "*.gz" -mtime +$retention_days -delete
    find $backup_dir -name "*.md5" -mtime +$retention_days -delete
    echo "$(date): $backup_type cleanup finished" >> $LOG_DIR/backup.log
}

# Verify a backup file against its stored checksum
verify_backup() {
    local backup_file=$1
    local md5_file=$2

    if [ -f "$backup_file" ] && [ -f "$md5_file" ]; then
        local calculated_md5=$(md5sum $backup_file | awk '{print $1}')
        local stored_md5=$(awk '{print $1}' $md5_file)
        if [ "$calculated_md5" = "$stored_md5" ]; then
            echo "$(date): checksum of $backup_file verified" >> $LOG_DIR/backup.log
            return 0
        else
            echo "$(date): checksum of $backup_file MISMATCH" >> $LOG_DIR/backup.log
            return 1
        fi
    else
        echo "$(date): backup file or checksum file missing" >> $LOG_DIR/backup.log
        return 1
    fi
}

# Daily backup job
daily_backup() {
    echo "$(date): daily backup started" >> $LOG_DIR/backup.log
    # Critical databases
    backup_mysql "production_db" "daily"
    backup_mysql "user_db" "daily"
    # Critical directories
    backup_filesystem "/var/www/html" "daily"
    backup_filesystem "/etc" "daily"
    # Expire old backups
    cleanup_backups "daily" $DAILY_RETENTION
    echo "$(date): daily backup finished" >> $LOG_DIR/backup.log
}

# Weekly backup job
weekly_backup() {
    echo "$(date): weekly backup started" >> $LOG_DIR/backup.log
    # Full system backup (pseudo-filesystems and /backup are excluded
    # via exclude.list)
    backup_filesystem "/" "weekly"
    # Expire old backups
    cleanup_backups "weekly" $((WEEKLY_RETENTION * 7))
    echo "$(date): weekly backup finished" >> $LOG_DIR/backup.log
}

# Backup status report
generate_backup_report() {
    local report_file="$LOG_DIR/backup_report_$(date +%Y%m%d).html"

    cat > $report_file << EOF
<!DOCTYPE html>
<html>
<head>
<title>Backup Status Report</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
table { width: 100%; border-collapse: collapse; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
.success { color: green; }
.error { color: red; }
</style>
</head>
<body>
<h1>Backup Status Report</h1>
<p>Report time: $(date)</p>
<h2>Backup File Statistics</h2>
<table>
<tr><th>Backup Type</th><th>File Count</th><th>Total Size</th><th>Latest Backup</th></tr>
EOF
    for backup_type in daily weekly monthly yearly; do
        if [ -d "$BACKUP_ROOT/$backup_type" ]; then
            local file_count=$(find $BACKUP_ROOT/$backup_type -name "*.gz" | wc -l)
            local total_size=$(du -sh $BACKUP_ROOT/$backup_type | awk '{print $1}')
            local latest_backup=$(find $BACKUP_ROOT/$backup_type -name "*.gz" -printf '%T@ %p\n' | sort -n | tail -1 | cut -d' ' -f2-)
            local latest_time=""
            if [ -n "$latest_backup" ]; then
                latest_time=$(stat -c %y "$latest_backup" | cut -d. -f1)
            fi
            cat >> $report_file << EOF
<tr>
<td>$backup_type</td>
<td>$file_count</td>
<td>$total_size</td>
<td>$latest_time</td>
</tr>
EOF
        fi
    done
    cat >> $report_file << EOF
</table>
<h2>Recent Backup Log</h2>
<pre>
$(tail -50 $LOG_DIR/backup.log)
</pre>
</body>
</html>
EOF
    echo "Backup report generated: $report_file"
}

# Entry point
main() {
    case "$1" in
        "daily")
            daily_backup
            ;;
        "weekly")
            weekly_backup
            ;;
        "verify")
            if [ -n "$2" ] && [ -n "$3" ]; then
                verify_backup "$2" "$3"
            else
                echo "Usage: $0 verify <backup_file> <md5_file>"
            fi
            ;;
        "report")
            generate_backup_report
            ;;
        "cleanup")
            for backup_type in daily weekly monthly yearly; do
                case $backup_type in
                    "daily") cleanup_backups $backup_type $DAILY_RETENTION ;;
                    "weekly") cleanup_backups $backup_type $((WEEKLY_RETENTION * 7)) ;;
                    "monthly") cleanup_backups $backup_type $((MONTHLY_RETENTION * 30)) ;;
                    "yearly") cleanup_backups $backup_type $((YEARLY_RETENTION * 365)) ;;
                esac
            done
            ;;
        *)
            echo "Usage: $0 {daily|weekly|verify|report|cleanup}"
            echo "  daily   - run the daily backup"
            echo "  weekly  - run the weekly backup"
            echo "  verify  - verify a backup file"
            echo "  report  - generate the backup report"
            echo "  cleanup - delete expired backups"
            exit 1
            ;;
    esac
}

# Load configuration
source $CONFIG_FILE
# Run
main "$@"
2.2 Database Backup and Restore Scripts
-- MySQL backup and restore management scripts

-- 1. Create the backup user
CREATE USER 'backup_user'@'localhost' IDENTIFIED BY 'strong_backup_password';
GRANT SELECT, LOCK TABLES, SHOW VIEW, EVENT, TRIGGER, RELOAD ON *.* TO 'backup_user'@'localhost';
FLUSH PRIVILEGES;

-- 2. Stored procedure that prepares a database backup
-- (MySQL stored procedures do not support DEFAULT parameter values and
--  cannot run shell commands, so this procedure logs the attempt and
--  returns the mysqldump command for an external scheduler to execute.)
DELIMITER //
CREATE PROCEDURE sp_backup_database(
    IN db_name VARCHAR(64),
    IN backup_path VARCHAR(255)
)
BEGIN
    DECLARE backup_command VARCHAR(1000);
    DECLARE backup_file VARCHAR(255);
    -- Build the backup file name
    SET backup_file = CONCAT(backup_path, '/', db_name, '_', DATE_FORMAT(NOW(), '%Y%m%d_%H%i%s'), '.sql');
    -- Build the backup command
    SET backup_command = CONCAT(
        'mysqldump --single-transaction --routines --triggers --events ',
        '--user=backup_user --password=strong_backup_password ',
        '--databases ', db_name, ' > ', backup_file
    );
    -- Record the backup start
    INSERT INTO backup_log (database_name, backup_file, start_time, status)
    VALUES (db_name, backup_file, NOW(), 'STARTED');
    -- Return the command to be executed externally
    SELECT backup_command AS command_to_execute;
END //
DELIMITER ;

-- 3. Backup log table
CREATE TABLE IF NOT EXISTS backup_log (
    id INT AUTO_INCREMENT PRIMARY KEY,
    database_name VARCHAR(64) NOT NULL,
    backup_file VARCHAR(255) NOT NULL,
    start_time DATETIME NOT NULL,
    end_time DATETIME NULL,
    status ENUM('STARTED', 'COMPLETED', 'FAILED') NOT NULL,
    file_size BIGINT NULL,
    checksum VARCHAR(32) NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 4. Query today's backup status
SELECT
    database_name,
    backup_file,
    start_time,
    end_time,
    status,
    ROUND(file_size/1024/1024, 2) AS size_mb,
    TIMESTAMPDIFF(MINUTE, start_time, COALESCE(end_time, NOW())) AS duration_minutes
FROM backup_log
WHERE DATE(start_time) = CURDATE()
ORDER BY start_time DESC;

-- 5. Stored procedure that prepares a database restore
DELIMITER //
CREATE PROCEDURE sp_restore_database(
    IN backup_file VARCHAR(255),
    IN target_db VARCHAR(64)
)
BEGIN
    DECLARE restore_command VARCHAR(1000);
    -- Build the restore command
    SET restore_command = CONCAT(
        'mysql --user=root --password=root_password ',
        target_db, ' < ', backup_file
    );
    -- Record the restore start
    INSERT INTO restore_log (backup_file, target_database, start_time, status)
    VALUES (backup_file, target_db, NOW(), 'STARTED');
    -- Return the command to be executed externally
    SELECT restore_command AS command_to_execute;
END //
DELIMITER ;

-- 6. Restore log table
CREATE TABLE IF NOT EXISTS restore_log (
    id INT AUTO_INCREMENT PRIMARY KEY,
    backup_file VARCHAR(255) NOT NULL,
    target_database VARCHAR(64) NOT NULL,
    start_time DATETIME NOT NULL,
    end_time DATETIME NULL,
    status ENUM('STARTED', 'COMPLETED', 'FAILED') NOT NULL,
    error_message TEXT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 7. Scheduled daily backup event
DELIMITER //
CREATE EVENT ev_daily_backup
ON SCHEDULE EVERY 1 DAY STARTS '2024-01-01 02:00:00'
DO
BEGIN
    -- Back up the main databases
    CALL sp_backup_database('production_db', '/backup/daily');
    CALL sp_backup_database('user_db', '/backup/daily');
    -- Purge backup log entries older than 7 days
    DELETE FROM backup_log WHERE start_time < DATE_SUB(NOW(), INTERVAL 7 DAY);
END //
DELIMITER ;

-- 8. Enable the event scheduler
SET GLOBAL event_scheduler = ON;

-- 9. Database integrity check
-- (The local variable is prefixed v_ so it does not shadow the
--  TABLE_NAME column inside the cursor query.)
DELIMITER //
CREATE PROCEDURE sp_check_database_integrity(IN db_name VARCHAR(64))
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE v_table_name VARCHAR(64);
    DECLARE table_cursor CURSOR FOR
        SELECT TABLE_NAME FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = db_name AND TABLE_TYPE = 'BASE TABLE';
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
    -- Temporary results table
    DROP TEMPORARY TABLE IF EXISTS temp_check_results;
    CREATE TEMPORARY TABLE temp_check_results (
        table_name VARCHAR(64),
        check_result VARCHAR(20)
    );
    OPEN table_cursor;
    read_loop: LOOP
        FETCH table_cursor INTO v_table_name;
        IF done THEN
            LEAVE read_loop;
        END IF;
        -- Simplified check; a real CHECK TABLE has to run outside the procedure
        INSERT INTO temp_check_results VALUES (v_table_name, 'OK');
    END LOOP;
    CLOSE table_cursor;
    -- Return the results
    SELECT * FROM temp_check_results;
END //
DELIMITER ;

-- 10. Backup performance monitoring view
CREATE VIEW v_backup_performance AS
SELECT
    DATE(start_time) AS backup_date,
    COUNT(*) AS total_backups,
    COUNT(CASE WHEN status = 'COMPLETED' THEN 1 END) AS successful_backups,
    COUNT(CASE WHEN status = 'FAILED' THEN 1 END) AS failed_backups,
    AVG(TIMESTAMPDIFF(MINUTE, start_time, end_time)) AS avg_duration_minutes,
    SUM(file_size)/1024/1024/1024 AS total_size_gb
FROM backup_log
WHERE end_time IS NOT NULL
GROUP BY DATE(start_time)
ORDER BY backup_date DESC;
III. High-Availability Architecture
3.1 MySQL Master-Slave Replication Configuration
#!/bin/bash
# Automated MySQL master-slave replication setup

# Parameters
MASTER_HOST="192.168.1.10"
SLAVE_HOST="192.168.1.11"
REPL_USER="repl_user"
REPL_PASS="repl_password"
MYSQL_ROOT_PASS="root_password"

# Configure the master server
configure_master() {
    echo "Configuring MySQL master..."
    # Back up the original configuration
    cp /etc/mysql/mysql.conf.d/mysqld.cnf /etc/mysql/mysql.conf.d/mysqld.cnf.bak
    # Append master settings
    cat >> /etc/mysql/mysql.conf.d/mysqld.cnf << EOF
# Replication settings
server-id = 1
log-bin = mysql-bin
log-bin-index = mysql-bin.index
binlog-format = ROW
sync_binlog = 1
innodb_flush_log_at_trx_commit = 1
# Binary log housekeeping
expire_logs_days = 7
max_binlog_size = 100M
binlog_cache_size = 4M
max_binlog_cache_size = 512M
# Semi-synchronous replication
plugin-load = "rpl_semi_sync_master=semisync_master.so"
rpl_semi_sync_master_enabled = 1
rpl_semi_sync_master_timeout = 1000
EOF
    # Restart MySQL
    systemctl restart mysql
    # Create the replication user
    mysql -uroot -p$MYSQL_ROOT_PASS << EOF
CREATE USER '$REPL_USER'@'%' IDENTIFIED BY '$REPL_PASS';
GRANT REPLICATION SLAVE ON *.* TO '$REPL_USER'@'%';
FLUSH PRIVILEGES;
SHOW MASTER STATUS;
EOF
    echo "Master configured"
}

# Configure the slave server
configure_slave() {
    echo "Configuring MySQL slave..."
    # Back up the original configuration
    cp /etc/mysql/mysql.conf.d/mysqld.cnf /etc/mysql/mysql.conf.d/mysqld.cnf.bak
    # Append slave settings
    cat >> /etc/mysql/mysql.conf.d/mysqld.cnf << EOF
# Replication settings
server-id = 2
relay-log = relay-log
relay-log-index = relay-log.index
read_only = 1
super_read_only = 1
# Semi-synchronous replication
plugin-load = "rpl_semi_sync_slave=semisync_slave.so"
rpl_semi_sync_slave_enabled = 1
# Replication filters (optional)
# replicate-do-db = production_db
# replicate-ignore-db = test
EOF
    # Restart MySQL
    systemctl restart mysql
    # Read the master's current binlog position
    # (On a busy master, lock writes first or use GTID-based replication,
    #  otherwise the position can move before START SLAVE runs.)
    MASTER_STATUS=$(mysql -h$MASTER_HOST -uroot -p$MYSQL_ROOT_PASS -e "SHOW MASTER STATUS\G")
    MASTER_FILE=$(echo "$MASTER_STATUS" | grep "File:" | awk '{print $2}')
    MASTER_POS=$(echo "$MASTER_STATUS" | grep "Position:" | awk '{print $2}')
    # Point the slave at the master and start replication
    mysql -uroot -p$MYSQL_ROOT_PASS << EOF
CHANGE MASTER TO
MASTER_HOST='$MASTER_HOST',
MASTER_USER='$REPL_USER',
MASTER_PASSWORD='$REPL_PASS',
MASTER_LOG_FILE='$MASTER_FILE',
MASTER_LOG_POS=$MASTER_POS;
START SLAVE;
SHOW SLAVE STATUS\G
EOF
    echo "Slave configured"
}

# Check replication health
check_replication() {
    echo "Checking MySQL replication status..."
    echo "=== Master status ==="
    mysql -h$MASTER_HOST -uroot -p$MYSQL_ROOT_PASS -e "SHOW MASTER STATUS;"
    mysql -h$MASTER_HOST -uroot -p$MYSQL_ROOT_PASS -e "SHOW SLAVE HOSTS;"
    echo "=== Slave status ==="
    mysql -h$SLAVE_HOST -uroot -p$MYSQL_ROOT_PASS -e "SHOW SLAVE STATUS\G" | grep -E "(Slave_IO_State|Slave_IO_Running|Slave_SQL_Running|Seconds_Behind_Master|Last_Error)"
}

# Promote the slave after a master failure
# (Run this on the slave host: the mysql calls target $SLAVE_HOST,
#  but the sed edits below modify the local configuration file.)
failover_to_slave() {
    echo "Failing over to the slave..."
    # Stop replication on the slave
    mysql -h$SLAVE_HOST -uroot -p$MYSQL_ROOT_PASS << EOF
STOP SLAVE;
RESET SLAVE ALL;
EOF
    # Make the slave writable
    mysql -h$SLAVE_HOST -uroot -p$MYSQL_ROOT_PASS << EOF
SET GLOBAL read_only = 0;
SET GLOBAL super_read_only = 0;
EOF
    # Persist the change in the configuration file
    sed -i 's/read_only = 1/read_only = 0/' /etc/mysql/mysql.conf.d/mysqld.cnf
    sed -i 's/super_read_only = 1/super_read_only = 0/' /etc/mysql/mysql.conf.d/mysqld.cnf
    echo "Failover complete; the former slave is now serving as master"
}

# Compare data between master and slave
check_data_consistency() {
    echo "Checking master/slave data consistency..."
    # Prefer pt-table-checksum if it is installed
    if command -v pt-table-checksum &> /dev/null; then
        pt-table-checksum --host=$MASTER_HOST --user=root --password=$MYSQL_ROOT_PASS \
            --databases=production_db \
            --replicate=percona.checksums
    else
        echo "pt-table-checksum not installed; falling back to simple row counts"
        # List user databases
        DBS=$(mysql -h$MASTER_HOST -uroot -p$MYSQL_ROOT_PASS -e "SHOW DATABASES;" | grep -v -E "Database|information_schema|performance_schema|mysql|sys")
        for db in $DBS; do
            echo "Checking database: $db"
            # List tables
            TABLES=$(mysql -h$MASTER_HOST -uroot -p$MYSQL_ROOT_PASS -e "USE $db; SHOW TABLES;" | grep -v Tables_in_)
            for table in $TABLES; do
                MASTER_COUNT=$(mysql -h$MASTER_HOST -uroot -p$MYSQL_ROOT_PASS -e "SELECT COUNT(*) FROM $db.$table;" | tail -1)
                SLAVE_COUNT=$(mysql -h$SLAVE_HOST -uroot -p$MYSQL_ROOT_PASS -e "SELECT COUNT(*) FROM $db.$table;" | tail -1)
                if [ "$MASTER_COUNT" != "$SLAVE_COUNT" ]; then
                    echo "MISMATCH: $db.$table - master:$MASTER_COUNT, slave:$SLAVE_COUNT"
                else
                    echo "OK: $db.$table - $MASTER_COUNT rows"
                fi
            done
        done
    fi
}

# Entry point
main() {
    case "$1" in
        "master")
            configure_master
            ;;
        "slave")
            configure_slave
            ;;
        "check")
            check_replication
            ;;
        "failover")
            failover_to_slave
            ;;
        "consistency")
            check_data_consistency
            ;;
        *)
            echo "Usage: $0 {master|slave|check|failover|consistency}"
            echo "  master      - configure the master server"
            echo "  slave       - configure the slave server"
            echo "  check       - check replication status"
            echo "  failover    - fail over to the slave"
            echo "  consistency - check data consistency"
            exit 1
            ;;
    esac
}

main "$@"
Common Interview Questions and Answers
1. DR Fundamentals
Q: Explain the difference between RTO and RPO, and how these two targets are determined.
A: RTO and RPO are the core metrics of disaster recovery management.
RTO (Recovery Time Objective):
- Definition: the maximum acceptable time from the onset of a failure until the system is fully back in normal operation
- How it is derived: business analysis plus technical assessment
- Influencing factors: business criticality, technical architecture, staff response speed
- Example: core trading system RTO ≤ 1 hour; general management systems RTO ≤ 24 hours
RPO (Recovery Point Objective):
- Definition: the maximum acceptable window of data loss when a failure occurs
- How it is derived: data-value analysis plus backup frequency design
- Influencing factors: data value, backup technology, synchronization mechanisms
- Example: financial transaction data RPO ≤ 15 minutes; log data RPO ≤ 4 hours
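The cost side of this trade-off reduces to a one-line calculation; a minimal sketch, with hypothetical figures in the comment:

```python
def downtime_exposure(hourly_loss: float, downtime_hours: float) -> float:
    """Worst-case financial exposure for an outage of the given length."""
    return hourly_loss * downtime_hours

# Hypothetical example: a system losing 50 (10k CNY) per hour that may be
# down for at most 1 hour has a worst-case exposure of 50 (10k CNY).
```

Comparing this exposure against the cost of tightening RTO/RPO is the essence of the cost-benefit step.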
How to determine them:
1. Business impact analysis (BIA) → establish business priorities
2. Financial loss assessment → quantify the cost of downtime
3. Technical feasibility analysis → assess how hard each target is to meet
4. Cost-benefit analysis → balance investment against return
Q: Walk through a complete DR drill process.
A: A standard DR drill runs in three phases.
Preparation:
- Draft the drill plan: objectives, scope, and schedule
- Assemble the drill team: assign roles and responsibilities
- Prepare the drill environment: use a test environment so production is unaffected
- Write the drill script: detailed steps and checkpoints
Execution:
# 1. Simulate the failure scenario
# e.g. simulate a primary database outage
systemctl stop mysql
# 2. Trigger the emergency response
# monitoring should raise an alert
# notify the responders
# 3. Perform the switchover
# promote the standby database
mysql -e "STOP SLAVE; RESET SLAVE ALL; SET GLOBAL read_only=0;"
# 4. Verify application health
curl -I http://app.company.com/health
# 5. Verify data consistency
# compare key data sets against the primary
Post-drill review:
- Record the results: which steps succeeded and which failed
- Analyze root causes: identify weaknesses in the process
- Define improvements: refine the drill script and procedures
- Update the contingency plan based on the findings
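The health-verification step of the drill is usually automated as a polling loop rather than a single curl. A minimal sketch, with the probe injected as a callable so it can wrap any check (an HTTP GET against /health, a mysql ping, etc.):

```python
import time
from typing import Callable

def wait_healthy(probe: Callable[[], bool],
                 timeout_s: float = 60.0,
                 interval_s: float = 5.0) -> bool:
    """Poll `probe` until it returns True or the deadline passes."""
    deadline = time.monotonic() + timeout_s
    while True:
        if probe():
            return True
        # Stop if the next poll would land past the deadline
        if time.monotonic() + interval_s > deadline:
            return False
        time.sleep(interval_s)
```

In a drill script you would call it with, say, `wait_healthy(lambda: requests.get(url).ok, timeout_s=300)` and treat a False return as a failed checkpoint.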
2. Implementation Questions
Q: How do you design an enterprise-grade backup strategy?
A: Key points of enterprise backup strategy design.
The 3-2-1 backup rule:
- 3 copies of the data (1 production + 2 backups)
- 2 different storage media (e.g. local disk + cloud storage)
- 1 off-site copy (to survive a regional disaster)
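The rule above can be checked mechanically against an inventory of copies; a small sketch (the `media` and `offsite` fields are assumed metadata, not a standard schema):

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Copy:
    media: str     # e.g. "disk", "tape", "s3"
    offsite: bool  # stored at a different site/region?

def satisfies_3_2_1(copies: List[Copy]) -> bool:
    """3 total copies (production included), on at least 2 media types,
    with at least 1 copy off-site."""
    return (len(copies) >= 3
            and len({c.media for c in copies}) >= 2
            and any(c.offsite for c in copies))
```

For example, production on disk plus a local disk backup plus an off-site S3 copy passes; three local disk copies do not.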
Backup tiers:
1. Real-time backup → database transaction logs and critical files replicated continuously
2. Incremental backup → daily backup of changed data
3. Full backup → complete weekly backup
4. Archive backup → monthly/yearly long-term retention
Backup verification:
#!/bin/bash
# Automated backup verification script
BACKUP_FILE="/backup/daily/db_20240815.sql.gz"
TEMP_DB="backup_verify_$(date +%s)"
# 1. Verify file integrity
if md5sum -c $BACKUP_FILE.md5; then
    echo "File integrity check passed"
else
    echo "File is corrupt; a new backup is required"
    exit 1
fi
# 2. Restore test
mysql -e "CREATE DATABASE $TEMP_DB;"
zcat $BACKUP_FILE | mysql $TEMP_DB
# 3. Consistency check
TABLE_COUNT=$(mysql -e "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema='$TEMP_DB';" | tail -1)
if [ $TABLE_COUNT -gt 0 ]; then
    echo "Backup restore verification passed"
else
    echo "Backup restore verification FAILED"
fi
# 4. Clean up the test database
mysql -e "DROP DATABASE $TEMP_DB;"
Q: How do you implement hot standby (active-standby) for a database?
A: A common hot-standby design is MySQL master-slave replication plus Keepalived providing a floating VIP.
# 1. Configure MySQL replication
# master settings
server-id = 1
log-bin = mysql-bin
binlog-format = ROW
# slave settings
server-id = 2
relay-log = relay-log
read_only = 1
# 2. Install and configure Keepalived
yum install -y keepalived
# Keepalived configuration on the master
cat > /etc/keepalived/keepalived.conf << EOF
vrrp_script chk_mysql {
    script "/usr/local/bin/check_mysql.sh"
    interval 2
    weight -2
    fall 3
    rise 2
}
vrrp_instance VI_1 {
    state MASTER
    interface eth0
    virtual_router_id 51
    priority 100
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass mysqlha
    }
    virtual_ipaddress {
        192.168.1.100
    }
    track_script {
        chk_mysql
    }
    notify_master "/usr/local/bin/master.sh"
    notify_backup "/usr/local/bin/backup.sh"
}
EOF
# 3. MySQL health-check script
cat > /usr/local/bin/check_mysql.sh << 'EOF'
#!/bin/bash
mysql -uroot -ppassword -e "SELECT 1;" > /dev/null 2>&1
if [ $? -eq 0 ]; then
    exit 0
else
    exit 1
fi
EOF
# 4. Promotion script, run when this node takes over as master
cat > /usr/local/bin/master.sh << 'EOF'
#!/bin/bash
mysql -uroot -ppassword -e "
STOP SLAVE;
RESET SLAVE ALL;
SET GLOBAL read_only = 0;
"
EOF
# Make the helper scripts executable
chmod +x /usr/local/bin/check_mysql.sh /usr/local/bin/master.sh
3. Emergency Response Questions
Q: The data center loses power. How do you restore business quickly?
A: Power-outage response runs in stages.
Immediate response (0-15 minutes):
- Assess the blast radius: determine which systems are affected
- Activate the contingency plan: notify stakeholders, stand up the response team
- Switch to standby systems: bring up the DR site or cloud standby environment
Short-term recovery (15 minutes - 2 hours):
#!/bin/bash
# 1. Emergency switchover script
echo "Starting emergency switchover..."
# Check that the standby data center is reachable
ping -c 3 backup-dc.company.com
if [ $? -eq 0 ]; then
    echo "Standby data center reachable; switching"
    # Point DNS at the standby systems
    nsupdate << EOF
server dns.company.com
update delete app.company.com A
update add app.company.com 300 A 192.168.2.100
send
EOF
    # Start the standby services
    ssh backup-dc.company.com "systemctl start nginx mysql redis"
    # Verify service health
    curl -f http://192.168.2.100/health
    echo "Switchover complete"
else
    echo "Standby data center unreachable; contact the cloud provider"
fi
Medium-term recovery (2-24 hours):
- Assess data integrity: check the last good backup/recovery point
- Restore critical business first: core systems take priority
- Synchronize data: replay changes from the standby systems back to the primary
Long-term recovery (1-7 days):
- Repair or replace hardware: bring the primary data center back online
- Fully resynchronize data: ensure consistency between sites
- Switch back: move the workload back to the primary data center
- Post-incident review: analyze the root cause and improve the contingency plan
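"Restore critical business first" implies an explicit ordering. Given the recovery priorities already assigned in the BIA (1 = restore first), the restore sequence is just a sort; a minimal sketch with made-up system names:

```python
from typing import List, Tuple

def recovery_order(systems: List[Tuple[int, str]]) -> List[str]:
    """Return system names ordered by recovery priority (1 = first)."""
    return [name for _, name in sorted(systems)]
```

For example, `recovery_order([(3, "ERP"), (1, "trading"), (2, "CRM")])` brings the trading system back before CRM and ERP.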
Q: How do you handle excessive master-slave replication lag?
A: Diagnose first, then optimize in a targeted way.
Diagnosis:
-- Inspect slave status
SHOW SLAVE STATUS\G
-- Key fields to read
-- Seconds_Behind_Master: lag in seconds
-- Slave_IO_Running: IO thread state
-- Slave_SQL_Running: SQL thread state
-- Last_Error: last replication error
Optimizations:
Network:
- Add bandwidth
- Optimize network routing
- Use a dedicated line
Configuration:
-- Parallel replication
SET GLOBAL slave_parallel_type = 'LOGICAL_CLOCK';
SET GLOBAL slave_parallel_workers = 4;
SET GLOBAL slave_preserve_commit_order = 1;
-- Binary log tuning (trades durability for speed; use with care)
SET GLOBAL sync_binlog = 0;  -- asynchronous binlog flush
SET GLOBAL innodb_flush_log_at_trx_commit = 2;  -- relaxed redo-log flush
Hardware:
- Use SSD storage
- Add memory
- Optimize disk I/O
Architecture:
- Read/write splitting
- Sharding (split databases/tables)
- Use a database middleware layer
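The diagnosis step is easy to automate: parse the `SHOW SLAVE STATUS\G` output and alert when replication is stopped or the lag crosses a threshold. A sketch, where the 30-second default threshold is an assumption to be tuned against the RPO:

```python
def parse_slave_status(raw: str) -> dict:
    """Parse the `Key: value` lines of SHOW SLAVE STATUS\\G output."""
    status = {}
    for line in raw.splitlines():
        key, sep, value = line.partition(":")
        if sep:
            status[key.strip()] = value.strip()
    return status

def lag_alert(status: dict, threshold_s: int = 30) -> bool:
    """True if replication is stopped, errored, or lagging past the threshold."""
    if status.get("Slave_IO_Running") != "Yes" or status.get("Slave_SQL_Running") != "Yes":
        return True
    # Seconds_Behind_Master is reported as NULL when replication is broken
    lag = status.get("Seconds_Behind_Master", "NULL")
    return lag == "NULL" or int(lag) > threshold_s
```

A cron job can feed it the output of `mysql -e "SHOW SLAVE STATUS\G"` and page the on-call engineer when it returns True.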
Hands-On Practice Projects
Project 1: Enterprise DR Program Build-Out
- Background: establish a DR program covering the entire business
- Scope:
- BIA business impact analysis
- DR site construction
- Backup strategy definition
- Contingency plan authoring
- Regular drill execution
Project 2: Database High-Availability Re-Architecture
- Background: raise database availability to 99.99%
- Technical approach:
- MySQL master-slave replication
- Read/write splitting architecture
- Automatic failover
- Data consistency guarantees
Project 3: Cloud DR Solution
- Background: build a low-cost DR solution on cloud services
- Approach:
- Cloud storage backups
- Cloud-hosted DR environment
- Hybrid cloud architecture
- Cost optimization strategy
Recommended Learning Resources
Books
- Disaster Recovery Planning and Management
- High Availability MySQL Architecture Design
- Business Continuity Management: A Practical Guide
Online Courses
- Professional DR management certification courses
- MySQL high-availability architecture design
- Cloud DR solutions
Lab Environments
- VMware vSphere lab environment
- MySQL master-slave replication lab
- Cloud-platform DR test environment
Certification Advice
Recommended Certification Paths
- Business continuity: BCI certifications, DRP certification
- Database technology: MySQL DBA, Oracle DBA
- Virtualization: VMware VCP
Exam Preparation Strategy
- Combine theory with hands-on practice
- Take part in real DR drill projects
- Follow industry best-practice case studies