9 Commits

Author SHA1 Message Date
xiaochao
0f691e956f 修复可用空间不可用问题 2025-07-28 14:37:28 +08:00
xiaochao
25348fff9b 优化硬盘检测逻辑避免唤醒休眠硬盘 2025-07-28 14:10:23 +08:00
xiaochao
30b1b7d271 修复ha中关闭飞牛系统电脑报错问题 2025-07-28 13:50:09 +08:00
xiaochao
17e3229b29 优化了SSH连接的管理,使用连接池来复用连接,减少连接建立的开销。
主板CPU温度获取优化,改为sensors方式获取。有主板CPU温度获取错误的情况提交issues,并带上sensors命令的输出日志,我会做适配。
2025-07-21 11:56:33 +08:00
xiaochao
11d1352b20 优化飞牛sshd进程数,可能获取实体会比较慢,需要等待一段时间集成才会显示实体 2025-07-12 15:47:37 +08:00
xiaochao
57d14b48f8 修改CPU和主板温度获取方式,可能不支持服务器级硬件和特别老的主板 2025-07-12 14:47:34 +08:00
xiaochao
fae53cf5b9 修复飞牛nas关机后重启home assistant后,飞牛nas开机后部分实体显示不可用的问题
去除ssh连接数限制和缓存清理时间
2025-07-12 01:18:26 +08:00
xiaochao
f185b7e3ee 修复飞牛nas在关机到开机时,home assistant实体状态无法更新的问题 2025-07-11 23:45:05 +08:00
xiaochao
e3bb42e3de 增加可用内存和存储空间实体 2025-07-01 14:18:22 +08:00
8 changed files with 1233 additions and 601 deletions

View File

@@ -1,62 +1,89 @@
import logging import logging
import asyncio
import asyncssh
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from .const import DOMAIN, DATA_UPDATE_COORDINATOR, PLATFORMS, CONF_ENABLE_DOCKER # 导入新增常量 from homeassistant.helpers import config_validation as cv
from .const import (
DOMAIN, DATA_UPDATE_COORDINATOR, PLATFORMS, CONF_ENABLE_DOCKER,
CONF_HOST, DEFAULT_PORT
)
from .coordinator import FlynasCoordinator, UPSDataUpdateCoordinator from .coordinator import FlynasCoordinator, UPSDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry): async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
config = {**entry.data, **entry.options} config = {**entry.data, **entry.options}
coordinator = FlynasCoordinator(hass, config, entry)
coordinator = FlynasCoordinator(hass, config) # 直接初始化不阻塞等待NAS上线
await coordinator.async_config_entry_first_refresh()
_LOGGER.debug("协调器类型: %s", type(coordinator).__name__)
_LOGGER.debug("协调器是否有control_vm方法: %s", hasattr(coordinator, 'control_vm'))
_LOGGER.debug("协调器是否有vm_manager属性: %s", hasattr(coordinator, 'vm_manager'))
# 检查是否启用Docker并初始化Docker管理器如果有
enable_docker = config.get(CONF_ENABLE_DOCKER, False)
if enable_docker:
# 导入Docker管理器并初始化
from .docker_manager import DockerManager
coordinator.docker_manager = DockerManager(coordinator)
_LOGGER.debug("已启用Docker容器监控")
else:
coordinator.docker_manager = None
_LOGGER.debug("未启用Docker容器监控")
ups_coordinator = UPSDataUpdateCoordinator(hass, config, coordinator)
await ups_coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {}) hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = { hass.data[DOMAIN][entry.entry_id] = {
DATA_UPDATE_COORDINATOR: coordinator, DATA_UPDATE_COORDINATOR: coordinator,
"ups_coordinator": ups_coordinator, "ups_coordinator": None,
CONF_ENABLE_DOCKER: enable_docker # 存储启用状态 CONF_ENABLE_DOCKER: coordinator.config.get(CONF_ENABLE_DOCKER, False)
} }
# 异步后台初始化
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) hass.async_create_task(async_delayed_setup(hass, entry, coordinator))
entry.async_on_unload(entry.add_update_listener(async_update_entry))
return True return True
async def async_delayed_setup(hass: HomeAssistant, entry: ConfigEntry, coordinator: FlynasCoordinator):
try:
# 不阻塞等待NAS上线直接尝试刷新数据
await coordinator.async_config_entry_first_refresh()
enable_docker = coordinator.config.get(CONF_ENABLE_DOCKER, False)
if enable_docker:
from .docker_manager import DockerManager
coordinator.docker_manager = DockerManager(coordinator)
_LOGGER.debug("已启用Docker容器监控")
else:
coordinator.docker_manager = None
_LOGGER.debug("未启用Docker容器监控")
ups_coordinator = UPSDataUpdateCoordinator(hass, coordinator.config, coordinator)
await ups_coordinator.async_config_entry_first_refresh()
hass.data[DOMAIN][entry.entry_id]["ups_coordinator"] = ups_coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(entry.add_update_listener(async_update_entry))
_LOGGER.info("飞牛NAS集成初始化完成")
except Exception as e:
_LOGGER.error("飞牛NAS集成初始化失败: %s", str(e))
await coordinator.async_disconnect()
if hasattr(coordinator, '_ping_task') and coordinator._ping_task:
coordinator._ping_task.cancel()
async def async_update_entry(hass: HomeAssistant, entry: ConfigEntry): async def async_update_entry(hass: HomeAssistant, entry: ConfigEntry):
await hass.config_entries.async_reload(entry.entry_id) """更新配置项"""
# 卸载现有集成
await async_unload_entry(hass, entry)
# 重新加载集成
await async_setup_entry(hass, entry)
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry): async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry):
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) """卸载集成"""
# 获取集成数据
domain_data = hass.data.get(DOMAIN, {}).get(entry.entry_id, {})
unload_ok = True
if unload_ok: if DATA_UPDATE_COORDINATOR in domain_data:
domain_data = hass.data[DOMAIN][entry.entry_id]
coordinator = domain_data[DATA_UPDATE_COORDINATOR] coordinator = domain_data[DATA_UPDATE_COORDINATOR]
ups_coordinator = domain_data["ups_coordinator"] ups_coordinator = domain_data.get("ups_coordinator")
# 关闭主协调器的SSH连接 # 卸载平台
await coordinator.async_disconnect() unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
# 关闭UPS协调器
await ups_coordinator.async_shutdown()
# 从DOMAIN中移除该entry的数据 if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id) # 关闭主协调器的SSH连接
await coordinator.async_disconnect()
# 关闭UPS协调器如果存在
if ups_coordinator:
await ups_coordinator.async_shutdown()
# 取消监控任务(如果存在)
if hasattr(coordinator, '_ping_task') and coordinator._ping_task and not coordinator._ping_task.done():
coordinator._ping_task.cancel()
# 从DOMAIN中移除该entry的数据
hass.data[DOMAIN].pop(entry.entry_id, None)
return unload_ok return unload_ok

View File

@@ -1,6 +1,7 @@
import logging import logging
import re import asyncio
import asyncssh import asyncssh
import re
from datetime import timedelta from datetime import timedelta
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
@@ -20,8 +21,10 @@ from .docker_manager import DockerManager
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
class FlynasCoordinator(DataUpdateCoordinator): class FlynasCoordinator(DataUpdateCoordinator):
def __init__(self, hass: HomeAssistant, config) -> None: def __init__(self, hass: HomeAssistant, config, config_entry) -> None:
self.config = config self.config = config
self.config_entry = config_entry
self.hass = hass
self.host = config[CONF_HOST] self.host = config[CONF_HOST]
self.port = config.get(CONF_PORT, DEFAULT_PORT) self.port = config.get(CONF_PORT, DEFAULT_PORT)
self.username = config[CONF_USERNAME] self.username = config[CONF_USERNAME]
@@ -32,21 +35,16 @@ class FlynasCoordinator(DataUpdateCoordinator):
self.docker_manager = DockerManager(self) if self.enable_docker else None self.docker_manager = DockerManager(self) if self.enable_docker else None
self.ssh = None self.ssh = None
self.ssh_closed = True self.ssh_closed = True
# SSH连接池管理
self.ssh_pool = []
self.ssh_pool_size = 3 # 连接池大小
self.ssh_pool_lock = asyncio.Lock()
self.ups_manager = UPSManager(self) self.ups_manager = UPSManager(self)
self.vm_manager = VMManager(self) self.vm_manager = VMManager(self)
self.use_sudo = False self.use_sudo = False
self.data = { # 确保data始终有初始值
"disks": [], self.data = self.get_default_data()
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": []
}
scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL) scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
update_interval = timedelta(seconds=scan_interval) update_interval = timedelta(seconds=scan_interval)
@@ -60,224 +58,385 @@ class FlynasCoordinator(DataUpdateCoordinator):
self.disk_manager = DiskManager(self) self.disk_manager = DiskManager(self)
self.system_manager = SystemManager(self) self.system_manager = SystemManager(self)
self._system_online = False
self._ping_task = None
self._retry_interval = 30 # 系统离线时的检测间隔(秒)
self._last_command_time = 0
self._command_count = 0
async def async_connect(self): # 添加日志方法
if self.ssh is None or self.ssh_closed: self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
def get_default_data(self):
"""返回默认的数据结构"""
return {
"disks": [],
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": [],
"docker_containers": []
}
def _debug_log(self, message: str):
"""只在调试模式下输出详细日志"""
if self.debug_enabled:
_LOGGER.debug(message)
def _info_log(self, message: str):
"""重要信息日志"""
_LOGGER.info(message)
def _warning_log(self, message: str):
"""警告日志"""
_LOGGER.warning(message)
def _error_log(self, message: str):
"""错误日志"""
_LOGGER.error(message)
async def get_ssh_connection(self):
"""从连接池获取可用的SSH连接"""
async with self.ssh_pool_lock:
# 检查现有连接
for i, (ssh, in_use) in enumerate(self.ssh_pool):
if not in_use:
try:
# 测试连接是否活跃
await asyncio.wait_for(ssh.run("echo 'test'", timeout=1), timeout=2)
self.ssh_pool[i] = (ssh, True) # 标记为使用中
self._debug_log(f"复用连接池中的连接 {i}")
return ssh, i
except Exception:
# 连接失效,移除
try:
ssh.close()
except:
pass
self.ssh_pool.pop(i)
break
# 如果连接池未满,创建新连接
if len(self.ssh_pool) < self.ssh_pool_size:
try:
ssh = await asyncssh.connect(
self.host,
port=self.port,
username=self.username,
password=self.password,
known_hosts=None,
connect_timeout=5
)
# 检查并设置权限状态
await self._setup_connection_permissions(ssh)
connection_id = len(self.ssh_pool)
self.ssh_pool.append((ssh, True))
self._debug_log(f"创建新的SSH连接 {connection_id}")
return ssh, connection_id
except Exception as e:
self._debug_log(f"创建SSH连接失败: {e}")
raise
# 连接池满且所有连接都在使用中,等待可用连接
self._debug_log("所有连接都在使用中,等待可用连接...")
for _ in range(50): # 最多等待5秒
await asyncio.sleep(0.1)
for i, (ssh, in_use) in enumerate(self.ssh_pool):
if not in_use:
try:
await asyncio.wait_for(ssh.run("echo 'test'", timeout=1), timeout=2)
self.ssh_pool[i] = (ssh, True)
self._debug_log(f"等待后获得连接 {i}")
return ssh, i
except Exception:
try:
ssh.close()
except:
pass
self.ssh_pool.pop(i)
break
raise Exception("无法获取SSH连接")
async def _setup_connection_permissions(self, ssh):
"""为新连接设置权限状态"""
try:
# 检查是否为root用户
result = await ssh.run("id -u", timeout=3)
if result.stdout.strip() == "0":
self._debug_log("当前用户是 root")
self.use_sudo = False
return
# 尝试切换到root会话
if self.root_password:
try:
await ssh.run(
f"echo '{self.root_password}' | sudo -S -i",
input=self.root_password + "\n",
timeout=5
)
whoami = await ssh.run("whoami")
if "root" in whoami.stdout:
self._info_log("成功切换到 root 会话(使用 root 密码)")
self.use_sudo = False
return
except Exception:
pass
# 尝试使用登录密码sudo
try: try:
self.ssh = await asyncssh.connect( await ssh.run(
self.host,
port=self.port,
username=self.username,
password=self.password,
known_hosts=None
)
if await self.is_root_user():
_LOGGER.debug("当前用户是 root")
self.use_sudo = False
self.ssh_closed = False
return True
result = await self.ssh.run(
f"echo '{self.password}' | sudo -S -i", f"echo '{self.password}' | sudo -S -i",
input=self.password + "\n", input=self.password + "\n",
timeout=10 timeout=5
) )
whoami = await ssh.run("whoami")
whoami_result = await self.ssh.run("whoami") if "root" in whoami.stdout:
if "root" in whoami_result.stdout: self._info_log("成功切换到 root 会话(使用登录密码)")
_LOGGER.info("成功切换到 root 会话(使用登录密码)")
self.use_sudo = False self.use_sudo = False
self.ssh_closed = False return
return True except Exception:
else: pass
if self.root_password:
result = await self.ssh.run(
f"echo '{self.root_password}' | sudo -S -i",
input=self.root_password + "\n",
timeout=10
)
whoami_result = await self.ssh.run("whoami")
if "root" in whoami_result.stdout:
_LOGGER.info("成功切换到 root 会话(使用 root 密码)")
self.use_sudo = False
self.ssh_closed = False
return True
else:
_LOGGER.warning("切换到 root 会话失败,将使用 sudo")
self.use_sudo = True
else:
_LOGGER.warning("非 root 用户且未提供 root 密码,将使用 sudo")
self.use_sudo = True
self.ssh_closed = False # 设置为使用sudo模式
_LOGGER.info("SSH 连接已建立到 %s", self.host) self.use_sudo = True
return True self._debug_log("设置为使用sudo模式")
except Exception as e:
self.ssh = None except Exception as e:
self.ssh_closed = True self._debug_log(f"设置连接权限失败: {e}")
_LOGGER.error("连接失败: %s", str(e), exc_info=True) self.use_sudo = True
return False
return True async def release_ssh_connection(self, connection_id):
"""释放SSH连接回连接池"""
async with self.ssh_pool_lock:
if 0 <= connection_id < len(self.ssh_pool):
ssh, _ = self.ssh_pool[connection_id]
self.ssh_pool[connection_id] = (ssh, False) # 标记为可用
self._debug_log(f"释放SSH连接 {connection_id}")
async def is_root_user(self): async def close_all_ssh_connections(self):
"""关闭所有SSH连接"""
async with self.ssh_pool_lock:
for ssh, _ in self.ssh_pool:
try:
ssh.close()
except:
pass
self.ssh_pool.clear()
self._debug_log("已关闭所有SSH连接")
async def async_connect(self):
"""建立并保持持久SSH连接 - 兼容旧代码"""
try: try:
result = await self.ssh.run("id -u", timeout=5) ssh, connection_id = await self.get_ssh_connection()
return result.stdout.strip() == "0" await self.release_ssh_connection(connection_id)
return True
except Exception: except Exception:
return False return False
async def async_disconnect(self): async def async_disconnect(self):
if self.ssh is not None and not self.ssh_closed: """断开SSH连接 - 兼容旧代码"""
try: await self.close_all_ssh_connections()
self.ssh.close()
self.ssh_closed = True
_LOGGER.info("SSH connection closed")
except Exception as e:
_LOGGER.error("Error closing SSH connection: %s", str(e))
finally:
self.ssh = None
async def is_ssh_connected(self) -> bool:
if self.ssh is None or self.ssh_closed:
return False
try:
test_command = "echo 'connection_test'"
result = await self.ssh.run(test_command, timeout=2)
return result.exit_status == 0 and "connection_test" in result.stdout
except (asyncssh.Error, TimeoutError):
return False
async def run_command(self, command: str, retries=2) -> str: async def run_command(self, command: str, retries=2) -> str:
for attempt in range(retries): """执行SSH命令使用连接池"""
try: # 系统离线时直接返回空字符串
if not await self.is_ssh_connected(): if not self._system_online:
if not await self.async_connect(): return ""
if self.data and "system" in self.data:
self.data["system"]["status"] = "off" ssh = None
raise UpdateFailed("SSH 连接失败") connection_id = None
if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await self.ssh.run(full_command, input=password + "\n", check=True)
else:
full_command = f"sudo {command}"
result = await self.ssh.run(full_command, check=True)
else:
result = await self.ssh.run(command, check=True)
return result.stdout.strip()
except asyncssh.process.ProcessError as e:
if e.exit_status in [4, 32]:
return ""
_LOGGER.error("Command failed: %s (exit %d)", command, e.exit_status)
self.ssh = None
self.ssh_closed = True
if attempt == retries - 1:
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
raise UpdateFailed(f"Command failed after {retries} attempts: {command}") from e
except asyncssh.Error as e:
_LOGGER.error("SSH connection error: %s", str(e))
self.ssh = None
self.ssh_closed = True
if attempt == retries - 1:
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
raise UpdateFailed(f"SSH error after {retries} attempts: {str(e)}") from e
except Exception as e:
self.ssh = None
self.ssh_closed = True
_LOGGER.error("Unexpected error: %s", str(e), exc_info=True)
if attempt == retries - 1:
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
raise UpdateFailed(f"Unexpected error after {retries} attempts") from e
async def get_network_macs(self):
try:
output = await self.run_command("ip link show")
macs = {}
pattern = re.compile(r'^\d+: (\w+):.*\n\s+link/\w+\s+([0-9a-fA-F:]{17})', re.MULTILINE)
matches = pattern.findall(output)
for interface, mac in matches:
if interface == "lo" or mac == "00:00:00:00:00:00":
continue
macs[mac] = interface
return macs
except Exception as e:
self.logger.error("获取MAC地址失败: %s", str(e))
return {}
async def _async_update_data(self):
_LOGGER.debug("Starting data update...")
try: try:
if await self.is_ssh_connected(): # 从连接池获取连接
status = "on" ssh, connection_id = await self.get_ssh_connection()
else:
if not await self.async_connect(): # 构建完整命令
status = "off" if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await ssh.run(full_command, input=password + "\n", timeout=10)
else: else:
status = "on" full_command = f"sudo {command}"
result = await ssh.run(full_command, timeout=10)
else:
result = await ssh.run(command, timeout=10)
disks = await self.disk_manager.get_disks_info() return result.stdout.strip()
except Exception as e:
self._debug_log(f"命令执行失败: {command}, 错误: {str(e)}")
return ""
finally:
# 释放连接回连接池
if connection_id is not None:
await self.release_ssh_connection(connection_id)
async def run_command_direct(self, command: str) -> str:
"""直接执行命令,获取独立连接 - 用于并发任务"""
if not self._system_online:
return ""
ssh = None
connection_id = None
try:
ssh, connection_id = await self.get_ssh_connection()
if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await ssh.run(full_command, input=password + "\n", timeout=10)
else:
full_command = f"sudo {command}"
result = await ssh.run(full_command, timeout=10)
else:
result = await ssh.run(command, timeout=10)
return result.stdout.strip()
except Exception as e:
self._debug_log(f"直接命令执行失败: {command}, 错误: {str(e)}")
return ""
finally:
if connection_id is not None:
await self.release_ssh_connection(connection_id)
async def ping_system(self) -> bool:
"""轻量级系统状态检测"""
# 对于本地主机直接返回True
if self.host in ['localhost', '127.0.0.1']:
return True
try:
# 使用异步ping检测减少超时时间
proc = await asyncio.create_subprocess_exec(
'ping', '-c', '1', '-W', '1', self.host,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
await asyncio.wait_for(proc.wait(), timeout=2) # 总超时时间2秒
return proc.returncode == 0
except Exception:
return False
async def _monitor_system_status(self):
"""系统离线时轮询检测状态"""
self._debug_log(f"启动系统状态监控,每{self._retry_interval}秒检测一次")
# 使用指数退避策略,避免频繁检测
check_interval = self._retry_interval
max_interval = 300 # 最大5分钟检测一次
while True:
await asyncio.sleep(check_interval)
if await self.ping_system():
self._info_log("检测到系统已开机,触发重新加载")
# 触发集成重新加载
self.hass.async_create_task(
self.hass.config_entries.async_reload(self.config_entry.entry_id)
)
break
else:
# 系统仍然离线,增加检测间隔(指数退避)
check_interval = min(check_interval * 1.5, max_interval)
self._debug_log(f"系统仍离线,下次检测间隔: {check_interval}")
async def _async_update_data(self):
"""数据更新入口,优化命令执行频率"""
self._debug_log("开始数据更新...")
is_online = await self.ping_system()
self._system_online = is_online
if not is_online:
self._debug_log("系统离线,跳过数据更新")
# 启动后台监控任务
if not self._ping_task or self._ping_task.done():
self._ping_task = asyncio.create_task(self._monitor_system_status())
await self.close_all_ssh_connections()
return self.get_default_data()
# 系统在线处理
try:
# 预热连接池并确保权限设置正确
await self.async_connect()
# 获取系统状态信息
status = "on"
# 串行获取信息以确保稳定性
self._debug_log("开始获取系统信息...")
system = await self.system_manager.get_system_info() system = await self.system_manager.get_system_info()
ups_info = await self.ups_manager.get_ups_info() self._debug_log("系统信息获取完成")
vms = await self.vm_manager.get_vm_list()
self._debug_log("开始获取磁盘信息...")
disks = await self.disk_manager.get_disks_info()
self._debug_log(f"磁盘信息获取完成,数量: {len(disks)}")
self._debug_log("开始获取UPS信息...")
ups_info = await self.ups_manager.get_ups_info()
self._debug_log("UPS信息获取完成")
self._debug_log("开始获取虚拟机信息...")
vms = await self.vm_manager.get_vm_list()
self._debug_log(f"虚拟机信息获取完成,数量: {len(vms)}")
# 为每个虚拟机获取标题
for vm in vms: for vm in vms:
vm["title"] = await self.vm_manager.get_vm_title(vm["name"]) try:
vm["title"] = await self.vm_manager.get_vm_title(vm["name"])
except Exception as e:
self._debug_log(f"获取VM标题失败 {vm['name']}: {e}")
vm["title"] = vm["name"]
# 获取Docker容器信息
docker_containers = [] docker_containers = []
if self.enable_docker: if self.enable_docker and self.docker_manager:
docker_containers = await self.docker_manager.get_containers() self._debug_log("开始获取Docker信息...")
try:
docker_containers = await self.docker_manager.get_containers()
self._debug_log(f"Docker信息获取完成数量: {len(docker_containers)}")
except Exception as e:
self._debug_log(f"Docker信息获取失败: {e}")
data = { data = {
"disks": disks, "disks": disks,
"system": { "system": {**system, "status": status},
**system,
"status": status
},
"ups": ups_info, "ups": ups_info,
"vms": vms, "vms": vms,
"docker_containers": docker_containers "docker_containers": docker_containers
} }
self._debug_log(f"数据更新完成: disks={len(disks)}, vms={len(vms)}, containers={len(docker_containers)}")
return data return data
except Exception as e: except Exception as e:
_LOGGER.error("Failed to update data: %s", str(e), exc_info=True) self._error_log(f"数据更新失败: {str(e)}")
return { self._system_online = False
"disks": [], if not self._ping_task or self._ping_task.done():
"system": { self._ping_task = asyncio.create_task(self._monitor_system_status())
"uptime": "未知",
"cpu_temperature": "未知", return self.get_default_data()
"motherboard_temperature": "未知",
"status": "off" async def shutdown_system(self):
}, """关闭系统 - 委托给SystemManager"""
"ups": {}, return await self.system_manager.shutdown_system()
"vms": []
}
async def reboot_system(self): async def reboot_system(self):
await self.system_manager.reboot_system() """重启系统 - 委托给SystemManager"""
return await self.system_manager.reboot_system()
async def shutdown_system(self):
await self.system_manager.shutdown_system()
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
self.async_update_listeners()
class UPSDataUpdateCoordinator(DataUpdateCoordinator): class UPSDataUpdateCoordinator(DataUpdateCoordinator):
def __init__(self, hass: HomeAssistant, config, main_coordinator): def __init__(self, hass: HomeAssistant, config, main_coordinator):
@@ -297,19 +456,20 @@ class UPSDataUpdateCoordinator(DataUpdateCoordinator):
self.ups_manager = UPSManager(main_coordinator) self.ups_manager = UPSManager(main_coordinator)
async def _async_update_data(self): async def _async_update_data(self):
# 如果主协调器检测到系统离线跳过UPS更新
if not self.main_coordinator._system_online:
return {}
try: try:
return await self.ups_manager.get_ups_info() return await self.ups_manager.get_ups_info()
except Exception as e: except Exception as e:
_LOGGER.error("Failed to update UPS data: %s", str(e), exc_info=True) _LOGGER.debug("UPS数据更新失败: %s", str(e))
return {} return {}
async def control_vm(self, vm_name, action): async def control_vm(self, vm_name, action):
try: try:
if not hasattr(self, 'vm_manager'): result = await self.main_coordinator.vm_manager.control_vm(vm_name, action)
self.vm_manager = VMManager(self)
result = await self.vm_manager.control_vm(vm_name, action)
return result return result
except Exception as e: except Exception as e:
_LOGGER.error("虚拟机控制失败: %s", str(e), exc_info=True) _LOGGER.debug("虚拟机控制失败: %s", str(e))
return False return False

View File

@@ -14,6 +14,7 @@ class DiskManager:
self.disk_full_info_cache = {} # 缓存磁盘完整信息 self.disk_full_info_cache = {} # 缓存磁盘完整信息
self.first_run = True # 首次运行标志 self.first_run = True # 首次运行标志
self.initial_detection_done = False # 首次完整检测完成标志 self.initial_detection_done = False # 首次完整检测完成标志
self.disk_io_stats_cache = {} # 缓存磁盘I/O统计信息
def extract_value(self, text: str, patterns, default="未知", format_func=None): def extract_value(self, text: str, patterns, default="未知", format_func=None):
if not text: if not text:
@@ -38,10 +39,9 @@ class DiskManager:
async def check_disk_active(self, device: str, window: int = 30) -> bool: async def check_disk_active(self, device: str, window: int = 30) -> bool:
"""检查硬盘在指定时间窗口内是否有活动""" """检查硬盘在指定时间窗口内是否有活动"""
try: try:
# 正确的路径是 /sys/block/{device}/stat
stat_path = f"/sys/block/{device}/stat" stat_path = f"/sys/block/{device}/stat"
# 读取统计文件 # 读取当前统计文件
stat_output = await self.coordinator.run_command(f"cat {stat_path} 2>/dev/null") stat_output = await self.coordinator.run_command(f"cat {stat_path} 2>/dev/null")
if not stat_output: if not stat_output:
self.logger.debug(f"无法读取 {stat_path},默认返回活跃状态") self.logger.debug(f"无法读取 {stat_path},默认返回活跃状态")
@@ -52,52 +52,148 @@ class DiskManager:
if len(stats) < 11: if len(stats) < 11:
self.logger.debug(f"无效的统计信息格式:{stat_output}") self.logger.debug(f"无效的统计信息格式:{stat_output}")
return True return True
# 关键字段当前正在进行的I/O操作数量第9个字段索引8
in_flight = int(stats[8])
# 如果当前有I/O操作直接返回活跃状态 try:
if in_flight > 0: # /sys/block/{device}/stat 字段说明:
# 0: read I/Os requests 读请求次数
# 1: read I/Os merged 读请求合并次数
# 2: read sectors 读扇区数
# 3: read ticks 读操作耗时(ms)
# 4: write I/Os requests 写请求次数
# 5: write I/Os merged 写请求合并次数
# 6: write sectors 写扇区数
# 7: write ticks 写操作耗时(ms)
# 8: in_flight 当前进行中的I/O请求数
# 9: io_ticks I/O活动时间(ms)
# 10: time_in_queue 队列中的总时间(ms)
current_stats = {
'read_ios': int(stats[0]),
'write_ios': int(stats[4]),
'in_flight': int(stats[8]),
'io_ticks': int(stats[9])
}
# 如果当前有正在进行的I/O操作直接返回活跃状态
if current_stats['in_flight'] > 0:
self.logger.debug(f"磁盘 {device} 有正在进行的I/O操作: {current_stats['in_flight']}")
self.disk_io_stats_cache[device] = current_stats
return True
# 检查是否有缓存的统计信息
cached_stats = self.disk_io_stats_cache.get(device)
if cached_stats:
# 比较I/O请求次数的变化
read_ios_diff = current_stats['read_ios'] - cached_stats['read_ios']
write_ios_diff = current_stats['write_ios'] - cached_stats['write_ios']
io_ticks_diff = current_stats['io_ticks'] - cached_stats['io_ticks']
self.logger.debug(f"磁盘 {device} I/O变化: 读={read_ios_diff}, 写={write_ios_diff}, 活动时间={io_ticks_diff}ms")
# 如果在检测窗口内有I/O活动认为磁盘活跃
if read_ios_diff > 0 or write_ios_diff > 0 or io_ticks_diff > 100: # 100ms内的活动
self.logger.debug(f"磁盘 {device} 在窗口期内有I/O活动")
self.disk_io_stats_cache[device] = current_stats
return True
# 检查io_ticks是否表明最近有活动
# io_ticks是累积值如果在合理范围内增长说明有轻微活动
if io_ticks_diff > 0 and io_ticks_diff < window * 1000: # 在窗口时间内的轻微活动
self.logger.debug(f"磁盘 {device} 有轻微I/O活动")
self.disk_io_stats_cache[device] = current_stats
return True
else:
# 首次检测,保存当前状态并认为活跃
self.logger.debug(f"磁盘 {device} 首次检测,保存统计信息")
self.disk_io_stats_cache[device] = current_stats
return True
# 更新缓存
self.disk_io_stats_cache[device] = current_stats
# 检查硬盘电源状态
power_state = await self.get_disk_power_state(device)
if power_state in ["standby", "sleep", "idle"]:
self.logger.debug(f"磁盘 {device} 处于省电状态: {power_state}")
return False
# 所有检查都通过,返回非活跃状态
self.logger.debug(f"磁盘 {device} 判定为非活跃状态")
return False
except (ValueError, IndexError) as e:
self.logger.debug(f"解析统计信息失败: {e}")
return True return True
# 检查I/O操作时间第10个字段索引9 - io_ticks单位毫秒
io_ticks = int(stats[9])
# 如果设备在窗口时间内有I/O活动返回活跃状态
if io_ticks > window * 1000:
return True
# 所有检查都通过,返回非活跃状态
return False
except Exception as e: except Exception as e:
self.logger.error(f"检测硬盘活动状态失败: {str(e)}", exc_info=True) self.logger.error(f"检测硬盘活动状态失败: {str(e)}")
return True # 出错时默认执行检测 return True # 出错时默认执行检测
async def get_disk_activity(self, device: str) -> str: async def get_disk_power_state(self, device: str) -> str:
"""获取硬盘活动状态(活动中/空闲中/休眠中)""" """获取硬盘电源状态"""
try: try:
# 检查硬盘是否处于休眠状态 # 检查 SCSI 设备状态
state_path = f"/sys/block/{device}/device/state" state_path = f"/sys/block/{device}/device/state"
state_output = await self.coordinator.run_command(f"cat {state_path} 2>/dev/null || echo 'unknown'") state_output = await self.coordinator.run_command(f"cat {state_path} 2>/dev/null || echo 'unknown'")
state = state_output.strip().lower() state = state_output.strip().lower()
if state in ["standby", "sleep"]: if state in ["running", "active"]:
return "active"
elif state in ["standby", "sleep"]:
return state
# 对于某些设备尝试通过hdparm检查状态非侵入性
hdparm_output = await self.coordinator.run_command(f"hdparm -C /dev/{device} 2>/dev/null || echo 'unknown'")
if "standby" in hdparm_output.lower():
return "standby"
elif "sleeping" in hdparm_output.lower():
return "sleep"
elif "active/idle" in hdparm_output.lower():
return "active"
return "unknown"
except Exception as e:
self.logger.debug(f"获取磁盘 {device} 电源状态失败: {e}")
return "unknown"
async def get_disk_activity(self, device: str) -> str:
"""获取硬盘活动状态(活动中/空闲中/休眠中)"""
try:
# 先检查电源状态
power_state = await self.get_disk_power_state(device)
if power_state in ["standby", "sleep"]:
return "休眠中" return "休眠中"
# 检查最近一分钟内的硬盘活动 # 检查最近的I/O活动
stat_path = f"/sys/block/{device}/stat" stat_path = f"/sys/block/{device}/stat"
stat_output = await self.coordinator.run_command(f"cat {stat_path}") stat_output = await self.coordinator.run_command(f"cat {stat_path} 2>/dev/null")
stats = stat_output.split()
if len(stats) >= 11: if stat_output:
# 第9个字段是最近完成的读操作数 stats = stat_output.split()
# 第10个字段是最近完成的写操作数 if len(stats) >= 11:
recent_reads = int(stats[8]) try:
recent_writes = int(stats[9]) in_flight = int(stats[8]) # 当前进行中的I/O
if recent_reads > 0 or recent_writes > 0: # 如果有正在进行的I/O返回活动中
return "活动中" if in_flight > 0:
return "活动中"
# 检查缓存的统计信息来判断近期活动
cached_stats = self.disk_io_stats_cache.get(device)
if cached_stats:
current_read_ios = int(stats[0])
current_write_ios = int(stats[4])
read_diff = current_read_ios - cached_stats.get('read_ios', 0)
write_diff = current_write_ios - cached_stats.get('write_ios', 0)
if read_diff > 0 or write_diff > 0:
return "活动中"
except (ValueError, IndexError):
pass
return "空闲中" return "空闲中"

View File

@@ -1,7 +1,7 @@
{ {
"domain": "fn_nas", "domain": "fn_nas",
"name": "飞牛NAS", "name": "飞牛NAS",
"version": "1.3.0", "version": "1.3.7",
"documentation": "https://github.com/anxms/fn_nas", "documentation": "https://github.com/anxms/fn_nas",
"dependencies": [], "dependencies": [],
"codeowners": ["@anxms"], "codeowners": ["@anxms"],

View File

@@ -243,6 +243,38 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
) )
) )
existing_ids.add(sensor_uid) existing_ids.add(sensor_uid)
# 添加剩余内存传感器
mem_available_uid = f"{config_entry.entry_id}_memory_available"
if mem_available_uid not in existing_ids:
entities.append(
MemoryAvailableSensor(
coordinator,
"可用内存",
mem_available_uid,
"GB",
"mdi:memory"
)
)
existing_ids.add(mem_available_uid)
# 添加存储卷的剩余容量传感器(每个卷一个)
system_data = coordinator.data.get("system", {})
volumes = system_data.get("volumes", {})
for mount_point in volumes:
# 创建剩余容量传感器
vol_avail_uid = f"{config_entry.entry_id}_{mount_point.replace('/', '_')}_available"
if vol_avail_uid not in existing_ids:
entities.append(
VolumeAvailableSensor(
coordinator,
f"{mount_point} 可用空间",
vol_avail_uid,
"mdi:harddisk",
mount_point
)
)
existing_ids.add(vol_avail_uid)
async_add_entities(entities) async_add_entities(entities)
@@ -537,4 +569,129 @@ class DockerContainerStatusSensor(CoordinatorEntity, SensorEntity):
"dead": "死亡" "dead": "死亡"
} }
return status_map.get(container["status"], container["status"]) return status_map.get(container["status"], container["status"])
return "未知" return "未知"
class MemoryAvailableSensor(CoordinatorEntity, SensorEntity):
"""剩余内存传感器(包含总内存和已用内存作为属性)"""
def __init__(self, coordinator, name, unique_id, unit, icon):
super().__init__(coordinator)
self._attr_name = name
self._attr_unique_id = unique_id
self._attr_native_unit_of_measurement = unit
self._attr_icon = icon
self._attr_device_info = {
"identifiers": {(DOMAIN, DEVICE_ID_NAS)},
"name": "飞牛NAS系统监控",
"manufacturer": "飞牛"
}
self._attr_state_class = SensorStateClass.MEASUREMENT
@property
def native_value(self):
"""返回可用内存GB"""
system_data = self.coordinator.data.get("system", {})
mem_available = system_data.get("memory_available")
if mem_available is None or mem_available == "未知":
return None
try:
# 将字节转换为GB
return round(float(mem_available) / (1024 ** 3), 2)
except (TypeError, ValueError):
return None
@property
def extra_state_attributes(self):
"""返回总内存和已用内存GB以及原始字节值"""
system_data = self.coordinator.data.get("system", {})
mem_total = system_data.get("memory_total")
mem_used = system_data.get("memory_used")
mem_available = system_data.get("memory_available")
# 转换为GB
try:
mem_total_gb = round(float(mem_total) / (1024 ** 3), 2) if mem_total and mem_total != "未知" else None
except:
mem_total_gb = None
try:
mem_used_gb = round(float(mem_used) / (1024 ** 3), 2) if mem_used and mem_used != "未知" else None
except:
mem_used_gb = None
return {
"总内存 (GB)": mem_total_gb,
"已用内存 (GB)": mem_used_gb
}
class VolumeAvailableSensor(CoordinatorEntity, SensorEntity):
"""存储卷剩余容量传感器(包含总容量和已用容量作为属性)"""
def __init__(self, coordinator, name, unique_id, icon, mount_point):
super().__init__(coordinator)
self._attr_name = name
self._attr_unique_id = unique_id
self._attr_icon = icon
self.mount_point = mount_point
# 设备信息归属到飞牛NAS系统
self._attr_device_info = {
"identifiers": {(DOMAIN, DEVICE_ID_NAS)},
"name": "飞牛NAS系统监控",
"manufacturer": "飞牛"
}
self._attr_state_class = SensorStateClass.MEASUREMENT
@property
def native_value(self):
"""返回剩余容量(数值)"""
system_data = self.coordinator.data.get("system", {})
volumes = system_data.get("volumes", {})
vol_info = volumes.get(self.mount_point, {})
avail_str = vol_info.get("available", "未知")
if avail_str == "未知":
return None
try:
numeric_part = ''.join(filter(lambda x: x.isdigit() or x == '.', avail_str))
return float(numeric_part)
except (TypeError, ValueError):
return None
@property
def native_unit_of_measurement(self):
"""动态返回单位"""
system_data = self.coordinator.data.get("system", {})
volumes = system_data.get("volumes", {})
vol_info = volumes.get(self.mount_point, {})
avail_str = vol_info.get("available", "")
if avail_str.endswith("T") or avail_str.endswith("Ti"):
return "TB"
elif avail_str.endswith("G") or avail_str.endswith("Gi"):
return "GB"
elif avail_str.endswith("M") or avail_str.endswith("Mi"):
return "MB"
else:
return None # 未知单位
@property
def extra_state_attributes(self):
system_data = self.coordinator.data.get("system", {})
volumes = system_data.get("volumes", {})
vol_info = volumes.get(self.mount_point, {})
return {
"挂载点": self.mount_point,
"文件系统": vol_info.get("filesystem", "未知"),
"总容量": vol_info.get("size", "未知"),
"已用容量": vol_info.get("used", "未知"),
"使用率": vol_info.get("use_percent", "未知")
}
return attributes

View File

@@ -1,7 +1,5 @@
import re
import logging import logging
import asyncio import asyncio
import json
import os import os
from datetime import datetime from datetime import datetime
@@ -11,10 +9,41 @@ class SystemManager:
def __init__(self, coordinator): def __init__(self, coordinator):
self.coordinator = coordinator self.coordinator = coordinator
self.logger = _LOGGER.getChild("system_manager") self.logger = _LOGGER.getChild("system_manager")
self.logger.setLevel(logging.DEBUG) # 根据Home Assistant的日志级别动态设置
self.debug_enabled = False # 调试模式开关 self.logger.setLevel(logging.DEBUG if _LOGGER.isEnabledFor(logging.DEBUG) else logging.INFO)
self.sensors_debug_path = "/config/fn_nas_debug" # 调试文件保存路径 self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG) # 基于HA调试模式
self.sensors_debug_path = "/config/fn_nas_debug"
# 温度传感器缓存
self.cpu_temp_cache = {
"hwmon_id": None,
"temp_id": None,
"driver_type": None,
"label": None
}
self.mobo_temp_cache = {
"hwmon_id": None,
"temp_id": None,
"label": None
}
def _debug_log(self, message: str):
"""只在调试模式下输出详细日志"""
if self.debug_enabled:
self.logger.debug(message)
def _info_log(self, message: str):
"""重要信息日志"""
self.logger.info(message)
def _warning_log(self, message: str):
"""警告日志"""
self.logger.warning(message)
def _error_log(self, message: str):
"""错误日志"""
self.logger.error(message)
async def get_system_info(self) -> dict: async def get_system_info(self) -> dict:
"""获取系统信息""" """获取系统信息"""
system_info = {} system_info = {}
@@ -23,10 +52,8 @@ class SystemManager:
uptime_output = await self.coordinator.run_command("cat /proc/uptime") uptime_output = await self.coordinator.run_command("cat /proc/uptime")
if uptime_output: if uptime_output:
try: try:
# 保存原始秒数
uptime_seconds = float(uptime_output.split()[0]) uptime_seconds = float(uptime_output.split()[0])
system_info["uptime_seconds"] = uptime_seconds system_info["uptime_seconds"] = uptime_seconds
# 保存格式化字符串
system_info["uptime"] = self.format_uptime(uptime_seconds) system_info["uptime"] = self.format_uptime(uptime_seconds)
except (ValueError, IndexError): except (ValueError, IndexError):
system_info["uptime_seconds"] = 0 system_info["uptime_seconds"] = 0
@@ -34,105 +61,189 @@ class SystemManager:
else: else:
system_info["uptime_seconds"] = 0 system_info["uptime_seconds"] = 0
system_info["uptime"] = "未知" system_info["uptime"] = "未知"
# 获取 sensors 命令输出使用JSON格式 # 一次性获取CPU和主板温度
sensors_output = await self.coordinator.run_command( temps = await self.get_temperatures_from_sensors()
"sensors -j 2>/dev/null || sensors 2>/dev/null || echo 'No sensor data'" system_info["cpu_temperature"] = temps["cpu"]
) system_info["motherboard_temperature"] = temps["motherboard"]
# 保存传感器数据以便调试 mem_info = await self.get_memory_info()
self.save_sensor_data_for_debug(sensors_output) system_info.update(mem_info)
self.logger.debug("Sensors output: %s", sensors_output[:500] + "..." if len(sensors_output) > 500 else sensors_output) vol_info = await self.get_vol_usage()
system_info["volumes"] = vol_info
# 提取 CPU 温度(改进算法)
cpu_temp = self.extract_cpu_temp(sensors_output)
system_info["cpu_temperature"] = cpu_temp
# 提取主板温度(改进算法)
mobo_temp = self.extract_mobo_temp(sensors_output)
system_info["motherboard_temperature"] = mobo_temp
# 尝试备用方法获取CPU温度
if cpu_temp == "未知":
backup_cpu_temp = await self.get_cpu_temp_fallback()
if backup_cpu_temp:
system_info["cpu_temperature"] = backup_cpu_temp
return system_info return system_info
except Exception as e: except Exception as e:
self.logger.error("Error getting system info: %s", str(e)) self.logger.error("Error getting system info: %s", str(e))
return { return {
"uptime_seconds": 0, "uptime_seconds": 0,
"uptime": "未知", "uptime": "未知",
"cpu_temperature": "未知", "cpu_temperature": "未知",
"motherboard_temperature": "未知" "motherboard_temperature": "未知",
"memory_total": "未知",
"memory_used": "未知",
"memory_available": "未知",
"volumes": {}
} }
def save_sensor_data_for_debug(self, sensors_output: str): async def get_temperatures_from_sensors(self) -> dict:
"""保存传感器数据以便调试""" """一次性获取CPU和主板温度"""
if not self.debug_enabled:
return
try: try:
# 创建调试目录 command = "sensors"
if not os.path.exists(self.sensors_debug_path): self._debug_log(f"执行sensors命令获取温度: {command}")
os.makedirs(self.sensors_debug_path)
# 生成文件名 sensors_output = await self.coordinator.run_command(command)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") if self.debug_enabled:
filename = os.path.join(self.sensors_debug_path, f"sensors_{timestamp}.log") self._debug_log(f"sensors命令输出长度: {len(sensors_output) if sensors_output else 0}")
# 写入文件 if not sensors_output:
with open(filename, "w") as f: self._warning_log("sensors命令无输出")
f.write(sensors_output) return {"cpu": "未知", "motherboard": "未知"}
# 同时解析CPU和主板温度
cpu_temp = self.extract_cpu_temp_from_sensors(sensors_output)
mobo_temp = self.extract_mobo_temp_from_sensors(sensors_output)
# 记录获取结果
if cpu_temp != "未知":
self._info_log(f"通过sensors获取CPU温度成功: {cpu_temp}")
else:
self._warning_log("sensors命令未找到CPU温度")
if mobo_temp != "未知":
self._info_log(f"通过sensors获取主板温度成功: {mobo_temp}")
else:
self._warning_log("sensors命令未找到主板温度")
return {"cpu": cpu_temp, "motherboard": mobo_temp}
self.logger.info("Saved sensors output to %s for debugging", filename)
except Exception as e: except Exception as e:
self.logger.error("Failed to save sensor data: %s", str(e)) self._error_log(f"使用sensors命令获取温度失败: {e}")
return {"cpu": "未知", "motherboard": "未知"}
async def get_cpu_temp_fallback(self) -> str:
"""备用方法获取CPU温度""" async def get_cpu_temp_from_kernel(self) -> str:
self.logger.info("Trying fallback methods to get CPU temperature") """获取CPU温度 - 向后兼容"""
temps = await self.get_temperatures_from_sensors()
# 方法1: 从/sys/class/thermal读取 return temps["cpu"]
async def get_mobo_temp_from_kernel(self) -> str:
"""获取主板温度 - 向后兼容"""
temps = await self.get_temperatures_from_sensors()
return temps["motherboard"]
async def get_cpu_temp_from_sensors(self) -> str:
"""使用sensors命令获取CPU温度 - 向后兼容"""
temps = await self.get_temperatures_from_sensors()
return temps["cpu"]
async def get_mobo_temp_from_sensors(self) -> str:
"""使用sensors命令获取主板温度 - 向后兼容"""
temps = await self.get_temperatures_from_sensors()
return temps["motherboard"]
def extract_cpu_temp_from_sensors(self, sensors_output: str) -> str:
"""从sensors输出中提取CPU温度"""
try: try:
for i in range(5): # 检查前5个可能的传感器 lines = sensors_output.split('\n')
path = f"/sys/class/thermal/thermal_zone{i}/temp" self._debug_log(f"解析sensors输出{len(lines)}")
output = await self.coordinator.run_command(f"cat {path} 2>/dev/null")
if output and output.isdigit(): for i, line in enumerate(lines):
temp = float(output) / 1000.0 line_lower = line.lower().strip()
self.logger.info("Found CPU temperature via thermal zone: %.1f°C", temp) if self.debug_enabled:
return f"{temp:.1f} °C" self._debug_log(f"{i+1}行: {line_lower}")
except Exception:
pass # AMD CPU温度关键词
if any(keyword in line_lower for keyword in [
# 方法2: 从hwmon设备读取 "tctl", "tdie", "k10temp"
]):
self._debug_log(f"找到AMD CPU温度行: {line}")
if '+' in line and '°c' in line_lower:
try:
temp_match = line.split('+')[1].split('°')[0].strip()
temp = float(temp_match)
if 0 < temp < 150:
self._info_log(f"从sensors提取AMD CPU温度: {temp:.1f}°C")
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self._debug_log(f"解析AMD温度失败: {e}")
continue
# Intel CPU温度关键词
if any(keyword in line_lower for keyword in [
"package id", "core 0", "coretemp"
]) and not any(exclude in line_lower for exclude in ["fan"]):
self._debug_log(f"找到Intel CPU温度行: {line}")
if '+' in line and '°c' in line_lower:
try:
temp_match = line.split('+')[1].split('°')[0].strip()
temp = float(temp_match)
if 0 < temp < 150:
self._info_log(f"从sensors提取Intel CPU温度: {temp:.1f}°C")
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self._debug_log(f"解析Intel温度失败: {e}")
continue
# 通用CPU温度模式
if ('cpu' in line_lower or 'processor' in line_lower) and '+' in line and '°c' in line_lower:
self._debug_log(f"找到通用CPU温度行: {line}")
try:
temp_match = line.split('+')[1].split('°')[0].strip()
temp = float(temp_match)
if 0 < temp < 150:
self._info_log(f"从sensors提取通用CPU温度: {temp:.1f}°C")
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self._debug_log(f"解析通用CPU温度失败: {e}")
continue
self._warning_log("未在sensors输出中找到CPU温度")
return "未知"
except Exception as e:
self._error_log(f"解析sensors CPU温度输出失败: {e}")
return "未知"
def extract_mobo_temp_from_sensors(self, sensors_output: str) -> str:
"""从sensors输出中提取主板温度"""
try: try:
for i in range(5): # 检查前5个可能的hwmon设备 lines = sensors_output.split('\n')
for j in range(5): # 检查每个设备的前5个温度传感器 self._debug_log(f"解析主板温度,共{len(lines)}")
path = f"/sys/class/hwmon/hwmon{i}/temp{j}_input"
output = await self.coordinator.run_command(f"cat {path} 2>/dev/null") for i, line in enumerate(lines):
if output and output.isdigit(): line_lower = line.lower().strip()
temp = float(output) / 1000.0
self.logger.info("Found CPU temperature via hwmon: %.1f°C", temp) # 主板温度关键词
return f"{temp:.1f} °C" if any(keyword in line_lower for keyword in [
except Exception: "motherboard", "mobo", "mb", "system", "chipset",
pass "ambient", "temp1:", "temp2:", "temp3:", "systin"
]) and not any(cpu_keyword in line_lower for cpu_keyword in [
# 方法3: 使用psutil库如果可用 "cpu", "core", "package", "processor", "tctl", "tdie"
try: ]) and not any(exclude in line_lower for exclude in ["fan", "rpm"]):
output = await self.coordinator.run_command("python3 -c 'import psutil; print(psutil.sensors_temperatures().get(\"coretemp\")[0].current)' 2>/dev/null")
if output and output.replace('.', '', 1).isdigit(): self._debug_log(f"找到可能的主板温度行: {line}")
temp = float(output)
self.logger.info("Found CPU temperature via psutil: %.1f°C", temp) if '+' in line and '°c' in line_lower:
return f"{temp:.1f} °C" try:
except Exception: temp_match = line.split('+')[1].split('°')[0].strip()
pass temp = float(temp_match)
# 主板温度通常在15-70度之间
self.logger.warning("All fallback methods failed to get CPU temperature") if 15 <= temp <= 70:
return "" self._info_log(f"从sensors提取主板温度: {temp:.1f}°C")
return f"{temp:.1f} °C"
else:
self._debug_log(f"主板温度值超出合理范围: {temp:.1f}°C")
except (ValueError, IndexError) as e:
self._debug_log(f"解析主板温度失败: {e}")
continue
self._warning_log("未在sensors输出中找到主板温度")
return "未知"
except Exception as e:
self._error_log(f"解析sensors主板温度输出失败: {e}")
return "未知"
def format_uptime(self, seconds: float) -> str: def format_uptime(self, seconds: float) -> str:
"""格式化运行时间为易读格式""" """格式化运行时间为易读格式"""
try: try:
@@ -153,252 +264,284 @@ class SystemManager:
self.logger.error("Failed to format uptime: %s", str(e)) self.logger.error("Failed to format uptime: %s", str(e))
return "未知" return "未知"
def extract_cpu_temp(self, sensors_output: str) -> str: async def get_memory_info(self) -> dict:
"""从 sensors 输出中提取 CPU 温度,优先获取 Package id 0""" """获取内存使用信息"""
# 优先尝试获取 Package id 0 温度值 try:
package_id_pattern = r'Package id 0:\s*\+?(\d+\.?\d*)°C' # 使用 free 命令获取内存信息(-b 选项以字节为单位)
package_match = re.search(package_id_pattern, sensors_output, re.IGNORECASE) mem_output = await self.coordinator.run_command("free -b")
if package_match: if not mem_output:
try: return {}
package_temp = float(package_match.group(1))
self.logger.debug("优先使用 Package id 0 温度: %.1f°C", package_temp)
return f"{package_temp:.1f} °C"
except (ValueError, IndexError) as e:
self.logger.debug("Package id 0 解析错误: %s", str(e))
# 其次尝试解析JSON格式
if sensors_output.strip().startswith('{'):
try:
data = json.loads(sensors_output)
self.logger.debug("JSON sensors data: %s", json.dumps(data, indent=2))
# 查找包含Package相关键名的温度值
for key, values in data.items():
if any(kw in key.lower() for kw in ["package", "pkg", "physical"]):
for subkey, temp_value in values.items():
if any(kw in subkey.lower() for kw in ["temp", "input"]) and not "crit" in subkey.lower():
try:
if isinstance(temp_value, (int, float)):
self.logger.debug("JSON中找到Package温度: %s/%s = %.1f°C", key, subkey, temp_value)
return f"{temp_value:.1f} °C"
except Exception as e:
self.logger.debug("JSON值错误: %s", str(e))
# 新增尝试直接获取Tdie/Tctl温度AMD CPU
for key, values in data.items():
if "k10temp" in key.lower():
for subkey, temp_value in values.items():
if "tdie" in subkey.lower() or "tctl" in subkey.lower():
try:
if isinstance(temp_value, (int, float)):
self.logger.debug("JSON中找到Tdie/Tctl温度: %s/%s = %.1f°C", key, subkey, temp_value)
return f"{temp_value:.1f} °C"
except Exception:
pass
except Exception as e:
self.logger.warning("JSON解析失败: %s", str(e))
# 最后尝试其他模式
other_patterns = [
r'Package id 0:\s*\+?(\d+\.?\d*)°C', # 再次尝试确保捕获
r'CPU Temperature:\s*\+?(\d+\.?\d*)°C',
r'cpu_thermal:\s*\+?(\d+\.?\d*)°C',
r'Tdie:\s*\+?(\d+\.?\d*)°C', # AMD CPU
r'Tctl:\s*\+?(\d+\.?\d*)°C', # AMD CPU
r'PECI Agent \d:\s*\+?(\d+\.?\d*)°C',
r'Composite:\s*\+?(\d+\.?\d*)°C',
r'CPU\s+Temp:\s*\+?(\d+\.?\d*)°C',
r'k10temp-pci\S*:\s*\+?(\d+\.?\d*)°C',
r'Physical id 0:\s*\+?(\d+\.?\d*)°C'
]
for pattern in other_patterns:
match = re.search(pattern, sensors_output, re.IGNORECASE)
if match:
try:
temp = float(match.group(1))
self.logger.debug("匹配到CPU温度: %s: %.1f°C", pattern, temp)
return f"{temp:.1f} °C"
except (ValueError, IndexError):
continue
# 如果所有方法都失败返回未知
return "未知"
def extract_temp_from_systin(self, systin_data: dict) -> float:
"""从 SYSTIN 数据结构中提取温度值"""
if not systin_data:
return None
# 尝试从不同键名获取温度值 # 解析输出
for key in ["temp1_input", "input", "value"]: lines = mem_output.splitlines()
temp = systin_data.get(key) if len(lines) < 2:
if temp is not None: return {}
try:
return float(temp) # 第二行是内存信息Mem行
except (TypeError, ValueError): mem_line = lines[1].split()
continue if len(mem_line) < 7:
return None return {}
return {
"memory_total": int(mem_line[1]),
"memory_used": int(mem_line[2]),
"memory_available": int(mem_line[6])
}
except Exception as e:
self._error_log(f"获取内存信息失败: {str(e)}")
return {}
def extract_mobo_temp(self, sensors_output: str) -> str: async def get_vol_usage(self) -> dict:
"""从 sensors 输出中提取主板温度""" """获取 /vol* 开头的存储卷使用信息,避免唤醒休眠磁盘"""
# 首先尝试解析JSON格式 try:
if sensors_output.strip().startswith('{'): # 首先尝试智能检测活跃卷
try: active_vols = await self.check_active_volumes()
data = json.loads(sensors_output)
if active_vols:
# 只查询活跃的卷,避免使用通配符可能唤醒所有磁盘
vol_list = " ".join(active_vols)
df_output = await self.coordinator.run_command(f"df -B 1 {vol_list} 2>/dev/null")
if df_output:
result = self.parse_df_bytes(df_output)
if result: # 确保有数据返回
return result
# 查找包含主板相关键名的温度值 df_output = await self.coordinator.run_command(f"df -h {vol_list} 2>/dev/null")
candidates = [] if df_output:
for key, values in data.items(): result = self.parse_df_human_readable(df_output)
# 优先检查 SYSTIN 键 if result: # 确保有数据返回
if "systin" in key.lower(): return result
temp = self.extract_temp_from_systin(values)
if temp is not None: # 如果智能检测失败,回退到传统方法(仅在必要时)
return f"{temp:.1f} °C" self._debug_log("智能卷检测无结果,回退到传统检测方法")
if any(kw in key.lower() for kw in ["system", "motherboard", "mb", "board", "pch", "chipset", "sys", "baseboard", "systin"]): # 优先使用字节单位,但添加错误处理
for subkey, temp_value in values.items(): df_output = await self.coordinator.run_command("df -B 1 /vol* 2>/dev/null || true")
if any(kw in subkey.lower() for kw in ["temp", "input"]) and not "crit" in subkey.lower(): if df_output and "No such file or directory" not in df_output:
try: result = self.parse_df_bytes(df_output)
if isinstance(temp_value, (int, float)): if result:
candidates.append(temp_value) return result
self.logger.debug("Found mobo temp candidate in JSON: %s/%s = %.1f°C", key, subkey, temp_value)
except Exception: df_output = await self.coordinator.run_command("df -h /vol* 2>/dev/null || true")
pass if df_output and "No such file or directory" not in df_output:
result = self.parse_df_human_readable(df_output)
if result:
return result
# 最后的回退:尝试检测任何挂载的卷
mount_output = await self.coordinator.run_command("mount | grep '/vol' || true")
if mount_output:
vol_points = []
for line in mount_output.splitlines():
parts = line.split()
for part in parts:
if part.startswith('/vol') and part not in vol_points:
vol_points.append(part)
# 如果有候选值,取平均值 if vol_points:
if candidates: self._debug_log(f"从mount输出检测到卷: {vol_points}")
avg_temp = sum(candidates) / len(candidates) vol_list = " ".join(vol_points)
return f"{avg_temp:.1f} °C" df_output = await self.coordinator.run_command(f"df -h {vol_list} 2>/dev/null || true")
if df_output:
return self.parse_df_human_readable(df_output)
self._debug_log("所有存储卷检测方法都失败,返回空字典")
return {}
except Exception as e:
self._error_log(f"获取存储卷信息失败: {str(e)}")
return {}
async def check_active_volumes(self) -> list:
"""检查当前活跃的存储卷,避免唤醒休眠磁盘"""
try:
# 获取所有挂载点,这个操作不会访问磁盘内容
mount_output = await self.coordinator.run_command("mount | grep '/vol' 2>/dev/null || true")
if not mount_output:
self._debug_log("未找到任何/vol挂载点")
return []
active_vols = []
for line in mount_output.splitlines():
if '/vol' in line:
# 提取挂载点
parts = line.split()
mount_point = None
# 新增:尝试直接获取 SYSTIN 的温度值 # 查找挂载点(通常在 'on' 关键词之后)
systin_temp = self.extract_temp_from_systin(data.get("nct6798-isa-02a0", {}).get("SYSTIN", {}))
if systin_temp is not None:
return f"{systin_temp:.1f} °C"
except Exception as e:
self.logger.warning("Failed to parse sensors JSON: %s", str(e))
# 改进SYSTIN提取逻辑
systin_patterns = [
r'SYSTIN:\s*[+\-]?\s*(\d+\.?\d*)\s*°C', # 标准格式
r'SYSTIN[:\s]+[+\-]?\s*(\d+\.?\d*)\s*°C', # 兼容无冒号或多余空格
r'System Temp:\s*[+\-]?\s*(\d+\.?\d*)\s*°C' # 备选方案
]
for pattern in systin_patterns:
systin_match = re.search(pattern, sensors_output, re.IGNORECASE)
if systin_match:
try:
temp = float(systin_match.group(1))
self.logger.debug("Found SYSTIN temperature: %.1f°C", temp)
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self.logger.debug("SYSTIN match error: %s", str(e))
continue
for line in sensors_output.splitlines():
if 'SYSTIN' in line or 'System Temp' in line:
# 改进的温度值提取正则
match = re.search(r'[+\-]?\s*(\d+\.?\d*)\s*°C', line)
if match:
try: try:
temp = float(match.group(1)) on_index = parts.index('on')
self.logger.debug("Found mobo temp in line: %s: %.1f°C", line.strip(), temp) if on_index + 1 < len(parts):
return f"{temp:.1f} °C" candidate = parts[on_index + 1]
# 严格检查是否以/vol开头
if candidate.startswith('/vol'):
mount_point = candidate
except ValueError: except ValueError:
continue # 如果没有 'on' 关键词,查找以/vol开头的部分
for part in parts:
if part.startswith('/vol'):
# 如果找不到SYSTIN尝试其他主板温度模式 mount_point = part
other_patterns = [ break
r'System Temp:\s*\+?(\d+\.?\d*)°C',
r'MB Temperature:\s*\+?(\d+\.?\d*)°C',
r'Motherboard:\s*\+?(\d+\.?\d*)°C',
r'SYS Temp:\s*\+?(\d+\.?\d*)°C',
r'Board Temp:\s*\+?(\d+\.?\d*)°C',
r'PCH_Temp:\s*\+?(\d+\.?\d*)°C',
r'Chipset:\s*\+?(\d+\.?\d*)°C',
r'Baseboard Temp:\s*\+?(\d+\.?\d*)°C',
r'System Temperature:\s*\+?(\d+\.?\d*)°C',
r'Mainboard Temp:\s*\+?(\d+\.?\d*)°C'
]
temp_values = []
for pattern in other_patterns:
matches = re.finditer(pattern, sensors_output, re.IGNORECASE)
for match in matches:
try:
temp = float(match.group(1))
temp_values.append(temp)
self.logger.debug("Found motherboard temperature with pattern: %s: %.1f°C", pattern, temp)
except (ValueError, IndexError):
continue
# 如果有找到温度值,取平均值
if temp_values:
avg_temp = sum(temp_values) / len(temp_values)
return f"{avg_temp:.1f} °C"
# 最后,尝试手动扫描所有温度值
fallback_candidates = []
for line in sensors_output.splitlines():
if '°C' in line:
# 跳过CPU相关的行
if any(kw in line.lower() for kw in ["core", "cpu", "package", "tccd", "k10temp", "processor", "amd", "intel", "nvme"]):
continue
# 跳过风扇和电压行 # 过滤挂载点:只保留根级别的/vol*挂载点
if any(kw in line.lower() for kw in ["fan", "volt", "vin", "+3.3", "+5", "+12", "vdd", "power", "crit", "max", "min"]): if mount_point and self.is_root_vol_mount(mount_point):
continue # 检查这个卷对应的磁盘是否活跃
is_active = await self.is_volume_disk_active(mount_point)
# 查找温度值 if is_active:
match = re.search(r'(\d+\.?\d*)\s*°C', line) active_vols.append(mount_point)
if match: self._debug_log(f"添加活跃卷: {mount_point}")
try: else:
temp = float(match.group(1)) # 即使磁盘不活跃,也添加到列表中,但标记为可能休眠
# 合理温度范围检查 (0-80°C) # 这样可以保证有基本的存储信息
if 0 < temp < 80: active_vols.append(mount_point)
fallback_candidates.append(temp) self._debug_log(f"{mount_point} 对应磁盘可能休眠,但仍包含在检测中")
self.logger.debug("Fallback mobo candidate: %s -> %.1f°C", line.strip(), temp) else:
except ValueError: self._debug_log(f"跳过非根级别vol挂载点: {mount_point}")
continue
# 去重并排序
active_vols = sorted(list(set(active_vols)))
self._debug_log(f"最终检测到的根级别/vol存储卷: {active_vols}")
return active_vols
except Exception as e:
self._debug_log(f"检查活跃存储卷失败: {e}")
return []
def is_root_vol_mount(self, mount_point: str) -> bool:
"""检查是否为根级别的/vol挂载点"""
if not mount_point or not mount_point.startswith('/vol'):
return False
# 如果有候选值,取平均值 # 移除开头的/vol部分进行分析
if fallback_candidates: remainder = mount_point[4:] # 去掉'/vol'
avg_temp = sum(fallback_candidates) / len(fallback_candidates)
self.logger.warning("Using fallback motherboard temperature detection")
return f"{avg_temp:.1f} °C"
# self.logger.warning("No motherboard temperature found in sensors output") # 如果remainder为空说明是/vol这是根级别
return "未知" if not remainder:
return True
# 如果remainder只是数字如/vol1, /vol2这是根级别
if remainder.isdigit():
return True
# 如果remainder是单个字母或字母数字组合且没有斜杠也认为是根级别
# 例如:/vola, /volb, /vol1a 等
if '/' not in remainder and len(remainder) <= 3:
return True
# 其他情况都认为是子目录,如:
# /vol1/docker/overlay2/...
# /vol1/data/...
# /vol1/config/...
self._debug_log(f"检测到子目录挂载点: {mount_point}")
return False
def parse_df_bytes(self, df_output: str) -> dict:
"""解析df命令的字节输出"""
volumes = {}
try:
for line in df_output.splitlines()[1:]: # 跳过标题行
parts = line.split()
if len(parts) < 6:
continue
mount_point = parts[-1]
# 严格检查只处理根级别的 /vol 挂载点
if not self.is_root_vol_mount(mount_point):
self._debug_log(f"跳过非根级别vol挂载点: {mount_point}")
continue
try:
size_bytes = int(parts[1])
used_bytes = int(parts[2])
avail_bytes = int(parts[3])
use_percent = parts[4]
def bytes_to_human(b):
for unit in ['', 'K', 'M', 'G', 'T']:
if abs(b) < 1024.0:
return f"{b:.1f}{unit}"
b /= 1024.0
return f"{b:.1f}P"
volumes[mount_point] = {
"filesystem": parts[0],
"size": bytes_to_human(size_bytes),
"used": bytes_to_human(used_bytes),
"available": bytes_to_human(avail_bytes),
"use_percent": use_percent
}
self._debug_log(f"添加根级别/vol存储卷信息: {mount_point}")
except (ValueError, IndexError) as e:
self._debug_log(f"解析存储卷行失败: {line} - {str(e)}")
continue
except Exception as e:
self._error_log(f"解析df字节输出失败: {e}")
return volumes
def parse_df_human_readable(self, df_output: str) -> dict:
"""解析df命令输出"""
volumes = {}
try:
for line in df_output.splitlines()[1:]: # 跳过标题行
parts = line.split()
if len(parts) < 6:
continue
mount_point = parts[-1]
# 严格检查只处理根级别的 /vol 挂载点
if not self.is_root_vol_mount(mount_point):
self._debug_log(f"跳过非根级别vol挂载点: {mount_point}")
continue
try:
size = parts[1]
used = parts[2]
avail = parts[3]
use_percent = parts[4]
volumes[mount_point] = {
"filesystem": parts[0],
"size": size,
"used": used,
"available": avail,
"use_percent": use_percent
}
self._debug_log(f"添加根级别/vol存储卷信息: {mount_point}")
except (ValueError, IndexError) as e:
self._debug_log(f"解析存储卷行失败: {line} - {str(e)}")
continue
except Exception as e:
self._error_log(f"解析df输出失败: {e}")
return volumes
async def reboot_system(self): async def reboot_system(self):
"""重启系统""" """重启系统"""
self.logger.info("Initiating system reboot...") self._info_log("Initiating system reboot...")
try: try:
await self.coordinator.run_command("sudo reboot") await self.coordinator.run_command("sudo reboot")
self.logger.info("Reboot command sent") self._info_log("Reboot command sent")
# 更新系统状态为重启中
if "system" in self.coordinator.data: if "system" in self.coordinator.data:
self.coordinator.data["system"]["status"] = "rebooting" self.coordinator.data["system"]["status"] = "rebooting"
self.coordinator.async_update_listeners() self.coordinator.async_update_listeners()
except Exception as e: except Exception as e:
self.logger.error("Failed to reboot system: %s", str(e)) self._error_log(f"Failed to reboot system: {str(e)}")
raise raise
async def shutdown_system(self): async def shutdown_system(self):
"""关闭系统""" """关闭系统"""
self.logger.info("Initiating system shutdown...") self._info_log("Initiating system shutdown...")
try: try:
await self.coordinator.run_command("sudo shutdown -h now") await self.coordinator.run_command("sudo shutdown -h now")
self.logger.info("Shutdown command sent") self._info_log("Shutdown command sent")
# 立即更新系统状态为关闭
if "system" in self.coordinator.data: if "system" in self.coordinator.data:
self.coordinator.data["system"]["status"] = "off" self.coordinator.data["system"]["status"] = "off"
self.coordinator.async_update_listeners() self.coordinator.async_update_listeners()
except Exception as e: except Exception as e:
self.logger.error("Failed to shutdown system: %s", str(e)) self._error_log(f"Failed to shutdown system: {str(e)}")
raise raise

View File

@@ -11,9 +11,27 @@ class UPSManager:
def __init__(self, coordinator): def __init__(self, coordinator):
self.coordinator = coordinator self.coordinator = coordinator
self.logger = _LOGGER.getChild("ups_manager") self.logger = _LOGGER.getChild("ups_manager")
self.logger.setLevel(logging.DEBUG) # 根据Home Assistant的日志级别动态设置
self.debug_enabled = False # UPS调试模式开关 self.logger.setLevel(logging.DEBUG if _LOGGER.isEnabledFor(logging.DEBUG) else logging.INFO)
self.ups_debug_path = "/config/fn_nas_ups_debug" # UPS调试文件保存路径 self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG) # 基于HA调试模式
self.ups_debug_path = "/config/fn_nas_ups_debug"
def _debug_log(self, message: str):
"""只在调试模式下输出详细日志"""
if self.debug_enabled:
self.logger.debug(message)
def _info_log(self, message: str):
"""重要信息日志"""
self.logger.info(message)
def _warning_log(self, message: str):
"""警告日志"""
self.logger.warning(message)
def _error_log(self, message: str):
"""错误日志"""
self.logger.error(message)
async def get_ups_info(self) -> dict: async def get_ups_info(self) -> dict:
"""获取连接的UPS信息""" """获取连接的UPS信息"""
@@ -31,7 +49,7 @@ class UPSManager:
try: try:
# 尝试使用NUT工具获取UPS信息 # 尝试使用NUT工具获取UPS信息
self.logger.debug("尝试使用NUT工具获取UPS信息") self._debug_log("尝试使用NUT工具获取UPS信息")
output = await self.coordinator.run_command("upsc -l") output = await self.coordinator.run_command("upsc -l")
if output and "No such file" not in output: if output and "No such file" not in output:
@@ -39,11 +57,11 @@ class UPSManager:
ups_names = output.splitlines() ups_names = output.splitlines()
if ups_names: if ups_names:
ups_name = ups_names[0].strip() ups_name = ups_names[0].strip()
self.logger.debug("发现UPS: %s", ups_name) self._debug_log(f"发现UPS: {ups_name}")
# 获取详细的UPS信息 # 获取详细的UPS信息
ups_details = await self.coordinator.run_command(f"upsc {ups_name}") ups_details = await self.coordinator.run_command(f"upsc {ups_name}")
self.logger.debug("UPS详细信息: %s", ups_details) self._debug_log(f"UPS详细信息: {ups_details}")
# 保存UPS数据以便调试 # 保存UPS数据以便调试
self.save_ups_data_for_debug(ups_details) self.save_ups_data_for_debug(ups_details)
@@ -51,20 +69,20 @@ class UPSManager:
# 解析UPS信息 # 解析UPS信息
return self.parse_nut_ups_info(ups_details) return self.parse_nut_ups_info(ups_details)
else: else:
self.logger.debug("未找到连接的UPS") self._debug_log("未找到连接的UPS")
else: else:
self.logger.debug("未安装NUT工具尝试备用方法") self._debug_log("未安装NUT工具尝试备用方法")
# 备用方法尝试直接读取UPS状态 # 备用方法尝试直接读取UPS状态
return await self.get_ups_info_fallback() return await self.get_ups_info_fallback()
except Exception as e: except Exception as e:
self.logger.error("获取UPS信息时出错: %s", str(e), exc_info=True) self._error_log(f"获取UPS信息时出错: {str(e)}")
return ups_info return ups_info
async def get_ups_info_fallback(self) -> dict: async def get_ups_info_fallback(self) -> dict:
"""备用方法获取UPS信息""" """备用方法获取UPS信息"""
self.logger.info("尝试备用方法获取UPS信息") self._info_log("尝试备用方法获取UPS信息")
ups_info = { ups_info = {
"status": "未知", "status": "未知",
"battery_level": "未知", "battery_level": "未知",
@@ -81,7 +99,7 @@ class UPSManager:
# 方法1: 检查USB连接的UPS # 方法1: 检查USB连接的UPS
usb_ups_output = await self.coordinator.run_command("lsusb | grep -i ups || echo 'No USB UPS'") usb_ups_output = await self.coordinator.run_command("lsusb | grep -i ups || echo 'No USB UPS'")
if usb_ups_output and "No USB UPS" not in usb_ups_output: if usb_ups_output and "No USB UPS" not in usb_ups_output:
self.logger.debug("检测到USB UPS设备: %s", usb_ups_output) self._debug_log(f"检测到USB UPS设备: {usb_ups_output}")
ups_info["ups_type"] = "USB" ups_info["ups_type"] = "USB"
# 尝试从输出中提取型号 # 尝试从输出中提取型号
@@ -111,7 +129,7 @@ class UPSManager:
return ups_info return ups_info
except Exception as e: except Exception as e:
self.logger.error("备用方法获取UPS信息失败: %s", str(e)) self._error_log(f"备用方法获取UPS信息失败: {str(e)}")
return ups_info return ups_info
def parse_nut_ups_info(self, ups_output: str) -> dict: def parse_nut_ups_info(self, ups_output: str) -> dict:
@@ -253,6 +271,6 @@ class UPSManager:
with open(filename, "w") as f: with open(filename, "w") as f:
f.write(ups_output) f.write(ups_output)
self.logger.info("保存UPS数据到 %s 用于调试", filename) self._info_log(f"保存UPS数据到 {filename} 用于调试")
except Exception as e: except Exception as e:
self.logger.error("保存UPS数据失败: %s", str(e)) self._error_log(f"保存UPS数据失败: {str(e)}")

View File

@@ -8,15 +8,40 @@ class VMManager:
def __init__(self, coordinator): def __init__(self, coordinator):
self.coordinator = coordinator self.coordinator = coordinator
self.vms = [] self.vms = []
self.logger = _LOGGER.getChild("vm_manager")
# 根据Home Assistant的日志级别动态设置
self.logger.setLevel(logging.DEBUG if _LOGGER.isEnabledFor(logging.DEBUG) else logging.INFO)
self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
def _debug_log(self, message: str):
"""只在调试模式下输出详细日志"""
if self.debug_enabled:
self.logger.debug(message)
def _info_log(self, message: str):
"""重要信息日志"""
self.logger.info(message)
def _warning_log(self, message: str):
"""警告日志"""
self.logger.warning(message)
def _error_log(self, message: str):
"""错误日志"""
self.logger.error(message)
async def get_vm_list(self): async def get_vm_list(self):
"""获取虚拟机列表及其状态""" """获取虚拟机列表及其状态"""
try: try:
self._debug_log("开始获取虚拟机列表")
output = await self.coordinator.run_command("virsh list --all") output = await self.coordinator.run_command("virsh list --all")
self._debug_log(f"virsh命令输出: {output}")
self.vms = self._parse_vm_list(output) self.vms = self._parse_vm_list(output)
self._info_log(f"获取到{len(self.vms)}个虚拟机")
return self.vms return self.vms
except Exception as e: except Exception as e:
_LOGGER.error("获取虚拟机列表失败: %s", str(e)) self._error_log(f"获取虚拟机列表失败: {str(e)}")
return [] return []
def _parse_vm_list(self, output): def _parse_vm_list(self, output):
@@ -43,14 +68,18 @@ class VMManager:
async def get_vm_title(self, vm_name): async def get_vm_title(self, vm_name):
"""获取虚拟机的标题""" """获取虚拟机的标题"""
try: try:
self._debug_log(f"获取虚拟机{vm_name}的标题")
output = await self.coordinator.run_command(f"virsh dumpxml {vm_name}") output = await self.coordinator.run_command(f"virsh dumpxml {vm_name}")
# 在XML输出中查找<title>标签 # 在XML输出中查找<title>标签
match = re.search(r'<title>(.*?)</title>', output, re.DOTALL) match = re.search(r'<title>(.*?)</title>', output, re.DOTALL)
if match: if match:
return match.group(1).strip() title = match.group(1).strip()
self._debug_log(f"虚拟机{vm_name}标题: {title}")
return title
self._debug_log(f"虚拟机{vm_name}无标题,使用名称")
return vm_name # 如果没有标题,则返回虚拟机名称 return vm_name # 如果没有标题,则返回虚拟机名称
except Exception as e: except Exception as e:
_LOGGER.error("获取虚拟机标题失败: %s", str(e)) self._error_log(f"获取虚拟机标题失败: {str(e)}")
return vm_name return vm_name
async def control_vm(self, vm_name, action): async def control_vm(self, vm_name, action):
@@ -61,8 +90,10 @@ class VMManager:
command = f"virsh {action} {vm_name}" command = f"virsh {action} {vm_name}"
try: try:
self._info_log(f"执行虚拟机操作: {command}")
await self.coordinator.run_command(command) await self.coordinator.run_command(command)
self._info_log(f"虚拟机{vm_name}操作{action}成功")
return True return True
except Exception as e: except Exception as e:
_LOGGER.error("执行虚拟机操作失败: %s", str(e)) self._error_log(f"执行虚拟机操作失败: {str(e)}")
return False return False