优化飞牛 sshd 进程数;获取实体可能会比较慢,需要等待一段时间后集成才会显示实体

This commit is contained in:
xiaochao
2025-07-12 15:47:37 +08:00
parent 57d14b48f8
commit 11d1352b20
2 changed files with 180 additions and 211 deletions

View File

@@ -1,6 +1,8 @@
# coordinator.py (文档9)
import logging import logging
import asyncio import asyncio
import asyncssh import asyncssh
import re
from datetime import timedelta from datetime import timedelta
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
@@ -38,17 +40,8 @@ class FlynasCoordinator(DataUpdateCoordinator):
self.vm_manager = VMManager(self) self.vm_manager = VMManager(self)
self.use_sudo = False self.use_sudo = False
self.data = { # 确保data始终有初始值
"disks": [], self.data = self.get_default_data()
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": []
}
scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL) scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
update_interval = timedelta(seconds=scan_interval) update_interval = timedelta(seconds=scan_interval)
@@ -65,67 +58,94 @@ class FlynasCoordinator(DataUpdateCoordinator):
self._system_online = False self._system_online = False
self._ping_task = None self._ping_task = None
self._retry_interval = 30 # 系统离线时的检测间隔(秒) self._retry_interval = 30 # 系统离线时的检测间隔(秒)
self._last_command_time = 0
self._command_count = 0
def get_default_data(self):
"""返回默认的数据结构"""
return {
"disks": [],
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": [],
"docker_containers": []
}
async def async_connect(self): async def async_connect(self):
if self.ssh is None or self.ssh_closed: """建立并保持持久SSH连接"""
if self.ssh is not None and not self.ssh_closed:
try: try:
self.ssh = await asyncssh.connect( # 测试连接是否仍然活跃
self.host, await self.ssh.run("echo 'connection_test'", timeout=1)
port=self.port, return True
username=self.username, except (asyncssh.Error, TimeoutError):
password=self.password, _LOGGER.debug("现有连接失效,准备重建")
known_hosts=None, await self.async_disconnect()
connect_timeout=5 # 缩短连接超时时间
) try:
self.ssh = await asyncssh.connect(
if await self.is_root_user(): self.host,
_LOGGER.debug("当前用户是 root") port=self.port,
username=self.username,
password=self.password,
known_hosts=None,
connect_timeout=5
)
self.ssh_closed = False
_LOGGER.info("已建立持久SSH连接到 %s", self.host)
# 检查权限状态
if await self.is_root_user():
_LOGGER.debug("当前用户是 root")
self.use_sudo = False
else:
# 尝试切换到root会话
if await self.try_switch_to_root():
self.use_sudo = False self.use_sudo = False
self.ssh_closed = False
return True return True
except Exception as e:
self.ssh = None
self.ssh_closed = True
_LOGGER.debug("连接失败: %s", str(e))
return False
async def try_switch_to_root(self):
"""尝试切换到root会话"""
try:
if self.root_password:
result = await self.ssh.run( result = await self.ssh.run(
f"echo '{self.password}' | sudo -S -i", f"echo '{self.root_password}' | sudo -S -i",
input=self.password + "\n", input=self.root_password + "\n",
timeout=5 timeout=5
) )
whoami = await self.ssh.run("whoami")
whoami_result = await self.ssh.run("whoami") if "root" in whoami.stdout:
if "root" in whoami_result.stdout: _LOGGER.info("成功切换到 root 会话(使用 root 密码)")
_LOGGER.info("成功切换到 root 会话(使用登录密码)")
self.use_sudo = False
self.ssh_closed = False
return True return True
else:
if self.root_password:
result = await self.ssh.run(
f"echo '{self.root_password}' | sudo -S -i",
input=self.root_password + "\n",
timeout=5
)
whoami_result = await self.ssh.run("whoami")
if "root" in whoami_result.stdout:
_LOGGER.info("成功切换到 root 会话(使用 root 密码)")
self.use_sudo = False
self.ssh_closed = False
return True
else:
# 切换到 root 会话失败,将使用 sudo
self.use_sudo = True
else:
# 非 root 用户且未提供 root 密码,将使用 sudo
self.use_sudo = True
self.ssh_closed = False result = await self.ssh.run(
_LOGGER.info("SSH 连接已建立到 %s", self.host) f"echo '{self.password}' | sudo -S -i",
input=self.password + "\n",
timeout=5
)
whoami = await self.ssh.run("whoami")
if "root" in whoami.stdout:
_LOGGER.info("成功切换到 root 会话(使用登录密码)")
return True return True
except Exception as e:
self.ssh = None self.use_sudo = True
self.ssh_closed = True return False
_LOGGER.debug("连接失败: %s", str(e)) except Exception:
return False self.use_sudo = True
return True return False
async def is_root_user(self): async def is_root_user(self):
try: try:
@@ -135,26 +155,74 @@ class FlynasCoordinator(DataUpdateCoordinator):
return False return False
async def async_disconnect(self): async def async_disconnect(self):
"""断开SSH连接"""
if self.ssh is not None and not self.ssh_closed: if self.ssh is not None and not self.ssh_closed:
try: try:
self.ssh.close() self.ssh.close()
self.ssh_closed = True self.ssh_closed = True
_LOGGER.debug("SSH connection closed") _LOGGER.debug("已关闭SSH连接")
except Exception as e: except Exception as e:
_LOGGER.debug("Error closing SSH connection: %s", str(e)) _LOGGER.debug("关闭SSH连接时出错: %s", str(e))
finally: finally:
self.ssh = None self.ssh = None
async def is_ssh_connected(self) -> bool: async def run_command(self, command: str, retries=2) -> str:
if self.ssh is None or self.ssh_closed: """执行SSH命令使用持久连接"""
return False current_time = asyncio.get_event_loop().time()
# 连接冷却机制:避免短时间内频繁创建新连接
if current_time - self._last_command_time < 1.0 and self._command_count > 5:
await asyncio.sleep(0.5)
self._last_command_time = current_time
self._command_count += 1
# 系统离线时直接返回空字符串
if not self._system_online:
return ""
try: try:
test_command = "echo 'connection_test'" # 确保连接有效
result = await self.ssh.run(test_command, timeout=2) if not await self.async_connect():
return result.exit_status == 0 and "connection_test" in result.stdout return ""
except (asyncssh.Error, TimeoutError):
return False # 使用sudo执行命令
if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await self.ssh.run(full_command, input=password + "\n", timeout=10)
else:
full_command = f"sudo {command}"
result = await self.ssh.run(full_command, timeout=10)
else:
result = await self.ssh.run(command, timeout=10)
return result.stdout.strip()
except (asyncssh.Error, TimeoutError) as e:
_LOGGER.debug("命令执行失败: %s, 错误: %s", command, str(e))
# 标记连接失效
self.ssh_closed = True
return ""
except Exception as e:
_LOGGER.debug("执行命令时出现意外错误: %s", str(e))
self.ssh_closed = True
return ""
async def _monitor_system_status(self):
"""系统离线时轮询检测状态"""
self.logger.debug("启动系统状态监控,每%d秒检测一次", self._retry_interval)
while True:
await asyncio.sleep(self._retry_interval)
if await self.ping_system():
self.logger.info("检测到系统已开机,触发重新加载")
# 触发集成重新加载
self.hass.async_create_task(
self.hass.config_entries.async_reload(self.config_entry.entry_id)
)
break
async def ping_system(self) -> bool: async def ping_system(self) -> bool:
"""轻量级系统状态检测""" """轻量级系统状态检测"""
@@ -174,155 +242,51 @@ class FlynasCoordinator(DataUpdateCoordinator):
except Exception: except Exception:
return False return False
async def run_command(self, command: str, retries=2) -> str:
# 系统离线时直接返回空字符串,避免抛出异常
if not self._system_online:
return ""
for attempt in range(retries):
try:
if not await self.is_ssh_connected():
if not await self.async_connect():
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
return ""
if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await self.ssh.run(full_command, input=password + "\n", check=True)
else:
full_command = f"sudo {command}"
result = await self.ssh.run(full_command, check=True)
else:
result = await self.ssh.run(command, check=True)
return result.stdout.strip()
except asyncssh.process.ProcessError as e:
if e.exit_status in [4, 32]:
return ""
_LOGGER.debug("Command failed: %s (exit %d)", command, e.exit_status)
self.ssh = None
self.ssh_closed = True
if attempt == retries - 1:
return ""
except asyncssh.Error as e:
_LOGGER.debug("SSH connection error: %s", str(e))
self.ssh = None
self.ssh_closed = True
if attempt == retries - 1:
return ""
except Exception as e:
self.ssh = None
self.ssh_closed = True
_LOGGER.debug("Unexpected error: %s", str(e))
if attempt == retries - 1:
return ""
return ""
async def get_network_macs(self):
try:
output = await self.run_command("ip link show")
macs = {}
pattern = re.compile(r'^\d+: (\w+):.*\n\s+link/\w+\s+([0-9a-fA-F:]{17})', re.MULTILINE)
matches = pattern.findall(output)
for interface, mac in matches:
if interface == "lo" or mac == "00:00:00:00:00:00":
continue
macs[mac] = interface
return macs
except Exception as e:
self.logger.debug("获取MAC地址失败: %s", str(e))
return {}
async def _monitor_system_status(self):
"""系统离线时轮询检测状态"""
self.logger.debug("启动系统状态监控,每%d秒检测一次", self._retry_interval)
while True:
await asyncio.sleep(self._retry_interval)
if await self.ping_system():
self.logger.info("检测到系统已开机,触发重新加载")
# 触发集成重新加载
self.hass.async_create_task(
self.hass.config_entries.async_reload(self.config_entry.entry_id)
)
break
async def _async_update_data(self): async def _async_update_data(self):
_LOGGER.debug("Starting data update...") """数据更新入口,优化命令执行频率"""
_LOGGER.debug("开始数据更新...")
is_online = await self.ping_system() is_online = await self.ping_system()
self._system_online = is_online self._system_online = is_online
if not is_online: if not is_online:
_LOGGER.debug("系统离线,跳过数据更新") _LOGGER.debug("系统离线,跳过数据更新")
# 修复:确保 self.data 结构有效 # 启动后台监控任务
if self.data is None or not isinstance(self.data, dict):
self.data = {}
if "system" not in self.data or not isinstance(self.data.get("system"), dict):
self.data["system"] = {}
self.data["system"]["status"] = "off"
# 启动后台监控任务(非阻塞)
if not self._ping_task or self._ping_task.done(): if not self._ping_task or self._ping_task.done():
self._ping_task = asyncio.create_task(self._monitor_system_status()) self._ping_task = asyncio.create_task(self._monitor_system_status())
await self.async_disconnect() await self.async_disconnect()
# 直接返回空数据,不阻塞 return self.get_default_data()
return {
"disks": [],
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": [],
"docker_containers": []
}
# 系统在线处理 # 系统在线处理
try: try:
# 确保SSH连接 # 确保连接有效
if not await self.async_connect(): if not await self.async_connect():
self.data["system"]["status"] = "off" return self.get_default_data()
return {
"disks": [],
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": []
}
# 获取系统状态信息
status = "on" status = "on"
# 并行获取磁盘、UPS和系统信息
system_task = asyncio.create_task(self.system_manager.get_system_info())
disks_task = asyncio.create_task(self.disk_manager.get_disks_info())
ups_task = asyncio.create_task(self.ups_manager.get_ups_info())
vms_task = asyncio.create_task(self.vm_manager.get_vm_list())
disks = await self.disk_manager.get_disks_info() # 等待并行任务完成
system = await self.system_manager.get_system_info() system, disks, ups_info, vms = await asyncio.gather(
ups_info = await self.ups_manager.get_ups_info() system_task, disks_task, ups_task, vms_task
vms = await self.vm_manager.get_vm_list() )
# 为每个虚拟机获取标题
for vm in vms: for vm in vms:
vm["title"] = await self.vm_manager.get_vm_title(vm["name"]) vm["title"] = await self.vm_manager.get_vm_title(vm["name"])
# 获取Docker容器信息
docker_containers = [] docker_containers = []
if self.enable_docker: if self.enable_docker:
docker_containers = await self.docker_manager.get_containers() docker_containers = await self.docker_manager.get_containers()
data = { data = {
"disks": disks, "disks": disks,
"system": { "system": {**system, "status": status},
**system,
"status": status
},
"ups": ups_info, "ups": ups_info,
"vms": vms, "vms": vms,
"docker_containers": docker_containers "docker_containers": docker_containers
@@ -332,29 +296,34 @@ class FlynasCoordinator(DataUpdateCoordinator):
except Exception as e: except Exception as e:
_LOGGER.debug("数据更新失败: %s", str(e)) _LOGGER.debug("数据更新失败: %s", str(e))
# 检查错误类型,如果是连接问题,标记为离线
self._system_online = False self._system_online = False
if not self._ping_task or self._ping_task.done(): if not self._ping_task or self._ping_task.done():
self._ping_task = asyncio.create_task(self._monitor_system_status()) self._ping_task = asyncio.create_task(self._monitor_system_status())
return { return self.get_default_data()
"disks": [],
"system": { def get_default_data(self):
"uptime": "未知", """获取默认数据(离线状态)"""
"cpu_temperature": "未知", return {
"motherboard_temperature": "未知", "disks": [],
"status": "off" "system": {
}, "uptime": "未知",
"ups": {}, "cpu_temperature": "未知",
"vms": [] "motherboard_temperature": "未知",
} "status": "off"
},
"ups": {},
"vms": [],
"docker_containers": []
}
async def reboot_system(self): async def reboot_system(self):
await self.system_manager.reboot_system() await self.system_manager.reboot_system()
async def shutdown_system(self): async def shutdown_system(self):
await self.system_manager.shutdown_system() await self.system_manager.shutdown_system()
if self.data and "system" in self.data: # 更新状态,但使用安全的方式
if self.data and isinstance(self.data, dict) and "system" in self.data:
self.data["system"]["status"] = "off" self.data["system"]["status"] = "off"
self.async_update_listeners() self.async_update_listeners()

View File

@@ -1,7 +1,7 @@
{ {
"domain": "fn_nas", "domain": "fn_nas",
"name": "飞牛NAS", "name": "飞牛NAS",
"version": "1.3.4", "version": "1.3.5",
"documentation": "https://github.com/anxms/fn_nas", "documentation": "https://github.com/anxms/fn_nas",
"dependencies": [], "dependencies": [],
"codeowners": ["@anxms"], "codeowners": ["@anxms"],