From 7702b9694104a4389fab2395bd874da4fd6221dc Mon Sep 17 00:00:00 2001 From: xiaochao Date: Thu, 3 Jul 2025 18:38:39 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96ssh=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E6=95=B0=20=E5=A2=9E=E5=8A=A0=E7=A1=AC=E7=9B=98=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=E7=BC=93=E5=AD=98=E8=BF=87=E6=9C=9F=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- custom_components/fn_nas/config_flow.py | 33 +- custom_components/fn_nas/const.py | 8 +- custom_components/fn_nas/coordinator.py | 327 ++++++++++-------- custom_components/fn_nas/disk_manager.py | 27 +- .../fn_nas/translations/zh-Hans.json | 4 +- 5 files changed, 237 insertions(+), 162 deletions(-) diff --git a/custom_components/fn_nas/config_flow.py b/custom_components/fn_nas/config_flow.py index 8b4f083..6774516 100644 --- a/custom_components/fn_nas/config_flow.py +++ b/custom_components/fn_nas/config_flow.py @@ -18,7 +18,11 @@ from .const import ( CONF_UPS_SCAN_INTERVAL, DEFAULT_UPS_SCAN_INTERVAL, CONF_ROOT_PASSWORD, - CONF_ENABLE_DOCKER + CONF_ENABLE_DOCKER, + CONF_MAX_CONNECTIONS, + DEFAULT_MAX_CONNECTIONS, + CONF_CACHE_TIMEOUT, + DEFAULT_CACHE_TIMEOUT ) _LOGGER = logging.getLogger(__name__) @@ -71,7 +75,17 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): default=DEFAULT_SCAN_INTERVAL ): int, # 添加启用Docker的选项 - vol.Optional(CONF_ENABLE_DOCKER, default=False): bool + vol.Optional(CONF_ENABLE_DOCKER, default=False): bool, + # 新增:最大连接数 + vol.Optional( + CONF_MAX_CONNECTIONS, + default=DEFAULT_MAX_CONNECTIONS + ): int, + # 新增:缓存超时时间(分钟) + vol.Optional( + CONF_CACHE_TIMEOUT, + default=DEFAULT_CACHE_TIMEOUT + ): int }) return self.async_show_form( @@ -104,6 +118,9 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): self.ssh_config[CONF_MAC] = selected_mac # 确保将CONF_ENABLE_DOCKER也存入配置项 self.ssh_config[CONF_ENABLE_DOCKER] = enable_docker + # 添加连接池和缓存配置 + self.ssh_config[CONF_MAX_CONNECTIONS] = self.ssh_config.get(CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS) + self.ssh_config[CONF_CACHE_TIMEOUT] = self.ssh_config.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT) return self.async_create_entry( title=self.ssh_config[CONF_HOST], data=self.ssh_config @@ -220,7 +237,17 @@ class OptionsFlowHandler(config_entries.OptionsFlow): vol.Optional( CONF_ENABLE_DOCKER, default=data.get(CONF_ENABLE_DOCKER, False) - ): bool + ): bool, + # 新增:最大连接数 + vol.Optional( + CONF_MAX_CONNECTIONS, + default=data.get(CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS) + ): int, + # 新增:缓存超时时间(分钟) + vol.Optional( + CONF_CACHE_TIMEOUT, + default=data.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT) + ): int }) return self.async_show_form( diff --git a/custom_components/fn_nas/const.py b/custom_components/fn_nas/const.py index a7acffe..c966eb4 100644 --- a/custom_components/fn_nas/const.py +++ b/custom_components/fn_nas/const.py @@ -52,4 +52,10 @@ ICON_RESTART = "mdi:restart" # 设备标识符常量 DEVICE_ID_NAS = "flynas_nas_system" DEVICE_ID_UPS = "flynas_ups" -CONF_NETWORK_MACS = "network_macs" \ No newline at end of file +CONF_NETWORK_MACS = "network_macs" + +# 新增配置常量 +CONF_MAX_CONNECTIONS = "max_connections" +CONF_CACHE_TIMEOUT = "cache_timeout" +DEFAULT_MAX_CONNECTIONS = 3 +DEFAULT_CACHE_TIMEOUT = 30 # 单位:分钟 \ No newline at end of file diff --git a/custom_components/fn_nas/coordinator.py b/custom_components/fn_nas/coordinator.py index 7bce521..9629332 100644 --- a/custom_components/fn_nas/coordinator.py +++ b/custom_components/fn_nas/coordinator.py @@ -1,6 +1,8 @@ import logging import re import asyncssh +import asyncio +import time from datetime import timedelta from homeassistant.core import HomeAssistant from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed @@ -9,7 +11,8 @@ from .const import ( DOMAIN, CONF_HOST, CONF_PORT, CONF_USERNAME, CONF_PASSWORD, CONF_IGNORE_DISKS, CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL, DEFAULT_PORT, CONF_MAC, CONF_UPS_SCAN_INTERVAL, DEFAULT_UPS_SCAN_INTERVAL, - CONF_ROOT_PASSWORD, CONF_ENABLE_DOCKER + CONF_ROOT_PASSWORD, CONF_ENABLE_DOCKER, CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS, + CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT ) from .disk_manager import DiskManager from .system_manager import SystemManager @@ -29,12 +32,13 @@ class FlynasCoordinator(DataUpdateCoordinator): self.root_password = config.get(CONF_ROOT_PASSWORD) self.mac = config.get(CONF_MAC, "") self.enable_docker = config.get(CONF_ENABLE_DOCKER, False) + self.max_connections = config.get(CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS) + self.cache_timeout = config.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT) * 60 self.docker_manager = DockerManager(self) if self.enable_docker else None - self.ssh = None - self.ssh_closed = True - self.ups_manager = UPSManager(self) - self.vm_manager = VMManager(self) - self.use_sudo = False + self.ssh_pool = [] # SSH连接池 + self.active_commands = 0 # 当前活动命令数 + self.ssh_closed = True # 初始状态为关闭 + self.use_sudo = False # 初始化use_sudo属性 self.data = { "disks": [], @@ -60,164 +64,186 @@ class FlynasCoordinator(DataUpdateCoordinator): self.disk_manager = DiskManager(self) self.system_manager = SystemManager(self) + self.ups_manager = UPSManager(self) + self.vm_manager = VMManager(self) - async def async_connect(self): - if self.ssh is None or self.ssh_closed: + async def get_ssh_connection(self): + """从连接池获取或创建SSH连接""" + # 如果连接池中有可用连接且没有超过最大活动命令数 + while len(self.ssh_pool) > 0 and self.active_commands < self.max_connections: + conn = self.ssh_pool.pop() + if await self.is_connection_alive(conn): + self.active_commands += 1 + return conn + else: + await self.close_connection(conn) + + # 如果没有可用连接,创建新连接 + if self.active_commands < self.max_connections: try: - self.ssh = await asyncssh.connect( + conn = await asyncssh.connect( self.host, port=self.port, username=self.username, password=self.password, - known_hosts=None + known_hosts=None, + connect_timeout=10 ) + self.active_commands += 1 + self.ssh_closed = False - if await self.is_root_user(): - _LOGGER.debug("当前用户是 root") - self.use_sudo = False - self.ssh_closed = False - return True + # 确定是否需要sudo权限 + await self.determine_sudo_setting(conn) - result = await self.ssh.run( - f"echo '{self.password}' | sudo -S -i", - input=self.password + "\n", + return conn + except Exception as e: + _LOGGER.error("创建SSH连接失败: %s", str(e), exc_info=True) + raise UpdateFailed(f"SSH连接失败: {str(e)}") + else: + await asyncio.sleep(0.1) + return await self.get_ssh_connection() + + async def determine_sudo_setting(self, conn): + """确定是否需要使用sudo权限""" + try: + # 检查当前用户是否是root + result = await conn.run("id -u", timeout=5) + if result.stdout.strip() == "0": + _LOGGER.debug("当前用户是root,不需要sudo") + self.use_sudo = False + return + except Exception as e: + _LOGGER.warning("检查用户ID失败: %s", str(e)) + + # 检查是否可以使用密码sudo + try: + result = await conn.run( + f"echo '{self.password}' | sudo -S whoami", + input=self.password + "\n", + timeout=10 + ) + if "root" in result.stdout: + _LOGGER.info("可以使用用户密码sudo") + self.use_sudo = True + return + except Exception as e: + _LOGGER.debug("无法使用用户密码sudo: %s", str(e)) + + # 如果有root密码,尝试使用root密码sudo + if self.root_password: + try: + result = await conn.run( + f"echo '{self.root_password}' | sudo -S whoami", + input=self.root_password + "\n", timeout=10 ) - - whoami_result = await self.ssh.run("whoami") - if "root" in whoami_result.stdout: - _LOGGER.info("成功切换到 root 会话(使用登录密码)") - self.use_sudo = False - self.ssh_closed = False - return True - else: - if self.root_password: - result = await self.ssh.run( - f"echo '{self.root_password}' | sudo -S -i", - input=self.root_password + "\n", - timeout=10 - ) - - whoami_result = await self.ssh.run("whoami") - if "root" in whoami_result.stdout: - _LOGGER.info("成功切换到 root 会话(使用 root 密码)") - self.use_sudo = False - self.ssh_closed = False - return True - else: - _LOGGER.warning("切换到 root 会话失败,将使用 sudo") - self.use_sudo = True - else: - _LOGGER.warning("非 root 用户且未提供 root 密码,将使用 sudo") - self.use_sudo = True - - self.ssh_closed = False - _LOGGER.info("SSH 连接已建立到 %s", self.host) - return True + if "root" in result.stdout: + _LOGGER.info("可以使用root密码sudo") + self.use_sudo = True + return except Exception as e: - self.ssh = None - self.ssh_closed = True - _LOGGER.error("连接失败: %s", str(e), exc_info=True) - return False + _LOGGER.debug("无法使用root密码sudo: %s", str(e)) + + _LOGGER.warning("无法获取root权限,将使用普通用户执行命令") + self.use_sudo = False + + async def release_ssh_connection(self, conn): + """释放连接回连接池""" + self.active_commands -= 1 + if conn and not conn.is_closed(): + if len(self.ssh_pool) < self.max_connections: + self.ssh_pool.append(conn) + else: + await self.close_connection(conn) + else: + # 如果连接已经关闭,直接丢弃 + pass + + async def close_connection(self, conn): + """关闭SSH连接""" + try: + if conn and not conn.is_closed(): + conn.close() + except Exception as e: + _LOGGER.debug("关闭SSH连接时出错: %s", str(e)) + + async def is_connection_alive(self, conn) -> bool: + """检查连接是否存活""" + try: + # 发送一个简单的命令测试连接 + result = await conn.run("echo 'connection_test'", timeout=2) + return result.exit_status == 0 and "connection_test" in result.stdout + except (asyncssh.Error, TimeoutError, ConnectionResetError): + return False + + async def run_command(self, command: str, retries=2) -> str: + """使用连接池执行命令""" + conn = None + try: + conn = await self.get_ssh_connection() + + # 根据sudo设置执行命令 + if self.use_sudo: + password = self.root_password if self.root_password else self.password + if password: + full_command = f"sudo -S {command}" + result = await conn.run(full_command, input=password + "\n", check=True) + else: + full_command = f"sudo {command}" + result = await conn.run(full_command, check=True) + else: + result = await conn.run(command, check=True) + + return result.stdout.strip() + except asyncssh.process.ProcessError as e: + if e.exit_status in [4, 32]: + return "" + _LOGGER.error("Command failed: %s (exit %d)", command, e.exit_status) + # 连接可能已损坏,关闭它 + await self.close_connection(conn) + conn = None + if retries > 0: + return await self.run_command(command, retries-1) + else: + raise UpdateFailed(f"Command failed: {command}") from e + except asyncssh.Error as e: + _LOGGER.error("SSH连接错误: %s", str(e)) + await self.close_connection(conn) + conn = None + if retries > 0: + return await self.run_command(command, retries-1) + else: + raise UpdateFailed(f"SSH错误: {str(e)}") from e + except Exception as e: + _LOGGER.error("意外错误: %s", str(e), exc_info=True) + await self.close_connection(conn) + conn = None + if retries > 0: + return await self.run_command(command, retries-1) + else: + raise UpdateFailed(f"意外错误: {str(e)}") from e + finally: + if conn: + await self.release_ssh_connection(conn) + + async def async_connect(self): + """建立SSH连接(使用连接池)""" + # 连接池已处理连接,此方法现在主要用于初始化 return True - async def is_root_user(self): - try: - result = await self.ssh.run("id -u", timeout=5) - return result.stdout.strip() == "0" - except Exception: - return False + async def is_ssh_connected(self) -> bool: + """检查是否有活动的SSH连接""" + return len(self.ssh_pool) > 0 or self.active_commands > 0 async def async_disconnect(self): - if self.ssh is not None and not self.ssh_closed: - try: - self.ssh.close() - self.ssh_closed = True - _LOGGER.info("SSH connection closed") - except Exception as e: - _LOGGER.error("Error closing SSH connection: %s", str(e)) - finally: - self.ssh = None - - async def is_ssh_connected(self) -> bool: - if self.ssh is None or self.ssh_closed: - return False - - try: - test_command = "echo 'connection_test'" - result = await self.ssh.run(test_command, timeout=2) - return result.exit_status == 0 and "connection_test" in result.stdout - except (asyncssh.Error, TimeoutError): - return False - - async def run_command(self, command: str, retries=2) -> str: - for attempt in range(retries): - try: - if not await self.is_ssh_connected(): - if not await self.async_connect(): - if self.data and "system" in self.data: - self.data["system"]["status"] = "off" - raise UpdateFailed("SSH 连接失败") - - if self.use_sudo: - if self.root_password or self.password: - password = self.root_password if self.root_password else self.password - full_command = f"sudo -S {command}" - result = await self.ssh.run(full_command, input=password + "\n", check=True) - else: - full_command = f"sudo {command}" - result = await self.ssh.run(full_command, check=True) - else: - result = await self.ssh.run(command, check=True) - - return result.stdout.strip() - - except asyncssh.process.ProcessError as e: - if e.exit_status in [4, 32]: - return "" - _LOGGER.error("Command failed: %s (exit %d)", command, e.exit_status) - self.ssh = None - self.ssh_closed = True - if attempt == retries - 1: - if self.data and "system" in self.data: - self.data["system"]["status"] = "off" - raise UpdateFailed(f"Command failed after {retries} attempts: {command}") from e - - except asyncssh.Error as e: - _LOGGER.error("SSH connection error: %s", str(e)) - self.ssh = None - self.ssh_closed = True - if attempt == retries - 1: - if self.data and "system" in self.data: - self.data["system"]["status"] = "off" - raise UpdateFailed(f"SSH error after {retries} attempts: {str(e)}") from e - - except Exception as e: - self.ssh = None - self.ssh_closed = True - _LOGGER.error("Unexpected error: %s", str(e), exc_info=True) - if attempt == retries - 1: - if self.data and "system" in self.data: - self.data["system"]["status"] = "off" - raise UpdateFailed(f"Unexpected error after {retries} attempts") from e - - async def get_network_macs(self): - try: - output = await self.run_command("ip link show") - macs = {} - - pattern = re.compile(r'^\d+: (\w+):.*\n\s+link/\w+\s+([0-9a-fA-F:]{17})', re.MULTILINE) - matches = pattern.findall(output) - - for interface, mac in matches: - if interface == "lo" or mac == "00:00:00:00:00:00": - continue - macs[mac] = interface - - return macs - except Exception as e: - self.logger.error("获取MAC地址失败: %s", str(e)) - return {} + """关闭所有SSH连接""" + # 关闭连接池中的所有连接 + for conn in self.ssh_pool: + await self.close_connection(conn) + self.ssh_pool = [] + self.active_commands = 0 + self.ssh_closed = True + self.use_sudo = False # 重置sudo设置 async def _async_update_data(self): _LOGGER.debug("Starting data update...") @@ -231,16 +257,19 @@ class FlynasCoordinator(DataUpdateCoordinator): else: status = "on" + # 使用已初始化的管理器获取数据 disks = await self.disk_manager.get_disks_info() system = await self.system_manager.get_system_info() ups_info = await self.ups_manager.get_ups_info() vms = await self.vm_manager.get_vm_list() + # 获取虚拟机标题 for vm in vms: vm["title"] = await self.vm_manager.get_vm_title(vm["name"]) + # 获取Docker容器信息(如果启用) docker_containers = [] - if self.enable_docker: + if self.enable_docker and hasattr(self, 'docker_manager') and self.docker_manager: docker_containers = await self.docker_manager.get_containers() data = { diff --git a/custom_components/fn_nas/disk_manager.py b/custom_components/fn_nas/disk_manager.py index 2afb98c..559dc72 100644 --- a/custom_components/fn_nas/disk_manager.py +++ b/custom_components/fn_nas/disk_manager.py @@ -1,7 +1,8 @@ import re import logging import asyncio -from .const import CONF_IGNORE_DISKS +import time +from .const import CONF_IGNORE_DISKS, CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT _LOGGER = logging.getLogger(__name__) @@ -14,7 +15,10 @@ class DiskManager: self.disk_full_info_cache = {} # 缓存磁盘完整信息 self.first_run = True # 首次运行标志 self.initial_detection_done = False # 首次完整检测完成标志 - + self.cache_expiry = {} # 缓存过期时间(时间戳) + # 获取缓存超时配置(分钟),转换为秒 + self.cache_timeout = self.coordinator.config.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT) * 60 + def extract_value(self, text: str, patterns, default="未知", format_func=None): if not text: return default @@ -38,7 +42,6 @@ class DiskManager: async def check_disk_active(self, device: str, window: int = 30) -> bool: """检查硬盘在指定时间窗口内是否有活动""" try: - # 正确的路径是 /sys/block/{device}/stat stat_path = f"/sys/block/{device}/stat" # 读取统计文件 @@ -91,8 +94,6 @@ class DiskManager: stats = stat_output.split() if len(stats) >= 11: - # 第9个字段是最近完成的读操作数 - # 第10个字段是最近完成的写操作数 recent_reads = int(stats[8]) recent_writes = int(stats[9]) @@ -108,6 +109,14 @@ class DiskManager: async def get_disks_info(self) -> list[dict]: disks = [] try: + # 清理过期缓存 + now = time.time() + for device in list(self.disk_full_info_cache.keys()): + if now - self.cache_expiry.get(device, 0) > self.cache_timeout: + self.logger.debug(f"磁盘 {device} 的缓存已过期,清除") + del self.disk_full_info_cache[device] + del self.cache_expiry[device] + self.logger.debug("Fetching disk list...") lsblk_output = await self.coordinator.run_command("lsblk -dno NAME,TYPE") self.logger.debug("lsblk output: %s", lsblk_output) @@ -148,14 +157,15 @@ class DiskManager: # 检查是否有缓存的完整信息 cached_info = self.disk_full_info_cache.get(device, {}) - # 优化点:首次运行时强制获取完整信息 + # 首次运行时强制获取完整信息 if self.first_run: self.logger.debug(f"首次运行,强制获取硬盘 {device} 的完整信息") try: # 执行完整的信息获取 await self._get_full_disk_info(disk_info, device_path) - # 更新缓存 + # 更新缓存并设置过期时间 self.disk_full_info_cache[device] = disk_info.copy() + self.cache_expiry[device] = now except Exception as e: self.logger.warning(f"首次运行获取硬盘信息失败: {str(e)}", exc_info=True) # 使用缓存信息(如果有) @@ -206,8 +216,9 @@ class DiskManager: try: # 执行完整的信息获取 await self._get_full_disk_info(disk_info, device_path) - # 更新缓存 + # 更新缓存并设置过期时间 self.disk_full_info_cache[device] = disk_info.copy() + self.cache_expiry[device] = now except Exception as e: self.logger.warning(f"获取硬盘信息失败: {str(e)}", exc_info=True) # 使用缓存信息(如果有) diff --git a/custom_components/fn_nas/translations/zh-Hans.json b/custom_components/fn_nas/translations/zh-Hans.json index ac30a7f..aebed37 100644 --- a/custom_components/fn_nas/translations/zh-Hans.json +++ b/custom_components/fn_nas/translations/zh-Hans.json @@ -10,7 +10,9 @@ "username": "用户名", "password": "密码", "scan_interval": "数据更新间隔(秒)", - "enable_docker": "启用docker控制" + "enable_docker": "启用docker控制", + "max_connections": "最大连接数", + "cache_timeout": "缓存过期时间" } }, "select_mac": {