Files
midea-meiju-codec/custom_components/midea_auto_cloud/data_coordinator.py

253 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Data coordinator for Midea Auto Cloud integration."""
import logging
from datetime import datetime, timedelta
from typing import NamedTuple
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.helpers.event import async_call_later
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .core.device import MiedaDevice
from .core.logger import MideaLogger
_LOGGER = logging.getLogger(__name__)
class MideaDeviceData(NamedTuple):
"""Data structure for Midea device state."""
attributes: dict
available: bool
connected: bool
class MideaDataUpdateCoordinator(DataUpdateCoordinator[MideaDeviceData]):
"""Data update coordinator for Midea devices."""
def __init__(
self,
hass: HomeAssistant,
config_entry: ConfigEntry,
device: MiedaDevice,
cloud=None,
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name=f"{device.device_name} ({device.device_id})",
update_method=self.poll_device_state,
update_interval=timedelta(seconds=30),
always_update=False,
)
self.device = device
self.state_update_muted: CALLBACK_TYPE | None = None
self._device_id = device.device_id
self._cloud = cloud
async def _async_setup(self) -> None:
"""Set up the coordinator."""
# Immediate first refresh to avoid waiting for the interval
self.data = await self.poll_device_state()
# Register for device updates
self.device.register_update(self._device_update_callback)
def mute_state_update_for_a_while(self) -> None:
"""Mute subscription for a while to avoid state bouncing."""
if self.state_update_muted:
self.state_update_muted()
@callback
def unmute(now: datetime) -> None:
self.state_update_muted = None
self.state_update_muted = async_call_later(self.hass, 10, unmute)
def _device_update_callback(self, status: dict) -> None:
"""Callback for device status updates."""
if self.state_update_muted:
return
# Update device attributes (allow new keys to be added)
for key, value in status.items():
self.device.attributes[key] = value
# Update coordinator data
self.async_set_updated_data(
MideaDeviceData(
attributes=self.device.attributes,
available=self.device.connected,
connected=self.device.connected,
)
)
async def poll_device_state(self) -> MideaDeviceData:
"""Poll device state."""
if self.state_update_muted:
return self.data
try:
# 检查是否为中央空调设备T0x21
if self.device.device_type == 0x21:
await self._poll_central_ac_state()
else:
await self.device.refresh_status()
# 返回并推送当前状态
updated = MideaDeviceData(
attributes=self.device.attributes,
available=self.device.connected,
connected=self.device.connected,
)
self.async_set_updated_data(updated)
return updated
except Exception as e:
_LOGGER.error(f"Error polling device state: {e}")
return MideaDeviceData(
attributes=self.device.attributes,
available=False,
connected=False,
)
async def _poll_central_ac_state(self) -> None:
"""轮询中央空调状态"""
try:
cloud = self._cloud
if cloud and hasattr(cloud, "get_central_ac_status"):
status_data = await cloud.get_central_ac_status([self._device_id])
if status_data and "appliances" in status_data:
# 找到对应的设备数据并更新到设备属性中
for appliance in status_data["appliances"]:
if appliance.get("type") == "0x21" and "extraData" in appliance:
extra_data = appliance["extraData"]
if "attr" in extra_data and "state" in extra_data["attr"]:
state = extra_data["attr"]["state"]
if "nodeid" in extra_data["attr"]:
self.device._attributes["nodeid"] = extra_data["attr"]["nodeid"]
if "masterId" in extra_data["attr"]:
self.device._attributes["masterId"] = extra_data["attr"]["masterId"]
if "modelid" in extra_data["attr"]:
self.device._attributes["modelid"] = extra_data["attr"]["modelid"]
if "idType" in extra_data["attr"]:
self.device._attributes["idType"] = extra_data["attr"]["idType"]
if "condition_attribute" in state:
condition = state["condition_attribute"]
# 将状态数据更新到设备属性中
for key, value in condition.items():
# 尝试将数字字符串转换为数字
if key.find("temp") > -1:
try:
# 尝试转换为整数
if '.' not in value:
self.device._attributes[key] = int(value)
else:
# 尝试转换为浮点数
self.device._attributes[key] = float(value)
except (ValueError, TypeError):
# 如果转换失败,保持原值
self.device._attributes[key] = value
else:
self.device._attributes[key] = value
break
except Exception as e:
MideaLogger.debug(f"Error polling central AC state: {e}")
async def async_set_attribute(self, attribute: str, value) -> None:
"""Set a device attribute."""
# 云端控制:构造 control 与 status携带当前状态作为上下文
await self.device.set_attribute(attribute, value)
self.device.attributes[attribute] = value
self.mute_state_update_for_a_while()
self.async_update_listeners()
async def async_set_attributes(self, attributes: dict) -> None:
"""Set multiple device attributes."""
await self.device.set_attributes(attributes)
self.device.attributes.update(attributes)
self.mute_state_update_for_a_while()
self.async_update_listeners()
async def async_send_command(self, cmd_type: int, cmd_body: str) -> None:
"""Send a command to the device."""
try:
cmd_body_bytes = bytearray.fromhex(cmd_body)
self.device.send_command(cmd_type, cmd_body_bytes)
except ValueError as e:
_LOGGER.error(f"Invalid command body: {e}")
raise
async def async_send_central_ac_control(self, control: dict) -> bool:
"""发送中央空调控制命令"""
try:
cloud = self._cloud
if cloud and hasattr(cloud, "send_central_ac_control"):
# 从设备属性中获取nodeid
masterid = self.device.attributes.get("masterId")
nodeid = self.device.attributes.get("nodeid")
modelid = self.device.attributes.get("modelid")
idtype = int(self.device.attributes.get("idType"))
if not nodeid:
MideaLogger.warning(f"No nodeid found for central AC device {self._device_id}")
return False
# 构建完整的控制命令包含centralized中的所有字段
full_control = self._build_full_central_ac_control(control)
MideaLogger.debug(f"Sending control to {self.device.device_name}: {full_control}")
success = await cloud.send_central_ac_control(
masterid,
nodeid,
modelid,
idtype,
full_control
)
if success:
# 更新本地状态
self.device.attributes.update(control)
self.mute_state_update_for_a_while()
self.async_update_listeners()
return True
else:
MideaLogger.debug(f"Failed to send control to {self.device.device_name}")
return False
else:
MideaLogger.debug("Cloud service not available for central AC control")
return False
except Exception as e:
MideaLogger.debug(f"Error sending control to {self.device.device_name}: {e}")
return False
def _build_full_central_ac_control(self, new_control: dict) -> dict:
"""构建完整控制命令"""
full_control = {}
full_control["run_mode"] = self.device.attributes.get("run_mode")
full_control["cooling_temp"] = str(self.device.attributes.get("cool_temp_set") or 26.0)
full_control["heating_temp"] = str(self.device.attributes.get("heat_temp_set") or 20.0)
full_control["fan_speed"] = self.device.attributes.get("fan_speed")
swing_mode = self.device.attributes.get("is_swing")
is_elec_heat = self.device.attributes.get("is_elec_heat")
if swing_mode == "1":
# 开启摆风:如果当前有电辅热(2)则设为6(电辅热+摆风)否则设为4(摆风)
if is_elec_heat == "1":
new_extflag = "6" # 电辅热+摆风
else:
new_extflag = "4" # 仅摆风
else:
# 关闭摆风如果当前是6(电辅热+摆风)则设为2(仅电辅热)否则设为0(关闭)
if is_elec_heat == "1":
new_extflag = "2" # 仅电辅热
else:
new_extflag = "0" # 关闭
full_control["extflag"] = new_extflag
# 然后用新的控制值覆盖
full_control.update(new_control)
return full_control