优化ssh连接数

增加硬盘信息缓存过期时间
This commit is contained in:
xiaochao
2025-07-03 18:38:39 +08:00
parent e3bb42e3de
commit 7702b96941
5 changed files with 237 additions and 162 deletions

View File

@@ -18,7 +18,11 @@ from .const import (
CONF_UPS_SCAN_INTERVAL, CONF_UPS_SCAN_INTERVAL,
DEFAULT_UPS_SCAN_INTERVAL, DEFAULT_UPS_SCAN_INTERVAL,
CONF_ROOT_PASSWORD, CONF_ROOT_PASSWORD,
CONF_ENABLE_DOCKER CONF_ENABLE_DOCKER,
CONF_MAX_CONNECTIONS,
DEFAULT_MAX_CONNECTIONS,
CONF_CACHE_TIMEOUT,
DEFAULT_CACHE_TIMEOUT
) )
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -71,7 +75,17 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
default=DEFAULT_SCAN_INTERVAL default=DEFAULT_SCAN_INTERVAL
): int, ): int,
# 添加启用Docker的选项 # 添加启用Docker的选项
vol.Optional(CONF_ENABLE_DOCKER, default=False): bool vol.Optional(CONF_ENABLE_DOCKER, default=False): bool,
# 新增:最大连接数
vol.Optional(
CONF_MAX_CONNECTIONS,
default=DEFAULT_MAX_CONNECTIONS
): int,
# 新增:缓存超时时间(分钟)
vol.Optional(
CONF_CACHE_TIMEOUT,
default=DEFAULT_CACHE_TIMEOUT
): int
}) })
return self.async_show_form( return self.async_show_form(
@@ -104,6 +118,9 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
self.ssh_config[CONF_MAC] = selected_mac self.ssh_config[CONF_MAC] = selected_mac
# 确保将CONF_ENABLE_DOCKER也存入配置项 # 确保将CONF_ENABLE_DOCKER也存入配置项
self.ssh_config[CONF_ENABLE_DOCKER] = enable_docker self.ssh_config[CONF_ENABLE_DOCKER] = enable_docker
# 添加连接池和缓存配置
self.ssh_config[CONF_MAX_CONNECTIONS] = self.ssh_config.get(CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS)
self.ssh_config[CONF_CACHE_TIMEOUT] = self.ssh_config.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT)
return self.async_create_entry( return self.async_create_entry(
title=self.ssh_config[CONF_HOST], title=self.ssh_config[CONF_HOST],
data=self.ssh_config data=self.ssh_config
@@ -220,7 +237,17 @@ class OptionsFlowHandler(config_entries.OptionsFlow):
vol.Optional( vol.Optional(
CONF_ENABLE_DOCKER, CONF_ENABLE_DOCKER,
default=data.get(CONF_ENABLE_DOCKER, False) default=data.get(CONF_ENABLE_DOCKER, False)
): bool ): bool,
# 新增:最大连接数
vol.Optional(
CONF_MAX_CONNECTIONS,
default=data.get(CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS)
): int,
# 新增:缓存超时时间(分钟)
vol.Optional(
CONF_CACHE_TIMEOUT,
default=data.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT)
): int
}) })
return self.async_show_form( return self.async_show_form(

View File

@@ -52,4 +52,10 @@ ICON_RESTART = "mdi:restart"
# 设备标识符常量 # 设备标识符常量
DEVICE_ID_NAS = "flynas_nas_system" DEVICE_ID_NAS = "flynas_nas_system"
DEVICE_ID_UPS = "flynas_ups" DEVICE_ID_UPS = "flynas_ups"
CONF_NETWORK_MACS = "network_macs" CONF_NETWORK_MACS = "network_macs"
# 新增配置常量
CONF_MAX_CONNECTIONS = "max_connections"
CONF_CACHE_TIMEOUT = "cache_timeout"
DEFAULT_MAX_CONNECTIONS = 3
DEFAULT_CACHE_TIMEOUT = 30 # 单位:分钟

View File

@@ -1,6 +1,8 @@
import logging import logging
import re import re
import asyncssh import asyncssh
import asyncio
import time
from datetime import timedelta from datetime import timedelta
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
@@ -9,7 +11,8 @@ from .const import (
DOMAIN, CONF_HOST, CONF_PORT, CONF_USERNAME, CONF_PASSWORD, DOMAIN, CONF_HOST, CONF_PORT, CONF_USERNAME, CONF_PASSWORD,
CONF_IGNORE_DISKS, CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL, CONF_IGNORE_DISKS, CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL,
DEFAULT_PORT, CONF_MAC, CONF_UPS_SCAN_INTERVAL, DEFAULT_UPS_SCAN_INTERVAL, DEFAULT_PORT, CONF_MAC, CONF_UPS_SCAN_INTERVAL, DEFAULT_UPS_SCAN_INTERVAL,
CONF_ROOT_PASSWORD, CONF_ENABLE_DOCKER CONF_ROOT_PASSWORD, CONF_ENABLE_DOCKER, CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS,
CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT
) )
from .disk_manager import DiskManager from .disk_manager import DiskManager
from .system_manager import SystemManager from .system_manager import SystemManager
@@ -29,12 +32,13 @@ class FlynasCoordinator(DataUpdateCoordinator):
self.root_password = config.get(CONF_ROOT_PASSWORD) self.root_password = config.get(CONF_ROOT_PASSWORD)
self.mac = config.get(CONF_MAC, "") self.mac = config.get(CONF_MAC, "")
self.enable_docker = config.get(CONF_ENABLE_DOCKER, False) self.enable_docker = config.get(CONF_ENABLE_DOCKER, False)
self.max_connections = config.get(CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS)
self.cache_timeout = config.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT) * 60
self.docker_manager = DockerManager(self) if self.enable_docker else None self.docker_manager = DockerManager(self) if self.enable_docker else None
self.ssh = None self.ssh_pool = [] # SSH连接池
self.ssh_closed = True self.active_commands = 0 # 当前活动命令数
self.ups_manager = UPSManager(self) self.ssh_closed = True # 初始状态为关闭
self.vm_manager = VMManager(self) self.use_sudo = False # 初始化use_sudo属性
self.use_sudo = False
self.data = { self.data = {
"disks": [], "disks": [],
@@ -60,164 +64,186 @@ class FlynasCoordinator(DataUpdateCoordinator):
self.disk_manager = DiskManager(self) self.disk_manager = DiskManager(self)
self.system_manager = SystemManager(self) self.system_manager = SystemManager(self)
self.ups_manager = UPSManager(self)
self.vm_manager = VMManager(self)
async def async_connect(self): async def get_ssh_connection(self):
if self.ssh is None or self.ssh_closed: """从连接池获取或创建SSH连接"""
# 如果连接池中有可用连接且没有超过最大活动命令数
while len(self.ssh_pool) > 0 and self.active_commands < self.max_connections:
conn = self.ssh_pool.pop()
if await self.is_connection_alive(conn):
self.active_commands += 1
return conn
else:
await self.close_connection(conn)
# 如果没有可用连接,创建新连接
if self.active_commands < self.max_connections:
try: try:
self.ssh = await asyncssh.connect( conn = await asyncssh.connect(
self.host, self.host,
port=self.port, port=self.port,
username=self.username, username=self.username,
password=self.password, password=self.password,
known_hosts=None known_hosts=None,
connect_timeout=10
) )
self.active_commands += 1
self.ssh_closed = False
if await self.is_root_user(): # 确定是否需要sudo权限
_LOGGER.debug("当前用户是 root") await self.determine_sudo_setting(conn)
self.use_sudo = False
self.ssh_closed = False
return True
result = await self.ssh.run( return conn
f"echo '{self.password}' | sudo -S -i", except Exception as e:
input=self.password + "\n", _LOGGER.error("创建SSH连接失败: %s", str(e), exc_info=True)
raise UpdateFailed(f"SSH连接失败: {str(e)}")
else:
await asyncio.sleep(0.1)
return await self.get_ssh_connection()
async def determine_sudo_setting(self, conn):
"""确定是否需要使用sudo权限"""
try:
# 检查当前用户是否是root
result = await conn.run("id -u", timeout=5)
if result.stdout.strip() == "0":
_LOGGER.debug("当前用户是root不需要sudo")
self.use_sudo = False
return
except Exception as e:
_LOGGER.warning("检查用户ID失败: %s", str(e))
# 检查是否可以使用密码sudo
try:
result = await conn.run(
f"echo '{self.password}' | sudo -S whoami",
input=self.password + "\n",
timeout=10
)
if "root" in result.stdout:
_LOGGER.info("可以使用用户密码sudo")
self.use_sudo = True
return
except Exception as e:
_LOGGER.debug("无法使用用户密码sudo: %s", str(e))
# 如果有root密码尝试使用root密码sudo
if self.root_password:
try:
result = await conn.run(
f"echo '{self.root_password}' | sudo -S whoami",
input=self.root_password + "\n",
timeout=10 timeout=10
) )
if "root" in result.stdout:
whoami_result = await self.ssh.run("whoami") _LOGGER.info("可以使用root密码sudo")
if "root" in whoami_result.stdout: self.use_sudo = True
_LOGGER.info("成功切换到 root 会话(使用登录密码)") return
self.use_sudo = False
self.ssh_closed = False
return True
else:
if self.root_password:
result = await self.ssh.run(
f"echo '{self.root_password}' | sudo -S -i",
input=self.root_password + "\n",
timeout=10
)
whoami_result = await self.ssh.run("whoami")
if "root" in whoami_result.stdout:
_LOGGER.info("成功切换到 root 会话(使用 root 密码)")
self.use_sudo = False
self.ssh_closed = False
return True
else:
_LOGGER.warning("切换到 root 会话失败,将使用 sudo")
self.use_sudo = True
else:
_LOGGER.warning("非 root 用户且未提供 root 密码,将使用 sudo")
self.use_sudo = True
self.ssh_closed = False
_LOGGER.info("SSH 连接已建立到 %s", self.host)
return True
except Exception as e: except Exception as e:
self.ssh = None _LOGGER.debug("无法使用root密码sudo: %s", str(e))
self.ssh_closed = True
_LOGGER.error("连接失败: %s", str(e), exc_info=True) _LOGGER.warning("无法获取root权限将使用普通用户执行命令")
return False self.use_sudo = False
async def release_ssh_connection(self, conn):
"""释放连接回连接池"""
self.active_commands -= 1
if conn and not conn.is_closed():
if len(self.ssh_pool) < self.max_connections:
self.ssh_pool.append(conn)
else:
await self.close_connection(conn)
else:
# 如果连接已经关闭,直接丢弃
pass
async def close_connection(self, conn):
"""关闭SSH连接"""
try:
if conn and not conn.is_closed():
conn.close()
except Exception as e:
_LOGGER.debug("关闭SSH连接时出错: %s", str(e))
async def is_connection_alive(self, conn) -> bool:
"""检查连接是否存活"""
try:
# 发送一个简单的命令测试连接
result = await conn.run("echo 'connection_test'", timeout=2)
return result.exit_status == 0 and "connection_test" in result.stdout
except (asyncssh.Error, TimeoutError, ConnectionResetError):
return False
async def run_command(self, command: str, retries=2) -> str:
"""使用连接池执行命令"""
conn = None
try:
conn = await self.get_ssh_connection()
# 根据sudo设置执行命令
if self.use_sudo:
password = self.root_password if self.root_password else self.password
if password:
full_command = f"sudo -S {command}"
result = await conn.run(full_command, input=password + "\n", check=True)
else:
full_command = f"sudo {command}"
result = await conn.run(full_command, check=True)
else:
result = await conn.run(command, check=True)
return result.stdout.strip()
except asyncssh.process.ProcessError as e:
if e.exit_status in [4, 32]:
return ""
_LOGGER.error("Command failed: %s (exit %d)", command, e.exit_status)
# 连接可能已损坏,关闭它
await self.close_connection(conn)
conn = None
if retries > 0:
return await self.run_command(command, retries-1)
else:
raise UpdateFailed(f"Command failed: {command}") from e
except asyncssh.Error as e:
_LOGGER.error("SSH连接错误: %s", str(e))
await self.close_connection(conn)
conn = None
if retries > 0:
return await self.run_command(command, retries-1)
else:
raise UpdateFailed(f"SSH错误: {str(e)}") from e
except Exception as e:
_LOGGER.error("意外错误: %s", str(e), exc_info=True)
await self.close_connection(conn)
conn = None
if retries > 0:
return await self.run_command(command, retries-1)
else:
raise UpdateFailed(f"意外错误: {str(e)}") from e
finally:
if conn:
await self.release_ssh_connection(conn)
async def async_connect(self):
"""建立SSH连接使用连接池"""
# 连接池已处理连接,此方法现在主要用于初始化
return True return True
async def is_root_user(self): async def is_ssh_connected(self) -> bool:
try: """检查是否有活动的SSH连接"""
result = await self.ssh.run("id -u", timeout=5) return len(self.ssh_pool) > 0 or self.active_commands > 0
return result.stdout.strip() == "0"
except Exception:
return False
async def async_disconnect(self): async def async_disconnect(self):
if self.ssh is not None and not self.ssh_closed: """关闭所有SSH连接"""
try: # 关闭连接池中的所有连接
self.ssh.close() for conn in self.ssh_pool:
self.ssh_closed = True await self.close_connection(conn)
_LOGGER.info("SSH connection closed") self.ssh_pool = []
except Exception as e: self.active_commands = 0
_LOGGER.error("Error closing SSH connection: %s", str(e)) self.ssh_closed = True
finally: self.use_sudo = False # 重置sudo设置
self.ssh = None
async def is_ssh_connected(self) -> bool:
if self.ssh is None or self.ssh_closed:
return False
try:
test_command = "echo 'connection_test'"
result = await self.ssh.run(test_command, timeout=2)
return result.exit_status == 0 and "connection_test" in result.stdout
except (asyncssh.Error, TimeoutError):
return False
async def run_command(self, command: str, retries=2) -> str:
for attempt in range(retries):
try:
if not await self.is_ssh_connected():
if not await self.async_connect():
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
raise UpdateFailed("SSH 连接失败")
if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await self.ssh.run(full_command, input=password + "\n", check=True)
else:
full_command = f"sudo {command}"
result = await self.ssh.run(full_command, check=True)
else:
result = await self.ssh.run(command, check=True)
return result.stdout.strip()
except asyncssh.process.ProcessError as e:
if e.exit_status in [4, 32]:
return ""
_LOGGER.error("Command failed: %s (exit %d)", command, e.exit_status)
self.ssh = None
self.ssh_closed = True
if attempt == retries - 1:
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
raise UpdateFailed(f"Command failed after {retries} attempts: {command}") from e
except asyncssh.Error as e:
_LOGGER.error("SSH connection error: %s", str(e))
self.ssh = None
self.ssh_closed = True
if attempt == retries - 1:
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
raise UpdateFailed(f"SSH error after {retries} attempts: {str(e)}") from e
except Exception as e:
self.ssh = None
self.ssh_closed = True
_LOGGER.error("Unexpected error: %s", str(e), exc_info=True)
if attempt == retries - 1:
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
raise UpdateFailed(f"Unexpected error after {retries} attempts") from e
async def get_network_macs(self):
try:
output = await self.run_command("ip link show")
macs = {}
pattern = re.compile(r'^\d+: (\w+):.*\n\s+link/\w+\s+([0-9a-fA-F:]{17})', re.MULTILINE)
matches = pattern.findall(output)
for interface, mac in matches:
if interface == "lo" or mac == "00:00:00:00:00:00":
continue
macs[mac] = interface
return macs
except Exception as e:
self.logger.error("获取MAC地址失败: %s", str(e))
return {}
async def _async_update_data(self): async def _async_update_data(self):
_LOGGER.debug("Starting data update...") _LOGGER.debug("Starting data update...")
@@ -231,16 +257,19 @@ class FlynasCoordinator(DataUpdateCoordinator):
else: else:
status = "on" status = "on"
# 使用已初始化的管理器获取数据
disks = await self.disk_manager.get_disks_info() disks = await self.disk_manager.get_disks_info()
system = await self.system_manager.get_system_info() system = await self.system_manager.get_system_info()
ups_info = await self.ups_manager.get_ups_info() ups_info = await self.ups_manager.get_ups_info()
vms = await self.vm_manager.get_vm_list() vms = await self.vm_manager.get_vm_list()
# 获取虚拟机标题
for vm in vms: for vm in vms:
vm["title"] = await self.vm_manager.get_vm_title(vm["name"]) vm["title"] = await self.vm_manager.get_vm_title(vm["name"])
# 获取Docker容器信息如果启用
docker_containers = [] docker_containers = []
if self.enable_docker: if self.enable_docker and hasattr(self, 'docker_manager') and self.docker_manager:
docker_containers = await self.docker_manager.get_containers() docker_containers = await self.docker_manager.get_containers()
data = { data = {

View File

@@ -1,7 +1,8 @@
import re import re
import logging import logging
import asyncio import asyncio
from .const import CONF_IGNORE_DISKS import time
from .const import CONF_IGNORE_DISKS, CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -14,7 +15,10 @@ class DiskManager:
self.disk_full_info_cache = {} # 缓存磁盘完整信息 self.disk_full_info_cache = {} # 缓存磁盘完整信息
self.first_run = True # 首次运行标志 self.first_run = True # 首次运行标志
self.initial_detection_done = False # 首次完整检测完成标志 self.initial_detection_done = False # 首次完整检测完成标志
self.cache_expiry = {} # 缓存过期时间(时间戳)
# 获取缓存超时配置(分钟),转换为秒
self.cache_timeout = self.coordinator.config.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT) * 60
def extract_value(self, text: str, patterns, default="未知", format_func=None): def extract_value(self, text: str, patterns, default="未知", format_func=None):
if not text: if not text:
return default return default
@@ -38,7 +42,6 @@ class DiskManager:
async def check_disk_active(self, device: str, window: int = 30) -> bool: async def check_disk_active(self, device: str, window: int = 30) -> bool:
"""检查硬盘在指定时间窗口内是否有活动""" """检查硬盘在指定时间窗口内是否有活动"""
try: try:
# 正确的路径是 /sys/block/{device}/stat
stat_path = f"/sys/block/{device}/stat" stat_path = f"/sys/block/{device}/stat"
# 读取统计文件 # 读取统计文件
@@ -91,8 +94,6 @@ class DiskManager:
stats = stat_output.split() stats = stat_output.split()
if len(stats) >= 11: if len(stats) >= 11:
# 第9个字段是最近完成的读操作数
# 第10个字段是最近完成的写操作数
recent_reads = int(stats[8]) recent_reads = int(stats[8])
recent_writes = int(stats[9]) recent_writes = int(stats[9])
@@ -108,6 +109,14 @@ class DiskManager:
async def get_disks_info(self) -> list[dict]: async def get_disks_info(self) -> list[dict]:
disks = [] disks = []
try: try:
# 清理过期缓存
now = time.time()
for device in list(self.disk_full_info_cache.keys()):
if now - self.cache_expiry.get(device, 0) > self.cache_timeout:
self.logger.debug(f"磁盘 {device} 的缓存已过期,清除")
del self.disk_full_info_cache[device]
del self.cache_expiry[device]
self.logger.debug("Fetching disk list...") self.logger.debug("Fetching disk list...")
lsblk_output = await self.coordinator.run_command("lsblk -dno NAME,TYPE") lsblk_output = await self.coordinator.run_command("lsblk -dno NAME,TYPE")
self.logger.debug("lsblk output: %s", lsblk_output) self.logger.debug("lsblk output: %s", lsblk_output)
@@ -148,14 +157,15 @@ class DiskManager:
# 检查是否有缓存的完整信息 # 检查是否有缓存的完整信息
cached_info = self.disk_full_info_cache.get(device, {}) cached_info = self.disk_full_info_cache.get(device, {})
# 优化点:首次运行时强制获取完整信息 # 首次运行时强制获取完整信息
if self.first_run: if self.first_run:
self.logger.debug(f"首次运行,强制获取硬盘 {device} 的完整信息") self.logger.debug(f"首次运行,强制获取硬盘 {device} 的完整信息")
try: try:
# 执行完整的信息获取 # 执行完整的信息获取
await self._get_full_disk_info(disk_info, device_path) await self._get_full_disk_info(disk_info, device_path)
# 更新缓存 # 更新缓存并设置过期时间
self.disk_full_info_cache[device] = disk_info.copy() self.disk_full_info_cache[device] = disk_info.copy()
self.cache_expiry[device] = now
except Exception as e: except Exception as e:
self.logger.warning(f"首次运行获取硬盘信息失败: {str(e)}", exc_info=True) self.logger.warning(f"首次运行获取硬盘信息失败: {str(e)}", exc_info=True)
# 使用缓存信息(如果有) # 使用缓存信息(如果有)
@@ -206,8 +216,9 @@ class DiskManager:
try: try:
# 执行完整的信息获取 # 执行完整的信息获取
await self._get_full_disk_info(disk_info, device_path) await self._get_full_disk_info(disk_info, device_path)
# 更新缓存 # 更新缓存并设置过期时间
self.disk_full_info_cache[device] = disk_info.copy() self.disk_full_info_cache[device] = disk_info.copy()
self.cache_expiry[device] = now
except Exception as e: except Exception as e:
self.logger.warning(f"获取硬盘信息失败: {str(e)}", exc_info=True) self.logger.warning(f"获取硬盘信息失败: {str(e)}", exc_info=True)
# 使用缓存信息(如果有) # 使用缓存信息(如果有)

View File

@@ -10,7 +10,9 @@
"username": "用户名", "username": "用户名",
"password": "密码", "password": "密码",
"scan_interval": "数据更新间隔(秒)", "scan_interval": "数据更新间隔(秒)",
"enable_docker": "启用docker控制" "enable_docker": "启用docker控制",
"max_connections": "最大连接数",
"cache_timeout": "缓存过期时间"
} }
}, },
"select_mac": { "select_mac": {