rename repo

This commit is contained in:
unknown
2023-09-17 20:30:02 +08:00
parent 7263e09692
commit 2c1d391283
27 changed files with 1 additions and 1 deletions

View File

@@ -0,0 +1,262 @@
import os
import base64
import voluptuous as vol
from importlib import import_module
from homeassistant.config_entries import ConfigEntry
from homeassistant.util.json import load_json
try:
from homeassistant.helpers.json import save_json
except ImportError:
from homeassistant.util.json import save_json
from homeassistant.helpers.typing import ConfigType
from homeassistant.core import (
HomeAssistant,
ServiceCall
)
from homeassistant.const import (
Platform,
CONF_TYPE,
CONF_PORT,
CONF_MODEL,
CONF_IP_ADDRESS,
CONF_DEVICE_ID,
CONF_PROTOCOL,
CONF_TOKEN,
CONF_NAME,
CONF_DEVICE,
CONF_ENTITIES
)
from .core.logger import MideaLogger
from .core.device import MiedaDevice
from .const import (
DOMAIN,
DEVICES,
CONF_REFRESH_INTERVAL,
CONFIG_PATH,
CONF_KEY,
CONF_ACCOUNT,
CONF_SN8,
CONF_SN,
CONF_MODEL_NUMBER,
CONF_LUA_FILE
)
ALL_PLATFORM = [
Platform.BINARY_SENSOR,
Platform.SENSOR,
Platform.SWITCH,
Platform.CLIMATE,
Platform.SELECT,
Platform.WATER_HEATER,
Platform.FAN
]
def get_sn8_used(hass: HomeAssistant, sn8):
entries = hass.config_entries.async_entries(DOMAIN)
count = 0
for entry in entries:
if sn8 == entry.data.get("sn8"):
count += 1
return count
def remove_device_config(hass: HomeAssistant, sn8):
config_file = hass.config.path(f"{CONFIG_PATH}/{sn8}.json")
try:
os.remove(config_file)
except FileNotFoundError:
pass
def load_device_config(hass: HomeAssistant, device_type, sn8):
os.makedirs(hass.config.path(CONFIG_PATH), exist_ok=True)
config_file = hass.config.path(f"{CONFIG_PATH}/{sn8}.json")
json_data = load_json(config_file, default={})
if len(json_data) > 0:
json_data = json_data.get(sn8)
else:
device_path = f".device_mapping.{'T0x%02X' % device_type}"
try:
mapping_module = import_module(device_path, __package__)
if sn8 in mapping_module.DEVICE_MAPPING.keys():
json_data = mapping_module.DEVICE_MAPPING[sn8]
elif "default" in mapping_module.DEVICE_MAPPING:
json_data = mapping_module.DEVICE_MAPPING["default"]
if len(json_data) > 0:
save_data = {sn8: json_data}
save_json(config_file, save_data)
except ModuleNotFoundError:
MideaLogger.warning(f"Can't load mapping file for type {'T0x%02X' % device_type}")
return json_data
def register_services(hass: HomeAssistant):
async def async_set_attributes(service: ServiceCall):
device_id = service.data.get("device_id")
attributes = service.data.get("attributes")
MideaLogger.debug(f"Service called: set_attributes, device_id: {device_id}, attributes: {attributes}")
try:
device: MiedaDevice = hass.data[DOMAIN][DEVICES][device_id].get(CONF_DEVICE)
except KeyError:
MideaLogger.error(f"Failed to call service set_attributes: the device {device_id} isn't exist.")
return
if device:
device.set_attributes(attributes)
async def async_send_command(service: ServiceCall):
device_id = service.data.get("device_id")
cmd_type = service.data.get("cmd_type")
cmd_body = service.data.get("cmd_body")
try:
cmd_body = bytearray.fromhex(cmd_body)
except ValueError:
MideaLogger.error(f"Failed to call service set_attributes: invalid cmd_body, a hexadecimal string required")
return
try:
device: MiedaDevice = hass.data[DOMAIN][DEVICES][device_id].get(CONF_DEVICE)
except KeyError:
MideaLogger.error(f"Failed to call service set_attributes: the device {device_id} isn't exist.")
return
if device:
device.send_command(cmd_type, cmd_body)
hass.services.async_register(
DOMAIN,
"set_attributes",
async_set_attributes,
schema=vol.Schema({
vol.Required("device_id"): vol.Coerce(int),
vol.Required("attributes"): vol.Any(dict)
})
)
hass.services.async_register(
DOMAIN, "send_command", async_send_command,
schema=vol.Schema({
vol.Required("device_id"): vol.Coerce(int),
vol.Required("cmd_type"): vol.In([2, 3]),
vol.Required("cmd_body"): str
})
)
async def update_listener(hass: HomeAssistant, config_entry: ConfigEntry):
device_id = config_entry.data.get(CONF_DEVICE_ID)
if device_id is not None:
ip_address = config_entry.options.get(
CONF_IP_ADDRESS, None
)
refresh_interval = config_entry.options.get(
CONF_REFRESH_INTERVAL, None
)
device: MiedaDevice = hass.data[DOMAIN][DEVICES][device_id][CONF_DEVICE]
if device:
if ip_address is not None:
device.set_ip_address(ip_address)
if refresh_interval is not None:
device.set_refresh_interval(refresh_interval)
async def async_setup(hass: HomeAssistant, config: ConfigType):
hass.data.setdefault(DOMAIN, {})
cjson = os.getcwd() + "/cjson.lua"
bit = os.getcwd() + "/bit.lua"
if not os.path.exists(cjson):
from .const import CJSON_LUA
cjson_lua = base64.b64decode(CJSON_LUA.encode("utf-8")).decode("utf-8")
with open(cjson, "wt") as fp:
fp.write(cjson_lua)
if not os.path.exists(bit):
from .const import BIT_LUA
bit_lua = base64.b64decode(BIT_LUA.encode("utf-8")).decode("utf-8")
with open(bit, "wt") as fp:
fp.write(bit_lua)
register_services(hass)
return True
async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry):
device_type = config_entry.data.get(CONF_TYPE)
if device_type == CONF_ACCOUNT:
return True
name = config_entry.data.get(CONF_NAME)
device_id = config_entry.data.get(CONF_DEVICE_ID)
device_type = config_entry.data.get(CONF_TYPE)
token = config_entry.data.get(CONF_TOKEN)
key = config_entry.data.get(CONF_KEY)
ip_address = config_entry.options.get(CONF_IP_ADDRESS, None)
if not ip_address:
ip_address = config_entry.data.get(CONF_IP_ADDRESS)
refresh_interval = config_entry.options.get(CONF_REFRESH_INTERVAL)
port = config_entry.data.get(CONF_PORT)
model = config_entry.data.get(CONF_MODEL)
protocol = config_entry.data.get(CONF_PROTOCOL)
subtype = config_entry.data.get(CONF_MODEL_NUMBER)
sn = config_entry.data.get(CONF_SN)
sn8 = config_entry.data.get(CONF_SN8)
lua_file = config_entry.data.get(CONF_LUA_FILE)
if protocol == 3 and (key is None or key is None):
MideaLogger.error("For V3 devices, the key and the token is required.")
return False
device = MiedaDevice(
name=name,
device_id=device_id,
device_type=device_type,
ip_address=ip_address,
port=port,
token=token,
key=key,
protocol=protocol,
model=model,
subtype=subtype,
sn=sn,
sn8=sn8,
lua_file=lua_file,
)
if refresh_interval is not None:
device.set_refresh_interval(refresh_interval)
device.open()
if DOMAIN not in hass.data:
hass.data[DOMAIN] = {}
if DEVICES not in hass.data[DOMAIN]:
hass.data[DOMAIN][DEVICES] = {}
hass.data[DOMAIN][DEVICES][device_id] = {}
hass.data[DOMAIN][DEVICES][device_id][CONF_DEVICE] = device
hass.data[DOMAIN][DEVICES][device_id][CONF_ENTITIES] = {}
config = load_device_config(hass, device_type, sn8)
if config is not None and len(config) > 0:
queries = config.get("queries")
if queries is not None and isinstance(queries, list):
device.set_queries(queries)
centralized = config.get("centralized")
if centralized is not None and isinstance(centralized, list):
device.set_centralized(centralized)
calculate = config.get("calculate")
if calculate is not None and isinstance(calculate, dict):
device.set_calculate(calculate)
hass.data[DOMAIN][DEVICES][device_id]["manufacturer"] = config.get("manufacturer")
hass.data[DOMAIN][DEVICES][device_id]["rationale"] = config.get("rationale")
hass.data[DOMAIN][DEVICES][device_id][CONF_ENTITIES] = config.get(CONF_ENTITIES)
for platform in ALL_PLATFORM:
hass.async_create_task(hass.config_entries.async_forward_entry_setup(
config_entry, platform))
config_entry.add_update_listener(update_listener)
return True
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry):
device_id = config_entry.data.get(CONF_DEVICE_ID)
if device_id is not None:
device: MiedaDevice = hass.data[DOMAIN][DEVICES][device_id][CONF_DEVICE]
if device is not None:
if get_sn8_used(hass, device.sn8) == 1:
lua_file = config_entry.data.get("lua_file")
os.remove(lua_file)
remove_device_config(hass, device.sn8)
device.close()
hass.data[DOMAIN][DEVICES].pop(device_id)
for platform in ALL_PLATFORM:
await hass.config_entries.async_forward_entry_unload(config_entry, platform)
return True

View File

@@ -0,0 +1,62 @@
from homeassistant.components.binary_sensor import (
BinarySensorEntity,
BinarySensorDeviceClass
)
from homeassistant.const import (
Platform,
CONF_DEVICE_ID,
CONF_DEVICE,
CONF_ENTITIES
)
from .const import (
DOMAIN,
DEVICES
)
from .midea_entities import MideaBinaryBaseEntity
async def async_setup_entry(hass, config_entry, async_add_entities):
device_id = config_entry.data.get(CONF_DEVICE_ID)
device = hass.data[DOMAIN][DEVICES][device_id].get(CONF_DEVICE)
manufacturer = hass.data[DOMAIN][DEVICES][device_id].get("manufacturer")
rationale = hass.data[DOMAIN][DEVICES][device_id].get("rationale")
entities = hass.data[DOMAIN][DEVICES][device_id].get(CONF_ENTITIES).get(Platform.BINARY_SENSOR)
devs = [MideaDeviceStatusSensorEntity(device, manufacturer, rationale,"Status", {})]
if entities is not None:
for entity_key, config in entities.items():
devs.append(MideaBinarySensorEntity(device, manufacturer, rationale, entity_key, config))
async_add_entities(devs)
class MideaDeviceStatusSensorEntity(MideaBinaryBaseEntity, BinarySensorEntity):
@property
def device_class(self):
return BinarySensorDeviceClass.CONNECTIVITY
@property
def icon(self):
return "mdi:devices"
@property
def is_on(self):
return self._device.connected
@property
def available(self):
return True
@property
def extra_state_attributes(self) -> dict:
return self._device.attributes
def update_state(self, status):
try:
self.schedule_update_ha_state()
except Exception as e:
pass
class MideaBinarySensorEntity(MideaBinaryBaseEntity, BinarySensorEntity):
pass

View File

@@ -0,0 +1,193 @@
from homeassistant.components.climate import (
ClimateEntity,
ClimateEntityFeature,
HVACMode,
ATTR_HVAC_MODE,
)
from homeassistant.const import (
Platform,
CONF_DEVICE_ID,
CONF_ENTITIES,
CONF_DEVICE,
ATTR_TEMPERATURE
)
from .const import (
DOMAIN,
DEVICES
)
from .midea_entities import MideaEntity, Rationale
async def async_setup_entry(hass, config_entry, async_add_entities):
device_id = config_entry.data.get(CONF_DEVICE_ID)
device = hass.data[DOMAIN][DEVICES][device_id].get(CONF_DEVICE)
manufacturer = hass.data[DOMAIN][DEVICES][device_id].get("manufacturer")
rationale = hass.data[DOMAIN][DEVICES][device_id].get("rationale")
entities = hass.data[DOMAIN][DEVICES][device_id].get(CONF_ENTITIES).get(Platform.CLIMATE)
devs = []
if entities is not None:
for entity_key, config in entities.items():
devs.append(MideaClimateEntity(device, manufacturer, rationale, entity_key, config))
async_add_entities(devs)
class MideaClimateEntity(MideaEntity, ClimateEntity):
def __init__(self, device, manufacturer, rationale, entity_key, config):
super().__init__(device, manufacturer, rationale, entity_key, config)
self._key_power = self._config.get("power")
self._key_hvac_modes = self._config.get("hvac_modes")
self._key_preset_modes = self._config.get("preset_modes")
self._key_aux_heat = self._config.get("aux_heat")
self._key_swing_modes = self._config.get("swing_modes")
self._key_fan_modes = self._config.get("fan_modes")
self._key_min_temp = self._config.get("min_temp")
self._key_max_temp = self._config.get("max_temp")
self._key_current_temperature = self._config.get("current_temperature")
self._key_target_temperature = self._config.get("target_temperature")
self._attr_temperature_unit = self._config.get("temperature_unit")
self._attr_precision = self._config.get("precision")
@property
def supported_features(self):
features = 0
if self._key_target_temperature is not None:
features |= ClimateEntityFeature.TARGET_TEMPERATURE
if self._key_preset_modes is not None:
features |= ClimateEntityFeature.PRESET_MODE
if self._key_aux_heat is not None:
features |= ClimateEntityFeature.AUX_HEAT
if self._key_swing_modes is not None:
features |= ClimateEntityFeature.SWING_MODE
if self._key_fan_modes is not None:
features |= ClimateEntityFeature.FAN_MODE
return features
@property
def current_temperature(self):
return self._device.get_attribute(self._key_current_temperature)
@property
def target_temperature(self):
if isinstance(self._key_target_temperature, list):
temp_int = self._device.get_attribute(self._key_target_temperature[0])
tem_dec = self._device.get_attribute(self._key_target_temperature[1])
if temp_int is not None and tem_dec is not None:
return temp_int + tem_dec
return None
else:
return self._device.get_attribute(self._key_target_temperature)
@property
def min_temp(self):
if isinstance(self._key_min_temp, str):
return float(self._device.get_attribute(self._key_min_temp))
else:
return float(self._key_min_temp)
@property
def max_temp(self):
if isinstance(self._key_max_temp, str):
return float(self._device.get_attribute(self._key_max_temp))
else:
return float(self._key_max_temp)
@property
def target_temperature_low(self):
return self.min_temp
@property
def target_temperature_high(self):
return self.max_temp
@property
def preset_modes(self):
return list(self._key_preset_modes.keys())
@property
def preset_mode(self):
return self._dict_get_selected(self._key_preset_modes)
@property
def fan_modes(self):
return list(self._key_fan_modes.keys())
@property
def fan_mode(self):
return self._dict_get_selected(self._key_fan_modes, Rationale.LESS)
@property
def swing_modes(self):
return list(self._key_swing_modes.keys())
@property
def swing_mode(self):
return self._dict_get_selected(self._key_swing_modes)
@property
def is_on(self) -> bool:
return self.hvac_mode != HVACMode.OFF
@property
def hvac_mode(self):
return self._dict_get_selected(self._key_hvac_modes)
@property
def hvac_modes(self):
return list(self._key_hvac_modes.keys())
@property
def is_aux_heat(self):
return self._get_status_on_off(self._key_aux_heat)
def turn_on(self):
self._set_status_on_off(self._key_power, True)
def turn_off(self):
self._set_status_on_off(self._key_power, False)
def set_temperature(self, **kwargs):
if ATTR_TEMPERATURE not in kwargs:
return
temperature = kwargs.get(ATTR_TEMPERATURE)
temp_int, temp_dec = divmod(temperature, 1)
temp_int = int(temp_int)
hvac_mode = kwargs.get(ATTR_HVAC_MODE)
if hvac_mode is not None:
new_status = self._key_hvac_modes.get(hvac_mode)
else:
new_status = {}
if isinstance(self._key_target_temperature, list):
new_status[self._key_target_temperature[0]] = temp_int
new_status[self._key_target_temperature[1]] = temp_dec
else:
new_status[self._key_target_temperature] = temperature
self._device.set_attributes(new_status)
def set_fan_mode(self, fan_mode: str):
new_status = self._key_fan_modes.get(fan_mode)
self._device.set_attributes(new_status)
def set_preset_mode(self, preset_mode: str):
new_status = self._key_preset_modes.get(preset_mode)
self._device.set_attributes(new_status)
def set_hvac_mode(self, hvac_mode: str):
new_status = self._key_hvac_modes.get(hvac_mode)
self._device.set_attributes(new_status)
def set_swing_mode(self, swing_mode: str):
new_status = self._key_swing_modes.get(swing_mode)
self._device.set_attributes(new_status)
def turn_aux_heat_on(self) -> None:
self._set_status_on_off(self._key_aux_heat, True)
def turn_aux_heat_off(self) -> None:
self._set_status_on_off(self._key_aux_heat, False)
def update_state(self, status):
try:
self.schedule_update_ha_state()
except Exception as e:
pass

View File

@@ -0,0 +1,350 @@
import voluptuous as vol
import logging
import os
import ipaddress
from homeassistant.helpers.aiohttp_client import async_create_clientsession
from homeassistant import config_entries
from homeassistant.core import callback
from homeassistant.const import (
CONF_TYPE,
CONF_PASSWORD,
CONF_PORT,
CONF_MODEL,
CONF_IP_ADDRESS,
CONF_DEVICE_ID,
CONF_PROTOCOL,
CONF_TOKEN,
CONF_NAME
)
from . import remove_device_config, load_device_config
from .core.cloud import get_midea_cloud
from .core.discover import discover
from .core.device import MiedaDevice
from .const import (
DOMAIN,
CONF_REFRESH_INTERVAL,
STORAGE_PATH,
CONF_ACCOUNT,
CONF_SERVER,
CONF_HOME,
CONF_KEY,
CONF_SN8,
CONF_SN,
CONF_MODEL_NUMBER,
CONF_LUA_FILE
)
_LOGGER = logging.getLogger(__name__)
servers = {
1: "MSmartHome",
2: "美的美居",
}
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
_session = None
_cloud = None
_current_home = None
_device_list = {}
_device = None
@staticmethod
@callback
def async_get_options_flow(config_entry):
return OptionsFlowHandler(config_entry)
def _get_configured_account(self):
for entry in self._async_current_entries():
if entry.data.get(CONF_TYPE) == CONF_ACCOUNT:
return entry.data.get(CONF_ACCOUNT), entry.data.get(CONF_PASSWORD), entry.data.get(CONF_SERVER)
return None, None, None
def _device_configured(self, device_id):
for entry in self._async_current_entries():
if device_id == entry.data.get(CONF_DEVICE_ID):
return True
return False
@staticmethod
def _is_valid_ip_address(ip_address):
try:
ipaddress.ip_address(ip_address)
return True
except ValueError:
return False
async def async_step_user(self, user_input=None, error=None):
if self._session is None:
self._session = async_create_clientsession(self.hass)
account, password, server = self._get_configured_account()
if account is not None and password is not None:
if self._cloud is None:
self._cloud = get_midea_cloud(
session=self._session,
cloud_name=servers[server],
account=account,
password=password
)
if await self._cloud.login():
return await self.async_step_home()
else:
return await self.async_step_user(error="account_invalid")
if user_input is not None:
if self._cloud is None:
self._cloud = get_midea_cloud(
session=self._session,
cloud_name=servers[user_input[CONF_SERVER]],
account=user_input[CONF_ACCOUNT],
password=user_input[CONF_PASSWORD]
)
if await self._cloud.login():
return self.async_create_entry(
title=f"{user_input[CONF_ACCOUNT]}",
data={
CONF_TYPE: CONF_ACCOUNT,
CONF_ACCOUNT: user_input[CONF_ACCOUNT],
CONF_PASSWORD: user_input[CONF_PASSWORD],
CONF_SERVER: user_input[CONF_SERVER]
})
else:
self._cloud = None
return await self.async_step_user(error="login_failed")
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({
vol.Required(CONF_ACCOUNT): str,
vol.Required(CONF_PASSWORD): str,
vol.Required(CONF_SERVER, default=1): vol.In(servers)
}),
errors={"base": error} if error else None
)
async def async_step_home(self, user_input=None, error=None):
if user_input is not None:
self._current_home = user_input[CONF_HOME]
return await self.async_step_device()
homes = await self._cloud.list_home()
if homes is None or len(homes) == 0:
return await self.async_step_device(error="no_home")
return self.async_show_form(
step_id="home",
data_schema=vol.Schema({
vol.Required(CONF_HOME, default=list(homes.keys())[0]):
vol.In(homes),
}),
errors={"base": error} if error else None
)
async def async_step_device(self, user_input=None, error=None):
if user_input is not None:
# 下载lua
# 本地尝试连接设备
self._device = self._device_list[user_input[CONF_DEVICE_ID]]
if self._device.get("online") is not True:
return await self.async_step_device(error="offline_error")
return await self.async_step_discover()
appliances = await self._cloud.list_appliances(self._current_home)
self._device_list = {}
device_list = {}
for appliance_code, appliance_info in appliances.items():
if not self._device_configured(appliance_code):
try:
model_number = int(appliance_info.get("model_number")) if appliance_info.get("model_number") is not None else 0
except ValueError:
model_number = 0
self._device_list[appliance_code] = {
CONF_DEVICE_ID: appliance_code,
CONF_NAME: appliance_info.get("name"),
CONF_TYPE: appliance_info.get("type"),
CONF_SN8: appliance_info.get("sn8", "00000000"),
CONF_SN: appliance_info.get("sn"),
CONF_MODEL: appliance_info.get("model", "0"),
CONF_MODEL_NUMBER: model_number,
"manufacturer_code": appliance_info.get("manufacturer_code","0000"),
"online": appliance_info.get("online")
}
device_list[appliance_code] = \
f"{appliance_info.get('name')} ({'online' if appliance_info.get('online') is True else 'offline'})"
if len(self._device_list) == 0:
return await self.async_step_device(error="no_new_devices")
return self.async_show_form(
step_id="device",
data_schema=vol.Schema({
vol.Required(CONF_DEVICE_ID, default=list(device_list.keys())[0]):
vol.In(device_list),
}),
errors={"base": error} if error else None
)
async def async_step_discover(self, user_input=None, error=None):
if user_input is not None:
if user_input[CONF_IP_ADDRESS] == "auto" or self._is_valid_ip_address(user_input[CONF_IP_ADDRESS]):
ip_address = None
if self._is_valid_ip_address(user_input[CONF_IP_ADDRESS]):
ip_address = user_input[CONF_IP_ADDRESS]
discover_devices = discover([self._device[CONF_TYPE]], ip_address)
_LOGGER.debug(discover_devices)
if discover_devices is None or len(discover_devices) == 0:
return await self.async_step_discover(error="discover_failed")
current_device = discover_devices.get(self._device[CONF_DEVICE_ID])
if current_device is None:
return await self.async_step_discover(error="discover_failed")
os.makedirs(self.hass.config.path(STORAGE_PATH), exist_ok=True)
path = self.hass.config.path(STORAGE_PATH)
file = await self._cloud.download_lua(
path=path,
device_type=self._device[CONF_TYPE],
sn=self._device[CONF_SN],
model_number=self._device[CONF_MODEL_NUMBER],
manufacturer_code=self._device["manufacturer_code"]
)
if file is None:
return await self.async_step_discover(error="download_lua_failed")
use_token = None
use_key = None
connected = False
if current_device.get(CONF_PROTOCOL) == 3:
keys = await self._cloud.get_keys(self._device.get(CONF_DEVICE_ID))
for method, key in keys.items():
dm = MiedaDevice(
name="",
device_id=self._device.get(CONF_DEVICE_ID),
device_type=current_device.get(CONF_TYPE),
ip_address=current_device.get(CONF_IP_ADDRESS),
port=current_device.get(CONF_PORT),
token=key["token"],
key=key["key"],
protocol=3,
model=None,
subtype = None,
sn=None,
sn8=None,
lua_file=None
)
_LOGGER.debug(
f"Successful to take token and key, token: {key['token']},"
f" key: { key['key']}, method: {method}"
)
if dm.connect():
use_token = key["token"]
use_key = key["key"]
dm.disconnect()
connected = True
break
else:
dm = MiedaDevice(
name=self._device.get("name"),
device_id=self._device.get("device_id"),
device_type=current_device.get(CONF_TYPE),
ip_address=current_device.get(CONF_IP_ADDRESS),
port=current_device.get(CONF_PORT),
token=None,
key=None,
protocol=2,
model=None,
subtype=None,
sn=None,
sn8=None,
lua_file=None
)
if dm.connect():
dm.disconnect()
connected = True
if not connected:
return await self.async_step_discover(error="connect_error")
return self.async_create_entry(
title=self._device.get("name"),
data={
CONF_NAME: self._device.get(CONF_NAME),
CONF_DEVICE_ID: self._device.get(CONF_DEVICE_ID),
CONF_TYPE: current_device.get(CONF_TYPE),
CONF_PROTOCOL: current_device.get(CONF_PROTOCOL),
CONF_IP_ADDRESS: current_device.get(CONF_IP_ADDRESS),
CONF_PORT: current_device.get(CONF_PORT),
CONF_TOKEN: use_token,
CONF_KEY: use_key,
CONF_MODEL: self._device.get(CONF_MODEL),
CONF_MODEL_NUMBER: self._device.get(CONF_MODEL_NUMBER),
CONF_SN: self._device.get(CONF_SN),
CONF_SN8: self._device.get(CONF_SN8),
CONF_LUA_FILE: file,
})
else:
return await self.async_step_discover(error="invalid_input")
return self.async_show_form(
step_id="discover",
data_schema=vol.Schema({
vol.Required(CONF_IP_ADDRESS, default="auto"): str
}),
errors={"base": error} if error else None
)
class OptionsFlowHandler(config_entries.OptionsFlow):
def __init__(self, config_entry: config_entries.ConfigEntry):
self._config_entry = config_entry
async def async_step_init(self, user_input=None, error=None):
if self._config_entry.data.get(CONF_TYPE) == CONF_ACCOUNT:
return self.async_abort(reason="account_unsupport_config")
if user_input is not None:
if user_input.get("option") == 1:
return await self.async_step_configure()
else:
return await self.async_step_reset()
return self.async_show_form(
step_id="init",
data_schema=vol.Schema({
vol.Required("option", default=1):
vol.In({1: "Options", 2: "Reset device configuration"})
}),
errors={"base": error} if error else None
)
async def async_step_reset(self, user_input=None):
if user_input is not None:
if user_input["check"]:
remove_device_config(self.hass, self._config_entry.data.get(CONF_SN8))
load_device_config(
self.hass,
self._config_entry.data.get(CONF_TYPE),
self._config_entry.data.get(CONF_SN8))
return self.async_abort(reason="reset_success")
return self.async_show_form(
step_id="reset",
data_schema=vol.Schema({
vol.Required("check", default=False): bool
})
)
async def async_step_configure(self, user_input=None):
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
ip_address = self._config_entry.options.get(
CONF_IP_ADDRESS, None
)
if ip_address is None:
ip_address = self._config_entry.data.get(
CONF_IP_ADDRESS, None
)
refresh_interval = self._config_entry.options.get(
CONF_REFRESH_INTERVAL, 30
)
data_schema = vol.Schema({
vol.Required(
CONF_IP_ADDRESS,
default=ip_address
): str,
vol.Required(
CONF_REFRESH_INTERVAL,
default=refresh_interval
): int
})
return self.async_show_form(
step_id="configure",
data_schema=data_schema
)

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,465 @@
import logging
import time
import datetime
import json
import base64
from threading import Lock
from aiohttp import ClientSession
from secrets import token_hex
from .security import CloudSecurity, MeijuCloudSecurity, MSmartCloudSecurity
_LOGGER = logging.getLogger(__name__)
clouds = {
"美的美居": {
"class_name": "MeijuCloud",
"app_key": "46579c15",
"login_key": "ad0ee21d48a64bf49f4fb583ab76e799",
"iot_key": bytes.fromhex(format(9795516279659324117647275084689641883661667, 'x')).decode(),
"hmac_key": bytes.fromhex(format(117390035944627627450677220413733956185864939010425, 'x')).decode(),
"api_url": "https://mp-prod.smartmidea.net/mas/v5/app/proxy?alias=",
},
"MSmartHome": {
"class_name": "MSmartHomeCloud",
"app_key": "ac21b9f9cbfe4ca5a88562ef25e2b768",
"iot_key": bytes.fromhex(format(7882822598523843940, 'x')).decode(),
"hmac_key": bytes.fromhex(format(117390035944627627450677220413733956185864939010425, 'x')).decode(),
"api_url": "https://mp-prod.appsmb.com/mas/v5/app/proxy?alias=",
},
}
default_keys = {
99: {
"token": "ee755a84a115703768bcc7c6c13d3d629aa416f1e2fd798beb9f78cbb1381d09"
"1cc245d7b063aad2a900e5b498fbd936c811f5d504b2e656d4f33b3bbc6d1da3",
"key": "ed37bd31558a4b039aaf4e7a7a59aa7a75fd9101682045f69baf45d28380ae5c"
}
}
class MideaCloud:
def __init__(
self,
session: ClientSession,
security: CloudSecurity,
app_key: str,
account: str,
password: str,
api_url: str
):
self._device_id = CloudSecurity.get_deviceid(account)
self._session = session
self._security = security
self._api_lock = Lock()
self._app_key = app_key
self._account = account
self._password = password
self._api_url = api_url
self._access_token = None
self._login_id = None
def _make_general_data(self):
return {
}
async def _api_request(self, endpoint: str, data: dict, header=None) -> dict | None:
header = header or {}
if not data.get("reqId"):
data.update({
"reqId": token_hex(16)
})
if not data.get("stamp"):
data.update({
"stamp": datetime.datetime.now().strftime("%Y%m%d%H%M%S")
})
random = str(int(time.time()))
url = self._api_url + endpoint
dump_data = json.dumps(data)
sign = self._security.sign(dump_data, random)
header.update({
"content-type": "application/json; charset=utf-8",
"secretVersion": "1",
"sign": sign,
"random": random,
})
if self._access_token is not None:
header.update({
"accesstoken": self._access_token
})
response:dict = {"code": -1}
for i in range(0, 3):
try:
with self._api_lock:
r = await self._session.request("POST", url, headers=header, data=dump_data, timeout=10)
raw = await r.read()
_LOGGER.debug(f"Midea cloud API url: {url}, data: {data}, response: {raw}")
response = json.loads(raw)
break
except Exception as e:
pass
if int(response["code"]) == 0 and "data" in response:
return response["data"]
print(response)
return None
async def _get_login_id(self) -> str | None:
data = self._make_general_data()
data.update({
"loginAccount": f"{self._account}"
})
if response := await self._api_request(
endpoint="/v1/user/login/id/get",
data=data
):
return response.get("loginId")
return None
async def login(self) -> bool:
raise NotImplementedError()
async def get_keys(self, appliance_id: int):
result = {}
for method in [1, 2]:
udp_id = self._security.get_udp_id(appliance_id, method)
data = self._make_general_data()
data.update({
"udpid": udp_id
})
response = await self._api_request(
endpoint="/v1/iot/secure/getToken",
data=data
)
if response and "tokenlist" in response:
for token in response["tokenlist"]:
if token["udpId"] == udp_id:
result[method] = {
"token": token["token"].lower(),
"key": token["key"].lower()
}
result.update(default_keys)
return result
async def list_home(self) -> dict | None:
return {1: "My home"}
async def list_appliances(self, home_id) -> dict | None:
raise NotImplementedError()
async def download_lua(
self, path: str,
device_type: str,
sn: str,
model_number: str | None,
manufacturer_code: str = "0000",
):
raise NotImplementedError()
class MeijuCloud(MideaCloud):
APP_ID = "900"
APP_VERSION = "8.20.0.2"
def __init__(
self,
cloud_name: str,
session: ClientSession,
account: str,
password: str,
):
super().__init__(
session=session,
security=MeijuCloudSecurity(
login_key=clouds[cloud_name]["login_key"],
iot_key=clouds[cloud_name]["iot_key"],
hmac_key=clouds[cloud_name]["hmac_key"],
),
app_key=clouds[cloud_name]["app_key"],
account=account,
password=password,
api_url=clouds[cloud_name]["api_url"]
)
async def login(self) -> bool:
if login_id := await self._get_login_id():
self._login_id = login_id
stamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
data = {
"iotData": {
"clientType": 1,
"deviceId": self._device_id,
"iampwd": self._security.encrypt_iam_password(self._login_id, self._password),
"iotAppId": self.APP_ID,
"loginAccount": self._account,
"password": self._security.encrypt_password(self._login_id, self._password),
"reqId": token_hex(16),
"stamp": stamp
},
"data": {
"appKey": self._app_key,
"deviceId": self._device_id,
"platform": 2
},
"timestamp": stamp,
"stamp": stamp
}
if response := await self._api_request(
endpoint="/mj/user/login",
data=data
):
self._access_token = response["mdata"]["accessToken"]
self._security.set_aes_keys(
self._security.aes_decrypt_with_fixed_key(
response["key"]
), None
)
return True
return False
async def list_home(self):
if response := await self._api_request(
endpoint="/v1/homegroup/list/get",
data={}
):
homes = {}
for home in response["homeList"]:
homes.update({
int(home["homegroupId"]): home["name"]
})
return homes
return None
async def list_appliances(self, home_id) -> dict | None:
data = {
"homegroupId": home_id
}
if response := await self._api_request(
endpoint="/v1/appliance/home/list/get",
data=data
):
appliances = {}
for home in response.get("homeList") or []:
for room in home.get("roomList") or []:
for appliance in room.get("applianceList"):
device_info = {
"name": appliance.get("name"),
"type": int(appliance.get("type"), 16),
"sn": self._security.aes_decrypt(appliance.get("sn")) if appliance.get("sn") else "",
"sn8": appliance.get("sn8", "00000000"),
"model_number": appliance.get("modelNumber", "0"),
"manufacturer_code":appliance.get("enterpriseCode", "0000"),
"model": appliance.get("productModel"),
"online": appliance.get("onlineStatus") == "1",
}
if device_info.get("sn8") is None or len(device_info.get("sn8")) == 0:
device_info["sn8"] = "00000000"
if device_info.get("model") is None or len(device_info.get("model")) == 0:
device_info["model"] = device_info["sn8"]
appliances[int(appliance["applianceCode"])] = device_info
return appliances
return None
async def download_lua(
self, path: str,
device_type: int,
sn: str,
model_number: str | None,
manufacturer_code: str = "0000",
):
data = {
"applianceSn": sn,
"applianceType": "0x%02X" % device_type,
"applianceMFCode": manufacturer_code,
'version': "0",
"iotAppId": self.APP_ID,
}
fnm = None
if response := await self._api_request(
endpoint="/v1/appliance/protocol/lua/luaGet",
data=data
):
res = await self._session.get(response["url"])
if res.status == 200:
lua = await res.text()
if lua:
stream = ('local bit = require "bit"\n' +
self._security.aes_decrypt_with_fixed_key(lua))
stream = stream.replace("\r\n", "\n")
fnm = f"{path}/{response['fileName']}"
with open(fnm, "w") as fp:
fp.write(stream)
return fnm
class MSmartHomeCloud(MideaCloud):
APP_ID = "1010"
SRC = "10"
APP_VERSION = "3.0.2"
def __init__(
self,
cloud_name: str,
session: ClientSession,
account: str,
password: str,
):
super().__init__(
session=session,
security=MSmartCloudSecurity(
login_key=clouds[cloud_name]["app_key"],
iot_key=clouds[cloud_name]["iot_key"],
hmac_key=clouds[cloud_name]["hmac_key"],
),
app_key=clouds[cloud_name]["app_key"],
account=account,
password=password,
api_url=clouds[cloud_name]["api_url"]
)
self._auth_base = base64.b64encode(
f"{self._app_key}:{clouds['MSmartHome']['iot_key']}".encode("ascii")
).decode("ascii")
self._uid = ""
def _make_general_data(self):
return {
"appVersion": self.APP_VERSION,
"src": self.SRC,
"format": "2",
"stamp": datetime.datetime.now().strftime("%Y%m%d%H%M%S"),
"platformId": "1",
"deviceId": self._device_id,
"reqId": token_hex(16),
"uid": self._uid,
"clientType": "1",
"appId": self.APP_ID,
}
async def _api_request(self, endpoint: str, data: dict, header=None) -> dict | None:
header = header or {}
header.update({
"x-recipe-app": self.APP_ID,
"authorization": f"Basic {self._auth_base}"
})
if len(self._uid) > 0:
header.update({
"uid": self._uid
})
return await super()._api_request(endpoint, data, header)
async def _re_route(self):
data = self._make_general_data()
data.update({
"userType": "0",
"userName": f"{self._account}"
})
if response := await self._api_request(
endpoint="/v1/multicloud/platform/user/route",
data=data
):
if api_url := response.get("masUrl"):
self._api_url = api_url
async def login(self) -> bool:
await self._re_route()
if login_id := await self._get_login_id():
self._login_id = login_id
iot_data = self._make_general_data()
iot_data.pop("uid")
stamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
iot_data.update({
"iampwd": self._security.encrypt_iam_password(self._login_id, self._password),
"loginAccount": self._account,
"password": self._security.encrypt_password(self._login_id, self._password),
"stamp": stamp
})
data = {
"iotData": iot_data,
"data": {
"appKey": self._app_key,
"deviceId": self._device_id,
"platform": "2"
},
"stamp": stamp
}
if response := await self._api_request(
endpoint="/mj/user/login",
data=data
):
self._uid = response["uid"]
self._access_token = response["mdata"]["accessToken"]
self._security.set_aes_keys(response["accessToken"], response["randomData"])
return True
return False
async def list_appliances(self, home_id=None) -> dict | None:
data = self._make_general_data()
if response := await self._api_request(
endpoint="/v1/appliance/user/list/get",
data=data
):
appliances = {}
for appliance in response["list"]:
device_info = {
"name": appliance.get("name"),
"type": int(appliance.get("type"), 16),
"sn": self._security.aes_decrypt(appliance.get("sn")) if appliance.get("sn") else "",
"sn8": "",
"model_number": appliance.get("modelNumber", "0"),
"manufacturer_code":appliance.get("enterpriseCode", "0000"),
"model": "",
"online": appliance.get("onlineStatus") == "1",
}
device_info["sn8"] = device_info.get("sn")[9:17] if len(device_info["sn"]) > 17 else ""
device_info["model"] = device_info.get("sn8")
appliances[int(appliance["id"])] = device_info
return appliances
return None
async def download_lua(
self, path: str,
device_type: int,
sn: str,
model_number: str | None,
manufacturer_code: str = "0000",
):
data = {
"clientType": "1",
"appId": self.APP_ID,
"format": "2",
"deviceId": self._device_id,
"iotAppId": self.APP_ID,
"applianceMFCode": manufacturer_code,
"applianceType": "0x%02X" % device_type,
"modelNumber": model_number,
"applianceSn": self._security.aes_encrypt_with_fixed_key(sn.encode("ascii")).hex(),
"version": "0",
"encryptedType ": "2"
}
fnm = None
if response := await self._api_request(
endpoint="/v2/luaEncryption/luaGet",
data=data
):
res = await self._session.get(response["url"])
if res.status == 200:
lua = await res.text()
if lua:
stream = ('local bit = require "bit"\n' +
self._security.aes_decrypt_with_fixed_key(lua))
stream = stream.replace("\r\n", "\n")
fnm = f"{path}/{response['fileName']}"
with open(fnm, "w") as fp:
fp.write(stream)
return fnm
def get_midea_cloud(cloud_name: str, session: ClientSession, account: str, password: str) -> MideaCloud | None:
cloud = None
if cloud_name in clouds.keys():
cloud = globals()[clouds[cloud_name]["class_name"]](
cloud_name=cloud_name,
session=session,
account=account,
password=password
)
return cloud

View File

@@ -0,0 +1,46 @@
crc8_854_table = [
0x00, 0x5E, 0xBC, 0xE2, 0x61, 0x3F, 0xDD, 0x83,
0xC2, 0x9C, 0x7E, 0x20, 0xA3, 0xFD, 0x1F, 0x41,
0x9D, 0xC3, 0x21, 0x7F, 0xFC, 0xA2, 0x40, 0x1E,
0x5F, 0x01, 0xE3, 0xBD, 0x3E, 0x60, 0x82, 0xDC,
0x23, 0x7D, 0x9F, 0xC1, 0x42, 0x1C, 0xFE, 0xA0,
0xE1, 0xBF, 0x5D, 0x03, 0x80, 0xDE, 0x3C, 0x62,
0xBE, 0xE0, 0x02, 0x5C, 0xDF, 0x81, 0x63, 0x3D,
0x7C, 0x22, 0xC0, 0x9E, 0x1D, 0x43, 0xA1, 0xFF,
0x46, 0x18, 0xFA, 0xA4, 0x27, 0x79, 0x9B, 0xC5,
0x84, 0xDA, 0x38, 0x66, 0xE5, 0xBB, 0x59, 0x07,
0xDB, 0x85, 0x67, 0x39, 0xBA, 0xE4, 0x06, 0x58,
0x19, 0x47, 0xA5, 0xFB, 0x78, 0x26, 0xC4, 0x9A,
0x65, 0x3B, 0xD9, 0x87, 0x04, 0x5A, 0xB8, 0xE6,
0xA7, 0xF9, 0x1B, 0x45, 0xC6, 0x98, 0x7A, 0x24,
0xF8, 0xA6, 0x44, 0x1A, 0x99, 0xC7, 0x25, 0x7B,
0x3A, 0x64, 0x86, 0xD8, 0x5B, 0x05, 0xE7, 0xB9,
0x8C, 0xD2, 0x30, 0x6E, 0xED, 0xB3, 0x51, 0x0F,
0x4E, 0x10, 0xF2, 0xAC, 0x2F, 0x71, 0x93, 0xCD,
0x11, 0x4F, 0xAD, 0xF3, 0x70, 0x2E, 0xCC, 0x92,
0xD3, 0x8D, 0x6F, 0x31, 0xB2, 0xEC, 0x0E, 0x50,
0xAF, 0xF1, 0x13, 0x4D, 0xCE, 0x90, 0x72, 0x2C,
0x6D, 0x33, 0xD1, 0x8F, 0x0C, 0x52, 0xB0, 0xEE,
0x32, 0x6C, 0x8E, 0xD0, 0x53, 0x0D, 0xEF, 0xB1,
0xF0, 0xAE, 0x4C, 0x12, 0x91, 0xCF, 0x2D, 0x73,
0xCA, 0x94, 0x76, 0x28, 0xAB, 0xF5, 0x17, 0x49,
0x08, 0x56, 0xB4, 0xEA, 0x69, 0x37, 0xD5, 0x8B,
0x57, 0x09, 0xEB, 0xB5, 0x36, 0x68, 0x8A, 0xD4,
0x95, 0xCB, 0x29, 0x77, 0xF4, 0xAA, 0x48, 0x16,
0xE9, 0xB7, 0x55, 0x0B, 0x88, 0xD6, 0x34, 0x6A,
0x2B, 0x75, 0x97, 0xC9, 0x4A, 0x14, 0xF6, 0xA8,
0x74, 0x2A, 0xC8, 0x96, 0x15, 0x4B, 0xA9, 0xF7,
0xB6, 0xE8, 0x0A, 0x54, 0xD7, 0x89, 0x6B, 0x35
]
def calculate(data):
crc_value = 0
for m in data:
k = crc_value ^ m
if k > 256:
k -= 256
if k < 0:
k += 256
crc_value = crc8_854_table[k]
return crc_value

View File

@@ -0,0 +1,396 @@
import threading
import socket
import time
from enum import IntEnum
from .security import LocalSecurity, MSGTYPE_HANDSHAKE_REQUEST, MSGTYPE_ENCRYPTED_REQUEST
from .packet_builder import PacketBuilder
from .lua_runtime import MideaCodec
from .message import MessageQuestCustom
from .logger import MideaLogger
class AuthException(Exception):
pass
class ResponseException(Exception):
pass
class RefreshFailed(Exception):
pass
class ParseMessageResult(IntEnum):
SUCCESS = 0
PADDING = 1
ERROR = 99
class MiedaDevice(threading.Thread):
def __init__(self,
name: str,
device_id: int,
device_type: int,
ip_address: str,
port: int,
token: str | None,
key: str | None,
protocol: int,
model: str | None,
subtype: int | None,
sn: str | None,
sn8: str | None,
lua_file: str | None):
threading.Thread.__init__(self)
self._socket = None
self._ip_address = ip_address
self._port = port
self._security = LocalSecurity()
self._token = bytes.fromhex(token) if token else None
self._key = bytes.fromhex(key) if key else None
self._buffer = b""
self._device_name = name
self._device_id = device_id
self._device_type = device_type
self._protocol = protocol
self._model = model
self._updates = []
self._is_run = False
self._subtype = subtype
self._sn = sn
self._sn8 = sn8
self._attributes = {
"device_type": "T0x%02X" % device_type,
"sn": sn,
"sn8": sn8,
"subtype": subtype
}
self._refresh_interval = 30
self._heartbeat_interval = 10
self._connected = False
self._queries = [{}]
self._centralized = []
self._calculate_get = []
self._calculate_set = []
self._lua_runtime = MideaCodec(lua_file, sn=sn, subtype=subtype) if lua_file is not None else None
@property
def device_name(self):
return self._device_name
@property
def device_id(self):
return self._device_id
@property
def device_type(self):
return self._device_type
@property
def model(self):
return self._model
@property
def sn(self):
return self._sn
@property
def sn8(self):
return self._sn8
@property
def subtype(self):
return self._subtype
@property
def attributes(self):
return self._attributes
@property
def connected(self):
return self._connected
def set_refresh_interval(self, refresh_interval):
self._refresh_interval = refresh_interval
def set_queries(self, queries: list):
self._queries = queries
def set_centralized(self, centralized: list):
self._centralized = centralized
def set_calculate(self, calculate: dict):
values_get = calculate.get("get")
values_set = calculate.get("set")
self._calculate_get = values_get if values_get else []
self._calculate_set = values_set if values_set else []
def get_attribute(self, attribute):
return self._attributes.get(attribute)
def set_attribute(self, attribute, value):
if attribute in self._attributes.keys():
new_status = {}
for attr in self._centralized:
new_status[attr] = self._attributes.get(attr)
new_status[attribute] = value
if set_cmd := self._lua_runtime.build_control(new_status):
self._build_send(set_cmd)
def set_attributes(self, attributes):
new_status = {}
for attr in self._centralized:
new_status[attr] = self._attributes.get(attr)
has_new = False
for attribute, value in attributes.items():
if attribute in self._attributes.keys():
has_new = True
new_status[attribute] = value
if has_new:
if set_cmd := self._lua_runtime.build_control(new_status):
self._build_send(set_cmd)
def set_ip_address(self, ip_address):
MideaLogger.debug(f"Update IP address to {ip_address}")
self._ip_address = ip_address
self.close_socket()
def send_command(self, cmd_type, cmd_body: bytearray):
cmd = MessageQuestCustom(self._device_type, cmd_type, cmd_body)
try:
self._build_send(cmd.serialize().hex())
except socket.error as e:
MideaLogger.debug(
f"Interface send_command failure, {repr(e)}, "
f"cmd_type: {cmd_type}, cmd_body: {cmd_body.hex()}",
self._device_id
)
def register_update(self, update):
self._updates.append(update)
def connect(self, refresh=False):
try:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(10)
MideaLogger.debug(f"Connecting to {self._ip_address}:{self._port}", self._device_id)
self._socket.connect((self._ip_address, self._port))
MideaLogger.debug(f"Connected", self._device_id)
if self._protocol == 3:
self._authenticate()
MideaLogger.debug(f"Authentication success", self._device_id)
self._device_connected(True)
if refresh:
self._refresh_status()
return True
except socket.timeout:
MideaLogger.debug(f"Connection timed out", self._device_id)
except socket.error:
MideaLogger.debug(f"Connection error", self._device_id)
except AuthException:
MideaLogger.debug(f"Authentication failed", self._device_id)
except ResponseException:
MideaLogger.debug(f"Unexpected response received", self._device_id)
except RefreshFailed:
MideaLogger.debug(f"Refresh status is timed out", self._device_id)
except Exception as e:
MideaLogger.error(f"Unknown error: {e.__traceback__.tb_frame.f_globals['__file__']}, "
f"{e.__traceback__.tb_lineno}, {repr(e)}")
if refresh:
self._device_connected(False)
self._socket = None
return False
def disconnect(self):
self._buffer = b""
if self._socket:
self._socket.close()
self._socket = None
@staticmethod
def _fetch_v2_message(msg):
result = []
while len(msg) > 0:
factual_msg_len = len(msg)
if factual_msg_len < 6:
break
alleged_msg_len = msg[4] + (msg[5] << 8)
if factual_msg_len >= alleged_msg_len:
result.append(msg[:alleged_msg_len])
msg = msg[alleged_msg_len:]
else:
break
return result, msg
def _authenticate(self):
request = self._security.encode_8370(
self._token, MSGTYPE_HANDSHAKE_REQUEST)
MideaLogger.debug(f"Handshaking")
self._socket.send(request)
response = self._socket.recv(512)
if len(response) < 20:
raise AuthException()
response = response[8: 72]
self._security.tcp_key(response, self._key)
def _send_message(self, data):
if self._protocol == 3:
self._send_message_v3(data, msg_type=MSGTYPE_ENCRYPTED_REQUEST)
else:
self._send_message_v2(data)
def _send_message_v2(self, data):
if self._socket is not None:
self._socket.send(data)
else:
MideaLogger.debug(f"Command send failure, device disconnected, data: {data.hex()}")
def _send_message_v3(self, data, msg_type=MSGTYPE_ENCRYPTED_REQUEST):
data = self._security.encode_8370(data, msg_type)
self._send_message_v2(data)
def _build_send(self, cmd: str):
MideaLogger.debug(f"Sending: {cmd.lower()}")
bytes_cmd = bytes.fromhex(cmd)
msg = PacketBuilder(self._device_id, bytes_cmd).finalize()
self._send_message(msg)
def _refresh_status(self):
for query in self._queries:
if query_cmd := self._lua_runtime.build_query(query):
self._build_send(query_cmd)
def _parse_message(self, msg):
if self._protocol == 3:
messages, self._buffer = self._security.decode_8370(self._buffer + msg)
else:
messages, self._buffer = self.fetch_v2_message(self._buffer + msg)
if len(messages) == 0:
return ParseMessageResult.PADDING
for message in messages:
if message == b"ERROR":
return ParseMessageResult.ERROR
payload_len = message[4] + (message[5] << 8) - 56
payload_type = message[2] + (message[3] << 8)
if payload_type in [0x1001, 0x0001]:
# Heartbeat detected
pass
elif len(message) > 56:
cryptographic = message[40:-16]
if payload_len % 16 == 0:
decrypted = self._security.aes_decrypt(cryptographic)
MideaLogger.debug(f"Received: {decrypted.hex().lower()}")
if status := self._lua_runtime.decode_status(decrypted.hex()):
MideaLogger.debug(f"Decoded: {status}")
new_status = {}
for single in status.keys():
value = status.get(single)
if single not in self._attributes or self._attributes[single] != value:
self._attributes[single] = value
new_status[single] = value
if len(new_status) > 0:
for c in self._calculate_get:
lvalue = c.get("lvalue")
rvalue = c.get("rvalue")
if lvalue and rvalue:
calculate = False
for s, v in new_status.items():
if rvalue.find(f"[{s}]") >= 0:
calculate = True
break
if calculate:
calculate_str1 = \
(f"{lvalue.replace('[', 'self._attributes[')} = "
f"{rvalue.replace('[', 'self._attributes[')}") \
.replace("[","[\"").replace("]","\"]")
calculate_str2 = \
(f"{lvalue.replace('[', 'new_status[')} = "
f"{rvalue.replace('[', 'self._attributes[')}") \
.replace("[","[\"").replace("]","\"]")
try:
exec(calculate_str1)
exec(calculate_str2)
except Exception:
MideaLogger.warning(
f"Calculation Error: {lvalue} = {rvalue}", self._device_id
)
self._update_all(new_status)
return ParseMessageResult.SUCCESS
def _send_heartbeat(self):
msg = PacketBuilder(self._device_id, bytearray([0x00])).finalize(msg_type=0)
self._send_message(msg)
def _device_connected(self, connected=True):
self._connected = connected
status = {"connected": connected}
self._update_all(status)
def _update_all(self, status):
MideaLogger.debug(f"Status update: {status}")
for update in self._updates:
update(status)
def open(self):
if not self._is_run:
self._is_run = True
threading.Thread.start(self)
def close(self):
if self._is_run:
self._is_run = False
self._lua_runtime = None
self.disconnect()
def run(self):
while self._is_run:
while self._socket is None:
if self.connect(refresh=True) is False:
if not self._is_run:
return
self.disconnect()
time.sleep(5)
timeout_counter = 0
start = time.time()
previous_refresh = start
previous_heartbeat = start
self._socket.settimeout(1)
while True:
try:
now = time.time()
if 0 < self._refresh_interval <= now - previous_refresh:
self._refresh_status()
previous_refresh = now
if now - previous_heartbeat >= self._heartbeat_interval:
self._send_heartbeat()
previous_heartbeat = now
msg = self._socket.recv(512)
msg_len = len(msg)
if msg_len == 0:
raise socket.error("Connection closed by peer")
result = self._parse_message(msg)
if result == ParseMessageResult.ERROR:
MideaLogger.debug(f"Message 'ERROR' received")
self.disconnect()
break
elif result == ParseMessageResult.SUCCESS:
timeout_counter = 0
except socket.timeout:
timeout_counter = timeout_counter + 1
if timeout_counter >= 120:
MideaLogger.debug(f"Heartbeat timed out")
self.disconnect()
break
except socket.error as e:
MideaLogger.debug(f"Socket error {repr(e)}")
self.disconnect()
break
except Exception as e:
MideaLogger.error(f"Unknown error :{e.__traceback__.tb_frame.f_globals['__file__']}, "
f"{e.__traceback__.tb_lineno}, {repr(e)}")
self.disconnect()
break

View File

@@ -0,0 +1,174 @@
import socket
import ifaddr
from ipaddress import IPv4Network
from .security import LocalSecurity
from .logger import MideaLogger
try:
import xml.etree.cElementTree as ET
except ImportError:
import xml.etree.ElementTree as ET
BROADCAST_MSG = bytearray([
0x5a, 0x5a, 0x01, 0x11, 0x48, 0x00, 0x92, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x7f, 0x75, 0xbd, 0x6b, 0x3e, 0x4f, 0x8b, 0x76,
0x2e, 0x84, 0x9c, 0x6e, 0x57, 0x8d, 0x65, 0x90,
0x03, 0x6e, 0x9d, 0x43, 0x42, 0xa5, 0x0f, 0x1f,
0x56, 0x9e, 0xb8, 0xec, 0x91, 0x8e, 0x92, 0xe5
])
DEVICE_INFO_MSG = bytearray([
0x5a, 0x5a, 0x15, 0x00, 0x00, 0x38, 0x00, 0x04,
0x00, 0x00, 0x00, 0x00, 0x00, 0x27, 0x33, 0x05,
0x13, 0x06, 0x14, 0x14, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x03, 0xe8, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0xca, 0x8d, 0x9b, 0xf9, 0xa0, 0x30, 0x1a, 0xe3,
0xb7, 0xe4, 0x2d, 0x53, 0x49, 0x47, 0x62, 0xbe
])
def discover(discover_type=None, ip_address=None):
MideaLogger.debug(f"Begin discover, type: {discover_type}, ip_address: {ip_address}")
if discover_type is None:
discover_type = []
security = LocalSecurity()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.settimeout(5)
found_devices = {}
if ip_address is None:
addrs = enum_all_broadcast()
else:
addrs = [ip_address]
for v in range(0, 3):
for addr in addrs:
sock.sendto(BROADCAST_MSG, (addr, 6445))
sock.sendto(BROADCAST_MSG, (addr, 20086))
while True:
try:
data, addr = sock.recvfrom(512)
ip = addr[0]
MideaLogger.debug(f"Received broadcast from {addr}: {data.hex()}")
if len(data) >= 104 and (data[:2].hex() == "5a5a" or data[8:10].hex() == "5a5a"):
if data[:2].hex() == "5a5a":
protocol = 2
elif data[:2].hex() == "8370":
protocol = 3
if data[8:10].hex() == "5a5a":
data = data[8:-16]
else:
continue
device_id = int.from_bytes(bytearray.fromhex(data[20:26].hex()), "little")
if device_id in found_devices:
continue
encrypt_data = data[40:-16]
reply = security.aes_decrypt(encrypt_data)
MideaLogger.debug(f"Declassified reply: {reply.hex()}")
ssid = reply[41:41 + reply[40]].decode("utf-8")
device_type = ssid.split("_")[1]
port = bytes2port(reply[4:8])
model = reply[17:25].decode("utf-8")
sn = reply[8:40].decode("utf-8")
elif data[:6].hex() == "3c3f786d6c20":
protocol = 1
root = ET.fromstring(data.decode(
encoding="utf-8", errors="replace"))
child = root.find("body/device")
m = child.attrib
port, sn, device_type = int(m["port"]), m["apc_sn"], str(
hex(int(m["apc_type"])))[2:]
response = get_device_info(ip, int(port))
device_id = get_id_from_response(response)
if len(sn) == 32:
model = sn[9:17]
elif len(sn) == 22:
model = sn[3:11]
else:
model = ""
else:
continue
device = {
"device_id": device_id,
"type": int(device_type, 16),
"ip_address": ip,
"port": port,
"model": model,
"sn": sn,
"protocol": protocol
}
if len(discover_type) == 0 or device.get("type") in discover_type:
found_devices[device_id] = device
MideaLogger.debug(f"Found a supported device: {device}")
else:
MideaLogger.debug(f"Found a unsupported device: {device}")
if ip_address is not None:
break
except socket.timeout:
break
except socket.error as e:
MideaLogger.debug(f"Socket error: {repr(e)}")
return found_devices
def get_id_from_response(response):
if response[64:-16][:6].hex() == "3c3f786d6c20":
xml = response[64:-16]
root = ET.fromstring(xml.decode(encoding="utf-8", errors="replace"))
child = root.find("smartDevice")
m = child.attrib
return int.from_bytes(bytearray.fromhex(m["devId"]), "little")
else:
return 0
def bytes2port(paramArrayOfbyte):
if paramArrayOfbyte is None:
return 0
b, i = 0, 0
while b < 4:
if b < len(paramArrayOfbyte):
b1 = paramArrayOfbyte[b] & 0xFF
else:
b1 = 0
i |= b1 << b * 8
b += 1
return i
def get_device_info(device_ip, device_port: int):
response = bytearray(0)
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(8)
device_address = (device_ip, device_port)
sock.connect(device_address)
MideaLogger.debug(f"Sending to {device_ip}:{device_port} {DEVICE_INFO_MSG.hex()}")
sock.sendall(DEVICE_INFO_MSG)
response = sock.recv(512)
except socket.timeout:
MideaLogger.warning(f"Connect the device {device_ip}:{device_port} timed out for 8s. "
f"Don't care about a small amount of this. if many maybe not support."
)
except socket.error:
MideaLogger.warning(f"Can't connect to Device {device_ip}:{device_port}")
return response
def enum_all_broadcast():
nets = []
adapters = ifaddr.get_adapters()
for adapter in adapters:
for ip in adapter.ips:
if ip.is_IPv4 and ip.network_prefix < 32:
localNet = IPv4Network(f"{ip.ip}/{ip.network_prefix}", strict=False)
if localNet.is_private and not localNet.is_loopback and not localNet.is_link_local:
addr = str(localNet.broadcast_address)
if addr not in nets:
nets.append(addr)
return nets

View File

@@ -0,0 +1,36 @@
import inspect
import logging
from enum import IntEnum
class MideaLogType(IntEnum):
DEBUG = 1
WARN = 2
ERROR = 3
class MideaLogger:
@staticmethod
def _log(log_type, log, device_id):
frm = inspect.stack()[2]
mod = inspect.getmodule(frm[0])
if device_id is not None:
log = f"[{device_id}] {log}"
if log_type == MideaLogType.DEBUG:
logging.getLogger(mod.__name__).debug(log)
elif log_type == MideaLogType.WARN:
logging.getLogger(mod.__name__).warning(log)
elif log_type == MideaLogType.ERROR:
logging.getLogger(mod.__name__).error(log)
@staticmethod
def debug(log, device_id=None):
MideaLogger._log(MideaLogType.DEBUG, log, device_id)
@staticmethod
def warning(log, device_id=None):
MideaLogger._log(MideaLogType.WARN, log, device_id)
@staticmethod
def error(log, device_id=None):
MideaLogger._log(MideaLogType.ERROR, log, device_id)

View File

@@ -0,0 +1,91 @@
import lupa
import threading
import json
from .logger import MideaLogger
class LuaRuntime:
def __init__(self, file):
self._runtimes = lupa.LuaRuntime()
string = f'dofile("{file}")'
self._runtimes.execute(string)
self._lock = threading.Lock()
self._json_to_data = self._runtimes.eval("function(param) return jsonToData(param) end")
self._data_to_json = self._runtimes.eval("function(param) return dataToJson(param) end")
def json_to_data(self, json_value):
with self._lock:
result = self._json_to_data(json_value)
return result
def data_to_json(self, data_value):
with self._lock:
result = self._data_to_json(data_value)
return result
class MideaCodec(LuaRuntime):
def __init__(self, file, sn=None, subtype=None):
super().__init__(file)
self._sn = sn
self._subtype = subtype
def _build_base_dict(self):
device_info ={}
if self._sn is not None:
device_info["deviceSN"] = self._sn
if self._subtype is not None:
device_info["deviceSubType"] = self._subtype
base_dict = {
"deviceinfo": device_info
}
return base_dict
def build_query(self, append=None):
query_dict = self._build_base_dict()
query_dict["query"] = {} if append is None else append
json_str = json.dumps(query_dict)
try:
result = self.json_to_data(json_str)
return result
except lupa.LuaError as e:
MideaLogger.error(f"LuaRuntimeError in build_query {json_str}: {repr(e)}")
return None
def build_control(self, append=None):
query_dict = self._build_base_dict()
query_dict["control"] = {} if append is None else append
json_str = json.dumps(query_dict)
try:
result = self.json_to_data(json_str)
return result
except lupa.LuaError as e:
MideaLogger.error(f"LuaRuntimeError in build_control {json_str}: {repr(e)}")
return None
def build_status(self, append=None):
query_dict = self._build_base_dict()
query_dict["status"] = {} if append is None else append
json_str = json.dumps(query_dict)
try:
result = self.json_to_data(json_str)
return result
except lupa.LuaError as e:
MideaLogger.error(f"LuaRuntimeError in build_status {json_str}: {repr(e)}")
return None
def decode_status(self, data: str):
data_dict = self._build_base_dict()
data_dict["msg"] = {
"data": data
}
json_str = json.dumps(data_dict)
try:
result = self.data_to_json(json_str)
status = json.loads(result)
return status.get("status")
except lupa.LuaError as e:
MideaLogger.error(f"LuaRuntimeError in decode_status {data}: {repr(e)}")
return None

View File

@@ -0,0 +1,158 @@
from abc import ABC
from enum import IntEnum
class MessageLenError(Exception):
pass
class MessageBodyError(Exception):
pass
class MessageCheckSumError(Exception):
pass
class MessageType(IntEnum):
set = 0x02,
query = 0x03,
notify1 = 0x04,
notify2 = 0x05,
exception = 0x06,
querySN = 0x07,
exception2 = 0x0A,
querySubtype = 0xA0
class MessageBase(ABC):
HEADER_LENGTH = 10
def __init__(self):
self._device_type = 0x00
self._message_type = 0x00
self._body_type = 0x00
self._device_protocol_version = 0
@staticmethod
def checksum(data):
return (~ sum(data) + 1) & 0xff
@property
def header(self):
raise NotImplementedError
@property
def body(self):
raise NotImplementedError
@property
def message_type(self):
return self._message_type
@message_type.setter
def message_type(self, value):
self._message_type = value
@property
def device_type(self):
return self._device_type
@device_type.setter
def device_type(self, value):
self._device_type = value
@property
def body_type(self):
return self._body_type
@body_type.setter
def body_type(self, value):
self._body_type = value
@property
def device_protocol_version(self):
return self._device_protocol_version
@device_protocol_version.setter
def device_protocol_version(self, value):
self._device_protocol_version = value
def __str__(self) -> str:
output = {
"header": self.header.hex(),
"body": self.body.hex(),
"message type": "%02x" % self._message_type,
"body type": ("%02x" % self._body_type) if self._body_type is not None else "None"
}
return str(output)
class MessageRequest(MessageBase):
def __init__(self, device_protocol_version, device_type, message_type, body_type):
super().__init__()
self.device_protocol_version = device_protocol_version
self.device_type = device_type
self.message_type = message_type
self.body_type = body_type
@property
def header(self):
length = self.HEADER_LENGTH + len(self.body)
return bytearray([
# flag
0xAA,
# length
length,
# device type
self._device_type,
# frame checksum
0x00, # self._device_type ^ length,
# unused
0x00, 0x00,
# frame ID
0x00,
# frame protocol version
0x00,
# device protocol version
self._device_protocol_version,
# frame type
self._message_type
])
@property
def _body(self):
raise NotImplementedError
@property
def body(self):
body = bytearray([])
if self.body_type is not None:
body.append(self.body_type)
if self._body is not None:
body.extend(self._body)
return body
def serialize(self):
stream = self.header + self.body
stream.append(MessageBase.checksum(stream[1:]))
return stream
class MessageQuestCustom(MessageRequest):
def __init__(self, device_type, cmd_type, cmd_body):
super().__init__(
device_protocol_version=0,
device_type=device_type,
message_type=cmd_type,
body_type=None)
self._cmd_body = cmd_body
@property
def _body(self):
return bytearray([])
@property
def body(self):
return self._cmd_body

View File

@@ -0,0 +1,59 @@
from .security import LocalSecurity
import datetime
class PacketBuilder:
def __init__(self, device_id: int, command):
self.command = None
self.security = LocalSecurity()
# Init the packet with the header data.
self.packet = bytearray([
# 2 bytes - StaicHeader
0x5a, 0x5a,
# 2 bytes - mMessageType
0x01, 0x11,
# 2 bytes - PacketLenght
0x00, 0x00,
# 2 bytes
0x20, 0x00,
# 4 bytes - MessageId
0x00, 0x00, 0x00, 0x00,
# 8 bytes - Date&Time
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
# 6 bytes - mDeviceID
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
# 12 bytes
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
])
self.packet[12:20] = self.packet_time()
self.packet[20:28] = device_id.to_bytes(8, "little")
self.command = command
def finalize(self, msg_type=1):
if msg_type != 1:
self.packet[3] = 0x10
self.packet[6] = 0x7b
else:
self.packet.extend(self.security.aes_encrypt(self.command))
# PacketLenght
self.packet[4:6] = (len(self.packet) + 16).to_bytes(2, "little")
# Append a basic checksum data(16 bytes) to the packet
self.packet.extend(self.encode32(self.packet))
return self.packet
def encode32(self, data: bytearray):
return self.security.encode32_data(data)
@staticmethod
def checksum(data):
return (~ sum(data) + 1) & 0xff
@staticmethod
def packet_time():
t = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")[
:16]
b = bytearray()
for i in range(0, len(t), 2):
d = int(t[i:i+2])
b.insert(0, d)
return b

View File

@@ -0,0 +1,243 @@
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from Crypto.Util.strxor import strxor
from Crypto.Random import get_random_bytes
from hashlib import md5, sha256
import hmac
MSGTYPE_HANDSHAKE_REQUEST = 0x0
MSGTYPE_HANDSHAKE_RESPONSE = 0x1
MSGTYPE_ENCRYPTED_RESPONSE = 0x3
MSGTYPE_ENCRYPTED_REQUEST = 0x6
class CloudSecurity:
def __init__(self, login_key, iot_key, hmac_key, fixed_key=None, fixed_iv=None):
self._login_key = login_key
self._iot_key = iot_key
self._hmac_key = hmac_key
self._aes_key = None
self._aes_iv = None
self._fixed_key = format(fixed_key, 'x').encode("ascii") if fixed_key else None
self._fixed_iv = format(fixed_iv, 'x').encode("ascii") if fixed_iv else None
def sign(self, data: str, random: str) -> str:
msg = self._iot_key
msg += data
msg += random
sign = hmac.new(self._hmac_key.encode("ascii"), msg.encode("ascii"), sha256)
return sign.hexdigest()
def encrypt_password(self, login_id, data):
m = sha256()
m.update(data.encode("ascii"))
login_hash = login_id + m.hexdigest() + self._login_key
m = sha256()
m.update(login_hash.encode("ascii"))
return m.hexdigest()
def encrypt_iam_password(self, login_id, data) -> str:
raise NotImplementedError
@staticmethod
def get_deviceid(username):
return md5(f"Hello, {username}!".encode("ascii")).digest().hex()[:16]
@staticmethod
def get_udp_id(appliance_id, method=0):
if method == 0:
bytes_id = bytes(reversed(appliance_id.to_bytes(8, "big")))
elif method == 1:
bytes_id = appliance_id.to_bytes(6, "big")
elif method == 2:
bytes_id = appliance_id.to_bytes(6, "little")
else:
return None
data = bytearray(sha256(bytes_id).digest())
for i in range(0, 16):
data[i] ^= data[i + 16]
return data[0: 16].hex()
def set_aes_keys(self, key, iv):
if isinstance(key, str):
key = key.encode("ascii")
if isinstance(iv, str):
iv = iv.encode("ascii")
self._aes_key = key
self._aes_iv = iv
def aes_encrypt_with_fixed_key(self, data):
return self.aes_encrypt(data, self._fixed_key, self._fixed_iv)
def aes_decrypt_with_fixed_key(self, data):
return self.aes_decrypt(data, self._fixed_key, self._fixed_iv)
def aes_encrypt(self, data, key=None, iv=None):
if key is not None:
aes_key = key
aes_iv = iv
else:
aes_key = self._aes_key
aes_iv = self._aes_iv
if aes_key is None:
raise ValueError("Encrypt need a key")
if isinstance(data, str):
data = bytes.fromhex(data)
if aes_iv is None: # ECB
return AES.new(aes_key, AES.MODE_ECB).encrypt(pad(data, 16))
else: # CBC
return AES.new(aes_key, AES.MODE_CBC, iv=aes_iv).encrypt(pad(data, 16))
def aes_decrypt(self, data, key=None, iv=None):
if key is not None:
aes_key = key
aes_iv = iv
else:
aes_key = self._aes_key
aes_iv = self._aes_iv
if aes_key is None:
raise ValueError("Encrypt need a key")
if isinstance(data, str):
data = bytes.fromhex(data)
if aes_iv is None: # ECB
return unpad(AES.new(aes_key, AES.MODE_ECB).decrypt(data), len(aes_key)).decode()
else: # CBC
return unpad(AES.new(aes_key, AES.MODE_CBC, iv=aes_iv).decrypt(data), len(aes_key)).decode()
class MeijuCloudSecurity(CloudSecurity):
def __init__(self, login_key, iot_key, hmac_key):
super().__init__(login_key, iot_key, hmac_key,
10864842703515613082)
def encrypt_iam_password(self, login_id, data) -> str:
md = md5()
md.update(data.encode("ascii"))
md_second = md5()
md_second.update(md.hexdigest().encode("ascii"))
return md_second.hexdigest()
class MSmartCloudSecurity(CloudSecurity):
def __init__(self, login_key, iot_key, hmac_key):
super().__init__(login_key, iot_key, hmac_key,
13101328926877700970,
16429062708050928556)
def encrypt_iam_password(self, login_id, data) -> str:
md = md5()
md.update(data.encode("ascii"))
md_second = md5()
md_second.update(md.hexdigest().encode("ascii"))
login_hash = login_id + md_second.hexdigest() + self._login_key
sha = sha256()
sha.update(login_hash.encode("ascii"))
return sha.hexdigest()
def set_aes_keys(self, encrypted_key, encrypted_iv):
key_digest = sha256(self._login_key.encode("ascii")).hexdigest()
tmp_key = key_digest[:16].encode("ascii")
tmp_iv = key_digest[16:32].encode("ascii")
self._aes_key = self.aes_decrypt(encrypted_key, tmp_key, tmp_iv).encode('ascii')
self._aes_iv = self.aes_decrypt(encrypted_iv, tmp_key, tmp_iv).encode('ascii')
class LocalSecurity:
def __init__(self):
self.blockSize = 16
self.iv = b"\0" * 16
self.aes_key = bytes.fromhex(
format(141661095494369103254425781617665632877, 'x')
)
self.salt = bytes.fromhex(
format(233912452794221312800602098970898185176935770387238278451789080441632479840061417076563, 'x')
)
self._tcp_key = None
self._request_count = 0
self._response_count = 0
def aes_decrypt(self, raw):
try:
return unpad(AES.new(self.aes_key, AES.MODE_ECB).decrypt(bytearray(raw)), 16)
except ValueError as e:
return bytearray(0)
def aes_encrypt(self, raw):
return AES.new(self.aes_key, AES.MODE_ECB).encrypt(bytearray(pad(raw, 16)))
def aes_cbc_decrypt(self, raw, key):
return AES.new(key=key, mode=AES.MODE_CBC, iv=self.iv).decrypt(raw)
def aes_cbc_encrypt(self, raw, key):
return AES.new(key=key, mode=AES.MODE_CBC, iv=self.iv).encrypt(raw)
def encode32_data(self, raw):
return md5(raw + self.salt).digest()
def tcp_key(self, response, key):
if response == b"ERROR":
raise Exception("authentication failed")
if len(response) != 64:
raise Exception("unexpected data length")
payload = response[:32]
sign = response[32:]
plain = self.aes_cbc_decrypt(payload, key)
if sha256(plain).digest() != sign:
raise Exception("sign does not match")
self._tcp_key = strxor(plain, key)
self._request_count = 0
self._response_count = 0
return self._tcp_key
def encode_8370(self, data, msgtype):
header = bytearray([0x83, 0x70])
size, padding = len(data), 0
if msgtype in (MSGTYPE_ENCRYPTED_RESPONSE, MSGTYPE_ENCRYPTED_REQUEST):
if (size + 2) % 16 != 0:
padding = 16 - (size + 2 & 0xf)
size += padding + 32
data += get_random_bytes(padding)
header += size.to_bytes(2, "big")
header += bytearray([0x20, padding << 4 | msgtype])
data = self._request_count.to_bytes(2, "big") + data
self._request_count += 1
if self._request_count >= 0xFFFF:
self._request_count = 0
if msgtype in (MSGTYPE_ENCRYPTED_RESPONSE, MSGTYPE_ENCRYPTED_REQUEST):
sign = sha256(header + data).digest()
data = self.aes_cbc_encrypt(raw=data, key=self._tcp_key) + sign
return header + data
def decode_8370(self, data):
if len(data) < 6:
return [], data
header = data[:6]
if header[0] != 0x83 or header[1] != 0x70:
raise Exception("not an 8370 message")
size = int.from_bytes(header[2:4], "big") + 8
leftover = None
if len(data) < size:
return [], data
elif len(data) > size:
leftover = data[size:]
data = data[:size]
if header[4] != 0x20:
raise Exception("missing byte 4")
padding = header[5] >> 4
msgtype = header[5] & 0xf
data = data[6:]
if msgtype in (MSGTYPE_ENCRYPTED_RESPONSE, MSGTYPE_ENCRYPTED_REQUEST):
sign = data[-32:]
data = data[:-32]
data = self.aes_cbc_decrypt(raw=data, key=self._tcp_key)
if sha256(header + data).digest() != sign:
raise Exception("sign does not match")
if padding:
data = data[:-padding]
self._response_count = int.from_bytes(data[:2], "big")
data = data[2:]
if leftover:
packets, incomplete = self.decode_8370(leftover)
return [data] + packets, incomplete
return [data], b""

View File

@@ -0,0 +1,169 @@
from homeassistant.const import *
from homeassistant.components.sensor import SensorStateClass, SensorDeviceClass
# from homeassistant.components.binary_sensor import BinarySensorDeviceClass
from homeassistant.components.switch import SwitchDeviceClass
DEVICE_MAPPING = {
"default": {
"rationale": ["off", "on"],
"queries": [{}, {"query_type": "prevent_straight_wind"}],
"centralized": [
"power", "temperature", "small_temperature", "mode", "eco",
"comfort_power_save", "comfort_sleep", "strong_wind",
"wind_swing_lr", "wind_swing_lr", "wind_speed","ptc", "dry"
],
"entities": {
Platform.CLIMATE: {
"thermostat": {
"name": "Thermostat",
"power": "power",
"hvac_modes": {
"off": {"power": "off"},
"heat": {"power": "on", "mode": "heat"},
"cool": {"power": "on", "mode": "cool"},
"auto": {"power": "on", "mode": "auto"},
"dry": {"power": "on", "mode": "dry"},
"fan_only": {"power": "on", "mode": "fan"}
},
"preset_modes": {
"none": {
"eco": "off",
"comfort_power_save": "off",
"comfort_sleep": "off",
"strong_wind": "off"
},
"eco": {"eco": "on"},
"comfort": {"comfort_power_save": "on"},
"sleep": {"comfort_sleep": "on"},
"boost": {"strong_wind": "on"}
},
"swing_modes": {
"off": {"wind_swing_lr": "off", "wind_swing_ud": "off"},
"both": {"wind_swing_lr": "on", "wind_swing_ud": "on"},
"horizontal": {"wind_swing_lr": "on", "wind_swing_ud": "off"},
"vertical": {"wind_swing_lr": "off", "wind_swing_ud": "on"},
},
"fan_modes": {
"silent": {"wind_speed": 20},
"low": {"wind_speed": 40},
"medium": {"wind_speed": 60},
"high": {"wind_speed": 80},
"full": {"wind_speed": 100},
"auto": {"wind_speed": 102}
},
"target_temperature": ["temperature", "small_temperature"],
"current_temperature": "indoor_temperature",
"aux_heat": "ptc",
"min_temp": 17,
"max_temp": 30,
"temperature_unit": TEMP_CELSIUS,
"precision": PRECISION_HALVES,
}
},
Platform.SWITCH: {
"dry": {
"device_class": SwitchDeviceClass.SWITCH,
},
"prevent_straight_wind": {
"device_class": SwitchDeviceClass.SWITCH,
"rationale": [1, 2]
},
"aux_heat": {
"device_class": SwitchDeviceClass.SWITCH,
}
},
Platform.SENSOR: {
"indoor_temperature": {
"device_class": SensorDeviceClass.TEMPERATURE,
"unit_of_measurement": UnitOfTemperature.CELSIUS,
"state_class": SensorStateClass.MEASUREMENT
},
"outdoor_temperature": {
"device_class": SensorDeviceClass.TEMPERATURE,
"unit_of_measurement": UnitOfTemperature.CELSIUS,
"state_class": SensorStateClass.MEASUREMENT
},
}
}
},
"22012227": {
"rationale": ["off", "on"],
"queries": [{}, {"query_type": "prevent_straight_wind"}],
"centralized": ["power", "temperature", "small_temperature", "mode", "eco", "comfort_power_save",
"comfort_sleep", "strong_wind", "wind_swing_lr", "wind_swing_ud", "wind_speed",
"ptc", "dry"],
"entities": {
Platform.CLIMATE: {
"thermostat": {
"name": "Thermostat",
"power": "power",
"hvac_modes": {
"off": {"power": "off"},
"heat": {"power": "on", "mode": "heat"},
"cool": {"power": "on", "mode": "cool"},
"auto": {"power": "on", "mode": "auto"},
"dry": {"power": "on", "mode": "dry"},
"fan_only": {"power": "on", "mode": "fan"}
},
"preset_modes": {
"none": {
"eco": "off",
"comfort_power_save": "off",
"comfort_sleep": "off",
"strong_wind": "off"
},
"eco": {"eco": "on"},
"comfort": {"comfort_power_save": "on"},
"sleep": {"comfort_sleep": "on"},
"boost": {"strong_wind": "on"}
},
"swing_modes": {
"off": {"wind_swing_lr": "off", "wind_swing_ud": "off"},
"both": {"wind_swing_lr": "on", "wind_swing_ud": "on"},
"horizontal": {"wind_swing_lr": "on", "wind_swing_ud": "off"},
"vertical": {"wind_swing_lr": "off", "wind_swing_ud": "on"},
},
"fan_modes": {
"silent": {"wind_speed": 20},
"low": {"wind_speed": 40},
"medium": {"wind_speed": 60},
"high": {"wind_speed": 80},
"full": {"wind_speed": 100},
"auto": {"wind_speed": 102}
},
"target_temperature": ["temperature", "small_temperature"],
"current_temperature": "indoor_temperature",
"aux_heat": "ptc",
"min_temp": 17,
"max_temp": 30,
"temperature_unit": UnitOfTemperature.CELSIUS,
"precision": PRECISION_HALVES,
}
},
Platform.SWITCH: {
"dry": {
"name": "干燥",
"device_class": SwitchDeviceClass.SWITCH,
},
"prevent_straight_wind": {
"name": "防直吹",
"device_class": SwitchDeviceClass.SWITCH,
"rationale": [1, 2]
},
"aux_heat": {
"name": "电辅热",
"device_class": SwitchDeviceClass.SWITCH,
}
},
Platform.SENSOR: {
"outdoor_temperature": {
"name": "室外机温度",
"device_class": SensorDeviceClass.TEMPERATURE,
"unit_of_measurement": UnitOfTemperature.CELSIUS,
"state_class": SensorStateClass.MEASUREMENT
},
}
}
}
}

View File

@@ -0,0 +1,177 @@
from homeassistant.const import *
from homeassistant.components.sensor import SensorStateClass, SensorDeviceClass
from homeassistant.components.binary_sensor import BinarySensorDeviceClass
DEVICE_MAPPING = {
"default": {
"rationale": [0, 1],
"calculate": {
"get": [
{
"lvalue": "[remaining_time]",
"rvalue": "[left_time_hour] * 60 + [left_time_min]"
},
{
"lvalue": "[warming_time]",
"rvalue": "[warm_time_hour] * 60 + [warm_time_min]"
},
{
"lvalue": "[delay_time]",
"rvalue": "[order_time_hour] * 60 + [order_time_min]",
}
],
"set": {
}
},
"entities": {
Platform.SENSOR: {
"work_stage": {},
"voltage": {
"device_class": SensorDeviceClass.VOLTAGE,
"unit_of_measurement": UnitOfElectricPotential.VOLT,
"state_class": SensorStateClass.MEASUREMENT
},
"top_temperature": {
"device_class": SensorDeviceClass.TEMPERATURE,
"unit_of_measurement": UnitOfTemperature.CELSIUS,
"state_class": SensorStateClass.MEASUREMENT
},
"bottom_temperature": {
"device_class": SensorDeviceClass.TEMPERATURE,
"unit_of_measurement": UnitOfTemperature.CELSIUS,
"state_class": SensorStateClass.MEASUREMENT
},
"remaining_time": {
"unit_of_measurement": UnitOfTime.MINUTES
},
"warming_time": {
"unit_of_measurement": UnitOfTime.MINUTES
},
"delay_time": {
"unit_of_measurement": UnitOfTime.MINUTES
},
},
Platform.BINARY_SENSOR: {
"top_hot": {
"device_class": BinarySensorDeviceClass.RUNNING
},
"flank_hot": {
"device_class": BinarySensorDeviceClass.RUNNING
},
"bottom_hot": {
"device_class": BinarySensorDeviceClass.RUNNING
}
},
Platform.SELECT: {
"mode": {
"options": {
"Rice": {"mode": "essence_rice", "work_status": "cooking"},
"Porridge": {"mode": "gruel", "work_status": "cooking"},
"热饭": {"mode": "heat_rice", "work_status": "cooking"},
"Congee": {"mode": "boil_congee", "work_status": "cooking"},
"Soup": {"mode": "cook_soup", "work_status": "cooking"},
"Steam": {"mode": "stewing", "work_status": "cooking"},
}
},
"rice_type": {
"options": {
"None": {"rice_type": "none"},
"Northeast rice": {"rice_type": "northeast"},
"Long-grain rice": {"rice_type": "longrain"},
"Fragrant rice": {"rice_type": "fragrant"},
"Wuchang rice": {"rice_type": "five"},
}
},
"work_status": {
"options": {
"Stop": {"work_status": "cancel"},
"Cooking": {"work_status": "cooking"},
"Warming": {"work_status": "keep_warm"},
"Soaking": {"work_status": "awakening_rice"},
"Delay": {"work_status": "schedule"}
}
}
}
}
},
"61001527": {
"rationale": [0, 1],
"calculate": {
"get": [
{
"lvalue": "[remaining_time]",
"rvalue": "[left_time_hour] * 60 + [left_time_min]"
},
{
"lvalue": "[warming_time]",
"rvalue": "[warm_time_hour] * 60 + [warm_time_min]"
},
{
"lvalue": "[delay_time]",
"rvalue": "[order_time_hour] * 60 + [order_time_min]",
}
],
"set": {
}
},
"entities": {
Platform.SENSOR: {
"work_stage": {},
"voltage": {
"device_class": SensorDeviceClass.VOLTAGE,
"unit_of_measurement": UnitOfElectricPotential.VOLT,
"state_class": SensorStateClass.MEASUREMENT
},
"top_temperature": {
"device_class": SensorDeviceClass.TEMPERATURE,
"unit_of_measurement": UnitOfTemperature.CELSIUS,
"state_class": SensorStateClass.MEASUREMENT
},
"bottom_temperature": {
"device_class": SensorDeviceClass.TEMPERATURE,
"unit_of_measurement": UnitOfTemperature.CELSIUS,
"state_class": SensorStateClass.MEASUREMENT
},
"remaining_time": {
"unit_of_measurement": UnitOfTime.MINUTES
},
"warming_time": {
"unit_of_measurement": UnitOfTime.MINUTES
},
"delay_time": {
"unit_of_measurement": UnitOfTime.MINUTES
},
},
Platform.SELECT: {
"mode": {
"options": {
"精华饭": {"mode": "essence_rice", "work_status": "cooking"},
"稀饭": {"mode": "gruel", "work_status": "cooking"},
"热饭": {"mode": "heat_rice", "work_status": "cooking"},
"煮粥": {"mode": "boil_congee", "work_status": "cooking"},
"煲汤": {"mode": "cook_soup", "work_status": "cooking"},
"蒸煮": {"mode": "stewing", "work_status": "cooking"},
}
},
"rice_type": {
"options": {
"": {"rice_type": "none"},
"东北大米": {"rice_type": "northeast"},
"长粒米": {"rice_type": "longrain"},
"香米": {"rice_type": "fragrant"},
"五常大米": {"rice_type": "five"},
}
},
"work_status": {
"options": {
"停止": {"work_status": "cancel"},
"烹饪": {"work_status": "cooking"},
"保温": {"work_status": "keep_warm"},
"醒米": {"work_status": "awakening_rice"},
"预约": {"work_status": "schedule"},
}
}
}
}
}
}

View File

@@ -0,0 +1,267 @@
from homeassistant.const import *
from homeassistant.components.sensor import SensorStateClass, SensorDeviceClass
from homeassistant.components.binary_sensor import BinarySensorDeviceClass
from homeassistant.components.switch import SwitchDeviceClass
DEVICE_MAPPING = {
"default": {
"manufacturer": "小天鹅",
"rationale": ["off", "on"],
"queries": [{}, {"query_type": "prevent_straight_wind"}],
"centralized": [
"power", "temperature", "small_temperature", "mode", "eco",
"comfort_power_save", "comfort_sleep", "strong_wind",
"wind_swing_lr", "wind_swing_lr", "wind_speed","ptc", "dry"
],
"calculate": {
"get": [
{
"lvalue": "[target_temperature_new]",
"rvalue": "[target_temperature] / 2"
},
{
"lvalue": "[current_temperature_new]",
"rvalue": "[current_temperature] / 2"
}
],
"set": {
{
"lvalue": "[target_temperature]",
"rvalue": "[target_temperature_new] * 2"
},
{
"lvalue": "[current_temperature]",
"rvalue": "[current_temperature_new] * 2"
}
}
},
"entities": {
Platform.CLIMATE: {
"thermostat": {
"name": "Thermostat",
"power": "power",
"hvac_modes": {
"off": {"power": "off"},
"heat": {"power": "on", "mode": "heat"},
"cool": {"power": "on", "mode": "cool"},
"auto": {"power": "on", "mode": "auto"},
"dry": {"power": "on", "mode": "dry"},
"fan_only": {"power": "on", "mode": "fan"}
},
"preset_modes": {
"none": {
"eco": "off",
"comfort_power_save": "off",
"comfort_sleep": "off",
"strong_wind": "off"
},
"eco": {"eco": "on"},
"comfort": {"comfort_power_save": "on"},
"sleep": {"comfort_sleep": "on"},
"boost": {"strong_wind": "on"}
},
"swing_modes": {
"off": {"wind_swing_lr": "off", "wind_swing_ud": "off"},
"both": {"wind_swing_lr": "on", "wind_swing_ud": "on"},
"horizontal": {"wind_swing_lr": "on", "wind_swing_ud": "off"},
"vertical": {"wind_swing_lr": "off", "wind_swing_ud": "on"},
},
"fan_modes": {
"silent": {"wind_speed": 20},
"low": {"wind_speed": 40},
"medium": {"wind_speed": 60},
"high": {"wind_speed": 80},
"full": {"wind_speed": 100},
"auto": {"wind_speed": 102}
},
"target_temperature": ["temperature", "small_temperature"],
"current_temperature": "indoor_temperature",
"aux_heat": "ptc",
"min_temp": 17,
"max_temp": 30,
"temperature_unit": TEMP_CELSIUS,
"precision": PRECISION_HALVES,
}
},
Platform.SWITCH: {
"dry": {
"device_class": SwitchDeviceClass.SWITCH,
},
"prevent_straight_wind": {
"device_class": SwitchDeviceClass.SWITCH,
"rationale": [1, 2]
},
"aux_heat": {
"device_class": SwitchDeviceClass.SWITCH,
}
},
Platform.SENSOR: {
"indoor_temperature": {
"device_class": SensorDeviceClass.TEMPERATURE,
"unit_of_measurement": UnitOfTemperature.CELSIUS,
"state_class": SensorStateClass.MEASUREMENT
},
"outdoor_temperature": {
"device_class": SensorDeviceClass.TEMPERATURE,
"unit_of_measurement": UnitOfTemperature.CELSIUS,
"state_class": SensorStateClass.MEASUREMENT
},
}
}
},
"22012227": {
"manufacturer": "TOSHIBA",
"rationale": ["off", "on"],
"queries": [],
"centralized": ["power", "temperature", "small_temperature", "mode", "eco", "comfort_power_save",
"comfort_sleep", "strong_wind", "wind_swing_lr", "wind_swing_ud", "wind_speed",
"ptc", "dry"],
"entities": {
Platform.CLIMATE: {
"thermostat": {
"name": "Thermostat",
"power": "power",
"hvac_modes": {
"off": {"power": "off"},
"heat": {"power": "on", "mode": "heat"},
"cool": {"power": "on", "mode": "cool"},
"auto": {"power": "on", "mode": "auto"},
"dry": {"power": "on", "mode": "dry"},
"fan_only": {"power": "on", "mode": "fan"}
},
"preset_modes": {
"none": {
"eco": "off",
"comfort_power_save": "off",
"comfort_sleep": "off",
"strong_wind": "off"
},
"eco": {"eco": "on"},
"comfort": {"comfort_power_save": "on"},
"sleep": {"comfort_sleep": "on"},
"boost": {"strong_wind": "on"}
},
"swing_modes": {
"off": {"wind_swing_lr": "off", "wind_swing_ud": "off"},
"both": {"wind_swing_lr": "on", "wind_swing_ud": "on"},
"horizontal": {"wind_swing_lr": "on", "wind_swing_ud": "off"},
"vertical": {"wind_swing_lr": "off", "wind_swing_ud": "on"},
},
"fan_modes": {
"silent": {"wind_speed": 20},
"low": {"wind_speed": 40},
"medium": {"wind_speed": 60},
"high": {"wind_speed": 80},
"full": {"wind_speed": 100},
"auto": {"wind_speed": 102}
},
"target_temperature": "target_temperature_new",
"current_temperature": "current_temperature_new",
"aux_heat": "ptc",
"min_temp": 17,
"max_temp": 30,
"temperature_unit": UnitOfTemperature.CELSIUS,
"precision": PRECISION_HALVES,
}
},
Platform.WATER_HEATER:{
"water_heater": {
"name": "Gas Water Heater",
"power": "power",
"operation_list": {
"off": {"power": "off"},
"heat": {"power": "on", "mode": "heat"},
"cool": {"power": "on", "mode": "cool"},
"auto": {"power": "on", "mode": "auto"},
"dry": {"power": "on", "mode": "dry"},
"fan_only": {"power": "on", "mode": "fan"}
},
"target_temperature": ["temperature", "small_temperature"],
"current_temperature": "indoor_temperature",
"min_temp": 17,
"max_temp": 30,
"temperature_unit": UnitOfTemperature.CELSIUS,
"precision": PRECISION_HALVES,
}
},
Platform.FAN: {
"fan": {
"power": "power",
"preset_modes": {
"off": {"power": "off"},
"heat": {"power": "on", "mode": "heat"},
"cool": {"power": "on", "mode": "cool"},
"auto": {"power": "on", "mode": "auto"},
"dry": {"power": "on", "mode": "dry"},
"fan_only": {"power": "on", "mode": "fan"}
},
"oscillate": "wind_swing_lr",
"speeds": list({"wind_speed": value + 1} for value in range(0, 100)),
}
},
Platform.SWITCH: {
"dry": {
"name": "干燥",
"device_class": SwitchDeviceClass.SWITCH,
},
"prevent_straight_wind": {
"name": "防直吹",
"device_class": SwitchDeviceClass.SWITCH,
"rationale": [1, 2]
},
"aux_heat": {
"name": "电辅热",
"device_class": SwitchDeviceClass.SWITCH,
}
},
Platform.SENSOR: {
"indoor_temperature": {
"name": "室内温度",
"device_class": SensorDeviceClass.TEMPERATURE,
"unit_of_measurement": UnitOfTemperature.CELSIUS,
"state_class": SensorStateClass.MEASUREMENT
},
"outdoor_temperature": {
"name": "室外温度",
"device_class": SensorDeviceClass.TEMPERATURE,
"unit_of_measurement": UnitOfTemperature.CELSIUS,
"state_class": SensorStateClass.MEASUREMENT
},
},
Platform.BINARY_SENSOR: {
"dust_full": {
"icon": "mdi:air-filter",
"name": "滤网尘满",
"device_class": BinarySensorDeviceClass.PROBLEM
},
"move_detect": {}
},
Platform.SELECT: {
"preset_modes": {
"options": {
"none": {
"eco": "off",
"comfort_power_save": "off",
"comfort_sleep": "off",
"strong_wind": "off"
},
"eco": {"eco": "on"},
"comfort": {"comfort_power_save": "on"},
"sleep": {"comfort_sleep": "on"},
"boost": {"strong_wind": "on"}
}
},
"hvac_modes": {
"options": {
"off": {"power": "off"},
"heat": {"power": "on", "mode": "heat"},
"cool": {"power": "on", "mode": "cool"},
"auto": {"power": "on", "mode": "auto"},
"dry": {"power": "on", "mode": "dry"},
"fan_only": {"power": "on", "mode": "fan"}
}
}
}
}
}
}

View File

@@ -0,0 +1,112 @@
from homeassistant.components.fan import FanEntity, FanEntityFeature
from homeassistant.const import (
Platform,
CONF_DEVICE_ID,
CONF_DEVICE,
CONF_ENTITIES,
)
from .const import (
DOMAIN,
DEVICES
)
from .midea_entities import MideaEntity
from .core.logger import MideaLogger
async def async_setup_entry(hass, config_entry, async_add_entities):
device_id = config_entry.data.get(CONF_DEVICE_ID)
device = hass.data[DOMAIN][DEVICES][device_id].get(CONF_DEVICE)
manufacturer = hass.data[DOMAIN][DEVICES][device_id].get("manufacturer")
rationale = hass.data[DOMAIN][DEVICES][device_id].get("rationale")
entities = hass.data[DOMAIN][DEVICES][device_id].get(CONF_ENTITIES).get(Platform.FAN)
devs = []
if entities is not None:
for entity_key, config in entities.items():
devs.append(MideaFanEntity(device, manufacturer, rationale, entity_key, config))
async_add_entities(devs)
class MideaFanEntity(MideaEntity, FanEntity):
def __init__(self, device, manufacturer, rationale, entity_key, config):
super().__init__(device, manufacturer, rationale, entity_key, config)
self._key_power = self._config.get("power")
self._key_preset_modes = self._config.get("preset_modes")
self._key_speeds = self._config.get("speeds")
self._key_oscillate = self._config.get("oscillate")
self._key_directions = self._config.get("directions")
self._attr_speed_count = len(self._key_speeds) if self._key_speeds else 0
@property
def supported_features(self):
features = 0
if self._key_preset_modes is not None and len(self._key_preset_modes) > 0:
features |= FanEntityFeature.PRESET_MODE
if self._key_speeds is not None and len(self._key_speeds) > 0:
features |= FanEntityFeature.SET_SPEED
if self._key_oscillate is not None:
features |= FanEntityFeature.OSCILLATE
if self._key_directions is not None and len(self._key_directions) > 0:
features |= FanEntityFeature.DIRECTION
return features
@property
def is_on(self) -> bool:
return self._get_status_on_off(self._key_power)
@property
def preset_modes(self):
return list(self._key_preset_modes.keys())
@property
def preset_mode(self):
return self._dict_get_selected(self._key_preset_modes)
@property
def percentage(self):
index = self._list_get_selected(self._key_speeds)
if index is None:
return None
return round((index + 1) * 100 / self._attr_speed_count)
@property
def oscillating(self):
return self._get_status_on_off(self._key_oscillate)
def turn_on(
self,
percentage: int | None = None,
preset_mode: str | None = None,
**kwargs,
):
if preset_mode is not None:
new_status = self._key_preset_modes.get(preset_mode)
else:
new_status = {}
if percentage is not None:
index = round(percentage * self._attr_speed_count / 100) - 1
new_status.update(self._key_speeds[index])
new_status[self._key_power] = self._rationale[1]
self._device.set_attributes(new_status)
def turn_off(self):
self._set_status_on_off(self._key_power, False)
def set_percentage(self, percentage: int):
index = round(percentage * self._attr_speed_count / 100)
if 0 < index < len(self._key_speeds):
new_status = self._key_speeds[index - 1]
self._device.set_attributes(new_status)
def set_preset_mode(self, preset_mode: str):
new_status = self._key_preset_modes.get(preset_mode)
self._device.set_attributes(new_status)
def oscillate(self, oscillating: bool):
if self.oscillating != oscillating:
self._set_status_on_off(self._key_oscillate, oscillating)
def update_state(self, status):
try:
self.schedule_update_ha_state()
except Exception as e:
pass

View File

@@ -0,0 +1,13 @@
{
"domain": "midea_meiju_codec",
"name": "Midea Meiju Codec",
"codeowners": ["@georgezhao2010"],
"config_flow": true,
"dependencies": [],
"documentation": "https://github.com/georgezhao2010/midea-meiju-codec#readme",
"integration_type": "device",
"iot_class": "local_polling",
"issue_tracker": "https://github.com/georgezhao2010/midea-meiju-codec/issues",
"requirements": ["lupa>=2.0"],
"version": "v0.0.3"
}

View File

@@ -0,0 +1,130 @@
from enum import IntEnum
from homeassistant.helpers.entity import Entity
from homeassistant.const import (
STATE_ON,
STATE_OFF
)
from .const import DOMAIN
from .core.logger import MideaLogger
class Rationale(IntEnum):
EQUALLY = 0
GREATER = 1
LESS = 2
class MideaEntity(Entity):
def __init__(self, device, manufacturer: str | None, rationale: list | None, entity_key: str, config: dict):
self._device = device
self._device.register_update(self.update_state)
self._entity_key = entity_key
self._config = config
self._device_name = self._device.device_name
self._rationale = rationale
if rationale_local := config.get("rationale"):
self._rationale = rationale_local
if self._rationale is None:
self._rationale = ["off", "on"]
self._attr_native_unit_of_measurement = self._config.get("unit_of_measurement")
self._attr_device_class = self._config.get("device_class")
self._attr_state_class = self._config.get("state_class")
self._attr_icon = self._config.get("icon")
self._attr_unique_id = f"{DOMAIN}.{self._device.device_id}_{self._entity_key}"
self._attr_device_info = {
"manufacturer": "Midea" if manufacturer is None else manufacturer,
"model": f"{self._device.model}",
"identifiers": {(DOMAIN, self._device.device_id)},
"name": self._device_name
}
name = self._config.get("name")
if name is None:
name = self._entity_key.replace("_", " ").title()
self._attr_name = f"{self._device_name} {name}"
self.entity_id = self._attr_unique_id
@property
def device(self):
return self._device
@property
def should_poll(self):
return False
@property
def available(self):
return self._device.connected
def _get_status_on_off(self, status_key: str):
result = False
status = self._device.get_attribute(status_key)
if status is not None:
try:
result = bool(self._rationale.index(status))
except ValueError:
MideaLogger.error(f"The value of attribute {status_key} ('{status}') "
f"is not in rationale {self._rationale}")
return result
def _set_status_on_off(self, status_key: str, turn_on: bool):
self._device.set_attribute(status_key, self._rationale[int(turn_on)])
def _list_get_selected(self, key_of_list: list, rationale: Rationale = Rationale.EQUALLY):
for index in range(0, len(key_of_list)):
match = True
for attr, value in key_of_list[index].items():
state_value = self._device.get_attribute(attr)
if state_value is None:
match = False
break
if rationale is Rationale.EQUALLY and state_value != value:
match = False
break
if rationale is Rationale.GREATER and state_value < value:
match = False
break
if rationale is Rationale.LESS and state_value > value:
match = False
break
if match:
return index
return None
def _dict_get_selected(self, key_of_dict: dict, rationale: Rationale = Rationale.EQUALLY):
for mode, status in key_of_dict.items():
match = True
for attr, value in status.items():
state_value = self._device.get_attribute(attr)
if state_value is None:
match = False
break
if rationale is Rationale.EQUALLY and state_value != value:
match = False
break
if rationale is Rationale.GREATER and state_value < value:
match = False
break
if rationale is Rationale.LESS and state_value > value:
match = False
break
if match:
return mode
return None
def update_state(self, status):
if self._entity_key in status or "connected" in status:
try:
self.schedule_update_ha_state()
except Exception as e:
pass
class MideaBinaryBaseEntity(MideaEntity):
@property
def state(self):
return STATE_ON if self.is_on else STATE_OFF
@property
def is_on(self):
return self._get_status_on_off(self._entity_key)

View File

@@ -0,0 +1,50 @@
from homeassistant.components.select import SelectEntity
from homeassistant.const import (
Platform,
CONF_DEVICE_ID,
CONF_DEVICE,
CONF_ENTITIES,
)
from .const import (
DOMAIN,
DEVICES
)
from .midea_entities import MideaEntity
async def async_setup_entry(hass, config_entry, async_add_entities):
device_id = config_entry.data.get(CONF_DEVICE_ID)
device = hass.data[DOMAIN][DEVICES][device_id].get(CONF_DEVICE)
manufacturer = hass.data[DOMAIN][DEVICES][device_id].get("manufacturer")
rationale = hass.data[DOMAIN][DEVICES][device_id].get("rationale")
entities = hass.data[DOMAIN][DEVICES][device_id].get(CONF_ENTITIES).get(Platform.SELECT)
devs = []
if entities is not None:
for entity_key, config in entities.items():
devs.append(MideaSelectEntity(device, manufacturer, rationale, entity_key, config))
async_add_entities(devs)
class MideaSelectEntity(MideaEntity, SelectEntity):
def __init__(self, device, manufacturer, rationale, entity_key, config):
super().__init__(device, manufacturer, rationale, entity_key, config)
self._key_options = self._config.get("options")
@property
def options(self):
return list(self._key_options.keys())
@property
def current_option(self):
return self._dict_get_selected(self._key_options)
def select_option(self, option: str):
new_status = self._key_options.get(option)
self._device.set_attributes(new_status)
def update_state(self, status):
try:
self.schedule_update_ha_state()
except Exception as e:
pass

View File

@@ -0,0 +1,32 @@
from homeassistant.components.sensor import SensorEntity
from homeassistant.const import (
Platform,
CONF_DEVICE_ID,
CONF_DEVICE,
CONF_ENTITIES
)
from .const import (
DOMAIN,
DEVICES
)
from .midea_entities import MideaEntity
async def async_setup_entry(hass, config_entry, async_add_entities):
device_id = config_entry.data.get(CONF_DEVICE_ID)
device = hass.data[DOMAIN][DEVICES][device_id].get(CONF_DEVICE)
manufacturer = hass.data[DOMAIN][DEVICES][device_id].get("manufacturer")
rationale = hass.data[DOMAIN][DEVICES][device_id].get("rationale")
entities = hass.data[DOMAIN][DEVICES][device_id].get(CONF_ENTITIES).get(Platform.SENSOR)
devs = []
if entities is not None:
for entity_key, config in entities.items():
devs.append(MideaSensorEntity(device, manufacturer, rationale, entity_key, config))
async_add_entities(devs)
class MideaSensorEntity(MideaEntity, SensorEntity):
@property
def native_value(self):
return self._device.get_attribute(self._entity_key)

View File

@@ -0,0 +1,18 @@
set_attributes:
fields:
device_id:
example: "1234567890"
attributes:
example:
"power": "on"
"mode": "cool"
set_mode:
send_command:
fields:
device_id:
example: "1234567890"
cmd_type:
example: 2
cmd_body:
example: "B0FF01370E0000A500"

View File

@@ -0,0 +1,34 @@
from homeassistant.components.switch import SwitchEntity
from homeassistant.const import (
Platform,
CONF_DEVICE_ID,
CONF_DEVICE,
CONF_ENTITIES,
)
from .const import (
DOMAIN,
DEVICES
)
from .midea_entities import MideaBinaryBaseEntity
async def async_setup_entry(hass, config_entry, async_add_entities):
device_id = config_entry.data.get(CONF_DEVICE_ID)
device = hass.data[DOMAIN][DEVICES][device_id].get(CONF_DEVICE)
manufacturer = hass.data[DOMAIN][DEVICES][device_id].get("manufacturer")
rationale = hass.data[DOMAIN][DEVICES][device_id].get("rationale")
entities = hass.data[DOMAIN][DEVICES][device_id].get(CONF_ENTITIES).get(Platform.SWITCH)
devs = []
if entities is not None:
for entity_key, config in entities.items():
devs.append(MideaSwitchEntity(device, manufacturer, rationale, entity_key, config))
async_add_entities(devs)
class MideaSwitchEntity(MideaBinaryBaseEntity, SwitchEntity):
def turn_on(self):
self._set_status_on_off(self._entity_key, True)
def turn_off(self):
self._set_status_on_off(self._entity_key, False)

View File

@@ -0,0 +1,106 @@
{
"config": {
"error": {
"no_home": "No available home",
"account_invalid": "Failed to authenticate on Midea cloud, the password may be changed",
"invalid_input": "Illegal input, IP address or 'auto' needed",
"login_failed": "Failed to login, wrong account or password",
"offline_error": "Only the online appliance can be configured",
"download_lua_failed": "Failed to download lua script of appliance",
"discover_failed": "The appliance can't be found on the local network",
"no_new_devices": "No any new available can be found in your home",
"connect_error": "Can't connect to the appliance"
},
"step": {
"user": {
"data": {
"account": "Account",
"password": "Password"
},
"description": "Login and save storage your account",
"title": "Login"
},
"home": {
"title": "Home",
"data": {
"home": "Choose a location where your appliance in"
}
},
"device": {
"title": "Appliances",
"data": {
"device_id": "Choice a appliance to add"
}
},
"discover": {
"description": "Discover the appliance, it must in the local area work",
"title": "Appliance info",
"data": {
"ip_address": "IP address('auto' for discovery automatic)"
}
}
}
},
"options": {
"step": {
"init": {
"data": {
"option": "Option"
},
"title": "Configure"
},
"reset":{
"title": "Reset the configuration of appliance",
"description": "Remove the old configuration and make a new configuration use template\nIf your configuration was modified, the changes will lost\nIf your appliance type or model not in template, then the new configuration won't be made",
"data":{
"check": "I know that, do it"
}
},
"configure": {
"data": {
"ip_address": "IP address",
"refresh_interval": "Refresh interval(0 means not refreshing actively)"
},
"title": "Option"
}
},
"abort":{
"reset_success": "Reset done",
"account_unsupport_config": "Doesn't support this operation"
}
},
"services": {
"set_attribute": {
"name": "set the attributes",
"description": "Set the attributes of appliance in a dict",
"fields" : {
"device_id": {
"name": "Appliance code",
"description": "Appliance code (Device ID)"
},
"attributes": {
"name": "Attributes",
"description": "Attributes to set"
}
}
},
"send_command": {
"name": "Custom command",
"description": "Send a custom command to appliance",
"fields" : {
"device_id": {
"name": "Appliance code",
"description": "Appliance code (Device ID)"
},
"cmd_type": {
"name": "Type of command",
"description": "It can be 2 (query) or 3 (control)"
},
"cmd_body": {
"name": "Body of command",
"description": "The body of command without the MSmart protocol head and the checksum at the end"
}
}
}
}
}

View File

@@ -0,0 +1,106 @@
{
"config": {
"error": {
"no_home": "未找到可用家庭",
"account_invalid": "登录美的云服务器失败,是否已修改过密码",
"invalid_input": "无效的输入请输入有效IP地址或auto",
"login_failed": "无法登录到选择的美的云服务器,请检查用户名或密码",
"offline_error": "只能配置在线设备",
"download_lua_failed": "下载设备协议脚本失败",
"discover_failed": "无法在本地搜索到该设备",
"no_new_devices": "没有可用的设备",
"connect_error": "无法连接到指定设备"
},
"step": {
"user": {
"data": {
"account": "用户名",
"password": "密码"
},
"description": "登录并保存你的美居账号及密码",
"title": "登录"
},
"home": {
"title": "家庭",
"data": {
"home": "选择设备所在家庭"
}
},
"device": {
"title": "设备",
"data": {
"device_id": "选择要添加的设备"
}
},
"discover": {
"description": "获取设备信息,设备必须位于本地局域网内",
"title": "设备信息",
"data": {
"ip_address": "设备地址(输入auto自动搜索设备)"
}
}
}
},
"options": {
"step": {
"init": {
"data": {
"option": "操作"
},
"title": "选项"
},
"reset":{
"title": "重置配置文件",
"description": "移除已有的设备配置,并使用标准模板重新生成设备配置\n如果你的设备配置json文件进行过修改重置之后修改将丢失\n如果标准模板中没有该设备类型则不会生成设备配置",
"data":{
"check": "我知道了,重置吧"
}
},
"configure": {
"data": {
"ip_address": "IP地址",
"refresh_interval": "刷新间隔(设0为不进行主动刷新)"
},
"title": "配置"
}
},
"abort":{
"reset_success": "重置完成,已尝试生成新的配置",
"account_unsupport_config": "账户配置不支持该操作"
}
},
"services": {
"set_attribute": {
"name": "设置属性",
"description": "设置设备的属性值(可多属性一起设置)",
"fields" : {
"device_id": {
"name": "设备编码",
"description": "设备编码(Device ID)"
},
"attributes": {
"name": "属性集合",
"description": "要设置的属性"
}
}
},
"send_command": {
"name": "自定义命令",
"description": "向设备发送一个自定义命令",
"fields" : {
"device_id": {
"name": "设备编码",
"description": "设备编码(Device ID)"
},
"cmd_type": {
"name": "命令类型",
"description": "命令类型可以为2(查询)或3(设置)"
},
"cmd_body": {
"name": "命令体",
"description": "命令的消息体(不包括前部的MSmart协议头及后部的校验码)"
}
}
}
}
}

View File

@@ -0,0 +1,128 @@
from homeassistant.components.water_heater import WaterHeaterEntity, WaterHeaterEntityFeature
from homeassistant.const import (
Platform,
CONF_DEVICE_ID,
CONF_DEVICE,
CONF_ENTITIES,
ATTR_TEMPERATURE
)
from .const import (
DOMAIN,
DEVICES
)
from .midea_entities import MideaEntity
async def async_setup_entry(hass, config_entry, async_add_entities):
device_id = config_entry.data.get(CONF_DEVICE_ID)
device = hass.data[DOMAIN][DEVICES][device_id].get(CONF_DEVICE)
manufacturer = hass.data[DOMAIN][DEVICES][device_id].get("manufacturer")
rationale = hass.data[DOMAIN][DEVICES][device_id].get("rationale")
entities = hass.data[DOMAIN][DEVICES][device_id].get(CONF_ENTITIES).get(Platform.WATER_HEATER)
devs = []
if entities is not None:
for entity_key, config in entities.items():
devs.append(MideaWaterHeaterEntityEntity(device, manufacturer, rationale, entity_key, config))
async_add_entities(devs)
class MideaWaterHeaterEntityEntity(MideaEntity, WaterHeaterEntity):
def __init__(self, device, manufacturer, rationale, entity_key, config):
super().__init__(device, manufacturer, rationale, entity_key, config)
self._key_power = self._config.get("power")
self._key_operation_list = self._config.get("operation_list")
self._key_min_temp = self._config.get("min_temp")
self._key_max_temp = self._config.get("max_temp")
self._key_current_temperature = self._config.get("current_temperature")
self._key_target_temperature = self._config.get("target_temperature")
self._attr_temperature_unit = self._config.get("temperature_unit")
self._attr_precision = self._config.get("precision")
@property
def supported_features(self):
features = 0
if self._key_target_temperature is not None:
features |= WaterHeaterEntityFeature.TARGET_TEMPERATURE
if self._key_operation_list is not None:
features |= WaterHeaterEntityFeature.OPERATION_MODE
return features
@property
def operation_list(self):
return list(self._key_operation_list.keys())
@property
def current_operation(self):
return self._dict_get_selected(self._key_operation_list)
@property
def current_temperature(self):
return self._device.get_attribute(self._key_current_temperature)
@property
def target_temperature(self):
if isinstance(self._key_target_temperature, list):
temp_int = self._device.get_attribute(self._key_target_temperature[0])
tem_dec = self._device.get_attribute(self._key_target_temperature[1])
if temp_int is not None and tem_dec is not None:
return temp_int + tem_dec
return None
else:
return self._device.get_attribute(self._key_target_temperature)
@property
def min_temp(self):
if isinstance(self._key_min_temp, str):
return float(self._device.get_attribute(self._key_min_temp))
else:
return float(self._key_min_temp)
@property
def max_temp(self):
if isinstance(self._key_max_temp, str):
return float(self._device.get_attribute(self._key_max_temp))
else:
return float(self._key_max_temp)
@property
def target_temperature_low(self):
return self.min_temp
@property
def target_temperature_high(self):
return self.max_temp
@property
def is_on(self) -> bool:
return self._get_status_on_off(self._key_power)
def turn_on(self):
self._set_status_on_off(self._key_power, True)
def turn_off(self):
self._set_status_on_off(self._key_power, False)
def set_temperature(self, **kwargs):
if ATTR_TEMPERATURE not in kwargs:
return
temperature = kwargs.get(ATTR_TEMPERATURE)
temp_int, temp_dec = divmod(temperature, 1)
temp_int = int(temp_int)
new_status = {}
if isinstance(self._key_target_temperature, list):
new_status[self._key_target_temperature[0]] = temp_int
new_status[self._key_target_temperature[1]] = temp_dec
else:
new_status[self._key_target_temperature] = temperature
self._device.set_attributes(new_status)
def set_operation_mode(self, operation_mode: str) -> None:
new_status = self._key_operation_list.get(operation_mode)
self._device.set_attributes(new_status)
def update_state(self, status):
try:
self.schedule_update_ha_state()
except Exception as e:
pass