MySQL自动备份系统
完整的目录结构
mysql_auto_backup/
├── main.py # 程序主入口
├── config_manager.py # 配置管理模块
├── requirements.txt # 依赖包列表
├── setup_windows_tasks.bat # Windows计划任务设置
├── backup/ # 备份核心模块
│ ├── __init__.py
│ ├── backup_manager.py # 备份管理器(主协调器)
│ ├── database_manager.py # 数据库管理
│ ├── command_builder.py # 命令构建器
│ ├── filename_generator.py # 文件名生成器
│ ├── backup_executor.py # 备份执行器
│ └── backup_cleaner.py # 备份清理器
├── utils/ # 工具类模块
│ ├── __init__.py
│ ├── logger.py # 日志配置
│ ├── file_utils.py # 文件操作工具
│ ├── system_utils.py # 系统工具
│ ├── validation_utils.py # 数据验证工具
│ └── error_handler.py # 错误处理器
├── models/ # 数据模型模块
│ ├── __init__.py
│ ├── config_models.py # 配置数据模型
│ └── backup_models.py # 备份数据模型
└── schedule/ # 任务调度模块
├── __init__.py
└── task_scheduler.py # 任务调度器
系统特点
✅ 模块化设计 - 代码结构清晰,易于维护
✅ 完整的日志系统 - 详细的执行日志和错误记录
✅ 压缩功能 - 自动压缩备份文件,节省空间
✅ 灵活的配置 - 支持自定义文件名格式和备份选项
✅ 错误处理 - 完善的错误处理和恢复机制
✅ 进度跟踪 - 实时显示备份进度和性能指标
系统功能验证,常用测试命令如下:
1. 测试数据库连接
python main.py test
2. 查看所有数据库
python main.py list
3. 备份所有数据库
python main.py all
4. 查看配置
python main.py config
5. 设置Windows计划任务
运行 setup_windows_tasks.bat 来设置开机和关机自动备份。
下一步建议
验证备份文件:解压一个备份文件,检查内容是否完整(可参考下方的示例脚本)
测试恢复功能:使用生成的备份文件测试数据库恢复
配置计划任务:设置自动备份确保数据安全
监控磁盘空间:定期检查备份目录的磁盘使用情况
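针对上面"验证备份文件"这一条,下面给出一个假设性的最小检查脚本草案(假定备份目录为 backups/、已启用压缩,且目录中至少存在一个 .sql.gz 文件):
# 示例脚本(假设性草案):检查最新压缩备份文件的开头内容
import gzip
from pathlib import Path

backup_dir = Path("backups")                      # 假定的备份目录
latest = max(backup_dir.glob("*.sql.gz"), key=lambda p: p.stat().st_mtime)
print(f"检查文件: {latest.name}")
with gzip.open(latest, "rt", encoding="utf-8", errors="ignore") as f:
    for _ in range(5):                            # 只读取前几行即可初步判断
        print(f.readline().rstrip())
# mysqldump 导出的文件一般以 "-- MySQL dump" 之类的注释开头;若为空或不含 SQL 内容,需进一步排查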
################################################################################
# 文件: config_manager.py
################################################################################
"""
配置管理模块
负责配置文件的加载、保存、验证和默认配置管理
"""
import json
import logging
from pathlib import Path
from typing import Dict, Any, Optional
from models.config_models import MySQLConfig, BackupConfig, LoggingConfig, ScheduleConfig
class ConfigManager:
"""
配置管理器
提供配置文件的完整生命周期管理
"""
def __init__(self, config_file: str = "config.json"):
"""
初始化配置管理器
Args:
config_file: 配置文件路径
"""
self.config_file = Path(config_file)
self.default_config = self.get_default_config()
self.config = self.load_config()
def get_default_config(self) -> Dict[str, Any]:
"""
获取默认配置
Returns:
Dict[str, Any]: 完整的默认配置字典
"""
return {
"mysql": {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "1234",
"database": "td",
"charset": "utf8mb4"
},
"backup": {
"backup_dir": "backups",
"keep_backups": 20,
"compression": True,
"timeout": 300,
"use_custom_filename": True,
"filename_format": "dump-{timestamp}-{data_source}-{database}.sql",
"add_drop_table": True,
"disable_keys": True,
"lock_tables": False,
"add_drop_trigger": True,
"no_data": False,
"complete_insert": True,
"create_options": True,
"routines": True,
"extended_insert": True,
"databases": ["td", "yxkj"],
"tables": [],
"include_routines": True,
"include_triggers": True,
"include_events": True
},
"logging": {
"log_dir": "logs",
"log_level": "INFO",
"max_log_size": "10MB",
"backup_count": 5
},
"schedule": {
"startup_delay": 120,
"shutdown_timeout": 300,
"wait_mysql_timeout": 300
}
}
def load_config(self) -> Dict[str, Any]:
"""
加载配置文件
Returns:
Dict[str, Any]: 合并后的配置字典
Raises:
FileNotFoundError: 配置文件不存在且创建失败
json.JSONDecodeError: 配置文件格式错误
"""
# 如果配置文件不存在,创建默认配置
if not self.config_file.exists():
logging.warning(f"配置文件不存在: {self.config_file},正在创建默认配置...")
if not self.create_default_config():
raise FileNotFoundError(f"无法创建配置文件: {self.config_file}")
try:
# 读取配置文件
with open(self.config_file, 'r', encoding='utf-8') as f:
user_config = json.load(f)
# 深度合并配置
config = self.merge_config(self.default_config, user_config)
logging.info(f"配置文件加载成功: {self.config_file}")
# 验证配置
self.validate_config(config)
return config
except json.JSONDecodeError as e:
logging.error(f"配置文件格式错误: {e}")
raise
except Exception as e:
logging.error(f"配置文件加载失败: {e}")
raise
def merge_config(self, default: Dict, user: Dict) -> Dict:
"""
深度合并配置
Args:
default: 默认配置
user: 用户配置
Returns:
Dict: 合并后的配置
"""
result = default.copy()
for key, value in user.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
# 递归合并字典
result[key] = self.merge_config(result[key], value)
else:
# 直接覆盖其他类型
result[key] = value
return result
def create_default_config(self) -> bool:
"""
创建默认配置文件
Returns:
bool: 是否创建成功
"""
try:
# 确保目录存在
self.config_file.parent.mkdir(parents=True, exist_ok=True)
# 写入默认配置
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(self.default_config, f, indent=4, ensure_ascii=False)
logging.info(f"默认配置文件创建成功: {self.config_file}")
return True
except Exception as e:
logging.error(f"创建默认配置文件失败: {e}")
return False
def validate_config(self, config: Dict[str, Any]):
"""
验证配置有效性
Args:
config: 要验证的配置字典
Raises:
ValueError: 配置验证失败
"""
# 验证MySQL配置
mysql_config = config.get("mysql", {})
if not mysql_config.get("host"):
raise ValueError("MySQL host 不能为空")
if not mysql_config.get("user"):
raise ValueError("MySQL user 不能为空")
if not mysql_config.get("password"):
raise ValueError("MySQL password 不能为空")
# 验证备份配置
backup_config = config.get("backup", {})
if not backup_config.get("backup_dir"):
raise ValueError("备份目录不能为空")
if backup_config.get("keep_backups", 0) <= 0:
raise ValueError("保留备份数量必须大于0")
logging.debug("配置验证通过")
def save_config(self, new_config: Dict[str, Any]) -> bool:
"""
保存配置到文件
Args:
new_config: 新的配置字典
Returns:
bool: 是否保存成功
"""
try:
# 验证新配置
self.validate_config(new_config)
# 保存到文件
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(new_config, f, indent=4, ensure_ascii=False)
# 更新内存中的配置
self.config = self.merge_config(self.default_config, new_config)
logging.info("配置保存成功")
return True
except Exception as e:
logging.error(f"配置保存失败: {e}")
return False
def get_mysql_config(self) -> Dict[str, Any]:
"""获取MySQL配置"""
return self.config.get("mysql", {})
def get_backup_config(self) -> Dict[str, Any]:
"""获取备份配置"""
return self.config.get("backup", {})
def get_logging_config(self) -> Dict[str, Any]:
"""获取日志配置"""
return self.config.get("logging", {})
def get_schedule_config(self) -> Dict[str, Any]:
"""获取调度配置"""
return self.config.get("schedule", {})
def get_config_object(self) -> Dict[str, Any]:
"""获取完整的配置对象"""
return self.config.copy()
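下面是一个假设性的使用示例(演示片段,不属于项目源码),展示配置加载以及 merge_config 的深度合并效果:
# 演示:加载配置并观察深度合并(用户配置只需写出要覆盖的键,其余键沿用默认值)
from config_manager import ConfigManager

manager = ConfigManager("config.json")            # 文件不存在时会自动创建默认配置
print(manager.get_mysql_config()["host"])
merged = manager.merge_config(
    {"backup": {"keep_backups": 20, "compression": True}},
    {"backup": {"keep_backups": 5}},
)
print(merged)                                     # {'backup': {'keep_backups': 5, 'compression': True}}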
################################################################################
# 文件: main.py
################################################################################
#!/usr/bin/env python3
"""
MySQL自动备份系统 - 主程序入口
提供命令行接口,支持多种备份模式和配置管理
"""
import sys
import time
import logging
from backup.backup_manager import BackupManager
from config_manager import ConfigManager
from utils.logger import setup_logging
def main():
"""
主函数 - 程序入口点
解析命令行参数并执行相应的备份操作
"""
try:
# 支持指定配置文件
config_file = sys.argv[2] if len(sys.argv) > 2 else "config.json"
# 初始化配置管理器
config_manager = ConfigManager(config_file)
# 设置日志
setup_logging(config_manager)
# 创建备份管理器
backup_manager = BackupManager(config_manager)
# 根据命令行参数确定操作类型
if len(sys.argv) > 1:
action = sys.argv[1].lower()
else:
action = 'manual'
# 执行对应的操作
execute_action(backup_manager, action)
except Exception as e:
logging.error(f"程序执行异常: {e}")
import traceback
logging.error(traceback.format_exc())
sys.exit(1)
def execute_action(backup_manager: BackupManager, action: str):
"""
执行对应的备份操作
Args:
backup_manager: 备份管理器实例
action: 操作类型
"""
if action == 'startup':
execute_startup_backup(backup_manager)
elif action == 'shutdown':
execute_shutdown_backup(backup_manager)
elif action == 'manual':
execute_manual_backup(backup_manager)
elif action == 'all':
execute_all_databases_backup(backup_manager)
elif action == 'list':
list_databases(backup_manager)
elif action == 'config':
show_config(backup_manager)
elif action == 'test':
test_database_connection(backup_manager)
else:
show_usage()
def execute_startup_backup(backup_manager: BackupManager):
"""执行开机备份"""
logging.info("=== 执行开机备份 ===")
# 等待MySQL服务启动
startup_delay = backup_manager.config['schedule'].get('startup_delay', 120)
logging.info(f"等待 {startup_delay} 秒系统稳定...")
time.sleep(startup_delay)
if backup_manager.wait_for_mysql():
success = backup_manager.backup_database('startup')
if success:
logging.info("开机备份完成")
else:
logging.error("开机备份失败")
else:
logging.error("开机备份失败:MySQL服务未就绪")
def execute_shutdown_backup(backup_manager: BackupManager):
"""执行关机备份"""
logging.info("=== 执行关机备份 ===")
success = backup_manager.backup_database('shutdown')
if success:
logging.info("关机备份完成")
else:
logging.error("关机备份失败")
def execute_manual_backup(backup_manager: BackupManager):
"""执行手动备份"""
logging.info("=== 执行手动备份 ===")
success = backup_manager.backup_database('manual')
if success:
logging.info("手动备份完成")
else:
logging.error("手动备份失败")
def execute_all_databases_backup(backup_manager: BackupManager):
"""备份所有数据库"""
logging.info("=== 备份所有数据库 ===")
success = backup_manager.backup_all_databases('manual')
if success:
logging.info("所有数据库备份完成")
else:
logging.error("所有数据库备份失败")
def list_databases(backup_manager: BackupManager):
"""显示所有数据库"""
databases = backup_manager.get_all_databases()
print("所有用户数据库:")
for db in databases:
print(f" - {db}")
def show_config(backup_manager: BackupManager):
"""显示当前配置"""
import json
print("当前配置:")
print(json.dumps(backup_manager.config, indent=2, ensure_ascii=False))
def test_database_connection(backup_manager: BackupManager):
"""测试数据库连接"""
success = backup_manager.test_database_connection()
if success:
print("数据库连接测试成功")
else:
print("数据库连接测试失败")
def show_usage():
"""显示使用说明"""
print("MySQL自动备份系统 - 使用说明")
print("=" * 50)
print("用法:")
print(" python main.py startup [config_file] # 开机备份")
print(" python main.py shutdown [config_file] # 关机备份")
print(" python main.py manual [config_file] # 手动备份指定数据库")
print(" python main.py all [config_file] # 备份所有数据库")
print(" python main.py list [config_file] # 显示所有数据库")
print(" python main.py config [config_file] # 显示配置")
print(" python main.py test [config_file] # 测试数据库连接")
print()
print("示例:")
print(" python main.py manual # 使用默认配置手动备份")
print(" python main.py startup my_config.json # 使用自定义配置开机备份")
if __name__ == "__main__":
main()
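除了命令行方式,也可以按下面这个假设性片段在 Python 中直接触发一次手动备份(假定 config.json 已就绪):
# 演示:不经过命令行,直接调用备份管理器
from config_manager import ConfigManager
from backup.backup_manager import BackupManager
from utils.logger import setup_logging

config_manager = ConfigManager("config.json")
setup_logging(config_manager)
backup_manager = BackupManager(config_manager)
if backup_manager.test_database_connection():
    ok = backup_manager.backup_database('manual')
    print("备份成功" if ok else "备份失败")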
################################################################################
# 文件: backup/backup_cleaner.py
################################################################################
"""
备份清理器模块
负责清理旧的备份文件,管理备份存储空间
"""
import os
import shutil
import logging
from pathlib import Path
from typing import List, Dict, Any
from datetime import datetime, timedelta
from models.config_models import BackupConfig
class BackupCleaner:
"""
备份清理器
负责管理备份文件的存储空间,清理旧的备份文件
"""
def __init__(self, backup_config: BackupConfig):
"""
初始化备份清理器
Args:
backup_config: 备份配置对象
"""
self.backup_config = backup_config
logging.debug("备份清理器初始化完成")
def clean_old_backups(self):
"""
清理旧的备份文件
根据配置的保留数量删除多余的备份文件
"""
try:
backup_dir = Path(self.backup_config.backup_dir)
if not backup_dir.exists():
logging.warning(f"备份目录不存在: {backup_dir}")
return
# 获取所有备份文件
backup_files = self._get_all_backup_files(backup_dir)
if not backup_files:
logging.info("没有找到备份文件")
return
# 按修改时间排序(最新的在前)
backup_files.sort(key=os.path.getmtime, reverse=True)
# 计算需要保留的文件数量
keep_count = self.backup_config.keep_backups
files_to_keep = backup_files[:keep_count]
files_to_delete = backup_files[keep_count:]
# 删除多余的文件
self._delete_backup_files(files_to_delete)
# 记录清理结果
self._log_cleanup_result(len(files_to_keep), len(files_to_delete))
except Exception as e:
logging.error(f"清理旧备份文件失败: {e}")
def _get_all_backup_files(self, backup_dir: Path) -> List[Path]:
"""
获取所有备份文件
Args:
backup_dir: 备份目录
Returns:
List[Path]: 备份文件路径列表
"""
backup_files = []
# 根据压缩设置确定要搜索的文件模式
patterns = ["*.sql.gz", "*.sql"] if self.backup_config.compression else ["*.sql"]
for pattern in patterns:
try:
files = list(backup_dir.glob(pattern))
backup_files.extend(files)
logging.debug(f"找到 {len(files)} 个 {pattern} 文件")
except Exception as e:
logging.warning(f"搜索 {pattern} 文件时出错: {e}")
return backup_files
def _delete_backup_files(self, files_to_delete: List[Path]):
"""
删除备份文件
Args:
files_to_delete: 要删除的文件列表
"""
if not files_to_delete:
return
deleted_count = 0
total_size = 0
for old_file in files_to_delete:
try:
if old_file.exists():
file_size = old_file.stat().st_size
os.remove(old_file)
deleted_count += 1
total_size += file_size
logging.info(f"删除旧备份文件: {old_file.name} ({file_size / 1024 / 1024:.2f} MB)")
except Exception as e:
logging.error(f"删除文件失败 {old_file}: {e}")
# 记录删除统计
if deleted_count > 0:
total_size_mb = total_size / (1024 * 1024)
logging.info(f"清理完成: 删除了 {deleted_count} 个文件,释放 {total_size_mb:.2f} MB 空间")
def _log_cleanup_result(self, kept_count: int, deleted_count: int):
"""
记录清理结果
Args:
kept_count: 保留的文件数量
deleted_count: 删除的文件数量
"""
if deleted_count == 0:
logging.info(f"备份文件清理完成: 保留 {kept_count} 个文件,无需删除")
else:
logging.info(
f"备份文件清理完成: 保留 {kept_count} 个文件,删除 {deleted_count} 个旧文件"
)
    def get_backup_stats(self) -> Dict[str, Any]:
"""
获取备份文件统计信息
Returns:
Dict[str, any]: 统计信息字典
"""
try:
backup_dir = Path(self.backup_config.backup_dir)
if not backup_dir.exists():
return {'error': '备份目录不存在'}
backup_files = self._get_all_backup_files(backup_dir)
if not backup_files:
return {
'total_files': 0,
'total_size_mb': 0,
'oldest_file': None,
'newest_file': None
}
# 计算统计信息
total_size = sum(f.stat().st_size for f in backup_files)
file_times = [(f, f.stat().st_mtime) for f in backup_files]
oldest_file = min(file_times, key=lambda x: x[1])[0]
newest_file = max(file_times, key=lambda x: x[1])[0]
return {
'total_files': len(backup_files),
'total_size_mb': round(total_size / (1024 * 1024), 2),
'oldest_file': {
'name': oldest_file.name,
'size_mb': round(oldest_file.stat().st_size / (1024 * 1024), 2),
'modified': datetime.fromtimestamp(oldest_file.stat().st_mtime).isoformat()
},
'newest_file': {
'name': newest_file.name,
'size_mb': round(newest_file.stat().st_size / (1024 * 1024), 2),
'modified': datetime.fromtimestamp(newest_file.stat().st_mtime).isoformat()
},
'keep_backups': self.backup_config.keep_backups,
'files_to_keep': min(len(backup_files), self.backup_config.keep_backups)
}
except Exception as e:
logging.error(f"获取备份统计信息失败: {e}")
return {'error': str(e)}
def cleanup_by_age(self, days: int):
"""
根据文件年龄清理备份文件
Args:
days: 保留天数,超过这个天数的文件将被删除
"""
try:
backup_dir = Path(self.backup_config.backup_dir)
if not backup_dir.exists():
return
cutoff_time = datetime.now() - timedelta(days=days)
backup_files = self._get_all_backup_files(backup_dir)
files_to_delete = []
for backup_file in backup_files:
file_time = datetime.fromtimestamp(backup_file.stat().st_mtime)
if file_time < cutoff_time:
files_to_delete.append(backup_file)
self._delete_backup_files(files_to_delete)
logging.info(f"按年龄清理完成: 删除了 {len(files_to_delete)} 个超过 {days} 天的备份文件")
except Exception as e:
logging.error(f"按年龄清理备份文件失败: {e}")
    def get_disk_usage(self) -> Dict[str, Any]:
"""
获取磁盘使用情况
Returns:
Dict[str, any]: 磁盘使用信息
"""
try:
backup_dir = Path(self.backup_config.backup_dir)
if not backup_dir.exists():
return {'error': '备份目录不存在'}
            # 获取备份目录所在磁盘的使用情况(使用 shutil.disk_usage,跨平台;os.statvfs 仅在类 Unix 系统可用)
            usage = shutil.disk_usage(backup_dir)
            total_space = usage.total
            used_space = usage.used
            free_space = usage.free
return {
'total_space_gb': round(total_space / (1024 ** 3), 2),
'used_space_gb': round(used_space / (1024 ** 3), 2),
'free_space_gb': round(free_space / (1024 ** 3), 2),
'usage_percentage': round((used_space / total_space) * 100, 2)
}
except Exception as e:
logging.error(f"获取磁盘使用情况失败: {e}")
return {'error': str(e)}
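一个假设性的使用片段,展示单独调用 BackupCleaner 查看统计信息并按保留天数清理(BackupConfig 其余字段取默认值):
# 演示:单独使用备份清理器
from models.config_models import BackupConfig
from backup.backup_cleaner import BackupCleaner

cleaner = BackupCleaner(BackupConfig(backup_dir="backups", keep_backups=20))
print(cleaner.get_backup_stats())     # 文件数量、总大小、最新/最旧文件等
cleaner.cleanup_by_age(days=30)       # 删除超过 30 天的备份文件
print(cleaner.get_disk_usage())       # 备份目录所在磁盘的使用情况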
################################################################################
# 文件: backup/backup_executor.py
################################################################################
"""
备份执行器模块
负责执行备份命令和处理执行结果
"""
import os
import subprocess
import logging
from pathlib import Path
from typing import Optional, Dict, Tuple, Any
from datetime import datetime
from models.backup_models import BackupResult
from models.config_models import BackupConfig
from utils.error_handler import BackupErrorHandler
class BackupExecutor:
"""
备份执行器
负责执行备份命令和处理执行结果,包括错误处理和结果记录
"""
def __init__(self, backup_config: BackupConfig):
"""
初始化备份执行器
Args:
backup_config: 备份配置对象
"""
self.backup_config = backup_config
self.error_handler = BackupErrorHandler()
logging.debug("备份执行器初始化完成")
def execute_backup_command(self, cmd: list, backup_file: Path,
database: str) -> Tuple[bool, Optional[str]]:
"""
执行备份命令
Args:
cmd: 要执行的命令列表
backup_file: 备份文件路径
database: 数据库名称
Returns:
Tuple[bool, Optional[str]]: (是否成功, 错误信息)
Example:
>>> success, error = executor.execute_backup_command(cmd, backup_file, 'mydb')
>>> if success:
... print("备份成功")
"""
logging.info(f"执行数据库 {database} 的备份命令")
try:
# 确保备份目录存在
backup_file.parent.mkdir(parents=True, exist_ok=True)
# 执行备份命令
with open(backup_file, 'w', encoding='utf-8') as f:
result = subprocess.run(
cmd,
stdout=f,
stderr=subprocess.PIPE,
text=True,
timeout=self.backup_config.timeout
)
# 处理执行结果
return self._handle_execution_result(result, backup_file, database)
except subprocess.TimeoutExpired:
error_msg = f"数据库 {database} 备份超时 (超过 {self.backup_config.timeout} 秒)"
logging.error(error_msg)
self._cleanup_failed_backup(backup_file)
return False, error_msg
except Exception as e:
error_msg = f"数据库 {database} 备份过程中发生错误: {e}"
logging.error(error_msg)
self._cleanup_failed_backup(backup_file)
return False, error_msg
def _handle_execution_result(self, result: subprocess.CompletedProcess,
backup_file: Path, database: str) -> Tuple[bool, Optional[str]]:
"""
处理命令执行结果
Args:
result: 子进程执行结果
backup_file: 备份文件路径
database: 数据库名称
Returns:
Tuple[bool, Optional[str]]: (是否成功, 错误信息)
"""
if result.returncode != 0:
# 处理错误情况
return self.error_handler.handle_backup_error(result, backup_file, database)
else:
# 成功情况下的处理
return self._handle_successful_backup(backup_file, database)
def _handle_successful_backup(self, backup_file: Path, database: str) -> Tuple[bool, Optional[str]]:
"""
处理成功的备份
Args:
backup_file: 备份文件路径
database: 数据库名称
Returns:
Tuple[bool, Optional[str]]: (True, None)
"""
# 检查备份文件是否为空
self._check_backup_file_size(backup_file, database)
# 验证备份文件完整性
if not self._validate_backup_file(backup_file):
error_msg = f"数据库 {database} 备份文件验证失败"
logging.error(error_msg)
self._cleanup_failed_backup(backup_file)
return False, error_msg
logging.info(f"数据库 {database} 备份命令执行成功")
return True, None
def _check_backup_file_size(self, backup_file: Path, database: str):
"""
检查备份文件大小
Args:
backup_file: 备份文件路径
database: 数据库名称
"""
if backup_file.exists():
file_size = backup_file.stat().st_size
if file_size == 0:
logging.warning(f"数据库 '{database}' 的备份文件为空")
else:
size_mb = file_size / (1024 * 1024)
logging.debug(f"备份文件大小: {size_mb:.2f} MB")
else:
logging.warning(f"备份文件不存在: {backup_file}")
def _validate_backup_file(self, backup_file: Path) -> bool:
"""
验证备份文件完整性
Args:
backup_file: 备份文件路径
Returns:
bool: 文件是否有效
"""
if not backup_file.exists():
return False
try:
# 检查文件是否包含有效的SQL内容
with open(backup_file, 'r', encoding='utf-8', errors='ignore') as f:
first_line = f.readline().strip()
# 基本的SQL文件验证
valid_starts = ['--', '/*', 'CREATE', 'DROP', 'INSERT']
if any(first_line.startswith(prefix) for prefix in valid_starts):
return True
else:
logging.warning(f"备份文件可能无效,第一行内容: {first_line}")
return False
except Exception as e:
logging.warning(f"备份文件验证失败: {e}")
return False
def _cleanup_failed_backup(self, backup_file: Path):
"""
清理失败的备份文件
Args:
backup_file: 要清理的备份文件路径
"""
try:
if backup_file.exists():
backup_file.unlink()
logging.info(f"已清理失败的备份文件: {backup_file}")
except Exception as e:
logging.error(f"清理备份文件失败 {backup_file}: {e}")
def create_backup_result(self, database: str, success: bool,
backup_file: Optional[Path] = None,
error_message: str = "",
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None) -> BackupResult:
"""
创建备份结果对象
Args:
database: 数据库名称
success: 是否成功
backup_file: 备份文件路径
error_message: 错误信息
start_time: 开始时间
end_time: 结束时间
Returns:
BackupResult: 备份结果对象
"""
file_size = 0.0
if backup_file and backup_file.exists():
file_size = backup_file.stat().st_size / (1024 * 1024) # 转换为MB
return BackupResult(
database=database,
success=success,
backup_file=backup_file,
file_size=file_size,
error_message=error_message,
start_time=start_time or datetime.now(),
end_time=end_time or datetime.now()
)
    def get_performance_metrics(self, backup_result: BackupResult) -> Dict[str, Any]:
"""
获取性能指标
Args:
backup_result: 备份结果对象
Returns:
Dict[str, any]: 性能指标字典
"""
return {
'duration_seconds': backup_result.duration,
'file_size_mb': backup_result.file_size,
'throughput_mb_per_second': (
backup_result.file_size / backup_result.duration
if backup_result.duration > 0 else 0
),
'success': backup_result.success,
'database': backup_result.database
}
################################################################################
# 文件: backup/backup_manager.py
################################################################################
"""
备份管理器模块
协调各个备份组件,管理整个备份流程的主控制器
"""
import os
import time
import logging
from pathlib import Path
from typing import List, Optional, Dict, Any
from datetime import datetime
from config_manager import ConfigManager
from .database_manager import DatabaseManager
from .command_builder import CommandBuilder
from .filename_generator import FilenameGenerator
from .backup_executor import BackupExecutor
from .backup_cleaner import BackupCleaner
from models.config_models import MySQLConfig, BackupConfig
from models.backup_models import BackupResult
class BackupManager:
"""
备份管理器
作为备份系统的核心协调器,管理整个备份流程
"""
def __init__(self, config_manager: ConfigManager):
"""
初始化备份管理器
Args:
config_manager: 配置管理器实例
"""
self.config_manager = config_manager
self.config = config_manager.config
# 初始化配置对象
self.mysql_config = self._create_mysql_config()
self.backup_config = self._create_backup_config()
# 初始化各个组件
self.database_manager = DatabaseManager(self.config_manager)
self.command_builder = CommandBuilder()
self.filename_generator = FilenameGenerator()
self.backup_executor = BackupExecutor(self.backup_config)
self.backup_cleaner = BackupCleaner(self.backup_config)
# 设置备份目录
self._setup_backup_dir()
logging.info("备份管理器初始化完成")
def _create_mysql_config(self) -> MySQLConfig:
"""
从配置字典创建MySQL配置对象
Returns:
MySQLConfig: MySQL配置对象
"""
mysql_dict = self.config['mysql']
return MySQLConfig(
host=mysql_dict['host'],
user=mysql_dict['user'],
password=mysql_dict['password'],
database=mysql_dict.get('database', 'mysql'),
port=mysql_dict.get('port', 3306),
charset=mysql_dict.get('charset', 'utf8mb4')
)
def _create_backup_config(self) -> BackupConfig:
"""
从配置字典创建备份配置对象
Returns:
BackupConfig: 备份配置对象
"""
backup_dict = self.config['backup']
return BackupConfig(
backup_dir=backup_dict['backup_dir'],
keep_backups=backup_dict.get('keep_backups', 20),
compression=backup_dict.get('compression', True),
timeout=backup_dict.get('timeout', 300),
use_custom_filename=backup_dict.get('use_custom_filename', True),
filename_format=backup_dict.get('filename_format', 'dump-{timestamp}-{data_source}-{database}.sql'),
add_drop_table=backup_dict.get('add_drop_table', True),
disable_keys=backup_dict.get('disable_keys', True),
lock_tables=backup_dict.get('lock_tables', False),
add_drop_trigger=backup_dict.get('add_drop_trigger', True),
no_data=backup_dict.get('no_data', False),
complete_insert=backup_dict.get('complete_insert', True),
create_options=backup_dict.get('create_options', True),
routines=backup_dict.get('routines', True),
extended_insert=backup_dict.get('extended_insert', True),
databases=backup_dict.get('databases', []),
tables=backup_dict.get('tables', [])
)
def _setup_backup_dir(self):
"""
设置备份目录
确保备份目录存在且为绝对路径
"""
backup_dir = Path(self.backup_config.backup_dir)
# 如果是相对路径,转换为绝对路径
if not backup_dir.is_absolute():
backup_dir = Path(__file__).parent.parent / backup_dir
self.backup_config.backup_dir = str(backup_dir)
# 确保备份目录存在
backup_dir.mkdir(parents=True, exist_ok=True)
logging.info(f"备份目录设置完成: {backup_dir}")
def backup_database(self, backup_type: str = 'manual') -> bool:
"""
执行数据库备份 - 支持单个或多个数据库
Args:
backup_type: 备份类型 (startup/shutdown/manual)
Returns:
bool: 是否至少有一个数据库备份成功
Example:
>>> backup_manager.backup_database('manual')
True
"""
logging.info(f"开始执行{backup_type}备份...")
try:
# 获取要备份的数据库列表
databases_to_backup = self.database_manager.get_databases_to_backup()
if not databases_to_backup:
logging.error("没有找到要备份的数据库")
return False
success_count = 0
total_count = len(databases_to_backup)
results = []
# 逐个备份数据库
for database in databases_to_backup:
backup_result = self.backup_single_database(database, backup_type)
results.append(backup_result)
if backup_result.success:
success_count += 1
self._log_backup_success(backup_result)
else:
self._log_backup_failure(backup_result)
# 记录备份统计
self._log_backup_statistics(success_count, total_count, backup_type)
# 清理旧备份
if success_count > 0:
self.backup_cleaner.clean_old_backups()
return success_count > 0
except Exception as e:
logging.error(f"{backup_type}备份过程中发生错误: {e}")
import traceback
logging.error(traceback.format_exc())
return False
def backup_single_database(self, database: str, backup_type: str = 'manual') -> BackupResult:
"""
备份单个数据库
Args:
database: 数据库名称
backup_type: 备份类型
Returns:
BackupResult: 包含详细结果的备份结果对象
"""
start_time = datetime.now()
try:
backup_dir = Path(self.backup_config.backup_dir)
# 生成备份文件名和路径
filename = self.filename_generator.generate_backup_filename(
database, backup_type, self.mysql_config, self.backup_config
)
backup_file = self.filename_generator.get_backup_file_path(backup_dir, filename)
logging.info(f"开始备份数据库: {database}")
logging.info(f"备份文件: {backup_file}")
# 构建备份命令
cmd = self.command_builder.build_complete_command(
database, self.mysql_config, self.backup_config
)
# 记录安全命令(隐藏密码)
safe_cmd = self.command_builder.get_safe_command_for_logging(cmd, self.mysql_config)
logging.debug(f"执行命令: {safe_cmd}")
# 执行备份命令
success, error_message = self.backup_executor.execute_backup_command(
cmd, backup_file, database
)
# 处理压缩
final_file = backup_file
if success and self.backup_config.compression:
from utils.file_utils import compress_file
compressed_file = compress_file(backup_file)
if compressed_file and Path(compressed_file).exists():
final_file = Path(compressed_file)
logging.debug(f"文件压缩完成: {final_file}")
else:
error_message = "文件压缩失败"
success = False
# 创建备份结果
end_time = datetime.now()
return self.backup_executor.create_backup_result(
database=database,
success=success,
backup_file=final_file,
error_message=error_message,
start_time=start_time,
end_time=end_time
)
except Exception as e:
error_msg = f"数据库 {database} 备份过程中发生未知错误: {e}"
logging.error(error_msg)
end_time = datetime.now()
return self.backup_executor.create_backup_result(
database=database,
success=False,
error_message=error_msg,
start_time=start_time,
end_time=end_time
)
def _log_backup_success(self, backup_result: BackupResult):
"""
记录备份成功日志
Args:
backup_result: 备份结果对象
"""
if backup_result.backup_file:
logging.info(
f"✅ 数据库 {backup_result.database} 备份成功: "
f"{backup_result.backup_file} "
f"({backup_result.file_size:.2f} MB, "
f"耗时: {backup_result.duration:.2f}秒)"
)
def _log_backup_failure(self, backup_result: BackupResult):
"""
记录备份失败日志
Args:
backup_result: 备份结果对象
"""
logging.error(
f"❌ 数据库 {backup_result.database} 备份失败: {backup_result.error_message}"
)
def _log_backup_statistics(self, success_count: int, total_count: int, backup_type: str):
"""
记录备份统计信息
Args:
success_count: 成功数量
total_count: 总数量
backup_type: 备份类型
"""
if success_count == total_count:
logging.info(f"🎉 {backup_type}备份完成: 全部 {total_count} 个数据库备份成功")
elif success_count > 0:
logging.warning(
f"⚠️ {backup_type}备份完成: {success_count}/{total_count} 个数据库备份成功"
)
else:
logging.error(f"💥 {backup_type}备份完成: 全部 {total_count} 个数据库备份失败")
def backup_all_databases(self, backup_type: str = 'manual') -> bool:
"""
备份所有用户数据库
Args:
backup_type: 备份类型
Returns:
bool: 是否备份成功
"""
try:
all_databases = self.database_manager.get_all_databases()
if not all_databases:
logging.error("没有找到用户数据库")
return False
# 临时修改配置,备份所有数据库
original_databases = self.backup_config.databases.copy()
self.backup_config.databases = all_databases
success = self.backup_database(backup_type)
# 恢复原配置
self.backup_config.databases = original_databases
return success
except Exception as e:
logging.error(f"备份所有数据库失败: {e}")
return False
def wait_for_mysql(self, timeout: Optional[int] = None) -> bool:
"""
等待MySQL服务启动
Args:
timeout: 超时时间(秒),None表示使用配置中的超时时间
Returns:
bool: 是否在超时时间内成功连接
"""
if timeout is None:
timeout = self.config['schedule'].get('wait_mysql_timeout', 300)
logging.info(f"等待MySQL服务启动,超时时间: {timeout}秒...")
start_time = time.time()
while time.time() - start_time < timeout:
try:
if self.database_manager.test_connection():
logging.info("✅ MySQL服务已就绪")
return True
except Exception as e:
logging.debug(f"等待MySQL服务: {e}")
# 等待10秒后重试
time.sleep(10)
logging.error(f"❌ 等待MySQL服务启动超时 ({timeout}秒)")
return False
def clean_old_backups(self):
"""清理旧的备份文件"""
self.backup_cleaner.clean_old_backups()
def test_database_connection(self) -> bool:
"""
测试数据库连接
Returns:
bool: 连接是否成功
"""
return self.database_manager.test_database_connection()
def get_all_databases(self) -> List[str]:
"""
获取所有用户数据库
Returns:
List[str]: 数据库名称列表
"""
return self.database_manager.get_all_databases()
def get_databases_to_backup(self) -> List[str]:
"""
获取要备份的数据库列表
Returns:
List[str]: 要备份的数据库名称列表
"""
return self.database_manager.get_databases_to_backup()
def get_backup_files(self) -> List[Path]:
"""
获取所有备份文件
Returns:
List[Path]: 备份文件路径列表
"""
backup_dir = Path(self.backup_config.backup_dir)
patterns = ["*.sql.gz", "*.sql"] if self.backup_config.compression else ["*.sql"]
backup_files = []
for pattern in patterns:
backup_files.extend(list(backup_dir.glob(pattern)))
return sorted(backup_files, key=lambda x: x.stat().st_mtime, reverse=True)
def get_backup_info(self) -> Dict[str, Any]:
"""
获取备份系统信息
Returns:
Dict[str, Any]: 备份系统信息字典
"""
backup_files = self.get_backup_files()
total_size = sum(f.stat().st_size for f in backup_files) / (1024 * 1024) # MB
return {
'backup_dir': self.backup_config.backup_dir,
'total_backups': len(backup_files),
'total_size_mb': round(total_size, 2),
'compression_enabled': self.backup_config.compression,
'keep_backups': self.backup_config.keep_backups,
'recent_backups': [str(f) for f in backup_files[:5]] # 最近5个备份
}
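备份完成后,可以用类似下面的假设性片段查看备份系统的整体信息:
# 演示:查看备份目录的整体状态
from config_manager import ConfigManager
from backup.backup_manager import BackupManager

manager = BackupManager(ConfigManager("config.json"))
info = manager.get_backup_info()
print(f"备份目录: {info['backup_dir']}")
print(f"备份数量: {info['total_backups']},总大小: {info['total_size_mb']} MB")
for f in info['recent_backups']:
    print(" -", f)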
################################################################################
# 文件: backup/command_builder.py
################################################################################
"""
命令构建器模块
负责构建mysqldump命令和参数
"""
import logging
from typing import List, Dict, Any
from models.config_models import BackupConfig, MySQLConfig
class CommandBuilder:
"""
mysqldump命令构建器
负责构建和执行备份命令的各个组件
"""
# mysqldump选项映射表
OPTION_MAPPING = {
'add_drop_table': '--add-drop-table',
'disable_keys': '--disable-keys',
'lock_tables': '--lock-tables',
'add_drop_trigger': '--add-drop-trigger',
'no_data': '--no-data',
'complete_insert': '--complete-insert',
'create_options': '--create-options',
'routines': '--routines',
'extended_insert': '--extended-insert'
}
@staticmethod
def build_base_command(mysql_config: MySQLConfig) -> List[str]:
"""
构建基础mysqldump命令
Args:
mysql_config: MySQL配置对象
Returns:
List[str]: 基础命令参数列表
Example:
>>> cmd = CommandBuilder.build_base_command(mysql_config)
>>> print(cmd)
['mysqldump', '-hlocalhost', '-P3306', '-uroot', '-p***', '--single-transaction']
"""
base_cmd = [
'mysqldump',
f'-h{mysql_config.host}',
f'-P{mysql_config.port}',
f'-u{mysql_config.user}',
f'-p{mysql_config.password}',
'--single-transaction', # 使用事务保证一致性
]
logging.debug(f"构建基础命令: {len(base_cmd)} 个参数")
return base_cmd
@staticmethod
def add_backup_options(cmd: List[str], backup_config: BackupConfig) -> List[str]:
"""
添加备份选项到命令
Args:
cmd: 基础命令列表
backup_config: 备份配置对象
Returns:
List[str]: 添加了备份选项的命令列表
"""
options_added = 0
# 添加启用的选项
for config_key, option in CommandBuilder.OPTION_MAPPING.items():
if getattr(backup_config, config_key, False):
cmd.append(option)
options_added += 1
logging.debug(f"添加选项: {option}")
# 特殊处理锁表选项
if not backup_config.lock_tables:
cmd.append('--skip-lock-tables')
logging.debug("添加选项: --skip-lock-tables")
options_added += 1
# 特殊处理扩展插入选项
if not backup_config.extended_insert:
cmd.append('--skip-extended-insert')
logging.debug("添加选项: --skip-extended-insert")
options_added += 1
logging.debug(f"添加了 {options_added} 个备份选项")
return cmd
@staticmethod
def add_database_and_tables(cmd: List[str], database: str, tables: List[str]) -> List[str]:
"""
添加数据库和表到命令
Args:
cmd: 命令列表
database: 数据库名称
tables: 要备份的表列表
Returns:
List[str]: 完整的命令列表
"""
# 添加数据库
cmd.append(database)
logging.debug(f"添加数据库: {database}")
# 添加特定表(如果有)
if tables:
cmd.extend(tables)
logging.debug(f"添加 {len(tables)} 个特定表: {tables}")
return cmd
@classmethod
def build_complete_command(cls, database: str, mysql_config: MySQLConfig,
backup_config: BackupConfig) -> List[str]:
"""
构建完整的mysqldump命令
Args:
database: 数据库名称
mysql_config: MySQL配置
backup_config: 备份配置
Returns:
List[str]: 完整的命令参数列表
Example:
>>> cmd = CommandBuilder.build_complete_command('mydb', mysql_config, backup_config)
>>> print(' '.join(cmd))
"""
logging.info(f"为数据库 {database} 构建备份命令")
# 构建基础命令
cmd = cls.build_base_command(mysql_config)
# 添加备份选项
cmd = cls.add_backup_options(cmd, backup_config)
# 添加数据库和表
tables = backup_config.tables or []
cmd = cls.add_database_and_tables(cmd, database, tables)
logging.debug(f"命令构建完成,共 {len(cmd)} 个参数")
return cmd
@staticmethod
def get_safe_command_for_logging(cmd: List[str], mysql_config: MySQLConfig) -> str:
"""
获取用于日志记录的安全命令(隐藏密码)
Args:
cmd: 原始命令列表
mysql_config: MySQL配置(用于获取密码)
Returns:
str: 隐藏密码的安全命令字符串
Example:
>>> safe_cmd = CommandBuilder.get_safe_command_for_logging(cmd, mysql_config)
>>> logging.info(f"执行命令: {safe_cmd}")
"""
safe_cmd = ' '.join(cmd)
# 替换密码为星号
safe_cmd = safe_cmd.replace(mysql_config.password, '***')
# 额外的安全处理:确保密码不会以其他形式泄露
password_patterns = [
f"-p{mysql_config.password}",
f"--password={mysql_config.password}"
]
for pattern in password_patterns:
safe_cmd = safe_cmd.replace(pattern, "-p***")
return safe_cmd
@classmethod
def validate_command(cls, cmd: List[str]) -> bool:
"""
验证命令是否有效
Args:
cmd: 要验证的命令列表
Returns:
bool: 命令是否有效
"""
if not cmd:
logging.error("命令为空")
return False
if cmd[0] != 'mysqldump':
logging.error("命令必须以 'mysqldump' 开头")
return False
# 检查必要的参数
required_params = ['-h', '-u', '-p']
cmd_str = ' '.join(cmd)
for param in required_params:
if param not in cmd_str:
logging.error(f"缺少必要参数: {param}")
return False
logging.debug("命令验证通过")
return True
@staticmethod
    def get_command_summary(cmd: List[str]) -> Dict[str, Any]:
"""
获取命令摘要信息
Args:
cmd: 命令列表
Returns:
Dict[str, any]: 命令摘要信息
"""
cmd_str = ' '.join(cmd)
return {
'total_arguments': len(cmd),
'has_database': any(arg for arg in cmd if not arg.startswith('-')),
'has_tables': len([arg for arg in cmd if not arg.startswith('-')]) > 1,
'options_count': len([arg for arg in cmd if arg.startswith('--')]),
'compression_used': '--compress' in cmd_str or '--compression-algorithms' in cmd_str
}
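下面的假设性片段演示命令构建与密码脱敏日志的效果(其中的连接参数仅为示意值):
# 演示:构建 mysqldump 命令并以隐藏密码的形式打印
from models.config_models import MySQLConfig, BackupConfig
from backup.command_builder import CommandBuilder

mysql_cfg = MySQLConfig(host="localhost", user="root", password="secret", database="td")
backup_cfg = BackupConfig(backup_dir="backups")
cmd = CommandBuilder.build_complete_command("td", mysql_cfg, backup_cfg)
print(CommandBuilder.validate_command(cmd))                        # True
print(CommandBuilder.get_safe_command_for_logging(cmd, mysql_cfg))
# 形如: mysqldump -hlocalhost -P3306 -uroot -p*** --single-transaction --add-drop-table ... td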
################################################################################
# 文件: backup/database_manager.py
################################################################################
"""
数据库管理器模块
负责数据库连接、查询和数据库列表管理
"""
import logging
from typing import List, Optional, Tuple
import pymysql
from pymysql import Connection
from config_manager import ConfigManager
class DatabaseManager:
"""
数据库管理器
处理所有数据库相关的操作,包括连接、查询和列表管理
"""
def __init__(self, config_manager: ConfigManager):
"""
初始化数据库管理器
Args:
config_manager: 配置管理器实例
"""
self.config_manager = config_manager
self.config = config_manager.config
self._connection: Optional[Connection] = None
logging.debug("数据库管理器初始化完成")
def get_db_connection(self) -> Optional[Connection]:
"""
建立数据库连接
Returns:
Optional[Connection]: 数据库连接对象,失败返回None
Example:
>>> conn = database_manager.get_db_connection()
>>> if conn:
... print("连接成功")
"""
if self._connection and self._connection.open:
return self._connection
mysql_config = self.config['mysql']
try:
self._connection = pymysql.connect(
host=mysql_config['host'],
user=mysql_config['user'],
password=mysql_config['password'],
database=mysql_config.get('database', 'mysql'),
port=mysql_config.get('port', 3306),
charset=mysql_config.get('charset', 'utf8mb4'),
connect_timeout=10,
autocommit=True
)
logging.debug(f"数据库连接建立成功: {mysql_config['host']}:{mysql_config.get('port', 3306)}")
return self._connection
except pymysql.Error as e:
logging.error(f"数据库连接失败: {e}")
self._connection = None
return None
def close_connection(self):
"""关闭数据库连接"""
if self._connection and self._connection.open:
self._connection.close()
self._connection = None
logging.debug("数据库连接已关闭")
def test_connection(self) -> bool:
"""
测试数据库连接是否正常
Returns:
bool: 连接是否成功
"""
try:
conn = self.get_db_connection()
if conn:
with conn.cursor() as cursor:
cursor.execute("SELECT 1")
result = cursor.fetchone()
return result[0] == 1
return False
except Exception as e:
logging.debug(f"数据库连接测试失败: {e}")
return False
def test_database_connection(self) -> bool:
"""
测试数据库连接和内容(详细测试)
Returns:
bool: 测试是否成功
"""
try:
conn = self.get_db_connection()
if not conn:
print("❌ 无法建立数据库连接")
return False
with conn.cursor() as cursor:
# 检查默认数据库是否存在
default_db = self.config['mysql'].get('database')
if default_db:
cursor.execute("SHOW DATABASES LIKE %s", (default_db,))
db_exists = cursor.fetchone()
print(f"📊 数据库 {default_db} 存在: {db_exists is not None}")
# 获取所有用户数据库
user_dbs = self.get_all_databases()
print(f"📁 发现 {len(user_dbs)} 个用户数据库: {user_dbs}")
# 检查第一个用户数据库的表信息
if user_dbs:
sample_db = user_dbs[0]
try:
cursor.execute(f"USE `{sample_db}`")
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
print(f"📋 数据库 {sample_db} 中的表数量: {len(tables)}")
if tables:
table_names = [table[0] for table in tables]
print(f"📄 表列表: {table_names}")
# 检查第一个表的数据量
sample_table = tables[0][0]
cursor.execute(f"SELECT COUNT(*) FROM `{sample_table}`")
count = cursor.fetchone()[0]
print(f"🔢 表 {sample_table} 中的数据行数: {count}")
except Exception as e:
print(f"⚠️ 检查数据库 {sample_db} 时出错: {e}")
print("✅ 数据库连接测试成功")
return True
except Exception as e:
print(f"❌ 数据库测试失败: {e}")
return False
finally:
self.close_connection()
def get_all_databases(self) -> List[str]:
"""
获取MySQL服务器中的所有用户数据库
Returns:
List[str]: 用户数据库名称列表
Example:
>>> databases = database_manager.get_all_databases()
>>> print(f"发现 {len(databases)} 个数据库")
"""
try:
conn = self.get_db_connection()
if not conn:
return []
with conn.cursor() as cursor:
# 获取所有数据库
cursor.execute("SHOW DATABASES")
databases = [db[0] for db in cursor.fetchall()]
# 排除系统数据库
system_dbs = ['information_schema', 'mysql', 'performance_schema', 'sys']
user_databases = [db for db in databases if db not in system_dbs]
logging.info(f"发现 {len(user_databases)} 个用户数据库: {user_databases}")
return user_databases
except Exception as e:
logging.error(f"获取数据库列表失败: {e}")
return []
finally:
self.close_connection()
def get_databases_to_backup(self) -> List[str]:
"""
获取要备份的数据库列表
根据配置文件确定需要备份的数据库
Returns:
List[str]: 要备份的数据库名称列表
"""
backup_config = self.config['backup']
mysql_config = self.config['mysql']
# 获取配置中指定的数据库
configured_databases = backup_config.get('databases', [])
if configured_databases:
# 如果配置了特定数据库,使用配置的列表
if isinstance(configured_databases, str):
# 如果是字符串,按逗号分割
databases = [db.strip() for db in configured_databases.split(',')]
else:
# 如果是列表,直接使用
databases = configured_databases
else:
# 如果没有配置,使用默认数据库
default_db = mysql_config.get('database')
if default_db:
databases = [default_db]
else:
databases = []
# 过滤空值并去重
databases = list(set([db for db in databases if db and db.strip()]))
if not databases:
logging.warning("没有指定要备份的数据库,使用默认数据库")
default_db = mysql_config.get('database', 'test')
databases = [default_db]
# 验证数据库是否存在
valid_databases = []
all_databases = self.get_all_databases()
for db in databases:
if db in all_databases:
valid_databases.append(db)
else:
logging.warning(f"数据库 '{db}' 不存在,已从备份列表中排除")
if not valid_databases:
logging.error("没有有效的数据库可以备份")
return []
logging.info(f"要备份的数据库: {valid_databases}")
return valid_databases
def get_database_size(self, database: str) -> Optional[float]:
"""
获取数据库大小(MB)
Args:
database: 数据库名称
Returns:
Optional[float]: 数据库大小(MB),失败返回None
"""
try:
conn = self.get_db_connection()
if not conn:
return None
with conn.cursor() as cursor:
# 查询数据库大小
cursor.execute("""
SELECT SUM(data_length + index_length) / 1024 / 1024 as size_mb
FROM information_schema.TABLES
WHERE table_schema = %s
""", (database,))
result = cursor.fetchone()
return result[0] if result and result[0] else 0.0
except Exception as e:
logging.error(f"获取数据库 {database} 大小失败: {e}")
return None
finally:
self.close_connection()
def get_table_info(self, database: str) -> List[Tuple[str, int, float]]:
"""
获取数据库表信息
Args:
database: 数据库名称
Returns:
List[Tuple[str, int, float]]: 表信息列表(表名, 行数, 大小MB)
"""
try:
conn = self.get_db_connection()
if not conn:
return []
with conn.cursor() as cursor:
cursor.execute("""
SELECT
table_name,
table_rows,
(data_length + index_length) / 1024 / 1024 as size_mb
FROM information_schema.TABLES
WHERE table_schema = %s
ORDER BY size_mb DESC
""", (database,))
return cursor.fetchall()
except Exception as e:
logging.error(f"获取数据库 {database} 表信息失败: {e}")
return []
finally:
self.close_connection()
def __del__(self):
"""析构函数,确保连接关闭"""
self.close_connection()
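一个假设性的使用片段,列出所有用户数据库及其大小(需要已安装 pymysql 并能连接到 MySQL):
# 演示:查询数据库列表与大小
from config_manager import ConfigManager
from backup.database_manager import DatabaseManager

db_manager = DatabaseManager(ConfigManager("config.json"))
if db_manager.test_connection():
    for db in db_manager.get_all_databases():
        size = db_manager.get_database_size(db)
        print(f"{db}: {size:.2f} MB" if size is not None else f"{db}: 大小未知")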
################################################################################
# 文件: backup/filename_generator.py
################################################################################
"""
文件名生成器模块
负责生成符合规范的备份文件名
"""
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict
import re
from models.config_models import BackupConfig, MySQLConfig
class FilenameGenerator:
"""
备份文件名生成器
负责生成符合规范的备份文件名,支持多种命名格式
"""
# 备份类型到中文名称的映射
BACKUP_TYPE_NAMES = {
'startup': '开机备份',
'shutdown': '关机备份',
'manual': '手动备份',
'scheduled': '定时备份',
'emergency': '紧急备份'
}
# 文件名格式变量映射
FORMAT_VARIABLES = {
'timestamp': '时间戳 (YYYYMMDD_HHMMSS)',
'data_source': '数据源 (host:port)',
'database': '数据库名称',
'backup_type': '备份类型',
'year': '年份 (YYYY)',
'month': '月份 (MM)',
'day': '日期 (DD)',
'hour': '小时 (HH)',
'minute': '分钟 (MM)',
'second': '秒 (SS)'
}
@classmethod
def generate_custom_filename(cls, database: str, mysql_config: MySQLConfig,
backup_config: BackupConfig) -> str:
"""
生成自定义格式的文件名
Args:
database: 数据库名称
mysql_config: MySQL配置
backup_config: 备份配置
Returns:
str: 生成的文件名
Example:
>>> filename = FilenameGenerator.generate_custom_filename('mydb', mysql_config, backup_config)
>>> print(filename) # dump-20231201_143045-localhost_3306-mydb.sql
"""
# 获取时间戳
now = datetime.now()
timestamp = now.strftime("%Y%m%d_%H%M%S")
# 构建数据源标识
data_source = f"{mysql_config.host}_{mysql_config.port}"
# 清理数据源中的特殊字符
data_source = cls._sanitize_filename_part(data_source)
# 清理数据库名称
safe_database_name = cls._sanitize_filename_part(database)
# 获取文件名格式
filename_format = backup_config.filename_format
# 替换所有变量
filename = filename_format.format(
timestamp=timestamp,
data_source=data_source,
database=safe_database_name,
backup_type='backup',
year=now.strftime("%Y"),
month=now.strftime("%m"),
day=now.strftime("%d"),
hour=now.strftime("%H"),
minute=now.strftime("%M"),
second=now.strftime("%S")
)
# 确保文件扩展名
if not filename.endswith('.sql'):
filename += '.sql'
logging.debug(f"生成自定义文件名: {filename}")
return filename
@classmethod
def generate_legacy_filename(cls, database: str, backup_type: str) -> str:
"""
生成传统格式的文件名
Args:
database: 数据库名称
backup_type: 备份类型
Returns:
str: 生成的文件名
"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
prefix = cls.BACKUP_TYPE_NAMES.get(backup_type, '备份')
safe_database_name = cls._sanitize_filename_part(database)
filename = f"{prefix}_{safe_database_name}_{timestamp}.sql"
logging.debug(f"生成传统文件名: {filename}")
return filename
@classmethod
def generate_backup_filename(cls, database: str, backup_type: str,
mysql_config: MySQLConfig, backup_config: BackupConfig) -> str:
"""
生成备份文件名(根据配置选择格式)
Args:
database: 数据库名称
backup_type: 备份类型
mysql_config: MySQL配置
backup_config: 备份配置
Returns:
str: 生成的文件名
"""
if backup_config.use_custom_filename:
return cls.generate_custom_filename(database, mysql_config, backup_config)
else:
return cls.generate_legacy_filename(database, backup_type)
@staticmethod
def get_backup_file_path(backup_dir: Path, filename: str) -> Path:
"""
获取完整的备份文件路径
Args:
backup_dir: 备份目录
filename: 文件名
Returns:
Path: 完整的文件路径
"""
file_path = backup_dir / filename
logging.debug(f"生成文件路径: {file_path}")
return file_path
@staticmethod
def _sanitize_filename_part(text: str) -> str:
"""
清理文件名部分,移除不安全的字符
Args:
text: 要清理的文本
Returns:
str: 清理后的安全文本
"""
# 替换不安全的字符为下划线
        unsafe_chars = r'[\\/*?:"<>| ]'
safe_text = re.sub(unsafe_chars, '_', text)
# 移除连续的下划线
safe_text = re.sub(r'_+', '_', safe_text)
# 移除开头和结尾的下划线
safe_text = safe_text.strip('_')
return safe_text
@classmethod
def validate_filename_format(cls, format_str: str) -> bool:
"""
验证文件名格式是否有效
Args:
format_str: 文件名格式字符串
Returns:
bool: 格式是否有效
"""
try:
# 测试格式化
test_values = {var: 'test' for var in cls.FORMAT_VARIABLES.keys()}
format_str.format(**test_values)
return True
except KeyError as e:
logging.error(f"文件名格式包含未知变量: {e}")
return False
except Exception as e:
logging.error(f"文件名格式验证失败: {e}")
return False
@classmethod
def get_supported_variables(cls) -> Dict[str, str]:
"""
获取支持的格式变量
Returns:
Dict[str, str]: 变量名到描述的映射
"""
return cls.FORMAT_VARIABLES.copy()
@classmethod
def parse_filename(cls, filename: str) -> Dict[str, str]:
"""
解析备份文件名,提取信息
Args:
filename: 备份文件名
Returns:
Dict[str, str]: 解析出的信息字典
"""
info = {
'filename': filename,
'database': 'unknown',
'timestamp': 'unknown',
'backup_type': 'unknown'
}
try:
# 移除扩展名
name_without_ext = Path(filename).stem
if filename.endswith('.gz'):
name_without_ext = Path(name_without_ext).stem
# 尝试解析自定义格式
if name_without_ext.startswith('dump-'):
parts = name_without_ext.split('-')
if len(parts) >= 4:
info['timestamp'] = parts[1]
info['data_source'] = parts[2]
info['database'] = parts[3]
info['format'] = 'custom'
# 尝试解析传统格式
else:
for backup_type, chinese_name in cls.BACKUP_TYPE_NAMES.items():
if name_without_ext.startswith(chinese_name):
info['backup_type'] = backup_type
parts = name_without_ext.split('_')
if len(parts) >= 3:
info['database'] = parts[1]
info['timestamp'] = parts[2]
info['format'] = 'legacy'
break
return info
except Exception as e:
logging.warning(f"解析文件名失败 {filename}: {e}")
return info
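下面是一个假设性的示例,演示自定义文件名格式的校验、生成与解析(格式串中可用的变量见 FORMAT_VARIABLES):
# 演示:自定义文件名格式
from models.config_models import MySQLConfig, BackupConfig
from backup.filename_generator import FilenameGenerator

fmt = "dump-{year}{month}{day}-{data_source}-{database}.sql"
print(FilenameGenerator.validate_filename_format(fmt))             # True
backup_cfg = BackupConfig(backup_dir="backups", filename_format=fmt)
mysql_cfg = MySQLConfig(host="localhost", user="root", password="secret", database="td")
name = FilenameGenerator.generate_backup_filename("td", "manual", mysql_cfg, backup_cfg)
print(name)                                        # 例如 dump-20231201-localhost_3306-td.sql
print(FilenameGenerator.parse_filename(name))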
################################################################################
# 文件: backup/__init__.py
################################################################################
"""
备份核心模块
提供完整的数据库备份功能
"""
from .backup_manager import BackupManager
from .database_manager import DatabaseManager
from .command_builder import CommandBuilder
from .filename_generator import FilenameGenerator
from .backup_executor import BackupExecutor
from .backup_cleaner import BackupCleaner
__all__ = [
'BackupManager',
'DatabaseManager',
'CommandBuilder',
'FilenameGenerator',
'BackupExecutor',
'BackupCleaner'
]
################################################################################
# 文件: models/backup_models.py
################################################################################
"""
备份数据模型模块
定义备份过程中使用的数据结构和结果对象
"""
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
from pathlib import Path
from datetime import datetime
@dataclass
class BackupResult:
"""
备份结果数据模型
记录单次备份的执行结果和统计信息
Attributes:
database: 数据库名称
success: 是否成功完成备份
backup_file: 备份文件路径
file_size: 文件大小(MB)
error_message: 错误信息(如果失败)
duration: 执行时长(秒)
start_time: 开始时间
end_time: 结束时间
"""
database: str
success: bool
backup_file: Optional[Path]
file_size: float = 0.0
error_message: str = ""
duration: float = 0.0
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
def __post_init__(self):
"""初始化后处理"""
if self.start_time and self.end_time:
self.duration = (self.end_time - self.start_time).total_seconds()
def to_dict(self) -> Dict[str, Any]:
"""
转换为字典格式
Returns:
Dict[str, Any]: 字典形式的结果数据
"""
return {
'database': self.database,
'success': self.success,
'backup_file': str(self.backup_file) if self.backup_file else None,
'file_size': self.file_size,
'error_message': self.error_message,
'duration': self.duration,
'start_time': self.start_time.isoformat() if self.start_time else None,
'end_time': self.end_time.isoformat() if self.end_time else None
}
@dataclass
class BackupTask:
"""
备份任务数据模型
描述一个备份任务的完整信息和配置
Attributes:
database: 数据库名称
backup_type: 备份类型(startup/shutdown/manual)
timestamp: 任务创建时间戳
config: 备份配置字典
mysql_config: MySQL配置字典
tables: 要备份的特定表列表
"""
database: str
backup_type: str
timestamp: str
config: Dict[str, Any]
mysql_config: Dict[str, Any]
tables: List[str] = None
def __post_init__(self):
"""初始化后处理"""
if self.tables is None:
self.tables = []
def get_task_id(self) -> str:
"""
生成任务唯一标识
Returns:
str: 任务ID
"""
return f"{self.database}_{self.backup_type}_{self.timestamp}"
def validate(self) -> List[str]:
"""
验证任务配置有效性
Returns:
List[str]: 错误消息列表
"""
errors = []
if not self.database:
errors.append("数据库名称不能为空")
if self.backup_type not in ['startup', 'shutdown', 'manual']:
errors.append("备份类型必须是: startup, shutdown, manual")
return errors
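一个假设性的小例子,说明 BackupResult 的 duration 会在 __post_init__ 中按起止时间自动计算:
# 演示:BackupResult 的自动时长计算与字典转换
from datetime import datetime, timedelta
from pathlib import Path
from models.backup_models import BackupResult

start = datetime.now()
result = BackupResult(
    database="td",
    success=True,
    backup_file=Path("backups/dump-20231201_143045-localhost_3306-td.sql.gz"),
    file_size=12.5,
    start_time=start,
    end_time=start + timedelta(seconds=8),
)
print(result.duration)    # 8.0
print(result.to_dict())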
################################################################################
# 文件: models/config_models.py
################################################################################
"""
配置数据模型模块
定义配置相关的数据结构和验证规则
"""
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
@dataclass
class MySQLConfig:
"""
MySQL数据库连接配置数据模型
Attributes:
host: 数据库主机地址
user: 数据库用户名
password: 数据库密码
database: 默认数据库
port: 数据库端口,默认3306
charset: 字符集,默认utf8mb4
"""
host: str
user: str
password: str
database: str
port: int = 3306
charset: str = "utf8mb4"
def validate(self) -> List[str]:
"""
验证配置有效性
Returns:
List[str]: 错误消息列表,空列表表示验证通过
"""
errors = []
if not self.host:
errors.append("MySQL host 不能为空")
if not self.user:
errors.append("MySQL user 不能为空")
if not self.password:
errors.append("MySQL password 不能为空")
if self.port <= 0 or self.port > 65535:
errors.append("MySQL port 必须在 1-65535 范围内")
return errors
@dataclass
class BackupConfig:
"""
备份配置数据模型
Attributes:
backup_dir: 备份文件存储目录
keep_backups: 保留的备份文件数量,默认20
compression: 是否启用压缩,默认True
timeout: 备份超时时间(秒),默认300
use_custom_filename: 是否使用自定义文件名格式,默认True
filename_format: 文件名格式模板
add_drop_table: 是否添加DROP TABLE语句,默认True
disable_keys: 是否禁用键,默认True
lock_tables: 是否锁定表,默认False
add_drop_trigger: 是否添加DROP TRIGGER语句,默认True
no_data: 是否不备份数据(仅结构),默认False
complete_insert: 是否使用完整的INSERT语句,默认True
create_options: 是否包含CREATE选项,默认True
routines: 是否备份存储过程和函数,默认True
extended_insert: 是否使用扩展INSERT语句,默认True
databases: 要备份的数据库列表
tables: 要备份的特定表列表
"""
backup_dir: str
keep_backups: int = 20
compression: bool = True
timeout: int = 300
use_custom_filename: bool = True
filename_format: str = "dump-{timestamp}-{data_source}-{database}.sql"
add_drop_table: bool = True
disable_keys: bool = True
lock_tables: bool = False
add_drop_trigger: bool = True
no_data: bool = False
complete_insert: bool = True
create_options: bool = True
routines: bool = True
extended_insert: bool = True
databases: List[str] = None
tables: List[str] = None
def __post_init__(self):
"""初始化后处理"""
if self.databases is None:
self.databases = []
if self.tables is None:
self.tables = []
def validate(self) -> List[str]:
"""
验证配置有效性
Returns:
List[str]: 错误消息列表
"""
errors = []
if not self.backup_dir:
errors.append("备份目录不能为空")
if self.keep_backups <= 0:
errors.append("保留备份数量必须大于0")
if self.timeout <= 0:
errors.append("备份超时时间必须大于0")
return errors
@dataclass
class LoggingConfig:
"""
日志配置数据模型
Attributes:
log_dir: 日志文件目录,默认"logs"
log_level: 日志级别,默认"INFO"
max_log_size: 最大日志文件大小,默认"10MB"
backup_count: 保留的日志文件数量,默认5
"""
log_dir: str = "logs"
log_level: str = "INFO"
max_log_size: str = "10MB"
backup_count: int = 5
def validate(self) -> List[str]:
"""
验证配置有效性
Returns:
List[str]: 错误消息列表
"""
errors = []
if not self.log_dir:
errors.append("日志目录不能为空")
valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
if self.log_level.upper() not in valid_levels:
errors.append(f"日志级别必须是: {', '.join(valid_levels)}")
return errors
@dataclass
class ScheduleConfig:
"""
调度配置数据模型
Attributes:
startup_delay: 开机后延迟执行时间(秒),默认120
shutdown_timeout: 关机备份超时时间(秒),默认300
wait_mysql_timeout: 等待MySQL启动超时时间(秒),默认300
"""
startup_delay: int = 120
shutdown_timeout: int = 300
wait_mysql_timeout: int = 300
def validate(self) -> List[str]:
"""
验证配置有效性
Returns:
List[str]: 错误消息列表
"""
errors = []
if self.startup_delay < 0:
errors.append("启动延迟时间不能为负数")
if self.shutdown_timeout <= 0:
errors.append("关机超时时间必须大于0")
if self.wait_mysql_timeout <= 0:
errors.append("等待MySQL超时时间必须大于0")
return errors
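配置模型的 validate() 以错误消息列表的形式返回校验结果,空列表表示通过;下面是一个假设性示例:
# 演示:配置校验
from models.config_models import MySQLConfig, BackupConfig

bad_mysql = MySQLConfig(host="", user="root", password="", database="td", port=70000)
print(bad_mysql.validate())
# ['MySQL host 不能为空', 'MySQL password 不能为空', 'MySQL port 必须在 1-65535 范围内']
print(BackupConfig(backup_dir="backups").validate())    # []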
################################################################################
# 文件: models/__init__.py
################################################################################
"""
数据模型模块
定义系统使用的数据结构和类型
"""
from .config_models import MySQLConfig, BackupConfig, LoggingConfig, ScheduleConfig
from .backup_models import BackupResult, BackupTask
__all__ = [
'MySQLConfig',
'BackupConfig',
'LoggingConfig',
'ScheduleConfig',
'BackupResult',
'BackupTask'
]
################################################################################
# 文件: schedule/task_scheduler.py
################################################################################
"""
任务调度器模块
负责备份任务的调度、执行和监控
"""
import time
import logging
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
from backup.backup_manager import BackupManager
class TaskScheduler:
"""
任务调度器
管理备份任务的调度和执行,提供任务监控功能
"""
def __init__(self, backup_manager: BackupManager):
"""
初始化任务调度器
Args:
backup_manager: 备份管理器实例
"""
self.backup_manager = backup_manager
self.config = backup_manager.config
self.is_running = False
self.task_history = []
logging.info("任务调度器初始化完成")
def schedule_startup_backup(self):
"""
调度开机备份
等待系统稳定后执行备份
"""
startup_delay = self.config['schedule'].get('startup_delay', 120)
logging.info(f"调度开机备份,等待 {startup_delay} 秒系统稳定...")
# 记录任务开始
task_id = self._record_task_start('startup')
try:
# 等待系统稳定
time.sleep(startup_delay)
# 等待MySQL服务启动
if self.backup_manager.wait_for_mysql():
success = self.backup_manager.backup_database('startup')
if success:
logging.info("✅ 开机备份完成")
self._record_task_success(task_id)
else:
logging.error("❌ 开机备份失败")
self._record_task_failure(task_id, "备份执行失败")
else:
error_msg = "MySQL服务未就绪"
logging.error(f"❌ 开机备份失败:{error_msg}")
self._record_task_failure(task_id, error_msg)
except Exception as e:
error_msg = f"开机备份异常: {e}"
logging.error(f"❌ {error_msg}")
self._record_task_failure(task_id, error_msg)
def schedule_shutdown_backup(self):
"""
调度关机备份
在系统关机前执行备份
"""
logging.info("调度关机备份...")
# 记录任务开始
task_id = self._record_task_start('shutdown')
try:
success = self.backup_manager.backup_database('shutdown')
if success:
logging.info("✅ 关机备份完成")
self._record_task_success(task_id)
else:
logging.error("❌ 关机备份失败")
self._record_task_failure(task_id, "备份执行失败")
except Exception as e:
error_msg = f"关机备份异常: {e}"
logging.error(f"❌ {error_msg}")
self._record_task_failure(task_id, error_msg)
def schedule_manual_backup(self) -> bool:
"""
调度手动备份
Returns:
bool: 是否备份成功
"""
logging.info("调度手动备份...")
task_id = self._record_task_start('manual')
try:
success = self.backup_manager.backup_database('manual')
if success:
logging.info("✅ 手动备份完成")
self._record_task_success(task_id)
return True
else:
logging.error("❌ 手动备份失败")
self._record_task_failure(task_id, "备份执行失败")
return False
except Exception as e:
error_msg = f"手动备份异常: {e}"
logging.error(f"❌ {error_msg}")
self._record_task_failure(task_id, error_msg)
return False
def _record_task_start(self, task_type: str) -> str:
"""
记录任务开始
Args:
task_type: 任务类型
Returns:
str: 任务ID
"""
task_id = f"{task_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
task_record = {
'id': task_id,
'type': task_type,
'start_time': datetime.now(),
'status': 'running',
'success': None,
'error_message': None,
'duration': None
}
self.task_history.append(task_record)
logging.debug(f"任务开始记录: {task_id}")
return task_id
def _record_task_success(self, task_id: str):
"""
记录任务成功
Args:
task_id: 任务ID
"""
for task in self.task_history:
if task['id'] == task_id and task['status'] == 'running':
task['status'] = 'completed'
task['success'] = True
task['duration'] = (datetime.now() - task['start_time']).total_seconds()
logging.debug(f"任务成功记录: {task_id}")
break
def _record_task_failure(self, task_id: str, error_message: str):
"""
记录任务失败
Args:
task_id: 任务ID
error_message: 错误信息
"""
for task in self.task_history:
if task['id'] == task_id and task['status'] == 'running':
task['status'] = 'failed'
task['success'] = False
task['error_message'] = error_message
task['duration'] = (datetime.now() - task['start_time']).total_seconds()
logging.debug(f"任务失败记录: {task_id}")
break
def get_task_history(self, limit: int = 50) -> list:
"""
获取任务历史
Args:
limit: 返回的记录数量限制
Returns:
list: 任务历史记录
"""
# 按开始时间倒序排列,返回最新的记录
sorted_history = sorted(
self.task_history,
key=lambda x: x['start_time'],
reverse=True
)
return sorted_history[:limit]
def get_task_statistics(self) -> Dict[str, Any]:
"""
获取任务统计信息
Returns:
Dict[str, Any]: 任务统计信息
"""
if not self.task_history:
return {
'total_tasks': 0,
'successful_tasks': 0,
'failed_tasks': 0,
'success_rate': 0,
'average_duration': 0
}
total_tasks = len(self.task_history)
successful_tasks = len([t for t in self.task_history if t.get('success')])
failed_tasks = total_tasks - successful_tasks
success_rate = (successful_tasks / total_tasks) * 100 if total_tasks > 0 else 0
# 计算平均持续时间(只计算已完成的任务)
completed_tasks = [t for t in self.task_history if t.get('duration') is not None]
average_duration = (
sum(t['duration'] for t in completed_tasks) / len(completed_tasks)
if completed_tasks else 0
)
# 按任务类型统计
task_types = {}
for task in self.task_history:
task_type = task['type']
if task_type not in task_types:
task_types[task_type] = {'total': 0, 'success': 0, 'failed': 0}
task_types[task_type]['total'] += 1
if task.get('success'):
task_types[task_type]['success'] += 1
else:
task_types[task_type]['failed'] += 1
return {
'total_tasks': total_tasks,
'successful_tasks': successful_tasks,
'failed_tasks': failed_tasks,
'success_rate': round(success_rate, 2),
'average_duration': round(average_duration, 2),
'task_types': task_types,
'first_task': (
min(self.task_history, key=lambda x: x['start_time'])['start_time'].isoformat()
if self.task_history else None
),
'last_task': (
max(self.task_history, key=lambda x: x['start_time'])['start_time'].isoformat()
if self.task_history else None
)
}
def cleanup_old_history(self, days: int = 30):
"""
清理旧的任务历史记录
Args:
days: 保留天数
"""
cutoff_time = datetime.now() - timedelta(days=days)
initial_count = len(self.task_history)
self.task_history = [
task for task in self.task_history
if task['start_time'] >= cutoff_time
]
removed_count = initial_count - len(self.task_history)
if removed_count > 0:
logging.info(f"清理了 {removed_count} 条超过 {days} 天的任务历史记录")
def get_recent_failures(self, hours: int = 24) -> list:
"""
获取最近的失败任务
Args:
hours: 时间范围(小时)
Returns:
list: 失败任务列表
"""
cutoff_time = datetime.now() - timedelta(hours=hours)
failures = [
task for task in self.task_history
if task.get('success') is False and task['start_time'] >= cutoff_time
]
return sorted(failures, key=lambda x: x['start_time'], reverse=True)
def is_system_healthy(self) -> Dict[str, Any]:
"""
检查系统健康状态
Returns:
Dict[str, Any]: 系统健康状态
"""
# 获取最近的失败任务
recent_failures = self.get_recent_failures(24)
# 检查数据库连接
db_healthy = self.backup_manager.test_database_connection()
# 检查备份目录
from utils.file_utils import ensure_directory
backup_dir = self.backup_manager.backup_config.backup_dir
dir_healthy = ensure_directory(backup_dir)
# 综合健康评估
health_status = 'healthy'
issues = []
if recent_failures:
health_status = 'warning'
issues.append(f"最近24小时有 {len(recent_failures)} 个失败任务")
if not db_healthy:
health_status = 'critical'
issues.append("数据库连接失败")
if not dir_healthy:
health_status = 'critical'
issues.append("备份目录不可用")
return {
'status': health_status,
'timestamp': datetime.now().isoformat(),
'database_connection': db_healthy,
'backup_directory': dir_healthy,
'recent_failures': len(recent_failures),
'issues': issues,
'suggestions': self._get_health_suggestions(health_status, issues)
}
def _get_health_suggestions(self, health_status: str, issues: list) -> list:
"""
获取健康问题建议
Args:
health_status: 健康状态
issues: 问题列表
Returns:
list: 建议列表
"""
suggestions = []
if health_status == 'critical':
if "数据库连接失败" in issues:
suggestions.append("检查MySQL服务是否运行,验证连接配置")
if "备份目录不可用" in issues:
suggestions.append("检查备份目录权限,或更换备份目录")
if health_status == 'warning':
if "失败任务" in str(issues):
suggestions.append("查看任务历史记录,分析失败原因")
if health_status == 'healthy':
suggestions.append("系统运行正常,继续保持")
return suggestions
def __del__(self):
"""析构函数,清理资源"""
if self.is_running:
self.is_running = False
logging.info("任务调度器已停止")
################################################################################
# 文件: schedule/__init__.py
################################################################################
"""
任务调度模块
提供备份任务的调度和执行管理
"""
from .task_scheduler import TaskScheduler
__all__ = ['TaskScheduler']
################################################################################
# 文件: utils/error_handler.py
################################################################################
"""
错误处理模块
提供统一的错误处理和异常管理
"""
import re
import logging
import subprocess
from pathlib import Path
from typing import Optional, Tuple, Dict, Any
from datetime import datetime
class BackupErrorHandler:
"""
备份错误处理器
专门处理备份过程中可能出现的各种错误
"""
# 错误类型映射
ERROR_PATTERNS = {
'DATABASE_NOT_FOUND': [
"Unknown database",
"database.*doesn't exist"
],
'ACCESS_DENIED': [
"Access denied",
"password.*failed",
"connection.*refused"
],
'CONNECTION_FAILED': [
"Can't connect to MySQL server",
"connection.*failed",
"timeout"
],
'LOCK_TABLES_ERROR': [
"when using LOCK TABLES",
"lock.*wait",
"deadlock"
],
'PERMISSION_DENIED': [
"permission denied",
"cannot open",
"read-only"
],
'DISK_FULL': [
"no space left",
"disk full"
],
'MYSQLDUMP_NOT_FOUND': [
"mysqldump.*not found",
"command not found"
]
}
def handle_backup_error(self, result: subprocess.CompletedProcess,
backup_file: Path, database: str) -> Tuple[bool, str]:
"""
处理备份错误
Args:
result: 子进程执行结果
backup_file: 备份文件路径
database: 数据库名称
Returns:
Tuple[bool, str]: (是否成功, 错误信息)
"""
logging.error(f"mysqldump 返回错误代码: {result.returncode}")
stderr = result.stderr.strip()
logging.error(f"错误信息: {stderr}")
# 分析错误类型
error_type, error_message = self._analyze_error_type(stderr, database)
# 记录详细的错误信息
self._log_detailed_error(error_type, stderr, database)
# 清理失败的备份文件
self._cleanup_failed_backup(backup_file)
return False, error_message
def _analyze_error_type(self, stderr: str, database: str) -> Tuple[str, str]:
"""
分析错误类型
Args:
stderr: 错误输出
database: 数据库名称
Returns:
Tuple[str, str]: (错误类型, 错误信息)
"""
# ERROR_PATTERNS 中的部分模式是正则表达式,这里用 re.search 做不区分大小写的匹配
for error_type, patterns in self.ERROR_PATTERNS.items():
for pattern in patterns:
if re.search(pattern, stderr, re.IGNORECASE):
return self._get_error_message(error_type, database, stderr)
# 未知错误
return "UNKNOWN_ERROR", f"未知错误: {stderr.strip()}"
def _get_error_message(self, error_type: str, database: str, stderr: str) -> Tuple[str, str]:
"""
获取错误消息
Args:
error_type: 错误类型
database: 数据库名称
stderr: 错误输出
Returns:
Tuple[str, str]: (错误类型, 错误消息)
"""
error_messages = {
'DATABASE_NOT_FOUND': f"数据库 '{database}' 不存在或无法访问",
'ACCESS_DENIED': "数据库访问被拒绝,请检查用户名和密码",
'CONNECTION_FAILED': "无法连接到MySQL服务器,请检查网络连接和服务器状态",
'LOCK_TABLES_ERROR': "表锁定错误,请尝试禁用锁表选项或检查数据库权限",
'PERMISSION_DENIED': "文件权限错误,请检查备份目录的写入权限",
'DISK_FULL': "磁盘空间不足,无法创建备份文件",
'MYSQLDUMP_NOT_FOUND': "mysqldump命令未找到,请确保MySQL客户端工具已安装"
}
message = error_messages.get(error_type, f"未知错误类型: {error_type}")
return error_type, f"{message} (详情: {stderr[:100]}...)"
def _log_detailed_error(self, error_type: str, stderr: str, database: str):
"""
记录详细的错误信息
Args:
error_type: 错误类型
stderr: 错误输出
database: 数据库名称
"""
error_context = {
'timestamp': datetime.now().isoformat(),
'database': database,
'error_type': error_type,
'error_message': stderr,
'suggested_action': self._get_suggested_action(error_type)
}
logging.error(f"错误详情: {error_context}")
def _get_suggested_action(self, error_type: str) -> str:
"""
获取建议的解决措施
Args:
error_type: 错误类型
Returns:
str: 建议的解决措施
"""
actions = {
'DATABASE_NOT_FOUND': "检查数据库名称是否正确,确保数据库存在",
'ACCESS_DENIED': "验证MySQL用户名和密码,检查用户权限",
'CONNECTION_FAILED': "检查MySQL服务是否运行,网络连接是否正常",
'LOCK_TABLES_ERROR': "在配置中禁用锁表选项,或授予LOCK TABLES权限",
'PERMISSION_DENIED': "检查备份目录的写入权限,或更换备份目录",
'DISK_FULL': "清理磁盘空间或更换备份目录",
'MYSQLDUMP_NOT_FOUND': "安装MySQL客户端工具,或确保mysqldump在PATH中"
}
return actions.get(error_type, "查看详细错误日志以获取更多信息")
def _cleanup_failed_backup(self, backup_file: Path):
"""
清理失败的备份文件
Args:
backup_file: 要清理的备份文件路径
"""
try:
if backup_file.exists():
backup_file.unlink()
logging.info(f"已清理失败的备份文件: {backup_file}")
except Exception as e:
logging.error(f"清理备份文件失败 {backup_file}: {e}")
class DatabaseErrorHandler:
"""
数据库错误处理器
处理数据库连接和操作相关的错误
"""
def handle_connection_error(self, error: Exception, host: str, port: int) -> Tuple[bool, str]:
"""
处理数据库连接错误
Args:
error: 异常对象
host: 主机地址
port: 端口号
Returns:
Tuple[bool, str]: (是否成功, 错误信息)
"""
error_msg = str(error).lower()
if "access denied" in error_msg:
return False, f"数据库访问被拒绝: {host}:{port}"
elif "can't connect" in error_msg:
return False, f"无法连接到数据库服务器: {host}:{port}"
elif "unknown database" in error_msg:
return False, "指定的数据库不存在"
elif "timeout" in error_msg:
return False, f"数据库连接超时: {host}:{port}"
else:
return False, f"数据库连接错误: {error}"
def handle_query_error(self, error: Exception, query: str = "") -> Tuple[bool, str]:
"""
处理数据库查询错误
Args:
error: 异常对象
query: 查询语句(可选)
Returns:
Tuple[bool, str]: (是否成功, 错误信息)
"""
error_msg = str(error).lower()
if "table doesn't exist" in error_msg:
return False, "查询的表不存在"
elif "syntax error" in error_msg:
return False, "SQL语法错误"
elif "permission denied" in error_msg:
return False, "没有执行查询的权限"
else:
base_msg = f"数据库查询错误: {error}"
if query:
base_msg += f" (查询: {query[:50]}...)"
return False, base_msg
class FileSystemErrorHandler:
"""
文件系统错误处理器
处理文件操作相关的错误
"""
def handle_file_error(self, error: Exception, file_path: Path, operation: str) -> Tuple[bool, str]:
"""
处理文件操作错误
Args:
error: 异常对象
file_path: 文件路径
operation: 操作类型
Returns:
Tuple[bool, str]: (是否成功, 错误信息)
"""
error_msg = str(error).lower()
if "permission denied" in error_msg:
return False, f"文件权限不足: {file_path}"
elif "no such file or directory" in error_msg:
return False, f"文件或目录不存在: {file_path}"
elif "file exists" in error_msg:
return False, f"文件已存在: {file_path}"
elif "disk full" in error_msg or "no space" in error_msg:
return False, f"磁盘空间不足: {file_path}"
elif "is a directory" in error_msg:
return False, f"路径是目录而不是文件: {file_path}"
else:
return False, f"文件{operation}错误 {file_path}: {error}"
def handle_directory_error(self, error: Exception, dir_path: Path, operation: str) -> Tuple[bool, str]:
"""
处理目录操作错误
Args:
error: 异常对象
dir_path: 目录路径
operation: 操作类型
Returns:
Tuple[bool, str]: (是否成功, 错误信息)
"""
error_msg = str(error).lower()
if "permission denied" in error_msg:
return False, f"目录权限不足: {dir_path}"
elif "no such file or directory" in error_msg:
return False, f"目录不存在: {dir_path}"
elif "file exists" in error_msg:
return False, f"路径已存在但不是目录: {dir_path}"
elif "not empty" in error_msg:
return False, f"目录不为空: {dir_path}"
else:
return False, f"目录{operation}错误 {dir_path}: {error}"
class ErrorReporter:
"""
错误报告器
生成详细的错误报告和统计
"""
def __init__(self):
self.error_count = 0
self.error_types = {}
self.first_error_time = None
self.last_error_time = None
def record_error(self, error_type: str, error_message: str, context: Dict[str, Any] = None):
"""
记录错误
Args:
error_type: 错误类型
error_message: 错误消息
context: 错误上下文信息
"""
self.error_count += 1
current_time = datetime.now()
# 更新时间戳
if not self.first_error_time:
self.first_error_time = current_time
self.last_error_time = current_time
# 统计错误类型
self.error_types[error_type] = self.error_types.get(error_type, 0) + 1
# 记录错误详情
error_detail = {
'type': error_type,
'message': error_message,
'timestamp': current_time.isoformat(),
'context': context or {}
}
logging.error(f"错误记录: {error_detail}")
def get_error_report(self) -> Dict[str, Any]:
"""
获取错误报告
Returns:
Dict[str, Any]: 错误报告
"""
return {
'total_errors': self.error_count,
'error_types': self.error_types,
'first_error': self.first_error_time.isoformat() if self.first_error_time else None,
'last_error': self.last_error_time.isoformat() if self.last_error_time else None,
'time_span_hours': (
(self.last_error_time - self.first_error_time).total_seconds() / 3600
if self.first_error_time and self.last_error_time else 0
)
}
def reset_statistics(self):
"""重置错误统计"""
self.error_count = 0
self.error_types = {}
self.first_error_time = None
self.last_error_time = None
logging.info("错误统计已重置")
################################################################################
# 文件: utils/file_utils.py
################################################################################
"""
文件操作工具模块
提供文件压缩、目录管理等文件相关操作
"""
import os
import gzip
import shutil
import logging
from pathlib import Path
from typing import Optional, List
from datetime import datetime
def compress_file(file_path: Path) -> Optional[str]:
"""
压缩文件为.gz格式
Args:
file_path: 要压缩的文件路径
Returns:
Optional[str]: 压缩后的文件路径,失败返回None
Example:
>>> compressed = compress_file(Path('backup.sql'))
>>> print(compressed) # backup.sql.gz
"""
if not file_path.exists():
logging.error(f"要压缩的文件不存在: {file_path}")
return None
try:
compressed_file = f"{file_path}.gz"
original_size = file_path.stat().st_size
logging.debug(f"开始压缩文件: {file_path} -> {compressed_file}")
with open(file_path, 'rb') as f_in:
with gzip.open(compressed_file, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
# 验证压缩文件
if not Path(compressed_file).exists():
logging.error(f"压缩文件创建失败: {compressed_file}")
return None
compressed_size = Path(compressed_file).stat().st_size
compression_ratio = (1 - compressed_size / original_size) * 100
# 删除原始文件
os.remove(file_path)
logging.info(
f"文件压缩完成: {file_path.name} -> {Path(compressed_file).name} "
f"({original_size / 1024 / 1024:.2f} MB -> {compressed_size / 1024 / 1024:.2f} MB, "
f"压缩率: {compression_ratio:.1f}%)"
)
return compressed_file
except Exception as e:
logging.error(f"文件压缩失败 {file_path}: {e}")
return None
def decompress_file(compressed_file: Path) -> Optional[str]:
"""
解压缩.gz文件
Args:
compressed_file: 要解压的.gz文件路径
Returns:
Optional[str]: 解压后的文件路径,失败返回None
"""
if not compressed_file.exists():
logging.error(f"要解压的文件不存在: {compressed_file}")
return None
try:
# 移除末尾的.gz扩展名得到原始文件名(只处理最后一个扩展名,避免误替换路径中其他位置的".gz")
original_file = str(compressed_file.with_suffix(''))
with gzip.open(compressed_file, 'rb') as f_in:
with open(original_file, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
# 删除压缩文件
os.remove(compressed_file)
logging.info(f"文件解压完成: {compressed_file.name} -> {Path(original_file).name}")
return original_file
except Exception as e:
logging.error(f"文件解压失败 {compressed_file}: {e}")
return None
def ensure_directory(directory: Path) -> bool:
"""
确保目录存在,如果不存在则创建
Args:
directory: 目录路径
Returns:
bool: 是否成功确保目录存在
"""
try:
directory.mkdir(parents=True, exist_ok=True)
logging.debug(f"目录已确保存在: {directory}")
return True
except Exception as e:
logging.error(f"创建目录失败 {directory}: {e}")
return False
def get_file_size(file_path: Path) -> Optional[float]:
"""
获取文件大小(MB)
Args:
file_path: 文件路径
Returns:
Optional[float]: 文件大小(MB),失败返回None
"""
try:
if file_path.exists():
return file_path.stat().st_size / (1024 * 1024)
return None
except Exception as e:
logging.error(f"获取文件大小失败 {file_path}: {e}")
return None
def format_file_size(size_bytes: int) -> str:
"""
格式化文件大小
Args:
size_bytes: 字节数
Returns:
str: 格式化后的大小字符串
"""
units = ['B', 'KB', 'MB', 'GB', 'TB']
size = float(size_bytes)
for unit in units:
if size < 1024.0 or unit == 'TB':
break
size /= 1024.0
return f"{size:.2f} {unit}"
def list_files(directory: Path, pattern: str = "*") -> List[Path]:
"""
列出目录中的文件
Args:
directory: 目录路径
pattern: 文件模式
Returns:
List[Path]: 文件路径列表
"""
if not directory.exists():
logging.warning(f"目录不存在: {directory}")
return []
try:
files = list(directory.glob(pattern))
return sorted(files, key=lambda x: x.stat().st_mtime, reverse=True)
except Exception as e:
logging.error(f"列出文件失败 {directory}: {e}")
return []
def safe_delete_file(file_path: Path) -> bool:
"""
安全删除文件
Args:
file_path: 要删除的文件路径
Returns:
bool: 是否删除成功
"""
try:
if file_path.exists():
file_path.unlink()
logging.debug(f"文件已删除: {file_path}")
return True
return True # 文件不存在也算删除成功
except Exception as e:
logging.error(f"删除文件失败 {file_path}: {e}")
return False
def backup_file(original_file: Path, backup_suffix: str = ".bak") -> Optional[Path]:
"""
备份文件
Args:
original_file: 原始文件路径
backup_suffix: 备份文件后缀
Returns:
Optional[Path]: 备份文件路径,失败返回None
"""
if not original_file.exists():
logging.warning(f"要备份的文件不存在: {original_file}")
return None
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = original_file.with_suffix(f"{backup_suffix}.{timestamp}")
shutil.copy2(original_file, backup_path)
logging.info(f"文件备份完成: {original_file} -> {backup_path}")
return backup_path
except Exception as e:
logging.error(f"文件备份失败 {original_file}: {e}")
return None
def calculate_directory_size(directory: Path) -> Optional[int]:
"""
计算目录总大小(字节)
Args:
directory: 目录路径
Returns:
Optional[int]: 目录总大小(字节),失败返回None
"""
if not directory.exists():
return None
try:
total_size = 0
for file_path in directory.rglob('*'):
if file_path.is_file():
total_size += file_path.stat().st_size
return total_size
except Exception as e:
logging.error(f"计算目录大小失败 {directory}: {e}")
return None
class FileManager:
"""
文件管理器
提供高级文件管理功能
"""
def __init__(self, base_directory: Path):
self.base_directory = base_directory
ensure_directory(base_directory)
def create_backup_structure(self) -> bool:
"""
创建备份目录结构
Returns:
bool: 是否创建成功
"""
try:
directories = [
self.base_directory / "daily",
self.base_directory / "weekly",
self.base_directory / "monthly",
self.base_directory / "logs"
]
for directory in directories:
ensure_directory(directory)
logging.info(f"备份目录结构创建完成: {self.base_directory}")
return True
except Exception as e:
logging.error(f"创建备份目录结构失败: {e}")
return False
def cleanup_empty_directories(self) -> int:
"""
清理空目录
Returns:
int: 清理的目录数量
"""
cleaned_count = 0
try:
for directory in self.base_directory.rglob('*'):
if directory.is_dir() and not any(directory.iterdir()):
directory.rmdir()
cleaned_count += 1
logging.debug(f"清理空目录: {directory}")
if cleaned_count > 0:
logging.info(f"清理了 {cleaned_count} 个空目录")
return cleaned_count
except Exception as e:
logging.error(f"清理空目录失败: {e}")
return 0
def get_storage_info(self) -> dict:
"""
获取存储信息
Returns:
dict: 存储信息字典
"""
try:
total_size = calculate_directory_size(self.base_directory) or 0
file_count = sum(1 for p in self.base_directory.rglob('*') if p.is_file())
# 按扩展名统计
extensions = {}
for file_path in self.base_directory.rglob('*'):
if file_path.is_file():
ext = file_path.suffix.lower() or '无扩展名'
extensions[ext] = extensions.get(ext, 0) + 1
return {
'total_size_bytes': total_size,
'total_size_formatted': format_file_size(total_size),
'file_count': file_count,
'extensions': extensions,
'directory': str(self.base_directory)
}
except Exception as e:
logging.error(f"获取存储信息失败: {e}")
return {'error': str(e)}
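# 使用示意(最小示例, 在当前目录下创建演示目录 file_utils_demo, 目录名仅为演示假设):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    demo_dir = Path("file_utils_demo")
    manager = FileManager(demo_dir)
    manager.create_backup_structure()
    # 写入一个测试文件并压缩
    test_file = demo_dir / "daily" / "demo.sql"
    test_file.write_text("-- demo backup content\n" * 100, encoding="utf-8")
    compressed = compress_file(test_file)
    print("压缩结果:", compressed)
    # 查看存储信息
    print(manager.get_storage_info())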
################################################################################
# 文件: utils/logger.py
################################################################################
"""
日志配置模块
负责系统的日志配置和管理
"""
import logging
import sys
from pathlib import Path
from typing import Optional
from logging.handlers import RotatingFileHandler
from config_manager import ConfigManager
from utils.file_utils import format_file_size
def setup_logging(config_manager: ConfigManager, log_name: str = "mysql_auto_backup"):
"""
设置日志配置
Args:
config_manager: 配置管理器实例
log_name: 日志名称,用于生成日志文件名
"""
log_config = config_manager.config['logging']
log_dir = Path(log_config['log_dir'])
# 如果是相对路径,转换为绝对路径
if not log_dir.is_absolute():
log_dir = Path(__file__).parent.parent / log_dir
# 确保日志目录存在
log_dir.mkdir(parents=True, exist_ok=True)
# 设置日志级别
log_level = getattr(logging, log_config['log_level'].upper(), logging.INFO)
# 清除之前的处理器
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
# 创建格式化器
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 文件处理器 - 使用轮转文件
log_file = log_dir / f"{log_name}.log"
file_handler = RotatingFileHandler(
log_file,
maxBytes=parse_size(log_config.get('max_log_size', '10MB')),
backupCount=log_config.get('backup_count', 5),
encoding='utf-8'
)
file_handler.setFormatter(formatter)
file_handler.setLevel(log_level)
# 控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
console_handler.setLevel(log_level)
# 配置根日志记录器
logging.basicConfig(
level=log_level,
handlers=[file_handler, console_handler]
)
# 设置第三方库的日志级别
logging.getLogger('pymysql').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.info(f"日志系统初始化完成 - 级别: {log_level}, 文件: {log_file}")
def parse_size(size_str: str) -> int:
"""
解析大小字符串为字节数
Args:
size_str: 大小字符串,如 '10MB', '1GB'
Returns:
int: 字节数
"""
size_str = size_str.upper().strip()
# 定义单位映射
units = {
'B': 1,
'KB': 1024,
'MB': 1024 ** 2,
'GB': 1024 ** 3,
'TB': 1024 ** 4
}
# 提取数字和单位
number = ''
unit = ''
for char in size_str:
if char.isdigit() or char == '.':
number += char
else:
unit += char
if not number:
return 10 * 1024 * 1024 # 默认10MB
number = float(number)
unit = unit.strip()
# 如果没有单位,默认使用字节
if not unit:
return int(number)
# 查找匹配的单位
for u, multiplier in units.items():
if unit.startswith(u):
return int(number * multiplier)
# 如果没有匹配的单位,使用默认值
logging.warning(f"未知的大小单位: {unit},使用默认值10MB")
return 10 * 1024 * 1024
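# 示例(说明大小字符串的解析规则):
# >>> parse_size('10MB')
# 10485760
# >>> parse_size('1.5GB')
# 1610612736
# >>> parse_size('2048')      # 没有单位时按字节处理
# 2048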
def get_logger(name: str) -> logging.Logger:
"""
获取指定名称的日志记录器
Args:
name: 日志记录器名称
Returns:
logging.Logger: 日志记录器实例
"""
return logging.getLogger(name)
def set_log_level(level: str):
"""
设置日志级别
Args:
level: 日志级别 ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
"""
valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
if level.upper() not in valid_levels:
raise ValueError(f"日志级别必须是: {', '.join(valid_levels)}")
logging.getLogger().setLevel(getattr(logging, level.upper()))
class LogManager:
"""
日志管理器
提供更高级的日志管理功能
"""
def __init__(self, config_manager: ConfigManager):
self.config_manager = config_manager
self.loggers = {}
def get_module_logger(self, module_name: str) -> logging.Logger:
"""
获取模块专用的日志记录器
Args:
module_name: 模块名称
Returns:
logging.Logger: 模块日志记录器
"""
if module_name not in self.loggers:
logger = logging.getLogger(module_name)
self.loggers[module_name] = logger
return self.loggers[module_name]
def enable_debug_mode(self):
"""启用调试模式"""
set_log_level('DEBUG')
logging.info("调试模式已启用")
def disable_debug_mode(self):
"""禁用调试模式"""
log_config = self.config_manager.config['logging']
log_level = log_config['log_level'].upper()
set_log_level(log_level)
logging.info(f"调试模式已禁用,恢复为 {log_level} 级别")
def get_log_stats(self) -> dict:
"""
获取日志统计信息
Returns:
dict: 日志统计信息
"""
log_config = self.config_manager.config['logging']
log_dir = Path(log_config['log_dir'])
if not log_dir.is_absolute():
log_dir = Path(__file__).parent.parent / log_dir
stats = {
'log_directory': str(log_dir),
'current_level': logging.getLevelName(logging.getLogger().level),
'active_loggers': len(self.loggers),
'log_files': []
}
# 统计日志文件
if log_dir.exists():
for log_file in log_dir.glob("*.log*"):
file_stats = {
'name': log_file.name,
'size': format_file_size(log_file.stat().st_size),
'modified': log_file.stat().st_mtime
}
stats['log_files'].append(file_stats)
return stats
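# 使用示意(最小示例, 假设在项目根目录运行, config.json 可被 ConfigManager 加载或自动创建):
if __name__ == "__main__":
    config_manager = ConfigManager()
    setup_logging(config_manager, log_name="logger_demo")
    log_manager = LogManager(config_manager)
    demo_logger = log_manager.get_module_logger("demo")
    demo_logger.info("这是一个示例日志")
    print(log_manager.get_log_stats())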
################################################################################
# 文件: utils/system_utils.py
################################################################################
"""
系统工具模块
提供系统相关的工具函数和系统信息获取
"""
import os
import sys
import platform
import subprocess
import logging
from pathlib import Path
from typing import Optional, Tuple, Dict, Any
def is_windows() -> bool:
"""
检查当前系统是否为Windows
Returns:
bool: 如果是Windows系统返回True
"""
return platform.system().lower() == 'windows'
def is_linux() -> bool:
"""
检查当前系统是否为Linux
Returns:
bool: 如果是Linux系统返回True
"""
return platform.system().lower() == 'linux'
def is_macos() -> bool:
"""
检查当前系统是否为macOS
Returns:
bool: 如果是macOS系统返回True
"""
return platform.system().lower() == 'darwin'
def get_system_info() -> Dict[str, str]:
"""
获取系统信息
Returns:
Dict[str, str]: 系统信息字典
"""
try:
info = {
'platform': platform.system(),
'platform_release': platform.release(),
'platform_version': platform.version(),
'architecture': platform.machine(),
'processor': platform.processor(),
'python_version': platform.python_version(),
'python_implementation': platform.python_implementation()
}
# 添加额外信息
if is_windows():
info['windows_edition'] = platform.win32_edition()
elif is_linux():
info['libc_version'] = '-'.join(platform.libc_ver())
return info
except Exception as e:
logging.error(f"获取系统信息失败: {e}")
return {'error': str(e)}
def execute_command(cmd: list, timeout: int = 300) -> Tuple[bool, str, str]:
"""
执行系统命令
Args:
cmd: 命令列表
timeout: 超时时间(秒)
Returns:
Tuple[bool, str, str]: (是否成功, 标准输出, 标准错误)
"""
try:
logging.debug(f"执行命令: {' '.join(cmd)}")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
encoding='utf-8',
errors='ignore'
)
success = result.returncode == 0
stdout = result.stdout.strip()
stderr = result.stderr.strip()
if not success:
logging.warning(f"命令执行失败: {stderr}")
else:
logging.debug(f"命令执行成功")
return success, stdout, stderr
except subprocess.TimeoutExpired:
error_msg = f"命令执行超时 ({timeout}秒)"
logging.error(error_msg)
return False, "", error_msg
except Exception as e:
error_msg = f"命令执行异常: {e}"
logging.error(error_msg)
return False, "", error_msg
def check_command_exists(command: str) -> bool:
"""
检查命令是否存在
Args:
command: 命令名称
Returns:
bool: 命令是否存在
"""
try:
if is_windows():
# Windows系统
cmd = ['where', command]
else:
# Linux/macOS系统
cmd = ['which', command]
success, _, _ = execute_command(cmd)
return success
except Exception as e:
logging.debug(f"检查命令存在性失败 {command}: {e}")
return False
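# 示例(结果取决于本机环境, 这里以检查 mysqldump 为例):
# >>> check_command_exists('mysqldump')
# True    # 未安装 MySQL 客户端工具时返回 False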
def get_disk_usage(path: Path) -> Optional[Dict[str, float]]:
"""
获取磁盘使用情况
Args:
path: 路径
Returns:
Optional[Dict[str, float]]: 磁盘使用信息,失败返回None
"""
try:
if is_windows():
# Windows系统使用wmic
drive = path.drive
cmd = ['wmic', 'logicaldisk', 'where', f'deviceid="{drive}"', 'get', 'size,freespace']
success, stdout, _ = execute_command(cmd)
if success and stdout:
lines = stdout.strip().split('\n')
if len(lines) >= 2:
parts = lines[1].split()
if len(parts) >= 2:
# wmic 的输出列按字母顺序排列: FreeSpace 在前, Size 在后
free = int(parts[0])
total = int(parts[1])
return {
'total_gb': total / (1024 ** 3),
'free_gb': free / (1024 ** 3),
'used_gb': (total - free) / (1024 ** 3),
'usage_percent': ((total - free) / total) * 100
}
else:
# Linux/macOS系统使用df
cmd = ['df', '-B1', str(path)]
success, stdout, _ = execute_command(cmd)
if success and stdout:
lines = stdout.strip().split('\n')
if len(lines) >= 2:
parts = lines[1].split()
if len(parts) >= 4:
total = int(parts[1])
used = int(parts[2])
available = int(parts[3])
return {
'total_gb': total / (1024 ** 3),
'used_gb': used / (1024 ** 3),
'free_gb': available / (1024 ** 3),
'usage_percent': (used / total) * 100
}
return None
except Exception as e:
logging.error(f"获取磁盘使用情况失败: {e}")
return None
def get_memory_usage() -> Optional[Dict[str, float]]:
"""
获取内存使用情况
Returns:
Optional[Dict[str, float]]: 内存使用信息,失败返回None
"""
try:
if is_windows():
# Windows系统使用wmic
cmd = ['wmic', 'OS', 'get', 'TotalVisibleMemorySize,FreePhysicalMemory', '/value']
success, stdout, _ = execute_command(cmd)
if success and stdout:
lines = stdout.strip().split('\n')
total = None
free = None
for line in lines:
if 'TotalVisibleMemorySize' in line:
total = int(line.split('=')[1])
elif 'FreePhysicalMemory' in line:
free = int(line.split('=')[1])
if total is not None and free is not None:
used = total - free
return {
'total_mb': total / 1024,
'used_mb': used / 1024,
'free_mb': free / 1024,
'usage_percent': (used / total) * 100
}
else:
# Linux系统使用free命令
cmd = ['free', '-m']
success, stdout, _ = execute_command(cmd)
if success and stdout:
lines = stdout.strip().split('\n')
if len(lines) >= 2:
parts = lines[1].split()
if len(parts) >= 7:
total = int(parts[1])
used = int(parts[2])
free = int(parts[3])
return {
'total_mb': total,
'used_mb': used,
'free_mb': free,
'usage_percent': (used / total) * 100
}
return None
except Exception as e:
logging.error(f"获取内存使用情况失败: {e}")
return None
def create_symlink(source: Path, target: Path) -> bool:
"""
创建符号链接
Args:
source: 源文件/目录
target: 目标链接
Returns:
bool: 是否创建成功
"""
try:
if target.exists():
target.unlink()
if is_windows():
# Windows系统: mklink 是 cmd 内置命令,需要通过 shell 调用
if source.is_file():
# 文件符号链接
subprocess.run(['mklink', str(target), str(source)], shell=True, check=True)
else:
# 目录符号链接
subprocess.run(['mklink', '/D', str(target), str(source)], shell=True, check=True)
else:
# Linux/macOS系统
target.symlink_to(source)
logging.debug(f"创建符号链接: {source} -> {target}")
return True
except Exception as e:
logging.error(f"创建符号链接失败 {source} -> {target}: {e}")
return False
class SystemMonitor:
"""
系统监控器
监控系统资源使用情况
"""
def __init__(self):
self.start_time = None
def start_monitoring(self):
"""开始监控"""
import time
self.start_time = time.time()
logging.info("系统监控已启动")
def get_system_health(self) -> Dict[str, Any]:
"""
获取系统健康状态
Returns:
Dict[str, Any]: 系统健康状态信息
"""
health = {
'timestamp': None,
'disk_usage': None,
'memory_usage': None,
'system_info': get_system_info(),
'warnings': []
}
try:
import time
health['timestamp'] = time.time()
# 检查磁盘使用情况
current_dir = Path.cwd()
disk_info = get_disk_usage(current_dir)
if disk_info:
health['disk_usage'] = disk_info
if disk_info['usage_percent'] > 90:
health['warnings'].append('磁盘使用率超过90%')
# 检查内存使用情况
memory_info = get_memory_usage()
if memory_info:
health['memory_usage'] = memory_info
if memory_info['usage_percent'] > 85:
health['warnings'].append('内存使用率超过85%')
return health
except Exception as e:
logging.error(f"获取系统健康状态失败: {e}")
health['error'] = str(e)
return health
def stop_monitoring(self):
"""停止监控"""
if self.start_time:
import time
duration = time.time() - self.start_time
logging.info(f"系统监控已停止,运行时间: {duration:.2f}秒")
self.start_time = None
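# 使用示意(最小示例, 打印当前环境的系统信息与资源使用情况, 输出内容取决于本机环境):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(get_system_info())
    print("磁盘:", get_disk_usage(Path.cwd()))
    print("内存:", get_memory_usage())
    monitor = SystemMonitor()
    monitor.start_monitoring()
    print(monitor.get_system_health().get("warnings"))
    monitor.stop_monitoring()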
################################################################################
# 文件: utils/validation_utils.py
################################################################################
"""
数据验证工具模块
提供配置验证、路径验证等验证功能
"""
import re
import logging
from pathlib import Path
from typing import List, Tuple, Optional, Dict, Any
from urllib.parse import urlparse
from models.config_models import MySQLConfig, BackupConfig
def validate_mysql_connection(host: str, port: int, user: str, password: str,
database: str = "mysql") -> Tuple[bool, str]:
"""
验证MySQL连接参数
Args:
host: 主机地址
port: 端口号
user: 用户名
password: 密码
database: 数据库名
Returns:
Tuple[bool, str]: (是否有效, 错误信息)
"""
errors = []
# 验证主机地址
if not host or not host.strip():
errors.append("MySQL主机地址不能为空")
elif len(host) > 255:
errors.append("MySQL主机地址过长")
# 验证端口
if not isinstance(port, int) or port < 1 or port > 65535:
errors.append("MySQL端口必须在1-65535范围内")
# 验证用户名
if not user or not user.strip():
errors.append("MySQL用户名不能为空")
elif len(user) > 32:
errors.append("MySQL用户名过长")
# 验证密码
if not password:
errors.append("MySQL密码不能为空")
elif len(password) > 255:
errors.append("MySQL密码过长")
# 验证数据库名
if database and len(database) > 64:
errors.append("MySQL数据库名过长")
if errors:
return False, "; ".join(errors)
else:
return True, "MySQL连接参数验证通过"
def validate_backup_config(backup_dir: str, keep_backups: int, timeout: int,
compression: bool, databases: List[str]) -> Tuple[bool, str]:
"""
验证备份配置参数
Args:
backup_dir: 备份目录
keep_backups: 保留备份数量
timeout: 超时时间
compression: 是否压缩
databases: 数据库列表
Returns:
Tuple[bool, str]: (是否有效, 错误信息)
"""
errors = []
# 验证备份目录
if not backup_dir or not backup_dir.strip():
errors.append("备份目录不能为空")
else:
try:
# 尝试创建路径对象
path = Path(backup_dir)
# 检查路径是否包含非法字符
if any(char in backup_dir for char in ['*', '?', '"', '<', '>', '|']):
errors.append("备份目录包含非法字符")
except Exception as e:
errors.append(f"备份目录无效: {e}")
# 验证保留备份数量
if not isinstance(keep_backups, int) or keep_backups < 1:
errors.append("保留备份数量必须为正整数")
elif keep_backups > 1000:
errors.append("保留备份数量不能超过1000")
# 验证超时时间
if not isinstance(timeout, int) or timeout < 30:
errors.append("备份超时时间必须至少30秒")
elif timeout > 3600: # 1小时
errors.append("备份超时时间不能超过3600秒")
# 验证压缩设置
if not isinstance(compression, bool):
errors.append("压缩设置必须为布尔值")
# 验证数据库列表
if not databases:
errors.append("至少需要指定一个数据库")
else:
for db in databases:
if not db or not isinstance(db, str):
errors.append("数据库名称不能为空且必须为字符串")
elif len(db) > 64:
errors.append(f"数据库名称过长: {db}")
elif not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', db):
errors.append(f"数据库名称包含非法字符: {db}")
if errors:
return False, "; ".join(errors)
else:
return True, "备份配置验证通过"
def validate_file_path(file_path: str, check_writable: bool = False) -> Tuple[bool, str]:
"""
验证文件路径
Args:
file_path: 文件路径
check_writable: 是否检查可写性
Returns:
Tuple[bool, str]: (是否有效, 错误信息)
"""
try:
path = Path(file_path)
# 检查路径是否包含非法字符
invalid_chars = ['*', '?', '"', '<', '>', '|']
if any(char in str(path) for char in invalid_chars):
return False, "文件路径包含非法字符"
# 检查父目录是否存在
parent_dir = path.parent
if not parent_dir.exists():
return False, f"父目录不存在: {parent_dir}"
# 检查是否可写(如果需要)
if check_writable:
try:
# 尝试创建测试文件
test_file = parent_dir / ".write_test"
test_file.touch()
test_file.unlink()
except Exception as e:
return False, f"目录不可写: {e}"
return True, "文件路径验证通过"
except Exception as e:
return False, f"文件路径验证失败: {e}"
def validate_email(email: str) -> bool:
"""
验证电子邮件地址格式
Args:
email: 电子邮件地址
Returns:
bool: 是否有效
"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, email))
def validate_url(url: str) -> bool:
"""
验证URL格式
Args:
url: URL地址
Returns:
bool: 是否有效
"""
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except Exception:
return False
def validate_port(port: Any) -> bool:
"""
验证端口号
Args:
port: 端口号
Returns:
bool: 是否有效
"""
try:
port_int = int(port)
return 1 <= port_int <= 65535
except (ValueError, TypeError):
return False
def validate_filename(filename: str) -> Tuple[bool, str]:
"""
验证文件名
Args:
filename: 文件名
Returns:
Tuple[bool, str]: (是否有效, 错误信息)
"""
if not filename or not filename.strip():
return False, "文件名不能为空"
# 检查长度
if len(filename) > 255:
return False, "文件名过长"
# 检查非法字符
invalid_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|']
if any(char in filename for char in invalid_chars):
return False, "文件名包含非法字符"
# 检查保留名称(Windows)
reserved_names = [
'CON', 'PRN', 'AUX', 'NUL',
'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9'
]
if filename.upper() in reserved_names:
return False, "文件名是系统保留名称"
return True, "文件名验证通过"
class ConfigValidator:
"""
配置验证器
提供完整的配置验证功能
"""
@staticmethod
def validate_complete_config(config: Dict[str, Any]) -> Tuple[bool, List[str]]:
"""
验证完整配置
Args:
config: 配置字典
Returns:
Tuple[bool, List[str]]: (是否有效, 错误列表)
"""
errors = []
# 验证MySQL配置
mysql_config = config.get('mysql', {})
if not mysql_config:
errors.append("MySQL配置缺失")
else:
valid, msg = validate_mysql_connection(
mysql_config.get('host', ''),
mysql_config.get('port', 3306),
mysql_config.get('user', ''),
mysql_config.get('password', ''),
mysql_config.get('database', 'mysql')
)
if not valid:
errors.append(f"MySQL配置错误: {msg}")
# 验证备份配置
backup_config = config.get('backup', {})
if not backup_config:
errors.append("备份配置缺失")
else:
valid, msg = validate_backup_config(
backup_config.get('backup_dir', ''),
backup_config.get('keep_backups', 20),
backup_config.get('timeout', 300),
backup_config.get('compression', True),
backup_config.get('databases', [])
)
if not valid:
errors.append(f"备份配置错误: {msg}")
# 验证日志配置
logging_config = config.get('logging', {})
if logging_config:
log_dir = logging_config.get('log_dir', '')
if log_dir:
valid, msg = validate_file_path(log_dir, check_writable=True)
if not valid:
errors.append(f"日志目录错误: {msg}")
log_level = logging_config.get('log_level', 'INFO')
valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
if log_level.upper() not in valid_levels:
errors.append(f"日志级别无效: {log_level}")
return len(errors) == 0, errors
@staticmethod
def create_mysql_config_object(config: Dict[str, Any]) -> Optional[MySQLConfig]:
"""
从配置字典创建MySQL配置对象
Args:
config: 配置字典
Returns:
Optional[MySQLConfig]: MySQL配置对象,验证失败返回None
"""
mysql_config = config.get('mysql', {})
try:
return MySQLConfig(
host=mysql_config['host'],
user=mysql_config['user'],
password=mysql_config['password'],
database=mysql_config.get('database', 'mysql'),
port=mysql_config.get('port', 3306),
charset=mysql_config.get('charset', 'utf8mb4')
)
except KeyError as e:
logging.error(f"创建MySQL配置对象失败: 缺少必要字段 {e}")
return None
@staticmethod
def create_backup_config_object(config: Dict[str, Any]) -> Optional[BackupConfig]:
"""
从配置字典创建备份配置对象
Args:
config: 配置字典
Returns:
Optional[BackupConfig]: 备份配置对象,验证失败返回None
"""
backup_config = config.get('backup', {})
try:
return BackupConfig(
backup_dir=backup_config['backup_dir'],
keep_backups=backup_config.get('keep_backups', 20),
compression=backup_config.get('compression', True),
timeout=backup_config.get('timeout', 300),
use_custom_filename=backup_config.get('use_custom_filename', True),
filename_format=backup_config.get('filename_format', 'dump-{timestamp}-{data_source}-{database}.sql'),
add_drop_table=backup_config.get('add_drop_table', True),
disable_keys=backup_config.get('disable_keys', True),
lock_tables=backup_config.get('lock_tables', False),
add_drop_trigger=backup_config.get('add_drop_trigger', True),
no_data=backup_config.get('no_data', False),
complete_insert=backup_config.get('complete_insert', True),
create_options=backup_config.get('create_options', True),
routines=backup_config.get('routines', True),
extended_insert=backup_config.get('extended_insert', True),
databases=backup_config.get('databases', []),
tables=backup_config.get('tables', [])
)
except KeyError as e:
logging.error(f"创建备份配置对象失败: 缺少必要字段 {e}")
return None
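# 使用示意(最小示例, 用内联配置字典演示完整配置验证; 实际使用时应传入 ConfigManager 加载的配置,
# 且需从项目根目录运行以便导入 models 包):
if __name__ == "__main__":
    demo_config = {
        "mysql": {"host": "localhost", "port": 3306, "user": "root",
                  "password": "1234", "database": "td"},
        "backup": {"backup_dir": "backups", "keep_backups": 20,
                   "timeout": 300, "compression": True, "databases": ["td"]},
        "logging": {"log_dir": "logs", "log_level": "INFO"}
    }
    valid, errors = ConfigValidator.validate_complete_config(demo_config)
    print("配置有效:", valid)
    for err in errors:
        print(" -", err)
    mysql_cfg = ConfigValidator.create_mysql_config_object(demo_config)
    print(mysql_cfg)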
################################################################################
# 文件: utils/__init__.py
################################################################################
"""
工具类模块
提供系统所需的各类工具函数和辅助类
"""
from .logger import setup_logging
from .file_utils import compress_file, ensure_directory, get_file_size, format_file_size
from .system_utils import is_windows, is_linux, get_system_info, execute_command
from .validation_utils import validate_mysql_connection, validate_backup_config, validate_file_path
from .error_handler import BackupErrorHandler, DatabaseErrorHandler, FileSystemErrorHandler
__all__ = [
'setup_logging',
'compress_file',
'ensure_directory',
'get_file_size',
'format_file_size',
'is_windows',
'is_linux',
'get_system_info',
'execute_command',
'validate_mysql_connection',
'validate_backup_config',
'validate_file_path',
'BackupErrorHandler',
'DatabaseErrorHandler',
'FileSystemErrorHandler'
]