16 Commits

Author SHA1 Message Date
xiaochao
035a7e7413 修改版本号 2025-12-04 17:50:50 +08:00
xiaochao
41d4e4cc0d modified: custom_components/fn_nas/button.py
modified:   custom_components/fn_nas/const.py
	modified:   custom_components/fn_nas/coordinator.py
	modified:   custom_components/fn_nas/disk_manager.py
	modified:   custom_components/fn_nas/sensor.py
增加ZFS存储池一致性检查控制
2025-12-04 17:47:00 +08:00
xiaochao
fcb46f429c 将硬盘健康状态改为二元传感器用于警报通知 2025-10-17 18:06:53 +08:00
xiaochao
4ae0b74e78 modified: custom_components/fn_nas/disk_manager.py
modified:   custom_components/fn_nas/manifest.json
	modified:   custom_components/fn_nas/sensor.py

优化硬盘检测逻辑
2025-10-16 18:32:42 +08:00
xiaochao
fd079c4ddf modified: README.md 2025-10-13 15:38:52 +08:00
xiaochao
9e799c8948 修改github地址 2025-10-13 15:31:49 +08:00
xiaochao
31ffc24e6a 飞牛nas系统
1、修复主板温度显示未知问题
虚拟机
1、增加强制关机按钮
2025-10-13 14:54:03 +08:00
xiaochao
905bbf9c96 修复nvme硬盘通电时间和总容量显示未知的问题 2025-10-10 19:15:43 +08:00
xiaochao
0f691e956f 修复可用空间不可用问题 2025-07-28 14:37:28 +08:00
xiaochao
25348fff9b 优化硬盘检测逻辑避免唤醒休眠硬盘 2025-07-28 14:10:23 +08:00
xiaochao
30b1b7d271 修复ha中关闭飞牛系统电脑报错问题 2025-07-28 13:50:09 +08:00
xiaochao
17e3229b29 优化了SSH连接的管理,使用连接池来复用连接,减少连接建立的开销。
主板CPU温度获取优化,改为sensors方式获取。有主板CPU温度获取错误的情况提交issues,并带上sensors命令的输出日志,我会做适配。
2025-07-21 11:56:33 +08:00
xiaochao
11d1352b20 优化飞牛sshd进程数,可能获取实体会比较慢,需要等待一段时间集成才会显示实体 2025-07-12 15:47:37 +08:00
xiaochao
57d14b48f8 修改CPU和主板温度获取方式,可能不支持服务器级硬件和特别老的主板 2025-07-12 14:47:34 +08:00
xiaochao
fae53cf5b9 修复飞牛nas关机后重启home assistant后,飞牛nas开机后部分实体显示不可用的问题
去除ssh连接数限制和缓存清理时间
2025-07-12 01:18:26 +08:00
xiaochao
f185b7e3ee 修复飞牛nas在关机到开机时,home assistant实体状态无法更新的问题 2025-07-11 23:45:05 +08:00
14 changed files with 2048 additions and 843 deletions

View File

@@ -33,7 +33,7 @@
1. 进入**HACS商店** 1. 进入**HACS商店**
2. 添加自定义存储库: 2. 添加自定义存储库:
```shell ```shell
https://github.com/anxms/fn_nas https://github.com/xiaochao99/fn_nas
``` ```
3. 搜索"飞牛NAS",点击下载 3. 搜索"飞牛NAS",点击下载
4. **重启Home Assistant服务** 4. **重启Home Assistant服务**

View File

@@ -1,62 +1,89 @@
import logging import logging
import asyncio
import asyncssh
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from .const import DOMAIN, DATA_UPDATE_COORDINATOR, PLATFORMS, CONF_ENABLE_DOCKER # 导入新增常量 from homeassistant.helpers import config_validation as cv
from .const import (
DOMAIN, DATA_UPDATE_COORDINATOR, PLATFORMS, CONF_ENABLE_DOCKER,
CONF_HOST, DEFAULT_PORT
)
from .coordinator import FlynasCoordinator, UPSDataUpdateCoordinator from .coordinator import FlynasCoordinator, UPSDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry): async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
config = {**entry.data, **entry.options} config = {**entry.data, **entry.options}
coordinator = FlynasCoordinator(hass, config, entry)
coordinator = FlynasCoordinator(hass, config) # 直接初始化不阻塞等待NAS上线
await coordinator.async_config_entry_first_refresh()
_LOGGER.debug("协调器类型: %s", type(coordinator).__name__)
_LOGGER.debug("协调器是否有control_vm方法: %s", hasattr(coordinator, 'control_vm'))
_LOGGER.debug("协调器是否有vm_manager属性: %s", hasattr(coordinator, 'vm_manager'))
# 检查是否启用Docker并初始化Docker管理器如果有
enable_docker = config.get(CONF_ENABLE_DOCKER, False)
if enable_docker:
# 导入Docker管理器并初始化
from .docker_manager import DockerManager
coordinator.docker_manager = DockerManager(coordinator)
_LOGGER.debug("已启用Docker容器监控")
else:
coordinator.docker_manager = None
_LOGGER.debug("未启用Docker容器监控")
ups_coordinator = UPSDataUpdateCoordinator(hass, config, coordinator)
await ups_coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {}) hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = { hass.data[DOMAIN][entry.entry_id] = {
DATA_UPDATE_COORDINATOR: coordinator, DATA_UPDATE_COORDINATOR: coordinator,
"ups_coordinator": ups_coordinator, "ups_coordinator": None,
CONF_ENABLE_DOCKER: enable_docker # 存储启用状态 CONF_ENABLE_DOCKER: coordinator.config.get(CONF_ENABLE_DOCKER, False)
} }
# 异步后台初始化
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) hass.async_create_task(async_delayed_setup(hass, entry, coordinator))
entry.async_on_unload(entry.add_update_listener(async_update_entry))
return True return True
async def async_delayed_setup(hass: HomeAssistant, entry: ConfigEntry, coordinator: FlynasCoordinator):
try:
# 不阻塞等待NAS上线直接尝试刷新数据
await coordinator.async_config_entry_first_refresh()
enable_docker = coordinator.config.get(CONF_ENABLE_DOCKER, False)
if enable_docker:
from .docker_manager import DockerManager
coordinator.docker_manager = DockerManager(coordinator)
_LOGGER.debug("已启用Docker容器监控")
else:
coordinator.docker_manager = None
_LOGGER.debug("未启用Docker容器监控")
ups_coordinator = UPSDataUpdateCoordinator(hass, coordinator.config, coordinator)
await ups_coordinator.async_config_entry_first_refresh()
hass.data[DOMAIN][entry.entry_id]["ups_coordinator"] = ups_coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(entry.add_update_listener(async_update_entry))
_LOGGER.info("飞牛NAS集成初始化完成")
except Exception as e:
_LOGGER.error("飞牛NAS集成初始化失败: %s", str(e))
await coordinator.async_disconnect()
if hasattr(coordinator, '_ping_task') and coordinator._ping_task:
coordinator._ping_task.cancel()
async def async_update_entry(hass: HomeAssistant, entry: ConfigEntry): async def async_update_entry(hass: HomeAssistant, entry: ConfigEntry):
await hass.config_entries.async_reload(entry.entry_id) """更新配置项"""
# 卸载现有集成
await async_unload_entry(hass, entry)
# 重新加载集成
await async_setup_entry(hass, entry)
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry): async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry):
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) """卸载集成"""
# 获取集成数据
domain_data = hass.data.get(DOMAIN, {}).get(entry.entry_id, {})
unload_ok = True
if unload_ok: if DATA_UPDATE_COORDINATOR in domain_data:
domain_data = hass.data[DOMAIN][entry.entry_id]
coordinator = domain_data[DATA_UPDATE_COORDINATOR] coordinator = domain_data[DATA_UPDATE_COORDINATOR]
ups_coordinator = domain_data["ups_coordinator"] ups_coordinator = domain_data.get("ups_coordinator")
# 关闭主协调器的SSH连接 # 卸载平台
await coordinator.async_disconnect() unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
# 关闭UPS协调器
await ups_coordinator.async_shutdown()
# 从DOMAIN中移除该entry的数据 if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id) # 关闭主协调器的SSH连接
await coordinator.async_disconnect()
# 关闭UPS协调器如果存在
if ups_coordinator:
await ups_coordinator.async_shutdown()
# 取消监控任务(如果存在)
if hasattr(coordinator, '_ping_task') and coordinator._ping_task and not coordinator._ping_task.done():
coordinator._ping_task.cancel()
# 从DOMAIN中移除该entry的数据
hass.data[DOMAIN].pop(entry.entry_id, None)
return unload_ok return unload_ok

View File

@@ -0,0 +1,72 @@
import logging
from homeassistant.components.binary_sensor import BinarySensorEntity, BinarySensorDeviceClass
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import (
DOMAIN, HDD_HEALTH, DEVICE_ID_NAS, DATA_UPDATE_COORDINATOR
)
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass, config_entry, async_add_entities):
domain_data = hass.data[DOMAIN][config_entry.entry_id]
coordinator = domain_data[DATA_UPDATE_COORDINATOR]
entities = []
existing_ids = set()
# 添加硬盘健康状态二元传感器
for disk in coordinator.data.get("disks", []):
health_uid = f"{config_entry.entry_id}_{disk['device']}_health_binary"
if health_uid not in existing_ids:
entities.append(
DiskHealthBinarySensor(
coordinator,
disk["device"],
f"硬盘 {disk.get('model', '未知')} 健康状态",
health_uid,
disk
)
)
existing_ids.add(health_uid)
async_add_entities(entities)
class DiskHealthBinarySensor(CoordinatorEntity, BinarySensorEntity):
def __init__(self, coordinator, device_id, name, unique_id, disk_info):
super().__init__(coordinator)
self.device_id = device_id
self._attr_name = name
self._attr_unique_id = unique_id
self.disk_info = disk_info
self._attr_device_info = {
"identifiers": {(DOMAIN, f"disk_{device_id}")},
"name": disk_info.get("model", "未知硬盘"),
"manufacturer": "硬盘设备",
"via_device": (DOMAIN, DEVICE_ID_NAS)
}
self._attr_device_class = BinarySensorDeviceClass.PROBLEM
@property
def is_on(self):
"""返回True表示有问题False表示正常"""
for disk in self.coordinator.data.get("disks", []):
if disk["device"] == self.device_id:
health = disk.get("health", "未知")
# 将健康状态映射为二元状态
if health in ["正常", "良好", "OK", "ok", "good", "Good"]:
return False # 正常状态
elif health in ["警告", "异常", "错误", "warning", "Warning", "error", "Error", "bad", "Bad"]:
return True # 有问题状态
else:
# 未知状态也视为有问题
return True
return True # 默认视为有问题
@property
def icon(self):
"""根据状态返回图标"""
if self.is_on:
return "mdi:alert-circle" # 有问题时显示警告图标
else:
return "mdi:check-circle" # 正常时显示对勾图标

View File

@@ -3,7 +3,7 @@ from homeassistant.components.button import ButtonEntity
from homeassistant.helpers.entity import EntityCategory from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.update_coordinator import CoordinatorEntity from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import ( from .const import (
DOMAIN, DATA_UPDATE_COORDINATOR, DEVICE_ID_NAS, CONF_ENABLE_DOCKER DOMAIN, DATA_UPDATE_COORDINATOR, DEVICE_ID_NAS, CONF_ENABLE_DOCKER, DEVICE_ID_ZFS
) )
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -18,7 +18,7 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
# 1. 添加NAS重启按钮 # 1. 添加NAS重启按钮
entities.append(RebootButton(coordinator, config_entry.entry_id)) entities.append(RebootButton(coordinator, config_entry.entry_id))
# 2. 添加虚拟机重启按钮 # 2. 添加虚拟机重启按钮和强制关机按钮
if "vms" in coordinator.data: if "vms" in coordinator.data:
for vm in coordinator.data["vms"]: for vm in coordinator.data["vms"]:
entities.append( entities.append(
@@ -29,6 +29,14 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
config_entry.entry_id config_entry.entry_id
) )
) )
entities.append(
VMDestroyButton(
coordinator,
vm["name"],
vm.get("title", vm["name"]),
config_entry.entry_id
)
)
# 3. 添加Docker容器重启按钮如果启用了Docker功能 # 3. 添加Docker容器重启按钮如果启用了Docker功能
if enable_docker and "docker_containers" in coordinator.data: if enable_docker and "docker_containers" in coordinator.data:
@@ -44,6 +52,19 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
) )
) )
# 4. 添加ZFS存储池scrub按钮
if "zpools" in coordinator.data:
for zpool in coordinator.data["zpools"]:
safe_name = zpool["name"].replace(" ", "_").replace("/", "_").replace(".", "_")
entities.append(
ZpoolScrubButton(
coordinator,
zpool["name"],
safe_name,
config_entry.entry_id
)
)
async_add_entities(entities) async_add_entities(entities)
class RebootButton(CoordinatorEntity, ButtonEntity): class RebootButton(CoordinatorEntity, ButtonEntity):
@@ -162,4 +183,97 @@ class DockerContainerRestartButton(CoordinatorEntity, ButtonEntity):
"容器名称": self.container_name, "容器名称": self.container_name,
"操作类型": "重启容器", "操作类型": "重启容器",
"提示": "重启操作可能需要一些时间完成" "提示": "重启操作可能需要一些时间完成"
}
class VMDestroyButton(CoordinatorEntity, ButtonEntity):
def __init__(self, coordinator, vm_name, vm_title, entry_id):
super().__init__(coordinator)
self.vm_name = vm_name
self.vm_title = vm_title
self._attr_name = f"{vm_title} 强制关机"
self._attr_unique_id = f"{entry_id}_flynas_vm_{vm_name}_destroy"
self._attr_device_info = {
"identifiers": {(DOMAIN, f"vm_{vm_name}")},
"name": vm_title,
"via_device": (DOMAIN, DEVICE_ID_NAS)
}
self._attr_icon = "mdi:power-off" # 使用关机图标
self.vm_manager = coordinator.vm_manager if hasattr(coordinator, 'vm_manager') else None
async def async_press(self):
"""强制关机虚拟机"""
if not self.vm_manager:
_LOGGER.error("vm_manager不可用无法强制关机虚拟机 %s", self.vm_name)
return
try:
success = await self.vm_manager.control_vm(self.vm_name, "destroy")
if success:
# 更新状态为"强制关机中"
for vm in self.coordinator.data["vms"]:
if vm["name"] == self.vm_name:
vm["state"] = "destroying"
self.async_write_ha_state()
# 在下次更新时恢复实际状态
self.coordinator.async_add_listener(self.async_write_ha_state)
except Exception as e:
_LOGGER.error("强制关机虚拟机时出错: %s", str(e), exc_info=True)
@property
def extra_state_attributes(self):
return {
"虚拟机名称": self.vm_name,
"操作类型": "强制关机",
"警告": "此操作会强制关闭虚拟机,可能导致数据丢失",
"提示": "仅在虚拟机无法正常关机时使用此功能"
}
class ZpoolScrubButton(CoordinatorEntity, ButtonEntity):
def __init__(self, coordinator, zpool_name, safe_name, entry_id):
super().__init__(coordinator)
self.zpool_name = zpool_name
self.safe_name = safe_name
self._attr_name = f"ZFS {zpool_name} 数据检查"
self._attr_unique_id = f"{entry_id}_zpool_{safe_name}_scrub"
self._attr_device_info = {
"identifiers": {(DOMAIN, DEVICE_ID_ZFS)},
"name": "ZFS存储池",
"via_device": (DOMAIN, DEVICE_ID_NAS)
}
self._attr_icon = "mdi:harddisk-check"
@property
def available(self):
"""检查按钮是否可用当scrub进行中时不可点击"""
scrub_status = self.coordinator.data.get("scrub_status", {}).get(self.zpool_name, {})
return not scrub_status.get("scrub_in_progress", False)
async def async_press(self):
"""执行ZFS存储池数据一致性检查"""
try:
# 检查是否已经有scrub在进行中
scrub_status = self.coordinator.data.get("scrub_status", {}).get(self.zpool_name, {})
if scrub_status.get("scrub_in_progress", False):
self.coordinator.logger.warning(f"ZFS存储池 {self.zpool_name} 已在进行数据一致性检查")
return
success = await self.coordinator.scrub_zpool(self.zpool_name)
if success:
self.coordinator.logger.info(f"ZFS存储池 {self.zpool_name} 数据一致性检查启动成功")
# 立即刷新状态以更新按钮状态
await self.coordinator.async_request_refresh()
else:
self.coordinator.logger.error(f"ZFS存储池 {self.zpool_name} 数据一致性检查启动失败")
except Exception as e:
self.coordinator.logger.error(f"启动ZFS存储池 {self.zpool_name} 数据一致性检查时出错: {str(e)}", exc_info=True)
@property
def extra_state_attributes(self):
return {
"存储池名称": self.zpool_name,
"操作类型": "数据一致性检查",
"说明": "对ZFS存储池执行数据完整性和一致性验证",
"提示": "此操作可能需要较长时间完成,建议在低峰期执行"
} }

View File

@@ -18,11 +18,7 @@ from .const import (
CONF_UPS_SCAN_INTERVAL, CONF_UPS_SCAN_INTERVAL,
DEFAULT_UPS_SCAN_INTERVAL, DEFAULT_UPS_SCAN_INTERVAL,
CONF_ROOT_PASSWORD, CONF_ROOT_PASSWORD,
CONF_ENABLE_DOCKER, CONF_ENABLE_DOCKER
CONF_MAX_CONNECTIONS,
DEFAULT_MAX_CONNECTIONS,
CONF_CACHE_TIMEOUT,
DEFAULT_CACHE_TIMEOUT
) )
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -75,17 +71,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
default=DEFAULT_SCAN_INTERVAL default=DEFAULT_SCAN_INTERVAL
): int, ): int,
# 添加启用Docker的选项 # 添加启用Docker的选项
vol.Optional(CONF_ENABLE_DOCKER, default=False): bool, vol.Optional(CONF_ENABLE_DOCKER, default=False): bool
# 新增:最大连接数
vol.Optional(
CONF_MAX_CONNECTIONS,
default=DEFAULT_MAX_CONNECTIONS
): int,
# 新增:缓存超时时间(分钟)
vol.Optional(
CONF_CACHE_TIMEOUT,
default=DEFAULT_CACHE_TIMEOUT
): int
}) })
return self.async_show_form( return self.async_show_form(
@@ -118,9 +104,6 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
self.ssh_config[CONF_MAC] = selected_mac self.ssh_config[CONF_MAC] = selected_mac
# 确保将CONF_ENABLE_DOCKER也存入配置项 # 确保将CONF_ENABLE_DOCKER也存入配置项
self.ssh_config[CONF_ENABLE_DOCKER] = enable_docker self.ssh_config[CONF_ENABLE_DOCKER] = enable_docker
# 添加连接池和缓存配置
self.ssh_config[CONF_MAX_CONNECTIONS] = self.ssh_config.get(CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS)
self.ssh_config[CONF_CACHE_TIMEOUT] = self.ssh_config.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT)
return self.async_create_entry( return self.async_create_entry(
title=self.ssh_config[CONF_HOST], title=self.ssh_config[CONF_HOST],
data=self.ssh_config data=self.ssh_config
@@ -237,17 +220,7 @@ class OptionsFlowHandler(config_entries.OptionsFlow):
vol.Optional( vol.Optional(
CONF_ENABLE_DOCKER, CONF_ENABLE_DOCKER,
default=data.get(CONF_ENABLE_DOCKER, False) default=data.get(CONF_ENABLE_DOCKER, False)
): bool, ): bool
# 新增:最大连接数
vol.Optional(
CONF_MAX_CONNECTIONS,
default=data.get(CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS)
): int,
# 新增:缓存超时时间(分钟)
vol.Optional(
CONF_CACHE_TIMEOUT,
default=data.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT)
): int
}) })
return self.async_show_form( return self.async_show_form(

View File

@@ -3,6 +3,7 @@ from homeassistant.const import Platform
DOMAIN = "fn_nas" DOMAIN = "fn_nas"
PLATFORMS = [ PLATFORMS = [
Platform.SENSOR, Platform.SENSOR,
Platform.BINARY_SENSOR,
Platform.SWITCH, Platform.SWITCH,
Platform.BUTTON Platform.BUTTON
] ]
@@ -32,7 +33,8 @@ HDD_HEALTH = "health"
HDD_STATUS = "status" HDD_STATUS = "status"
SYSTEM_INFO = "system" SYSTEM_INFO = "system"
FAN_SPEED = "fan_speed" FAN_SPEED = "fan_speed"
UPS_INFO = "ups_info" UPS_INFO = "ups_info"
ZFS_POOL = "zfs_pool"
ATTR_DISK_MODEL = "硬盘型号" ATTR_DISK_MODEL = "硬盘型号"
ATTR_SERIAL_NO = "序列号" ATTR_SERIAL_NO = "序列号"
@@ -48,14 +50,28 @@ ICON_TEMPERATURE = "mdi:thermometer"
ICON_HEALTH = "mdi:heart-pulse" ICON_HEALTH = "mdi:heart-pulse"
ICON_POWER = "mdi:power" ICON_POWER = "mdi:power"
ICON_RESTART = "mdi:restart" ICON_RESTART = "mdi:restart"
ICON_ZFS = "mdi:harddisk-plus"
# 设备标识符常量 # 设备标识符常量
DEVICE_ID_NAS = "flynas_nas_system" DEVICE_ID_NAS = "flynas_nas_system"
DEVICE_ID_UPS = "flynas_ups" DEVICE_ID_UPS = "flynas_ups"
DEVICE_ID_ZFS = "flynas_zfs"
CONF_NETWORK_MACS = "network_macs" CONF_NETWORK_MACS = "network_macs"
# 新增配置常量 # ZFS相关常量
CONF_MAX_CONNECTIONS = "max_connections" ATTR_ZPOOL_NAME = "存储池名称"
CONF_CACHE_TIMEOUT = "cache_timeout" ATTR_ZPOOL_HEALTH = "健康状态"
DEFAULT_MAX_CONNECTIONS = 3 ATTR_ZPOOL_SIZE = "总大小"
DEFAULT_CACHE_TIMEOUT = 30 # 单位:分钟 ATTR_ZPOOL_ALLOC = "已使用"
ATTR_ZPOOL_FREE = "可用空间"
ATTR_ZPOOL_CAPACITY = "使用率"
ATTR_ZPOOL_FRAGMENTATION = "碎片率"
ATTR_ZPOOL_CKPOINT = "检查点"
ATTR_ZPOOL_EXPANDSZ = "扩展大小"
ATTR_ZPOOL_DEDUP = "重复数据删除率"
ATTR_ZPOOL_SCRUB_STATUS = "检查状态"
ATTR_ZPOOL_SCRUB_PROGRESS = "检查进度"
ATTR_ZPOOL_SCRUB_SCAN_RATE = "扫描速度"
ATTR_ZPOOL_SCRUB_TIME_REMAINING = "剩余时间"
ATTR_ZPOOL_SCRUB_ISSUED = "已发出数据"
ATTR_ZPOOL_SCRUB_REPAIRED = "已修复数据"

View File

@@ -1,8 +1,7 @@
import logging import logging
import re
import asyncssh
import asyncio import asyncio
import time import asyncssh
import re
from datetime import timedelta from datetime import timedelta
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
@@ -11,8 +10,7 @@ from .const import (
DOMAIN, CONF_HOST, CONF_PORT, CONF_USERNAME, CONF_PASSWORD, DOMAIN, CONF_HOST, CONF_PORT, CONF_USERNAME, CONF_PASSWORD,
CONF_IGNORE_DISKS, CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL, CONF_IGNORE_DISKS, CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL,
DEFAULT_PORT, CONF_MAC, CONF_UPS_SCAN_INTERVAL, DEFAULT_UPS_SCAN_INTERVAL, DEFAULT_PORT, CONF_MAC, CONF_UPS_SCAN_INTERVAL, DEFAULT_UPS_SCAN_INTERVAL,
CONF_ROOT_PASSWORD, CONF_ENABLE_DOCKER, CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS, CONF_ROOT_PASSWORD, CONF_ENABLE_DOCKER
CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT
) )
from .disk_manager import DiskManager from .disk_manager import DiskManager
from .system_manager import SystemManager from .system_manager import SystemManager
@@ -23,8 +21,10 @@ from .docker_manager import DockerManager
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
class FlynasCoordinator(DataUpdateCoordinator): class FlynasCoordinator(DataUpdateCoordinator):
def __init__(self, hass: HomeAssistant, config) -> None: def __init__(self, hass: HomeAssistant, config, config_entry) -> None:
self.config = config self.config = config
self.config_entry = config_entry
self.hass = hass
self.host = config[CONF_HOST] self.host = config[CONF_HOST]
self.port = config.get(CONF_PORT, DEFAULT_PORT) self.port = config.get(CONF_PORT, DEFAULT_PORT)
self.username = config[CONF_USERNAME] self.username = config[CONF_USERNAME]
@@ -32,25 +32,19 @@ class FlynasCoordinator(DataUpdateCoordinator):
self.root_password = config.get(CONF_ROOT_PASSWORD) self.root_password = config.get(CONF_ROOT_PASSWORD)
self.mac = config.get(CONF_MAC, "") self.mac = config.get(CONF_MAC, "")
self.enable_docker = config.get(CONF_ENABLE_DOCKER, False) self.enable_docker = config.get(CONF_ENABLE_DOCKER, False)
self.max_connections = config.get(CONF_MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS)
self.cache_timeout = config.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT) * 60
self.docker_manager = DockerManager(self) if self.enable_docker else None self.docker_manager = DockerManager(self) if self.enable_docker else None
self.ssh_pool = [] # SSH连接池 self.ssh = None
self.active_commands = 0 # 当前活动命令数 self.ssh_closed = True
self.ssh_closed = True # 初始状态为关闭 # SSH连接池管理
self.use_sudo = False # 初始化use_sudo属性 self.ssh_pool = []
self.ssh_pool_size = 3 # 连接池大小
self.ssh_pool_lock = asyncio.Lock()
self.ups_manager = UPSManager(self)
self.vm_manager = VMManager(self)
self.use_sudo = False
self.data = { # 确保data始终有初始值
"disks": [], self.data = self.get_default_data()
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": []
}
scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL) scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
update_interval = timedelta(seconds=scan_interval) update_interval = timedelta(seconds=scan_interval)
@@ -64,249 +58,419 @@ class FlynasCoordinator(DataUpdateCoordinator):
self.disk_manager = DiskManager(self) self.disk_manager = DiskManager(self)
self.system_manager = SystemManager(self) self.system_manager = SystemManager(self)
self.ups_manager = UPSManager(self) self._system_online = False
self.vm_manager = VMManager(self) self._ping_task = None
self._retry_interval = 30 # 系统离线时的检测间隔(秒)
self._last_command_time = 0
self._command_count = 0
async def get_ssh_connection(self): # 添加日志方法
"""从连接池获取或创建SSH连接""" self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
# 如果连接池中有可用连接且没有超过最大活动命令数
while len(self.ssh_pool) > 0 and self.active_commands < self.max_connections: def get_default_data(self):
conn = self.ssh_pool.pop() """返回默认的数据结构"""
if await self.is_connection_alive(conn): return {
self.active_commands += 1 "disks": [],
return conn "system": {
else: "uptime": "未知",
await self.close_connection(conn) "cpu_temperature": "未知",
"motherboard_temperature": "未知",
# 如果没有可用连接,创建新连接 "status": "off"
if self.active_commands < self.max_connections: },
try: "ups": {},
conn = await asyncssh.connect( "vms": [],
self.host, "docker_containers": [],
port=self.port, "zpools": [],
username=self.username, "scrub_status": {}
password=self.password, }
known_hosts=None,
connect_timeout=10 def _debug_log(self, message: str):
) """只在调试模式下输出详细日志"""
self.active_commands += 1 if self.debug_enabled:
self.ssh_closed = False _LOGGER.debug(message)
# 确定是否需要sudo权限
await self.determine_sudo_setting(conn)
return conn
except Exception as e:
_LOGGER.error("创建SSH连接失败: %s", str(e), exc_info=True)
raise UpdateFailed(f"SSH连接失败: {str(e)}")
else:
await asyncio.sleep(0.1)
return await self.get_ssh_connection()
async def determine_sudo_setting(self, conn): def _info_log(self, message: str):
"""确定是否需要使用sudo权限""" """重要信息日志"""
_LOGGER.info(message)
def _warning_log(self, message: str):
"""警告日志"""
_LOGGER.warning(message)
def _error_log(self, message: str):
"""错误日志"""
_LOGGER.error(message)
async def get_ssh_connection(self):
"""从连接池获取可用的SSH连接"""
async with self.ssh_pool_lock:
# 检查现有连接
for i, (ssh, in_use) in enumerate(self.ssh_pool):
if not in_use:
try:
# 测试连接是否活跃
await asyncio.wait_for(ssh.run("echo 'test'", timeout=1), timeout=2)
self.ssh_pool[i] = (ssh, True) # 标记为使用中
self._debug_log(f"复用连接池中的连接 {i}")
return ssh, i
except Exception:
# 连接失效,移除
try:
ssh.close()
except:
pass
self.ssh_pool.pop(i)
break
# 如果连接池未满,创建新连接
if len(self.ssh_pool) < self.ssh_pool_size:
try:
ssh = await asyncssh.connect(
self.host,
port=self.port,
username=self.username,
password=self.password,
known_hosts=None,
connect_timeout=5
)
# 检查并设置权限状态
await self._setup_connection_permissions(ssh)
connection_id = len(self.ssh_pool)
self.ssh_pool.append((ssh, True))
self._debug_log(f"创建新的SSH连接 {connection_id}")
return ssh, connection_id
except Exception as e:
self._debug_log(f"创建SSH连接失败: {e}")
raise
# 连接池满且所有连接都在使用中,等待可用连接
self._debug_log("所有连接都在使用中,等待可用连接...")
for _ in range(50): # 最多等待5秒
await asyncio.sleep(0.1)
for i, (ssh, in_use) in enumerate(self.ssh_pool):
if not in_use:
try:
await asyncio.wait_for(ssh.run("echo 'test'", timeout=1), timeout=2)
self.ssh_pool[i] = (ssh, True)
self._debug_log(f"等待后获得连接 {i}")
return ssh, i
except Exception:
try:
ssh.close()
except:
pass
self.ssh_pool.pop(i)
break
raise Exception("无法获取SSH连接")
async def _setup_connection_permissions(self, ssh):
"""为新连接设置权限状态"""
try: try:
# 检查当前用户是否root # 检查是否root用户
result = await conn.run("id -u", timeout=5) result = await ssh.run("id -u", timeout=3)
if result.stdout.strip() == "0": if result.stdout.strip() == "0":
_LOGGER.debug("当前用户是root不需要sudo") self._debug_log("当前用户是 root")
self.use_sudo = False self.use_sudo = False
return return
except Exception as e:
_LOGGER.warning("检查用户ID失败: %s", str(e)) # 尝试切换到root会话
if self.root_password:
# 检查是否可以使用密码sudo try:
try: await ssh.run(
result = await conn.run( f"echo '{self.root_password}' | sudo -S -i",
f"echo '{self.password}' | sudo -S whoami", input=self.root_password + "\n",
input=self.password + "\n", timeout=5
timeout=10 )
) whoami = await ssh.run("whoami")
if "root" in result.stdout: if "root" in whoami.stdout:
_LOGGER.info("可以使用用户密码sudo") self._info_log("成功切换到 root 会话(使用 root 密码)")
self.use_sudo = True self.use_sudo = False
return return
except Exception as e: except Exception:
_LOGGER.debug("无法使用用户密码sudo: %s", str(e)) pass
# 如果有root密码尝试使用root密码sudo # 尝试使用登录密码sudo
if self.root_password:
try: try:
result = await conn.run( await ssh.run(
f"echo '{self.root_password}' | sudo -S whoami", f"echo '{self.password}' | sudo -S -i",
input=self.root_password + "\n", input=self.password + "\n",
timeout=10 timeout=5
) )
if "root" in result.stdout: whoami = await ssh.run("whoami")
_LOGGER.info("可以使用root密码sudo") if "root" in whoami.stdout:
self.use_sudo = True self._info_log("成功切换到 root 会话(使用登录密码)")
self.use_sudo = False
return return
except Exception as e: except Exception:
_LOGGER.debug("无法使用root密码sudo: %s", str(e)) pass
_LOGGER.warning("无法获取root权限将使用普通用户执行命令") # 设置为使用sudo模式
self.use_sudo = False self.use_sudo = True
self._debug_log("设置为使用sudo模式")
async def release_ssh_connection(self, conn):
"""释放连接回连接池"""
self.active_commands -= 1
if conn and not conn.is_closed():
if len(self.ssh_pool) < self.max_connections:
self.ssh_pool.append(conn)
else:
await self.close_connection(conn)
else:
# 如果连接已经关闭,直接丢弃
pass
async def close_connection(self, conn):
"""关闭SSH连接"""
try:
if conn and not conn.is_closed():
conn.close()
except Exception as e:
_LOGGER.debug("关闭SSH连接时出错: %s", str(e))
async def is_connection_alive(self, conn) -> bool:
"""检查连接是否存活"""
try:
# 发送一个简单的命令测试连接
result = await conn.run("echo 'connection_test'", timeout=2)
return result.exit_status == 0 and "connection_test" in result.stdout
except (asyncssh.Error, TimeoutError, ConnectionResetError):
return False
async def run_command(self, command: str, retries=2) -> str:
"""使用连接池执行命令"""
conn = None
try:
conn = await self.get_ssh_connection()
# 根据sudo设置执行命令
if self.use_sudo:
password = self.root_password if self.root_password else self.password
if password:
full_command = f"sudo -S {command}"
result = await conn.run(full_command, input=password + "\n", check=True)
else:
full_command = f"sudo {command}"
result = await conn.run(full_command, check=True)
else:
result = await conn.run(command, check=True)
return result.stdout.strip()
except asyncssh.process.ProcessError as e:
if e.exit_status in [4, 32]:
return ""
_LOGGER.error("Command failed: %s (exit %d)", command, e.exit_status)
# 连接可能已损坏,关闭它
await self.close_connection(conn)
conn = None
if retries > 0:
return await self.run_command(command, retries-1)
else:
raise UpdateFailed(f"Command failed: {command}") from e
except asyncssh.Error as e:
_LOGGER.error("SSH连接错误: %s", str(e))
await self.close_connection(conn)
conn = None
if retries > 0:
return await self.run_command(command, retries-1)
else:
raise UpdateFailed(f"SSH错误: {str(e)}") from e
except Exception as e: except Exception as e:
_LOGGER.error("意外错误: %s", str(e), exc_info=True) self._debug_log(f"设置连接权限失败: {e}")
await self.close_connection(conn) self.use_sudo = True
conn = None
if retries > 0: async def release_ssh_connection(self, connection_id):
return await self.run_command(command, retries-1) """释放SSH连接回连接池"""
else: async with self.ssh_pool_lock:
raise UpdateFailed(f"意外错误: {str(e)}") from e if 0 <= connection_id < len(self.ssh_pool):
finally: ssh, _ = self.ssh_pool[connection_id]
if conn: self.ssh_pool[connection_id] = (ssh, False) # 标记为可用
await self.release_ssh_connection(conn) self._debug_log(f"释放SSH连接 {connection_id}")
async def close_all_ssh_connections(self):
"""关闭所有SSH连接"""
async with self.ssh_pool_lock:
for ssh, _ in self.ssh_pool:
try:
ssh.close()
except:
pass
self.ssh_pool.clear()
self._debug_log("已关闭所有SSH连接")
async def async_connect(self): async def async_connect(self):
"""建立SSH连接使用连接池""" """建立并保持持久SSH连接 - 兼容旧代码"""
# 连接池已处理连接,此方法现在主要用于初始化 try:
return True ssh, connection_id = await self.get_ssh_connection()
await self.release_ssh_connection(connection_id)
async def is_ssh_connected(self) -> bool: return True
"""检查是否有活动的SSH连接""" except Exception:
return len(self.ssh_pool) > 0 or self.active_commands > 0 return False
async def async_disconnect(self): async def async_disconnect(self):
"""关闭所有SSH连接""" """断开SSH连接 - 兼容旧代码"""
# 关闭连接池中的所有连接 await self.close_all_ssh_connections()
for conn in self.ssh_pool:
await self.close_connection(conn)
self.ssh_pool = []
self.active_commands = 0
self.ssh_closed = True
self.use_sudo = False # 重置sudo设置
async def _async_update_data(self): async def run_command(self, command: str, retries=2) -> str:
_LOGGER.debug("Starting data update...") """执行SSH命令使用连接池"""
# 系统离线时直接返回空字符串
if not self._system_online:
return ""
ssh = None
connection_id = None
try: try:
if await self.is_ssh_connected(): # 从连接池获取连接
status = "on" ssh, connection_id = await self.get_ssh_connection()
else:
if not await self.async_connect(): # 构建完整命令
status = "off" if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await ssh.run(full_command, input=password + "\n", timeout=10)
else: else:
status = "on" full_command = f"sudo {command}"
result = await ssh.run(full_command, timeout=10)
else:
result = await ssh.run(command, timeout=10)
# 使用已初始化的管理器获取数据 return result.stdout.strip()
disks = await self.disk_manager.get_disks_info()
except Exception as e:
self._debug_log(f"命令执行失败: {command}, 错误: {str(e)}")
return ""
finally:
# 释放连接回连接池
if connection_id is not None:
await self.release_ssh_connection(connection_id)
async def run_command_direct(self, command: str) -> str:
"""直接执行命令,获取独立连接 - 用于并发任务"""
if not self._system_online:
return ""
ssh = None
connection_id = None
try:
ssh, connection_id = await self.get_ssh_connection()
if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await ssh.run(full_command, input=password + "\n", timeout=10)
else:
full_command = f"sudo {command}"
result = await ssh.run(full_command, timeout=10)
else:
result = await ssh.run(command, timeout=10)
return result.stdout.strip()
except Exception as e:
self._debug_log(f"直接命令执行失败: {command}, 错误: {str(e)}")
return ""
finally:
if connection_id is not None:
await self.release_ssh_connection(connection_id)
async def ping_system(self) -> bool:
    """Cheap reachability probe for the NAS host.

    Sends a single ICMP ping with tight timeouts so a downed system is
    detected quickly without blocking the event loop.

    Returns:
        True when the host answers (or is the local machine), False on any
        failure or timeout.
    """
    # The local machine is trivially reachable — skip the subprocess entirely.
    if self.host in ['localhost', '127.0.0.1']:
        return True
    try:
        # One ping, 1-second reply wait, all output silenced.
        pinger = await asyncio.create_subprocess_exec(
            'ping', '-c', '1', '-W', '1', self.host,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL
        )
        # Hard 2-second ceiling on the whole probe.
        await asyncio.wait_for(pinger.wait(), timeout=2)
        return pinger.returncode == 0
    except Exception:
        # Any error (ping missing, timeout, cancelled) counts as unreachable.
        return False
async def _monitor_system_status(self):
    """Background watchdog run while the NAS is offline.

    Polls the host with exponential backoff and, once it answers, schedules
    a reload of this config entry so all entities come back.
    """
    self._debug_log(f"启动系统状态监控,每{self._retry_interval}秒检测一次")
    max_backoff = 300  # never wait longer than five minutes between probes
    delay = self._retry_interval
    while True:
        await asyncio.sleep(delay)
        if not await self.ping_system():
            # Still down: stretch the interval (x1.5, capped) and keep waiting.
            delay = min(delay * 1.5, max_backoff)
            self._debug_log(f"系统仍离线,下次检测间隔: {delay}")
            continue
        # Host is back — reload the integration and stop monitoring.
        self._info_log("检测到系统已开机,触发重新加载")
        self.hass.async_create_task(
            self.hass.config_entries.async_reload(self.config_entry.entry_id)
        )
        break
async def _async_update_data(self):
"""数据更新入口,优化命令执行频率"""
self._debug_log("开始数据更新...")
is_online = await self.ping_system()
self._system_online = is_online
if not is_online:
self._debug_log("系统离线,跳过数据更新")
# 启动后台监控任务
if not self._ping_task or self._ping_task.done():
self._ping_task = asyncio.create_task(self._monitor_system_status())
await self.close_all_ssh_connections()
return self.get_default_data()
# 系统在线处理
try:
# 预热连接池并确保权限设置正确
await self.async_connect()
# 获取系统状态信息
status = "on"
# 串行获取信息以确保稳定性
self._debug_log("开始获取系统信息...")
system = await self.system_manager.get_system_info() system = await self.system_manager.get_system_info()
ups_info = await self.ups_manager.get_ups_info() self._debug_log("系统信息获取完成")
vms = await self.vm_manager.get_vm_list()
# 获取虚拟机标题 self._debug_log("开始获取磁盘信息...")
disks = await self.disk_manager.get_disks_info()
self._debug_log(f"磁盘信息获取完成,数量: {len(disks)}")
self._debug_log("开始获取ZFS存储池信息...")
zpools = await self.disk_manager.get_zpools()
self._debug_log(f"ZFS存储池信息获取完成数量: {len(zpools)}")
# 获取所有ZFS存储池的scrub状态
scrub_status = {}
for zpool in zpools:
self._debug_log(f"开始获取存储池 {zpool['name']} 的scrub状态...")
scrub_info = await self.disk_manager.get_zpool_status(zpool['name'])
scrub_status[zpool['name']] = scrub_info
self._debug_log(f"存储池 {zpool['name']} scrub状态获取完成")
self._debug_log("开始获取UPS信息...")
ups_info = await self.ups_manager.get_ups_info()
self._debug_log("UPS信息获取完成")
self._debug_log("开始获取虚拟机信息...")
vms = await self.vm_manager.get_vm_list()
self._debug_log(f"虚拟机信息获取完成,数量: {len(vms)}")
# 为每个虚拟机获取标题
for vm in vms: for vm in vms:
vm["title"] = await self.vm_manager.get_vm_title(vm["name"]) try:
vm["title"] = await self.vm_manager.get_vm_title(vm["name"])
# 获取Docker容器信息如果启用 except Exception as e:
self._debug_log(f"获取VM标题失败 {vm['name']}: {e}")
vm["title"] = vm["name"]
# 获取Docker容器信息
docker_containers = [] docker_containers = []
if self.enable_docker and hasattr(self, 'docker_manager') and self.docker_manager: if self.enable_docker and self.docker_manager:
docker_containers = await self.docker_manager.get_containers() self._debug_log("开始获取Docker信息...")
try:
docker_containers = await self.docker_manager.get_containers()
self._debug_log(f"Docker信息获取完成数量: {len(docker_containers)}")
except Exception as e:
self._debug_log(f"Docker信息获取失败: {e}")
data = { data = {
"disks": disks, "disks": disks,
"system": { "system": {**system, "status": status},
**system,
"status": status
},
"ups": ups_info, "ups": ups_info,
"vms": vms, "vms": vms,
"docker_containers": docker_containers "docker_containers": docker_containers,
"zpools": zpools,
"scrub_status": scrub_status
} }
self._debug_log(f"数据更新完成: disks={len(disks)}, vms={len(vms)}, containers={len(docker_containers)}")
return data return data
except Exception as e: except Exception as e:
_LOGGER.error("Failed to update data: %s", str(e), exc_info=True) self._error_log(f"数据更新失败: {str(e)}")
return { self._system_online = False
"disks": [], if not self._ping_task or self._ping_task.done():
"system": { self._ping_task = asyncio.create_task(self._monitor_system_status())
"uptime": "未知",
"cpu_temperature": "未知", return self.get_default_data()
"motherboard_temperature": "未知",
"status": "off" async def shutdown_system(self):
}, """关闭系统 - 委托给SystemManager"""
"ups": {}, return await self.system_manager.shutdown_system()
"vms": []
}
async def reboot_system(self): async def reboot_system(self):
await self.system_manager.reboot_system() """重启系统 - 委托给SystemManager"""
return await self.system_manager.reboot_system()
async def shutdown_system(self): async def scrub_zpool(self, pool_name: str) -> bool:
await self.system_manager.shutdown_system() """执行ZFS存储池数据一致性检查"""
if self.data and "system" in self.data: try:
self.data["system"]["status"] = "off" self._debug_log(f"开始对ZFS存储池 {pool_name} 执行scrub操作")
self.async_update_listeners() command = f"zpool scrub {pool_name}"
result = await self.run_command(command)
if result and not result.lower().startswith("cannot"):
self._debug_log(f"ZFS存储池 {pool_name} scrub操作启动成功")
return True
else:
self.logger.error(f"ZFS存储池 {pool_name} scrub操作失败: {result}")
return False
except Exception as e:
self.logger.error(f"执行ZFS存储池 {pool_name} scrub操作时出错: {str(e)}", exc_info=True)
return False
class UPSDataUpdateCoordinator(DataUpdateCoordinator): class UPSDataUpdateCoordinator(DataUpdateCoordinator):
def __init__(self, hass: HomeAssistant, config, main_coordinator): def __init__(self, hass: HomeAssistant, config, main_coordinator):
@@ -326,19 +490,20 @@ class UPSDataUpdateCoordinator(DataUpdateCoordinator):
self.ups_manager = UPSManager(main_coordinator) self.ups_manager = UPSManager(main_coordinator)
async def _async_update_data(self): async def _async_update_data(self):
# 如果主协调器检测到系统离线跳过UPS更新
if not self.main_coordinator._system_online:
return {}
try: try:
return await self.ups_manager.get_ups_info() return await self.ups_manager.get_ups_info()
except Exception as e: except Exception as e:
_LOGGER.error("Failed to update UPS data: %s", str(e), exc_info=True) _LOGGER.debug("UPS数据更新失败: %s", str(e))
return {} return {}
async def control_vm(self, vm_name, action): async def control_vm(self, vm_name, action):
try: try:
if not hasattr(self, 'vm_manager'): result = await self.main_coordinator.vm_manager.control_vm(vm_name, action)
self.vm_manager = VMManager(self)
result = await self.vm_manager.control_vm(vm_name, action)
return result return result
except Exception as e: except Exception as e:
_LOGGER.error("虚拟机控制失败: %s", str(e), exc_info=True) _LOGGER.debug("虚拟机控制失败: %s", str(e))
return False return False

View File

@@ -1,8 +1,7 @@
import re import re
import logging import logging
import asyncio import asyncio
import time from .const import CONF_IGNORE_DISKS
from .const import CONF_IGNORE_DISKS, CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -15,10 +14,8 @@ class DiskManager:
self.disk_full_info_cache = {} # 缓存磁盘完整信息 self.disk_full_info_cache = {} # 缓存磁盘完整信息
self.first_run = True # 首次运行标志 self.first_run = True # 首次运行标志
self.initial_detection_done = False # 首次完整检测完成标志 self.initial_detection_done = False # 首次完整检测完成标志
self.cache_expiry = {} # 缓存过期时间(时间戳) self.disk_io_stats_cache = {} # 缓存磁盘I/O统计信息
# 获取缓存超时配置(分钟),转换为秒
self.cache_timeout = self.coordinator.config.get(CONF_CACHE_TIMEOUT, DEFAULT_CACHE_TIMEOUT) * 60
def extract_value(self, text: str, patterns, default="未知", format_func=None): def extract_value(self, text: str, patterns, default="未知", format_func=None):
if not text: if not text:
return default return default
@@ -39,84 +36,251 @@ class DiskManager:
self.logger.debug("No match found for patterns: %s", patterns) self.logger.debug("No match found for patterns: %s", patterns)
return default return default
async def check_disk_active(self, device: str, window: int = 30) -> bool: def _format_capacity(self, capacity_str: str) -> str:
"""将容量字符串格式化为GB或TB格式"""
if not capacity_str or capacity_str == "未知":
return "未知"
try:
# 处理逗号分隔的数字(如 "1,000,204,886,016 bytes"
capacity_str = capacity_str.replace(',', '')
# 提取数字和单位
import re
# 匹配数字和单位(如 "500 GB", "1.0 TB", "1000204886016 bytes", "1,000,204,886,016 bytes"
match = re.search(r'(\d+(?:\.\d+)?)\s*([KMGT]?B|bytes?)', capacity_str, re.IGNORECASE)
if not match:
# 如果没有匹配到单位,尝试直接提取数字
numbers = re.findall(r'\d+', capacity_str)
if numbers:
# 取最大的数字(通常是容量值)
value = float(max(numbers, key=len))
bytes_value = value # 假设为字节
else:
return capacity_str
else:
value = float(match.group(1))
unit = match.group(2).upper()
# 转换为字节
if unit in ['B', 'BYTE', 'BYTES']:
bytes_value = value
elif unit in ['KB', 'KIB']:
bytes_value = value * 1024
elif unit in ['MB', 'MIB']:
bytes_value = value * 1024 * 1024
elif unit in ['GB', 'GIB']:
bytes_value = value * 1024 * 1024 * 1024
elif unit in ['TB', 'TIB']:
bytes_value = value * 1024 * 1024 * 1024 * 1024
else:
bytes_value = value # 默认假设为字节
# 转换为合适的单位
if bytes_value >= 1024**4: # 1 TB
return f"{bytes_value / (1024**4):.1f} TB"
elif bytes_value >= 1024**3: # 1 GB
return f"{bytes_value / (1024**3):.1f} GB"
elif bytes_value >= 1024**2: # 1 MB
return f"{bytes_value / (1024**2):.1f} MB"
elif bytes_value >= 1024: # 1 KB
return f"{bytes_value / 1024:.1f} KB"
else:
return f"{bytes_value:.1f} B"
except Exception as e:
self.logger.debug(f"格式化容量失败: {capacity_str}, 错误: {e}")
return capacity_str
async def check_disk_active(self, device: str, window: int = 30, current_status: str = None) -> bool:
"""检查硬盘在指定时间窗口内是否有活动""" """检查硬盘在指定时间窗口内是否有活动"""
try: try:
stat_path = f"/sys/block/{device}/stat" # 首先检查硬盘当前状态
if current_status is None:
current_status = await self.get_disk_activity(device)
else:
self.logger.debug(f"使用传入的状态: {device} = {current_status}")
# 读取统计文件 # 如果硬盘处于休眠状态,直接返回非活跃
stat_output = await self.coordinator.run_command(f"cat {stat_path} 2>/dev/null") if current_status == "休眠中":
if not stat_output: self.logger.debug(f"硬盘 {device} 处于休眠状态,不执行详细检测")
self.logger.debug(f"无法读取 {stat_path},默认返回活跃状态") return False
return True
# 解析统计信息
stats = stat_output.split()
if len(stats) < 11:
self.logger.debug(f"无效的统计信息格式:{stat_output}")
return True
# 关键字段当前正在进行的I/O操作数量第9个字段索引8
in_flight = int(stats[8])
# 如果当前有I/O操作直接返回活跃状态 # 如果硬盘处于空闲状态,检查是否有近期活动
if in_flight > 0: if current_status == "空闲中":
return True # 检查缓存的统计信息来判断近期活动
stat_path = f"/sys/block/{device}/stat"
stat_output = await self.coordinator.run_command(f"cat {stat_path} 2>/dev/null")
# 检查I/O操作时间第10个字段索引9 - io_ticks单位毫秒 if stat_output:
io_ticks = int(stats[9]) stats = stat_output.split()
if len(stats) >= 11:
# 如果设备在窗口时间内有I/O活动返回活跃状态 try:
if io_ticks > window * 1000: current_read_ios = int(stats[0])
return True current_write_ios = int(stats[4])
current_io_ticks = int(stats[9])
cached_stats = self.disk_io_stats_cache.get(device)
if cached_stats:
read_diff = current_read_ios - cached_stats.get('read_ios', 0)
write_diff = current_write_ios - cached_stats.get('write_ios', 0)
io_ticks_diff = current_io_ticks - cached_stats.get('io_ticks', 0)
# 如果在最近30秒内有I/O活动认为硬盘活跃
if read_diff > 0 or write_diff > 0 or io_ticks_diff > 100:
self.logger.debug(f"硬盘 {device} 近期有I/O活动需要更新信息")
return True
# 更新缓存
self.disk_io_stats_cache[device] = {
'read_ios': current_read_ios,
'write_ios': current_write_ios,
'io_ticks': current_io_ticks
}
except (ValueError, IndexError):
pass
# 所有检查都通过,返回非活跃状态 # 如果硬盘空闲且没有近期活动,使用缓存信息
return False self.logger.debug(f"硬盘 {device} 处于空闲状态且无近期活动,使用缓存信息")
return False
# 如果硬盘处于活动中,返回活跃状态
if current_status == "活动中":
self.logger.debug(f"硬盘 {device} 处于活动中,执行详细检测")
return True
# 默认情况下返回活跃状态
self.logger.debug(f"硬盘 {device} 状态未知,默认执行详细检测")
return True
except Exception as e: except Exception as e:
self.logger.error(f"检测硬盘活动状态失败: {str(e)}", exc_info=True) self.logger.error(f"检测硬盘活动状态失败: {str(e)}")
return True # 出错时默认执行检测 return True # 出错时默认执行检测
async def get_disk_activity(self, device: str) -> str: async def get_disk_power_state(self, device: str) -> str:
"""获取硬盘活动状态(活动中/空闲中/休眠中)""" """获取硬盘电源状态"""
try: try:
# 检查硬盘是否处于休眠状态 # 检查 SCSI 设备状态
state_path = f"/sys/block/{device}/device/state" state_path = f"/sys/block/{device}/device/state"
state_output = await self.coordinator.run_command(f"cat {state_path} 2>/dev/null || echo 'unknown'") state_output = await self.coordinator.run_command(f"cat {state_path} 2>/dev/null || echo 'unknown'")
state = state_output.strip().lower() state = state_output.strip().lower()
if state in ["standby", "sleep"]: if state in ["running", "active"]:
return "active"
elif state in ["standby", "sleep"]:
return state
# 对于某些设备尝试通过hdparm检查状态非侵入性
hdparm_output = await self.coordinator.run_command(f"hdparm -C /dev/{device} 2>/dev/null || echo 'unknown'")
if "standby" in hdparm_output.lower():
return "standby"
elif "sleeping" in hdparm_output.lower():
return "sleep"
elif "active/idle" in hdparm_output.lower():
return "active"
return "unknown"
except Exception as e:
self.logger.debug(f"获取磁盘 {device} 电源状态失败: {e}")
return "unknown"
async def get_disk_activity(self, device: str) -> str:
"""获取硬盘活动状态(活动中/空闲中/休眠中)"""
try:
# 先检查电源状态 - 这是最可靠的休眠检测方法
power_state = await self.get_disk_power_state(device)
if power_state in ["standby", "sleep"]:
self.logger.debug(f"硬盘 {device} 电源状态为 {power_state},判定为休眠中")
return "休眠中" return "休眠中"
# 检查最近一分钟内的硬盘活动 # 检查最近的I/O活动 - 使用非侵入性方式
stat_path = f"/sys/block/{device}/stat" stat_path = f"/sys/block/{device}/stat"
stat_output = await self.coordinator.run_command(f"cat {stat_path}") stat_output = await self.coordinator.run_command(f"cat {stat_path} 2>/dev/null")
stats = stat_output.split()
if len(stats) >= 11: if stat_output:
recent_reads = int(stats[8]) stats = stat_output.split()
recent_writes = int(stats[9]) if len(stats) >= 11:
try:
if recent_reads > 0 or recent_writes > 0: in_flight = int(stats[8]) # 当前进行中的I/O
io_ticks = int(stats[9]) # I/O活动时间(ms)
# 如果有正在进行的I/O返回活动中
if in_flight > 0:
self.logger.debug(f"硬盘 {device} 有进行中的I/O操作: {in_flight}")
return "活动中"
# 检查缓存的统计信息来判断近期活动
cached_stats = self.disk_io_stats_cache.get(device)
if cached_stats:
current_read_ios = int(stats[0])
current_write_ios = int(stats[4])
read_diff = current_read_ios - cached_stats.get('read_ios', 0)
write_diff = current_write_ios - cached_stats.get('write_ios', 0)
io_ticks_diff = io_ticks - cached_stats.get('io_ticks', 0)
# 如果在最近30秒内有I/O活动认为硬盘活动中
if read_diff > 0 or write_diff > 0 or io_ticks_diff > 100: # 100ms内的活动
self.logger.debug(f"硬盘 {device} 近期有I/O活动: 读={read_diff}, 写={write_diff}, 活动时间={io_ticks_diff}ms")
# 更新缓存统计信息
self.disk_io_stats_cache[device] = {
'read_ios': current_read_ios,
'write_ios': current_write_ios,
'in_flight': in_flight,
'io_ticks': io_ticks
}
return "活动中"
else:
# 首次检测,保存当前状态并认为活跃
self.logger.debug(f"硬盘 {device} 首次检测,保存统计信息")
self.disk_io_stats_cache[device] = {
'read_ios': int(stats[0]),
'write_ios': int(stats[4]),
'in_flight': in_flight,
'io_ticks': io_ticks
}
return "活动中" # 首次检测默认返回活动中
# 更新缓存统计信息
self.disk_io_stats_cache[device] = {
'read_ios': int(stats[0]),
'write_ios': int(stats[4]),
'in_flight': in_flight,
'io_ticks': io_ticks
}
# 如果没有活动,返回空闲中
self.logger.debug(f"硬盘 {device} 处于空闲状态")
return "空闲中"
except (ValueError, IndexError) as e:
self.logger.debug(f"解析硬盘 {device} 统计信息失败: {e}")
return "活动中" # 出错时默认返回活动中,避免中断休眠
# 如果无法获取统计信息,检查硬盘是否可访问
try:
# 尝试读取设备信息,如果成功说明硬盘可访问
test_output = await self.coordinator.run_command(f"ls -la /dev/{device} 2>/dev/null")
if test_output and device in test_output:
self.logger.debug(f"硬盘 {device} 可访问但无统计信息,默认返回活动中")
return "活动中" return "活动中"
else:
return "空闲中" self.logger.debug(f"硬盘 {device} 不可访问,可能处于休眠状态")
return "休眠中"
except:
self.logger.debug(f"硬盘 {device} 检测失败,默认返回活动中")
return "活动中"
except Exception as e: except Exception as e:
self.logger.error(f"获取硬盘 {device} 状态失败: {str(e)}", exc_info=True) self.logger.error(f"获取硬盘 {device} 状态失败: {str(e)}", exc_info=True)
return "未知" return "活动中" # 出错时默认返回活动中,避免中断休眠
async def get_disks_info(self) -> list[dict]: async def get_disks_info(self) -> list[dict]:
disks = [] disks = []
try: try:
# 清理过期缓存
now = time.time()
for device in list(self.disk_full_info_cache.keys()):
if now - self.cache_expiry.get(device, 0) > self.cache_timeout:
self.logger.debug(f"磁盘 {device} 的缓存已过期,清除")
del self.disk_full_info_cache[device]
del self.cache_expiry[device]
self.logger.debug("Fetching disk list...") self.logger.debug("Fetching disk list...")
lsblk_output = await self.coordinator.run_command("lsblk -dno NAME,TYPE") lsblk_output = await self.coordinator.run_command("lsblk -dno NAME,TYPE")
self.logger.debug("lsblk output: %s", lsblk_output) self.logger.debug("lsblk output: %s", lsblk_output)
@@ -157,15 +321,14 @@ class DiskManager:
# 检查是否有缓存的完整信息 # 检查是否有缓存的完整信息
cached_info = self.disk_full_info_cache.get(device, {}) cached_info = self.disk_full_info_cache.get(device, {})
# 首次运行时强制获取完整信息 # 优化点:首次运行时强制获取完整信息
if self.first_run: if self.first_run:
self.logger.debug(f"首次运行,强制获取硬盘 {device} 的完整信息") self.logger.debug(f"首次运行,强制获取硬盘 {device} 的完整信息")
try: try:
# 执行完整的信息获取 # 执行完整的信息获取
await self._get_full_disk_info(disk_info, device_path) await self._get_full_disk_info(disk_info, device_path)
# 更新缓存并设置过期时间 # 更新缓存
self.disk_full_info_cache[device] = disk_info.copy() self.disk_full_info_cache[device] = disk_info.copy()
self.cache_expiry[device] = now
except Exception as e: except Exception as e:
self.logger.warning(f"首次运行获取硬盘信息失败: {str(e)}", exc_info=True) self.logger.warning(f"首次运行获取硬盘信息失败: {str(e)}", exc_info=True)
# 使用缓存信息(如果有) # 使用缓存信息(如果有)
@@ -182,31 +345,31 @@ class DiskManager:
disks.append(disk_info) disks.append(disk_info)
continue continue
# 检查硬盘是否活跃 # 检查硬盘是否活跃,传入当前状态确保一致性
is_active = await self.check_disk_active(device, window=30) is_active = await self.check_disk_active(device, window=30, current_status=status)
if not is_active: if not is_active:
self.logger.debug(f"硬盘 {device} 处于非活跃状态,使用上一次获取的信息") self.logger.debug(f"硬盘 {device} 处于非活跃状态,使用上一次获取的信息")
# 优先使用缓存的完整信息 # 优先使用缓存的完整信息
if cached_info: if cached_info:
disk_info.update({ disk_info.update({
"model": cached_info.get("model", "检测"), "model": cached_info.get("model", ""),
"serial": cached_info.get("serial", "检测"), "serial": cached_info.get("serial", ""),
"capacity": cached_info.get("capacity", "检测"), "capacity": cached_info.get("capacity", ""),
"health": cached_info.get("health", "检测"), "health": cached_info.get("health", ""),
"temperature": cached_info.get("temperature", "检测"), "temperature": cached_info.get("temperature", ""),
"power_on_hours": cached_info.get("power_on_hours", "检测"), "power_on_hours": cached_info.get("power_on_hours", ""),
"attributes": cached_info.get("attributes", {}) "attributes": cached_info.get("attributes", {})
}) })
else: else:
# 如果没有缓存信息,使用默认值 # 如果没有缓存信息,使用默认值
disk_info.update({ disk_info.update({
"model": "检测", "model": "",
"serial": "检测", "serial": "",
"capacity": "检测", "capacity": "",
"health": "检测", "health": "",
"temperature": "检测", "temperature": "",
"power_on_hours": "检测", "power_on_hours": "",
"attributes": {} "attributes": {}
}) })
@@ -216,9 +379,8 @@ class DiskManager:
try: try:
# 执行完整的信息获取 # 执行完整的信息获取
await self._get_full_disk_info(disk_info, device_path) await self._get_full_disk_info(disk_info, device_path)
# 更新缓存并设置过期时间 # 更新缓存
self.disk_full_info_cache[device] = disk_info.copy() self.disk_full_info_cache[device] = disk_info.copy()
self.cache_expiry[device] = now
except Exception as e: except Exception as e:
self.logger.warning(f"获取硬盘信息失败: {str(e)}", exc_info=True) self.logger.warning(f"获取硬盘信息失败: {str(e)}", exc_info=True)
# 使用缓存信息(如果有) # 使用缓存信息(如果有)
@@ -251,31 +413,45 @@ class DiskManager:
async def _get_full_disk_info(self, disk_info, device_path): async def _get_full_disk_info(self, disk_info, device_path):
"""获取硬盘的完整信息(模型、序列号、健康状态等)""" """获取硬盘的完整信息(模型、序列号、健康状态等)"""
# 获取基本信息 # 获取基本信息 - 首先尝试NVMe格式
info_output = await self.coordinator.run_command(f"smartctl -i {device_path}") info_output = await self.coordinator.run_command(f"smartctl -i {device_path}")
self.logger.debug("smartctl -i output for %s: %s", disk_info["device"], info_output[:200] + "..." if len(info_output) > 200 else info_output) self.logger.debug("smartctl -i output for %s: %s", disk_info["device"], info_output[:200] + "..." if len(info_output) > 200 else info_output)
# 模型 # 检查是否为NVMe设备
is_nvme = "nvme" in disk_info["device"].lower()
# 模型 - 增强NVMe支持
disk_info["model"] = self.extract_value( disk_info["model"] = self.extract_value(
info_output, info_output,
[ [
r"Device Model:\s*(.+)", r"Device Model:\s*(.+)",
r"Model(?: Family)?\s*:\s*(.+)", r"Model(?: Family)?\s*:\s*(.+)",
r"Model\s*Number:\s*(.+)" r"Model Number:\s*(.+)",
r"Product:\s*(.+)", # NVMe格式
r"Model Number:\s*(.+)", # NVMe格式
] ]
) )
# 序列号 # 序列号 - 增强NVMe支持
disk_info["serial"] = self.extract_value( disk_info["serial"] = self.extract_value(
info_output, info_output,
r"Serial Number\s*:\s*(.+)" [
r"Serial Number\s*:\s*(.+)",
r"Serial Number:\s*(.+)", # NVMe格式
r"Serial\s*:\s*(.+)", # NVMe格式
]
) )
# 容量 # 容量 - 增强NVMe支持并转换为GB/TB格式
disk_info["capacity"] = self.extract_value( capacity_patterns = [
info_output, r"User Capacity:\s*([^\[]+)",
r"User Capacity:\s*([^[]+)" r"Namespace 1 Size/Capacity:\s*([^\[]+)", # NVMe格式
) r"Total NVM Capacity:\s*([^\[]+)", # NVMe格式
r"Capacity:\s*([^\[]+)", # NVMe格式
]
raw_capacity = self.extract_value(info_output, capacity_patterns)
disk_info["capacity"] = self._format_capacity(raw_capacity)
# 健康状态 # 健康状态
health_output = await self.coordinator.run_command(f"smartctl -H {device_path}") health_output = await self.coordinator.run_command(f"smartctl -H {device_path}")
@@ -352,6 +528,46 @@ class DiskManager:
# 改进的通电时间检测逻辑 - 处理特殊格式 # 改进的通电时间检测逻辑 - 处理特殊格式
power_on_hours = "未知" power_on_hours = "未知"
# 检查是否为NVMe设备
is_nvme = "nvme" in disk_info["device"].lower()
# 方法0NVMe设备的通电时间提取优先处理
if is_nvme:
# NVMe格式的通电时间提取 - 支持带逗号的数字格式
nvme_patterns = [
r"Power On Hours\s*:\s*([\d,]+)", # 支持带逗号的数字格式(如 "6,123"
r"Power On Time\s*:\s*([\d,]+)", # NVMe备用格式
r"Power on hours\s*:\s*([\d,]+)", # 小写格式
r"Power on time\s*:\s*([\d,]+)", # 小写格式
]
for pattern in nvme_patterns:
match = re.search(pattern, data_output, re.IGNORECASE)
if match:
try:
# 处理带逗号的数字格式(如 "6,123"
hours_str = match.group(1).replace(',', '')
hours = int(hours_str)
power_on_hours = f"{hours} 小时"
self.logger.debug("Found NVMe power_on_hours via pattern %s: %s", pattern, power_on_hours)
break
except:
continue
# 如果还没找到尝试在SMART数据部分查找
if power_on_hours == "未知":
# 查找SMART数据部分中的Power On Hours
smart_section_match = re.search(r"SMART/Health Information.*?Power On Hours\s*:\s*([\d,]+)",
data_output, re.IGNORECASE | re.DOTALL)
if smart_section_match:
try:
hours_str = smart_section_match.group(1).replace(',', '')
hours = int(hours_str)
power_on_hours = f"{hours} 小时"
self.logger.debug("Found NVMe power_on_hours in SMART section: %s", power_on_hours)
except:
pass
# 方法1提取属性9的RAW_VALUE处理特殊格式 # 方法1提取属性9的RAW_VALUE处理特殊格式
attr9_match = re.search( attr9_match = re.search(
r"^\s*9\s+Power_On_Hours\b[^\n]+\s+(\d+)h(?:\+(\d+)m(?:\+(\d+)\.\d+s)?)?", r"^\s*9\s+Power_On_Hours\b[^\n]+\s+(\d+)h(?:\+(\d+)m(?:\+(\d+)\.\d+s)?)?",
@@ -389,7 +605,7 @@ class DiskManager:
[ [
# 精确匹配属性9行 # 精确匹配属性9行
r"^\s*9\s+Power_On_Hours\b[^\n]+\s+(\d+)\s*$", r"^\s*9\s+Power_On_Hours\b[^\n]+\s+(\d+)\s*$",
r"^\s*9\s+Power On Hours\b[^\n]+\s+(\d+)h(?:\+(\d+)m(?:\+(\d+)\.\d+s)?)?",
# 通用匹配模式 # 通用匹配模式
r"9\s+Power_On_Hours\b.*?(\d+)\b", r"9\s+Power_On_Hours\b.*?(\d+)\b",
r"Power_On_Hours\b.*?(\d+)\b", r"Power_On_Hours\b.*?(\d+)\b",
@@ -435,7 +651,7 @@ class DiskManager:
# 添加额外属性:温度历史记录 # 添加额外属性:温度历史记录
temp_history = {} temp_history = {}
# 提取属性194的温度历史 # 提取属性194的温度历史
temp194_match = re.search(r"194\s+Temperature_Celsius+.*?\(\s*([\d\s]+)$", data_output) temp194_match = re.search(r"194\s+Temperature_Celsius+.*?(\s*[\d\s]+)$", data_output)
if temp194_match: if temp194_match:
try: try:
values = [int(x) for x in temp194_match.group(1).split()] values = [int(x) for x in temp194_match.group(1).split()]
@@ -450,4 +666,199 @@ class DiskManager:
pass pass
# 保存额外属性 # 保存额外属性
disk_info["attributes"] = temp_history disk_info["attributes"] = temp_history
async def get_zpools(self) -> list[dict]:
    """Enumerate ZFS storage pools via `zpool list`.

    Returns:
        One dict per pool with its 11 list columns (name, size, alloc, free,
        ckpoint, expand_sz, frag, capacity, dedup, health, altroot); an empty
        list when ZFS is absent or any error occurs.
    """
    pools: list[dict] = []
    try:
        self.logger.debug("Fetching ZFS pool list...")
        output = await self.coordinator.run_command("zpool list 2>/dev/null || echo 'NO_ZPOOL'")
        self.logger.debug("zpool list output: %s", output)
        if "NO_ZPOOL" in output or "command not found" in output.lower():
            self.logger.info("系统未安装ZFS或没有ZFS存储池")
            return []
        # Skip the header row; every data row carries 11 whitespace-separated columns.
        for row in output.splitlines()[1:]:
            columns = row.split()
            if not columns or len(columns) < 11:
                continue
            entry = {
                "name": columns[0],
                "size": columns[1],
                "alloc": columns[2],
                "free": columns[3],
                # zpool prints "-" for unset fields; normalize to empty string.
                "ckpoint": "" if columns[4] == "-" else columns[4],
                "expand_sz": "" if columns[5] == "-" else columns[5],
                "frag": "0%" if columns[6] == "-" else columns[6],
                "capacity": columns[7],
                "dedup": columns[8],
                "health": columns[9],
                "altroot": "" if columns[10] == "-" else columns[10],
            }
            pools.append(entry)
            self.logger.debug("Found ZFS pool: %s", entry["name"])
        self.logger.info("Found %d ZFS pools", len(pools))
        return pools
    except Exception as e:
        self.logger.error("Failed to get ZFS pool info: %s", str(e), exc_info=True)
        return []
async def get_zpool_status(self, pool_name: str) -> dict:
    """Fetch `zpool status` for one pool and return its parsed scrub state.

    Args:
        pool_name: Name of the ZFS pool to query.

    Returns:
        The dict produced by ``_parse_scrub_info``; on a missing pool or any
        error, a minimal ``{"scrub_in_progress": False}``.
    """
    try:
        self.logger.debug(f"Getting ZFS pool status for {pool_name}")
        raw = await self.coordinator.run_command(f"zpool status {pool_name} 2>/dev/null || echo 'NO_POOL'")
        # The shell fallback prints NO_POOL when the command/pool is absent.
        missing = "NO_POOL" in raw or "command not found" in raw.lower()
        if missing:
            self.logger.debug(f"ZFS pool {pool_name} not found")
            return {"scrub_in_progress": False}
        return self._parse_scrub_info(raw)
    except Exception as e:
        self.logger.error(f"Failed to get ZFS pool status for {pool_name}: {str(e)}", exc_info=True)
        return {"scrub_in_progress": False}
def _parse_scrub_info(self, status_output: str) -> dict:
"""解析zpool status中的scrub信息"""
scrub_info = {
"scrub_in_progress": False,
"scrub_status": "无检查",
"scrub_progress": "0%",
"scan_rate": "0/s",
"time_remaining": "",
"scanned": "0",
"issued": "0",
"repaired": "0",
"scrub_start_time": ""
}
lines = status_output.split('\n')
has_scan_section = False
# 首先判断是否有scan段这是判断scrub进行中的关键
for line in lines:
line = line.strip()
if line.startswith('scan:'):
has_scan_section = True
break
# 如果没有scan段直接返回无检查状态
if not has_scan_section:
return scrub_info
# 解析scan段的内容
in_scan_section = False
for line in lines:
line = line.strip()
# 检查是否进入scan部分
if line.startswith('scan:'):
in_scan_section = True
scrub_info["scrub_in_progress"] = True # 有scan段就表示在进行中
scan_line = line[5:].strip() # 去掉'scan:'
# 检查scrub具体状态
if 'scrub in progress' in scan_line or 'scrub resilvering' in scan_line:
scrub_info["scrub_status"] = "检查进行中"
scrub_info["scrub_progress"] = "0.1%" # 刚开始,显示微小进度表示进行中
# 解析开始时间
if 'since' in scan_line:
time_part = scan_line.split('since')[-1].strip()
scrub_info["scrub_start_time"] = time_part
elif 'scrub repaired' in scan_line or 'scrub completed' in scan_line:
scrub_info["scrub_status"] = "检查完成"
scrub_info["scrub_in_progress"] = False
elif 'scrub canceled' in scan_line:
scrub_info["scrub_status"] = "检查已取消"
scrub_info["scrub_in_progress"] = False
elif 'scrub paused' in scan_line:
scrub_info["scrub_status"] = "检查已暂停"
scrub_info["scrub_in_progress"] = False
else:
# 有scan段但没有具体状态说明默认为进行中
scrub_info["scrub_status"] = "检查进行中"
scrub_info["scrub_progress"] = "0.1%"
continue
# 如果在scan部分解析详细信息
if in_scan_section and line and not line.startswith('config'):
# 解析进度信息,例如: "2.10T / 2.10T scanned, 413G / 2.10T issued at 223M/s"
if 'scanned' in line and 'issued' in line:
parts = line.split(',')
# 解析扫描进度
if len(parts) >= 1:
scanned_part = parts[0].strip()
if ' / ' in scanned_part:
scanned_data = scanned_part.split(' / ')[0].strip()
total_data = scanned_part.split(' / ')[1].split()[0].strip()
scrub_info["scanned"] = f"{scanned_data}/{total_data}"
# 解析发出的数据
if len(parts) >= 2:
issued_part = parts[1].strip()
if ' / ' in issued_part:
issued_data = issued_part.split(' / ')[0].strip()
total_issued = issued_part.split(' / ')[1].split()[0].strip()
scrub_info["issued"] = f"{issued_data}/{total_issued}"
# 解析扫描速度
if 'at' in line:
speed_part = line.split('at')[-1].strip().split()[0]
scrub_info["scan_rate"] = speed_part
# 解析进度百分比和剩余时间
elif '%' in line and 'done' in line:
# 例如: "644M repaired, 19.23% done, 02:12:38 to go"
if '%' in line:
progress_match = re.search(r'(\d+\.?\d*)%', line)
if progress_match:
scrub_info["scrub_progress"] = f"{progress_match.group(1)}%"
if 'repaired' in line:
repaired_match = re.search(r'([\d.]+[KMGT]?).*repaired', line)
if repaired_match:
scrub_info["repaired"] = repaired_match.group(1)
if 'to go' in line:
time_match = re.search(r'(\d{2}:\d{2}:\d{2})\s+to\s+go', line)
if time_match:
scrub_info["time_remaining"] = time_match.group(1)
# 如果遇到空行或新章节退出scan部分
elif line == '' or line.startswith('config'):
break
return scrub_info
def _format_bytes(self, bytes_value: int) -> str:
"""将字节数格式化为易读的格式"""
try:
if bytes_value >= 1024**4: # 1 TB
return f"{bytes_value / (1024**4):.1f} TB"
elif bytes_value >= 1024**3: # 1 GB
return f"{bytes_value / (1024**3):.1f} GB"
elif bytes_value >= 1024**2: # 1 MB
return f"{bytes_value / (1024**2):.1f} MB"
elif bytes_value >= 1024: # 1 KB
return f"{bytes_value / 1024:.1f} KB"
else:
return f"{bytes_value} B"
except Exception:
return f"{bytes_value} B"

View File

@@ -1,10 +1,10 @@
{ {
"domain": "fn_nas", "domain": "fn_nas",
"name": "飞牛NAS", "name": "飞牛NAS",
"version": "1.3.1", "version": "1.4.0",
"documentation": "https://github.com/anxms/fn_nas", "documentation": "https://github.com/xiaochao99/fn_nas",
"dependencies": [], "dependencies": [],
"codeowners": ["@anxms"], "codeowners": ["@xiaochao99"],
"requirements": ["asyncssh>=2.13.1"], "requirements": ["asyncssh>=2.13.1"],
"iot_class": "local_polling", "iot_class": "local_polling",
"config_flow": true "config_flow": true

View File

@@ -3,10 +3,16 @@ from homeassistant.components.sensor import SensorEntity, SensorDeviceClass, Sen
from homeassistant.helpers.update_coordinator import CoordinatorEntity from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.const import UnitOfTemperature from homeassistant.const import UnitOfTemperature
from .const import ( from .const import (
DOMAIN, HDD_TEMP, HDD_HEALTH, HDD_STATUS, SYSTEM_INFO, ICON_DISK, DOMAIN, HDD_TEMP, HDD_STATUS, SYSTEM_INFO, ICON_DISK,
ICON_TEMPERATURE, ICON_HEALTH, ATTR_DISK_MODEL, ATTR_SERIAL_NO, ICON_TEMPERATURE, ATTR_DISK_MODEL, ATTR_SERIAL_NO,
ATTR_POWER_ON_HOURS, ATTR_TOTAL_CAPACITY, ATTR_HEALTH_STATUS, ATTR_POWER_ON_HOURS, ATTR_TOTAL_CAPACITY, ATTR_HEALTH_STATUS,
DEVICE_ID_NAS, DATA_UPDATE_COORDINATOR DEVICE_ID_NAS, DATA_UPDATE_COORDINATOR, ZFS_POOL, ICON_ZFS,
ATTR_ZPOOL_NAME, ATTR_ZPOOL_HEALTH, ATTR_ZPOOL_SIZE,
ATTR_ZPOOL_ALLOC, ATTR_ZPOOL_FREE, ATTR_ZPOOL_CAPACITY,
ATTR_ZPOOL_FRAGMENTATION, ATTR_ZPOOL_CKPOINT, ATTR_ZPOOL_EXPANDSZ,
ATTR_ZPOOL_DEDUP, ATTR_ZPOOL_SCRUB_STATUS, ATTR_ZPOOL_SCRUB_PROGRESS,
ATTR_ZPOOL_SCRUB_SCAN_RATE, ATTR_ZPOOL_SCRUB_TIME_REMAINING,
ATTR_ZPOOL_SCRUB_ISSUED, ATTR_ZPOOL_SCRUB_REPAIRED, DEVICE_ID_ZFS
) )
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -38,22 +44,7 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
) )
existing_ids.add(temp_uid) existing_ids.add(temp_uid)
# 健康状态传感器
health_uid = f"{config_entry.entry_id}_{disk['device']}_health"
if health_uid not in existing_ids:
entities.append(
DiskSensor(
coordinator,
disk["device"],
HDD_HEALTH,
f"硬盘 {disk.get('model', '未知')} 健康状态",
health_uid,
None,
ICON_HEALTH,
disk
)
)
existing_ids.add(health_uid)
# 硬盘状态传感器 # 硬盘状态传感器
status_uid = f"{config_entry.entry_id}_{disk['device']}_status" status_uid = f"{config_entry.entry_id}_{disk['device']}_status"
@@ -244,6 +235,77 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
) )
existing_ids.add(sensor_uid) existing_ids.add(sensor_uid)
# 添加ZFS存储池传感器
if "zpools" in coordinator.data:
for zpool in coordinator.data["zpools"]:
safe_name = zpool["name"].replace(" ", "_").replace("/", "_").replace(".", "_")
# ZFS存储池健康状态传感器
health_uid = f"{config_entry.entry_id}_zpool_{safe_name}_health"
if health_uid not in existing_ids:
entities.append(
ZFSPoolSensor(
coordinator,
zpool["name"],
"health",
f"ZFS {zpool['name']} 健康状态",
health_uid,
None,
ICON_ZFS,
zpool
)
)
existing_ids.add(health_uid)
# ZFS存储池容量使用率传感器
capacity_uid = f"{config_entry.entry_id}_zpool_{safe_name}_capacity"
if capacity_uid not in existing_ids:
entities.append(
ZFSPoolSensor(
coordinator,
zpool["name"],
"capacity",
f"ZFS {zpool['name']} 使用率",
capacity_uid,
"%",
ICON_ZFS,
zpool,
device_class=SensorDeviceClass.POWER_FACTOR,
state_class=SensorStateClass.MEASUREMENT
)
)
existing_ids.add(capacity_uid)
# ZFS存储池总大小传感器
size_uid = f"{config_entry.entry_id}_zpool_{safe_name}_size"
if size_uid not in existing_ids:
entities.append(
ZFSPoolSensor(
coordinator,
zpool["name"],
"size",
f"ZFS {zpool['name']} 容量",
size_uid,
None, # 动态确定单位
ICON_ZFS,
zpool
)
)
existing_ids.add(size_uid)
# ZFS存储池scrub进度传感器
scrub_uid = f"{config_entry.entry_id}_zpool_{safe_name}_scrub"
if scrub_uid not in existing_ids:
entities.append(
ZFSScrubSensor(
coordinator,
zpool["name"],
f"ZFS {zpool['name']} 检查进度",
scrub_uid
)
)
existing_ids.add(scrub_uid)
# 添加剩余内存传感器 # 添加剩余内存传感器
mem_available_uid = f"{config_entry.entry_id}_memory_available" mem_available_uid = f"{config_entry.entry_id}_memory_available"
if mem_available_uid not in existing_ids: if mem_available_uid not in existing_ids:
@@ -302,7 +364,7 @@ class DiskSensor(CoordinatorEntity, SensorEntity):
if disk["device"] == self.device_id: if disk["device"] == self.device_id:
if self.sensor_type == HDD_TEMP: if self.sensor_type == HDD_TEMP:
temp = disk.get("temperature") temp = disk.get("temperature")
if temp is None or temp == "未知" or temp == "未检测": if temp is None or temp == "未知":
return None return None
if isinstance(temp, str): if isinstance(temp, str):
try: try:
@@ -314,11 +376,7 @@ class DiskSensor(CoordinatorEntity, SensorEntity):
elif isinstance(temp, (int, float)): elif isinstance(temp, (int, float)):
return temp return temp
return None return None
elif self.sensor_type == HDD_HEALTH:
health = disk.get("health", "未知")
if health == "未检测":
return "未检测"
return health if health != "未知" else "未知状态"
elif self.sensor_type == HDD_STATUS: elif self.sensor_type == HDD_STATUS:
return disk.get("status", "未知") return disk.get("status", "未知")
return None return None
@@ -329,6 +387,28 @@ class DiskSensor(CoordinatorEntity, SensorEntity):
return SensorDeviceClass.TEMPERATURE return SensorDeviceClass.TEMPERATURE
return None return None
@property
def native_unit_of_measurement(self):
    """Return the unit of measurement, dynamically for "size"-type sensors."""
    # NOTE(review): this property appears to be duplicated from
    # ZFSPoolSensor -- it references self.zpool_name and a "size"
    # sensor_type that this class does not otherwise seem to use, so the
    # early return below likely always fires here. Confirm and remove if dead.
    if self.sensor_type != "size":
        return self._attr_native_unit_of_measurement
    # For "size" sensors, derive the unit from the pool's size-string suffix.
    for zpool in self.coordinator.data.get("zpools", []):
        if zpool["name"] == self.zpool_name:
            size_str = zpool.get("size", "")
            if size_str.endswith("T") or size_str.endswith("Ti"):
                return "TB"
            elif size_str.endswith("G") or size_str.endswith("Gi"):
                return "GB"
            elif size_str.endswith("M") or size_str.endswith("Mi"):
                return "MB"
            elif size_str.endswith("K") or size_str.endswith("Ki"):
                return "KB"
            else:
                return "GB"  # default unit
    return "GB"  # default unit when the pool is not found
@property @property
def extra_state_attributes(self): def extra_state_attributes(self):
return { return {
@@ -602,6 +682,28 @@ class MemoryAvailableSensor(CoordinatorEntity, SensorEntity):
except (TypeError, ValueError): except (TypeError, ValueError):
return None return None
@property
def native_unit_of_measurement(self):
    """Return the unit of measurement, dynamically for "size"-type sensors."""
    # NOTE(review): this property appears to be duplicated from
    # ZFSPoolSensor -- it references self.zpool_name and a "size"
    # sensor_type that this class does not otherwise seem to use, so the
    # early return below likely always fires here. Confirm and remove if dead.
    if self.sensor_type != "size":
        return self._attr_native_unit_of_measurement
    # For "size" sensors, derive the unit from the pool's size-string suffix.
    for zpool in self.coordinator.data.get("zpools", []):
        if zpool["name"] == self.zpool_name:
            size_str = zpool.get("size", "")
            if size_str.endswith("T") or size_str.endswith("Ti"):
                return "TB"
            elif size_str.endswith("G") or size_str.endswith("Gi"):
                return "GB"
            elif size_str.endswith("M") or size_str.endswith("Mi"):
                return "MB"
            elif size_str.endswith("K") or size_str.endswith("Ki"):
                return "KB"
            else:
                return "GB"  # default unit
    return "GB"  # default unit when the pool is not found
@property @property
def extra_state_attributes(self): def extra_state_attributes(self):
"""返回总内存和已用内存GB以及原始字节值""" """返回总内存和已用内存GB以及原始字节值"""
@@ -692,6 +794,158 @@ class VolumeAvailableSensor(CoordinatorEntity, SensorEntity):
"已用容量": vol_info.get("used", "未知"), "已用容量": vol_info.get("used", "未知"),
"使用率": vol_info.get("use_percent", "未知") "使用率": vol_info.get("use_percent", "未知")
} }
class ZFSPoolSensor(CoordinatorEntity, SensorEntity):
    """Sensor exposing a single metric of a ZFS storage pool.

    ``sensor_type`` selects the metric:
      * ``"health"``   -- pool health state, mapped to a Chinese label
      * ``"capacity"`` -- usage percentage as a float (``"42%"`` -> ``42.0``)
      * ``"size"``     -- numeric part of the pool size string; the unit is
                          derived dynamically from the size suffix
    """

    # zpool health states -> Chinese labels shown in Home Assistant.
    _HEALTH_MAP = {
        "ONLINE": "在线",
        "DEGRADED": "降级",
        "FAULTED": "故障",
        "OFFLINE": "离线",
        "REMOVED": "已移除",
        "UNAVAIL": "不可用",
    }

    def __init__(self, coordinator, zpool_name, sensor_type, name, unique_id,
                 unit, icon, zpool_info, device_class=None, state_class=None):
        """Initialize the sensor.

        :param coordinator: update coordinator providing ``data["zpools"]``
        :param zpool_name: name of the pool this sensor reports on
        :param sensor_type: one of ``"health"``, ``"capacity"``, ``"size"``
        :param name: entity display name
        :param unique_id: unique entity id
        :param unit: static unit; ignored for ``"size"`` (derived dynamically)
        :param icon: mdi icon string
        :param zpool_info: pool snapshot captured at setup time
        :param device_class: optional SensorDeviceClass override
        :param state_class: optional SensorStateClass override
        """
        super().__init__(coordinator)
        self.zpool_name = zpool_name
        self.sensor_type = sensor_type
        self._attr_name = name
        self._attr_unique_id = unique_id
        self._attr_native_unit_of_measurement = unit
        self._attr_icon = icon
        self.zpool_info = zpool_info
        self._attr_device_info = {
            "identifiers": {(DOMAIN, DEVICE_ID_ZFS)},
            "name": "ZFS存储池",
            "via_device": (DOMAIN, DEVICE_ID_NAS)
        }
        # Only override device/state class when explicitly provided so the
        # SensorEntity defaults stay in effect otherwise.
        if device_class:
            self._attr_device_class = device_class
        if state_class:
            self._attr_state_class = state_class

    def _pool(self):
        """Return the coordinator entry for this pool, or ``None`` if absent."""
        for zpool in self.coordinator.data.get("zpools", []):
            if zpool["name"] == self.zpool_name:
                return zpool
        return None

    @property
    def native_value(self):
        """Return the sensor value for the configured ``sensor_type``."""
        zpool = self._pool()
        if zpool is None:
            return None
        if self.sensor_type == "health":
            # Unmapped states fall back to the raw health string (or "未知").
            return self._HEALTH_MAP.get(
                zpool.get("health", "UNKNOWN"), zpool.get("health", "未知")
            )
        if self.sensor_type == "capacity":
            # "42%" -> 42.0; None when the value is not numeric.
            try:
                return float(zpool.get("capacity", "0%").replace("%", ""))
            except ValueError:
                return None
        if self.sensor_type == "size":
            # Extract the leading numeric part of e.g. "1.81T".
            import re
            try:
                match = re.search(r'([\d.]+)', zpool.get("size", "0"))
                return float(match.group(1)) if match else None
            except (ValueError, AttributeError):
                return None
        return None

    @property
    def native_unit_of_measurement(self):
        """Derive the unit from the pool-size suffix for "size" sensors.

        Other sensor types keep the static unit given at construction.
        """
        if self.sensor_type != "size":
            return self._attr_native_unit_of_measurement
        # BUGFIX: a stray ``return attributes`` previously made everything
        # below unreachable (and raised NameError when sensor_type == "size");
        # the suffix mapping now actually runs.
        zpool = self._pool()
        size_str = zpool.get("size", "") if zpool else ""
        if size_str.endswith(("T", "Ti")):
            return "TB"
        if size_str.endswith(("G", "Gi")):
            return "GB"
        if size_str.endswith(("M", "Mi")):
            return "MB"
        if size_str.endswith(("K", "Ki")):
            return "KB"
        return "GB"  # default unit

    @property
    def extra_state_attributes(self):
        """Return pool details as attributes (empty dict if pool unknown)."""
        zpool = self._pool()
        if zpool is None:
            return {}
        return {
            ATTR_ZPOOL_NAME: zpool.get("name", "未知"),
            ATTR_ZPOOL_HEALTH: zpool.get("health", "未知"),
            ATTR_ZPOOL_SIZE: zpool.get("size", "未知"),
            ATTR_ZPOOL_ALLOC: zpool.get("alloc", "未知"),
            ATTR_ZPOOL_FREE: zpool.get("free", "未知"),
            ATTR_ZPOOL_CAPACITY: zpool.get("capacity", "未知"),
            ATTR_ZPOOL_FRAGMENTATION: zpool.get("frag", "未知"),
            # The original conditionals here were no-ops (returned the value
            # when non-empty, "" otherwise), i.e. exactly .get(..., "").
            ATTR_ZPOOL_CKPOINT: zpool.get("ckpoint", ""),
            ATTR_ZPOOL_EXPANDSZ: zpool.get("expand_sz", ""),
            ATTR_ZPOOL_DEDUP: zpool.get("dedup", "未知"),
            "根路径": zpool.get("altroot", "") if zpool.get("altroot") != "" else "默认",
        }
class ZFSScrubSensor(CoordinatorEntity, SensorEntity):
    """Progress sensor for a ZFS pool's scrub (consistency check)."""

    def __init__(self, coordinator, zpool_name, name, unique_id):
        """Initialize the scrub-progress sensor for ``zpool_name``."""
        super().__init__(coordinator)
        self.zpool_name = zpool_name
        self._attr_name = name
        self._attr_unique_id = unique_id
        self._attr_native_unit_of_measurement = "%"
        self._attr_icon = "mdi:progress-check"
        self._attr_device_info = {
            "identifiers": {(DOMAIN, DEVICE_ID_ZFS)},
            "name": "ZFS存储池",
            "via_device": (DOMAIN, DEVICE_ID_NAS)
        }
        self._attr_device_class = SensorDeviceClass.POWER_FACTOR
        self._attr_state_class = SensorStateClass.MEASUREMENT
        self.scrub_cache = {}

    def _scrub_info(self):
        """Fetch this pool's scrub-status dict from the coordinator data."""
        return self.coordinator.data.get("scrub_status", {}).get(self.zpool_name, {})

    @property
    def native_value(self):
        """Scrub progress in percent; ``0.0`` when idle or unparsable."""
        raw = self._scrub_info().get("scrub_progress", "0%")
        if not raw or raw == "0%":
            return 0.0
        try:
            return float(raw.replace("%", ""))
        except (ValueError, AttributeError):
            return 0.0

    @property
    def extra_state_attributes(self):
        """Detailed scrub status (rate, ETA, repaired bytes, ...)."""
        info = self._scrub_info()
        return {
            ATTR_ZPOOL_NAME: self.zpool_name,
            ATTR_ZPOOL_SCRUB_STATUS: info.get("scrub_status", "无检查"),
            ATTR_ZPOOL_SCRUB_PROGRESS: info.get("scrub_progress", "0%"),
            ATTR_ZPOOL_SCRUB_SCAN_RATE: info.get("scan_rate", "0/s"),
            ATTR_ZPOOL_SCRUB_TIME_REMAINING: info.get("time_remaining", ""),
            ATTR_ZPOOL_SCRUB_ISSUED: info.get("issued", "0"),
            ATTR_ZPOOL_SCRUB_REPAIRED: info.get("repaired", "0"),
            "开始时间": info.get("scrub_start_time", ""),
            "扫描数据": info.get("scanned", "0"),
            "检查进行中": info.get("scrub_in_progress", False),
        }

File diff suppressed because it is too large Load Diff

View File

@@ -10,9 +10,7 @@
"username": "用户名", "username": "用户名",
"password": "密码", "password": "密码",
"scan_interval": "数据更新间隔(秒)", "scan_interval": "数据更新间隔(秒)",
"enable_docker": "启用docker控制", "enable_docker": "启用docker控制"
"max_connections": "最大连接数",
"cache_timeout": "缓存过期时间"
} }
}, },
"select_mac": { "select_mac": {

View File

@@ -11,9 +11,27 @@ class UPSManager:
def __init__(self, coordinator): def __init__(self, coordinator):
self.coordinator = coordinator self.coordinator = coordinator
self.logger = _LOGGER.getChild("ups_manager") self.logger = _LOGGER.getChild("ups_manager")
self.logger.setLevel(logging.DEBUG) # 根据Home Assistant的日志级别动态设置
self.debug_enabled = False # UPS调试模式开关 self.logger.setLevel(logging.DEBUG if _LOGGER.isEnabledFor(logging.DEBUG) else logging.INFO)
self.ups_debug_path = "/config/fn_nas_ups_debug" # UPS调试文件保存路径 self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG) # 基于HA调试模式
self.ups_debug_path = "/config/fn_nas_ups_debug"
def _debug_log(self, message: str):
"""只在调试模式下输出详细日志"""
if self.debug_enabled:
self.logger.debug(message)
def _info_log(self, message: str):
"""重要信息日志"""
self.logger.info(message)
def _warning_log(self, message: str):
"""警告日志"""
self.logger.warning(message)
def _error_log(self, message: str):
"""错误日志"""
self.logger.error(message)
async def get_ups_info(self) -> dict: async def get_ups_info(self) -> dict:
"""获取连接的UPS信息""" """获取连接的UPS信息"""
@@ -31,7 +49,7 @@ class UPSManager:
try: try:
# 尝试使用NUT工具获取UPS信息 # 尝试使用NUT工具获取UPS信息
self.logger.debug("尝试使用NUT工具获取UPS信息") self._debug_log("尝试使用NUT工具获取UPS信息")
output = await self.coordinator.run_command("upsc -l") output = await self.coordinator.run_command("upsc -l")
if output and "No such file" not in output: if output and "No such file" not in output:
@@ -39,11 +57,11 @@ class UPSManager:
ups_names = output.splitlines() ups_names = output.splitlines()
if ups_names: if ups_names:
ups_name = ups_names[0].strip() ups_name = ups_names[0].strip()
self.logger.debug("发现UPS: %s", ups_name) self._debug_log(f"发现UPS: {ups_name}")
# 获取详细的UPS信息 # 获取详细的UPS信息
ups_details = await self.coordinator.run_command(f"upsc {ups_name}") ups_details = await self.coordinator.run_command(f"upsc {ups_name}")
self.logger.debug("UPS详细信息: %s", ups_details) self._debug_log(f"UPS详细信息: {ups_details}")
# 保存UPS数据以便调试 # 保存UPS数据以便调试
self.save_ups_data_for_debug(ups_details) self.save_ups_data_for_debug(ups_details)
@@ -51,20 +69,20 @@ class UPSManager:
# 解析UPS信息 # 解析UPS信息
return self.parse_nut_ups_info(ups_details) return self.parse_nut_ups_info(ups_details)
else: else:
self.logger.debug("未找到连接的UPS") self._debug_log("未找到连接的UPS")
else: else:
self.logger.debug("未安装NUT工具尝试备用方法") self._debug_log("未安装NUT工具尝试备用方法")
# 备用方法尝试直接读取UPS状态 # 备用方法尝试直接读取UPS状态
return await self.get_ups_info_fallback() return await self.get_ups_info_fallback()
except Exception as e: except Exception as e:
self.logger.error("获取UPS信息时出错: %s", str(e), exc_info=True) self._error_log(f"获取UPS信息时出错: {str(e)}")
return ups_info return ups_info
async def get_ups_info_fallback(self) -> dict: async def get_ups_info_fallback(self) -> dict:
"""备用方法获取UPS信息""" """备用方法获取UPS信息"""
self.logger.info("尝试备用方法获取UPS信息") self._info_log("尝试备用方法获取UPS信息")
ups_info = { ups_info = {
"status": "未知", "status": "未知",
"battery_level": "未知", "battery_level": "未知",
@@ -81,7 +99,7 @@ class UPSManager:
# 方法1: 检查USB连接的UPS # 方法1: 检查USB连接的UPS
usb_ups_output = await self.coordinator.run_command("lsusb | grep -i ups || echo 'No USB UPS'") usb_ups_output = await self.coordinator.run_command("lsusb | grep -i ups || echo 'No USB UPS'")
if usb_ups_output and "No USB UPS" not in usb_ups_output: if usb_ups_output and "No USB UPS" not in usb_ups_output:
self.logger.debug("检测到USB UPS设备: %s", usb_ups_output) self._debug_log(f"检测到USB UPS设备: {usb_ups_output}")
ups_info["ups_type"] = "USB" ups_info["ups_type"] = "USB"
# 尝试从输出中提取型号 # 尝试从输出中提取型号
@@ -111,7 +129,7 @@ class UPSManager:
return ups_info return ups_info
except Exception as e: except Exception as e:
self.logger.error("备用方法获取UPS信息失败: %s", str(e)) self._error_log(f"备用方法获取UPS信息失败: {str(e)}")
return ups_info return ups_info
def parse_nut_ups_info(self, ups_output: str) -> dict: def parse_nut_ups_info(self, ups_output: str) -> dict:
@@ -253,6 +271,6 @@ class UPSManager:
with open(filename, "w") as f: with open(filename, "w") as f:
f.write(ups_output) f.write(ups_output)
self.logger.info("保存UPS数据到 %s 用于调试", filename) self._info_log(f"保存UPS数据到 {filename} 用于调试")
except Exception as e: except Exception as e:
self.logger.error("保存UPS数据失败: %s", str(e)) self._error_log(f"保存UPS数据失败: {str(e)}")

View File

@@ -8,15 +8,40 @@ class VMManager:
def __init__(self, coordinator): def __init__(self, coordinator):
self.coordinator = coordinator self.coordinator = coordinator
self.vms = [] self.vms = []
self.logger = _LOGGER.getChild("vm_manager")
# 根据Home Assistant的日志级别动态设置
self.logger.setLevel(logging.DEBUG if _LOGGER.isEnabledFor(logging.DEBUG) else logging.INFO)
self.debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
def _debug_log(self, message: str):
"""只在调试模式下输出详细日志"""
if self.debug_enabled:
self.logger.debug(message)
def _info_log(self, message: str):
"""重要信息日志"""
self.logger.info(message)
def _warning_log(self, message: str):
"""警告日志"""
self.logger.warning(message)
def _error_log(self, message: str):
"""错误日志"""
self.logger.error(message)
async def get_vm_list(self): async def get_vm_list(self):
"""获取虚拟机列表及其状态""" """获取虚拟机列表及其状态"""
try: try:
self._debug_log("开始获取虚拟机列表")
output = await self.coordinator.run_command("virsh list --all") output = await self.coordinator.run_command("virsh list --all")
self._debug_log(f"virsh命令输出: {output}")
self.vms = self._parse_vm_list(output) self.vms = self._parse_vm_list(output)
self._info_log(f"获取到{len(self.vms)}个虚拟机")
return self.vms return self.vms
except Exception as e: except Exception as e:
_LOGGER.error("获取虚拟机列表失败: %s", str(e)) self._error_log(f"获取虚拟机列表失败: {str(e)}")
return [] return []
def _parse_vm_list(self, output): def _parse_vm_list(self, output):
@@ -43,26 +68,32 @@ class VMManager:
async def get_vm_title(self, vm_name): async def get_vm_title(self, vm_name):
"""获取虚拟机的标题""" """获取虚拟机的标题"""
try: try:
self._debug_log(f"获取虚拟机{vm_name}的标题")
output = await self.coordinator.run_command(f"virsh dumpxml {vm_name}") output = await self.coordinator.run_command(f"virsh dumpxml {vm_name}")
# 在XML输出中查找<title>标签 # 在XML输出中查找<title>标签
match = re.search(r'<title>(.*?)</title>', output, re.DOTALL) match = re.search(r'<title>(.*?)</title>', output, re.DOTALL)
if match: if match:
return match.group(1).strip() title = match.group(1).strip()
self._debug_log(f"虚拟机{vm_name}标题: {title}")
return title
self._debug_log(f"虚拟机{vm_name}无标题,使用名称")
return vm_name # 如果没有标题,则返回虚拟机名称 return vm_name # 如果没有标题,则返回虚拟机名称
except Exception as e: except Exception as e:
_LOGGER.error("获取虚拟机标题失败: %s", str(e)) self._error_log(f"获取虚拟机标题失败: {str(e)}")
return vm_name return vm_name
async def control_vm(self, vm_name, action): async def control_vm(self, vm_name, action):
"""控制虚拟机操作""" """控制虚拟机操作"""
valid_actions = ["start", "shutdown", "reboot"] valid_actions = ["start", "shutdown", "reboot", "destroy"]
if action not in valid_actions: if action not in valid_actions:
raise ValueError(f"无效操作: {action}") raise ValueError(f"无效操作: {action}")
command = f"virsh {action} {vm_name}" command = f"virsh {action} {vm_name}"
try: try:
self._info_log(f"执行虚拟机操作: {command}")
await self.coordinator.run_command(command) await self.coordinator.run_command(command)
self._info_log(f"虚拟机{vm_name}操作{action}成功")
return True return True
except Exception as e: except Exception as e:
_LOGGER.error("执行虚拟机操作失败: %s", str(e)) self._error_log(f"执行虚拟机操作失败: {str(e)}")
return False return False