以土豆之名,行学习之实

MySQL自动备份系统


MySQL自动备份系统

完整的目录结构

mysql_auto_backup/
├── main.py                          # 程序主入口
├── config_manager.py                # 配置管理模块
├── requirements.txt                 # 依赖包列表
├── setup_windows_tasks.bat         # Windows计划任务设置
├── backup/                          # 备份核心模块
│   ├── __init__.py
│   ├── backup_manager.py           # 备份管理器(主协调器)
│   ├── database_manager.py         # 数据库管理
│   ├── command_builder.py          # 命令构建器
│   ├── filename_generator.py       # 文件名生成器
│   ├── backup_executor.py          # 备份执行器
│   └── backup_cleaner.py           # 备份清理器
├── utils/                           # 工具类模块
│   ├── __init__.py
│   ├── logger.py                   # 日志配置
│   ├── file_utils.py               # 文件操作工具
│   ├── system_utils.py             # 系统工具
│   ├── validation_utils.py         # 数据验证工具
│   └── error_handler.py            # 错误处理器
├── models/                          # 数据模型模块
│   ├── __init__.py
│   ├── config_models.py            # 配置数据模型
│   └── backup_models.py            # 备份数据模型
└── schedule/                        # 任务调度模块
    ├── __init__.py
    └── task_scheduler.py           # 任务调度器


系统特点
✅ 模块化设计 - 代码结构清晰,易于维护
✅ 完整的日志系统 - 详细的执行日志和错误记录
✅ 压缩功能 - 自动压缩备份文件,节省空间
✅ 灵活的配置 - 支持自定义文件名格式和备份选项
✅ 错误处理 - 完善的错误处理和恢复机制
✅ 进度跟踪 - 实时显示备份进度和性能指标


系统功能验证,测试功能:
1. 测试数据库连接
    python main.py test
2. 查看所有数据库
    python main.py list
3. 备份所有数据库
    python main.py all
4. 查看配置
    python main.py config
5. 设置Windows计划任务
    运行 setup_windows_tasks.bat 来设置开机和关机自动备份。

下一步建议
验证备份文件:解压一个备份文件检查内容是否完整
测试恢复功能:使用生成的备份文件测试数据库恢复
配置计划任务:设置自动备份确保数据安全
监控磁盘空间:定期检查备份目录的磁盘使用情况


################################################################################
# 文件: config_manager.py
################################################################################

"""
配置管理模块
负责配置文件的加载、保存、验证和默认配置管理
"""

import json
import logging
from pathlib import Path
from typing import Dict, Any, Optional

from models.config_models import MySQLConfig, BackupConfig, LoggingConfig, ScheduleConfig


class ConfigManager:
    """
    配置管理器
    提供配置文件的完整生命周期管理
    """

    def __init__(self, config_file: str = "config.json"):
        """
        初始化配置管理器

        Args:
            config_file: 配置文件路径
        """
        self.config_file = Path(config_file)
        self.default_config = self.get_default_config()
        self.config = self.load_config()

    def get_default_config(self) -> Dict[str, Any]:
        """
        获取默认配置

        Returns:
            Dict[str, Any]: 完整的默认配置字典
        """
        return {
            "mysql": {
                "host": "localhost",
                "port": 3306,
                "user": "root",
                "password": "1234",
                "database": "td",
                "charset": "utf8mb4"
            },
            "backup": {
                "backup_dir": "backups",
                "keep_backups": 20,
                "compression": True,
                "timeout": 300,
                "use_custom_filename": True,
                "filename_format": "dump-{timestamp}-{data_source}-{database}.sql",
                "add_drop_table": True,
                "disable_keys": True,
                "lock_tables": False,
                "add_drop_trigger": True,
                "no_data": False,
                "complete_insert": True,
                "create_options": True,
                "routines": True,
                "extended_insert": True,
                "databases": ["td", "yxkj"],
                "tables": [],
                "include_routines": True,
                "include_triggers": True,
                "include_events": True
            },
            "logging": {
                "log_dir": "logs",
                "log_level": "INFO",
                "max_log_size": "10MB",
                "backup_count": 5
            },
            "schedule": {
                "startup_delay": 120,
                "shutdown_timeout": 300,
                "wait_mysql_timeout": 300
            }
        }

    def load_config(self) -> Dict[str, Any]:
        """
        加载配置文件

        Returns:
            Dict[str, Any]: 合并后的配置字典

        Raises:
            FileNotFoundError: 配置文件不存在且创建失败
            json.JSONDecodeError: 配置文件格式错误
        """
        # 如果配置文件不存在,创建默认配置
        if not self.config_file.exists():
            logging.warning(f"配置文件不存在: {self.config_file},正在创建默认配置...")
            if not self.create_default_config():
                raise FileNotFoundError(f"无法创建配置文件: {self.config_file}")

        try:
            # 读取配置文件
            with open(self.config_file, 'r', encoding='utf-8') as f:
                user_config = json.load(f)

            # 深度合并配置
            config = self.merge_config(self.default_config, user_config)
            logging.info(f"配置文件加载成功: {self.config_file}")

            # 验证配置
            self.validate_config(config)

            return config

        except json.JSONDecodeError as e:
            logging.error(f"配置文件格式错误: {e}")
            raise
        except Exception as e:
            logging.error(f"配置文件加载失败: {e}")
            raise

    def merge_config(self, default: Dict, user: Dict) -> Dict:
        """
        深度合并配置

        Args:
            default: 默认配置
            user: 用户配置

        Returns:
            Dict: 合并后的配置
        """
        result = default.copy()

        for key, value in user.items():
            if key in result and isinstance(result[key], dict) and isinstance(value, dict):
                # 递归合并字典
                result[key] = self.merge_config(result[key], value)
            else:
                # 直接覆盖其他类型
                result[key] = value

        return result

    def create_default_config(self) -> bool:
        """
        创建默认配置文件

        Returns:
            bool: 是否创建成功
        """
        try:
            # 确保目录存在
            self.config_file.parent.mkdir(parents=True, exist_ok=True)

            # 写入默认配置
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(self.default_config, f, indent=4, ensure_ascii=False)

            logging.info(f"默认配置文件创建成功: {self.config_file}")
            return True

        except Exception as e:
            logging.error(f"创建默认配置文件失败: {e}")
            return False

    def validate_config(self, config: Dict[str, Any]):
        """
        验证配置有效性

        Args:
            config: 要验证的配置字典

        Raises:
            ValueError: 配置验证失败
        """
        # 验证MySQL配置
        mysql_config = config.get("mysql", {})
        if not mysql_config.get("host"):
            raise ValueError("MySQL host 不能为空")
        if not mysql_config.get("user"):
            raise ValueError("MySQL user 不能为空")
        if not mysql_config.get("password"):
            raise ValueError("MySQL password 不能为空")

        # 验证备份配置
        backup_config = config.get("backup", {})
        if not backup_config.get("backup_dir"):
            raise ValueError("备份目录不能为空")
        if backup_config.get("keep_backups", 0) <= 0:
            raise ValueError("保留备份数量必须大于0")

        logging.debug("配置验证通过")

    def save_config(self, new_config: Dict[str, Any]) -> bool:
        """
        保存配置到文件

        Args:
            new_config: 新的配置字典

        Returns:
            bool: 是否保存成功
        """
        try:
            # 验证新配置
            self.validate_config(new_config)

            # 保存到文件
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(new_config, f, indent=4, ensure_ascii=False)

            # 更新内存中的配置
            self.config = self.merge_config(self.default_config, new_config)

            logging.info("配置保存成功")
            return True

        except Exception as e:
            logging.error(f"配置保存失败: {e}")
            return False

    def get_mysql_config(self) -> Dict[str, Any]:
        """获取MySQL配置"""
        return self.config.get("mysql", {})

    def get_backup_config(self) -> Dict[str, Any]:
        """获取备份配置"""
        return self.config.get("backup", {})

    def get_logging_config(self) -> Dict[str, Any]:
        """获取日志配置"""
        return self.config.get("logging", {})

    def get_schedule_config(self) -> Dict[str, Any]:
        """获取调度配置"""
        return self.config.get("schedule", {})

    def get_config_object(self) -> Dict[str, Any]:
        """获取完整的配置对象"""
        return self.config.copy()


################################################################################
# 文件: main.py
################################################################################

#!/usr/bin/env python3
"""
MySQL自动备份系统 - 主程序入口
提供命令行接口,支持多种备份模式和配置管理
"""

import sys
import time
import logging
from backup.backup_manager import BackupManager
from config_manager import ConfigManager
from utils.logger import setup_logging


def main():
    """
    主函数 - 程序入口点
    解析命令行参数并执行相应的备份操作
    """
    try:
        # 支持指定配置文件
        config_file = sys.argv[2] if len(sys.argv) > 2 else "config.json"

        # 初始化配置管理器
        config_manager = ConfigManager(config_file)

        # 设置日志
        setup_logging(config_manager)

        # 创建备份管理器
        backup_manager = BackupManager(config_manager)

        # 根据命令行参数确定操作类型
        if len(sys.argv) > 1:
            action = sys.argv[1].lower()
        else:
            action = 'manual'

        # 执行对应的操作
        execute_action(backup_manager, action)

    except Exception as e:
        logging.error(f"程序执行异常: {e}")
        import traceback
        logging.error(traceback.format_exc())
        sys.exit(1)


def execute_action(backup_manager: BackupManager, action: str):
    """
    执行对应的备份操作

    Args:
        backup_manager: 备份管理器实例
        action: 操作类型
    """
    if action == 'startup':
        execute_startup_backup(backup_manager)
    elif action == 'shutdown':
        execute_shutdown_backup(backup_manager)
    elif action == 'manual':
        execute_manual_backup(backup_manager)
    elif action == 'all':
        execute_all_databases_backup(backup_manager)
    elif action == 'list':
        list_databases(backup_manager)
    elif action == 'config':
        show_config(backup_manager)
    elif action == 'test':
        test_database_connection(backup_manager)
    else:
        show_usage()


def execute_startup_backup(backup_manager: BackupManager):
    """执行开机备份"""
    logging.info("=== 执行开机备份 ===")

    # 等待MySQL服务启动
    startup_delay = backup_manager.config['schedule'].get('startup_delay', 120)
    logging.info(f"等待 {startup_delay} 秒系统稳定...")
    time.sleep(startup_delay)

    if backup_manager.wait_for_mysql():
        success = backup_manager.backup_database('startup')
        if success:
            logging.info("开机备份完成")
        else:
            logging.error("开机备份失败")
    else:
        logging.error("开机备份失败:MySQL服务未就绪")


def execute_shutdown_backup(backup_manager: BackupManager):
    """执行关机备份"""
    logging.info("=== 执行关机备份 ===")
    success = backup_manager.backup_database('shutdown')
    if success:
        logging.info("关机备份完成")
    else:
        logging.error("关机备份失败")


def execute_manual_backup(backup_manager: BackupManager):
    """执行手动备份"""
    logging.info("=== 执行手动备份 ===")
    success = backup_manager.backup_database('manual')
    if success:
        logging.info("手动备份完成")
    else:
        logging.error("手动备份失败")


def execute_all_databases_backup(backup_manager: BackupManager):
    """备份所有数据库"""
    logging.info("=== 备份所有数据库 ===")
    success = backup_manager.backup_all_databases('manual')
    if success:
        logging.info("所有数据库备份完成")
    else:
        logging.error("所有数据库备份失败")


def list_databases(backup_manager: BackupManager):
    """显示所有数据库"""
    databases = backup_manager.get_all_databases()
    print("所有用户数据库:")
    for db in databases:
        print(f"  - {db}")


def show_config(backup_manager: BackupManager):
    """显示当前配置"""
    import json
    print("当前配置:")
    print(json.dumps(backup_manager.config, indent=2, ensure_ascii=False))


def test_database_connection(backup_manager: BackupManager):
    """测试数据库连接"""
    success = backup_manager.test_database_connection()
    if success:
        print("数据库连接测试成功")
    else:
        print("数据库连接测试失败")


def show_usage():
    """显示使用说明"""
    print("MySQL自动备份系统 - 使用说明")
    print("=" * 50)
    print("用法:")
    print("  python main.py startup [config_file]   # 开机备份")
    print("  python main.py shutdown [config_file]  # 关机备份")
    print("  python main.py manual [config_file]    # 手动备份指定数据库")
    print("  python main.py all [config_file]       # 备份所有数据库")
    print("  python main.py list [config_file]      # 显示所有数据库")
    print("  python main.py config [config_file]    # 显示配置")
    print("  python main.py test [config_file]      # 测试数据库连接")
    print()
    print("示例:")
    print("  python main.py manual                  # 使用默认配置手动备份")
    print("  python main.py startup my_config.json  # 使用自定义配置开机备份")


if __name__ == "__main__":
    main()


################################################################################
# 文件: backupackup_cleaner.py
################################################################################

"""
备份清理器模块
负责清理旧的备份文件,管理备份存储空间
"""

import os
import logging
from pathlib import Path
from typing import List, Dict
from datetime import datetime, timedelta

from models.config_models import BackupConfig


class BackupCleaner:
    """
    备份清理器
    负责管理备份文件的存储空间,清理旧的备份文件
    """

    def __init__(self, backup_config: BackupConfig):
        """
        初始化备份清理器

        Args:
            backup_config: 备份配置对象
        """
        self.backup_config = backup_config
        logging.debug("备份清理器初始化完成")

    def clean_old_backups(self):
        """
        清理旧的备份文件
        根据配置的保留数量删除多余的备份文件
        """
        try:
            backup_dir = Path(self.backup_config.backup_dir)
            if not backup_dir.exists():
                logging.warning(f"备份目录不存在: {backup_dir}")
                return

            # 获取所有备份文件
            backup_files = self._get_all_backup_files(backup_dir)
            if not backup_files:
                logging.info("没有找到备份文件")
                return

            # 按修改时间排序(最新的在前)
            backup_files.sort(key=os.path.getmtime, reverse=True)

            # 计算需要保留的文件数量
            keep_count = self.backup_config.keep_backups
            files_to_keep = backup_files[:keep_count]
            files_to_delete = backup_files[keep_count:]

            # 删除多余的文件
            self._delete_backup_files(files_to_delete)

            # 记录清理结果
            self._log_cleanup_result(len(files_to_keep), len(files_to_delete))

        except Exception as e:
            logging.error(f"清理旧备份文件失败: {e}")

    def _get_all_backup_files(self, backup_dir: Path) -> List[Path]:
        """
        获取所有备份文件

        Args:
            backup_dir: 备份目录

        Returns:
            List[Path]: 备份文件路径列表
        """
        backup_files = []

        # 根据压缩设置确定要搜索的文件模式
        patterns = ["*.sql.gz", "*.sql"] if self.backup_config.compression else ["*.sql"]

        for pattern in patterns:
            try:
                files = list(backup_dir.glob(pattern))
                backup_files.extend(files)
                logging.debug(f"找到 {len(files)} 个 {pattern} 文件")
            except Exception as e:
                logging.warning(f"搜索 {pattern} 文件时出错: {e}")

        return backup_files

    def _delete_backup_files(self, files_to_delete: List[Path]):
        """
        删除备份文件

        Args:
            files_to_delete: 要删除的文件列表
        """
        if not files_to_delete:
            return

        deleted_count = 0
        total_size = 0

        for old_file in files_to_delete:
            try:
                if old_file.exists():
                    file_size = old_file.stat().st_size
                    os.remove(old_file)
                    deleted_count += 1
                    total_size += file_size
                    logging.info(f"删除旧备份文件: {old_file.name} ({file_size / 1024 / 1024:.2f} MB)")
            except Exception as e:
                logging.error(f"删除文件失败 {old_file}: {e}")

        # 记录删除统计
        if deleted_count > 0:
            total_size_mb = total_size / (1024 * 1024)
            logging.info(f"清理完成: 删除了 {deleted_count} 个文件,释放 {total_size_mb:.2f} MB 空间")

    def _log_cleanup_result(self, kept_count: int, deleted_count: int):
        """
        记录清理结果

        Args:
            kept_count: 保留的文件数量
            deleted_count: 删除的文件数量
        """
        if deleted_count == 0:
            logging.info(f"备份文件清理完成: 保留 {kept_count} 个文件,无需删除")
        else:
            logging.info(
                f"备份文件清理完成: 保留 {kept_count} 个文件,删除 {deleted_count} 个旧文件"
            )

    def get_backup_stats(self) -> Dict[str, any]:
        """
        获取备份文件统计信息

        Returns:
            Dict[str, any]: 统计信息字典
        """
        try:
            backup_dir = Path(self.backup_config.backup_dir)
            if not backup_dir.exists():
                return {'error': '备份目录不存在'}

            backup_files = self._get_all_backup_files(backup_dir)

            if not backup_files:
                return {
                    'total_files': 0,
                    'total_size_mb': 0,
                    'oldest_file': None,
                    'newest_file': None
                }

            # 计算统计信息
            total_size = sum(f.stat().st_size for f in backup_files)
            file_times = [(f, f.stat().st_mtime) for f in backup_files]

            oldest_file = min(file_times, key=lambda x: x[1])[0]
            newest_file = max(file_times, key=lambda x: x[1])[0]

            return {
                'total_files': len(backup_files),
                'total_size_mb': round(total_size / (1024 * 1024), 2),
                'oldest_file': {
                    'name': oldest_file.name,
                    'size_mb': round(oldest_file.stat().st_size / (1024 * 1024), 2),
                    'modified': datetime.fromtimestamp(oldest_file.stat().st_mtime).isoformat()
                },
                'newest_file': {
                    'name': newest_file.name,
                    'size_mb': round(newest_file.stat().st_size / (1024 * 1024), 2),
                    'modified': datetime.fromtimestamp(newest_file.stat().st_mtime).isoformat()
                },
                'keep_backups': self.backup_config.keep_backups,
                'files_to_keep': min(len(backup_files), self.backup_config.keep_backups)
            }

        except Exception as e:
            logging.error(f"获取备份统计信息失败: {e}")
            return {'error': str(e)}

    def cleanup_by_age(self, days: int):
        """
        根据文件年龄清理备份文件

        Args:
            days: 保留天数,超过这个天数的文件将被删除
        """
        try:
            backup_dir = Path(self.backup_config.backup_dir)
            if not backup_dir.exists():
                return

            cutoff_time = datetime.now() - timedelta(days=days)
            backup_files = self._get_all_backup_files(backup_dir)

            files_to_delete = []
            for backup_file in backup_files:
                file_time = datetime.fromtimestamp(backup_file.stat().st_mtime)
                if file_time < cutoff_time:
                    files_to_delete.append(backup_file)

            self._delete_backup_files(files_to_delete)
            logging.info(f"按年龄清理完成: 删除了 {len(files_to_delete)} 个超过 {days} 天的备份文件")

        except Exception as e:
            logging.error(f"按年龄清理备份文件失败: {e}")

    def get_disk_usage(self) -> Dict[str, any]:
        """
        获取磁盘使用情况

        Returns:
            Dict[str, any]: 磁盘使用信息
        """
        try:
            backup_dir = Path(self.backup_config.backup_dir)
            if not backup_dir.exists():
                return {'error': '备份目录不存在'}

            # 获取备份目录所在磁盘的使用情况
            stat = os.statvfs(backup_dir)
            total_space = stat.f_blocks * stat.f_frsize
            free_space = stat.f_bavail * stat.f_frsize
            used_space = total_space - free_space

            return {
                'total_space_gb': round(total_space / (1024 ** 3), 2),
                'used_space_gb': round(used_space / (1024 ** 3), 2),
                'free_space_gb': round(free_space / (1024 ** 3), 2),
                'usage_percentage': round((used_space / total_space) * 100, 2)
            }

        except Exception as e:
            logging.error(f"获取磁盘使用情况失败: {e}")
            return {'error': str(e)}


################################################################################
# 文件: backupackup_executor.py
################################################################################

"""
备份执行器模块
负责执行备份命令和处理执行结果
"""

import os
import subprocess
import logging
from pathlib import Path
from typing import Optional, Dict, Tuple
from datetime import datetime

from models.backup_models import BackupResult
from models.config_models import BackupConfig
from utils.error_handler import BackupErrorHandler


class BackupExecutor:
    """
    备份执行器
    负责执行备份命令和处理执行结果,包括错误处理和结果记录
    """

    def __init__(self, backup_config: BackupConfig):
        """
        初始化备份执行器

        Args:
            backup_config: 备份配置对象
        """
        self.backup_config = backup_config
        self.error_handler = BackupErrorHandler()
        logging.debug("备份执行器初始化完成")

    def execute_backup_command(self, cmd: list, backup_file: Path,
                               database: str) -> Tuple[bool, Optional[str]]:
        """
        执行备份命令

        Args:
            cmd: 要执行的命令列表
            backup_file: 备份文件路径
            database: 数据库名称

        Returns:
            Tuple[bool, Optional[str]]: (是否成功, 错误信息)

        Example:
            >>> success, error = executor.execute_backup_command(cmd, backup_file, 'mydb')
            >>> if success:
            ...     print("备份成功")
        """
        logging.info(f"执行数据库 {database} 的备份命令")

        try:
            # 确保备份目录存在
            backup_file.parent.mkdir(parents=True, exist_ok=True)

            # 执行备份命令
            with open(backup_file, 'w', encoding='utf-8') as f:
                result = subprocess.run(
                    cmd,
                    stdout=f,
                    stderr=subprocess.PIPE,
                    text=True,
                    timeout=self.backup_config.timeout
                )

            # 处理执行结果
            return self._handle_execution_result(result, backup_file, database)

        except subprocess.TimeoutExpired:
            error_msg = f"数据库 {database} 备份超时 (超过 {self.backup_config.timeout} 秒)"
            logging.error(error_msg)
            self._cleanup_failed_backup(backup_file)
            return False, error_msg

        except Exception as e:
            error_msg = f"数据库 {database} 备份过程中发生错误: {e}"
            logging.error(error_msg)
            self._cleanup_failed_backup(backup_file)
            return False, error_msg

    def _handle_execution_result(self, result: subprocess.CompletedProcess,
                                 backup_file: Path, database: str) -> Tuple[bool, Optional[str]]:
        """
        处理命令执行结果

        Args:
            result: 子进程执行结果
            backup_file: 备份文件路径
            database: 数据库名称

        Returns:
            Tuple[bool, Optional[str]]: (是否成功, 错误信息)
        """
        if result.returncode != 0:
            # 处理错误情况
            return self.error_handler.handle_backup_error(result, backup_file, database)
        else:
            # 成功情况下的处理
            return self._handle_successful_backup(backup_file, database)

    def _handle_successful_backup(self, backup_file: Path, database: str) -> Tuple[bool, Optional[str]]:
        """
        处理成功的备份

        Args:
            backup_file: 备份文件路径
            database: 数据库名称

        Returns:
            Tuple[bool, Optional[str]]: (True, None)
        """
        # 检查备份文件是否为空
        self._check_backup_file_size(backup_file, database)

        # 验证备份文件完整性
        if not self._validate_backup_file(backup_file):
            error_msg = f"数据库 {database} 备份文件验证失败"
            logging.error(error_msg)
            self._cleanup_failed_backup(backup_file)
            return False, error_msg

        logging.info(f"数据库 {database} 备份命令执行成功")
        return True, None

    def _check_backup_file_size(self, backup_file: Path, database: str):
        """
        检查备份文件大小

        Args:
            backup_file: 备份文件路径
            database: 数据库名称
        """
        if backup_file.exists():
            file_size = backup_file.stat().st_size
            if file_size == 0:
                logging.warning(f"数据库 '{database}' 的备份文件为空")
            else:
                size_mb = file_size / (1024 * 1024)
                logging.debug(f"备份文件大小: {size_mb:.2f} MB")
        else:
            logging.warning(f"备份文件不存在: {backup_file}")

    def _validate_backup_file(self, backup_file: Path) -> bool:
        """
        验证备份文件完整性

        Args:
            backup_file: 备份文件路径

        Returns:
            bool: 文件是否有效
        """
        if not backup_file.exists():
            return False

        try:
            # 检查文件是否包含有效的SQL内容
            with open(backup_file, 'r', encoding='utf-8', errors='ignore') as f:
                first_line = f.readline().strip()

            # 基本的SQL文件验证
            valid_starts = ['--', '/*', 'CREATE', 'DROP', 'INSERT']
            if any(first_line.startswith(prefix) for prefix in valid_starts):
                return True
            else:
                logging.warning(f"备份文件可能无效,第一行内容: {first_line}")
                return False

        except Exception as e:
            logging.warning(f"备份文件验证失败: {e}")
            return False

    def _cleanup_failed_backup(self, backup_file: Path):
        """
        清理失败的备份文件

        Args:
            backup_file: 要清理的备份文件路径
        """
        try:
            if backup_file.exists():
                backup_file.unlink()
                logging.info(f"已清理失败的备份文件: {backup_file}")
        except Exception as e:
            logging.error(f"清理备份文件失败 {backup_file}: {e}")

    def create_backup_result(self, database: str, success: bool,
                             backup_file: Optional[Path] = None,
                             error_message: str = "",
                             start_time: Optional[datetime] = None,
                             end_time: Optional[datetime] = None) -> BackupResult:
        """
        创建备份结果对象

        Args:
            database: 数据库名称
            success: 是否成功
            backup_file: 备份文件路径
            error_message: 错误信息
            start_time: 开始时间
            end_time: 结束时间

        Returns:
            BackupResult: 备份结果对象
        """
        file_size = 0.0
        if backup_file and backup_file.exists():
            file_size = backup_file.stat().st_size / (1024 * 1024)  # 转换为MB

        return BackupResult(
            database=database,
            success=success,
            backup_file=backup_file,
            file_size=file_size,
            error_message=error_message,
            start_time=start_time or datetime.now(),
            end_time=end_time or datetime.now()
        )

    def get_performance_metrics(self, backup_result: BackupResult) -> Dict[str, any]:
        """
        获取性能指标

        Args:
            backup_result: 备份结果对象

        Returns:
            Dict[str, any]: 性能指标字典
        """
        return {
            'duration_seconds': backup_result.duration,
            'file_size_mb': backup_result.file_size,
            'throughput_mb_per_second': (
                backup_result.file_size / backup_result.duration
                if backup_result.duration > 0 else 0
            ),
            'success': backup_result.success,
            'database': backup_result.database
        }


################################################################################
# 文件: backupackup_manager.py
################################################################################

"""
备份管理器模块
协调各个备份组件,管理整个备份流程的主控制器
"""

import os
import time
import logging
from pathlib import Path
from typing import List, Optional, Dict, Any
from datetime import datetime

from config_manager import ConfigManager
from .database_manager import DatabaseManager
from .command_builder import CommandBuilder
from .filename_generator import FilenameGenerator
from .backup_executor import BackupExecutor
from .backup_cleaner import BackupCleaner
from models.config_models import MySQLConfig, BackupConfig
from models.backup_models import BackupResult


class BackupManager:
    """
    备份管理器
    作为备份系统的核心协调器,管理整个备份流程
    """

    def __init__(self, config_manager: ConfigManager):
        """
        初始化备份管理器

        Args:
            config_manager: 配置管理器实例
        """
        self.config_manager = config_manager
        self.config = config_manager.config

        # 初始化配置对象
        self.mysql_config = self._create_mysql_config()
        self.backup_config = self._create_backup_config()

        # 初始化各个组件
        self.database_manager = DatabaseManager(self.config_manager)
        self.command_builder = CommandBuilder()
        self.filename_generator = FilenameGenerator()
        self.backup_executor = BackupExecutor(self.backup_config)
        self.backup_cleaner = BackupCleaner(self.backup_config)

        # 设置备份目录
        self._setup_backup_dir()

        logging.info("备份管理器初始化完成")

    def _create_mysql_config(self) -> MySQLConfig:
        """
        从配置字典创建MySQL配置对象

        Returns:
            MySQLConfig: MySQL配置对象
        """
        mysql_dict = self.config['mysql']
        return MySQLConfig(
            host=mysql_dict['host'],
            user=mysql_dict['user'],
            password=mysql_dict['password'],
            database=mysql_dict.get('database', 'mysql'),
            port=mysql_dict.get('port', 3306),
            charset=mysql_dict.get('charset', 'utf8mb4')
        )

    def _create_backup_config(self) -> BackupConfig:
        """
        从配置字典创建备份配置对象

        Returns:
            BackupConfig: 备份配置对象
        """
        backup_dict = self.config['backup']
        return BackupConfig(
            backup_dir=backup_dict['backup_dir'],
            keep_backups=backup_dict.get('keep_backups', 20),
            compression=backup_dict.get('compression', True),
            timeout=backup_dict.get('timeout', 300),
            use_custom_filename=backup_dict.get('use_custom_filename', True),
            filename_format=backup_dict.get('filename_format', 'dump-{timestamp}-{data_source}-{database}.sql'),
            add_drop_table=backup_dict.get('add_drop_table', True),
            disable_keys=backup_dict.get('disable_keys', True),
            lock_tables=backup_dict.get('lock_tables', False),
            add_drop_trigger=backup_dict.get('add_drop_trigger', True),
            no_data=backup_dict.get('no_data', False),
            complete_insert=backup_dict.get('complete_insert', True),
            create_options=backup_dict.get('create_options', True),
            routines=backup_dict.get('routines', True),
            extended_insert=backup_dict.get('extended_insert', True),
            databases=backup_dict.get('databases', []),
            tables=backup_dict.get('tables', [])
        )

    def _setup_backup_dir(self):
        """
        设置备份目录
        确保备份目录存在且为绝对路径
        """
        backup_dir = Path(self.backup_config.backup_dir)

        # 如果是相对路径,转换为绝对路径
        if not backup_dir.is_absolute():
            backup_dir = Path(__file__).parent.parent / backup_dir
            self.backup_config.backup_dir = str(backup_dir)

        # 确保备份目录存在
        backup_dir.mkdir(parents=True, exist_ok=True)
        logging.info(f"备份目录设置完成: {backup_dir}")

    def backup_database(self, backup_type: str = 'manual') -> bool:
        """
        执行数据库备份 - 支持单个或多个数据库

        Args:
            backup_type: 备份类型 (startup/shutdown/manual)

        Returns:
            bool: 是否至少有一个数据库备份成功

        Example:
            >>> backup_manager.backup_database('manual')
            True
        """
        logging.info(f"开始执行{backup_type}备份...")

        try:
            # 获取要备份的数据库列表
            databases_to_backup = self.database_manager.get_databases_to_backup()
            if not databases_to_backup:
                logging.error("没有找到要备份的数据库")
                return False

            success_count = 0
            total_count = len(databases_to_backup)
            results = []

            # 逐个备份数据库
            for database in databases_to_backup:
                backup_result = self.backup_single_database(database, backup_type)
                results.append(backup_result)

                if backup_result.success:
                    success_count += 1
                    self._log_backup_success(backup_result)
                else:
                    self._log_backup_failure(backup_result)

            # 记录备份统计
            self._log_backup_statistics(success_count, total_count, backup_type)

            # 清理旧备份
            if success_count > 0:
                self.backup_cleaner.clean_old_backups()

            return success_count > 0

        except Exception as e:
            logging.error(f"{backup_type}备份过程中发生错误: {e}")
            import traceback
            logging.error(traceback.format_exc())
            return False

    def backup_single_database(self, database: str, backup_type: str = 'manual') -> BackupResult:
        """
        备份单个数据库

        Args:
            database: 数据库名称
            backup_type: 备份类型

        Returns:
            BackupResult: 包含详细结果的备份结果对象
        """
        start_time = datetime.now()

        try:
            backup_dir = Path(self.backup_config.backup_dir)

            # 生成备份文件名和路径
            filename = self.filename_generator.generate_backup_filename(
                database, backup_type, self.mysql_config, self.backup_config
            )
            backup_file = self.filename_generator.get_backup_file_path(backup_dir, filename)

            logging.info(f"开始备份数据库: {database}")
            logging.info(f"备份文件: {backup_file}")

            # 构建备份命令
            cmd = self.command_builder.build_complete_command(
                database, self.mysql_config, self.backup_config
            )

            # 记录安全命令(隐藏密码)
            safe_cmd = self.command_builder.get_safe_command_for_logging(cmd, self.mysql_config)
            logging.debug(f"执行命令: {safe_cmd}")

            # 执行备份命令
            success, error_message = self.backup_executor.execute_backup_command(
                cmd, backup_file, database
            )

            # 处理压缩
            final_file = backup_file
            if success and self.backup_config.compression:
                from utils.file_utils import compress_file
                compressed_file = compress_file(backup_file)
                if compressed_file and Path(compressed_file).exists():
                    final_file = Path(compressed_file)
                    logging.debug(f"文件压缩完成: {final_file}")
                else:
                    error_message = "文件压缩失败"
                    success = False

            # 创建备份结果
            end_time = datetime.now()
            return self.backup_executor.create_backup_result(
                database=database,
                success=success,
                backup_file=final_file,
                error_message=error_message,
                start_time=start_time,
                end_time=end_time
            )

        except Exception as e:
            error_msg = f"数据库 {database} 备份过程中发生未知错误: {e}"
            logging.error(error_msg)
            end_time = datetime.now()
            return self.backup_executor.create_backup_result(
                database=database,
                success=False,
                error_message=error_msg,
                start_time=start_time,
                end_time=end_time
            )

    def _log_backup_success(self, backup_result: BackupResult):
        """
        记录备份成功日志

        Args:
            backup_result: 备份结果对象
        """
        if backup_result.backup_file:
            logging.info(
                f"✅ 数据库 {backup_result.database} 备份成功: "
                f"{backup_result.backup_file} "
                f"({backup_result.file_size:.2f} MB, "
                f"耗时: {backup_result.duration:.2f}秒)"
            )

    def _log_backup_failure(self, backup_result: BackupResult):
        """
        记录备份失败日志

        Args:
            backup_result: 备份结果对象
        """
        logging.error(
            f"❌ 数据库 {backup_result.database} 备份失败: {backup_result.error_message}"
        )

    def _log_backup_statistics(self, success_count: int, total_count: int, backup_type: str):
        """
        记录备份统计信息

        Args:
            success_count: 成功数量
            total_count: 总数量
            backup_type: 备份类型
        """
        if success_count == total_count:
            logging.info(f"🎉 {backup_type}备份完成: 全部 {total_count} 个数据库备份成功")
        elif success_count > 0:
            logging.warning(
                f"⚠️ {backup_type}备份完成: {success_count}/{total_count} 个数据库备份成功"
            )
        else:
            logging.error(f"💥 {backup_type}备份完成: 全部 {total_count} 个数据库备份失败")

    def backup_all_databases(self, backup_type: str = 'manual') -> bool:
        """
        备份所有用户数据库

        Args:
            backup_type: 备份类型

        Returns:
            bool: 是否备份成功
        """
        try:
            all_databases = self.database_manager.get_all_databases()
            if not all_databases:
                logging.error("没有找到用户数据库")
                return False

            # 临时修改配置,备份所有数据库
            original_databases = self.backup_config.databases.copy()
            self.backup_config.databases = all_databases

            success = self.backup_database(backup_type)

            # 恢复原配置
            self.backup_config.databases = original_databases

            return success

        except Exception as e:
            logging.error(f"备份所有数据库失败: {e}")
            return False

    def wait_for_mysql(self, timeout: Optional[int] = None) -> bool:
        """
        等待MySQL服务启动

        Args:
            timeout: 超时时间(秒),None表示使用配置中的超时时间

        Returns:
            bool: 是否在超时时间内成功连接
        """
        if timeout is None:
            timeout = self.config['schedule'].get('wait_mysql_timeout', 300)

        logging.info(f"等待MySQL服务启动,超时时间: {timeout}秒...")
        start_time = time.time()

        while time.time() - start_time < timeout:
            try:
                if self.database_manager.test_connection():
                    logging.info("✅ MySQL服务已就绪")
                    return True
            except Exception as e:
                logging.debug(f"等待MySQL服务: {e}")

            # 等待10秒后重试
            time.sleep(10)

        logging.error(f"❌ 等待MySQL服务启动超时 ({timeout}秒)")
        return False

    def clean_old_backups(self):
        """清理旧的备份文件"""
        self.backup_cleaner.clean_old_backups()

    def test_database_connection(self) -> bool:
        """
        测试数据库连接

        Returns:
            bool: 连接是否成功
        """
        return self.database_manager.test_database_connection()

    def get_all_databases(self) -> List[str]:
        """
        获取所有用户数据库

        Returns:
            List[str]: 数据库名称列表
        """
        return self.database_manager.get_all_databases()

    def get_databases_to_backup(self) -> List[str]:
        """
        获取要备份的数据库列表

        Returns:
            List[str]: 要备份的数据库名称列表
        """
        return self.database_manager.get_databases_to_backup()

    def get_backup_files(self) -> List[Path]:
        """
        获取所有备份文件

        Returns:
            List[Path]: 备份文件路径列表
        """
        backup_dir = Path(self.backup_config.backup_dir)
        patterns = ["*.sql.gz", "*.sql"] if self.backup_config.compression else ["*.sql"]

        backup_files = []
        for pattern in patterns:
            backup_files.extend(list(backup_dir.glob(pattern)))

        return sorted(backup_files, key=lambda x: x.stat().st_mtime, reverse=True)

    def get_backup_info(self) -> Dict[str, Any]:
        """
        获取备份系统信息

        Returns:
            Dict[str, Any]: 备份系统信息字典
        """
        backup_files = self.get_backup_files()
        total_size = sum(f.stat().st_size for f in backup_files) / (1024 * 1024)  # MB

        return {
            'backup_dir': self.backup_config.backup_dir,
            'total_backups': len(backup_files),
            'total_size_mb': round(total_size, 2),
            'compression_enabled': self.backup_config.compression,
            'keep_backups': self.backup_config.keep_backups,
            'recent_backups': [str(f) for f in backup_files[:5]]  # 最近5个备份
        }


################################################################################
# 文件: backupcommand_builder.py
################################################################################

"""
命令构建器模块
负责构建mysqldump命令和参数
"""

import logging
from typing import List, Dict

from models.config_models import BackupConfig, MySQLConfig


class CommandBuilder:
    """
    mysqldump命令构建器
    负责构建和执行备份命令的各个组件
    """

    # mysqldump选项映射表
    OPTION_MAPPING = {
        'add_drop_table': '--add-drop-table',
        'disable_keys': '--disable-keys',
        'lock_tables': '--lock-tables',
        'add_drop_trigger': '--add-drop-trigger',
        'no_data': '--no-data',
        'complete_insert': '--complete-insert',
        'create_options': '--create-options',
        'routines': '--routines',
        'extended_insert': '--extended-insert'
    }

    @staticmethod
    def build_base_command(mysql_config: MySQLConfig) -> List[str]:
        """
        构建基础mysqldump命令

        Args:
            mysql_config: MySQL配置对象

        Returns:
            List[str]: 基础命令参数列表

        Example:
            >>> cmd = CommandBuilder.build_base_command(mysql_config)
            >>> print(cmd)
            ['mysqldump', '-hlocalhost', '-P3306', '-uroot', '-p***', '--single-transaction']
        """
        base_cmd = [
            'mysqldump',
            f'-h{mysql_config.host}',
            f'-P{mysql_config.port}',
            f'-u{mysql_config.user}',
            f'-p{mysql_config.password}',
            '--single-transaction',  # 使用事务保证一致性
        ]

        logging.debug(f"构建基础命令: {len(base_cmd)} 个参数")
        return base_cmd

    @staticmethod
    def add_backup_options(cmd: List[str], backup_config: BackupConfig) -> List[str]:
        """
        添加备份选项到命令

        Args:
            cmd: 基础命令列表
            backup_config: 备份配置对象

        Returns:
            List[str]: 添加了备份选项的命令列表
        """
        options_added = 0

        # 添加启用的选项
        for config_key, option in CommandBuilder.OPTION_MAPPING.items():
            if getattr(backup_config, config_key, False):
                cmd.append(option)
                options_added += 1
                logging.debug(f"添加选项: {option}")

        # 特殊处理锁表选项
        if not backup_config.lock_tables:
            cmd.append('--skip-lock-tables')
            logging.debug("添加选项: --skip-lock-tables")
            options_added += 1

        # 特殊处理扩展插入选项
        if not backup_config.extended_insert:
            cmd.append('--skip-extended-insert')
            logging.debug("添加选项: --skip-extended-insert")
            options_added += 1

        logging.debug(f"添加了 {options_added} 个备份选项")
        return cmd

    @staticmethod
    def add_database_and_tables(cmd: List[str], database: str, tables: List[str]) -> List[str]:
        """
        添加数据库和表到命令

        Args:
            cmd: 命令列表
            database: 数据库名称
            tables: 要备份的表列表

        Returns:
            List[str]: 完整的命令列表
        """
        # 添加数据库
        cmd.append(database)
        logging.debug(f"添加数据库: {database}")

        # 添加特定表(如果有)
        if tables:
            cmd.extend(tables)
            logging.debug(f"添加 {len(tables)} 个特定表: {tables}")

        return cmd

    @classmethod
    def build_complete_command(cls, database: str, mysql_config: MySQLConfig,
                               backup_config: BackupConfig) -> List[str]:
        """
        构建完整的mysqldump命令

        Args:
            database: 数据库名称
            mysql_config: MySQL配置
            backup_config: 备份配置

        Returns:
            List[str]: 完整的命令参数列表

        Example:
            >>> cmd = CommandBuilder.build_complete_command('mydb', mysql_config, backup_config)
            >>> print(' '.join(cmd))
        """
        logging.info(f"为数据库 {database} 构建备份命令")

        # 构建基础命令
        cmd = cls.build_base_command(mysql_config)

        # 添加备份选项
        cmd = cls.add_backup_options(cmd, backup_config)

        # 添加数据库和表
        tables = backup_config.tables or []
        cmd = cls.add_database_and_tables(cmd, database, tables)

        logging.debug(f"命令构建完成,共 {len(cmd)} 个参数")
        return cmd

    @staticmethod
    def get_safe_command_for_logging(cmd: List[str], mysql_config: MySQLConfig) -> str:
        """
        获取用于日志记录的安全命令(隐藏密码)

        Args:
            cmd: 原始命令列表
            mysql_config: MySQL配置(用于获取密码)

        Returns:
            str: 隐藏密码的安全命令字符串

        Example:
            >>> safe_cmd = CommandBuilder.get_safe_command_for_logging(cmd, mysql_config)
            >>> logging.info(f"执行命令: {safe_cmd}")
        """
        safe_cmd = ' '.join(cmd)
        # 替换密码为星号
        safe_cmd = safe_cmd.replace(mysql_config.password, '***')

        # 额外的安全处理:确保密码不会以其他形式泄露
        password_patterns = [
            f"-p{mysql_config.password}",
            f"--password={mysql_config.password}"
        ]

        for pattern in password_patterns:
            safe_cmd = safe_cmd.replace(pattern, "-p***")

        return safe_cmd

    @classmethod
    def validate_command(cls, cmd: List[str]) -> bool:
        """
        验证命令是否有效

        Args:
            cmd: 要验证的命令列表

        Returns:
            bool: 命令是否有效
        """
        if not cmd:
            logging.error("命令为空")
            return False

        if cmd[0] != 'mysqldump':
            logging.error("命令必须以 'mysqldump' 开头")
            return False

        # 检查必要的参数
        required_params = ['-h', '-u', '-p']
        cmd_str = ' '.join(cmd)
        for param in required_params:
            if param not in cmd_str:
                logging.error(f"缺少必要参数: {param}")
                return False

        logging.debug("命令验证通过")
        return True

    @staticmethod
    def get_command_summary(cmd: List[str]) -> Dict[str, any]:
        """
        获取命令摘要信息

        Args:
            cmd: 命令列表

        Returns:
            Dict[str, any]: 命令摘要信息
        """
        cmd_str = ' '.join(cmd)
        return {
            'total_arguments': len(cmd),
            'has_database': any(arg for arg in cmd if not arg.startswith('-')),
            'has_tables': len([arg for arg in cmd if not arg.startswith('-')]) > 1,
            'options_count': len([arg for arg in cmd if arg.startswith('--')]),
            'compression_used': '--compress' in cmd_str or '--compression-algorithms' in cmd_str
        }


################################################################################
# 文件: backupdatabase_manager.py
################################################################################

"""
数据库管理器模块
负责数据库连接、查询和数据库列表管理
"""

import logging
from typing import List, Optional, Tuple
import pymysql
from pymysql import Connection

from config_manager import ConfigManager


class DatabaseManager:
    """
    数据库管理器
    处理所有数据库相关的操作,包括连接、查询和列表管理
    """

    def __init__(self, config_manager: ConfigManager):
        """
        初始化数据库管理器

        Args:
            config_manager: 配置管理器实例
        """
        self.config_manager = config_manager
        self.config = config_manager.config
        self._connection: Optional[Connection] = None

        logging.debug("数据库管理器初始化完成")

    def get_db_connection(self) -> Optional[Connection]:
        """
        建立数据库连接

        Returns:
            Optional[Connection]: 数据库连接对象,失败返回None

        Example:
            >>> conn = database_manager.get_db_connection()
            >>> if conn:
            ...     print("连接成功")
        """
        if self._connection and self._connection.open:
            return self._connection

        mysql_config = self.config['mysql']
        try:
            self._connection = pymysql.connect(
                host=mysql_config['host'],
                user=mysql_config['user'],
                password=mysql_config['password'],
                database=mysql_config.get('database', 'mysql'),
                port=mysql_config.get('port', 3306),
                charset=mysql_config.get('charset', 'utf8mb4'),
                connect_timeout=10,
                autocommit=True
            )
            logging.debug(f"数据库连接建立成功: {mysql_config['host']}:{mysql_config.get('port', 3306)}")
            return self._connection

        except pymysql.Error as e:
            logging.error(f"数据库连接失败: {e}")
            self._connection = None
            return None

    def close_connection(self):
        """关闭数据库连接"""
        if self._connection and self._connection.open:
            self._connection.close()
            self._connection = None
            logging.debug("数据库连接已关闭")

    def test_connection(self) -> bool:
        """
        测试数据库连接是否正常

        Returns:
            bool: 连接是否成功
        """
        try:
            conn = self.get_db_connection()
            if conn:
                with conn.cursor() as cursor:
                    cursor.execute("SELECT 1")
                    result = cursor.fetchone()
                    return result[0] == 1
            return False
        except Exception as e:
            logging.debug(f"数据库连接测试失败: {e}")
            return False

    def test_database_connection(self) -> bool:
        """
        测试数据库连接和内容(详细测试)

        Returns:
            bool: 测试是否成功
        """
        try:
            conn = self.get_db_connection()
            if not conn:
                print("❌ 无法建立数据库连接")
                return False

            with conn.cursor() as cursor:
                # 检查默认数据库是否存在
                default_db = self.config['mysql'].get('database')
                if default_db:
                    cursor.execute("SHOW DATABASES LIKE %s", (default_db,))
                    db_exists = cursor.fetchone()
                    print(f"📊 数据库 {default_db} 存在: {db_exists is not None}")

                # 获取所有用户数据库
                user_dbs = self.get_all_databases()
                print(f"📁 发现 {len(user_dbs)} 个用户数据库: {user_dbs}")

                # 检查第一个用户数据库的表信息
                if user_dbs:
                    sample_db = user_dbs[0]
                    try:
                        cursor.execute(f"USE `{sample_db}`")
                        cursor.execute("SHOW TABLES")
                        tables = cursor.fetchall()
                        print(f"📋 数据库 {sample_db} 中的表数量: {len(tables)}")

                        if tables:
                            table_names = [table[0] for table in tables]
                            print(f"📄 表列表: {table_names}")

                            # 检查第一个表的数据量
                            sample_table = tables[0][0]
                            cursor.execute(f"SELECT COUNT(*) FROM `{sample_table}`")
                            count = cursor.fetchone()[0]
                            print(f"🔢 表 {sample_table} 中的数据行数: {count}")
                    except Exception as e:
                        print(f"⚠️ 检查数据库 {sample_db} 时出错: {e}")

            print("✅ 数据库连接测试成功")
            return True

        except Exception as e:
            print(f"❌ 数据库测试失败: {e}")
            return False
        finally:
            self.close_connection()

    def get_all_databases(self) -> List[str]:
        """
        获取MySQL服务器中的所有用户数据库

        Returns:
            List[str]: 用户数据库名称列表

        Example:
            >>> databases = database_manager.get_all_databases()
            >>> print(f"发现 {len(databases)} 个数据库")
        """
        try:
            conn = self.get_db_connection()
            if not conn:
                return []

            with conn.cursor() as cursor:
                # 获取所有数据库
                cursor.execute("SHOW DATABASES")
                databases = [db[0] for db in cursor.fetchall()]

                # 排除系统数据库
                system_dbs = ['information_schema', 'mysql', 'performance_schema', 'sys']
                user_databases = [db for db in databases if db not in system_dbs]

                logging.info(f"发现 {len(user_databases)} 个用户数据库: {user_databases}")
                return user_databases

        except Exception as e:
            logging.error(f"获取数据库列表失败: {e}")
            return []
        finally:
            self.close_connection()

    def get_databases_to_backup(self) -> List[str]:
        """
        获取要备份的数据库列表
        根据配置文件确定需要备份的数据库

        Returns:
            List[str]: 要备份的数据库名称列表
        """
        backup_config = self.config['backup']
        mysql_config = self.config['mysql']

        # 获取配置中指定的数据库
        configured_databases = backup_config.get('databases', [])

        if configured_databases:
            # 如果配置了特定数据库,使用配置的列表
            if isinstance(configured_databases, str):
                # 如果是字符串,按逗号分割
                databases = [db.strip() for db in configured_databases.split(',')]
            else:
                # 如果是列表,直接使用
                databases = configured_databases
        else:
            # 如果没有配置,使用默认数据库
            default_db = mysql_config.get('database')
            if default_db:
                databases = [default_db]
            else:
                databases = []

        # 过滤空值并去重
        databases = list(set([db for db in databases if db and db.strip()]))

        if not databases:
            logging.warning("没有指定要备份的数据库,使用默认数据库")
            default_db = mysql_config.get('database', 'test')
            databases = [default_db]

        # 验证数据库是否存在
        valid_databases = []
        all_databases = self.get_all_databases()

        for db in databases:
            if db in all_databases:
                valid_databases.append(db)
            else:
                logging.warning(f"数据库 '{db}' 不存在,已从备份列表中排除")

        if not valid_databases:
            logging.error("没有有效的数据库可以备份")
            return []

        logging.info(f"要备份的数据库: {valid_databases}")
        return valid_databases

    def get_database_size(self, database: str) -> Optional[float]:
        """
        获取数据库大小(MB)

        Args:
            database: 数据库名称

        Returns:
            Optional[float]: 数据库大小(MB),失败返回None
        """
        try:
            conn = self.get_db_connection()
            if not conn:
                return None

            with conn.cursor() as cursor:
                # 查询数据库大小
                cursor.execute("""
                    SELECT SUM(data_length + index_length) / 1024 / 1024 as size_mb
                    FROM information_schema.TABLES 
                    WHERE table_schema = %s
                """, (database,))

                result = cursor.fetchone()
                return result[0] if result and result[0] else 0.0

        except Exception as e:
            logging.error(f"获取数据库 {database} 大小失败: {e}")
            return None
        finally:
            self.close_connection()

    def get_table_info(self, database: str) -> List[Tuple[str, int, float]]:
        """
        获取数据库表信息

        Args:
            database: 数据库名称

        Returns:
            List[Tuple[str, int, float]]: 表信息列表(表名, 行数, 大小MB)
        """
        try:
            conn = self.get_db_connection()
            if not conn:
                return []

            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT 
                        table_name,
                        table_rows,
                        (data_length + index_length) / 1024 / 1024 as size_mb
                    FROM information_schema.TABLES 
                    WHERE table_schema = %s
                    ORDER BY size_mb DESC
                """, (database,))

                return cursor.fetchall()

        except Exception as e:
            logging.error(f"获取数据库 {database} 表信息失败: {e}")
            return []
        finally:
            self.close_connection()

    def __del__(self):
        """析构函数,确保连接关闭"""
        self.close_connection()


################################################################################
# 文件: backupilename_generator.py
################################################################################

"""
文件名生成器模块
负责生成符合规范的备份文件名
"""
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict
import re

from models.config_models import BackupConfig, MySQLConfig


class FilenameGenerator:
    """
    备份文件名生成器
    负责生成符合规范的备份文件名,支持多种命名格式
    """

    # 备份类型到中文名称的映射
    BACKUP_TYPE_NAMES = {
        'startup': '开机备份',
        'shutdown': '关机备份',
        'manual': '手动备份',
        'scheduled': '定时备份',
        'emergency': '紧急备份'
    }

    # 文件名格式变量映射
    FORMAT_VARIABLES = {
        'timestamp': '时间戳 (YYYYMMDD_HHMMSS)',
        'data_source': '数据源 (host:port)',
        'database': '数据库名称',
        'backup_type': '备份类型',
        'year': '年份 (YYYY)',
        'month': '月份 (MM)',
        'day': '日期 (DD)',
        'hour': '小时 (HH)',
        'minute': '分钟 (MM)',
        'second': '秒 (SS)'
    }

    @classmethod
    def generate_custom_filename(cls, database: str, mysql_config: MySQLConfig,
                                 backup_config: BackupConfig) -> str:
        """
        生成自定义格式的文件名

        Args:
            database: 数据库名称
            mysql_config: MySQL配置
            backup_config: 备份配置

        Returns:
            str: 生成的文件名

        Example:
            >>> filename = FilenameGenerator.generate_custom_filename('mydb', mysql_config, backup_config)
            >>> print(filename)  # dump-20231201_143045-localhost_3306-mydb.sql
        """
        # 获取时间戳
        now = datetime.now()
        timestamp = now.strftime("%Y%m%d_%H%M%S")

        # 构建数据源标识
        data_source = f"{mysql_config.host}_{mysql_config.port}"
        # 清理数据源中的特殊字符
        data_source = cls._sanitize_filename_part(data_source)

        # 清理数据库名称
        safe_database_name = cls._sanitize_filename_part(database)

        # 获取文件名格式
        filename_format = backup_config.filename_format

        # 替换所有变量
        filename = filename_format.format(
            timestamp=timestamp,
            data_source=data_source,
            database=safe_database_name,
            backup_type='backup',
            year=now.strftime("%Y"),
            month=now.strftime("%m"),
            day=now.strftime("%d"),
            hour=now.strftime("%H"),
            minute=now.strftime("%M"),
            second=now.strftime("%S")
        )

        # 确保文件扩展名
        if not filename.endswith('.sql'):
            filename += '.sql'

        logging.debug(f"生成自定义文件名: {filename}")
        return filename

    @classmethod
    def generate_legacy_filename(cls, database: str, backup_type: str) -> str:
        """
        生成传统格式的文件名

        Args:
            database: 数据库名称
            backup_type: 备份类型

        Returns:
            str: 生成的文件名
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        prefix = cls.BACKUP_TYPE_NAMES.get(backup_type, '备份')
        safe_database_name = cls._sanitize_filename_part(database)

        filename = f"{prefix}_{safe_database_name}_{timestamp}.sql"
        logging.debug(f"生成传统文件名: {filename}")
        return filename

    @classmethod
    def generate_backup_filename(cls, database: str, backup_type: str,
                                 mysql_config: MySQLConfig, backup_config: BackupConfig) -> str:
        """
        生成备份文件名(根据配置选择格式)

        Args:
            database: 数据库名称
            backup_type: 备份类型
            mysql_config: MySQL配置
            backup_config: 备份配置

        Returns:
            str: 生成的文件名
        """
        if backup_config.use_custom_filename:
            return cls.generate_custom_filename(database, mysql_config, backup_config)
        else:
            return cls.generate_legacy_filename(database, backup_type)

    @staticmethod
    def get_backup_file_path(backup_dir: Path, filename: str) -> Path:
        """
        获取完整的备份文件路径

        Args:
            backup_dir: 备份目录
            filename: 文件名

        Returns:
            Path: 完整的文件路径
        """
        file_path = backup_dir / filename
        logging.debug(f"生成文件路径: {file_path}")
        return file_path

    @staticmethod
    def _sanitize_filename_part(text: str) -> str:
        """
        清理文件名部分,移除不安全的字符

        Args:
            text: 要清理的文本

        Returns:
            str: 清理后的安全文本
        """
        # 替换不安全的字符为下划线
        unsafe_chars = r'[/*?:"<>| ]'
        safe_text = re.sub(unsafe_chars, '_', text)

        # 移除连续的下划线
        safe_text = re.sub(r'_+', '_', safe_text)

        # 移除开头和结尾的下划线
        safe_text = safe_text.strip('_')

        return safe_text

    @classmethod
    def validate_filename_format(cls, format_str: str) -> bool:
        """
        验证文件名格式是否有效

        Args:
            format_str: 文件名格式字符串

        Returns:
            bool: 格式是否有效
        """
        try:
            # 测试格式化
            test_values = {var: 'test' for var in cls.FORMAT_VARIABLES.keys()}
            format_str.format(**test_values)
            return True
        except KeyError as e:
            logging.error(f"文件名格式包含未知变量: {e}")
            return False
        except Exception as e:
            logging.error(f"文件名格式验证失败: {e}")
            return False

    @classmethod
    def get_supported_variables(cls) -> Dict[str, str]:
        """
        获取支持的格式变量

        Returns:
            Dict[str, str]: 变量名到描述的映射
        """
        return cls.FORMAT_VARIABLES.copy()

    @classmethod
    def parse_filename(cls, filename: str) -> Dict[str, str]:
        """
        解析备份文件名,提取信息

        Args:
            filename: 备份文件名

        Returns:
            Dict[str, str]: 解析出的信息字典
        """
        info = {
            'filename': filename,
            'database': 'unknown',
            'timestamp': 'unknown',
            'backup_type': 'unknown'
        }

        try:
            # 移除扩展名
            name_without_ext = Path(filename).stem
            if filename.endswith('.gz'):
                name_without_ext = Path(name_without_ext).stem

            # 尝试解析自定义格式
            if name_without_ext.startswith('dump-'):
                parts = name_without_ext.split('-')
                if len(parts) >= 4:
                    info['timestamp'] = parts[1]
                    info['data_source'] = parts[2]
                    info['database'] = parts[3]
                    info['format'] = 'custom'

            # 尝试解析传统格式
            else:
                for backup_type, chinese_name in cls.BACKUP_TYPE_NAMES.items():
                    if name_without_ext.startswith(chinese_name):
                        info['backup_type'] = backup_type
                        parts = name_without_ext.split('_')
                        if len(parts) >= 3:
                            info['database'] = parts[1]
                            info['timestamp'] = parts[2]
                        info['format'] = 'legacy'
                        break

            return info

        except Exception as e:
            logging.warning(f"解析文件名失败 {filename}: {e}")
            return info


################################################################################
# 文件: backup__init__.py
################################################################################

"""
备份核心模块
提供完整的数据库备份功能
"""

from .backup_manager import BackupManager
from .database_manager import DatabaseManager
from .command_builder import CommandBuilder
from .filename_generator import FilenameGenerator
from .backup_executor import BackupExecutor
from .backup_cleaner import BackupCleaner

__all__ = [
    'BackupManager',
    'DatabaseManager',
    'CommandBuilder',
    'FilenameGenerator',
    'BackupExecutor',
    'BackupCleaner'
]


################################################################################
# 文件: modelsackup_models.py
################################################################################

"""
备份数据模型模块
定义备份过程中使用的数据结构和结果对象
"""

from dataclasses import dataclass
from typing import Optional, List, Dict, Any
from pathlib import Path
from datetime import datetime


@dataclass
class BackupResult:
    """
    备份结果数据模型
    记录单次备份的执行结果和统计信息

    Attributes:
        database: 数据库名称
        success: 是否成功完成备份
        backup_file: 备份文件路径
        file_size: 文件大小(MB)
        error_message: 错误信息(如果失败)
        duration: 执行时长(秒)
        start_time: 开始时间
        end_time: 结束时间
    """
    database: str
    success: bool
    backup_file: Optional[Path]
    file_size: float = 0.0
    error_message: str = ""
    duration: float = 0.0
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None

    def __post_init__(self):
        """初始化后处理"""
        if self.start_time and self.end_time:
            self.duration = (self.end_time - self.start_time).total_seconds()

    def to_dict(self) -> Dict[str, Any]:
        """
        转换为字典格式

        Returns:
            Dict[str, Any]: 字典形式的结果数据
        """
        return {
            'database': self.database,
            'success': self.success,
            'backup_file': str(self.backup_file) if self.backup_file else None,
            'file_size': self.file_size,
            'error_message': self.error_message,
            'duration': self.duration,
            'start_time': self.start_time.isoformat() if self.start_time else None,
            'end_time': self.end_time.isoformat() if self.end_time else None
        }


@dataclass
class BackupTask:
    """
    备份任务数据模型
    描述一个备份任务的完整信息和配置

    Attributes:
        database: 数据库名称
        backup_type: 备份类型(startup/shutdown/manual)
        timestamp: 任务创建时间戳
        config: 备份配置字典
        mysql_config: MySQL配置字典
        tables: 要备份的特定表列表
    """
    database: str
    backup_type: str
    timestamp: str
    config: Dict[str, Any]
    mysql_config: Dict[str, Any]
    tables: List[str] = None

    def __post_init__(self):
        """初始化后处理"""
        if self.tables is None:
            self.tables = []

    def get_task_id(self) -> str:
        """
        生成任务唯一标识

        Returns:
            str: 任务ID
        """
        return f"{self.database}_{self.backup_type}_{self.timestamp}"

    def validate(self) -> List[str]:
        """
        验证任务配置有效性

        Returns:
            List[str]: 错误消息列表
        """
        errors = []
        if not self.database:
            errors.append("数据库名称不能为空")
        if self.backup_type not in ['startup', 'shutdown', 'manual']:
            errors.append("备份类型必须是: startup, shutdown, manual")
        return errors


################################################################################
# 文件: modelsconfig_models.py
################################################################################

"""
配置数据模型模块
定义配置相关的数据结构和验证规则
"""

from dataclasses import dataclass
from typing import List, Optional, Dict, Any


@dataclass
class MySQLConfig:
    """
    MySQL数据库连接配置数据模型

    Attributes:
        host: 数据库主机地址
        user: 数据库用户名
        password: 数据库密码
        database: 默认数据库
        port: 数据库端口,默认3306
        charset: 字符集,默认utf8mb4
    """
    host: str
    user: str
    password: str
    database: str
    port: int = 3306
    charset: str = "utf8mb4"

    def validate(self) -> List[str]:
        """
        验证配置有效性

        Returns:
            List[str]: 错误消息列表,空列表表示验证通过
        """
        errors = []
        if not self.host:
            errors.append("MySQL host 不能为空")
        if not self.user:
            errors.append("MySQL user 不能为空")
        if not self.password:
            errors.append("MySQL password 不能为空")
        if self.port <= 0 or self.port > 65535:
            errors.append("MySQL port 必须在 1-65535 范围内")
        return errors


@dataclass
class BackupConfig:
    """
    备份配置数据模型

    Attributes:
        backup_dir: 备份文件存储目录
        keep_backups: 保留的备份文件数量,默认20
        compression: 是否启用压缩,默认True
        timeout: 备份超时时间(秒),默认300
        use_custom_filename: 是否使用自定义文件名格式,默认True
        filename_format: 文件名格式模板
        add_drop_table: 是否添加DROP TABLE语句,默认True
        disable_keys: 是否禁用键,默认True
        lock_tables: 是否锁定表,默认False
        add_drop_trigger: 是否添加DROP TRIGGER语句,默认True
        no_data: 是否不备份数据(仅结构),默认False
        complete_insert: 是否使用完整的INSERT语句,默认True
        create_options: 是否包含CREATE选项,默认True
        routines: 是否备份存储过程和函数,默认True
        extended_insert: 是否使用扩展INSERT语句,默认True
        databases: 要备份的数据库列表
        tables: 要备份的特定表列表
    """
    backup_dir: str
    keep_backups: int = 20
    compression: bool = True
    timeout: int = 300
    use_custom_filename: bool = True
    filename_format: str = "dump-{timestamp}-{data_source}-{database}.sql"
    add_drop_table: bool = True
    disable_keys: bool = True
    lock_tables: bool = False
    add_drop_trigger: bool = True
    no_data: bool = False
    complete_insert: bool = True
    create_options: bool = True
    routines: bool = True
    extended_insert: bool = True
    databases: List[str] = None
    tables: List[str] = None

    def __post_init__(self):
        """初始化后处理"""
        if self.databases is None:
            self.databases = []
        if self.tables is None:
            self.tables = []

    def validate(self) -> List[str]:
        """
        验证配置有效性

        Returns:
            List[str]: 错误消息列表
        """
        errors = []
        if not self.backup_dir:
            errors.append("备份目录不能为空")
        if self.keep_backups <= 0:
            errors.append("保留备份数量必须大于0")
        if self.timeout <= 0:
            errors.append("备份超时时间必须大于0")
        return errors


@dataclass
class LoggingConfig:
    """
    日志配置数据模型

    Attributes:
        log_dir: 日志文件目录,默认"logs"
        log_level: 日志级别,默认"INFO"
        max_log_size: 最大日志文件大小,默认"10MB"
        backup_count: 保留的日志文件数量,默认5
    """
    log_dir: str = "logs"
    log_level: str = "INFO"
    max_log_size: str = "10MB"
    backup_count: int = 5

    def validate(self) -> List[str]:
        """
        验证配置有效性

        Returns:
            List[str]: 错误消息列表
        """
        errors = []
        if not self.log_dir:
            errors.append("日志目录不能为空")
        valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
        if self.log_level.upper() not in valid_levels:
            errors.append(f"日志级别必须是: {', '.join(valid_levels)}")
        return errors


@dataclass
class ScheduleConfig:
    """
    调度配置数据模型

    Attributes:
        startup_delay: 开机后延迟执行时间(秒),默认120
        shutdown_timeout: 关机备份超时时间(秒),默认300
        wait_mysql_timeout: 等待MySQL启动超时时间(秒),默认300
    """
    startup_delay: int = 120
    shutdown_timeout: int = 300
    wait_mysql_timeout: int = 300

    def validate(self) -> List[str]:
        """
        验证配置有效性

        Returns:
            List[str]: 错误消息列表
        """
        errors = []
        if self.startup_delay < 0:
            errors.append("启动延迟时间不能为负数")
        if self.shutdown_timeout <= 0:
            errors.append("关机超时时间必须大于0")
        if self.wait_mysql_timeout <= 0:
            errors.append("等待MySQL超时时间必须大于0")
        return errors


################################################################################
# 文件: models__init__.py
################################################################################

"""
数据模型模块
定义系统使用的数据结构和类型
"""

from .config_models import MySQLConfig, BackupConfig, LoggingConfig, ScheduleConfig
from .backup_models import BackupResult, BackupTask

__all__ = [
    'MySQLConfig',
    'BackupConfig',
    'LoggingConfig',
    'ScheduleConfig',
    'BackupResult',
    'BackupTask'
]


################################################################################
# 文件: schedule	ask_scheduler.py
################################################################################

"""
任务调度器模块
负责备份任务的调度、执行和监控
"""

import time
import logging
from typing import Optional, Dict, Any
from datetime import datetime, timedelta

from backup.backup_manager import BackupManager


class TaskScheduler:
    """
    任务调度器
    管理备份任务的调度和执行,提供任务监控功能
    """

    def __init__(self, backup_manager: BackupManager):
        """
        初始化任务调度器

        Args:
            backup_manager: 备份管理器实例
        """
        self.backup_manager = backup_manager
        self.config = backup_manager.config
        self.is_running = False
        self.task_history = []
        logging.info("任务调度器初始化完成")

    def schedule_startup_backup(self):
        """
        调度开机备份
        等待系统稳定后执行备份
        """
        startup_delay = self.config['schedule'].get('startup_delay', 120)
        logging.info(f"调度开机备份,等待 {startup_delay} 秒系统稳定...")

        # 记录任务开始
        task_id = self._record_task_start('startup')

        try:
            # 等待系统稳定
            time.sleep(startup_delay)

            # 等待MySQL服务启动
            if self.backup_manager.wait_for_mysql():
                success = self.backup_manager.backup_database('startup')
                if success:
                    logging.info("✅ 开机备份完成")
                    self._record_task_success(task_id)
                else:
                    logging.error("❌ 开机备份失败")
                    self._record_task_failure(task_id, "备份执行失败")
            else:
                error_msg = "MySQL服务未就绪"
                logging.error(f"❌ 开机备份失败:{error_msg}")
                self._record_task_failure(task_id, error_msg)

        except Exception as e:
            error_msg = f"开机备份异常: {e}"
            logging.error(f"❌ {error_msg}")
            self._record_task_failure(task_id, error_msg)

    def schedule_shutdown_backup(self):
        """
        调度关机备份
        在系统关机前执行备份
        """
        logging.info("调度关机备份...")

        # 记录任务开始
        task_id = self._record_task_start('shutdown')

        try:
            success = self.backup_manager.backup_database('shutdown')
            if success:
                logging.info("✅ 关机备份完成")
                self._record_task_success(task_id)
            else:
                logging.error("❌ 关机备份失败")
                self._record_task_failure(task_id, "备份执行失败")

        except Exception as e:
            error_msg = f"关机备份异常: {e}"
            logging.error(f"❌ {error_msg}")
            self._record_task_failure(task_id, error_msg)

    def schedule_manual_backup(self) -> bool:
        """
        调度手动备份

        Returns:
            bool: 是否备份成功
        """
        logging.info("调度手动备份...")

        task_id = self._record_task_start('manual')

        try:
            success = self.backup_manager.backup_database('manual')
            if success:
                logging.info("✅ 手动备份完成")
                self._record_task_success(task_id)
                return True
            else:
                logging.error("❌ 手动备份失败")
                self._record_task_failure(task_id, "备份执行失败")
                return False

        except Exception as e:
            error_msg = f"手动备份异常: {e}"
            logging.error(f"❌ {error_msg}")
            self._record_task_failure(task_id, error_msg)
            return False

    def _record_task_start(self, task_type: str) -> str:
        """
        记录任务开始

        Args:
            task_type: 任务类型

        Returns:
            str: 任务ID
        """
        task_id = f"{task_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        task_record = {
            'id': task_id,
            'type': task_type,
            'start_time': datetime.now(),
            'status': 'running',
            'success': None,
            'error_message': None,
            'duration': None
        }

        self.task_history.append(task_record)
        logging.debug(f"任务开始记录: {task_id}")

        return task_id

    def _record_task_success(self, task_id: str):
        """
        记录任务成功

        Args:
            task_id: 任务ID
        """
        for task in self.task_history:
            if task['id'] == task_id and task['status'] == 'running':
                task['status'] = 'completed'
                task['success'] = True
                task['duration'] = (datetime.now() - task['start_time']).total_seconds()
                logging.debug(f"任务成功记录: {task_id}")
                break

    def _record_task_failure(self, task_id: str, error_message: str):
        """
        记录任务失败

        Args:
            task_id: 任务ID
            error_message: 错误信息
        """
        for task in self.task_history:
            if task['id'] == task_id and task['status'] == 'running':
                task['status'] = 'failed'
                task['success'] = False
                task['error_message'] = error_message
                task['duration'] = (datetime.now() - task['start_time']).total_seconds()
                logging.debug(f"任务失败记录: {task_id}")
                break

    def get_task_history(self, limit: int = 50) -> list:
        """
        获取任务历史

        Args:
            limit: 返回的记录数量限制

        Returns:
            list: 任务历史记录
        """
        # 按开始时间倒序排列,返回最新的记录
        sorted_history = sorted(
            self.task_history,
            key=lambda x: x['start_time'],
            reverse=True
        )
        return sorted_history[:limit]

    def get_task_statistics(self) -> Dict[str, Any]:
        """
        获取任务统计信息

        Returns:
            Dict[str, Any]: 任务统计信息
        """
        if not self.task_history:
            return {
                'total_tasks': 0,
                'successful_tasks': 0,
                'failed_tasks': 0,
                'success_rate': 0,
                'average_duration': 0
            }

        total_tasks = len(self.task_history)
        successful_tasks = len([t for t in self.task_history if t.get('success')])
        failed_tasks = total_tasks - successful_tasks
        success_rate = (successful_tasks / total_tasks) * 100 if total_tasks > 0 else 0

        # 计算平均持续时间(只计算已完成的任务)
        completed_tasks = [t for t in self.task_history if t.get('duration') is not None]
        average_duration = (
            sum(t['duration'] for t in completed_tasks) / len(completed_tasks)
            if completed_tasks else 0
        )

        # 按任务类型统计
        task_types = {}
        for task in self.task_history:
            task_type = task['type']
            if task_type not in task_types:
                task_types[task_type] = {'total': 0, 'success': 0, 'failed': 0}

            task_types[task_type]['total'] += 1
            if task.get('success'):
                task_types[task_type]['success'] += 1
            else:
                task_types[task_type]['failed'] += 1

        return {
            'total_tasks': total_tasks,
            'successful_tasks': successful_tasks,
            'failed_tasks': failed_tasks,
            'success_rate': round(success_rate, 2),
            'average_duration': round(average_duration, 2),
            'task_types': task_types,
            'first_task': (
                min(self.task_history, key=lambda x: x['start_time'])['start_time'].isoformat()
                if self.task_history else None
            ),
            'last_task': (
                max(self.task_history, key=lambda x: x['start_time'])['start_time'].isoformat()
                if self.task_history else None
            )
        }

    def cleanup_old_history(self, days: int = 30):
        """
        清理旧的任务历史记录

        Args:
            days: 保留天数
        """
        cutoff_time = datetime.now() - timedelta(days=days)
        initial_count = len(self.task_history)

        self.task_history = [
            task for task in self.task_history
            if task['start_time'] >= cutoff_time
        ]

        removed_count = initial_count - len(self.task_history)
        if removed_count > 0:
            logging.info(f"清理了 {removed_count} 条超过 {days} 天的任务历史记录")

    def get_recent_failures(self, hours: int = 24) -> list:
        """
        获取最近的失败任务

        Args:
            hours: 时间范围(小时)

        Returns:
            list: 失败任务列表
        """
        cutoff_time = datetime.now() - timedelta(hours=hours)

        failures = [
            task for task in self.task_history
            if task.get('success') is False and task['start_time'] >= cutoff_time
        ]

        return sorted(failures, key=lambda x: x['start_time'], reverse=True)

    def is_system_healthy(self) -> Dict[str, Any]:
        """
        检查系统健康状态

        Returns:
            Dict[str, Any]: 系统健康状态
        """
        # 获取最近的失败任务
        recent_failures = self.get_recent_failures(24)

        # 检查数据库连接
        db_healthy = self.backup_manager.test_database_connection()

        # 检查备份目录
        from utils.file_utils import ensure_directory
        backup_dir = self.backup_manager.backup_config.backup_dir
        dir_healthy = ensure_directory(backup_dir)

        # 综合健康评估
        health_status = 'healthy'
        issues = []

        if recent_failures:
            health_status = 'warning'
            issues.append(f"最近24小时有 {len(recent_failures)} 个失败任务")

        if not db_healthy:
            health_status = 'critical'
            issues.append("数据库连接失败")

        if not dir_healthy:
            health_status = 'critical'
            issues.append("备份目录不可用")

        return {
            'status': health_status,
            'timestamp': datetime.now().isoformat(),
            'database_connection': db_healthy,
            'backup_directory': dir_healthy,
            'recent_failures': len(recent_failures),
            'issues': issues,
            'suggestions': self._get_health_suggestions(health_status, issues)
        }

    def _get_health_suggestions(self, health_status: str, issues: list) -> list:
        """
        获取健康问题建议

        Args:
            health_status: 健康状态
            issues: 问题列表

        Returns:
            list: 建议列表
        """
        suggestions = []

        if health_status == 'critical':
            if "数据库连接失败" in issues:
                suggestions.append("检查MySQL服务是否运行,验证连接配置")
            if "备份目录不可用" in issues:
                suggestions.append("检查备份目录权限,或更换备份目录")

        if health_status == 'warning':
            if "失败任务" in str(issues):
                suggestions.append("查看任务历史记录,分析失败原因")

        if health_status == 'healthy':
            suggestions.append("系统运行正常,继续保持")

        return suggestions

    def __del__(self):
        """析构函数,清理资源"""
        if self.is_running:
            self.is_running = False
            logging.info("任务调度器已停止")


################################################################################
# 文件: schedule__init__.py
################################################################################

"""
任务调度模块
提供备份任务的调度和执行管理
"""

from .task_scheduler import TaskScheduler

__all__ = ['TaskScheduler']


################################################################################
# 文件: utilserror_handler.py
################################################################################

"""
错误处理模块
提供统一的错误处理和异常管理
"""

import logging
import subprocess
from pathlib import Path
from typing import Optional, Tuple, Dict, Any
from datetime import datetime


class BackupErrorHandler:
    """
    备份错误处理器
    专门处理备份过程中可能出现的各种错误
    """

    # 错误类型映射
    ERROR_PATTERNS = {
        'DATABASE_NOT_FOUND': [
            "Unknown database",
            "database.*doesn't exist"
        ],
        'ACCESS_DENIED': [
            "Access denied",
            "password.*failed",
            "connection.*refused"
        ],
        'CONNECTION_FAILED': [
            "Can't connect to MySQL server",
            "connection.*failed",
            "timeout"
        ],
        'LOCK_TABLES_ERROR': [
            "when using LOCK TABLES",
            "lock.*wait",
            "deadlock"
        ],
        'PERMISSION_DENIED': [
            "permission denied",
            "cannot open",
            "read-only"
        ],
        'DISK_FULL': [
            "no space left",
            "disk full"
        ],
        'MYSQLDUMP_NOT_FOUND': [
            "mysqldump.*not found",
            "command not found"
        ]
    }

    def handle_backup_error(self, result: subprocess.CompletedProcess,
                            backup_file: Path, database: str) -> Tuple[bool, str]:
        """
        处理备份错误

        Args:
            result: 子进程执行结果
            backup_file: 备份文件路径
            database: 数据库名称

        Returns:
            Tuple[bool, str]: (是否成功, 错误信息)
        """
        logging.error(f"mysqldump 返回错误代码: {result.returncode}")

        stderr = result.stderr.strip()
        logging.error(f"错误信息: {stderr}")

        # 分析错误类型
        error_type, error_message = self._analyze_error_type(stderr, database)

        # 记录详细的错误信息
        self._log_detailed_error(error_type, stderr, database)

        # 清理失败的备份文件
        self._cleanup_failed_backup(backup_file)

        return False, error_message

    def _analyze_error_type(self, stderr: str, database: str) -> Tuple[str, str]:
        """
        分析错误类型

        Args:
            stderr: 错误输出
            database: 数据库名称

        Returns:
            Tuple[str, str]: (错误类型, 错误信息)
        """
        stderr_lower = stderr.lower()

        for error_type, patterns in self.ERROR_PATTERNS.items():
            for pattern in patterns:
                if pattern.lower() in stderr_lower:
                    return self._get_error_message(error_type, database, stderr)

        # 未知错误
        return "UNKNOWN_ERROR", f"未知错误: {stderr.strip()}"

    def _get_error_message(self, error_type: str, database: str, stderr: str) -> Tuple[str, str]:
        """
        获取错误消息

        Args:
            error_type: 错误类型
            database: 数据库名称
            stderr: 错误输出

        Returns:
            Tuple[str, str]: (错误类型, 错误消息)
        """
        error_messages = {
            'DATABASE_NOT_FOUND': f"数据库 '{database}' 不存在或无法访问",
            'ACCESS_DENIED': "数据库访问被拒绝,请检查用户名和密码",
            'CONNECTION_FAILED': "无法连接到MySQL服务器,请检查网络连接和服务器状态",
            'LOCK_TABLES_ERROR': "表锁定错误,请尝试禁用锁表选项或检查数据库权限",
            'PERMISSION_DENIED': "文件权限错误,请检查备份目录的写入权限",
            'DISK_FULL': "磁盘空间不足,无法创建备份文件",
            'MYSQLDUMP_NOT_FOUND': "mysqldump命令未找到,请确保MySQL客户端工具已安装"
        }

        message = error_messages.get(error_type, f"未知错误类型: {error_type}")
        return error_type, f"{message} (详情: {stderr[:100]}...)"

    def _log_detailed_error(self, error_type: str, stderr: str, database: str):
        """
        记录详细的错误信息

        Args:
            error_type: 错误类型
            stderr: 错误输出
            database: 数据库名称
        """
        error_context = {
            'timestamp': datetime.now().isoformat(),
            'database': database,
            'error_type': error_type,
            'error_message': stderr,
            'suggested_action': self._get_suggested_action(error_type)
        }

        logging.error(f"错误详情: {error_context}")

    def _get_suggested_action(self, error_type: str) -> str:
        """
        获取建议的解决措施

        Args:
            error_type: 错误类型

        Returns:
            str: 建议的解决措施
        """
        actions = {
            'DATABASE_NOT_FOUND': "检查数据库名称是否正确,确保数据库存在",
            'ACCESS_DENIED': "验证MySQL用户名和密码,检查用户权限",
            'CONNECTION_FAILED': "检查MySQL服务是否运行,网络连接是否正常",
            'LOCK_TABLES_ERROR': "在配置中禁用锁表选项,或授予LOCK TABLES权限",
            'PERMISSION_DENIED': "检查备份目录的写入权限,或更换备份目录",
            'DISK_FULL': "清理磁盘空间或更换备份目录",
            'MYSQLDUMP_NOT_FOUND': "安装MySQL客户端工具,或确保mysqldump在PATH中"
        }

        return actions.get(error_type, "查看详细错误日志以获取更多信息")

    def _cleanup_failed_backup(self, backup_file: Path):
        """
        清理失败的备份文件

        Args:
            backup_file: 要清理的备份文件路径
        """
        try:
            if backup_file.exists():
                backup_file.unlink()
                logging.info(f"已清理失败的备份文件: {backup_file}")
        except Exception as e:
            logging.error(f"清理备份文件失败 {backup_file}: {e}")


class DatabaseErrorHandler:
    """
    数据库错误处理器
    处理数据库连接和操作相关的错误
    """

    def handle_connection_error(self, error: Exception, host: str, port: int) -> Tuple[bool, str]:
        """
        处理数据库连接错误

        Args:
            error: 异常对象
            host: 主机地址
            port: 端口号

        Returns:
            Tuple[bool, str]: (是否成功, 错误信息)
        """
        error_msg = str(error).lower()

        if "access denied" in error_msg:
            return False, f"数据库访问被拒绝: {host}:{port}"
        elif "can't connect" in error_msg:
            return False, f"无法连接到数据库服务器: {host}:{port}"
        elif "unknown database" in error_msg:
            return False, "指定的数据库不存在"
        elif "timeout" in error_msg:
            return False, f"数据库连接超时: {host}:{port}"
        else:
            return False, f"数据库连接错误: {error}"

    def handle_query_error(self, error: Exception, query: str = "") -> Tuple[bool, str]:
        """
        处理数据库查询错误

        Args:
            error: 异常对象
            query: 查询语句(可选)

        Returns:
            Tuple[bool, str]: (是否成功, 错误信息)
        """
        error_msg = str(error).lower()

        if "table doesn't exist" in error_msg:
            return False, "查询的表不存在"
        elif "syntax error" in error_msg:
            return False, "SQL语法错误"
        elif "permission denied" in error_msg:
            return False, "没有执行查询的权限"
        else:
            base_msg = f"数据库查询错误: {error}"
            if query:
                base_msg += f" (查询: {query[:50]}...)"
            return False, base_msg


class FileSystemErrorHandler:
    """
    文件系统错误处理器
    处理文件操作相关的错误
    """

    def handle_file_error(self, error: Exception, file_path: Path, operation: str) -> Tuple[bool, str]:
        """
        处理文件操作错误

        Args:
            error: 异常对象
            file_path: 文件路径
            operation: 操作类型

        Returns:
            Tuple[bool, str]: (是否成功, 错误信息)
        """
        error_msg = str(error).lower()

        if "permission denied" in error_msg:
            return False, f"文件权限不足: {file_path}"
        elif "no such file or directory" in error_msg:
            return False, f"文件或目录不存在: {file_path}"
        elif "file exists" in error_msg:
            return False, f"文件已存在: {file_path}"
        elif "disk full" in error_msg or "no space" in error_msg:
            return False, f"磁盘空间不足: {file_path}"
        elif "is a directory" in error_msg:
            return False, f"路径是目录而不是文件: {file_path}"
        else:
            return False, f"文件{operation}错误 {file_path}: {error}"

    def handle_directory_error(self, error: Exception, dir_path: Path, operation: str) -> Tuple[bool, str]:
        """
        处理目录操作错误

        Args:
            error: 异常对象
            dir_path: 目录路径
            operation: 操作类型

        Returns:
            Tuple[bool, str]: (是否成功, 错误信息)
        """
        error_msg = str(error).lower()

        if "permission denied" in error_msg:
            return False, f"目录权限不足: {dir_path}"
        elif "no such file or directory" in error_msg:
            return False, f"目录不存在: {dir_path}"
        elif "file exists" in error_msg:
            return False, f"路径已存在但不是目录: {dir_path}"
        elif "not empty" in error_msg:
            return False, f"目录不为空: {dir_path}"
        else:
            return False, f"目录{operation}错误 {dir_path}: {error}"


class ErrorReporter:
    """
    错误报告器
    生成详细的错误报告和统计
    """

    def __init__(self):
        self.error_count = 0
        self.error_types = {}
        self.first_error_time = None
        self.last_error_time = None

    def record_error(self, error_type: str, error_message: str, context: Dict[str, Any] = None):
        """
        记录错误

        Args:
            error_type: 错误类型
            error_message: 错误消息
            context: 错误上下文信息
        """
        self.error_count += 1
        current_time = datetime.now()

        # 更新时间戳
        if not self.first_error_time:
            self.first_error_time = current_time
        self.last_error_time = current_time

        # 统计错误类型
        self.error_types[error_type] = self.error_types.get(error_type, 0) + 1

        # 记录错误详情
        error_detail = {
            'type': error_type,
            'message': error_message,
            'timestamp': current_time.isoformat(),
            'context': context or {}
        }

        logging.error(f"错误记录: {error_detail}")

    def get_error_report(self) -> Dict[str, Any]:
        """
        获取错误报告

        Returns:
            Dict[str, Any]: 错误报告
        """
        return {
            'total_errors': self.error_count,
            'error_types': self.error_types,
            'first_error': self.first_error_time.isoformat() if self.first_error_time else None,
            'last_error': self.last_error_time.isoformat() if self.last_error_time else None,
            'time_span_hours': (
                (self.last_error_time - self.first_error_time).total_seconds() / 3600
                if self.first_error_time and self.last_error_time else 0
            )
        }

    def reset_statistics(self):
        """重置错误统计"""
        self.error_count = 0
        self.error_types = {}
        self.first_error_time = None
        self.last_error_time = None
        logging.info("错误统计已重置")


################################################################################
# 文件: utilsile_utils.py
################################################################################

"""
文件操作工具模块
提供文件压缩、目录管理等文件相关操作
"""

import os
import gzip
import shutil
import logging
from pathlib import Path
from typing import Optional, List
from datetime import datetime


def compress_file(file_path: Path) -> Optional[str]:
    """
    压缩文件为.gz格式

    Args:
        file_path: 要压缩的文件路径

    Returns:
        Optional[str]: 压缩后的文件路径,失败返回None

    Example:
        >>> compressed = compress_file(Path('backup.sql'))
        >>> print(compressed)  # backup.sql.gz
    """
    if not file_path.exists():
        logging.error(f"要压缩的文件不存在: {file_path}")
        return None

    try:
        compressed_file = f"{file_path}.gz"
        original_size = file_path.stat().st_size

        logging.debug(f"开始压缩文件: {file_path} -> {compressed_file}")

        with open(file_path, 'rb') as f_in:
            with gzip.open(compressed_file, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)

        # 验证压缩文件
        if not Path(compressed_file).exists():
            logging.error(f"压缩文件创建失败: {compressed_file}")
            return None

        compressed_size = Path(compressed_file).stat().st_size
        compression_ratio = (1 - compressed_size / original_size) * 100

        # 删除原始文件
        os.remove(file_path)

        logging.info(
            f"文件压缩完成: {file_path.name} -> {Path(compressed_file).name} "
            f"({original_size / 1024 / 1024:.2f} MB -> {compressed_size / 1024 / 1024:.2f} MB, "
            f"压缩率: {compression_ratio:.1f}%)"
        )

        return compressed_file

    except Exception as e:
        logging.error(f"文件压缩失败 {file_path}: {e}")
        return None


def decompress_file(compressed_file: Path) -> Optional[str]:
    """
    解压缩.gz文件

    Args:
        compressed_file: 要解压的.gz文件路径

    Returns:
        Optional[str]: 解压后的文件路径,失败返回None
    """
    if not compressed_file.exists():
        logging.error(f"要解压的文件不存在: {compressed_file}")
        return None

    try:
        # 移除.gz扩展名得到原始文件名
        original_file = str(compressed_file).replace('.gz', '')

        with gzip.open(compressed_file, 'rb') as f_in:
            with open(original_file, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)

        # 删除压缩文件
        os.remove(compressed_file)

        logging.info(f"文件解压完成: {compressed_file.name} -> {Path(original_file).name}")
        return original_file

    except Exception as e:
        logging.error(f"文件解压失败 {compressed_file}: {e}")
        return None


def ensure_directory(directory: Path) -> bool:
    """
    确保目录存在,如果不存在则创建

    Args:
        directory: 目录路径

    Returns:
        bool: 是否成功确保目录存在
    """
    try:
        directory.mkdir(parents=True, exist_ok=True)
        logging.debug(f"目录已确保存在: {directory}")
        return True
    except Exception as e:
        logging.error(f"创建目录失败 {directory}: {e}")
        return False


def get_file_size(file_path: Path) -> Optional[float]:
    """
    获取文件大小(MB)

    Args:
        file_path: 文件路径

    Returns:
        Optional[float]: 文件大小(MB),失败返回None
    """
    try:
        if file_path.exists():
            return file_path.stat().st_size / (1024 * 1024)
        return None
    except Exception as e:
        logging.error(f"获取文件大小失败 {file_path}: {e}")
        return None


def format_file_size(size_bytes: int) -> str:
    """
    格式化文件大小

    Args:
        size_bytes: 字节数

    Returns:
        str: 格式化后的大小字符串
    """
    units = ['B', 'KB', 'MB', 'GB', 'TB']
    size = float(size_bytes)

    for unit in units:
        if size < 1024.0 or unit == 'TB':
            break
        size /= 1024.0

    return f"{size:.2f} {unit}"


def list_files(directory: Path, pattern: str = "*") -> List[Path]:
    """
    列出目录中的文件

    Args:
        directory: 目录路径
        pattern: 文件模式

    Returns:
        List[Path]: 文件路径列表
    """
    if not directory.exists():
        logging.warning(f"目录不存在: {directory}")
        return []

    try:
        files = list(directory.glob(pattern))
        return sorted(files, key=lambda x: x.stat().st_mtime, reverse=True)
    except Exception as e:
        logging.error(f"列出文件失败 {directory}: {e}")
        return []


def safe_delete_file(file_path: Path) -> bool:
    """
    安全删除文件

    Args:
        file_path: 要删除的文件路径

    Returns:
        bool: 是否删除成功
    """
    try:
        if file_path.exists():
            file_path.unlink()
            logging.debug(f"文件已删除: {file_path}")
            return True
        return True  # 文件不存在也算删除成功
    except Exception as e:
        logging.error(f"删除文件失败 {file_path}: {e}")
        return False


def backup_file(original_file: Path, backup_suffix: str = ".bak") -> Optional[Path]:
    """
    备份文件

    Args:
        original_file: 原始文件路径
        backup_suffix: 备份文件后缀

    Returns:
        Optional[Path]: 备份文件路径,失败返回None
    """
    if not original_file.exists():
        logging.warning(f"要备份的文件不存在: {original_file}")
        return None

    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file = original_file.with_suffix(f"{backup_suffix}.{timestamp}")

        shutil.copy2(original_file, backup_file)
        logging.info(f"文件备份完成: {original_file} -> {backup_file}")
        return backup_file

    except Exception as e:
        logging.error(f"文件备份失败 {original_file}: {e}")
        return None


def calculate_directory_size(directory: Path) -> Optional[int]:
    """
    计算目录总大小(字节)

    Args:
        directory: 目录路径

    Returns:
        Optional[int]: 目录总大小(字节),失败返回None
    """
    if not directory.exists():
        return None

    try:
        total_size = 0
        for file_path in directory.rglob('*'):
            if file_path.is_file():
                total_size += file_path.stat().st_size
        return total_size
    except Exception as e:
        logging.error(f"计算目录大小失败 {directory}: {e}")
        return None


class FileManager:
    """
    文件管理器
    提供高级文件管理功能
    """

    def __init__(self, base_directory: Path):
        self.base_directory = base_directory
        ensure_directory(base_directory)

    def create_backup_structure(self) -> bool:
        """
        创建备份目录结构

        Returns:
            bool: 是否创建成功
        """
        try:
            directories = [
                self.base_directory / "daily",
                self.base_directory / "weekly",
                self.base_directory / "monthly",
                self.base_directory / "logs"
            ]

            for directory in directories:
                ensure_directory(directory)

            logging.info(f"备份目录结构创建完成: {self.base_directory}")
            return True

        except Exception as e:
            logging.error(f"创建备份目录结构失败: {e}")
            return False

    def cleanup_empty_directories(self) -> int:
        """
        清理空目录

        Returns:
            int: 清理的目录数量
        """
        cleaned_count = 0

        try:
            for directory in self.base_directory.rglob('*'):
                if directory.is_dir() and not any(directory.iterdir()):
                    directory.rmdir()
                    cleaned_count += 1
                    logging.debug(f"清理空目录: {directory}")

            if cleaned_count > 0:
                logging.info(f"清理了 {cleaned_count} 个空目录")

            return cleaned_count

        except Exception as e:
            logging.error(f"清理空目录失败: {e}")
            return 0

    def get_storage_info(self) -> dict:
        """
        获取存储信息

        Returns:
            dict: 存储信息字典
        """
        try:
            total_size = calculate_directory_size(self.base_directory) or 0
            file_count = sum(1 for _ in self.base_directory.rglob('*') if _.is_file())

            # 按扩展名统计
            extensions = {}
            for file_path in self.base_directory.rglob('*'):
                if file_path.is_file():
                    ext = file_path.suffix.lower() or '无扩展名'
                    extensions[ext] = extensions.get(ext, 0) + 1

            return {
                'total_size_bytes': total_size,
                'total_size_formatted': format_file_size(total_size),
                'file_count': file_count,
                'extensions': extensions,
                'directory': str(self.base_directory)
            }

        except Exception as e:
            logging.error(f"获取存储信息失败: {e}")
            return {'error': str(e)}


################################################################################
# 文件: utilslogger.py
################################################################################

"""
日志配置模块
负责系统的日志配置和管理
"""

import logging
import sys
from pathlib import Path
from typing import Optional
from logging.handlers import RotatingFileHandler

from config_manager import ConfigManager


def setup_logging(config_manager: ConfigManager, log_name: str = "mysql_auto_backup"):
    """
    设置日志配置

    Args:
        config_manager: 配置管理器实例
        log_name: 日志名称,用于生成日志文件名
    """
    log_config = config_manager.config['logging']
    log_dir = Path(log_config['log_dir'])

    # 如果是相对路径,转换为绝对路径
    if not log_dir.is_absolute():
        log_dir = Path(__file__).parent.parent / log_dir

    # 确保日志目录存在
    log_dir.mkdir(parents=True, exist_ok=True)

    # 设置日志级别
    log_level = getattr(logging, log_config['log_level'].upper(), logging.INFO)

    # 清除之前的处理器
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)

    # 创建格式化器
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # 文件处理器 - 使用轮转文件
    log_file = log_dir / f"{log_name}.log"
    file_handler = RotatingFileHandler(
        log_file,
        maxBytes=parse_size(log_config.get('max_log_size', '10MB')),
        backupCount=log_config.get('backup_count', 5),
        encoding='utf-8'
    )
    file_handler.setFormatter(formatter)
    file_handler.setLevel(log_level)

    # 控制台处理器
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    console_handler.setLevel(log_level)

    # 配置根日志记录器
    logging.basicConfig(
        level=log_level,
        handlers=[file_handler, console_handler]
    )

    # 设置第三方库的日志级别
    logging.getLogger('pymysql').setLevel(logging.WARNING)
    logging.getLogger('urllib3').setLevel(logging.WARNING)

    logging.info(f"日志系统初始化完成 - 级别: {log_level}, 文件: {log_file}")


def parse_size(size_str: str) -> int:
    """
    解析大小字符串为字节数

    Args:
        size_str: 大小字符串,如 '10MB', '1GB'

    Returns:
        int: 字节数
    """
    size_str = size_str.upper().strip()

    # 定义单位映射
    units = {
        'B': 1,
        'KB': 1024,
        'MB': 1024 ** 2,
        'GB': 1024 ** 3,
        'TB': 1024 ** 4
    }

    # 提取数字和单位
    number = ''
    unit = ''

    for char in size_str:
        if char.isdigit() or char == '.':
            number += char
        else:
            unit += char

    if not number:
        return 10 * 1024 * 1024  # 默认10MB

    number = float(number)
    unit = unit.strip()

    # 如果没有单位,默认使用字节
    if not unit:
        return int(number)

    # 查找匹配的单位
    for u, multiplier in units.items():
        if unit.startswith(u):
            return int(number * multiplier)

    # 如果没有匹配的单位,使用默认值
    logging.warning(f"未知的大小单位: {unit},使用默认值10MB")
    return 10 * 1024 * 1024


def get_logger(name: str) -> logging.Logger:
    """
    获取指定名称的日志记录器

    Args:
        name: 日志记录器名称

    Returns:
        logging.Logger: 日志记录器实例
    """
    return logging.getLogger(name)


def set_log_level(level: str):
    """
    设置日志级别

    Args:
        level: 日志级别 ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
    """
    valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
    if level.upper() not in valid_levels:
        raise ValueError(f"日志级别必须是: {', '.join(valid_levels)}")

    logging.getLogger().setLevel(getattr(logging, level.upper()))


class LogManager:
    """
    日志管理器
    提供更高级的日志管理功能
    """

    def __init__(self, config_manager: ConfigManager):
        self.config_manager = config_manager
        self.loggers = {}

    def get_module_logger(self, module_name: str) -> logging.Logger:
        """
        获取模块专用的日志记录器

        Args:
            module_name: 模块名称

        Returns:
            logging.Logger: 模块日志记录器
        """
        if module_name not in self.loggers:
            logger = logging.getLogger(module_name)
            self.loggers[module_name] = logger

        return self.loggers[module_name]

    def enable_debug_mode(self):
        """启用调试模式"""
        set_log_level('DEBUG')
        logging.info("调试模式已启用")

    def disable_debug_mode(self):
        """禁用调试模式"""
        log_config = self.config_manager.config['logging']
        log_level = log_config['log_level'].upper()
        set_log_level(log_level)
        logging.info(f"调试模式已禁用,恢复为 {log_level} 级别")

    def get_log_stats(self) -> dict:
        """
        获取日志统计信息

        Returns:
            dict: 日志统计信息
        """
        log_config = self.config_manager.config['logging']
        log_dir = Path(log_config['log_dir'])

        if not log_dir.is_absolute():
            log_dir = Path(__file__).parent.parent / log_dir

        stats = {
            'log_directory': str(log_dir),
            'current_level': logging.getLevelName(logging.getLogger().level),
            'active_loggers': len(self.loggers),
            'log_files': []
        }

        # 统计日志文件
        if log_dir.exists():
            for log_file in log_dir.glob("*.log*"):
                file_stats = {
                    'name': log_file.name,
                    'size': format_file_size(log_file.stat().st_size),
                    'modified': log_file.stat().st_mtime
                }
                stats['log_files'].append(file_stats)

        return stats


################################################################################
# 文件: utilssystem_utils.py
################################################################################

"""
系统工具模块
提供系统相关的工具函数和系统信息获取
"""

import os
import sys
import platform
import subprocess
import logging
from pathlib import Path
from typing import Optional, Tuple, Dict


def is_windows() -> bool:
    """
    检查当前系统是否为Windows

    Returns:
        bool: 如果是Windows系统返回True
    """
    return platform.system().lower() == 'windows'


def is_linux() -> bool:
    """
    检查当前系统是否为Linux

    Returns:
        bool: 如果是Linux系统返回True
    """
    return platform.system().lower() == 'linux'


def is_macos() -> bool:
    """
    检查当前系统是否为macOS

    Returns:
        bool: 如果是macOS系统返回True
    """
    return platform.system().lower() == 'darwin'


def get_system_info() -> Dict[str, str]:
    """
    获取系统信息

    Returns:
        Dict[str, str]: 系统信息字典
    """
    try:
        info = {
            'platform': platform.system(),
            'platform_release': platform.release(),
            'platform_version': platform.version(),
            'architecture': platform.machine(),
            'processor': platform.processor(),
            'python_version': platform.python_version(),
            'python_implementation': platform.python_implementation()
        }

        # 添加额外信息
        if is_windows():
            info['windows_edition'] = platform.win32_edition()
        elif is_linux():
            info['libc_version'] = platform.libc_ver()

        return info

    except Exception as e:
        logging.error(f"获取系统信息失败: {e}")
        return {'error': str(e)}


def execute_command(cmd: list, timeout: int = 300) -> Tuple[bool, str, str]:
    """
    执行系统命令

    Args:
        cmd: 命令列表
        timeout: 超时时间(秒)

    Returns:
        Tuple[bool, str, str]: (是否成功, 标准输出, 标准错误)
    """
    try:
        logging.debug(f"执行命令: {' '.join(cmd)}")

        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            encoding='utf-8',
            errors='ignore'
        )

        success = result.returncode == 0
        stdout = result.stdout.strip()
        stderr = result.stderr.strip()

        if not success:
            logging.warning(f"命令执行失败: {stderr}")
        else:
            logging.debug(f"命令执行成功")

        return success, stdout, stderr

    except subprocess.TimeoutExpired:
        error_msg = f"命令执行超时 ({timeout}秒)"
        logging.error(error_msg)
        return False, "", error_msg
    except Exception as e:
        error_msg = f"命令执行异常: {e}"
        logging.error(error_msg)
        return False, "", error_msg


def check_command_exists(command: str) -> bool:
    """
    检查命令是否存在

    Args:
        command: 命令名称

    Returns:
        bool: 命令是否存在
    """
    try:
        if is_windows():
            # Windows系统
            cmd = ['where', command]
        else:
            # Linux/macOS系统
            cmd = ['which', command]

        success, _, _ = execute_command(cmd)
        return success

    except Exception as e:
        logging.debug(f"检查命令存在性失败 {command}: {e}")
        return False


def get_disk_usage(path: Path) -> Optional[Dict[str, float]]:
    """
    获取磁盘使用情况

    Args:
        path: 路径

    Returns:
        Optional[Dict[str, float]]: 磁盘使用信息,失败返回None
    """
    try:
        if is_windows():
            # Windows系统使用wmic
            drive = path.drive
            cmd = ['wmic', 'logicaldisk', 'where', f'deviceid="{drive}"', 'get', 'size,freespace']
            success, stdout, _ = execute_command(cmd)

            if success and stdout:
                lines = stdout.strip().split('
')
                if len(lines) >= 2:
                    parts = lines[1].split()
                    if len(parts) >= 2:
                        total = int(parts[0])
                        free = int(parts[1])
                        return {
                            'total_gb': total / (1024 ** 3),
                            'free_gb': free / (1024 ** 3),
                            'used_gb': (total - free) / (1024 ** 3),
                            'usage_percent': ((total - free) / total) * 100
                        }
        else:
            # Linux/macOS系统使用df
            cmd = ['df', '-B1', str(path)]
            success, stdout, _ = execute_command(cmd)

            if success and stdout:
                lines = stdout.strip().split('
')
                if len(lines) >= 2:
                    parts = lines[1].split()
                    if len(parts) >= 4:
                        total = int(parts[1])
                        used = int(parts[2])
                        available = int(parts[3])
                        return {
                            'total_gb': total / (1024 ** 3),
                            'used_gb': used / (1024 ** 3),
                            'free_gb': available / (1024 ** 3),
                            'usage_percent': (used / total) * 100
                        }

        return None

    except Exception as e:
        logging.error(f"获取磁盘使用情况失败: {e}")
        return None


def get_memory_usage() -> Optional[Dict[str, float]]:
    """
    获取内存使用情况

    Returns:
        Optional[Dict[str, float]]: 内存使用信息,失败返回None
    """
    try:
        if is_windows():
            # Windows系统使用wmic
            cmd = ['wmic', 'OS', 'get', 'TotalVisibleMemorySize,FreePhysicalMemory', '/value']
            success, stdout, _ = execute_command(cmd)

            if success and stdout:
                lines = stdout.strip().split('
')
                total = None
                free = None

                for line in lines:
                    if 'TotalVisibleMemorySize' in line:
                        total = int(line.split('=')[1])
                    elif 'FreePhysicalMemory' in line:
                        free = int(line.split('=')[1])

                if total and free is not None:
                    used = total - free
                    return {
                        'total_mb': total / 1024,
                        'used_mb': used / 1024,
                        'free_mb': free / 1024,
                        'usage_percent': (used / total) * 100
                    }
        else:
            # Linux系统使用free命令
            cmd = ['free', '-m']
            success, stdout, _ = execute_command(cmd)

            if success and stdout:
                lines = stdout.strip().split('
')
                if len(lines) >= 2:
                    parts = lines[1].split()
                    if len(parts) >= 7:
                        total = int(parts[1])
                        used = int(parts[2])
                        free = int(parts[3])
                        return {
                            'total_mb': total,
                            'used_mb': used,
                            'free_mb': free,
                            'usage_percent': (used / total) * 100
                        }

        return None

    except Exception as e:
        logging.error(f"获取内存使用情况失败: {e}")
        return None


def create_symlink(source: Path, target: Path) -> bool:
    """
    创建符号链接

    Args:
        source: 源文件/目录
        target: 目标链接

    Returns:
        bool: 是否创建成功
    """
    try:
        if target.exists():
            target.unlink()

        if is_windows():
            # Windows系统
            import ctypes
            if source.is_file():
                # 文件符号链接
                subprocess.run(['mklink', str(target), str(source)], shell=True, check=True)
            else:
                # 目录符号链接
                subprocess.run(['mklink', '/D', str(target), str(source)], shell=True, check=True)
        else:
            # Linux/macOS系统
            target.symlink_to(source)

        logging.debug(f"创建符号链接: {source} -> {target}")
        return True

    except Exception as e:
        logging.error(f"创建符号链接失败 {source} -> {target}: {e}")
        return False


class SystemMonitor:
    """
    系统监控器
    监控系统资源使用情况
    """

    def __init__(self):
        self.start_time = None

    def start_monitoring(self):
        """开始监控"""
        import time
        self.start_time = time.time()
        logging.info("系统监控已启动")

    def get_system_health(self) -> Dict[str, any]:
        """
        获取系统健康状态

        Returns:
            Dict[str, any]: 系统健康状态信息
        """
        health = {
            'timestamp': None,
            'disk_usage': None,
            'memory_usage': None,
            'system_info': get_system_info(),
            'warnings': []
        }

        try:
            import time
            health['timestamp'] = time.time()

            # 检查磁盘使用情况
            current_dir = Path.cwd()
            disk_info = get_disk_usage(current_dir)
            if disk_info:
                health['disk_usage'] = disk_info
                if disk_info['usage_percent'] > 90:
                    health['warnings'].append('磁盘使用率超过90%')

            # 检查内存使用情况
            memory_info = get_memory_usage()
            if memory_info:
                health['memory_usage'] = memory_info
                if memory_info['usage_percent'] > 85:
                    health['warnings'].append('内存使用率超过85%')

            return health

        except Exception as e:
            logging.error(f"获取系统健康状态失败: {e}")
            health['error'] = str(e)
            return health

    def stop_monitoring(self):
        """停止监控"""
        if self.start_time:
            import time
            duration = time.time() - self.start_time
            logging.info(f"系统监控已停止,运行时间: {duration:.2f}秒")
        self.start_time = None


################################################################################
# 文件: utilsalidation_utils.py
################################################################################

"""
数据验证工具模块
提供配置验证、路径验证等验证功能
"""

import re
import logging
from pathlib import Path
from typing import List, Tuple, Optional, Dict, Any
from urllib.parse import urlparse

from models.config_models import MySQLConfig, BackupConfig


def validate_mysql_connection(host: str, port: int, user: str, password: str,
                              database: str = "mysql") -> Tuple[bool, str]:
    """
    验证MySQL连接参数

    Args:
        host: 主机地址
        port: 端口号
        user: 用户名
        password: 密码
        database: 数据库名

    Returns:
        Tuple[bool, str]: (是否有效, 错误信息)
    """
    errors = []

    # 验证主机地址
    if not host or not host.strip():
        errors.append("MySQL主机地址不能为空")
    elif len(host) > 255:
        errors.append("MySQL主机地址过长")

    # 验证端口
    if not isinstance(port, int) or port < 1 or port > 65535:
        errors.append("MySQL端口必须在1-65535范围内")

    # 验证用户名
    if not user or not user.strip():
        errors.append("MySQL用户名不能为空")
    elif len(user) > 32:
        errors.append("MySQL用户名过长")

    # 验证密码
    if not password:
        errors.append("MySQL密码不能为空")
    elif len(password) > 255:
        errors.append("MySQL密码过长")

    # 验证数据库名
    if database and len(database) > 64:
        errors.append("MySQL数据库名过长")

    if errors:
        return False, "; ".join(errors)
    else:
        return True, "MySQL连接参数验证通过"


def validate_backup_config(backup_dir: str, keep_backups: int, timeout: int,
                           compression: bool, databases: List[str]) -> Tuple[bool, str]:
    """
    验证备份配置参数

    Args:
        backup_dir: 备份目录
        keep_backups: 保留备份数量
        timeout: 超时时间
        compression: 是否压缩
        databases: 数据库列表

    Returns:
        Tuple[bool, str]: (是否有效, 错误信息)
    """
    errors = []

    # 验证备份目录
    if not backup_dir or not backup_dir.strip():
        errors.append("备份目录不能为空")
    else:
        try:
            # 尝试创建路径对象
            path = Path(backup_dir)
            # 检查路径是否包含非法字符
            if any(char in backup_dir for char in ['*', '?', '"', '<', '>', '|']):
                errors.append("备份目录包含非法字符")
        except Exception as e:
            errors.append(f"备份目录无效: {e}")

    # 验证保留备份数量
    if not isinstance(keep_backups, int) or keep_backups < 1:
        errors.append("保留备份数量必须为正整数")
    elif keep_backups > 1000:
        errors.append("保留备份数量不能超过1000")

    # 验证超时时间
    if not isinstance(timeout, int) or timeout < 30:
        errors.append("备份超时时间必须至少30秒")
    elif timeout > 3600:  # 1小时
        errors.append("备份超时时间不能超过3600秒")

    # 验证压缩设置
    if not isinstance(compression, bool):
        errors.append("压缩设置必须为布尔值")

    # 验证数据库列表
    if not databases:
        errors.append("至少需要指定一个数据库")
    else:
        for db in databases:
            if not db or not isinstance(db, str):
                errors.append("数据库名称不能为空且必须为字符串")
            elif len(db) > 64:
                errors.append(f"数据库名称过长: {db}")
            elif not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', db):
                errors.append(f"数据库名称包含非法字符: {db}")

    if errors:
        return False, "; ".join(errors)
    else:
        return True, "备份配置验证通过"


def validate_file_path(file_path: str, check_writable: bool = False) -> Tuple[bool, str]:
    """
    验证文件路径

    Args:
        file_path: 文件路径
        check_writable: 是否检查可写性

    Returns:
        Tuple[bool, str]: (是否有效, 错误信息)
    """
    try:
        path = Path(file_path)

        # 检查路径是否包含非法字符
        invalid_chars = ['*', '?', '"', '<', '>', '|', '']
        if any(char in str(path) for char in invalid_chars):
            return False, "文件路径包含非法字符"

        # 检查父目录是否存在
        parent_dir = path.parent
        if not parent_dir.exists():
            return False, f"父目录不存在: {parent_dir}"

        # 检查是否可写(如果需要)
        if check_writable:
            try:
                # 尝试创建测试文件
                test_file = parent_dir / ".write_test"
                test_file.touch()
                test_file.unlink()
            except Exception as e:
                return False, f"目录不可写: {e}"

        return True, "文件路径验证通过"

    except Exception as e:
        return False, f"文件路径验证失败: {e}"


def validate_email(email: str) -> bool:
    """
    验证电子邮件地址格式

    Args:
        email: 电子邮件地址

    Returns:
        bool: 是否有效
    """
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, email))


def validate_url(url: str) -> bool:
    """
    验证URL格式

    Args:
        url: URL地址

    Returns:
        bool: 是否有效
    """
    try:
        result = urlparse(url)
        return all([result.scheme, result.netloc])
    except Exception:
        return False


def validate_port(port: Any) -> bool:
    """
    验证端口号

    Args:
        port: 端口号

    Returns:
        bool: 是否有效
    """
    try:
        port_int = int(port)
        return 1 <= port_int <= 65535
    except (ValueError, TypeError):
        return False


def validate_filename(filename: str) -> Tuple[bool, str]:
    """
    验证文件名

    Args:
        filename: 文件名

    Returns:
        Tuple[bool, str]: (是否有效, 错误信息)
    """
    if not filename or not filename.strip():
        return False, "文件名不能为空"

    # 检查长度
    if len(filename) > 255:
        return False, "文件名过长"

    # 检查非法字符
    invalid_chars = ['/', '', ':', '*', '?', '"', '<', '>', '|', '']
    if any(char in filename for char in invalid_chars):
        return False, "文件名包含非法字符"

    # 检查保留名称(Windows)
    reserved_names = [
        'CON', 'PRN', 'AUX', 'NUL',
        'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
        'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9'
    ]
    if filename.upper() in reserved_names:
        return False, "文件名是系统保留名称"

    return True, "文件名验证通过"


class ConfigValidator:
    """
    配置验证器
    提供完整的配置验证功能
    """

    @staticmethod
    def validate_complete_config(config: Dict[str, Any]) -> Tuple[bool, List[str]]:
        """
        验证完整配置

        Args:
            config: 配置字典

        Returns:
            Tuple[bool, List[str]]: (是否有效, 错误列表)
        """
        errors = []

        # 验证MySQL配置
        mysql_config = config.get('mysql', {})
        if not mysql_config:
            errors.append("MySQL配置缺失")
        else:
            valid, msg = validate_mysql_connection(
                mysql_config.get('host', ''),
                mysql_config.get('port', 3306),
                mysql_config.get('user', ''),
                mysql_config.get('password', ''),
                mysql_config.get('database', 'mysql')
            )
            if not valid:
                errors.append(f"MySQL配置错误: {msg}")

        # 验证备份配置
        backup_config = config.get('backup', {})
        if not backup_config:
            errors.append("备份配置缺失")
        else:
            valid, msg = validate_backup_config(
                backup_config.get('backup_dir', ''),
                backup_config.get('keep_backups', 20),
                backup_config.get('timeout', 300),
                backup_config.get('compression', True),
                backup_config.get('databases', [])
            )
            if not valid:
                errors.append(f"备份配置错误: {msg}")

        # 验证日志配置
        logging_config = config.get('logging', {})
        if logging_config:
            log_dir = logging_config.get('log_dir', '')
            if log_dir:
                valid, msg = validate_file_path(log_dir, check_writable=True)
                if not valid:
                    errors.append(f"日志目录错误: {msg}")

            log_level = logging_config.get('log_level', 'INFO')
            valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
            if log_level.upper() not in valid_levels:
                errors.append(f"日志级别无效: {log_level}")

        return len(errors) == 0, errors

    @staticmethod
    def create_mysql_config_object(config: Dict[str, Any]) -> Optional[MySQLConfig]:
        """
        从配置字典创建MySQL配置对象

        Args:
            config: 配置字典

        Returns:
            Optional[MySQLConfig]: MySQL配置对象,验证失败返回None
        """
        mysql_config = config.get('mysql', {})

        try:
            return MySQLConfig(
                host=mysql_config['host'],
                user=mysql_config['user'],
                password=mysql_config['password'],
                database=mysql_config.get('database', 'mysql'),
                port=mysql_config.get('port', 3306),
                charset=mysql_config.get('charset', 'utf8mb4')
            )
        except KeyError as e:
            logging.error(f"创建MySQL配置对象失败: 缺少必要字段 {e}")
            return None

    @staticmethod
    def create_backup_config_object(config: Dict[str, Any]) -> Optional[BackupConfig]:
        """
        从配置字典创建备份配置对象

        Args:
            config: 配置字典

        Returns:
            Optional[BackupConfig]: 备份配置对象,验证失败返回None
        """
        backup_config = config.get('backup', {})

        try:
            return BackupConfig(
                backup_dir=backup_config['backup_dir'],
                keep_backups=backup_config.get('keep_backups', 20),
                compression=backup_config.get('compression', True),
                timeout=backup_config.get('timeout', 300),
                use_custom_filename=backup_config.get('use_custom_filename', True),
                filename_format=backup_config.get('filename_format', 'dump-{timestamp}-{data_source}-{database}.sql'),
                add_drop_table=backup_config.get('add_drop_table', True),
                disable_keys=backup_config.get('disable_keys', True),
                lock_tables=backup_config.get('lock_tables', False),
                add_drop_trigger=backup_config.get('add_drop_trigger', True),
                no_data=backup_config.get('no_data', False),
                complete_insert=backup_config.get('complete_insert', True),
                create_options=backup_config.get('create_options', True),
                routines=backup_config.get('routines', True),
                extended_insert=backup_config.get('extended_insert', True),
                databases=backup_config.get('databases', []),
                tables=backup_config.get('tables', [])
            )
        except KeyError as e:
            logging.error(f"创建备份配置对象失败: 缺少必要字段 {e}")
            return None


################################################################################
# 文件: utils__init__.py
################################################################################

"""
工具类模块
提供系统所需的各类工具函数和辅助类
"""

from .logger import setup_logging
from .file_utils import compress_file, ensure_directory, get_file_size, format_file_size
from .system_utils import is_windows, is_linux, get_system_info, execute_command
from .validation_utils import validate_mysql_connection, validate_backup_config, validate_file_path
from .error_handler import BackupErrorHandler, DatabaseErrorHandler, FileSystemErrorHandler

__all__ = [
    'setup_logging',
    'compress_file',
    'ensure_directory',
    'get_file_size',
    'format_file_size',
    'is_windows',
    'is_linux',
    'get_system_info',
    'execute_command',
    'validate_mysql_connection',
    'validate_backup_config',
    'validate_file_path',
    'BackupErrorHandler',
    'DatabaseErrorHandler',
    'FileSystemErrorHandler'
]