3 Commits

Author SHA1 Message Date
xiaochao
17e3229b29 优化了SSH连接的管理,使用连接池来复用连接,减少连接建立的开销。
主板CPU温度获取优化,改为sensors方式获取。有主板CPU温度获取错误的情况提交issues,并带上sensors命令的输出日志,我会做适配。
2025-07-21 11:56:33 +08:00
xiaochao
11d1352b20 优化飞牛sshd进程数,可能获取实体会比较慢,需要等待一段时间集成才会显示实体 2025-07-12 15:47:37 +08:00
xiaochao
57d14b48f8 修改CPU和主板温度获取方式,可能不支持服务器级硬件和特别老的主板 2025-07-12 14:47:34 +08:00
5 changed files with 570 additions and 580 deletions

View File

@@ -1,6 +1,7 @@
import logging import logging
import asyncio import asyncio
import asyncssh import asyncssh
import re
from datetime import timedelta from datetime import timedelta
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
@@ -34,21 +35,16 @@ class FlynasCoordinator(DataUpdateCoordinator):
self.docker_manager = DockerManager(self) if self.enable_docker else None self.docker_manager = DockerManager(self) if self.enable_docker else None
self.ssh = None self.ssh = None
self.ssh_closed = True self.ssh_closed = True
# SSH连接池管理
self.ssh_pool = []
self.ssh_pool_size = 3 # 连接池大小
self.ssh_pool_lock = asyncio.Lock()
self.ups_manager = UPSManager(self) self.ups_manager = UPSManager(self)
self.vm_manager = VMManager(self) self.vm_manager = VMManager(self)
self.use_sudo = False self.use_sudo = False
self.data = { # 确保data始终有初始值
"disks": [], self.data = self.get_default_data()
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": []
}
scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL) scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
update_interval = timedelta(seconds=scan_interval) update_interval = timedelta(seconds=scan_interval)
@@ -65,96 +61,271 @@ class FlynasCoordinator(DataUpdateCoordinator):
self._system_online = False self._system_online = False
self._ping_task = None self._ping_task = None
self._retry_interval = 30 # 系统离线时的检测间隔(秒) self._retry_interval = 30 # 系统离线时的检测间隔(秒)
self._last_command_time = 0
self._command_count = 0
async def async_connect(self): # 添加日志方法
if self.ssh is None or self.ssh_closed: self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
def get_default_data(self):
"""返回默认的数据结构"""
return {
"disks": [],
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": [],
"docker_containers": []
}
def _debug_log(self, message: str):
"""只在调试模式下输出详细日志"""
if self.debug_enabled:
_LOGGER.debug(message)
def _info_log(self, message: str):
"""重要信息日志"""
_LOGGER.info(message)
def _warning_log(self, message: str):
"""警告日志"""
_LOGGER.warning(message)
def _error_log(self, message: str):
"""错误日志"""
_LOGGER.error(message)
async def get_ssh_connection(self):
"""从连接池获取可用的SSH连接"""
async with self.ssh_pool_lock:
# 检查现有连接
for i, (ssh, in_use) in enumerate(self.ssh_pool):
if not in_use:
try:
# 测试连接是否活跃
await asyncio.wait_for(ssh.run("echo 'test'", timeout=1), timeout=2)
self.ssh_pool[i] = (ssh, True) # 标记为使用中
self._debug_log(f"复用连接池中的连接 {i}")
return ssh, i
except Exception:
# 连接失效,移除
try:
ssh.close()
except:
pass
self.ssh_pool.pop(i)
break
# 如果连接池未满,创建新连接
if len(self.ssh_pool) < self.ssh_pool_size:
try:
ssh = await asyncssh.connect(
self.host,
port=self.port,
username=self.username,
password=self.password,
known_hosts=None,
connect_timeout=5
)
# 检查并设置权限状态
await self._setup_connection_permissions(ssh)
connection_id = len(self.ssh_pool)
self.ssh_pool.append((ssh, True))
self._debug_log(f"创建新的SSH连接 {connection_id}")
return ssh, connection_id
except Exception as e:
self._debug_log(f"创建SSH连接失败: {e}")
raise
# 连接池满且所有连接都在使用中,等待可用连接
self._debug_log("所有连接都在使用中,等待可用连接...")
for _ in range(50): # 最多等待5秒
await asyncio.sleep(0.1)
for i, (ssh, in_use) in enumerate(self.ssh_pool):
if not in_use:
try:
await asyncio.wait_for(ssh.run("echo 'test'", timeout=1), timeout=2)
self.ssh_pool[i] = (ssh, True)
self._debug_log(f"等待后获得连接 {i}")
return ssh, i
except Exception:
try:
ssh.close()
except:
pass
self.ssh_pool.pop(i)
break
raise Exception("无法获取SSH连接")
async def _setup_connection_permissions(self, ssh):
"""为新连接设置权限状态"""
try:
# 检查是否为root用户
result = await ssh.run("id -u", timeout=3)
if result.stdout.strip() == "0":
self._debug_log("当前用户是 root")
self.use_sudo = False
return
# 尝试切换到root会话
if self.root_password:
try:
await ssh.run(
f"echo '{self.root_password}' | sudo -S -i",
input=self.root_password + "\n",
timeout=5
)
whoami = await ssh.run("whoami")
if "root" in whoami.stdout:
self._info_log("成功切换到 root 会话(使用 root 密码)")
self.use_sudo = False
return
except Exception:
pass
# 尝试使用登录密码sudo
try: try:
self.ssh = await asyncssh.connect( await ssh.run(
self.host,
port=self.port,
username=self.username,
password=self.password,
known_hosts=None,
connect_timeout=5 # 缩短连接超时时间
)
if await self.is_root_user():
_LOGGER.debug("当前用户是 root")
self.use_sudo = False
self.ssh_closed = False
return True
result = await self.ssh.run(
f"echo '{self.password}' | sudo -S -i", f"echo '{self.password}' | sudo -S -i",
input=self.password + "\n", input=self.password + "\n",
timeout=5 timeout=5
) )
whoami = await ssh.run("whoami")
whoami_result = await self.ssh.run("whoami") if "root" in whoami.stdout:
if "root" in whoami_result.stdout: self._info_log("成功切换到 root 会话(使用登录密码)")
_LOGGER.info("成功切换到 root 会话(使用登录密码)")
self.use_sudo = False self.use_sudo = False
self.ssh_closed = False return
return True except Exception:
else: pass
if self.root_password:
result = await self.ssh.run(
f"echo '{self.root_password}' | sudo -S -i",
input=self.root_password + "\n",
timeout=5
)
whoami_result = await self.ssh.run("whoami")
if "root" in whoami_result.stdout:
_LOGGER.info("成功切换到 root 会话(使用 root 密码)")
self.use_sudo = False
self.ssh_closed = False
return True
else:
# 切换到 root 会话失败,将使用 sudo
self.use_sudo = True
else:
# 非 root 用户且未提供 root 密码,将使用 sudo
self.use_sudo = True
self.ssh_closed = False # 设置为使用sudo模式
_LOGGER.info("SSH 连接已建立到 %s", self.host) self.use_sudo = True
return True self._debug_log("设置为使用sudo模式")
except Exception as e:
self.ssh = None except Exception as e:
self.ssh_closed = True self._debug_log(f"设置连接权限失败: {e}")
_LOGGER.debug("连接失败: %s", str(e)) self.use_sudo = True
return False
return True async def release_ssh_connection(self, connection_id):
"""释放SSH连接回连接池"""
async with self.ssh_pool_lock:
if 0 <= connection_id < len(self.ssh_pool):
ssh, _ = self.ssh_pool[connection_id]
self.ssh_pool[connection_id] = (ssh, False) # 标记为可用
self._debug_log(f"释放SSH连接 {connection_id}")
async def is_root_user(self): async def close_all_ssh_connections(self):
"""关闭所有SSH连接"""
async with self.ssh_pool_lock:
for ssh, _ in self.ssh_pool:
try:
ssh.close()
except:
pass
self.ssh_pool.clear()
self._debug_log("已关闭所有SSH连接")
async def async_connect(self):
"""建立并保持持久SSH连接 - 兼容旧代码"""
try: try:
result = await self.ssh.run("id -u", timeout=3) ssh, connection_id = await self.get_ssh_connection()
return result.stdout.strip() == "0" await self.release_ssh_connection(connection_id)
return True
except Exception: except Exception:
return False return False
async def async_disconnect(self): async def async_disconnect(self):
if self.ssh is not None and not self.ssh_closed: """断开SSH连接 - 兼容旧代码"""
try: await self.close_all_ssh_connections()
self.ssh.close()
self.ssh_closed = True
_LOGGER.debug("SSH connection closed")
except Exception as e:
_LOGGER.debug("Error closing SSH connection: %s", str(e))
finally:
self.ssh = None
async def is_ssh_connected(self) -> bool: async def run_command(self, command: str, retries=2) -> str:
if self.ssh is None or self.ssh_closed: """执行SSH命令使用连接池"""
return False # 系统离线时直接返回空字符串
if not self._system_online:
return ""
ssh = None
connection_id = None
try: try:
test_command = "echo 'connection_test'" # 从连接池获取连接
result = await self.ssh.run(test_command, timeout=2) ssh, connection_id = await self.get_ssh_connection()
return result.exit_status == 0 and "connection_test" in result.stdout
except (asyncssh.Error, TimeoutError): # 构建完整命令
return False if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await ssh.run(full_command, input=password + "\n", timeout=10)
else:
full_command = f"sudo {command}"
result = await ssh.run(full_command, timeout=10)
else:
result = await ssh.run(command, timeout=10)
return result.stdout.strip()
except Exception as e:
self._debug_log(f"命令执行失败: {command}, 错误: {str(e)}")
return ""
finally:
# 释放连接回连接池
if connection_id is not None:
await self.release_ssh_connection(connection_id)
async def run_command_direct(self, command: str) -> str:
"""直接执行命令,获取独立连接 - 用于并发任务"""
if not self._system_online:
return ""
ssh = None
connection_id = None
try:
ssh, connection_id = await self.get_ssh_connection()
if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await ssh.run(full_command, input=password + "\n", timeout=10)
else:
full_command = f"sudo {command}"
result = await ssh.run(full_command, timeout=10)
else:
result = await ssh.run(command, timeout=10)
return result.stdout.strip()
except Exception as e:
self._debug_log(f"直接命令执行失败: {command}, 错误: {str(e)}")
return ""
finally:
if connection_id is not None:
await self.release_ssh_connection(connection_id)
async def _monitor_system_status(self):
"""系统离线时轮询检测状态"""
self._debug_log(f"启动系统状态监控,每{self._retry_interval}秒检测一次")
while True:
await asyncio.sleep(self._retry_interval)
if await self.ping_system():
self._info_log("检测到系统已开机,触发重新加载")
# 触发集成重新加载
self.hass.async_create_task(
self.hass.config_entries.async_reload(self.config_entry.entry_id)
)
break
async def ping_system(self) -> bool: async def ping_system(self) -> bool:
"""轻量级系统状态检测""" """轻量级系统状态检测"""
@@ -174,189 +345,81 @@ class FlynasCoordinator(DataUpdateCoordinator):
except Exception: except Exception:
return False return False
async def run_command(self, command: str, retries=2) -> str:
# 系统离线时直接返回空字符串,避免抛出异常
if not self._system_online:
return ""
for attempt in range(retries):
try:
if not await self.is_ssh_connected():
if not await self.async_connect():
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
return ""
if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await self.ssh.run(full_command, input=password + "\n", check=True)
else:
full_command = f"sudo {command}"
result = await self.ssh.run(full_command, check=True)
else:
result = await self.ssh.run(command, check=True)
return result.stdout.strip()
except asyncssh.process.ProcessError as e:
if e.exit_status in [4, 32]:
return ""
_LOGGER.debug("Command failed: %s (exit %d)", command, e.exit_status)
self.ssh = None
self.ssh_closed = True
if attempt == retries - 1:
return ""
except asyncssh.Error as e:
_LOGGER.debug("SSH connection error: %s", str(e))
self.ssh = None
self.ssh_closed = True
if attempt == retries - 1:
return ""
except Exception as e:
self.ssh = None
self.ssh_closed = True
_LOGGER.debug("Unexpected error: %s", str(e))
if attempt == retries - 1:
return ""
return ""
async def get_network_macs(self):
try:
output = await self.run_command("ip link show")
macs = {}
pattern = re.compile(r'^\d+: (\w+):.*\n\s+link/\w+\s+([0-9a-fA-F:]{17})', re.MULTILINE)
matches = pattern.findall(output)
for interface, mac in matches:
if interface == "lo" or mac == "00:00:00:00:00:00":
continue
macs[mac] = interface
return macs
except Exception as e:
self.logger.debug("获取MAC地址失败: %s", str(e))
return {}
async def _monitor_system_status(self):
"""系统离线时轮询检测状态"""
self.logger.debug("启动系统状态监控,每%d秒检测一次", self._retry_interval)
while True:
await asyncio.sleep(self._retry_interval)
if await self.ping_system():
self.logger.info("检测到系统已开机,触发重新加载")
# 触发集成重新加载
self.hass.async_create_task(
self.hass.config_entries.async_reload(self.config_entry.entry_id)
)
break
async def _async_update_data(self): async def _async_update_data(self):
_LOGGER.debug("Starting data update...") """数据更新入口,优化命令执行频率"""
self._debug_log("开始数据更新...")
is_online = await self.ping_system() is_online = await self.ping_system()
self._system_online = is_online self._system_online = is_online
if not is_online: if not is_online:
_LOGGER.debug("系统离线,跳过数据更新") self._debug_log("系统离线,跳过数据更新")
# 修复:确保 self.data 结构有效 # 启动后台监控任务
if self.data is None or not isinstance(self.data, dict):
self.data = {}
if "system" not in self.data or not isinstance(self.data.get("system"), dict):
self.data["system"] = {}
self.data["system"]["status"] = "off"
# 启动后台监控任务(非阻塞)
if not self._ping_task or self._ping_task.done(): if not self._ping_task or self._ping_task.done():
self._ping_task = asyncio.create_task(self._monitor_system_status()) self._ping_task = asyncio.create_task(self._monitor_system_status())
await self.async_disconnect() await self.close_all_ssh_connections()
# 直接返回空数据,不阻塞 return self.get_default_data()
return {
"disks": [],
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": [],
"docker_containers": []
}
# 系统在线处理 # 系统在线处理
try: try:
# 确保SSH连接 # 预热连接池并确保权限设置正确
if not await self.async_connect(): await self.async_connect()
self.data["system"]["status"] = "off"
return { # 获取系统状态信息
"disks": [],
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": []
}
status = "on" status = "on"
disks = await self.disk_manager.get_disks_info() # 串行获取信息以确保稳定性
self._debug_log("开始获取系统信息...")
system = await self.system_manager.get_system_info() system = await self.system_manager.get_system_info()
ups_info = await self.ups_manager.get_ups_info() self._debug_log("系统信息获取完成")
vms = await self.vm_manager.get_vm_list()
self._debug_log("开始获取磁盘信息...")
disks = await self.disk_manager.get_disks_info()
self._debug_log(f"磁盘信息获取完成,数量: {len(disks)}")
self._debug_log("开始获取UPS信息...")
ups_info = await self.ups_manager.get_ups_info()
self._debug_log("UPS信息获取完成")
self._debug_log("开始获取虚拟机信息...")
vms = await self.vm_manager.get_vm_list()
self._debug_log(f"虚拟机信息获取完成,数量: {len(vms)}")
# 为每个虚拟机获取标题
for vm in vms: for vm in vms:
vm["title"] = await self.vm_manager.get_vm_title(vm["name"]) try:
vm["title"] = await self.vm_manager.get_vm_title(vm["name"])
except Exception as e:
self._debug_log(f"获取VM标题失败 {vm['name']}: {e}")
vm["title"] = vm["name"]
# 获取Docker容器信息
docker_containers = [] docker_containers = []
if self.enable_docker: if self.enable_docker and self.docker_manager:
docker_containers = await self.docker_manager.get_containers() self._debug_log("开始获取Docker信息...")
try:
docker_containers = await self.docker_manager.get_containers()
self._debug_log(f"Docker信息获取完成数量: {len(docker_containers)}")
except Exception as e:
self._debug_log(f"Docker信息获取失败: {e}")
data = { data = {
"disks": disks, "disks": disks,
"system": { "system": {**system, "status": status},
**system,
"status": status
},
"ups": ups_info, "ups": ups_info,
"vms": vms, "vms": vms,
"docker_containers": docker_containers "docker_containers": docker_containers
} }
self._debug_log(f"数据更新完成: disks={len(disks)}, vms={len(vms)}, containers={len(docker_containers)}")
return data return data
except Exception as e: except Exception as e:
_LOGGER.debug("数据更新失败: %s", str(e)) self._error_log(f"数据更新失败: {str(e)}")
# 检查错误类型,如果是连接问题,标记为离线
self._system_online = False self._system_online = False
if not self._ping_task or self._ping_task.done(): if not self._ping_task or self._ping_task.done():
self._ping_task = asyncio.create_task(self._monitor_system_status()) self._ping_task = asyncio.create_task(self._monitor_system_status())
return { return self.get_default_data()
"disks": [],
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": []
}
async def reboot_system(self):
await self.system_manager.reboot_system()
async def shutdown_system(self):
await self.system_manager.shutdown_system()
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
self.async_update_listeners()
class UPSDataUpdateCoordinator(DataUpdateCoordinator): class UPSDataUpdateCoordinator(DataUpdateCoordinator):
def __init__(self, hass: HomeAssistant, config, main_coordinator): def __init__(self, hass: HomeAssistant, config, main_coordinator):
@@ -388,10 +451,7 @@ class UPSDataUpdateCoordinator(DataUpdateCoordinator):
async def control_vm(self, vm_name, action): async def control_vm(self, vm_name, action):
try: try:
if not hasattr(self, 'vm_manager'): result = await self.main_coordinator.vm_manager.control_vm(vm_name, action)
self.vm_manager = VMManager(self)
result = await self.vm_manager.control_vm(vm_name, action)
return result return result
except Exception as e: except Exception as e:
_LOGGER.debug("虚拟机控制失败: %s", str(e)) _LOGGER.debug("虚拟机控制失败: %s", str(e))

View File

@@ -1,7 +1,7 @@
{ {
"domain": "fn_nas", "domain": "fn_nas",
"name": "飞牛NAS", "name": "飞牛NAS",
"version": "1.3.4", "version": "1.3.6",
"documentation": "https://github.com/anxms/fn_nas", "documentation": "https://github.com/anxms/fn_nas",
"dependencies": [], "dependencies": [],
"codeowners": ["@anxms"], "codeowners": ["@anxms"],

View File

@@ -1,7 +1,5 @@
import re
import logging import logging
import asyncio import asyncio
import json
import os import os
from datetime import datetime from datetime import datetime
@@ -11,10 +9,41 @@ class SystemManager:
def __init__(self, coordinator): def __init__(self, coordinator):
self.coordinator = coordinator self.coordinator = coordinator
self.logger = _LOGGER.getChild("system_manager") self.logger = _LOGGER.getChild("system_manager")
self.logger.setLevel(logging.DEBUG) # 根据Home Assistant的日志级别动态设置
self.debug_enabled = False # 调试模式开关 self.logger.setLevel(logging.DEBUG if _LOGGER.isEnabledFor(logging.DEBUG) else logging.INFO)
self.sensors_debug_path = "/config/fn_nas_debug" # 调试文件保存路径 self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG) # 基于HA调试模式
self.sensors_debug_path = "/config/fn_nas_debug"
# 温度传感器缓存
self.cpu_temp_cache = {
"hwmon_id": None,
"temp_id": None,
"driver_type": None,
"label": None
}
self.mobo_temp_cache = {
"hwmon_id": None,
"temp_id": None,
"label": None
}
def _debug_log(self, message: str):
"""只在调试模式下输出详细日志"""
if self.debug_enabled:
self.logger.debug(message)
def _info_log(self, message: str):
"""重要信息日志"""
self.logger.info(message)
def _warning_log(self, message: str):
"""警告日志"""
self.logger.warning(message)
def _error_log(self, message: str):
"""错误日志"""
self.logger.error(message)
async def get_system_info(self) -> dict: async def get_system_info(self) -> dict:
"""获取系统信息""" """获取系统信息"""
system_info = {} system_info = {}
@@ -23,10 +52,8 @@ class SystemManager:
uptime_output = await self.coordinator.run_command("cat /proc/uptime") uptime_output = await self.coordinator.run_command("cat /proc/uptime")
if uptime_output: if uptime_output:
try: try:
# 保存原始秒数
uptime_seconds = float(uptime_output.split()[0]) uptime_seconds = float(uptime_output.split()[0])
system_info["uptime_seconds"] = uptime_seconds system_info["uptime_seconds"] = uptime_seconds
# 保存格式化字符串
system_info["uptime"] = self.format_uptime(uptime_seconds) system_info["uptime"] = self.format_uptime(uptime_seconds)
except (ValueError, IndexError): except (ValueError, IndexError):
system_info["uptime_seconds"] = 0 system_info["uptime_seconds"] = 0
@@ -34,43 +61,20 @@ class SystemManager:
else: else:
system_info["uptime_seconds"] = 0 system_info["uptime_seconds"] = 0
system_info["uptime"] = "未知" system_info["uptime"] = "未知"
# 获取 sensors 命令输出使用JSON格式 # 一次性获取CPU和主板温度
sensors_output = await self.coordinator.run_command( temps = await self.get_temperatures_from_sensors()
"sensors -j 2>/dev/null || sensors 2>/dev/null || echo 'No sensor data'" system_info["cpu_temperature"] = temps["cpu"]
) system_info["motherboard_temperature"] = temps["motherboard"]
# 保存传感器数据以便调试
self.save_sensor_data_for_debug(sensors_output)
self.logger.debug("Sensors output: %s", sensors_output[:500] + "..." if len(sensors_output) > 500 else sensors_output)
# 提取 CPU 温度(改进算法)
cpu_temp = self.extract_cpu_temp(sensors_output)
system_info["cpu_temperature"] = cpu_temp
# 提取主板温度(改进算法)
mobo_temp = self.extract_mobo_temp(sensors_output)
system_info["motherboard_temperature"] = mobo_temp
# 尝试备用方法获取CPU温度
if cpu_temp == "未知":
backup_cpu_temp = await self.get_cpu_temp_fallback()
if backup_cpu_temp:
system_info["cpu_temperature"] = backup_cpu_temp
# 新增:获取内存信息
mem_info = await self.get_memory_info() mem_info = await self.get_memory_info()
system_info.update(mem_info) system_info.update(mem_info)
# 新增:获取存储卷信息
vol_info = await self.get_vol_usage() vol_info = await self.get_vol_usage()
system_info["volumes"] = vol_info system_info["volumes"] = vol_info
return system_info return system_info
except Exception as e: except Exception as e:
self.logger.error("Error getting system info: %s", str(e)) self.logger.error("Error getting system info: %s", str(e))
# 在异常处理中返回空数据
return { return {
"uptime_seconds": 0, "uptime_seconds": 0,
"uptime": "未知", "uptime": "未知",
@@ -81,71 +85,165 @@ class SystemManager:
"memory_available": "未知", "memory_available": "未知",
"volumes": {} "volumes": {}
} }
def save_sensor_data_for_debug(self, sensors_output: str): async def get_temperatures_from_sensors(self) -> dict:
"""保存传感器数据以便调试""" """一次性获取CPU和主板温度"""
if not self.debug_enabled:
return
try: try:
# 创建调试目录 command = "sensors"
if not os.path.exists(self.sensors_debug_path): self._debug_log(f"执行sensors命令获取温度: {command}")
os.makedirs(self.sensors_debug_path)
# 生成文件名 sensors_output = await self.coordinator.run_command(command)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") if self.debug_enabled:
filename = os.path.join(self.sensors_debug_path, f"sensors_{timestamp}.log") self._debug_log(f"sensors命令输出长度: {len(sensors_output) if sensors_output else 0}")
# 写入文件 if not sensors_output:
with open(filename, "w") as f: self._warning_log("sensors命令无输出")
f.write(sensors_output) return {"cpu": "未知", "motherboard": "未知"}
# 同时解析CPU和主板温度
cpu_temp = self.extract_cpu_temp_from_sensors(sensors_output)
mobo_temp = self.extract_mobo_temp_from_sensors(sensors_output)
# 记录获取结果
if cpu_temp != "未知":
self._info_log(f"通过sensors获取CPU温度成功: {cpu_temp}")
else:
self._warning_log("sensors命令未找到CPU温度")
if mobo_temp != "未知":
self._info_log(f"通过sensors获取主板温度成功: {mobo_temp}")
else:
self._warning_log("sensors命令未找到主板温度")
return {"cpu": cpu_temp, "motherboard": mobo_temp}
self.logger.info("Saved sensors output to %s for debugging", filename)
except Exception as e: except Exception as e:
self.logger.error("Failed to save sensor data: %s", str(e)) self._error_log(f"使用sensors命令获取温度失败: {e}")
return {"cpu": "未知", "motherboard": "未知"}
async def get_cpu_temp_fallback(self) -> str:
"""备用方法获取CPU温度""" async def get_cpu_temp_from_kernel(self) -> str:
self.logger.info("Trying fallback methods to get CPU temperature") """获取CPU温度 - 向后兼容"""
temps = await self.get_temperatures_from_sensors()
# 方法1: 从/sys/class/thermal读取 return temps["cpu"]
async def get_mobo_temp_from_kernel(self) -> str:
"""获取主板温度 - 向后兼容"""
temps = await self.get_temperatures_from_sensors()
return temps["motherboard"]
async def get_cpu_temp_from_sensors(self) -> str:
"""使用sensors命令获取CPU温度 - 向后兼容"""
temps = await self.get_temperatures_from_sensors()
return temps["cpu"]
async def get_mobo_temp_from_sensors(self) -> str:
"""使用sensors命令获取主板温度 - 向后兼容"""
temps = await self.get_temperatures_from_sensors()
return temps["motherboard"]
def extract_cpu_temp_from_sensors(self, sensors_output: str) -> str:
"""从sensors输出中提取CPU温度"""
try: try:
for i in range(5): # 检查前5个可能的传感器 lines = sensors_output.split('\n')
path = f"/sys/class/thermal/thermal_zone{i}/temp" self._debug_log(f"解析sensors输出{len(lines)}")
output = await self.coordinator.run_command(f"cat {path} 2>/dev/null")
if output and output.isdigit(): for i, line in enumerate(lines):
temp = float(output) / 1000.0 line_lower = line.lower().strip()
self.logger.info("Found CPU temperature via thermal zone: %.1f°C", temp) if self.debug_enabled:
return f"{temp:.1f} °C" self._debug_log(f"{i+1}行: {line_lower}")
except Exception:
pass # AMD CPU温度关键词
if any(keyword in line_lower for keyword in [
# 方法2: 从hwmon设备读取 "tctl", "tdie", "k10temp"
]):
self._debug_log(f"找到AMD CPU温度行: {line}")
if '+' in line and '°c' in line_lower:
try:
temp_match = line.split('+')[1].split('°')[0].strip()
temp = float(temp_match)
if 0 < temp < 150:
self._info_log(f"从sensors提取AMD CPU温度: {temp:.1f}°C")
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self._debug_log(f"解析AMD温度失败: {e}")
continue
# Intel CPU温度关键词
if any(keyword in line_lower for keyword in [
"package id", "core 0", "coretemp"
]) and not any(exclude in line_lower for exclude in ["fan"]):
self._debug_log(f"找到Intel CPU温度行: {line}")
if '+' in line and '°c' in line_lower:
try:
temp_match = line.split('+')[1].split('°')[0].strip()
temp = float(temp_match)
if 0 < temp < 150:
self._info_log(f"从sensors提取Intel CPU温度: {temp:.1f}°C")
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self._debug_log(f"解析Intel温度失败: {e}")
continue
# 通用CPU温度模式
if ('cpu' in line_lower or 'processor' in line_lower) and '+' in line and '°c' in line_lower:
self._debug_log(f"找到通用CPU温度行: {line}")
try:
temp_match = line.split('+')[1].split('°')[0].strip()
temp = float(temp_match)
if 0 < temp < 150:
self._info_log(f"从sensors提取通用CPU温度: {temp:.1f}°C")
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self._debug_log(f"解析通用CPU温度失败: {e}")
continue
self._warning_log("未在sensors输出中找到CPU温度")
return "未知"
except Exception as e:
self._error_log(f"解析sensors CPU温度输出失败: {e}")
return "未知"
def extract_mobo_temp_from_sensors(self, sensors_output: str) -> str:
"""从sensors输出中提取主板温度"""
try: try:
for i in range(5): # 检查前5个可能的hwmon设备 lines = sensors_output.split('\n')
for j in range(5): # 检查每个设备的前5个温度传感器 self._debug_log(f"解析主板温度,共{len(lines)}")
path = f"/sys/class/hwmon/hwmon{i}/temp{j}_input"
output = await self.coordinator.run_command(f"cat {path} 2>/dev/null") for i, line in enumerate(lines):
if output and output.isdigit(): line_lower = line.lower().strip()
temp = float(output) / 1000.0
self.logger.info("Found CPU temperature via hwmon: %.1f°C", temp) # 主板温度关键词
return f"{temp:.1f} °C" if any(keyword in line_lower for keyword in [
except Exception: "motherboard", "mobo", "mb", "system", "chipset",
pass "ambient", "temp1:", "temp2:", "temp3:", "systin"
]) and not any(cpu_keyword in line_lower for cpu_keyword in [
# 方法3: 使用psutil库如果可用 "cpu", "core", "package", "processor", "tctl", "tdie"
try: ]) and not any(exclude in line_lower for exclude in ["fan", "rpm"]):
output = await self.coordinator.run_command("python3 -c 'import psutil; print(psutil.sensors_temperatures().get(\"coretemp\")[0].current)' 2>/dev/null")
if output and output.replace('.', '', 1).isdigit(): self._debug_log(f"找到可能的主板温度行: {line}")
temp = float(output)
self.logger.info("Found CPU temperature via psutil: %.1f°C", temp) if '+' in line and '°c' in line_lower:
return f"{temp:.1f} °C" try:
except Exception: temp_match = line.split('+')[1].split('°')[0].strip()
pass temp = float(temp_match)
# 主板温度通常在15-70度之间
self.logger.warning("All fallback methods failed to get CPU temperature") if 15 <= temp <= 70:
return "" self._info_log(f"从sensors提取主板温度: {temp:.1f}°C")
return f"{temp:.1f} °C"
else:
self._debug_log(f"主板温度值超出合理范围: {temp:.1f}°C")
except (ValueError, IndexError) as e:
self._debug_log(f"解析主板温度失败: {e}")
continue
self._warning_log("未在sensors输出中找到主板温度")
return "未知"
except Exception as e:
self._error_log(f"解析sensors主板温度输出失败: {e}")
return "未知"
def format_uptime(self, seconds: float) -> str: def format_uptime(self, seconds: float) -> str:
"""格式化运行时间为易读格式""" """格式化运行时间为易读格式"""
try: try:
@@ -166,223 +264,6 @@ class SystemManager:
self.logger.error("Failed to format uptime: %s", str(e)) self.logger.error("Failed to format uptime: %s", str(e))
return "未知" return "未知"
def extract_cpu_temp(self, sensors_output: str) -> str:
"""从 sensors 输出中提取 CPU 温度,优先获取 Package id 0"""
# 优先尝试获取 Package id 0 温度值
package_id_pattern = r'Package id 0:\s*\+?(\d+\.?\d*)°C'
package_match = re.search(package_id_pattern, sensors_output, re.IGNORECASE)
if package_match:
try:
package_temp = float(package_match.group(1))
self.logger.debug("优先使用 Package id 0 温度: %.1f°C", package_temp)
return f"{package_temp:.1f} °C"
except (ValueError, IndexError) as e:
self.logger.debug("Package id 0 解析错误: %s", str(e))
# 其次尝试解析JSON格式
if sensors_output.strip().startswith('{'):
try:
data = json.loads(sensors_output)
self.logger.debug("JSON sensors data: %s", json.dumps(data, indent=2))
# 查找包含Package相关键名的温度值
for key, values in data.items():
if any(kw in key.lower() for kw in ["package", "pkg", "physical"]):
for subkey, temp_value in values.items():
if any(kw in subkey.lower() for kw in ["temp", "input"]) and not "crit" in subkey.lower():
try:
if isinstance(temp_value, (int, float)):
self.logger.debug("JSON中找到Package温度: %s/%s = %.1f°C", key, subkey, temp_value)
return f"{temp_value:.1f} °C"
except Exception as e:
self.logger.debug("JSON值错误: %s", str(e))
# 新增尝试直接获取Tdie/Tctl温度AMD CPU
for key, values in data.items():
if "k10temp" in key.lower():
for subkey, temp_value in values.items():
if "tdie" in subkey.lower() or "tctl" in subkey.lower():
try:
if isinstance(temp_value, (int, float)):
self.logger.debug("JSON中找到Tdie/Tctl温度: %s/%s = %.1f°C", key, subkey, temp_value)
return f"{temp_value:.1f} °C"
except:
pass
except Exception as e:
self.logger.warning("JSON解析失败: %s", str(e))
# 最后尝试其他模式
other_patterns = [
r'Package id 0:\s*\+?(\d+\.?\d*)°C', # 再次尝试确保捕获
r'CPU Temperature:\s*\+?(\d+\.?\d*)°C',
r'cpu_thermal:\s*\+?(\d+\.?\d*)°C',
r'Tdie:\s*\+?(\d+\.?\d*)°C', # AMD CPU
r'Tctl:\s*\+?(\d+\.?\d*)°C', # AMD CPU
r'PECI Agent \d:\s*\+?(\d+\.?\d*)°C',
r'Composite:\s*\+?(\d+\.?\d*)°C',
r'CPU\s+Temp:\s*\+?(\d+\.?\d*)°C',
r'k10temp-pci\S*:\s*\+?(\d+\.?\d*)°C',
r'Physical id 0:\s*\+?(\d+\.?\d*)°C'
]
for pattern in other_patterns:
match = re.search(pattern, sensors_output, re.IGNORECASE)
if match:
try:
temp = float(match.group(1))
self.logger.debug("匹配到CPU温度: %s: %.1f°C", pattern, temp)
return f"{temp:.1f} °C"
except (ValueError, IndexError):
continue
# 如果所有方法都失败返回未知
return "未知"
def extract_temp_from_systin(self, systin_data: dict) -> float:
"""从 SYSTIN 数据结构中提取温度值"""
if not systin_data:
return None
# 尝试从不同键名获取温度值
for key in ["temp1_input", "input", "value"]:
temp = systin_data.get(key)
if temp is not None:
try:
return float(temp)
except (TypeError, ValueError):
continue
return None
def extract_mobo_temp(self, sensors_output: str) -> str:
"""从 sensors 输出中提取主板温度"""
# 首先尝试解析JSON格式
if sensors_output.strip().startswith('{'):
try:
data = json.loads(sensors_output)
# 查找包含主板相关键名的温度值
candidates = []
for key, values in data.items():
# 优先检查 SYSTIN 键
if "systin" in key.lower():
temp = self.extract_temp_from_systin(values)
if temp is not None:
return f"{temp:.1f} °C"
if any(kw in key.lower() for kw in ["system", "motherboard", "mb", "board", "pch", "chipset", "sys", "baseboard", "systin"]):
for subkey, temp_value in values.items():
if any(kw in subkey.lower() for kw in ["temp", "input"]) and not "crit" in subkey.lower():
try:
if isinstance(temp_value, (int, float)):
candidates.append(temp_value)
self.logger.debug("Found mobo temp candidate in JSON: %s/%s = %.1f°C", key, subkey, temp_value)
except Exception:
pass
# 如果有候选值,取平均值
if candidates:
avg_temp = sum(candidates) / len(candidates)
return f"{avg_temp:.1f} °C"
# 新增:尝试直接获取 SYSTIN 的温度值
systin_temp = self.extract_temp_from_systin(data.get("nct6798-isa-02a0", {}).get("SYSTIN", {}))
if systin_temp is not None:
return f"{systin_temp:.1f} °C"
except Exception as e:
self.logger.warning("Failed to parse sensors JSON: %s", str(e))
# 改进SYSTIN提取逻辑
systin_patterns = [
r'SYSTIN:\s*[+\-]?\s*(\d+\.?\d*)\s*°C', # 标准格式
r'SYSTIN[:\s]+[+\-]?\s*(\d+\.?\d*)\s*°C', # 兼容无冒号或多余空格
r'System Temp:\s*[+\-]?\s*(\d+\.?\d*)\s*°C' # 备选方案
]
for pattern in systin_patterns:
systin_match = re.search(pattern, sensors_output, re.IGNORECASE)
if systin_match:
try:
temp = float(systin_match.group(1))
self.logger.debug("Found SYSTIN temperature: %.1f°C", temp)
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self.logger.debug("SYSTIN match error: %s", str(e))
continue
for line in sensors_output.splitlines():
if 'SYSTIN' in line or 'System Temp' in line:
# 改进的温度值提取正则
match = re.search(r'[+\-]?\s*(\d+\.?\d*)\s*°C', line)
if match:
try:
temp = float(match.group(1))
self.logger.debug("Found mobo temp in line: %s: %.1f°C", line.strip(), temp)
return f"{temp:.1f} °C"
except ValueError:
continue
# 如果找不到SYSTIN尝试其他主板温度模式
other_patterns = [
r'System Temp:\s*\+?(\d+\.?\d*)°C',
r'MB Temperature:\s*\+?(\d+\.?\d*)°C',
r'Motherboard:\s*\+?(\d+\.?\d*)°C',
r'SYS Temp:\s*\+?(\d+\.?\d*)°C',
r'Board Temp:\s*\+?(\d+\.?\d*)°C',
r'PCH_Temp:\s*\+?(\d+\.?\d*)°C',
r'Chipset:\s*\+?(\d+\.?\d*)°C',
r'Baseboard Temp:\s*\+?(\d+\.?\d*)°C',
r'System Temperature:\s*\+?(\d+\.?\d*)°C',
r'Mainboard Temp:\s*\+?(\d+\.?\d*)°C'
]
temp_values = []
for pattern in other_patterns:
matches = re.finditer(pattern, sensors_output, re.IGNORECASE)
for match in matches:
try:
temp = float(match.group(1))
temp_values.append(temp)
self.logger.debug("Found motherboard temperature with pattern: %s: %.1f°C", pattern, temp)
except (ValueError, IndexError):
continue
# 如果有找到温度值,取平均值
if temp_values:
avg_temp = sum(temp_values) / len(temp_values)
return f"{avg_temp:.1f} °C"
# 最后,尝试手动扫描所有温度值
fallback_candidates = []
for line in sensors_output.splitlines():
if '°C' in line:
# 跳过CPU相关的行
if any(kw in line.lower() for kw in ["core", "cpu", "package", "tccd", "k10temp", "processor", "amd", "intel", "nvme"]):
continue
# 跳过风扇和电压行
if any(kw in line.lower() for kw in ["fan", "volt", "vin", "+3.3", "+5", "+12", "vdd", "power", "crit", "max", "min"]):
continue
# 查找温度值
match = re.search(r'(\d+\.?\d*)\s*°C', line)
if match:
try:
temp = float(match.group(1))
# 合理温度范围检查 (0-80°C)
if 0 < temp < 80:
fallback_candidates.append(temp)
self.logger.debug("Fallback mobo candidate: %s -> %.1f°C", line.strip(), temp)
except ValueError:
continue
# 如果有候选值,取平均值
if fallback_candidates:
avg_temp = sum(fallback_candidates) / len(fallback_candidates)
self.logger.warning("Using fallback motherboard temperature detection")
return f"{avg_temp:.1f} °C"
return "未知"
async def get_memory_info(self) -> dict: async def get_memory_info(self) -> dict:
"""获取内存使用信息""" """获取内存使用信息"""
try: try:
@@ -408,7 +289,7 @@ class SystemManager:
} }
except Exception as e: except Exception as e:
self.logger.error("获取内存信息失败: %s", str(e)) self._error_log(f"获取内存信息失败: {str(e)}")
return {} return {}
async def get_vol_usage(self) -> dict: async def get_vol_usage(self) -> dict:
@@ -498,28 +379,28 @@ class SystemManager:
async def reboot_system(self): async def reboot_system(self):
"""重启系统""" """重启系统"""
self.logger.info("Initiating system reboot...") self._info_log("Initiating system reboot...")
try: try:
await self.coordinator.run_command("sudo reboot") await self.coordinator.run_command("sudo reboot")
self.logger.info("Reboot command sent") self._info_log("Reboot command sent")
if "system" in self.coordinator.data: if "system" in self.coordinator.data:
self.coordinator.data["system"]["status"] = "rebooting" self.coordinator.data["system"]["status"] = "rebooting"
self.coordinator.async_update_listeners() self.coordinator.async_update_listeners()
except Exception as e: except Exception as e:
self.logger.error("Failed to reboot system: %s", str(e)) self._error_log(f"Failed to reboot system: {str(e)}")
raise raise
async def shutdown_system(self): async def shutdown_system(self):
"""关闭系统""" """关闭系统"""
self.logger.info("Initiating system shutdown...") self._info_log("Initiating system shutdown...")
try: try:
await self.coordinator.run_command("sudo shutdown -h now") await self.coordinator.run_command("sudo shutdown -h now")
self.logger.info("Shutdown command sent") self._info_log("Shutdown command sent")
if "system" in self.coordinator.data: if "system" in self.coordinator.data:
self.coordinator.data["system"]["status"] = "off" self.coordinator.data["system"]["status"] = "off"
self.coordinator.async_update_listeners() self.coordinator.async_update_listeners()
except Exception as e: except Exception as e:
self.logger.error("Failed to shutdown system: %s", str(e)) self._error_log(f"Failed to shutdown system: {str(e)}")
raise raise

View File

@@ -11,9 +11,27 @@ class UPSManager:
def __init__(self, coordinator): def __init__(self, coordinator):
self.coordinator = coordinator self.coordinator = coordinator
self.logger = _LOGGER.getChild("ups_manager") self.logger = _LOGGER.getChild("ups_manager")
self.logger.setLevel(logging.DEBUG) # 根据Home Assistant的日志级别动态设置
self.debug_enabled = False # UPS调试模式开关 self.logger.setLevel(logging.DEBUG if _LOGGER.isEnabledFor(logging.DEBUG) else logging.INFO)
self.ups_debug_path = "/config/fn_nas_ups_debug" # UPS调试文件保存路径 self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG) # 基于HA调试模式
self.ups_debug_path = "/config/fn_nas_ups_debug"
def _debug_log(self, message: str):
"""只在调试模式下输出详细日志"""
if self.debug_enabled:
self.logger.debug(message)
def _info_log(self, message: str):
"""重要信息日志"""
self.logger.info(message)
def _warning_log(self, message: str):
"""警告日志"""
self.logger.warning(message)
def _error_log(self, message: str):
"""错误日志"""
self.logger.error(message)
async def get_ups_info(self) -> dict: async def get_ups_info(self) -> dict:
"""获取连接的UPS信息""" """获取连接的UPS信息"""
@@ -31,7 +49,7 @@ class UPSManager:
try: try:
# 尝试使用NUT工具获取UPS信息 # 尝试使用NUT工具获取UPS信息
self.logger.debug("尝试使用NUT工具获取UPS信息") self._debug_log("尝试使用NUT工具获取UPS信息")
output = await self.coordinator.run_command("upsc -l") output = await self.coordinator.run_command("upsc -l")
if output and "No such file" not in output: if output and "No such file" not in output:
@@ -39,11 +57,11 @@ class UPSManager:
ups_names = output.splitlines() ups_names = output.splitlines()
if ups_names: if ups_names:
ups_name = ups_names[0].strip() ups_name = ups_names[0].strip()
self.logger.debug("发现UPS: %s", ups_name) self._debug_log(f"发现UPS: {ups_name}")
# 获取详细的UPS信息 # 获取详细的UPS信息
ups_details = await self.coordinator.run_command(f"upsc {ups_name}") ups_details = await self.coordinator.run_command(f"upsc {ups_name}")
self.logger.debug("UPS详细信息: %s", ups_details) self._debug_log(f"UPS详细信息: {ups_details}")
# 保存UPS数据以便调试 # 保存UPS数据以便调试
self.save_ups_data_for_debug(ups_details) self.save_ups_data_for_debug(ups_details)
@@ -51,20 +69,20 @@ class UPSManager:
# 解析UPS信息 # 解析UPS信息
return self.parse_nut_ups_info(ups_details) return self.parse_nut_ups_info(ups_details)
else: else:
self.logger.debug("未找到连接的UPS") self._debug_log("未找到连接的UPS")
else: else:
self.logger.debug("未安装NUT工具尝试备用方法") self._debug_log("未安装NUT工具尝试备用方法")
# 备用方法尝试直接读取UPS状态 # 备用方法尝试直接读取UPS状态
return await self.get_ups_info_fallback() return await self.get_ups_info_fallback()
except Exception as e: except Exception as e:
self.logger.error("获取UPS信息时出错: %s", str(e), exc_info=True) self._error_log(f"获取UPS信息时出错: {str(e)}")
return ups_info return ups_info
async def get_ups_info_fallback(self) -> dict: async def get_ups_info_fallback(self) -> dict:
"""备用方法获取UPS信息""" """备用方法获取UPS信息"""
self.logger.info("尝试备用方法获取UPS信息") self._info_log("尝试备用方法获取UPS信息")
ups_info = { ups_info = {
"status": "未知", "status": "未知",
"battery_level": "未知", "battery_level": "未知",
@@ -81,7 +99,7 @@ class UPSManager:
# 方法1: 检查USB连接的UPS # 方法1: 检查USB连接的UPS
usb_ups_output = await self.coordinator.run_command("lsusb | grep -i ups || echo 'No USB UPS'") usb_ups_output = await self.coordinator.run_command("lsusb | grep -i ups || echo 'No USB UPS'")
if usb_ups_output and "No USB UPS" not in usb_ups_output: if usb_ups_output and "No USB UPS" not in usb_ups_output:
self.logger.debug("检测到USB UPS设备: %s", usb_ups_output) self._debug_log(f"检测到USB UPS设备: {usb_ups_output}")
ups_info["ups_type"] = "USB" ups_info["ups_type"] = "USB"
# 尝试从输出中提取型号 # 尝试从输出中提取型号
@@ -111,7 +129,7 @@ class UPSManager:
return ups_info return ups_info
except Exception as e: except Exception as e:
self.logger.error("备用方法获取UPS信息失败: %s", str(e)) self._error_log(f"备用方法获取UPS信息失败: {str(e)}")
return ups_info return ups_info
def parse_nut_ups_info(self, ups_output: str) -> dict: def parse_nut_ups_info(self, ups_output: str) -> dict:
@@ -253,6 +271,6 @@ class UPSManager:
with open(filename, "w") as f: with open(filename, "w") as f:
f.write(ups_output) f.write(ups_output)
self.logger.info("保存UPS数据到 %s 用于调试", filename) self._info_log(f"保存UPS数据到 {filename} 用于调试")
except Exception as e: except Exception as e:
self.logger.error("保存UPS数据失败: %s", str(e)) self._error_log(f"保存UPS数据失败: {str(e)}")

View File

@@ -8,15 +8,40 @@ class VMManager:
def __init__(self, coordinator): def __init__(self, coordinator):
self.coordinator = coordinator self.coordinator = coordinator
self.vms = [] self.vms = []
self.logger = _LOGGER.getChild("vm_manager")
# 根据Home Assistant的日志级别动态设置
self.logger.setLevel(logging.DEBUG if _LOGGER.isEnabledFor(logging.DEBUG) else logging.INFO)
self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
def _debug_log(self, message: str):
"""只在调试模式下输出详细日志"""
if self.debug_enabled:
self.logger.debug(message)
def _info_log(self, message: str):
"""重要信息日志"""
self.logger.info(message)
def _warning_log(self, message: str):
"""警告日志"""
self.logger.warning(message)
def _error_log(self, message: str):
"""错误日志"""
self.logger.error(message)
async def get_vm_list(self): async def get_vm_list(self):
"""获取虚拟机列表及其状态""" """获取虚拟机列表及其状态"""
try: try:
self._debug_log("开始获取虚拟机列表")
output = await self.coordinator.run_command("virsh list --all") output = await self.coordinator.run_command("virsh list --all")
self._debug_log(f"virsh命令输出: {output}")
self.vms = self._parse_vm_list(output) self.vms = self._parse_vm_list(output)
self._info_log(f"获取到{len(self.vms)}个虚拟机")
return self.vms return self.vms
except Exception as e: except Exception as e:
_LOGGER.error("获取虚拟机列表失败: %s", str(e)) self._error_log(f"获取虚拟机列表失败: {str(e)}")
return [] return []
def _parse_vm_list(self, output): def _parse_vm_list(self, output):
@@ -43,14 +68,18 @@ class VMManager:
async def get_vm_title(self, vm_name): async def get_vm_title(self, vm_name):
"""获取虚拟机的标题""" """获取虚拟机的标题"""
try: try:
self._debug_log(f"获取虚拟机{vm_name}的标题")
output = await self.coordinator.run_command(f"virsh dumpxml {vm_name}") output = await self.coordinator.run_command(f"virsh dumpxml {vm_name}")
# 在XML输出中查找<title>标签 # 在XML输出中查找<title>标签
match = re.search(r'<title>(.*?)</title>', output, re.DOTALL) match = re.search(r'<title>(.*?)</title>', output, re.DOTALL)
if match: if match:
return match.group(1).strip() title = match.group(1).strip()
self._debug_log(f"虚拟机{vm_name}标题: {title}")
return title
self._debug_log(f"虚拟机{vm_name}无标题,使用名称")
return vm_name # 如果没有标题,则返回虚拟机名称 return vm_name # 如果没有标题,则返回虚拟机名称
except Exception as e: except Exception as e:
_LOGGER.error("获取虚拟机标题失败: %s", str(e)) self._error_log(f"获取虚拟机标题失败: {str(e)}")
return vm_name return vm_name
async def control_vm(self, vm_name, action): async def control_vm(self, vm_name, action):
@@ -61,8 +90,10 @@ class VMManager:
command = f"virsh {action} {vm_name}" command = f"virsh {action} {vm_name}"
try: try:
self._info_log(f"执行虚拟机操作: {command}")
await self.coordinator.run_command(command) await self.coordinator.run_command(command)
self._info_log(f"虚拟机{vm_name}操作{action}成功")
return True return True
except Exception as e: except Exception as e:
_LOGGER.error("执行虚拟机操作失败: %s", str(e)) self._error_log(f"执行虚拟机操作失败: {str(e)}")
return False return False