16 Commits

Author SHA1 Message Date
xiaochao
035a7e7413 Bump the version number 2025-12-04 17:50:50 +08:00
xiaochao
41d4e4cc0d modified: custom_components/fn_nas/button.py
	modified: custom_components/fn_nas/const.py
	modified: custom_components/fn_nas/coordinator.py
	modified: custom_components/fn_nas/disk_manager.py
	modified: custom_components/fn_nas/sensor.py
Add ZFS storage pool consistency check (scrub) control
2025-12-04 17:47:00 +08:00
xiaochao
fcb46f429c Turn the disk health state into a binary sensor so it can drive alert notifications 2025-10-17 18:06:53 +08:00
xiaochao
4ae0b74e78 modified: custom_components/fn_nas/disk_manager.py
	modified: custom_components/fn_nas/manifest.json
	modified: custom_components/fn_nas/sensor.py

Improve the disk detection logic
2025-10-16 18:32:42 +08:00
xiaochao
fd079c4ddf modified: README.md 2025-10-13 15:38:52 +08:00
xiaochao
9e799c8948 Update the GitHub repository URL 2025-10-13 15:31:49 +08:00
xiaochao
31ffc24e6a 飞牛NAS system:
1. Fix the motherboard temperature showing as "unknown"
Virtual machines:
1. Add a force power-off button
2025-10-13 14:54:03 +08:00
xiaochao
905bbf9c96 Fix NVMe disk power-on hours and total capacity showing as "unknown" 2025-10-10 19:15:43 +08:00
xiaochao
0f691e956f Fix the free-space sensor being unavailable 2025-07-28 14:37:28 +08:00
xiaochao
25348fff9b Improve the disk detection logic so it no longer wakes disks that are in standby 2025-07-28 14:10:23 +08:00
xiaochao
30b1b7d271 Fix the error raised in Home Assistant when shutting down the 飞牛NAS host 2025-07-28 13:50:09 +08:00
xiaochao
17e3229b29 Improve SSH connection management: connections are now reused through a connection pool, which reduces connection setup overhead.
Motherboard/CPU temperature is now read via the sensors command. If your motherboard/CPU temperature is read incorrectly, please open an issue and attach the output of the sensors command so support can be added. (A parsing sketch follows the commit list.)
2025-07-21 11:56:33 +08:00
xiaochao
11d1352b20 Reduce the number of sshd processes on the 飞牛NAS; entity discovery may be slower, so allow some time before the integration shows its entities 2025-07-12 15:47:37 +08:00
xiaochao
57d14b48f8 Change how CPU and motherboard temperatures are read; server-grade hardware and very old motherboards may not be supported 2025-07-12 14:47:34 +08:00
xiaochao
fae53cf5b9 Fix some entities staying unavailable after the 飞牛NAS is powered back on when Home Assistant was restarted while the NAS was off
Remove the SSH connection limit and the cache cleanup interval
2025-07-12 01:18:26 +08:00
xiaochao
f185b7e3ee Fix Home Assistant entity states not updating when the 飞牛NAS goes from powered off back to powered on 2025-07-11 23:45:05 +08:00
12 changed files with 2036 additions and 756 deletions
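
Commit 17e3229b29 above switches motherboard/CPU temperature readings to the `sensors` command, but the matching `system_manager.py` change is not included in this diff. For orientation only, here is a minimal, self-contained sketch of pulling a CPU temperature out of plain `sensors` text with a regex; the helper and the label names it matches (`Package id 0`, `Tctl`, `Tdie`, `CPU Temperature`) are assumptions for illustration, not the integration's actual code.

```python
import re
import subprocess

# Hypothetical helper, not the integration's implementation.
# Assumes lm-sensors is installed and that the CPU temperature appears
# under one of the labels below; adjust the list for your own hardware.
CPU_LABELS = ("Package id 0", "Tctl", "Tdie", "CPU Temperature")

def read_cpu_temperature(sensors_output: str) -> float | None:
    """Return the first CPU temperature found in `sensors` text output, in °C."""
    for label in CPU_LABELS:
        # Lines look like:  Package id 0:  +45.0°C  (high = +80.0°C, crit = +100.0°C)
        match = re.search(rf"{re.escape(label)}:\s*\+?(-?\d+(?:\.\d+)?)", sensors_output)
        if match:
            return float(match.group(1))
    return None

if __name__ == "__main__":
    # In the integration this text would arrive over SSH, e.g. via run_command("sensors").
    output = subprocess.run(["sensors"], capture_output=True, text=True).stdout
    print(read_cpu_temperature(output))
```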

View File

@@ -33,7 +33,7 @@
 1. 进入**HACS商店**
 2. 添加自定义存储库:
 ```shell
-https://github.com/anxms/fn_nas
+https://github.com/xiaochao99/fn_nas
 ```
 3. 搜索"飞牛NAS",点击下载
 4. **重启Home Assistant服务**

View File

@@ -1,62 +1,89 @@
 import logging
+import asyncio
+import asyncssh
 from homeassistant.config_entries import ConfigEntry
 from homeassistant.core import HomeAssistant
-from .const import DOMAIN, DATA_UPDATE_COORDINATOR, PLATFORMS, CONF_ENABLE_DOCKER  # 导入新增常量
+from homeassistant.helpers import config_validation as cv
+from .const import (
+    DOMAIN, DATA_UPDATE_COORDINATOR, PLATFORMS, CONF_ENABLE_DOCKER,
+    CONF_HOST, DEFAULT_PORT
+)
 from .coordinator import FlynasCoordinator, UPSDataUpdateCoordinator

 _LOGGER = logging.getLogger(__name__)

 async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
     config = {**entry.data, **entry.options}
-    coordinator = FlynasCoordinator(hass, config)
-    await coordinator.async_config_entry_first_refresh()
-    _LOGGER.debug("协调器类型: %s", type(coordinator).__name__)
-    _LOGGER.debug("协调器是否有control_vm方法: %s", hasattr(coordinator, 'control_vm'))
-    _LOGGER.debug("协调器是否有vm_manager属性: %s", hasattr(coordinator, 'vm_manager'))
-    # 检查是否启用Docker并初始化Docker管理器如果有
-    enable_docker = config.get(CONF_ENABLE_DOCKER, False)
-    if enable_docker:
-        # 导入Docker管理器并初始化
-        from .docker_manager import DockerManager
-        coordinator.docker_manager = DockerManager(coordinator)
-        _LOGGER.debug("已启用Docker容器监控")
-    else:
-        coordinator.docker_manager = None
-        _LOGGER.debug("未启用Docker容器监控")
-    ups_coordinator = UPSDataUpdateCoordinator(hass, config, coordinator)
-    await ups_coordinator.async_config_entry_first_refresh()
+    coordinator = FlynasCoordinator(hass, config, entry)
+    # 直接初始化不阻塞等待NAS上线
     hass.data.setdefault(DOMAIN, {})
     hass.data[DOMAIN][entry.entry_id] = {
         DATA_UPDATE_COORDINATOR: coordinator,
-        "ups_coordinator": ups_coordinator,
-        CONF_ENABLE_DOCKER: enable_docker  # 存储启用状态
+        "ups_coordinator": None,
+        CONF_ENABLE_DOCKER: coordinator.config.get(CONF_ENABLE_DOCKER, False)
     }
-    await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
-    entry.async_on_unload(entry.add_update_listener(async_update_entry))
+    # 异步后台初始化
+    hass.async_create_task(async_delayed_setup(hass, entry, coordinator))
     return True

+async def async_delayed_setup(hass: HomeAssistant, entry: ConfigEntry, coordinator: FlynasCoordinator):
+    try:
+        # 不阻塞等待NAS上线直接尝试刷新数据
+        await coordinator.async_config_entry_first_refresh()
+        enable_docker = coordinator.config.get(CONF_ENABLE_DOCKER, False)
+        if enable_docker:
+            from .docker_manager import DockerManager
+            coordinator.docker_manager = DockerManager(coordinator)
+            _LOGGER.debug("已启用Docker容器监控")
+        else:
+            coordinator.docker_manager = None
+            _LOGGER.debug("未启用Docker容器监控")
+        ups_coordinator = UPSDataUpdateCoordinator(hass, coordinator.config, coordinator)
+        await ups_coordinator.async_config_entry_first_refresh()
+        hass.data[DOMAIN][entry.entry_id]["ups_coordinator"] = ups_coordinator
+        await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
+        entry.async_on_unload(entry.add_update_listener(async_update_entry))
+        _LOGGER.info("飞牛NAS集成初始化完成")
+    except Exception as e:
+        _LOGGER.error("飞牛NAS集成初始化失败: %s", str(e))
+        await coordinator.async_disconnect()
+        if hasattr(coordinator, '_ping_task') and coordinator._ping_task:
+            coordinator._ping_task.cancel()
+
 async def async_update_entry(hass: HomeAssistant, entry: ConfigEntry):
-    await hass.config_entries.async_reload(entry.entry_id)
+    """更新配置项"""
+    # 卸载现有集成
+    await async_unload_entry(hass, entry)
+    # 重新加载集成
+    await async_setup_entry(hass, entry)

 async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry):
-    unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
-    if unload_ok:
-        domain_data = hass.data[DOMAIN][entry.entry_id]
+    """卸载集成"""
+    # 获取集成数据
+    domain_data = hass.data.get(DOMAIN, {}).get(entry.entry_id, {})
+    unload_ok = True
+    if DATA_UPDATE_COORDINATOR in domain_data:
         coordinator = domain_data[DATA_UPDATE_COORDINATOR]
-        ups_coordinator = domain_data["ups_coordinator"]
-        # 关闭主协调器的SSH连接
-        await coordinator.async_disconnect()
-        # 关闭UPS协调器
-        await ups_coordinator.async_shutdown()
-        # 从DOMAIN中移除该entry的数据
-        hass.data[DOMAIN].pop(entry.entry_id)
+        ups_coordinator = domain_data.get("ups_coordinator")
+        # 卸载平台
+        unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
+        if unload_ok:
+            # 关闭主协调器的SSH连接
+            await coordinator.async_disconnect()
+            # 关闭UPS协调器如果存在
+            if ups_coordinator:
+                await ups_coordinator.async_shutdown()
+            # 取消监控任务(如果存在)
+            if hasattr(coordinator, '_ping_task') and coordinator._ping_task and not coordinator._ping_task.done():
+                coordinator._ping_task.cancel()
+            # 从DOMAIN中移除该entry的数据
+            hass.data[DOMAIN].pop(entry.entry_id, None)
     return unload_ok

View File

@@ -0,0 +1,72 @@
import logging
from homeassistant.components.binary_sensor import BinarySensorEntity, BinarySensorDeviceClass
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import (
    DOMAIN, HDD_HEALTH, DEVICE_ID_NAS, DATA_UPDATE_COORDINATOR
)

_LOGGER = logging.getLogger(__name__)

async def async_setup_entry(hass, config_entry, async_add_entities):
    domain_data = hass.data[DOMAIN][config_entry.entry_id]
    coordinator = domain_data[DATA_UPDATE_COORDINATOR]
    entities = []
    existing_ids = set()
    # 添加硬盘健康状态二元传感器
    for disk in coordinator.data.get("disks", []):
        health_uid = f"{config_entry.entry_id}_{disk['device']}_health_binary"
        if health_uid not in existing_ids:
            entities.append(
                DiskHealthBinarySensor(
                    coordinator,
                    disk["device"],
                    f"硬盘 {disk.get('model', '未知')} 健康状态",
                    health_uid,
                    disk
                )
            )
            existing_ids.add(health_uid)
    async_add_entities(entities)

class DiskHealthBinarySensor(CoordinatorEntity, BinarySensorEntity):
    def __init__(self, coordinator, device_id, name, unique_id, disk_info):
        super().__init__(coordinator)
        self.device_id = device_id
        self._attr_name = name
        self._attr_unique_id = unique_id
        self.disk_info = disk_info
        self._attr_device_info = {
            "identifiers": {(DOMAIN, f"disk_{device_id}")},
            "name": disk_info.get("model", "未知硬盘"),
            "manufacturer": "硬盘设备",
            "via_device": (DOMAIN, DEVICE_ID_NAS)
        }
        self._attr_device_class = BinarySensorDeviceClass.PROBLEM

    @property
    def is_on(self):
        """返回True表示有问题False表示正常"""
        for disk in self.coordinator.data.get("disks", []):
            if disk["device"] == self.device_id:
                health = disk.get("health", "未知")
                # 将健康状态映射为二元状态
                if health in ["正常", "良好", "OK", "ok", "good", "Good"]:
                    return False  # 正常状态
                elif health in ["警告", "异常", "错误", "warning", "Warning", "error", "Error", "bad", "Bad"]:
                    return True  # 有问题状态
                else:
                    # 未知状态也视为有问题
                    return True
        return True  # 默认视为有问题

    @property
    def icon(self):
        """根据状态返回图标"""
        if self.is_on:
            return "mdi:alert-circle"  # 有问题时显示警告图标
        else:
            return "mdi:check-circle"  # 正常时显示对勾图标

View File

@@ -3,7 +3,7 @@ from homeassistant.components.button import ButtonEntity
from homeassistant.helpers.entity import EntityCategory from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.update_coordinator import CoordinatorEntity from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import ( from .const import (
DOMAIN, DATA_UPDATE_COORDINATOR, DEVICE_ID_NAS, CONF_ENABLE_DOCKER DOMAIN, DATA_UPDATE_COORDINATOR, DEVICE_ID_NAS, CONF_ENABLE_DOCKER, DEVICE_ID_ZFS
) )
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -18,7 +18,7 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
# 1. 添加NAS重启按钮 # 1. 添加NAS重启按钮
entities.append(RebootButton(coordinator, config_entry.entry_id)) entities.append(RebootButton(coordinator, config_entry.entry_id))
# 2. 添加虚拟机重启按钮 # 2. 添加虚拟机重启按钮和强制关机按钮
if "vms" in coordinator.data: if "vms" in coordinator.data:
for vm in coordinator.data["vms"]: for vm in coordinator.data["vms"]:
entities.append( entities.append(
@@ -29,6 +29,14 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
config_entry.entry_id config_entry.entry_id
) )
) )
entities.append(
VMDestroyButton(
coordinator,
vm["name"],
vm.get("title", vm["name"]),
config_entry.entry_id
)
)
# 3. 添加Docker容器重启按钮如果启用了Docker功能 # 3. 添加Docker容器重启按钮如果启用了Docker功能
if enable_docker and "docker_containers" in coordinator.data: if enable_docker and "docker_containers" in coordinator.data:
@@ -44,6 +52,19 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
) )
) )
# 4. 添加ZFS存储池scrub按钮
if "zpools" in coordinator.data:
for zpool in coordinator.data["zpools"]:
safe_name = zpool["name"].replace(" ", "_").replace("/", "_").replace(".", "_")
entities.append(
ZpoolScrubButton(
coordinator,
zpool["name"],
safe_name,
config_entry.entry_id
)
)
async_add_entities(entities) async_add_entities(entities)
class RebootButton(CoordinatorEntity, ButtonEntity): class RebootButton(CoordinatorEntity, ButtonEntity):
@@ -163,3 +184,96 @@ class DockerContainerRestartButton(CoordinatorEntity, ButtonEntity):
"操作类型": "重启容器", "操作类型": "重启容器",
"提示": "重启操作可能需要一些时间完成" "提示": "重启操作可能需要一些时间完成"
} }
class VMDestroyButton(CoordinatorEntity, ButtonEntity):
def __init__(self, coordinator, vm_name, vm_title, entry_id):
super().__init__(coordinator)
self.vm_name = vm_name
self.vm_title = vm_title
self._attr_name = f"{vm_title} 强制关机"
self._attr_unique_id = f"{entry_id}_flynas_vm_{vm_name}_destroy"
self._attr_device_info = {
"identifiers": {(DOMAIN, f"vm_{vm_name}")},
"name": vm_title,
"via_device": (DOMAIN, DEVICE_ID_NAS)
}
self._attr_icon = "mdi:power-off" # 使用关机图标
self.vm_manager = coordinator.vm_manager if hasattr(coordinator, 'vm_manager') else None
async def async_press(self):
"""强制关机虚拟机"""
if not self.vm_manager:
_LOGGER.error("vm_manager不可用无法强制关机虚拟机 %s", self.vm_name)
return
try:
success = await self.vm_manager.control_vm(self.vm_name, "destroy")
if success:
# 更新状态为"强制关机中"
for vm in self.coordinator.data["vms"]:
if vm["name"] == self.vm_name:
vm["state"] = "destroying"
self.async_write_ha_state()
# 在下次更新时恢复实际状态
self.coordinator.async_add_listener(self.async_write_ha_state)
except Exception as e:
_LOGGER.error("强制关机虚拟机时出错: %s", str(e), exc_info=True)
@property
def extra_state_attributes(self):
return {
"虚拟机名称": self.vm_name,
"操作类型": "强制关机",
"警告": "此操作会强制关闭虚拟机,可能导致数据丢失",
"提示": "仅在虚拟机无法正常关机时使用此功能"
}
class ZpoolScrubButton(CoordinatorEntity, ButtonEntity):
def __init__(self, coordinator, zpool_name, safe_name, entry_id):
super().__init__(coordinator)
self.zpool_name = zpool_name
self.safe_name = safe_name
self._attr_name = f"ZFS {zpool_name} 数据检查"
self._attr_unique_id = f"{entry_id}_zpool_{safe_name}_scrub"
self._attr_device_info = {
"identifiers": {(DOMAIN, DEVICE_ID_ZFS)},
"name": "ZFS存储池",
"via_device": (DOMAIN, DEVICE_ID_NAS)
}
self._attr_icon = "mdi:harddisk-check"
@property
def available(self):
"""检查按钮是否可用当scrub进行中时不可点击"""
scrub_status = self.coordinator.data.get("scrub_status", {}).get(self.zpool_name, {})
return not scrub_status.get("scrub_in_progress", False)
async def async_press(self):
"""执行ZFS存储池数据一致性检查"""
try:
# 检查是否已经有scrub在进行中
scrub_status = self.coordinator.data.get("scrub_status", {}).get(self.zpool_name, {})
if scrub_status.get("scrub_in_progress", False):
self.coordinator.logger.warning(f"ZFS存储池 {self.zpool_name} 已在进行数据一致性检查")
return
success = await self.coordinator.scrub_zpool(self.zpool_name)
if success:
self.coordinator.logger.info(f"ZFS存储池 {self.zpool_name} 数据一致性检查启动成功")
# 立即刷新状态以更新按钮状态
await self.coordinator.async_request_refresh()
else:
self.coordinator.logger.error(f"ZFS存储池 {self.zpool_name} 数据一致性检查启动失败")
except Exception as e:
self.coordinator.logger.error(f"启动ZFS存储池 {self.zpool_name} 数据一致性检查时出错: {str(e)}", exc_info=True)
@property
def extra_state_attributes(self):
return {
"存储池名称": self.zpool_name,
"操作类型": "数据一致性检查",
"说明": "对ZFS存储池执行数据完整性和一致性验证",
"提示": "此操作可能需要较长时间完成,建议在低峰期执行"
}

View File

@@ -3,6 +3,7 @@ from homeassistant.const import Platform
 DOMAIN = "fn_nas"
 PLATFORMS = [
     Platform.SENSOR,
+    Platform.BINARY_SENSOR,
     Platform.SWITCH,
     Platform.BUTTON
 ]
@@ -33,6 +34,7 @@ HDD_STATUS = "status"
 SYSTEM_INFO = "system"
 FAN_SPEED = "fan_speed"
 UPS_INFO = "ups_info"
+ZFS_POOL = "zfs_pool"
 ATTR_DISK_MODEL = "硬盘型号"
 ATTR_SERIAL_NO = "序列号"
@@ -48,8 +50,28 @@ ICON_TEMPERATURE = "mdi:thermometer"
 ICON_HEALTH = "mdi:heart-pulse"
 ICON_POWER = "mdi:power"
 ICON_RESTART = "mdi:restart"
+ICON_ZFS = "mdi:harddisk-plus"
 # 设备标识符常量
 DEVICE_ID_NAS = "flynas_nas_system"
 DEVICE_ID_UPS = "flynas_ups"
+DEVICE_ID_ZFS = "flynas_zfs"
 CONF_NETWORK_MACS = "network_macs"
+# ZFS相关常量
+ATTR_ZPOOL_NAME = "存储池名称"
+ATTR_ZPOOL_HEALTH = "健康状态"
+ATTR_ZPOOL_SIZE = "总大小"
+ATTR_ZPOOL_ALLOC = "已使用"
+ATTR_ZPOOL_FREE = "可用空间"
+ATTR_ZPOOL_CAPACITY = "使用率"
+ATTR_ZPOOL_FRAGMENTATION = "碎片率"
+ATTR_ZPOOL_CKPOINT = "检查点"
+ATTR_ZPOOL_EXPANDSZ = "扩展大小"
+ATTR_ZPOOL_DEDUP = "重复数据删除率"
+ATTR_ZPOOL_SCRUB_STATUS = "检查状态"
+ATTR_ZPOOL_SCRUB_PROGRESS = "检查进度"
+ATTR_ZPOOL_SCRUB_SCAN_RATE = "扫描速度"
+ATTR_ZPOOL_SCRUB_TIME_REMAINING = "剩余时间"
+ATTR_ZPOOL_SCRUB_ISSUED = "已发出数据"
+ATTR_ZPOOL_SCRUB_REPAIRED = "已修复数据"

View File

@@ -1,6 +1,7 @@
import logging import logging
import re import asyncio
import asyncssh import asyncssh
import re
from datetime import timedelta from datetime import timedelta
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
@@ -20,8 +21,10 @@ from .docker_manager import DockerManager
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
class FlynasCoordinator(DataUpdateCoordinator): class FlynasCoordinator(DataUpdateCoordinator):
def __init__(self, hass: HomeAssistant, config) -> None: def __init__(self, hass: HomeAssistant, config, config_entry) -> None:
self.config = config self.config = config
self.config_entry = config_entry
self.hass = hass
self.host = config[CONF_HOST] self.host = config[CONF_HOST]
self.port = config.get(CONF_PORT, DEFAULT_PORT) self.port = config.get(CONF_PORT, DEFAULT_PORT)
self.username = config[CONF_USERNAME] self.username = config[CONF_USERNAME]
@@ -32,21 +35,16 @@ class FlynasCoordinator(DataUpdateCoordinator):
self.docker_manager = DockerManager(self) if self.enable_docker else None self.docker_manager = DockerManager(self) if self.enable_docker else None
self.ssh = None self.ssh = None
self.ssh_closed = True self.ssh_closed = True
# SSH连接池管理
self.ssh_pool = []
self.ssh_pool_size = 3 # 连接池大小
self.ssh_pool_lock = asyncio.Lock()
self.ups_manager = UPSManager(self) self.ups_manager = UPSManager(self)
self.vm_manager = VMManager(self) self.vm_manager = VMManager(self)
self.use_sudo = False self.use_sudo = False
self.data = { # 确保data始终有初始值
"disks": [], self.data = self.get_default_data()
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": []
}
scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL) scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
update_interval = timedelta(seconds=scan_interval) update_interval = timedelta(seconds=scan_interval)
@@ -60,224 +58,419 @@ class FlynasCoordinator(DataUpdateCoordinator):
self.disk_manager = DiskManager(self) self.disk_manager = DiskManager(self)
self.system_manager = SystemManager(self) self.system_manager = SystemManager(self)
self._system_online = False
self._ping_task = None
self._retry_interval = 30 # 系统离线时的检测间隔(秒)
self._last_command_time = 0
self._command_count = 0
async def async_connect(self): # 添加日志方法
if self.ssh is None or self.ssh_closed: self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
def get_default_data(self):
"""返回默认的数据结构"""
return {
"disks": [],
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": [],
"docker_containers": [],
"zpools": [],
"scrub_status": {}
}
def _debug_log(self, message: str):
"""只在调试模式下输出详细日志"""
if self.debug_enabled:
_LOGGER.debug(message)
def _info_log(self, message: str):
"""重要信息日志"""
_LOGGER.info(message)
def _warning_log(self, message: str):
"""警告日志"""
_LOGGER.warning(message)
def _error_log(self, message: str):
"""错误日志"""
_LOGGER.error(message)
async def get_ssh_connection(self):
"""从连接池获取可用的SSH连接"""
async with self.ssh_pool_lock:
# 检查现有连接
for i, (ssh, in_use) in enumerate(self.ssh_pool):
if not in_use:
try:
# 测试连接是否活跃
await asyncio.wait_for(ssh.run("echo 'test'", timeout=1), timeout=2)
self.ssh_pool[i] = (ssh, True) # 标记为使用中
self._debug_log(f"复用连接池中的连接 {i}")
return ssh, i
except Exception:
# 连接失效,移除
try:
ssh.close()
except:
pass
self.ssh_pool.pop(i)
break
# 如果连接池未满,创建新连接
if len(self.ssh_pool) < self.ssh_pool_size:
try:
ssh = await asyncssh.connect(
self.host,
port=self.port,
username=self.username,
password=self.password,
known_hosts=None,
connect_timeout=5
)
# 检查并设置权限状态
await self._setup_connection_permissions(ssh)
connection_id = len(self.ssh_pool)
self.ssh_pool.append((ssh, True))
self._debug_log(f"创建新的SSH连接 {connection_id}")
return ssh, connection_id
except Exception as e:
self._debug_log(f"创建SSH连接失败: {e}")
raise
# 连接池满且所有连接都在使用中,等待可用连接
self._debug_log("所有连接都在使用中,等待可用连接...")
for _ in range(50): # 最多等待5秒
await asyncio.sleep(0.1)
for i, (ssh, in_use) in enumerate(self.ssh_pool):
if not in_use:
try:
await asyncio.wait_for(ssh.run("echo 'test'", timeout=1), timeout=2)
self.ssh_pool[i] = (ssh, True)
self._debug_log(f"等待后获得连接 {i}")
return ssh, i
except Exception:
try:
ssh.close()
except:
pass
self.ssh_pool.pop(i)
break
raise Exception("无法获取SSH连接")
async def _setup_connection_permissions(self, ssh):
"""为新连接设置权限状态"""
try:
# 检查是否为root用户
result = await ssh.run("id -u", timeout=3)
if result.stdout.strip() == "0":
self._debug_log("当前用户是 root")
self.use_sudo = False
return
# 尝试切换到root会话
if self.root_password:
try:
await ssh.run(
f"echo '{self.root_password}' | sudo -S -i",
input=self.root_password + "\n",
timeout=5
)
whoami = await ssh.run("whoami")
if "root" in whoami.stdout:
self._info_log("成功切换到 root 会话(使用 root 密码)")
self.use_sudo = False
return
except Exception:
pass
# 尝试使用登录密码sudo
try: try:
self.ssh = await asyncssh.connect( await ssh.run(
self.host,
port=self.port,
username=self.username,
password=self.password,
known_hosts=None
)
if await self.is_root_user():
_LOGGER.debug("当前用户是 root")
self.use_sudo = False
self.ssh_closed = False
return True
result = await self.ssh.run(
f"echo '{self.password}' | sudo -S -i", f"echo '{self.password}' | sudo -S -i",
input=self.password + "\n", input=self.password + "\n",
timeout=10 timeout=5
) )
whoami = await ssh.run("whoami")
whoami_result = await self.ssh.run("whoami") if "root" in whoami.stdout:
if "root" in whoami_result.stdout: self._info_log("成功切换到 root 会话(使用登录密码)")
_LOGGER.info("成功切换到 root 会话(使用登录密码)")
self.use_sudo = False self.use_sudo = False
self.ssh_closed = False return
return True except Exception:
else: pass
if self.root_password:
result = await self.ssh.run(
f"echo '{self.root_password}' | sudo -S -i",
input=self.root_password + "\n",
timeout=10
)
whoami_result = await self.ssh.run("whoami") # 设置为使用sudo模式
if "root" in whoami_result.stdout: self.use_sudo = True
_LOGGER.info("成功切换到 root 会话(使用 root 密码)") self._debug_log("设置为使用sudo模式")
self.use_sudo = False
self.ssh_closed = False
return True
else:
_LOGGER.warning("切换到 root 会话失败,将使用 sudo")
self.use_sudo = True
else:
_LOGGER.warning("非 root 用户且未提供 root 密码,将使用 sudo")
self.use_sudo = True
self.ssh_closed = False except Exception as e:
_LOGGER.info("SSH 连接已建立到 %s", self.host) self._debug_log(f"设置连接权限失败: {e}")
return True self.use_sudo = True
except Exception as e:
self.ssh = None
self.ssh_closed = True
_LOGGER.error("连接失败: %s", str(e), exc_info=True)
return False
return True
async def is_root_user(self): async def release_ssh_connection(self, connection_id):
"""释放SSH连接回连接池"""
async with self.ssh_pool_lock:
if 0 <= connection_id < len(self.ssh_pool):
ssh, _ = self.ssh_pool[connection_id]
self.ssh_pool[connection_id] = (ssh, False) # 标记为可用
self._debug_log(f"释放SSH连接 {connection_id}")
async def close_all_ssh_connections(self):
"""关闭所有SSH连接"""
async with self.ssh_pool_lock:
for ssh, _ in self.ssh_pool:
try:
ssh.close()
except:
pass
self.ssh_pool.clear()
self._debug_log("已关闭所有SSH连接")
async def async_connect(self):
"""建立并保持持久SSH连接 - 兼容旧代码"""
try: try:
result = await self.ssh.run("id -u", timeout=5) ssh, connection_id = await self.get_ssh_connection()
return result.stdout.strip() == "0" await self.release_ssh_connection(connection_id)
return True
except Exception: except Exception:
return False return False
async def async_disconnect(self): async def async_disconnect(self):
if self.ssh is not None and not self.ssh_closed: """断开SSH连接 - 兼容旧代码"""
try: await self.close_all_ssh_connections()
self.ssh.close()
self.ssh_closed = True
_LOGGER.info("SSH connection closed")
except Exception as e:
_LOGGER.error("Error closing SSH connection: %s", str(e))
finally:
self.ssh = None
async def is_ssh_connected(self) -> bool:
if self.ssh is None or self.ssh_closed:
return False
try:
test_command = "echo 'connection_test'"
result = await self.ssh.run(test_command, timeout=2)
return result.exit_status == 0 and "connection_test" in result.stdout
except (asyncssh.Error, TimeoutError):
return False
async def run_command(self, command: str, retries=2) -> str: async def run_command(self, command: str, retries=2) -> str:
for attempt in range(retries): """执行SSH命令使用连接池"""
try: # 系统离线时直接返回空字符串
if not await self.is_ssh_connected(): if not self._system_online:
if not await self.async_connect(): return ""
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
raise UpdateFailed("SSH 连接失败")
if self.use_sudo: ssh = None
if self.root_password or self.password: connection_id = None
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await self.ssh.run(full_command, input=password + "\n", check=True)
else:
full_command = f"sudo {command}"
result = await self.ssh.run(full_command, check=True)
else:
result = await self.ssh.run(command, check=True)
return result.stdout.strip()
except asyncssh.process.ProcessError as e:
if e.exit_status in [4, 32]:
return ""
_LOGGER.error("Command failed: %s (exit %d)", command, e.exit_status)
self.ssh = None
self.ssh_closed = True
if attempt == retries - 1:
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
raise UpdateFailed(f"Command failed after {retries} attempts: {command}") from e
except asyncssh.Error as e:
_LOGGER.error("SSH connection error: %s", str(e))
self.ssh = None
self.ssh_closed = True
if attempt == retries - 1:
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
raise UpdateFailed(f"SSH error after {retries} attempts: {str(e)}") from e
except Exception as e:
self.ssh = None
self.ssh_closed = True
_LOGGER.error("Unexpected error: %s", str(e), exc_info=True)
if attempt == retries - 1:
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
raise UpdateFailed(f"Unexpected error after {retries} attempts") from e
async def get_network_macs(self):
try: try:
output = await self.run_command("ip link show") # 从连接池获取连接
macs = {} ssh, connection_id = await self.get_ssh_connection()
pattern = re.compile(r'^\d+: (\w+):.*\n\s+link/\w+\s+([0-9a-fA-F:]{17})', re.MULTILINE) # 构建完整命令
matches = pattern.findall(output) if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await ssh.run(full_command, input=password + "\n", timeout=10)
else:
full_command = f"sudo {command}"
result = await ssh.run(full_command, timeout=10)
else:
result = await ssh.run(command, timeout=10)
for interface, mac in matches: return result.stdout.strip()
if interface == "lo" or mac == "00:00:00:00:00:00":
continue
macs[mac] = interface
return macs
except Exception as e: except Exception as e:
self.logger.error("获取MAC地址失败: %s", str(e)) self._debug_log(f"命令执行失败: {command}, 错误: {str(e)}")
return {} return ""
finally:
# 释放连接回连接池
if connection_id is not None:
await self.release_ssh_connection(connection_id)
async def run_command_direct(self, command: str) -> str:
"""直接执行命令,获取独立连接 - 用于并发任务"""
if not self._system_online:
return ""
ssh = None
connection_id = None
try:
ssh, connection_id = await self.get_ssh_connection()
if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await ssh.run(full_command, input=password + "\n", timeout=10)
else:
full_command = f"sudo {command}"
result = await ssh.run(full_command, timeout=10)
else:
result = await ssh.run(command, timeout=10)
return result.stdout.strip()
except Exception as e:
self._debug_log(f"直接命令执行失败: {command}, 错误: {str(e)}")
return ""
finally:
if connection_id is not None:
await self.release_ssh_connection(connection_id)
async def ping_system(self) -> bool:
"""轻量级系统状态检测"""
# 对于本地主机直接返回True
if self.host in ['localhost', '127.0.0.1']:
return True
try:
# 使用异步ping检测减少超时时间
proc = await asyncio.create_subprocess_exec(
'ping', '-c', '1', '-W', '1', self.host,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
await asyncio.wait_for(proc.wait(), timeout=2) # 总超时时间2秒
return proc.returncode == 0
except Exception:
return False
async def _monitor_system_status(self):
"""系统离线时轮询检测状态"""
self._debug_log(f"启动系统状态监控,每{self._retry_interval}秒检测一次")
# 使用指数退避策略,避免频繁检测
check_interval = self._retry_interval
max_interval = 300 # 最大5分钟检测一次
while True:
await asyncio.sleep(check_interval)
if await self.ping_system():
self._info_log("检测到系统已开机,触发重新加载")
# 触发集成重新加载
self.hass.async_create_task(
self.hass.config_entries.async_reload(self.config_entry.entry_id)
)
break
else:
# 系统仍然离线,增加检测间隔(指数退避)
check_interval = min(check_interval * 1.5, max_interval)
self._debug_log(f"系统仍离线,下次检测间隔: {check_interval}")
async def _async_update_data(self): async def _async_update_data(self):
_LOGGER.debug("Starting data update...") """数据更新入口,优化命令执行频率"""
self._debug_log("开始数据更新...")
is_online = await self.ping_system()
self._system_online = is_online
if not is_online:
self._debug_log("系统离线,跳过数据更新")
# 启动后台监控任务
if not self._ping_task or self._ping_task.done():
self._ping_task = asyncio.create_task(self._monitor_system_status())
await self.close_all_ssh_connections()
return self.get_default_data()
# 系统在线处理
try: try:
if await self.is_ssh_connected(): # 预热连接池并确保权限设置正确
status = "on" await self.async_connect()
else:
if not await self.async_connect():
status = "off"
else:
status = "on"
disks = await self.disk_manager.get_disks_info() # 获取系统状态信息
status = "on"
# 串行获取信息以确保稳定性
self._debug_log("开始获取系统信息...")
system = await self.system_manager.get_system_info() system = await self.system_manager.get_system_info()
self._debug_log("系统信息获取完成")
self._debug_log("开始获取磁盘信息...")
disks = await self.disk_manager.get_disks_info()
self._debug_log(f"磁盘信息获取完成,数量: {len(disks)}")
self._debug_log("开始获取ZFS存储池信息...")
zpools = await self.disk_manager.get_zpools()
self._debug_log(f"ZFS存储池信息获取完成数量: {len(zpools)}")
# 获取所有ZFS存储池的scrub状态
scrub_status = {}
for zpool in zpools:
self._debug_log(f"开始获取存储池 {zpool['name']} 的scrub状态...")
scrub_info = await self.disk_manager.get_zpool_status(zpool['name'])
scrub_status[zpool['name']] = scrub_info
self._debug_log(f"存储池 {zpool['name']} scrub状态获取完成")
self._debug_log("开始获取UPS信息...")
ups_info = await self.ups_manager.get_ups_info() ups_info = await self.ups_manager.get_ups_info()
self._debug_log("UPS信息获取完成")
self._debug_log("开始获取虚拟机信息...")
vms = await self.vm_manager.get_vm_list() vms = await self.vm_manager.get_vm_list()
self._debug_log(f"虚拟机信息获取完成,数量: {len(vms)}")
# 为每个虚拟机获取标题
for vm in vms: for vm in vms:
vm["title"] = await self.vm_manager.get_vm_title(vm["name"]) try:
vm["title"] = await self.vm_manager.get_vm_title(vm["name"])
except Exception as e:
self._debug_log(f"获取VM标题失败 {vm['name']}: {e}")
vm["title"] = vm["name"]
# 获取Docker容器信息
docker_containers = [] docker_containers = []
if self.enable_docker: if self.enable_docker and self.docker_manager:
docker_containers = await self.docker_manager.get_containers() self._debug_log("开始获取Docker信息...")
try:
docker_containers = await self.docker_manager.get_containers()
self._debug_log(f"Docker信息获取完成数量: {len(docker_containers)}")
except Exception as e:
self._debug_log(f"Docker信息获取失败: {e}")
data = { data = {
"disks": disks, "disks": disks,
"system": { "system": {**system, "status": status},
**system,
"status": status
},
"ups": ups_info, "ups": ups_info,
"vms": vms, "vms": vms,
"docker_containers": docker_containers "docker_containers": docker_containers,
"zpools": zpools,
"scrub_status": scrub_status
} }
self._debug_log(f"数据更新完成: disks={len(disks)}, vms={len(vms)}, containers={len(docker_containers)}")
return data return data
except Exception as e: except Exception as e:
_LOGGER.error("Failed to update data: %s", str(e), exc_info=True) self._error_log(f"数据更新失败: {str(e)}")
return { self._system_online = False
"disks": [], if not self._ping_task or self._ping_task.done():
"system": { self._ping_task = asyncio.create_task(self._monitor_system_status())
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": []
}
async def reboot_system(self): return self.get_default_data()
await self.system_manager.reboot_system()
async def shutdown_system(self): async def shutdown_system(self):
await self.system_manager.shutdown_system() """关闭系统 - 委托给SystemManager"""
if self.data and "system" in self.data: return await self.system_manager.shutdown_system()
self.data["system"]["status"] = "off"
self.async_update_listeners() async def reboot_system(self):
"""重启系统 - 委托给SystemManager"""
return await self.system_manager.reboot_system()
async def scrub_zpool(self, pool_name: str) -> bool:
"""执行ZFS存储池数据一致性检查"""
try:
self._debug_log(f"开始对ZFS存储池 {pool_name} 执行scrub操作")
command = f"zpool scrub {pool_name}"
result = await self.run_command(command)
if result and not result.lower().startswith("cannot"):
self._debug_log(f"ZFS存储池 {pool_name} scrub操作启动成功")
return True
else:
self.logger.error(f"ZFS存储池 {pool_name} scrub操作失败: {result}")
return False
except Exception as e:
self.logger.error(f"执行ZFS存储池 {pool_name} scrub操作时出错: {str(e)}", exc_info=True)
return False
class UPSDataUpdateCoordinator(DataUpdateCoordinator): class UPSDataUpdateCoordinator(DataUpdateCoordinator):
def __init__(self, hass: HomeAssistant, config, main_coordinator): def __init__(self, hass: HomeAssistant, config, main_coordinator):
@@ -297,19 +490,20 @@ class UPSDataUpdateCoordinator(DataUpdateCoordinator):
self.ups_manager = UPSManager(main_coordinator) self.ups_manager = UPSManager(main_coordinator)
async def _async_update_data(self): async def _async_update_data(self):
# 如果主协调器检测到系统离线跳过UPS更新
if not self.main_coordinator._system_online:
return {}
try: try:
return await self.ups_manager.get_ups_info() return await self.ups_manager.get_ups_info()
except Exception as e: except Exception as e:
_LOGGER.error("Failed to update UPS data: %s", str(e), exc_info=True) _LOGGER.debug("UPS数据更新失败: %s", str(e))
return {} return {}
async def control_vm(self, vm_name, action): async def control_vm(self, vm_name, action):
try: try:
if not hasattr(self, 'vm_manager'): result = await self.main_coordinator.vm_manager.control_vm(vm_name, action)
self.vm_manager = VMManager(self)
result = await self.vm_manager.control_vm(vm_name, action)
return result return result
except Exception as e: except Exception as e:
_LOGGER.error("虚拟机控制失败: %s", str(e), exc_info=True) _LOGGER.debug("虚拟机控制失败: %s", str(e))
return False return False
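
The coordinator above manages its pool as a list of `(ssh, in_use)` tuples and sleep-polls when every connection is busy. As an alternative shape of the same bounded-reuse idea (an illustrative sketch only, not the integration's `FlynasCoordinator`; the class name, credential parameters, and pool size are placeholders), an `asyncio.Semaphore` plus a checkout/return helper lets callers wait on the semaphore instead of polling:

```python
import asyncio
import asyncssh

# Illustrative sketch, not the integration's code: reuse up to `size` SSH
# connections, expressed with a semaphore instead of manual in_use flags.
class SSHPool:
    def __init__(self, host, username, password, size=3):
        self._opts = dict(host=host, username=username, password=password, known_hosts=None)
        self._idle: list[asyncssh.SSHClientConnection] = []
        self._sem = asyncio.Semaphore(size)   # caps concurrent connections
        self._lock = asyncio.Lock()

    async def _checkout(self):
        # Reuse an idle connection if it still answers, otherwise open a new one.
        async with self._lock:
            while self._idle:
                conn = self._idle.pop()
                try:
                    await conn.run("true", check=True, timeout=2)  # liveness probe
                    return conn
                except Exception:
                    conn.close()
        return await asyncssh.connect(**self._opts)

    async def run(self, command: str) -> str:
        async with self._sem:              # waits here instead of sleep-polling
            conn = await self._checkout()
            try:
                result = await conn.run(command, timeout=10)
                return result.stdout.strip()
            finally:
                async with self._lock:
                    self._idle.append(conn)  # return the connection for reuse
```

Both shapes bound the number of concurrent SSH sessions; the semaphore version simply moves the "wait for a free slot" bookkeeping into asyncio.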

View File

@@ -14,6 +14,7 @@ class DiskManager:
self.disk_full_info_cache = {} # 缓存磁盘完整信息 self.disk_full_info_cache = {} # 缓存磁盘完整信息
self.first_run = True # 首次运行标志 self.first_run = True # 首次运行标志
self.initial_detection_done = False # 首次完整检测完成标志 self.initial_detection_done = False # 首次完整检测完成标志
self.disk_io_stats_cache = {} # 缓存磁盘I/O统计信息
def extract_value(self, text: str, patterns, default="未知", format_func=None): def extract_value(self, text: str, patterns, default="未知", format_func=None):
if not text: if not text:
@@ -35,75 +36,247 @@ class DiskManager:
self.logger.debug("No match found for patterns: %s", patterns) self.logger.debug("No match found for patterns: %s", patterns)
return default return default
async def check_disk_active(self, device: str, window: int = 30) -> bool: def _format_capacity(self, capacity_str: str) -> str:
"""检查硬盘在指定时间窗口内是否有活动""" """将容量字符串格式化为GB或TB格式"""
if not capacity_str or capacity_str == "未知":
return "未知"
try: try:
# 正确的路径是 /sys/block/{device}/stat # 处理逗号分隔的数字(如 "1,000,204,886,016 bytes"
stat_path = f"/sys/block/{device}/stat" capacity_str = capacity_str.replace(',', '')
# 读取统计文件 # 提取数字和单位
stat_output = await self.coordinator.run_command(f"cat {stat_path} 2>/dev/null") import re
if not stat_output: # 匹配数字和单位(如 "500 GB", "1.0 TB", "1000204886016 bytes", "1,000,204,886,016 bytes"
self.logger.debug(f"无法读取 {stat_path},默认返回活跃状态") match = re.search(r'(\d+(?:\.\d+)?)\s*([KMGT]?B|bytes?)', capacity_str, re.IGNORECASE)
return True if not match:
# 如果没有匹配到单位,尝试直接提取数字
numbers = re.findall(r'\d+', capacity_str)
if numbers:
# 取最大的数字(通常是容量值)
value = float(max(numbers, key=len))
bytes_value = value # 假设为字节
else:
return capacity_str
else:
value = float(match.group(1))
unit = match.group(2).upper()
# 解析统计信息 # 转换为字节
stats = stat_output.split() if unit in ['B', 'BYTE', 'BYTES']:
if len(stats) < 11: bytes_value = value
self.logger.debug(f"无效的统计信息格式:{stat_output}") elif unit in ['KB', 'KIB']:
return True bytes_value = value * 1024
elif unit in ['MB', 'MIB']:
bytes_value = value * 1024 * 1024
elif unit in ['GB', 'GIB']:
bytes_value = value * 1024 * 1024 * 1024
elif unit in ['TB', 'TIB']:
bytes_value = value * 1024 * 1024 * 1024 * 1024
else:
bytes_value = value # 默认假设为字节
# 关键字段当前正在进行的I/O操作数量第9个字段索引8 # 转换为合适的单位
in_flight = int(stats[8]) if bytes_value >= 1024**4: # 1 TB
return f"{bytes_value / (1024**4):.1f} TB"
# 如果当前有I/O操作直接返回活跃状态 elif bytes_value >= 1024**3: # 1 GB
if in_flight > 0: return f"{bytes_value / (1024**3):.1f} GB"
return True elif bytes_value >= 1024**2: # 1 MB
return f"{bytes_value / (1024**2):.1f} MB"
# 检查I/O操作时间第10个字段索引9 - io_ticks单位毫秒 elif bytes_value >= 1024: # 1 KB
io_ticks = int(stats[9]) return f"{bytes_value / 1024:.1f} KB"
else:
# 如果设备在窗口时间内有I/O活动返回活跃状态 return f"{bytes_value:.1f} B"
if io_ticks > window * 1000:
return True
# 所有检查都通过,返回非活跃状态
return False
except Exception as e: except Exception as e:
self.logger.error(f"检测硬盘活动状态失败: {str(e)}", exc_info=True) self.logger.debug(f"格式化容量失败: {capacity_str}, 错误: {e}")
return capacity_str
async def check_disk_active(self, device: str, window: int = 30, current_status: str = None) -> bool:
"""检查硬盘在指定时间窗口内是否有活动"""
try:
# 首先检查硬盘当前状态
if current_status is None:
current_status = await self.get_disk_activity(device)
else:
self.logger.debug(f"使用传入的状态: {device} = {current_status}")
# 如果硬盘处于休眠状态,直接返回非活跃
if current_status == "休眠中":
self.logger.debug(f"硬盘 {device} 处于休眠状态,不执行详细检测")
return False
# 如果硬盘处于空闲状态,检查是否有近期活动
if current_status == "空闲中":
# 检查缓存的统计信息来判断近期活动
stat_path = f"/sys/block/{device}/stat"
stat_output = await self.coordinator.run_command(f"cat {stat_path} 2>/dev/null")
if stat_output:
stats = stat_output.split()
if len(stats) >= 11:
try:
current_read_ios = int(stats[0])
current_write_ios = int(stats[4])
current_io_ticks = int(stats[9])
cached_stats = self.disk_io_stats_cache.get(device)
if cached_stats:
read_diff = current_read_ios - cached_stats.get('read_ios', 0)
write_diff = current_write_ios - cached_stats.get('write_ios', 0)
io_ticks_diff = current_io_ticks - cached_stats.get('io_ticks', 0)
# 如果在最近30秒内有I/O活动认为硬盘活跃
if read_diff > 0 or write_diff > 0 or io_ticks_diff > 100:
self.logger.debug(f"硬盘 {device} 近期有I/O活动需要更新信息")
return True
# 更新缓存
self.disk_io_stats_cache[device] = {
'read_ios': current_read_ios,
'write_ios': current_write_ios,
'io_ticks': current_io_ticks
}
except (ValueError, IndexError):
pass
# 如果硬盘空闲且没有近期活动,使用缓存信息
self.logger.debug(f"硬盘 {device} 处于空闲状态且无近期活动,使用缓存信息")
return False
# 如果硬盘处于活动中,返回活跃状态
if current_status == "活动中":
self.logger.debug(f"硬盘 {device} 处于活动中,执行详细检测")
return True
# 默认情况下返回活跃状态
self.logger.debug(f"硬盘 {device} 状态未知,默认执行详细检测")
return True
except Exception as e:
self.logger.error(f"检测硬盘活动状态失败: {str(e)}")
return True # 出错时默认执行检测 return True # 出错时默认执行检测
async def get_disk_activity(self, device: str) -> str: async def get_disk_power_state(self, device: str) -> str:
"""获取硬盘活动状态(活动中/空闲中/休眠中)""" """获取硬盘电源状态"""
try: try:
# 检查硬盘是否处于休眠状态 # 检查 SCSI 设备状态
state_path = f"/sys/block/{device}/device/state" state_path = f"/sys/block/{device}/device/state"
state_output = await self.coordinator.run_command(f"cat {state_path} 2>/dev/null || echo 'unknown'") state_output = await self.coordinator.run_command(f"cat {state_path} 2>/dev/null || echo 'unknown'")
state = state_output.strip().lower() state = state_output.strip().lower()
if state in ["standby", "sleep"]: if state in ["running", "active"]:
return "active"
elif state in ["standby", "sleep"]:
return state
# 对于某些设备尝试通过hdparm检查状态非侵入性
hdparm_output = await self.coordinator.run_command(f"hdparm -C /dev/{device} 2>/dev/null || echo 'unknown'")
if "standby" in hdparm_output.lower():
return "standby"
elif "sleeping" in hdparm_output.lower():
return "sleep"
elif "active/idle" in hdparm_output.lower():
return "active"
return "unknown"
except Exception as e:
self.logger.debug(f"获取磁盘 {device} 电源状态失败: {e}")
return "unknown"
async def get_disk_activity(self, device: str) -> str:
"""获取硬盘活动状态(活动中/空闲中/休眠中)"""
try:
# 先检查电源状态 - 这是最可靠的休眠检测方法
power_state = await self.get_disk_power_state(device)
if power_state in ["standby", "sleep"]:
self.logger.debug(f"硬盘 {device} 电源状态为 {power_state},判定为休眠中")
return "休眠中" return "休眠中"
# 检查最近一分钟内的硬盘活动 # 检查最近的I/O活动 - 使用非侵入性方式
stat_path = f"/sys/block/{device}/stat" stat_path = f"/sys/block/{device}/stat"
stat_output = await self.coordinator.run_command(f"cat {stat_path}") stat_output = await self.coordinator.run_command(f"cat {stat_path} 2>/dev/null")
stats = stat_output.split()
if len(stats) >= 11: if stat_output:
# 第9个字段是最近完成的读操作数 stats = stat_output.split()
# 第10个字段是最近完成的写操作数 if len(stats) >= 11:
recent_reads = int(stats[8]) try:
recent_writes = int(stats[9]) in_flight = int(stats[8]) # 当前进行中的I/O
io_ticks = int(stats[9]) # I/O活动时间(ms)
if recent_reads > 0 or recent_writes > 0: # 如果有正在进行的I/O返回活动中
if in_flight > 0:
self.logger.debug(f"硬盘 {device} 有进行中的I/O操作: {in_flight}")
return "活动中"
# 检查缓存的统计信息来判断近期活动
cached_stats = self.disk_io_stats_cache.get(device)
if cached_stats:
current_read_ios = int(stats[0])
current_write_ios = int(stats[4])
read_diff = current_read_ios - cached_stats.get('read_ios', 0)
write_diff = current_write_ios - cached_stats.get('write_ios', 0)
io_ticks_diff = io_ticks - cached_stats.get('io_ticks', 0)
# 如果在最近30秒内有I/O活动认为硬盘活动中
if read_diff > 0 or write_diff > 0 or io_ticks_diff > 100: # 100ms内的活动
self.logger.debug(f"硬盘 {device} 近期有I/O活动: 读={read_diff}, 写={write_diff}, 活动时间={io_ticks_diff}ms")
# 更新缓存统计信息
self.disk_io_stats_cache[device] = {
'read_ios': current_read_ios,
'write_ios': current_write_ios,
'in_flight': in_flight,
'io_ticks': io_ticks
}
return "活动中"
else:
# 首次检测,保存当前状态并认为活跃
self.logger.debug(f"硬盘 {device} 首次检测,保存统计信息")
self.disk_io_stats_cache[device] = {
'read_ios': int(stats[0]),
'write_ios': int(stats[4]),
'in_flight': in_flight,
'io_ticks': io_ticks
}
return "活动中" # 首次检测默认返回活动中
# 更新缓存统计信息
self.disk_io_stats_cache[device] = {
'read_ios': int(stats[0]),
'write_ios': int(stats[4]),
'in_flight': in_flight,
'io_ticks': io_ticks
}
# 如果没有活动,返回空闲中
self.logger.debug(f"硬盘 {device} 处于空闲状态")
return "空闲中"
except (ValueError, IndexError) as e:
self.logger.debug(f"解析硬盘 {device} 统计信息失败: {e}")
return "活动中" # 出错时默认返回活动中,避免中断休眠
# 如果无法获取统计信息,检查硬盘是否可访问
try:
# 尝试读取设备信息,如果成功说明硬盘可访问
test_output = await self.coordinator.run_command(f"ls -la /dev/{device} 2>/dev/null")
if test_output and device in test_output:
self.logger.debug(f"硬盘 {device} 可访问但无统计信息,默认返回活动中")
return "活动中" return "活动中"
else:
return "空闲中" self.logger.debug(f"硬盘 {device} 不可访问,可能处于休眠状态")
return "休眠中"
except:
self.logger.debug(f"硬盘 {device} 检测失败,默认返回活动中")
return "活动中"
except Exception as e: except Exception as e:
self.logger.error(f"获取硬盘 {device} 状态失败: {str(e)}", exc_info=True) self.logger.error(f"获取硬盘 {device} 状态失败: {str(e)}", exc_info=True)
return "未知" return "活动中" # 出错时默认返回活动中,避免中断休眠
async def get_disks_info(self) -> list[dict]: async def get_disks_info(self) -> list[dict]:
disks = [] disks = []
@@ -172,31 +345,31 @@ class DiskManager:
disks.append(disk_info) disks.append(disk_info)
continue continue
# 检查硬盘是否活跃 # 检查硬盘是否活跃,传入当前状态确保一致性
is_active = await self.check_disk_active(device, window=30) is_active = await self.check_disk_active(device, window=30, current_status=status)
if not is_active: if not is_active:
self.logger.debug(f"硬盘 {device} 处于非活跃状态,使用上一次获取的信息") self.logger.debug(f"硬盘 {device} 处于非活跃状态,使用上一次获取的信息")
# 优先使用缓存的完整信息 # 优先使用缓存的完整信息
if cached_info: if cached_info:
disk_info.update({ disk_info.update({
"model": cached_info.get("model", "检测"), "model": cached_info.get("model", ""),
"serial": cached_info.get("serial", "检测"), "serial": cached_info.get("serial", ""),
"capacity": cached_info.get("capacity", "检测"), "capacity": cached_info.get("capacity", ""),
"health": cached_info.get("health", "检测"), "health": cached_info.get("health", ""),
"temperature": cached_info.get("temperature", "检测"), "temperature": cached_info.get("temperature", ""),
"power_on_hours": cached_info.get("power_on_hours", "检测"), "power_on_hours": cached_info.get("power_on_hours", ""),
"attributes": cached_info.get("attributes", {}) "attributes": cached_info.get("attributes", {})
}) })
else: else:
# 如果没有缓存信息,使用默认值 # 如果没有缓存信息,使用默认值
disk_info.update({ disk_info.update({
"model": "检测", "model": "",
"serial": "检测", "serial": "",
"capacity": "检测", "capacity": "",
"health": "检测", "health": "",
"temperature": "检测", "temperature": "",
"power_on_hours": "检测", "power_on_hours": "",
"attributes": {} "attributes": {}
}) })
@@ -240,31 +413,45 @@ class DiskManager:
async def _get_full_disk_info(self, disk_info, device_path): async def _get_full_disk_info(self, disk_info, device_path):
"""获取硬盘的完整信息(模型、序列号、健康状态等)""" """获取硬盘的完整信息(模型、序列号、健康状态等)"""
# 获取基本信息 # 获取基本信息 - 首先尝试NVMe格式
info_output = await self.coordinator.run_command(f"smartctl -i {device_path}") info_output = await self.coordinator.run_command(f"smartctl -i {device_path}")
self.logger.debug("smartctl -i output for %s: %s", disk_info["device"], info_output[:200] + "..." if len(info_output) > 200 else info_output) self.logger.debug("smartctl -i output for %s: %s", disk_info["device"], info_output[:200] + "..." if len(info_output) > 200 else info_output)
# 模型 # 检查是否为NVMe设备
is_nvme = "nvme" in disk_info["device"].lower()
# 模型 - 增强NVMe支持
disk_info["model"] = self.extract_value( disk_info["model"] = self.extract_value(
info_output, info_output,
[ [
r"Device Model:\s*(.+)", r"Device Model:\s*(.+)",
r"Model(?: Family)?\s*:\s*(.+)", r"Model(?: Family)?\s*:\s*(.+)",
r"Model\s*Number:\s*(.+)" r"Model Number:\s*(.+)",
r"Product:\s*(.+)", # NVMe格式
r"Model Number:\s*(.+)", # NVMe格式
] ]
) )
# 序列号 # 序列号 - 增强NVMe支持
disk_info["serial"] = self.extract_value( disk_info["serial"] = self.extract_value(
info_output, info_output,
r"Serial Number\s*:\s*(.+)" [
r"Serial Number\s*:\s*(.+)",
r"Serial Number:\s*(.+)", # NVMe格式
r"Serial\s*:\s*(.+)", # NVMe格式
]
) )
# 容量 # 容量 - 增强NVMe支持并转换为GB/TB格式
disk_info["capacity"] = self.extract_value( capacity_patterns = [
info_output, r"User Capacity:\s*([^\[]+)",
r"User Capacity:\s*([^[]+)" r"Namespace 1 Size/Capacity:\s*([^\[]+)", # NVMe格式
) r"Total NVM Capacity:\s*([^\[]+)", # NVMe格式
r"Capacity:\s*([^\[]+)", # NVMe格式
]
raw_capacity = self.extract_value(info_output, capacity_patterns)
disk_info["capacity"] = self._format_capacity(raw_capacity)
# 健康状态 # 健康状态
health_output = await self.coordinator.run_command(f"smartctl -H {device_path}") health_output = await self.coordinator.run_command(f"smartctl -H {device_path}")
@@ -341,6 +528,46 @@ class DiskManager:
# 改进的通电时间检测逻辑 - 处理特殊格式 # 改进的通电时间检测逻辑 - 处理特殊格式
power_on_hours = "未知" power_on_hours = "未知"
# 检查是否为NVMe设备
is_nvme = "nvme" in disk_info["device"].lower()
# 方法0NVMe设备的通电时间提取优先处理
if is_nvme:
# NVMe格式的通电时间提取 - 支持带逗号的数字格式
nvme_patterns = [
r"Power On Hours\s*:\s*([\d,]+)", # 支持带逗号的数字格式(如 "6,123"
r"Power On Time\s*:\s*([\d,]+)", # NVMe备用格式
r"Power on hours\s*:\s*([\d,]+)", # 小写格式
r"Power on time\s*:\s*([\d,]+)", # 小写格式
]
for pattern in nvme_patterns:
match = re.search(pattern, data_output, re.IGNORECASE)
if match:
try:
# 处理带逗号的数字格式(如 "6,123"
hours_str = match.group(1).replace(',', '')
hours = int(hours_str)
power_on_hours = f"{hours} 小时"
self.logger.debug("Found NVMe power_on_hours via pattern %s: %s", pattern, power_on_hours)
break
except:
continue
# 如果还没找到尝试在SMART数据部分查找
if power_on_hours == "未知":
# 查找SMART数据部分中的Power On Hours
smart_section_match = re.search(r"SMART/Health Information.*?Power On Hours\s*:\s*([\d,]+)",
data_output, re.IGNORECASE | re.DOTALL)
if smart_section_match:
try:
hours_str = smart_section_match.group(1).replace(',', '')
hours = int(hours_str)
power_on_hours = f"{hours} 小时"
self.logger.debug("Found NVMe power_on_hours in SMART section: %s", power_on_hours)
except:
pass
# 方法1提取属性9的RAW_VALUE处理特殊格式 # 方法1提取属性9的RAW_VALUE处理特殊格式
attr9_match = re.search( attr9_match = re.search(
r"^\s*9\s+Power_On_Hours\b[^\n]+\s+(\d+)h(?:\+(\d+)m(?:\+(\d+)\.\d+s)?)?", r"^\s*9\s+Power_On_Hours\b[^\n]+\s+(\d+)h(?:\+(\d+)m(?:\+(\d+)\.\d+s)?)?",
@@ -378,7 +605,7 @@ class DiskManager:
[ [
# 精确匹配属性9行 # 精确匹配属性9行
r"^\s*9\s+Power_On_Hours\b[^\n]+\s+(\d+)\s*$", r"^\s*9\s+Power_On_Hours\b[^\n]+\s+(\d+)\s*$",
r"^\s*9\s+Power On Hours\b[^\n]+\s+(\d+)h(?:\+(\d+)m(?:\+(\d+)\.\d+s)?)?",
# 通用匹配模式 # 通用匹配模式
r"9\s+Power_On_Hours\b.*?(\d+)\b", r"9\s+Power_On_Hours\b.*?(\d+)\b",
r"Power_On_Hours\b.*?(\d+)\b", r"Power_On_Hours\b.*?(\d+)\b",
@@ -424,7 +651,7 @@ class DiskManager:
# 添加额外属性:温度历史记录 # 添加额外属性:温度历史记录
temp_history = {} temp_history = {}
# 提取属性194的温度历史 # 提取属性194的温度历史
temp194_match = re.search(r"194\s+Temperature_Celsius+.*?\(\s*([\d\s]+)$", data_output) temp194_match = re.search(r"194\s+Temperature_Celsius+.*?(\s*[\d\s]+)$", data_output)
if temp194_match: if temp194_match:
try: try:
values = [int(x) for x in temp194_match.group(1).split()] values = [int(x) for x in temp194_match.group(1).split()]
@@ -440,3 +667,198 @@ class DiskManager:
# 保存额外属性 # 保存额外属性
disk_info["attributes"] = temp_history disk_info["attributes"] = temp_history
async def get_zpools(self) -> list[dict]:
"""获取ZFS存储池信息"""
zpools = []
try:
self.logger.debug("Fetching ZFS pool list...")
# 使用zpool list获取存储池信息包含所有字段
zpool_output = await self.coordinator.run_command("zpool list 2>/dev/null || echo 'NO_ZPOOL'")
self.logger.debug("zpool list output: %s", zpool_output)
if "NO_ZPOOL" in zpool_output or "command not found" in zpool_output.lower():
self.logger.info("系统未安装ZFS或没有ZFS存储池")
return []
# 解析zpool list输出
lines = zpool_output.splitlines()
# 跳过标题行,从第二行开始解析
for line in lines[1:]: # 跳过第一行标题
if line.strip():
# 分割制表符或连续空格
parts = re.split(r'\s+', line.strip())
if len(parts) >= 11: # 根据实际输出有11个字段
pool_info = {
"name": parts[0],
"size": parts[1],
"alloc": parts[2],
"free": parts[3],
"ckpoint": parts[4] if parts[4] != "-" else "",
"expand_sz": parts[5] if parts[5] != "-" else "",
"frag": parts[6] if parts[6] != "-" else "0%",
"capacity": parts[7],
"dedup": parts[8],
"health": parts[9],
"altroot": parts[10] if parts[10] != "-" else ""
}
zpools.append(pool_info)
self.logger.debug("Found ZFS pool: %s", pool_info["name"])
self.logger.info("Found %d ZFS pools", len(zpools))
return zpools
except Exception as e:
self.logger.error("Failed to get ZFS pool info: %s", str(e), exc_info=True)
return []
async def get_zpool_status(self, pool_name: str) -> dict:
"""获取ZFS存储池的详细状态信息包括scrub进度"""
try:
self.logger.debug(f"Getting ZFS pool status for {pool_name}")
status_output = await self.coordinator.run_command(f"zpool status {pool_name} 2>/dev/null || echo 'NO_POOL'")
if "NO_POOL" in status_output or "command not found" in status_output.lower():
self.logger.debug(f"ZFS pool {pool_name} not found")
return {"scrub_in_progress": False}
# 解析scrub信息
scrub_info = self._parse_scrub_info(status_output)
return scrub_info
except Exception as e:
self.logger.error(f"Failed to get ZFS pool status for {pool_name}: {str(e)}", exc_info=True)
return {"scrub_in_progress": False}
def _parse_scrub_info(self, status_output: str) -> dict:
"""解析zpool status中的scrub信息"""
scrub_info = {
"scrub_in_progress": False,
"scrub_status": "无检查",
"scrub_progress": "0%",
"scan_rate": "0/s",
"time_remaining": "",
"scanned": "0",
"issued": "0",
"repaired": "0",
"scrub_start_time": ""
}
lines = status_output.split('\n')
has_scan_section = False
# 首先判断是否有scan段这是判断scrub进行中的关键
for line in lines:
line = line.strip()
if line.startswith('scan:'):
has_scan_section = True
break
# 如果没有scan段直接返回无检查状态
if not has_scan_section:
return scrub_info
# 解析scan段的内容
in_scan_section = False
for line in lines:
line = line.strip()
# 检查是否进入scan部分
if line.startswith('scan:'):
in_scan_section = True
scrub_info["scrub_in_progress"] = True # 有scan段就表示在进行中
scan_line = line[5:].strip() # 去掉'scan:'
# 检查scrub具体状态
if 'scrub in progress' in scan_line or 'scrub resilvering' in scan_line:
scrub_info["scrub_status"] = "检查进行中"
scrub_info["scrub_progress"] = "0.1%" # 刚开始,显示微小进度表示进行中
# 解析开始时间
if 'since' in scan_line:
time_part = scan_line.split('since')[-1].strip()
scrub_info["scrub_start_time"] = time_part
elif 'scrub repaired' in scan_line or 'scrub completed' in scan_line:
scrub_info["scrub_status"] = "检查完成"
scrub_info["scrub_in_progress"] = False
elif 'scrub canceled' in scan_line:
scrub_info["scrub_status"] = "检查已取消"
scrub_info["scrub_in_progress"] = False
elif 'scrub paused' in scan_line:
scrub_info["scrub_status"] = "检查已暂停"
scrub_info["scrub_in_progress"] = False
else:
# 有scan段但没有具体状态说明默认为进行中
scrub_info["scrub_status"] = "检查进行中"
scrub_info["scrub_progress"] = "0.1%"
continue
# 如果在scan部分解析详细信息
if in_scan_section and line and not line.startswith('config'):
# 解析进度信息,例如: "2.10T / 2.10T scanned, 413G / 2.10T issued at 223M/s"
if 'scanned' in line and 'issued' in line:
parts = line.split(',')
# 解析扫描进度
if len(parts) >= 1:
scanned_part = parts[0].strip()
if ' / ' in scanned_part:
scanned_data = scanned_part.split(' / ')[0].strip()
total_data = scanned_part.split(' / ')[1].split()[0].strip()
scrub_info["scanned"] = f"{scanned_data}/{total_data}"
# 解析发出的数据
if len(parts) >= 2:
issued_part = parts[1].strip()
if ' / ' in issued_part:
issued_data = issued_part.split(' / ')[0].strip()
total_issued = issued_part.split(' / ')[1].split()[0].strip()
scrub_info["issued"] = f"{issued_data}/{total_issued}"
# 解析扫描速度
if 'at' in line:
speed_part = line.split('at')[-1].strip().split()[0]
scrub_info["scan_rate"] = speed_part
# 解析进度百分比和剩余时间
elif '%' in line and 'done' in line:
# 例如: "644M repaired, 19.23% done, 02:12:38 to go"
if '%' in line:
progress_match = re.search(r'(\d+\.?\d*)%', line)
if progress_match:
scrub_info["scrub_progress"] = f"{progress_match.group(1)}%"
if 'repaired' in line:
repaired_match = re.search(r'([\d.]+[KMGT]?).*repaired', line)
if repaired_match:
scrub_info["repaired"] = repaired_match.group(1)
if 'to go' in line:
time_match = re.search(r'(\d{2}:\d{2}:\d{2})\s+to\s+go', line)
if time_match:
scrub_info["time_remaining"] = time_match.group(1)
# 如果遇到空行或新章节退出scan部分
elif line == '' or line.startswith('config'):
break
return scrub_info
def _format_bytes(self, bytes_value: int) -> str:
"""将字节数格式化为易读的格式"""
try:
if bytes_value >= 1024**4: # 1 TB
return f"{bytes_value / (1024**4):.1f} TB"
elif bytes_value >= 1024**3: # 1 GB
return f"{bytes_value / (1024**3):.1f} GB"
elif bytes_value >= 1024**2: # 1 MB
return f"{bytes_value / (1024**2):.1f} MB"
elif bytes_value >= 1024: # 1 KB
return f"{bytes_value / 1024:.1f} KB"
else:
return f"{bytes_value} B"
except Exception:
return f"{bytes_value} B"
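
The `_parse_scrub_info` helper above walks the `scan:` section of `zpool status` line by line, and its comments quote typical output such as `2.10T / 2.10T scanned, 413G / 2.10T issued at 223M/s` and `644M repaired, 19.23% done, 02:12:38 to go`. To show how those lines map onto the reported fields, here is a small self-contained sketch (not the integration's parser; the sample pool name and timestamp are invented for illustration) that extracts just the progress, scan rate, and remaining time:

```python
import re

# Sample built from the zpool status lines quoted in the comments above.
SAMPLE_STATUS = """\
  pool: tank
 state: ONLINE
  scan: scrub in progress since Sun Dec  1 00:24:03 2024
        2.10T / 2.10T scanned, 413G / 2.10T issued at 223M/s
        644M repaired, 19.23% done, 02:12:38 to go
"""

def parse_scrub(status_output: str) -> dict:
    """Pull a few scrub fields out of `zpool status` text (illustrative only)."""
    info = {"scrub_in_progress": False, "scrub_progress": "0%",
            "scan_rate": "0/s", "time_remaining": ""}
    if "scrub in progress" in status_output:
        info["scrub_in_progress"] = True
    if (m := re.search(r"(\d+(?:\.\d+)?)% done", status_output)):
        info["scrub_progress"] = f"{m.group(1)}%"
    if (m := re.search(r"issued at (\S+/s)", status_output)):
        info["scan_rate"] = m.group(1)
    if (m := re.search(r"(\d{2}:\d{2}:\d{2}) to go", status_output)):
        info["time_remaining"] = m.group(1)
    return info

print(parse_scrub(SAMPLE_STATUS))
# {'scrub_in_progress': True, 'scrub_progress': '19.23%', 'scan_rate': '223M/s', 'time_remaining': '02:12:38'}
```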

View File

@@ -1,10 +1,10 @@
 {
     "domain": "fn_nas",
     "name": "飞牛NAS",
-    "version": "1.3.1",
-    "documentation": "https://github.com/anxms/fn_nas",
+    "version": "1.4.0",
+    "documentation": "https://github.com/xiaochao99/fn_nas",
     "dependencies": [],
-    "codeowners": ["@anxms"],
+    "codeowners": ["@xiaochao99"],
     "requirements": ["asyncssh>=2.13.1"],
     "iot_class": "local_polling",
     "config_flow": true

View File

@@ -3,10 +3,16 @@ from homeassistant.components.sensor import SensorEntity, SensorDeviceClass, Sen
from homeassistant.helpers.update_coordinator import CoordinatorEntity from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.const import UnitOfTemperature from homeassistant.const import UnitOfTemperature
from .const import ( from .const import (
DOMAIN, HDD_TEMP, HDD_HEALTH, HDD_STATUS, SYSTEM_INFO, ICON_DISK, DOMAIN, HDD_TEMP, HDD_STATUS, SYSTEM_INFO, ICON_DISK,
ICON_TEMPERATURE, ICON_HEALTH, ATTR_DISK_MODEL, ATTR_SERIAL_NO, ICON_TEMPERATURE, ATTR_DISK_MODEL, ATTR_SERIAL_NO,
ATTR_POWER_ON_HOURS, ATTR_TOTAL_CAPACITY, ATTR_HEALTH_STATUS, ATTR_POWER_ON_HOURS, ATTR_TOTAL_CAPACITY, ATTR_HEALTH_STATUS,
DEVICE_ID_NAS, DATA_UPDATE_COORDINATOR DEVICE_ID_NAS, DATA_UPDATE_COORDINATOR, ZFS_POOL, ICON_ZFS,
ATTR_ZPOOL_NAME, ATTR_ZPOOL_HEALTH, ATTR_ZPOOL_SIZE,
ATTR_ZPOOL_ALLOC, ATTR_ZPOOL_FREE, ATTR_ZPOOL_CAPACITY,
ATTR_ZPOOL_FRAGMENTATION, ATTR_ZPOOL_CKPOINT, ATTR_ZPOOL_EXPANDSZ,
ATTR_ZPOOL_DEDUP, ATTR_ZPOOL_SCRUB_STATUS, ATTR_ZPOOL_SCRUB_PROGRESS,
ATTR_ZPOOL_SCRUB_SCAN_RATE, ATTR_ZPOOL_SCRUB_TIME_REMAINING,
ATTR_ZPOOL_SCRUB_ISSUED, ATTR_ZPOOL_SCRUB_REPAIRED, DEVICE_ID_ZFS
) )
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -38,22 +44,7 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
) )
existing_ids.add(temp_uid) existing_ids.add(temp_uid)
# 健康状态传感器
health_uid = f"{config_entry.entry_id}_{disk['device']}_health"
if health_uid not in existing_ids:
entities.append(
DiskSensor(
coordinator,
disk["device"],
HDD_HEALTH,
f"硬盘 {disk.get('model', '未知')} 健康状态",
health_uid,
None,
ICON_HEALTH,
disk
)
)
existing_ids.add(health_uid)
# 硬盘状态传感器 # 硬盘状态传感器
status_uid = f"{config_entry.entry_id}_{disk['device']}_status" status_uid = f"{config_entry.entry_id}_{disk['device']}_status"
@@ -244,6 +235,77 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
)
existing_ids.add(sensor_uid)
# 添加ZFS存储池传感器
if "zpools" in coordinator.data:
for zpool in coordinator.data["zpools"]:
safe_name = zpool["name"].replace(" ", "_").replace("/", "_").replace(".", "_")
# ZFS存储池健康状态传感器
health_uid = f"{config_entry.entry_id}_zpool_{safe_name}_health"
if health_uid not in existing_ids:
entities.append(
ZFSPoolSensor(
coordinator,
zpool["name"],
"health",
f"ZFS {zpool['name']} 健康状态",
health_uid,
None,
ICON_ZFS,
zpool
)
)
existing_ids.add(health_uid)
# ZFS存储池容量使用率传感器
capacity_uid = f"{config_entry.entry_id}_zpool_{safe_name}_capacity"
if capacity_uid not in existing_ids:
entities.append(
ZFSPoolSensor(
coordinator,
zpool["name"],
"capacity",
f"ZFS {zpool['name']} 使用率",
capacity_uid,
"%",
ICON_ZFS,
zpool,
device_class=SensorDeviceClass.POWER_FACTOR,
state_class=SensorStateClass.MEASUREMENT
)
)
existing_ids.add(capacity_uid)
# ZFS存储池总大小传感器
size_uid = f"{config_entry.entry_id}_zpool_{safe_name}_size"
if size_uid not in existing_ids:
entities.append(
ZFSPoolSensor(
coordinator,
zpool["name"],
"size",
f"ZFS {zpool['name']} 容量",
size_uid,
None, # 动态确定单位
ICON_ZFS,
zpool
)
)
existing_ids.add(size_uid)
# ZFS存储池scrub进度传感器
scrub_uid = f"{config_entry.entry_id}_zpool_{safe_name}_scrub"
if scrub_uid not in existing_ids:
entities.append(
ZFSScrubSensor(
coordinator,
zpool["name"],
f"ZFS {zpool['name']} 检查进度",
scrub_uid
)
)
existing_ids.add(scrub_uid)
# 添加剩余内存传感器
mem_available_uid = f"{config_entry.entry_id}_memory_available"
if mem_available_uid not in existing_ids:
@@ -302,7 +364,7 @@ class DiskSensor(CoordinatorEntity, SensorEntity):
if disk["device"] == self.device_id:
if self.sensor_type == HDD_TEMP:
temp = disk.get("temperature")
- if temp is None or temp == "未知" or temp == "未检测":
+ if temp is None or temp == "未知":
return None
if isinstance(temp, str):
try:
@@ -314,11 +376,7 @@ class DiskSensor(CoordinatorEntity, SensorEntity):
elif isinstance(temp, (int, float)):
return temp
return None
- elif self.sensor_type == HDD_HEALTH:
- health = disk.get("health", "未知")
- if health == "未检测":
- return "未检测"
- return health if health != "未知" else "未知状态"
elif self.sensor_type == HDD_STATUS:
return disk.get("status", "未知")
return None
@@ -329,6 +387,28 @@ class DiskSensor(CoordinatorEntity, SensorEntity):
return SensorDeviceClass.TEMPERATURE
return None
@property
def native_unit_of_measurement(self):
"""动态返回单位仅对size类型传感器"""
if self.sensor_type != "size":
return self._attr_native_unit_of_measurement
# 对于size类型传感器根据实际数据确定单位
for zpool in self.coordinator.data.get("zpools", []):
if zpool["name"] == self.zpool_name:
size_str = zpool.get("size", "")
if size_str.endswith("T") or size_str.endswith("Ti"):
return "TB"
elif size_str.endswith("G") or size_str.endswith("Gi"):
return "GB"
elif size_str.endswith("M") or size_str.endswith("Mi"):
return "MB"
elif size_str.endswith("K") or size_str.endswith("Ki"):
return "KB"
else:
return "GB" # 默认单位
return "GB" # 默认单位
@property
def extra_state_attributes(self):
return {
@@ -602,6 +682,28 @@ class MemoryAvailableSensor(CoordinatorEntity, SensorEntity):
except (TypeError, ValueError):
return None
@property
def native_unit_of_measurement(self):
"""动态返回单位仅对size类型传感器"""
if self.sensor_type != "size":
return self._attr_native_unit_of_measurement
# 对于size类型传感器根据实际数据确定单位
for zpool in self.coordinator.data.get("zpools", []):
if zpool["name"] == self.zpool_name:
size_str = zpool.get("size", "")
if size_str.endswith("T") or size_str.endswith("Ti"):
return "TB"
elif size_str.endswith("G") or size_str.endswith("Gi"):
return "GB"
elif size_str.endswith("M") or size_str.endswith("Mi"):
return "MB"
elif size_str.endswith("K") or size_str.endswith("Ki"):
return "KB"
else:
return "GB" # 默认单位
return "GB" # 默认单位
@property
def extra_state_attributes(self):
"""返回总内存和已用内存GB以及原始字节值"""
@@ -693,5 +795,157 @@ class VolumeAvailableSensor(CoordinatorEntity, SensorEntity):
"使用率": vol_info.get("use_percent", "未知") "使用率": vol_info.get("use_percent", "未知")
}
class ZFSPoolSensor(CoordinatorEntity, SensorEntity):
"""ZFS存储池传感器"""
- return attributes
def __init__(self, coordinator, zpool_name, sensor_type, name, unique_id, unit, icon, zpool_info, device_class=None, state_class=None):
super().__init__(coordinator)
self.zpool_name = zpool_name
self.sensor_type = sensor_type
self._attr_name = name
self._attr_unique_id = unique_id
self._attr_native_unit_of_measurement = unit
self._attr_icon = icon
self.zpool_info = zpool_info
self._attr_device_info = {
"identifiers": {(DOMAIN, DEVICE_ID_ZFS)},
"name": "ZFS存储池",
"via_device": (DOMAIN, DEVICE_ID_NAS)
}
# 设置设备类和状态类(如果提供)
if device_class:
self._attr_device_class = device_class
if state_class:
self._attr_state_class = state_class
@property
def native_value(self):
"""返回传感器的值"""
for zpool in self.coordinator.data.get("zpools", []):
if zpool["name"] == self.zpool_name:
if self.sensor_type == "health":
# 健康状态中英文映射
health_map = {
"ONLINE": "在线",
"DEGRADED": "降级",
"FAULTED": "故障",
"OFFLINE": "离线",
"REMOVED": "已移除",
"UNAVAIL": "不可用"
}
return health_map.get(zpool.get("health", "UNKNOWN"), zpool.get("health", "未知"))
elif self.sensor_type == "capacity":
# 返回使用率数值(去掉百分号)
capacity = zpool.get("capacity", "0%")
try:
return float(capacity.replace("%", ""))
except ValueError:
return None
elif self.sensor_type == "size":
# 返回总大小的数值部分
size = zpool.get("size", "0")
try:
# 提取数字部分
import re
match = re.search(r'([\d.]+)', size)
if match:
return float(match.group(1))
return None
except (ValueError, AttributeError):
return None
return None
@property
def native_unit_of_measurement(self):
"""动态返回单位仅对size类型传感器"""
if self.sensor_type != "size":
return self._attr_native_unit_of_measurement
# 对于size类型传感器根据实际数据确定单位
for zpool in self.coordinator.data.get("zpools", []):
if zpool["name"] == self.zpool_name:
size_str = zpool.get("size", "")
if size_str.endswith("T") or size_str.endswith("Ti"):
return "TB"
elif size_str.endswith("G") or size_str.endswith("Gi"):
return "GB"
elif size_str.endswith("M") or size_str.endswith("Mi"):
return "MB"
elif size_str.endswith("K") or size_str.endswith("Ki"):
return "KB"
else:
return "GB" # 默认单位
return "GB" # 默认单位
@property
def extra_state_attributes(self):
"""返回额外的状态属性"""
for zpool in self.coordinator.data.get("zpools", []):
if zpool["name"] == self.zpool_name:
return {
ATTR_ZPOOL_NAME: zpool.get("name", "未知"),
ATTR_ZPOOL_HEALTH: zpool.get("health", "未知"),
ATTR_ZPOOL_SIZE: zpool.get("size", "未知"),
ATTR_ZPOOL_ALLOC: zpool.get("alloc", "未知"),
ATTR_ZPOOL_FREE: zpool.get("free", "未知"),
ATTR_ZPOOL_CAPACITY: zpool.get("capacity", "未知"),
ATTR_ZPOOL_FRAGMENTATION: zpool.get("frag", "未知"),
ATTR_ZPOOL_CKPOINT: zpool.get("ckpoint", "") if zpool.get("ckpoint") != "" else "",
ATTR_ZPOOL_EXPANDSZ: zpool.get("expand_sz", "") if zpool.get("expand_sz") != "" else "",
ATTR_ZPOOL_DEDUP: zpool.get("dedup", "未知"),
"根路径": zpool.get("altroot", "") if zpool.get("altroot") != "" else "默认"
}
return {}
class ZFSScrubSensor(CoordinatorEntity, SensorEntity):
"""ZFS存储池scrub进度传感器"""
def __init__(self, coordinator, zpool_name, name, unique_id):
super().__init__(coordinator)
self.zpool_name = zpool_name
self._attr_name = name
self._attr_unique_id = unique_id
self._attr_native_unit_of_measurement = "%"
self._attr_icon = "mdi:progress-check"
self._attr_device_info = {
"identifiers": {(DOMAIN, DEVICE_ID_ZFS)},
"name": "ZFS存储池",
"via_device": (DOMAIN, DEVICE_ID_NAS)
}
self._attr_device_class = SensorDeviceClass.POWER_FACTOR
self._attr_state_class = SensorStateClass.MEASUREMENT
self.scrub_cache = {}
@property
def native_value(self):
"""返回scrub进度百分比"""
# 获取scrub状态信息
scrub_info = self.coordinator.data.get("scrub_status", {}).get(self.zpool_name, {})
progress_str = scrub_info.get("scrub_progress", "0%")
try:
# 提取数字部分
if progress_str and progress_str != "0%":
return float(progress_str.replace("%", ""))
return 0.0
except (ValueError, AttributeError):
return 0.0
@property
def extra_state_attributes(self):
"""返回scrub详细状态信息"""
scrub_info = self.coordinator.data.get("scrub_status", {}).get(self.zpool_name, {})
return {
ATTR_ZPOOL_NAME: self.zpool_name,
ATTR_ZPOOL_SCRUB_STATUS: scrub_info.get("scrub_status", "无检查"),
ATTR_ZPOOL_SCRUB_PROGRESS: scrub_info.get("scrub_progress", "0%"),
ATTR_ZPOOL_SCRUB_SCAN_RATE: scrub_info.get("scan_rate", "0/s"),
ATTR_ZPOOL_SCRUB_TIME_REMAINING: scrub_info.get("time_remaining", ""),
ATTR_ZPOOL_SCRUB_ISSUED: scrub_info.get("issued", "0"),
ATTR_ZPOOL_SCRUB_REPAIRED: scrub_info.get("repaired", "0"),
"开始时间": scrub_info.get("scrub_start_time", ""),
"扫描数据": scrub_info.get("scanned", "0"),
"检查进行中": scrub_info.get("scrub_in_progress", False)
}
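ZFSPoolSensor 读取的 `coordinator.data["zpools"]` 字段(name、size、alloc、free、capacity、frag、dedup、health、altroot 等)与 `zpool list` 的默认列一一对应。具体采集逻辑在 disk_manager.py 中,本次 diff 未展示;下面只是一个按该字段结构解析 `zpool list -H` 输出的示意写法,字段顺序和示例数据均为假设:

```python
def parse_zpool_list(output: str) -> list:
    """示意:把 `zpool list -H`(制表符分隔)的输出转成传感器使用的字典结构"""
    pools = []
    for line in output.strip().splitlines():
        fields = line.split('\t')
        if len(fields) < 11:
            continue
        pools.append({
            "name": fields[0], "size": fields[1], "alloc": fields[2],
            "free": fields[3], "ckpoint": fields[4], "expand_sz": fields[5],
            "frag": fields[6], "capacity": fields[7], "dedup": fields[8],
            "health": fields[9], "altroot": fields[10],
        })
    return pools

# 假设的单行输出:NAME SIZE ALLOC FREE CKPOINT EXPANDSZ FRAG CAP DEDUP HEALTH ALTROOT
sample = "pool1\t3.62T\t1.21T\t2.41T\t-\t-\t5%\t33%\t1.00x\tONLINE\t-"
print(parse_zpool_list(sample)[0]["capacity"])  # 33%
```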

View File

@@ -1,7 +1,5 @@
- import re
import logging
import asyncio
- import json
import os
from datetime import datetime
@@ -11,9 +9,40 @@ class SystemManager:
def __init__(self, coordinator):
self.coordinator = coordinator
self.logger = _LOGGER.getChild("system_manager")
- self.logger.setLevel(logging.DEBUG)
- self.debug_enabled = False # 调试模式开关
- self.sensors_debug_path = "/config/fn_nas_debug" # 调试文件保存路径
+ # 根据Home Assistant的日志级别动态设置
+ self.logger.setLevel(logging.DEBUG if _LOGGER.isEnabledFor(logging.DEBUG) else logging.INFO)
+ self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG) # 基于HA调试模式
+ self.sensors_debug_path = "/config/fn_nas_debug"
# 温度传感器缓存
self.cpu_temp_cache = {
"hwmon_id": None,
"temp_id": None,
"driver_type": None,
"label": None
}
self.mobo_temp_cache = {
"hwmon_id": None,
"temp_id": None,
"label": None
}
def _debug_log(self, message: str):
"""只在调试模式下输出详细日志"""
if self.debug_enabled:
self.logger.debug(message)
def _info_log(self, message: str):
"""重要信息日志"""
self.logger.info(message)
def _warning_log(self, message: str):
"""警告日志"""
self.logger.warning(message)
def _error_log(self, message: str):
"""错误日志"""
self.logger.error(message)
async def get_system_info(self) -> dict:
"""获取系统信息"""
@@ -23,10 +52,8 @@ class SystemManager:
uptime_output = await self.coordinator.run_command("cat /proc/uptime")
if uptime_output:
try:
- # 保存原始秒数
uptime_seconds = float(uptime_output.split()[0])
system_info["uptime_seconds"] = uptime_seconds
- # 保存格式化字符串
system_info["uptime"] = self.format_uptime(uptime_seconds)
except (ValueError, IndexError):
system_info["uptime_seconds"] = 0
@@ -35,42 +62,19 @@ class SystemManager:
system_info["uptime_seconds"] = 0
system_info["uptime"] = "未知"
- # 获取 sensors 命令输出使用JSON格式
- sensors_output = await self.coordinator.run_command(
- "sensors -j 2>/dev/null || sensors 2>/dev/null || echo 'No sensor data'"
- )
- # 保存传感器数据以便调试
- self.save_sensor_data_for_debug(sensors_output)
- self.logger.debug("Sensors output: %s", sensors_output[:500] + "..." if len(sensors_output) > 500 else sensors_output)
- # 提取 CPU 温度(改进算法)
- cpu_temp = self.extract_cpu_temp(sensors_output)
- system_info["cpu_temperature"] = cpu_temp
- # 提取主板温度(改进算法)
- mobo_temp = self.extract_mobo_temp(sensors_output)
- system_info["motherboard_temperature"] = mobo_temp
- # 尝试备用方法获取CPU温度
- if cpu_temp == "未知":
- backup_cpu_temp = await self.get_cpu_temp_fallback()
- if backup_cpu_temp:
- system_info["cpu_temperature"] = backup_cpu_temp
+ # 一次性获取CPU和主板温度
+ temps = await self.get_temperatures_from_sensors()
+ system_info["cpu_temperature"] = temps["cpu"]
+ system_info["motherboard_temperature"] = temps["motherboard"]
- # 新增:获取内存信息
mem_info = await self.get_memory_info()
system_info.update(mem_info)
- # 新增:获取存储卷信息
vol_info = await self.get_vol_usage()
system_info["volumes"] = vol_info
return system_info
except Exception as e:
self.logger.error("Error getting system info: %s", str(e))
- # 在异常处理中返回空数据
return {
"uptime_seconds": 0,
"uptime": "未知",
@@ -82,73 +86,271 @@ class SystemManager:
"volumes": {} "volumes": {}
} }
- def save_sensor_data_for_debug(self, sensors_output: str):
- """保存传感器数据以便调试"""
- if not self.debug_enabled:
- return
- try:
- # 创建调试目录
- if not os.path.exists(self.sensors_debug_path):
- os.makedirs(self.sensors_debug_path)
- # 生成文件名
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = os.path.join(self.sensors_debug_path, f"sensors_{timestamp}.log")
- # 写入文件
- with open(filename, "w") as f:
- f.write(sensors_output)
- self.logger.info("Saved sensors output to %s for debugging", filename)
- except Exception as e:
- self.logger.error("Failed to save sensor data: %s", str(e))
async def get_temperatures_from_sensors(self) -> dict:
"""一次性获取CPU和主板温度"""
try:
command = "sensors"
self._debug_log(f"执行sensors命令获取温度: {command}")
sensors_output = await self.coordinator.run_command(command)
if self.debug_enabled:
self._debug_log(f"sensors命令输出长度: {len(sensors_output) if sensors_output else 0}")
if not sensors_output:
self._warning_log("sensors命令无输出")
return {"cpu": "未知", "motherboard": "未知"}
# 同时解析CPU和主板温度
cpu_temp = self.extract_cpu_temp_from_sensors(sensors_output)
mobo_temp = self.extract_mobo_temp_from_sensors(sensors_output)
# 记录获取结果
if cpu_temp != "未知":
self._info_log(f"通过sensors获取CPU温度成功: {cpu_temp}")
else:
self._warning_log("sensors命令未找到CPU温度")
if mobo_temp != "未知":
self._info_log(f"通过sensors获取主板温度成功: {mobo_temp}")
else:
self._warning_log("sensors命令未找到主板温度")
return {"cpu": cpu_temp, "motherboard": mobo_temp}
except Exception as e:
self._error_log(f"使用sensors命令获取温度失败: {e}")
return {"cpu": "未知", "motherboard": "未知"}
- async def get_cpu_temp_fallback(self) -> str:
- """备用方法获取CPU温度"""
- self.logger.info("Trying fallback methods to get CPU temperature")
- # 方法1: 从/sys/class/thermal读取
async def get_cpu_temp_from_kernel(self) -> str:
"""获取CPU温度 - 向后兼容"""
temps = await self.get_temperatures_from_sensors()
return temps["cpu"]
async def get_mobo_temp_from_kernel(self) -> str:
"""获取主板温度 - 向后兼容"""
temps = await self.get_temperatures_from_sensors()
return temps["motherboard"]
async def get_cpu_temp_from_sensors(self) -> str:
"""使用sensors命令获取CPU温度 - 向后兼容"""
temps = await self.get_temperatures_from_sensors()
return temps["cpu"]
async def get_mobo_temp_from_sensors(self) -> str:
"""使用sensors命令获取主板温度 - 向后兼容"""
temps = await self.get_temperatures_from_sensors()
return temps["motherboard"]
def extract_cpu_temp_from_sensors(self, sensors_output: str) -> str:
"""从sensors输出中提取CPU温度"""
try:
lines = sensors_output.split('\n')
self._debug_log(f"解析sensors输出{len(lines)}行")
- for i in range(5): # 检查前5个可能的传感器
- path = f"/sys/class/thermal/thermal_zone{i}/temp"
- output = await self.coordinator.run_command(f"cat {path} 2>/dev/null")
- if output and output.isdigit():
- temp = float(output) / 1000.0
- self.logger.info("Found CPU temperature via thermal zone: %.1f°C", temp)
- return f"{temp:.1f} °C"
- except Exception:
- pass
- # 方法2: 从hwmon设备读取
for i, line in enumerate(lines):
line_lower = line.lower().strip()
if self.debug_enabled:
self._debug_log(f"{i+1}行: {line_lower}")
# AMD CPU温度关键词
if any(keyword in line_lower for keyword in [
"tctl", "tdie", "k10temp"
]):
self._debug_log(f"找到AMD CPU温度行: {line}")
if '+' in line and '°c' in line_lower:
try:
temp_match = line.split('+')[1].split('°')[0].strip()
temp = float(temp_match)
if 0 < temp < 150:
self._info_log(f"从sensors提取AMD CPU温度: {temp:.1f}°C")
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self._debug_log(f"解析AMD温度失败: {e}")
continue
# Intel CPU温度关键词
if any(keyword in line_lower for keyword in [
"package id", "core 0", "coretemp"
]) and not any(exclude in line_lower for exclude in ["fan"]):
self._debug_log(f"找到Intel CPU温度行: {line}")
if '+' in line and '°c' in line_lower:
try:
temp_match = line.split('+')[1].split('°')[0].strip()
temp = float(temp_match)
if 0 < temp < 150:
self._info_log(f"从sensors提取Intel CPU温度: {temp:.1f}°C")
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self._debug_log(f"解析Intel温度失败: {e}")
continue
# 通用CPU温度模式
if ('cpu' in line_lower or 'processor' in line_lower) and '+' in line and '°c' in line_lower:
self._debug_log(f"找到通用CPU温度行: {line}")
try:
temp_match = line.split('+')[1].split('°')[0].strip()
temp = float(temp_match)
if 0 < temp < 150:
self._info_log(f"从sensors提取通用CPU温度: {temp:.1f}°C")
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self._debug_log(f"解析通用CPU温度失败: {e}")
continue
self._warning_log("未在sensors输出中找到CPU温度")
return "未知"
except Exception as e:
self._error_log(f"解析sensors CPU温度输出失败: {e}")
return "未知"
def extract_mobo_temp_from_sensors(self, sensors_output: str) -> str:
"""从sensors输出中提取主板温度"""
try:
lines = sensors_output.split('\n')
self._debug_log(f"解析主板温度,共{len(lines)}行")
- for i in range(5): # 检查前5个可能的hwmon设备
- for j in range(5): # 检查每个设备的前5个温度传感器
- path = f"/sys/class/hwmon/hwmon{i}/temp{j}_input"
- output = await self.coordinator.run_command(f"cat {path} 2>/dev/null")
- if output and output.isdigit():
- temp = float(output) / 1000.0
- self.logger.info("Found CPU temperature via hwmon: %.1f°C", temp)
- return f"{temp:.1f} °C"
- except Exception:
- pass
- # 方法3: 使用psutil库如果可用
for i, line in enumerate(lines):
line_lower = line.lower().strip()
# 主板温度关键词 - 扩展关键词列表
if any(keyword in line_lower for keyword in [
"motherboard", "mobo", "mb", "system", "chipset",
"ambient", "temp1:", "temp2:", "temp3:", "systin",
"acpitz", "thermal", "pch", "platform", "board",
"sys", "thermal zone", "acpi", "isa"
]) and not any(cpu_keyword in line_lower for cpu_keyword in [
"cpu", "core", "package", "processor", "tctl", "tdie"
]) and not any(exclude in line_lower for exclude in ["fan", "rpm"]):
self._debug_log(f"找到可能的主板温度行: {line}")
# 多种温度格式匹配
temp_value = None
# 格式1: +45.0°C (high = +80.0°C, crit = +95.0°C)
if '+' in line and '°c' in line_lower:
try:
temp_match = line.split('+')[1].split('°')[0].strip()
temp_value = float(temp_match)
except (ValueError, IndexError):
pass
# 格式2: 45.0°C
if temp_value is None and '°c' in line_lower:
try:
# 查找数字后跟°C的模式
import re
temp_match = re.search(r'(\d+\.?\d*)\s*°c', line_lower)
if temp_match:
temp_value = float(temp_match.group(1))
except (ValueError, AttributeError):
pass
# 格式3: 45.0 C (没有°符号)
if temp_value is None and (' c' in line_lower or 'c ' in line_lower):
try:
# 查找数字后跟C的模式
import re
temp_match = re.search(r'(\d+\.?\d*)\s*c', line_lower)
if temp_match:
temp_value = float(temp_match.group(1))
except (ValueError, AttributeError):
pass
if temp_value is not None:
# 主板温度通常在15-70度之间但放宽范围到10-80度
if 10 <= temp_value <= 80:
# 存储候选值,不立即返回
import re
if not hasattr(self, '_temp_candidates'):
self._temp_candidates = []
self._temp_candidates.append((temp_value, line))
self._debug_log(f"找到有效主板温度候选: {temp_value:.1f}°C")
else:
self._debug_log(f"主板温度值超出合理范围: {temp_value:.1f}°C")
continue
# 处理候选值
if hasattr(self, '_temp_candidates') and self._temp_candidates:
# 优先选择温度在25-45度之间的值典型主板温度
ideal_candidates = [t for t in self._temp_candidates if 25 <= t[0] <= 45]
if ideal_candidates:
best_temp = ideal_candidates[0][0] # 取第一个理想候选值
else:
# 如果没有理想值,取第一个候选值
best_temp = self._temp_candidates[0][0]
self._info_log(f"从sensors提取主板温度: {best_temp:.1f}°C")
# 清理候选值
delattr(self, '_temp_candidates')
return f"{best_temp:.1f} °C"
# 如果没有找到主板温度,尝试备用方法
self._debug_log("尝试备用方法获取主板温度")
mobo_temp = self._extract_mobo_temp_fallback(sensors_output)
if mobo_temp != "未知":
return mobo_temp
self._warning_log("未在sensors输出中找到主板温度")
return "未知"
except Exception as e:
self._error_log(f"解析sensors主板温度输出失败: {e}")
return "未知"
def _extract_mobo_temp_fallback(self, sensors_output: str) -> str:
"""备用方法获取主板温度"""
try:
lines = sensors_output.split('\n')
- output = await self.coordinator.run_command("python3 -c 'import psutil; print(psutil.sensors_temperatures().get(\"coretemp\")[0].current)' 2>/dev/null")
- if output and output.replace('.', '', 1).isdigit():
- temp = float(output)
- self.logger.info("Found CPU temperature via psutil: %.1f°C", temp)
- return f"{temp:.1f} °C"
- except Exception:
- pass
- self.logger.warning("All fallback methods failed to get CPU temperature")
- return ""
# 方法1: 查找非CPU的温度传感器
for line in lines:
line_lower = line.lower().strip()
# 跳过明显的CPU相关行
if any(cpu_keyword in line_lower for cpu_keyword in [
"cpu", "core", "package", "processor", "tctl", "tdie"
]):
continue
# 查找温度值
if '°c' in line_lower or ' c' in line_lower:
# 尝试提取温度值
import re
temp_match = re.search(r'(\d+\.?\d*)\s*[°]?\s*c', line_lower)
if temp_match:
temp_value = float(temp_match.group(1))
if 15 <= temp_value <= 60: # 主板温度合理范围
self._info_log(f"通过备用方法获取主板温度: {temp_value:.1f}°C")
return f"{temp_value:.1f} °C"
# 方法2: 查找hwmon设备中的主板温度
for i, line in enumerate(lines):
line_lower = line.lower()
if "hwmon" in line_lower and "temp" in line_lower:
# 检查接下来的几行是否有温度值
for j in range(i+1, min(i+5, len(lines))):
next_line = lines[j].lower()
if '°c' in next_line or ' c' in next_line:
import re
temp_match = re.search(r'(\d+\.?\d*)\s*[°]?\s*c', next_line)
if temp_match:
temp_value = float(temp_match.group(1))
if 15 <= temp_value <= 60:
self._info_log(f"通过hwmon获取主板温度: {temp_value:.1f}°C")
return f"{temp_value:.1f} °C"
return "未知"
except Exception as e:
self._debug_log(f"备用方法获取主板温度失败: {e}")
return "未知"
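上面的温度提取对 "+xx.x°C" 形式的行做字符串切分,可以用一行假设的 sensors 输出快速验证这个切分逻辑:

```python
# 假设的一行 sensors 输出,仅用于演示在 '+' 与 '°' 之间截取温度值的方式
line = "Package id 0:  +45.0°C  (high = +80.0°C, crit = +100.0°C)"
line_lower = line.lower().strip()

if '+' in line and '°c' in line_lower:
    temp = float(line.split('+')[1].split('°')[0].strip())
    if 0 < temp < 150:
        print(f"{temp:.1f} °C")  # 输出: 45.0 °C
```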
def format_uptime(self, seconds: float) -> str:
"""格式化运行时间为易读格式"""
try:
days, remainder = divmod(seconds, 86400)
days, remainder = divmod(seconds, 86400)
hours, remainder = divmod(remainder, 3600)
minutes, seconds = divmod(remainder, 60)
@@ -166,223 +368,6 @@ class SystemManager:
self.logger.error("Failed to format uptime: %s", str(e))
return "未知"
def extract_cpu_temp(self, sensors_output: str) -> str:
"""从 sensors 输出中提取 CPU 温度,优先获取 Package id 0"""
# 优先尝试获取 Package id 0 温度值
package_id_pattern = r'Package id 0:\s*\+?(\d+\.?\d*)°C'
package_match = re.search(package_id_pattern, sensors_output, re.IGNORECASE)
if package_match:
try:
package_temp = float(package_match.group(1))
self.logger.debug("优先使用 Package id 0 温度: %.1f°C", package_temp)
return f"{package_temp:.1f} °C"
except (ValueError, IndexError) as e:
self.logger.debug("Package id 0 解析错误: %s", str(e))
# 其次尝试解析JSON格式
if sensors_output.strip().startswith('{'):
try:
data = json.loads(sensors_output)
self.logger.debug("JSON sensors data: %s", json.dumps(data, indent=2))
# 查找包含Package相关键名的温度值
for key, values in data.items():
if any(kw in key.lower() for kw in ["package", "pkg", "physical"]):
for subkey, temp_value in values.items():
if any(kw in subkey.lower() for kw in ["temp", "input"]) and not "crit" in subkey.lower():
try:
if isinstance(temp_value, (int, float)):
self.logger.debug("JSON中找到Package温度: %s/%s = %.1f°C", key, subkey, temp_value)
return f"{temp_value:.1f} °C"
except Exception as e:
self.logger.debug("JSON值错误: %s", str(e))
# 新增尝试直接获取Tdie/Tctl温度AMD CPU
for key, values in data.items():
if "k10temp" in key.lower():
for subkey, temp_value in values.items():
if "tdie" in subkey.lower() or "tctl" in subkey.lower():
try:
if isinstance(temp_value, (int, float)):
self.logger.debug("JSON中找到Tdie/Tctl温度: %s/%s = %.1f°C", key, subkey, temp_value)
return f"{temp_value:.1f} °C"
except:
pass
except Exception as e:
self.logger.warning("JSON解析失败: %s", str(e))
# 最后尝试其他模式
other_patterns = [
r'Package id 0:\s*\+?(\d+\.?\d*)°C', # 再次尝试确保捕获
r'CPU Temperature:\s*\+?(\d+\.?\d*)°C',
r'cpu_thermal:\s*\+?(\d+\.?\d*)°C',
r'Tdie:\s*\+?(\d+\.?\d*)°C', # AMD CPU
r'Tctl:\s*\+?(\d+\.?\d*)°C', # AMD CPU
r'PECI Agent \d:\s*\+?(\d+\.?\d*)°C',
r'Composite:\s*\+?(\d+\.?\d*)°C',
r'CPU\s+Temp:\s*\+?(\d+\.?\d*)°C',
r'k10temp-pci\S*:\s*\+?(\d+\.?\d*)°C',
r'Physical id 0:\s*\+?(\d+\.?\d*)°C'
]
for pattern in other_patterns:
match = re.search(pattern, sensors_output, re.IGNORECASE)
if match:
try:
temp = float(match.group(1))
self.logger.debug("匹配到CPU温度: %s: %.1f°C", pattern, temp)
return f"{temp:.1f} °C"
except (ValueError, IndexError):
continue
# 如果所有方法都失败返回未知
return "未知"
def extract_temp_from_systin(self, systin_data: dict) -> float:
"""从 SYSTIN 数据结构中提取温度值"""
if not systin_data:
return None
# 尝试从不同键名获取温度值
for key in ["temp1_input", "input", "value"]:
temp = systin_data.get(key)
if temp is not None:
try:
return float(temp)
except (TypeError, ValueError):
continue
return None
def extract_mobo_temp(self, sensors_output: str) -> str:
"""从 sensors 输出中提取主板温度"""
# 首先尝试解析JSON格式
if sensors_output.strip().startswith('{'):
try:
data = json.loads(sensors_output)
# 查找包含主板相关键名的温度值
candidates = []
for key, values in data.items():
# 优先检查 SYSTIN 键
if "systin" in key.lower():
temp = self.extract_temp_from_systin(values)
if temp is not None:
return f"{temp:.1f} °C"
if any(kw in key.lower() for kw in ["system", "motherboard", "mb", "board", "pch", "chipset", "sys", "baseboard", "systin"]):
for subkey, temp_value in values.items():
if any(kw in subkey.lower() for kw in ["temp", "input"]) and not "crit" in subkey.lower():
try:
if isinstance(temp_value, (int, float)):
candidates.append(temp_value)
self.logger.debug("Found mobo temp candidate in JSON: %s/%s = %.1f°C", key, subkey, temp_value)
except Exception:
pass
# 如果有候选值,取平均值
if candidates:
avg_temp = sum(candidates) / len(candidates)
return f"{avg_temp:.1f} °C"
# 新增:尝试直接获取 SYSTIN 的温度值
systin_temp = self.extract_temp_from_systin(data.get("nct6798-isa-02a0", {}).get("SYSTIN", {}))
if systin_temp is not None:
return f"{systin_temp:.1f} °C"
except Exception as e:
self.logger.warning("Failed to parse sensors JSON: %s", str(e))
# 改进SYSTIN提取逻辑
systin_patterns = [
r'SYSTIN:\s*[+\-]?\s*(\d+\.?\d*)\s*°C', # 标准格式
r'SYSTIN[:\s]+[+\-]?\s*(\d+\.?\d*)\s*°C', # 兼容无冒号或多余空格
r'System Temp:\s*[+\-]?\s*(\d+\.?\d*)\s*°C' # 备选方案
]
for pattern in systin_patterns:
systin_match = re.search(pattern, sensors_output, re.IGNORECASE)
if systin_match:
try:
temp = float(systin_match.group(1))
self.logger.debug("Found SYSTIN temperature: %.1f°C", temp)
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self.logger.debug("SYSTIN match error: %s", str(e))
continue
for line in sensors_output.splitlines():
if 'SYSTIN' in line or 'System Temp' in line:
# 改进的温度值提取正则
match = re.search(r'[+\-]?\s*(\d+\.?\d*)\s*°C', line)
if match:
try:
temp = float(match.group(1))
self.logger.debug("Found mobo temp in line: %s: %.1f°C", line.strip(), temp)
return f"{temp:.1f} °C"
except ValueError:
continue
# 如果找不到SYSTIN尝试其他主板温度模式
other_patterns = [
r'System Temp:\s*\+?(\d+\.?\d*)°C',
r'MB Temperature:\s*\+?(\d+\.?\d*)°C',
r'Motherboard:\s*\+?(\d+\.?\d*)°C',
r'SYS Temp:\s*\+?(\d+\.?\d*)°C',
r'Board Temp:\s*\+?(\d+\.?\d*)°C',
r'PCH_Temp:\s*\+?(\d+\.?\d*)°C',
r'Chipset:\s*\+?(\d+\.?\d*)°C',
r'Baseboard Temp:\s*\+?(\d+\.?\d*)°C',
r'System Temperature:\s*\+?(\d+\.?\d*)°C',
r'Mainboard Temp:\s*\+?(\d+\.?\d*)°C'
]
temp_values = []
for pattern in other_patterns:
matches = re.finditer(pattern, sensors_output, re.IGNORECASE)
for match in matches:
try:
temp = float(match.group(1))
temp_values.append(temp)
self.logger.debug("Found motherboard temperature with pattern: %s: %.1f°C", pattern, temp)
except (ValueError, IndexError):
continue
# 如果有找到温度值,取平均值
if temp_values:
avg_temp = sum(temp_values) / len(temp_values)
return f"{avg_temp:.1f} °C"
# 最后,尝试手动扫描所有温度值
fallback_candidates = []
for line in sensors_output.splitlines():
if '°C' in line:
# 跳过CPU相关的行
if any(kw in line.lower() for kw in ["core", "cpu", "package", "tccd", "k10temp", "processor", "amd", "intel", "nvme"]):
continue
# 跳过风扇和电压行
if any(kw in line.lower() for kw in ["fan", "volt", "vin", "+3.3", "+5", "+12", "vdd", "power", "crit", "max", "min"]):
continue
# 查找温度值
match = re.search(r'(\d+\.?\d*)\s*°C', line)
if match:
try:
temp = float(match.group(1))
# 合理温度范围检查 (0-80°C)
if 0 < temp < 80:
fallback_candidates.append(temp)
self.logger.debug("Fallback mobo candidate: %s -> %.1f°C", line.strip(), temp)
except ValueError:
continue
# 如果有候选值,取平均值
if fallback_candidates:
avg_temp = sum(fallback_candidates) / len(fallback_candidates)
self.logger.warning("Using fallback motherboard temperature detection")
return f"{avg_temp:.1f} °C"
return "未知"
async def get_memory_info(self) -> dict:
"""获取内存使用信息"""
try:
@@ -408,118 +393,259 @@ class SystemManager:
}
except Exception as e:
- self.logger.error("获取内存信息失败: %s", str(e))
+ self._error_log(f"获取内存信息失败: {str(e)}")
return {}
async def get_vol_usage(self) -> dict:
- """获取 /vol* 开头的存储卷使用信息"""
+ """获取 /vol* 开头的存储卷使用信息,避免唤醒休眠磁盘"""
try:
- # 优先使用字节单位
- df_output = await self.coordinator.run_command("df -B 1 /vol* 2>/dev/null")
- if df_output:
- return self.parse_df_bytes(df_output)
- df_output = await self.coordinator.run_command("df -h /vol*")
- if df_output:
- return self.parse_df_human_readable(df_output)
+ # 首先尝试智能检测活跃卷
+ active_vols = await self.check_active_volumes()
+ if active_vols:
+ # 只查询活跃的卷,避免使用通配符可能唤醒所有磁盘
+ vol_list = " ".join(active_vols)
df_output = await self.coordinator.run_command(f"df -B 1 {vol_list} 2>/dev/null")
if df_output:
result = self.parse_df_bytes(df_output)
if result: # 确保有数据返回
return result
df_output = await self.coordinator.run_command(f"df -h {vol_list} 2>/dev/null")
if df_output:
result = self.parse_df_human_readable(df_output)
if result: # 确保有数据返回
return result
# 如果智能检测失败,回退到传统方法(仅在必要时)
self._debug_log("智能卷检测无结果,回退到传统检测方法")
# 优先使用字节单位,但添加错误处理
df_output = await self.coordinator.run_command("df -B 1 /vol* 2>/dev/null || true")
if df_output and "No such file or directory" not in df_output:
result = self.parse_df_bytes(df_output)
if result:
return result
df_output = await self.coordinator.run_command("df -h /vol* 2>/dev/null || true")
if df_output and "No such file or directory" not in df_output:
result = self.parse_df_human_readable(df_output)
if result:
return result
# 最后的回退:尝试检测任何挂载的卷
mount_output = await self.coordinator.run_command("mount | grep '/vol' || true")
if mount_output:
vol_points = []
for line in mount_output.splitlines():
parts = line.split()
for part in parts:
if part.startswith('/vol') and part not in vol_points:
vol_points.append(part)
if vol_points:
self._debug_log(f"从mount输出检测到卷: {vol_points}")
vol_list = " ".join(vol_points)
df_output = await self.coordinator.run_command(f"df -h {vol_list} 2>/dev/null || true")
if df_output:
return self.parse_df_human_readable(df_output)
self._debug_log("所有存储卷检测方法都失败,返回空字典")
return {}
except Exception as e:
- self.logger.error("获取存储卷信息失败: %s", str(e))
+ self._error_log(f"获取存储卷信息失败: {str(e)}")
return {}
async def check_active_volumes(self) -> list:
"""检查当前活跃的存储卷,避免唤醒休眠磁盘"""
try:
# 获取所有挂载点,这个操作不会访问磁盘内容
mount_output = await self.coordinator.run_command("mount | grep '/vol' 2>/dev/null || true")
if not mount_output:
self._debug_log("未找到任何/vol挂载点")
return []
active_vols = []
for line in mount_output.splitlines():
if '/vol' in line:
# 提取挂载点
parts = line.split()
mount_point = None
# 查找挂载点(通常在 'on' 关键词之后)
try:
on_index = parts.index('on')
if on_index + 1 < len(parts):
candidate = parts[on_index + 1]
# 严格检查是否以/vol开头
if candidate.startswith('/vol'):
mount_point = candidate
except ValueError:
# 如果没有 'on' 关键词,查找以/vol开头的部分
for part in parts:
if part.startswith('/vol'):
mount_point = part
break
# 过滤挂载点:只保留根级别的/vol*挂载点
if mount_point and self.is_root_vol_mount(mount_point):
# 检查这个卷对应的磁盘是否活跃
is_active = await self.is_volume_disk_active(mount_point)
if is_active:
active_vols.append(mount_point)
self._debug_log(f"添加活跃卷: {mount_point}")
else:
# 即使磁盘不活跃,也添加到列表中,但标记为可能休眠
# 这样可以保证有基本的存储信息
active_vols.append(mount_point)
self._debug_log(f"{mount_point} 对应磁盘可能休眠,但仍包含在检测中")
else:
self._debug_log(f"跳过非根级别vol挂载点: {mount_point}")
# 去重并排序
active_vols = sorted(list(set(active_vols)))
self._debug_log(f"最终检测到的根级别/vol存储卷: {active_vols}")
return active_vols
except Exception as e:
self._debug_log(f"检查活跃存储卷失败: {e}")
return []
def is_root_vol_mount(self, mount_point: str) -> bool:
"""检查是否为根级别的/vol挂载点"""
if not mount_point or not mount_point.startswith('/vol'):
return False
# 移除开头的/vol部分进行分析
remainder = mount_point[4:] # 去掉'/vol'
# 如果remainder为空说明是/vol这是根级别
if not remainder:
return True
# 如果remainder只是数字如/vol1, /vol2这是根级别
if remainder.isdigit():
return True
# 如果remainder是单个字母或字母数字组合且没有斜杠也认为是根级别
# 例如:/vola, /volb, /vol1a 等
if '/' not in remainder and len(remainder) <= 3:
return True
# 其他情况都认为是子目录,如:
# /vol1/docker/overlay2/...
# /vol1/data/...
# /vol1/config/...
self._debug_log(f"检测到子目录挂载点: {mount_point}")
return False
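is_root_vol_mount 的判定规则可以用下面这组断言直观说明(函数体是为演示单独抄写的等价版本,示例挂载点均为假设):

```python
def is_root_vol_mount(mount_point: str) -> bool:
    """与上面方法等价的独立版本,仅用于演示判定规则"""
    if not mount_point or not mount_point.startswith('/vol'):
        return False
    remainder = mount_point[4:]
    if not remainder or remainder.isdigit():
        return True
    if '/' not in remainder and len(remainder) <= 3:
        return True
    return False

assert is_root_vol_mount("/vol") is True
assert is_root_vol_mount("/vol1") is True
assert is_root_vol_mount("/vol1a") is True                   # 无斜杠且剩余部分不超过3个字符
assert is_root_vol_mount("/volume1") is False                # 剩余部分超过3个字符
assert is_root_vol_mount("/vol1/docker/overlay2") is False   # 子目录挂载点
assert is_root_vol_mount("/mnt/vol1") is False               # 不以 /vol 开头
```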
def parse_df_bytes(self, df_output: str) -> dict: def parse_df_bytes(self, df_output: str) -> dict:
"""解析df命令的字节输出"""
volumes = {} volumes = {}
- for line in df_output.splitlines()[1:]:
- parts = line.split()
- if len(parts) < 6:
- continue
- mount_point = parts[-1]
- # 只处理 /vol 开头的挂载点
- if not mount_point.startswith("/vol"):
- continue
+ try:
+ for line in df_output.splitlines()[1:]: # 跳过标题行
+ parts = line.split()
+ if len(parts) < 6:
+ continue
+ mount_point = parts[-1]
+ # 严格检查只处理根级别的 /vol 挂载点
+ if not self.is_root_vol_mount(mount_point):
+ self._debug_log(f"跳过非根级别vol挂载点: {mount_point}")
+ continue
try:
size_bytes = int(parts[1]) size_bytes = int(parts[1])
used_bytes = int(parts[2]) used_bytes = int(parts[2])
avail_bytes = int(parts[3]) avail_bytes = int(parts[3])
use_percent = parts[4] use_percent = parts[4]
def bytes_to_human(b): def bytes_to_human(b):
for unit in ['', 'K', 'M', 'G', 'T']: for unit in ['', 'K', 'M', 'G', 'T']:
if abs(b) < 1024.0: if abs(b) < 1024.0:
return f"{b:.1f}{unit}" return f"{b:.1f}{unit}"
b /= 1024.0 b /= 1024.0
return f"{b:.1f}P" return f"{b:.1f}P"
volumes[mount_point] = { volumes[mount_point] = {
"filesystem": parts[0], "filesystem": parts[0],
"size": bytes_to_human(size_bytes), "size": bytes_to_human(size_bytes),
"used": bytes_to_human(used_bytes), "used": bytes_to_human(used_bytes),
"available": bytes_to_human(avail_bytes), "available": bytes_to_human(avail_bytes),
"use_percent": use_percent "use_percent": use_percent
} }
self._debug_log(f"添加根级别/vol存储卷信息: {mount_point}")
- except (ValueError, IndexError) as e:
- self.logger.debug("解析存储卷行失败: %s - %s", line, str(e))
- continue
+ except (ValueError, IndexError) as e:
+ self._debug_log(f"解析存储卷行失败: {line} - {str(e)}")
continue
except Exception as e:
self._error_log(f"解析df字节输出失败: {e}")
return volumes
def parse_df_human_readable(self, df_output: str) -> dict: def parse_df_human_readable(self, df_output: str) -> dict:
"""解析df命令输出"""
volumes = {} volumes = {}
- for line in df_output.splitlines()[1:]:
- parts = line.split()
- if len(parts) < 6:
- continue
- mount_point = parts[-1]
- if not mount_point.startswith("/vol"):
- continue
+ try:
+ for line in df_output.splitlines()[1:]: # 跳过标题行
+ parts = line.split()
+ if len(parts) < 6:
+ continue
+ mount_point = parts[-1]
+ # 严格检查只处理根级别的 /vol 挂载点
+ if not self.is_root_vol_mount(mount_point):
+ self._debug_log(f"跳过非根级别vol挂载点: {mount_point}")
+ continue
try:
size = parts[1] size = parts[1]
used = parts[2] used = parts[2]
avail = parts[3] avail = parts[3]
use_percent = parts[4] use_percent = parts[4]
volumes[mount_point] = { volumes[mount_point] = {
"filesystem": parts[0], "filesystem": parts[0],
"size": size, "size": size,
"used": used, "used": used,
"available": avail, "available": avail,
"use_percent": use_percent "use_percent": use_percent
} }
self._debug_log(f"添加根级别/vol存储卷信息: {mount_point}")
- except (ValueError, IndexError) as e:
- self.logger.debug("解析存储卷行失败: %s - %s", line, str(e))
- continue
+ except (ValueError, IndexError) as e:
+ self._debug_log(f"解析存储卷行失败: {line} - {str(e)}")
continue
except Exception as e:
self._error_log(f"解析df输出失败: {e}")
return volumes
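parse_df_bytes 的输入输出可以用一段假设的 `df -B 1 /vol1` 输出来说明(设备名与数值均为虚构,换算结果按上面的 bytes_to_human 逻辑得出):

```python
# 示意:假设的 df -B 1 /vol1 输出
df_output = """Filesystem      1B-blocks         Used    Available Use% Mounted on
/dev/md0    3938445230080 985611018240 2952834211840  26% /vol1"""

# 经 parse_df_bytes 处理后大致得到:
# {
#     "/vol1": {
#         "filesystem": "/dev/md0",
#         "size": "3.6T",
#         "used": "917.9G",
#         "available": "2.7T",
#         "use_percent": "26%",
#     }
# }
```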
async def reboot_system(self):
"""重启系统"""
- self.logger.info("Initiating system reboot...")
+ self._info_log("Initiating system reboot...")
try:
await self.coordinator.run_command("sudo reboot")
- self.logger.info("Reboot command sent")
+ self._info_log("Reboot command sent")
if "system" in self.coordinator.data:
self.coordinator.data["system"]["status"] = "rebooting"
self.coordinator.async_update_listeners()
except Exception as e:
- self.logger.error("Failed to reboot system: %s", str(e))
+ self._error_log(f"Failed to reboot system: {str(e)}")
raise
async def shutdown_system(self):
"""关闭系统"""
- self.logger.info("Initiating system shutdown...")
+ self._info_log("Initiating system shutdown...")
try:
await self.coordinator.run_command("sudo shutdown -h now")
- self.logger.info("Shutdown command sent")
+ self._info_log("Shutdown command sent")
if "system" in self.coordinator.data:
self.coordinator.data["system"]["status"] = "off"
self.coordinator.async_update_listeners()
except Exception as e:
- self.logger.error("Failed to shutdown system: %s", str(e))
+ self._error_log(f"Failed to shutdown system: {str(e)}")
raise

View File

@@ -11,9 +11,27 @@ class UPSManager:
def __init__(self, coordinator):
self.coordinator = coordinator
self.logger = _LOGGER.getChild("ups_manager")
- self.logger.setLevel(logging.DEBUG)
- self.debug_enabled = False # UPS调试模式开关
- self.ups_debug_path = "/config/fn_nas_ups_debug" # UPS调试文件保存路径
+ # 根据Home Assistant的日志级别动态设置
+ self.logger.setLevel(logging.DEBUG if _LOGGER.isEnabledFor(logging.DEBUG) else logging.INFO)
+ self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG) # 基于HA调试模式
+ self.ups_debug_path = "/config/fn_nas_ups_debug"
def _debug_log(self, message: str):
"""只在调试模式下输出详细日志"""
if self.debug_enabled:
self.logger.debug(message)
def _info_log(self, message: str):
"""重要信息日志"""
self.logger.info(message)
def _warning_log(self, message: str):
"""警告日志"""
self.logger.warning(message)
def _error_log(self, message: str):
"""错误日志"""
self.logger.error(message)
async def get_ups_info(self) -> dict: async def get_ups_info(self) -> dict:
"""获取连接的UPS信息""" """获取连接的UPS信息"""
@@ -31,7 +49,7 @@ class UPSManager:
try: try:
# 尝试使用NUT工具获取UPS信息 # 尝试使用NUT工具获取UPS信息
self.logger.debug("尝试使用NUT工具获取UPS信息") self._debug_log("尝试使用NUT工具获取UPS信息")
output = await self.coordinator.run_command("upsc -l") output = await self.coordinator.run_command("upsc -l")
if output and "No such file" not in output: if output and "No such file" not in output:
@@ -39,11 +57,11 @@ class UPSManager:
ups_names = output.splitlines() ups_names = output.splitlines()
if ups_names: if ups_names:
ups_name = ups_names[0].strip() ups_name = ups_names[0].strip()
self.logger.debug("发现UPS: %s", ups_name) self._debug_log(f"发现UPS: {ups_name}")
# 获取详细的UPS信息 # 获取详细的UPS信息
ups_details = await self.coordinator.run_command(f"upsc {ups_name}") ups_details = await self.coordinator.run_command(f"upsc {ups_name}")
self.logger.debug("UPS详细信息: %s", ups_details) self._debug_log(f"UPS详细信息: {ups_details}")
# 保存UPS数据以便调试 # 保存UPS数据以便调试
self.save_ups_data_for_debug(ups_details) self.save_ups_data_for_debug(ups_details)
@@ -51,20 +69,20 @@ class UPSManager:
# 解析UPS信息 # 解析UPS信息
return self.parse_nut_ups_info(ups_details) return self.parse_nut_ups_info(ups_details)
else: else:
self.logger.debug("未找到连接的UPS") self._debug_log("未找到连接的UPS")
else: else:
self.logger.debug("未安装NUT工具尝试备用方法") self._debug_log("未安装NUT工具尝试备用方法")
# 备用方法尝试直接读取UPS状态 # 备用方法尝试直接读取UPS状态
return await self.get_ups_info_fallback() return await self.get_ups_info_fallback()
except Exception as e: except Exception as e:
self.logger.error("获取UPS信息时出错: %s", str(e), exc_info=True) self._error_log(f"获取UPS信息时出错: {str(e)}")
return ups_info return ups_info
async def get_ups_info_fallback(self) -> dict: async def get_ups_info_fallback(self) -> dict:
"""备用方法获取UPS信息""" """备用方法获取UPS信息"""
self.logger.info("尝试备用方法获取UPS信息") self._info_log("尝试备用方法获取UPS信息")
ups_info = { ups_info = {
"status": "未知", "status": "未知",
"battery_level": "未知", "battery_level": "未知",
@@ -81,7 +99,7 @@ class UPSManager:
# 方法1: 检查USB连接的UPS # 方法1: 检查USB连接的UPS
usb_ups_output = await self.coordinator.run_command("lsusb | grep -i ups || echo 'No USB UPS'") usb_ups_output = await self.coordinator.run_command("lsusb | grep -i ups || echo 'No USB UPS'")
if usb_ups_output and "No USB UPS" not in usb_ups_output: if usb_ups_output and "No USB UPS" not in usb_ups_output:
self.logger.debug("检测到USB UPS设备: %s", usb_ups_output) self._debug_log(f"检测到USB UPS设备: {usb_ups_output}")
ups_info["ups_type"] = "USB" ups_info["ups_type"] = "USB"
# 尝试从输出中提取型号 # 尝试从输出中提取型号
@@ -111,7 +129,7 @@ class UPSManager:
return ups_info return ups_info
except Exception as e: except Exception as e:
self.logger.error("备用方法获取UPS信息失败: %s", str(e)) self._error_log(f"备用方法获取UPS信息失败: {str(e)}")
return ups_info return ups_info
def parse_nut_ups_info(self, ups_output: str) -> dict: def parse_nut_ups_info(self, ups_output: str) -> dict:
@@ -253,6 +271,6 @@ class UPSManager:
with open(filename, "w") as f: with open(filename, "w") as f:
f.write(ups_output) f.write(ups_output)
- self.logger.info("保存UPS数据到 %s 用于调试", filename)
+ self._info_log(f"保存UPS数据到 {filename} 用于调试")
except Exception as e:
- self.logger.error("保存UPS数据失败: %s", str(e))
+ self._error_log(f"保存UPS数据失败: {str(e)}")
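parse_nut_ups_info 的具体实现不在本次改动中;NUT 的 `upsc` 输出本身是逐行的 `key: value` 文本,下面给出一个最小的解析示意(示例数据为假设):

```python
# 示意:upsc 的输出形如 "battery.charge: 100",逐行按冒号拆分即可得到键值对
sample = """battery.charge: 100
battery.runtime: 1320
ups.status: OL
ups.model: BK650M2-CH"""

ups_raw = {}
for line in sample.splitlines():
    if ':' in line:
        key, _, value = line.partition(':')
        ups_raw[key.strip()] = value.strip()

print(ups_raw.get("battery.charge"))  # 100
print(ups_raw.get("ups.status"))      # OL
```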

View File

@@ -8,15 +8,40 @@ class VMManager:
def __init__(self, coordinator): def __init__(self, coordinator):
self.coordinator = coordinator self.coordinator = coordinator
self.vms = [] self.vms = []
self.logger = _LOGGER.getChild("vm_manager")
# 根据Home Assistant的日志级别动态设置
self.logger.setLevel(logging.DEBUG if _LOGGER.isEnabledFor(logging.DEBUG) else logging.INFO)
self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
def _debug_log(self, message: str):
"""只在调试模式下输出详细日志"""
if self.debug_enabled:
self.logger.debug(message)
def _info_log(self, message: str):
"""重要信息日志"""
self.logger.info(message)
def _warning_log(self, message: str):
"""警告日志"""
self.logger.warning(message)
def _error_log(self, message: str):
"""错误日志"""
self.logger.error(message)
async def get_vm_list(self): async def get_vm_list(self):
"""获取虚拟机列表及其状态""" """获取虚拟机列表及其状态"""
try: try:
self._debug_log("开始获取虚拟机列表")
output = await self.coordinator.run_command("virsh list --all") output = await self.coordinator.run_command("virsh list --all")
self._debug_log(f"virsh命令输出: {output}")
self.vms = self._parse_vm_list(output) self.vms = self._parse_vm_list(output)
self._info_log(f"获取到{len(self.vms)}个虚拟机")
return self.vms return self.vms
except Exception as e: except Exception as e:
- _LOGGER.error("获取虚拟机列表失败: %s", str(e))
+ self._error_log(f"获取虚拟机列表失败: {str(e)}")
return [] return []
def _parse_vm_list(self, output): def _parse_vm_list(self, output):
@@ -43,26 +68,32 @@ class VMManager:
async def get_vm_title(self, vm_name): async def get_vm_title(self, vm_name):
"""获取虚拟机的标题""" """获取虚拟机的标题"""
try: try:
self._debug_log(f"获取虚拟机{vm_name}的标题")
output = await self.coordinator.run_command(f"virsh dumpxml {vm_name}") output = await self.coordinator.run_command(f"virsh dumpxml {vm_name}")
# 在XML输出中查找<title>标签 # 在XML输出中查找<title>标签
match = re.search(r'<title>(.*?)</title>', output, re.DOTALL) match = re.search(r'<title>(.*?)</title>', output, re.DOTALL)
if match: if match:
- return match.group(1).strip()
+ title = match.group(1).strip()
self._debug_log(f"虚拟机{vm_name}标题: {title}")
return title
self._debug_log(f"虚拟机{vm_name}无标题,使用名称")
return vm_name # 如果没有标题,则返回虚拟机名称 return vm_name # 如果没有标题,则返回虚拟机名称
except Exception as e: except Exception as e:
- _LOGGER.error("获取虚拟机标题失败: %s", str(e))
+ self._error_log(f"获取虚拟机标题失败: {str(e)}")
return vm_name return vm_name
async def control_vm(self, vm_name, action): async def control_vm(self, vm_name, action):
"""控制虚拟机操作""" """控制虚拟机操作"""
- valid_actions = ["start", "shutdown", "reboot"]
+ valid_actions = ["start", "shutdown", "reboot", "destroy"]
if action not in valid_actions: if action not in valid_actions:
raise ValueError(f"无效操作: {action}") raise ValueError(f"无效操作: {action}")
command = f"virsh {action} {vm_name}" command = f"virsh {action} {vm_name}"
try: try:
self._info_log(f"执行虚拟机操作: {command}")
await self.coordinator.run_command(command) await self.coordinator.run_command(command)
self._info_log(f"虚拟机{vm_name}操作{action}成功")
return True return True
except Exception as e: except Exception as e:
- _LOGGER.error("执行虚拟机操作失败: %s", str(e))
+ self._error_log(f"执行虚拟机操作失败: {str(e)}")
return False
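control_vm 只是把动作名直接拼进 `virsh` 命令,新增的 destroy 对应 libvirt 的强制关机(相当于直接断电)。下面的示意展示四个动作最终生成的命令(虚拟机名称为假设):

```python
# 示意:动作名到 virsh 命令的映射
vm_name = "win10"  # 假设的虚拟机名称
for action in ["start", "shutdown", "reboot", "destroy"]:
    print(f"virsh {action} {vm_name}")
# virsh start win10
# virsh shutdown win10
# virsh reboot win10
# virsh destroy win10
```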