4 Commits

3 changed files with 223 additions and 523 deletions

View File

@@ -1,6 +1,8 @@
# coordinator.py (文档9)
import logging import logging
import asyncio import asyncio
import asyncssh import asyncssh
import re
from datetime import timedelta from datetime import timedelta
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
@@ -38,17 +40,8 @@ class FlynasCoordinator(DataUpdateCoordinator):
self.vm_manager = VMManager(self) self.vm_manager = VMManager(self)
self.use_sudo = False self.use_sudo = False
self.data = { # 确保data始终有初始值
"disks": [], self.data = self.get_default_data()
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": []
}
scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL) scan_interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
update_interval = timedelta(seconds=scan_interval) update_interval = timedelta(seconds=scan_interval)
@@ -65,67 +58,94 @@ class FlynasCoordinator(DataUpdateCoordinator):
self._system_online = False self._system_online = False
self._ping_task = None self._ping_task = None
self._retry_interval = 30 # 系统离线时的检测间隔(秒) self._retry_interval = 30 # 系统离线时的检测间隔(秒)
self._last_command_time = 0
self._command_count = 0
def get_default_data(self):
"""返回默认的数据结构"""
return {
"disks": [],
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": [],
"docker_containers": []
}
async def async_connect(self): async def async_connect(self):
if self.ssh is None or self.ssh_closed: """建立并保持持久SSH连接"""
if self.ssh is not None and not self.ssh_closed:
try: try:
self.ssh = await asyncssh.connect( # 测试连接是否仍然活跃
self.host, await self.ssh.run("echo 'connection_test'", timeout=1)
port=self.port, return True
username=self.username, except (asyncssh.Error, TimeoutError):
password=self.password, _LOGGER.debug("现有连接失效,准备重建")
known_hosts=None, await self.async_disconnect()
connect_timeout=5 # 缩短连接超时时间
) try:
self.ssh = await asyncssh.connect(
if await self.is_root_user(): self.host,
_LOGGER.debug("当前用户是 root") port=self.port,
username=self.username,
password=self.password,
known_hosts=None,
connect_timeout=5
)
self.ssh_closed = False
_LOGGER.info("已建立持久SSH连接到 %s", self.host)
# 检查权限状态
if await self.is_root_user():
_LOGGER.debug("当前用户是 root")
self.use_sudo = False
else:
# 尝试切换到root会话
if await self.try_switch_to_root():
self.use_sudo = False self.use_sudo = False
self.ssh_closed = False
return True return True
except Exception as e:
self.ssh = None
self.ssh_closed = True
_LOGGER.debug("连接失败: %s", str(e))
return False
async def try_switch_to_root(self):
"""尝试切换到root会话"""
try:
if self.root_password:
result = await self.ssh.run( result = await self.ssh.run(
f"echo '{self.password}' | sudo -S -i", f"echo '{self.root_password}' | sudo -S -i",
input=self.password + "\n", input=self.root_password + "\n",
timeout=5 timeout=5
) )
whoami = await self.ssh.run("whoami")
whoami_result = await self.ssh.run("whoami") if "root" in whoami.stdout:
if "root" in whoami_result.stdout: _LOGGER.info("成功切换到 root 会话(使用 root 密码)")
_LOGGER.info("成功切换到 root 会话(使用登录密码)")
self.use_sudo = False
self.ssh_closed = False
return True return True
else:
if self.root_password:
result = await self.ssh.run(
f"echo '{self.root_password}' | sudo -S -i",
input=self.root_password + "\n",
timeout=5
)
whoami_result = await self.ssh.run("whoami")
if "root" in whoami_result.stdout:
_LOGGER.info("成功切换到 root 会话(使用 root 密码)")
self.use_sudo = False
self.ssh_closed = False
return True
else:
# 切换到 root 会话失败,将使用 sudo
self.use_sudo = True
else:
# 非 root 用户且未提供 root 密码,将使用 sudo
self.use_sudo = True
self.ssh_closed = False result = await self.ssh.run(
_LOGGER.info("SSH 连接已建立到 %s", self.host) f"echo '{self.password}' | sudo -S -i",
input=self.password + "\n",
timeout=5
)
whoami = await self.ssh.run("whoami")
if "root" in whoami.stdout:
_LOGGER.info("成功切换到 root 会话(使用登录密码)")
return True return True
except Exception as e:
self.ssh = None self.use_sudo = True
self.ssh_closed = True return False
_LOGGER.debug("连接失败: %s", str(e)) except Exception:
return False self.use_sudo = True
return True return False
async def is_root_user(self): async def is_root_user(self):
try: try:
@@ -135,26 +155,74 @@ class FlynasCoordinator(DataUpdateCoordinator):
return False return False
async def async_disconnect(self): async def async_disconnect(self):
"""断开SSH连接"""
if self.ssh is not None and not self.ssh_closed: if self.ssh is not None and not self.ssh_closed:
try: try:
self.ssh.close() self.ssh.close()
self.ssh_closed = True self.ssh_closed = True
_LOGGER.debug("SSH connection closed") _LOGGER.debug("已关闭SSH连接")
except Exception as e: except Exception as e:
_LOGGER.debug("Error closing SSH connection: %s", str(e)) _LOGGER.debug("关闭SSH连接时出错: %s", str(e))
finally: finally:
self.ssh = None self.ssh = None
async def is_ssh_connected(self) -> bool: async def run_command(self, command: str, retries=2) -> str:
if self.ssh is None or self.ssh_closed: """执行SSH命令使用持久连接"""
return False current_time = asyncio.get_event_loop().time()
# 连接冷却机制:避免短时间内频繁创建新连接
if current_time - self._last_command_time < 1.0 and self._command_count > 5:
await asyncio.sleep(0.5)
self._last_command_time = current_time
self._command_count += 1
# 系统离线时直接返回空字符串
if not self._system_online:
return ""
try: try:
test_command = "echo 'connection_test'" # 确保连接有效
result = await self.ssh.run(test_command, timeout=2) if not await self.async_connect():
return result.exit_status == 0 and "connection_test" in result.stdout return ""
except (asyncssh.Error, TimeoutError):
return False # 使用sudo执行命令
if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await self.ssh.run(full_command, input=password + "\n", timeout=10)
else:
full_command = f"sudo {command}"
result = await self.ssh.run(full_command, timeout=10)
else:
result = await self.ssh.run(command, timeout=10)
return result.stdout.strip()
except (asyncssh.Error, TimeoutError) as e:
_LOGGER.debug("命令执行失败: %s, 错误: %s", command, str(e))
# 标记连接失效
self.ssh_closed = True
return ""
except Exception as e:
_LOGGER.debug("执行命令时出现意外错误: %s", str(e))
self.ssh_closed = True
return ""
async def _monitor_system_status(self):
"""系统离线时轮询检测状态"""
self.logger.debug("启动系统状态监控,每%d秒检测一次", self._retry_interval)
while True:
await asyncio.sleep(self._retry_interval)
if await self.ping_system():
self.logger.info("检测到系统已开机,触发重新加载")
# 触发集成重新加载
self.hass.async_create_task(
self.hass.config_entries.async_reload(self.config_entry.entry_id)
)
break
async def ping_system(self) -> bool: async def ping_system(self) -> bool:
"""轻量级系统状态检测""" """轻量级系统状态检测"""
@@ -174,155 +242,51 @@ class FlynasCoordinator(DataUpdateCoordinator):
except Exception: except Exception:
return False return False
async def run_command(self, command: str, retries=2) -> str:
# 系统离线时直接返回空字符串,避免抛出异常
if not self._system_online:
return ""
for attempt in range(retries):
try:
if not await self.is_ssh_connected():
if not await self.async_connect():
if self.data and "system" in self.data:
self.data["system"]["status"] = "off"
return ""
if self.use_sudo:
if self.root_password or self.password:
password = self.root_password if self.root_password else self.password
full_command = f"sudo -S {command}"
result = await self.ssh.run(full_command, input=password + "\n", check=True)
else:
full_command = f"sudo {command}"
result = await self.ssh.run(full_command, check=True)
else:
result = await self.ssh.run(command, check=True)
return result.stdout.strip()
except asyncssh.process.ProcessError as e:
if e.exit_status in [4, 32]:
return ""
_LOGGER.debug("Command failed: %s (exit %d)", command, e.exit_status)
self.ssh = None
self.ssh_closed = True
if attempt == retries - 1:
return ""
except asyncssh.Error as e:
_LOGGER.debug("SSH connection error: %s", str(e))
self.ssh = None
self.ssh_closed = True
if attempt == retries - 1:
return ""
except Exception as e:
self.ssh = None
self.ssh_closed = True
_LOGGER.debug("Unexpected error: %s", str(e))
if attempt == retries - 1:
return ""
return ""
async def get_network_macs(self):
try:
output = await self.run_command("ip link show")
macs = {}
pattern = re.compile(r'^\d+: (\w+):.*\n\s+link/\w+\s+([0-9a-fA-F:]{17})', re.MULTILINE)
matches = pattern.findall(output)
for interface, mac in matches:
if interface == "lo" or mac == "00:00:00:00:00:00":
continue
macs[mac] = interface
return macs
except Exception as e:
self.logger.debug("获取MAC地址失败: %s", str(e))
return {}
async def _monitor_system_status(self):
"""系统离线时轮询检测状态"""
self.logger.debug("启动系统状态监控,每%d秒检测一次", self._retry_interval)
while True:
await asyncio.sleep(self._retry_interval)
if await self.ping_system():
self.logger.info("检测到系统已开机,触发重新加载")
# 触发集成重新加载
self.hass.async_create_task(
self.hass.config_entries.async_reload(self.config_entry.entry_id)
)
break
async def _async_update_data(self): async def _async_update_data(self):
_LOGGER.debug("Starting data update...") """数据更新入口,优化命令执行频率"""
_LOGGER.debug("开始数据更新...")
is_online = await self.ping_system() is_online = await self.ping_system()
self._system_online = is_online self._system_online = is_online
if not is_online: if not is_online:
_LOGGER.debug("系统离线,跳过数据更新") _LOGGER.debug("系统离线,跳过数据更新")
# 修复:确保 self.data 结构有效 # 启动后台监控任务
if self.data is None or not isinstance(self.data, dict):
self.data = {}
if "system" not in self.data or not isinstance(self.data.get("system"), dict):
self.data["system"] = {}
self.data["system"]["status"] = "off"
# 启动后台监控任务(非阻塞)
if not self._ping_task or self._ping_task.done(): if not self._ping_task or self._ping_task.done():
self._ping_task = asyncio.create_task(self._monitor_system_status()) self._ping_task = asyncio.create_task(self._monitor_system_status())
await self.async_disconnect() await self.async_disconnect()
# 直接返回空数据,不阻塞 return self.get_default_data()
return {
"disks": [],
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": [],
"docker_containers": []
}
# 系统在线处理 # 系统在线处理
try: try:
# 确保SSH连接 # 确保连接有效
if not await self.async_connect(): if not await self.async_connect():
self.data["system"]["status"] = "off" return self.get_default_data()
return {
"disks": [],
"system": {
"uptime": "未知",
"cpu_temperature": "未知",
"motherboard_temperature": "未知",
"status": "off"
},
"ups": {},
"vms": []
}
# 获取系统状态信息
status = "on" status = "on"
# 并行获取磁盘、UPS和系统信息
system_task = asyncio.create_task(self.system_manager.get_system_info())
disks_task = asyncio.create_task(self.disk_manager.get_disks_info())
ups_task = asyncio.create_task(self.ups_manager.get_ups_info())
vms_task = asyncio.create_task(self.vm_manager.get_vm_list())
disks = await self.disk_manager.get_disks_info() # 等待并行任务完成
system = await self.system_manager.get_system_info() system, disks, ups_info, vms = await asyncio.gather(
ups_info = await self.ups_manager.get_ups_info() system_task, disks_task, ups_task, vms_task
vms = await self.vm_manager.get_vm_list() )
# 为每个虚拟机获取标题
for vm in vms: for vm in vms:
vm["title"] = await self.vm_manager.get_vm_title(vm["name"]) vm["title"] = await self.vm_manager.get_vm_title(vm["name"])
# 获取Docker容器信息
docker_containers = [] docker_containers = []
if self.enable_docker: if self.enable_docker:
docker_containers = await self.docker_manager.get_containers() docker_containers = await self.docker_manager.get_containers()
data = { data = {
"disks": disks, "disks": disks,
"system": { "system": {**system, "status": status},
**system,
"status": status
},
"ups": ups_info, "ups": ups_info,
"vms": vms, "vms": vms,
"docker_containers": docker_containers "docker_containers": docker_containers
@@ -332,29 +296,34 @@ class FlynasCoordinator(DataUpdateCoordinator):
except Exception as e: except Exception as e:
_LOGGER.debug("数据更新失败: %s", str(e)) _LOGGER.debug("数据更新失败: %s", str(e))
# 检查错误类型,如果是连接问题,标记为离线
self._system_online = False self._system_online = False
if not self._ping_task or self._ping_task.done(): if not self._ping_task or self._ping_task.done():
self._ping_task = asyncio.create_task(self._monitor_system_status()) self._ping_task = asyncio.create_task(self._monitor_system_status())
return { return self.get_default_data()
"disks": [],
"system": { def get_default_data(self):
"uptime": "未知", """获取默认数据(离线状态)"""
"cpu_temperature": "未知", return {
"motherboard_temperature": "未知", "disks": [],
"status": "off" "system": {
}, "uptime": "未知",
"ups": {}, "cpu_temperature": "未知",
"vms": [] "motherboard_temperature": "未知",
} "status": "off"
},
"ups": {},
"vms": [],
"docker_containers": []
}
async def reboot_system(self): async def reboot_system(self):
await self.system_manager.reboot_system() await self.system_manager.reboot_system()
async def shutdown_system(self): async def shutdown_system(self):
await self.system_manager.shutdown_system() await self.system_manager.shutdown_system()
if self.data and "system" in self.data: # 更新状态,但使用安全的方式
if self.data and isinstance(self.data, dict) and "system" in self.data:
self.data["system"]["status"] = "off" self.data["system"]["status"] = "off"
self.async_update_listeners() self.async_update_listeners()

View File

@@ -1,7 +1,7 @@
{ {
"domain": "fn_nas", "domain": "fn_nas",
"name": "飞牛NAS", "name": "飞牛NAS",
"version": "1.3.4", "version": "1.3.5",
"documentation": "https://github.com/anxms/fn_nas", "documentation": "https://github.com/anxms/fn_nas",
"dependencies": [], "dependencies": [],
"codeowners": ["@anxms"], "codeowners": ["@anxms"],

View File

@@ -1,7 +1,5 @@
import re
import logging import logging
import asyncio import asyncio
import json
import os import os
from datetime import datetime from datetime import datetime
@@ -23,10 +21,8 @@ class SystemManager:
uptime_output = await self.coordinator.run_command("cat /proc/uptime") uptime_output = await self.coordinator.run_command("cat /proc/uptime")
if uptime_output: if uptime_output:
try: try:
# 保存原始秒数
uptime_seconds = float(uptime_output.split()[0]) uptime_seconds = float(uptime_output.split()[0])
system_info["uptime_seconds"] = uptime_seconds system_info["uptime_seconds"] = uptime_seconds
# 保存格式化字符串
system_info["uptime"] = self.format_uptime(uptime_seconds) system_info["uptime"] = self.format_uptime(uptime_seconds)
except (ValueError, IndexError): except (ValueError, IndexError):
system_info["uptime_seconds"] = 0 system_info["uptime_seconds"] = 0
@@ -34,43 +30,23 @@ class SystemManager:
else: else:
system_info["uptime_seconds"] = 0 system_info["uptime_seconds"] = 0
system_info["uptime"] = "未知" system_info["uptime"] = "未知"
# 获取 sensors 命令输出使用JSON格式 # 只通过内核方式获取温度
sensors_output = await self.coordinator.run_command( cpu_temp = await self.get_cpu_temp_from_kernel()
"sensors -j 2>/dev/null || sensors 2>/dev/null || echo 'No sensor data'"
)
# 保存传感器数据以便调试
self.save_sensor_data_for_debug(sensors_output)
self.logger.debug("Sensors output: %s", sensors_output[:500] + "..." if len(sensors_output) > 500 else sensors_output)
# 提取 CPU 温度(改进算法)
cpu_temp = self.extract_cpu_temp(sensors_output)
system_info["cpu_temperature"] = cpu_temp system_info["cpu_temperature"] = cpu_temp
# 提取主板温度(改进算法) mobo_temp = await self.get_mobo_temp_from_kernel()
mobo_temp = self.extract_mobo_temp(sensors_output)
system_info["motherboard_temperature"] = mobo_temp system_info["motherboard_temperature"] = mobo_temp
# 尝试备用方法获取CPU温度
if cpu_temp == "未知":
backup_cpu_temp = await self.get_cpu_temp_fallback()
if backup_cpu_temp:
system_info["cpu_temperature"] = backup_cpu_temp
# 新增:获取内存信息
mem_info = await self.get_memory_info() mem_info = await self.get_memory_info()
system_info.update(mem_info) system_info.update(mem_info)
# 新增:获取存储卷信息
vol_info = await self.get_vol_usage() vol_info = await self.get_vol_usage()
system_info["volumes"] = vol_info system_info["volumes"] = vol_info
return system_info return system_info
except Exception as e: except Exception as e:
self.logger.error("Error getting system info: %s", str(e)) self.logger.error("Error getting system info: %s", str(e))
# 在异常处理中返回空数据
return { return {
"uptime_seconds": 0, "uptime_seconds": 0,
"uptime": "未知", "uptime": "未知",
@@ -81,71 +57,43 @@ class SystemManager:
"memory_available": "未知", "memory_available": "未知",
"volumes": {} "volumes": {}
} }
def save_sensor_data_for_debug(self, sensors_output: str): async def get_cpu_temp_from_kernel(self) -> str:
"""保存传感器数据以便调试""" # 获取CPU温度
if not self.debug_enabled: for i in range(5):
return for j in range(5):
label_path = f"/sys/class/hwmon/hwmon{i}/temp{j}_label"
try: label = await self.coordinator.run_command(f"cat {label_path} 2>/dev/null")
# 创建调试目录 if label and ("cpu" in label.lower() or "package" in label.lower()):
if not os.path.exists(self.sensors_debug_path): temp_path = f"/sys/class/hwmon/hwmon{i}/temp{j}_input"
os.makedirs(self.sensors_debug_path) temp_str = await self.coordinator.run_command(f"cat {temp_path} 2>/dev/null")
if temp_str and temp_str.isdigit():
# 生成文件名 temp = float(temp_str) / 1000.0
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = os.path.join(self.sensors_debug_path, f"sensors_{timestamp}.log")
# 写入文件
with open(filename, "w") as f:
f.write(sensors_output)
self.logger.info("Saved sensors output to %s for debugging", filename)
except Exception as e:
self.logger.error("Failed to save sensor data: %s", str(e))
async def get_cpu_temp_fallback(self) -> str:
"""备用方法获取CPU温度"""
self.logger.info("Trying fallback methods to get CPU temperature")
# 方法1: 从/sys/class/thermal读取
try:
for i in range(5): # 检查前5个可能的传感器
path = f"/sys/class/thermal/thermal_zone{i}/temp"
output = await self.coordinator.run_command(f"cat {path} 2>/dev/null")
if output and output.isdigit():
temp = float(output) / 1000.0
self.logger.info("Found CPU temperature via thermal zone: %.1f°C", temp)
return f"{temp:.1f} °C"
except Exception:
pass
# 方法2: 从hwmon设备读取
try:
for i in range(5): # 检查前5个可能的hwmon设备
for j in range(5): # 检查每个设备的前5个温度传感器
path = f"/sys/class/hwmon/hwmon{i}/temp{j}_input"
output = await self.coordinator.run_command(f"cat {path} 2>/dev/null")
if output and output.isdigit():
temp = float(output) / 1000.0
self.logger.info("Found CPU temperature via hwmon: %.1f°C", temp)
return f"{temp:.1f} °C" return f"{temp:.1f} °C"
except Exception: return "未知"
pass
async def get_mobo_temp_from_kernel(self) -> str:
# 方法3: 使用psutil库如果可用 # 获取主板温度
try: for i in range(5):
output = await self.coordinator.run_command("python3 -c 'import psutil; print(psutil.sensors_temperatures().get(\"coretemp\")[0].current)' 2>/dev/null") for j in range(5):
if output and output.replace('.', '', 1).isdigit(): label_path = f"/sys/class/hwmon/hwmon{i}/temp{j}_label"
temp = float(output) label = await self.coordinator.run_command(f"cat {label_path} 2>/dev/null")
self.logger.info("Found CPU temperature via psutil: %.1f°C", temp) if label and ("mobo" in label.lower() or "mb" in label.lower() or "sys" in label.lower() or "pch" in label.lower()):
return f"{temp:.1f} °C" temp_path = f"/sys/class/hwmon/hwmon{i}/temp{j}_input"
except Exception: temp_str = await self.coordinator.run_command(f"cat {temp_path} 2>/dev/null")
pass if temp_str and temp_str.isdigit():
temp = float(temp_str) / 1000.0
self.logger.warning("All fallback methods failed to get CPU temperature") return f"{temp:.1f} °C"
return "" return "未知"
def extract_cpu_temp(self, sensors_output: str) -> str:
"""兼容旧接口,直接返回未知"""
return "未知"
def extract_mobo_temp(self, sensors_output: str) -> str:
"""兼容旧接口,直接返回未知"""
return "未知"
def format_uptime(self, seconds: float) -> str: def format_uptime(self, seconds: float) -> str:
"""格式化运行时间为易读格式""" """格式化运行时间为易读格式"""
try: try:
@@ -166,223 +114,6 @@ class SystemManager:
self.logger.error("Failed to format uptime: %s", str(e)) self.logger.error("Failed to format uptime: %s", str(e))
return "未知" return "未知"
def extract_cpu_temp(self, sensors_output: str) -> str:
"""从 sensors 输出中提取 CPU 温度,优先获取 Package id 0"""
# 优先尝试获取 Package id 0 温度值
package_id_pattern = r'Package id 0:\s*\+?(\d+\.?\d*)°C'
package_match = re.search(package_id_pattern, sensors_output, re.IGNORECASE)
if package_match:
try:
package_temp = float(package_match.group(1))
self.logger.debug("优先使用 Package id 0 温度: %.1f°C", package_temp)
return f"{package_temp:.1f} °C"
except (ValueError, IndexError) as e:
self.logger.debug("Package id 0 解析错误: %s", str(e))
# 其次尝试解析JSON格式
if sensors_output.strip().startswith('{'):
try:
data = json.loads(sensors_output)
self.logger.debug("JSON sensors data: %s", json.dumps(data, indent=2))
# 查找包含Package相关键名的温度值
for key, values in data.items():
if any(kw in key.lower() for kw in ["package", "pkg", "physical"]):
for subkey, temp_value in values.items():
if any(kw in subkey.lower() for kw in ["temp", "input"]) and not "crit" in subkey.lower():
try:
if isinstance(temp_value, (int, float)):
self.logger.debug("JSON中找到Package温度: %s/%s = %.1f°C", key, subkey, temp_value)
return f"{temp_value:.1f} °C"
except Exception as e:
self.logger.debug("JSON值错误: %s", str(e))
# 新增尝试直接获取Tdie/Tctl温度AMD CPU
for key, values in data.items():
if "k10temp" in key.lower():
for subkey, temp_value in values.items():
if "tdie" in subkey.lower() or "tctl" in subkey.lower():
try:
if isinstance(temp_value, (int, float)):
self.logger.debug("JSON中找到Tdie/Tctl温度: %s/%s = %.1f°C", key, subkey, temp_value)
return f"{temp_value:.1f} °C"
except:
pass
except Exception as e:
self.logger.warning("JSON解析失败: %s", str(e))
# 最后尝试其他模式
other_patterns = [
r'Package id 0:\s*\+?(\d+\.?\d*)°C', # 再次尝试确保捕获
r'CPU Temperature:\s*\+?(\d+\.?\d*)°C',
r'cpu_thermal:\s*\+?(\d+\.?\d*)°C',
r'Tdie:\s*\+?(\d+\.?\d*)°C', # AMD CPU
r'Tctl:\s*\+?(\d+\.?\d*)°C', # AMD CPU
r'PECI Agent \d:\s*\+?(\d+\.?\d*)°C',
r'Composite:\s*\+?(\d+\.?\d*)°C',
r'CPU\s+Temp:\s*\+?(\d+\.?\d*)°C',
r'k10temp-pci\S*:\s*\+?(\d+\.?\d*)°C',
r'Physical id 0:\s*\+?(\d+\.?\d*)°C'
]
for pattern in other_patterns:
match = re.search(pattern, sensors_output, re.IGNORECASE)
if match:
try:
temp = float(match.group(1))
self.logger.debug("匹配到CPU温度: %s: %.1f°C", pattern, temp)
return f"{temp:.1f} °C"
except (ValueError, IndexError):
continue
# 如果所有方法都失败返回未知
return "未知"
def extract_temp_from_systin(self, systin_data: dict) -> float:
"""从 SYSTIN 数据结构中提取温度值"""
if not systin_data:
return None
# 尝试从不同键名获取温度值
for key in ["temp1_input", "input", "value"]:
temp = systin_data.get(key)
if temp is not None:
try:
return float(temp)
except (TypeError, ValueError):
continue
return None
def extract_mobo_temp(self, sensors_output: str) -> str:
"""从 sensors 输出中提取主板温度"""
# 首先尝试解析JSON格式
if sensors_output.strip().startswith('{'):
try:
data = json.loads(sensors_output)
# 查找包含主板相关键名的温度值
candidates = []
for key, values in data.items():
# 优先检查 SYSTIN 键
if "systin" in key.lower():
temp = self.extract_temp_from_systin(values)
if temp is not None:
return f"{temp:.1f} °C"
if any(kw in key.lower() for kw in ["system", "motherboard", "mb", "board", "pch", "chipset", "sys", "baseboard", "systin"]):
for subkey, temp_value in values.items():
if any(kw in subkey.lower() for kw in ["temp", "input"]) and not "crit" in subkey.lower():
try:
if isinstance(temp_value, (int, float)):
candidates.append(temp_value)
self.logger.debug("Found mobo temp candidate in JSON: %s/%s = %.1f°C", key, subkey, temp_value)
except Exception:
pass
# 如果有候选值,取平均值
if candidates:
avg_temp = sum(candidates) / len(candidates)
return f"{avg_temp:.1f} °C"
# 新增:尝试直接获取 SYSTIN 的温度值
systin_temp = self.extract_temp_from_systin(data.get("nct6798-isa-02a0", {}).get("SYSTIN", {}))
if systin_temp is not None:
return f"{systin_temp:.1f} °C"
except Exception as e:
self.logger.warning("Failed to parse sensors JSON: %s", str(e))
# 改进SYSTIN提取逻辑
systin_patterns = [
r'SYSTIN:\s*[+\-]?\s*(\d+\.?\d*)\s*°C', # 标准格式
r'SYSTIN[:\s]+[+\-]?\s*(\d+\.?\d*)\s*°C', # 兼容无冒号或多余空格
r'System Temp:\s*[+\-]?\s*(\d+\.?\d*)\s*°C' # 备选方案
]
for pattern in systin_patterns:
systin_match = re.search(pattern, sensors_output, re.IGNORECASE)
if systin_match:
try:
temp = float(systin_match.group(1))
self.logger.debug("Found SYSTIN temperature: %.1f°C", temp)
return f"{temp:.1f} °C"
except (ValueError, IndexError) as e:
self.logger.debug("SYSTIN match error: %s", str(e))
continue
for line in sensors_output.splitlines():
if 'SYSTIN' in line or 'System Temp' in line:
# 改进的温度值提取正则
match = re.search(r'[+\-]?\s*(\d+\.?\d*)\s*°C', line)
if match:
try:
temp = float(match.group(1))
self.logger.debug("Found mobo temp in line: %s: %.1f°C", line.strip(), temp)
return f"{temp:.1f} °C"
except ValueError:
continue
# 如果找不到SYSTIN尝试其他主板温度模式
other_patterns = [
r'System Temp:\s*\+?(\d+\.?\d*)°C',
r'MB Temperature:\s*\+?(\d+\.?\d*)°C',
r'Motherboard:\s*\+?(\d+\.?\d*)°C',
r'SYS Temp:\s*\+?(\d+\.?\d*)°C',
r'Board Temp:\s*\+?(\d+\.?\d*)°C',
r'PCH_Temp:\s*\+?(\d+\.?\d*)°C',
r'Chipset:\s*\+?(\d+\.?\d*)°C',
r'Baseboard Temp:\s*\+?(\d+\.?\d*)°C',
r'System Temperature:\s*\+?(\d+\.?\d*)°C',
r'Mainboard Temp:\s*\+?(\d+\.?\d*)°C'
]
temp_values = []
for pattern in other_patterns:
matches = re.finditer(pattern, sensors_output, re.IGNORECASE)
for match in matches:
try:
temp = float(match.group(1))
temp_values.append(temp)
self.logger.debug("Found motherboard temperature with pattern: %s: %.1f°C", pattern, temp)
except (ValueError, IndexError):
continue
# 如果有找到温度值,取平均值
if temp_values:
avg_temp = sum(temp_values) / len(temp_values)
return f"{avg_temp:.1f} °C"
# 最后,尝试手动扫描所有温度值
fallback_candidates = []
for line in sensors_output.splitlines():
if '°C' in line:
# 跳过CPU相关的行
if any(kw in line.lower() for kw in ["core", "cpu", "package", "tccd", "k10temp", "processor", "amd", "intel", "nvme"]):
continue
# 跳过风扇和电压行
if any(kw in line.lower() for kw in ["fan", "volt", "vin", "+3.3", "+5", "+12", "vdd", "power", "crit", "max", "min"]):
continue
# 查找温度值
match = re.search(r'(\d+\.?\d*)\s*°C', line)
if match:
try:
temp = float(match.group(1))
# 合理温度范围检查 (0-80°C)
if 0 < temp < 80:
fallback_candidates.append(temp)
self.logger.debug("Fallback mobo candidate: %s -> %.1f°C", line.strip(), temp)
except ValueError:
continue
# 如果有候选值,取平均值
if fallback_candidates:
avg_temp = sum(fallback_candidates) / len(fallback_candidates)
self.logger.warning("Using fallback motherboard temperature detection")
return f"{avg_temp:.1f} °C"
return "未知"
async def get_memory_info(self) -> dict: async def get_memory_info(self) -> dict:
"""获取内存使用信息""" """获取内存使用信息"""
try: try: