feat: add transparent protocol.

This commit is contained in:
sususweet
2025-09-28 20:24:15 +08:00
parent 3507671120
commit f3246eb779
11 changed files with 851 additions and 74 deletions

View File

@@ -3,10 +3,13 @@ import time
import datetime
import json
import base64
import asyncio
import requests
from aiohttp import ClientSession
from secrets import token_hex
from .logger import MideaLogger
from .security import CloudSecurity, MeijuCloudSecurity, MSmartCloudSecurity
from .util import bytes_to_dec_string
_LOGGER = logging.getLogger(__name__)
@@ -100,6 +103,46 @@ class MideaCloud:
return None
def _api_request_sync(self, endpoint: str, data: dict, header=None) -> dict | None:
header = header or {}
if not data.get("reqId"):
data.update({
"reqId": token_hex(16)
})
if not data.get("stamp"):
data.update({
"stamp": datetime.datetime.now().strftime("%Y%m%d%H%M%S")
})
random = str(int(time.time()))
url = self._api_url + endpoint
dump_data = json.dumps(data)
sign = self._security.sign(dump_data, random)
header.update({
"content-type": "application/json; charset=utf-8",
"secretVersion": "1",
"sign": sign,
"random": random,
})
if self._access_token is not None:
header.update({
"accesstoken": self._access_token
})
response:dict = {"code": -1}
_LOGGER.debug(f"Midea cloud API header: {header}")
_LOGGER.debug(f"Midea cloud API dump_data: {dump_data}")
try:
r = requests.post(url, headers=header, data=dump_data, timeout=5)
raw = r.content
_LOGGER.debug(f"Midea cloud API url: {url}, data: {data}, response: {raw}")
response = json.loads(raw)
except Exception as e:
_LOGGER.debug(f"API request attempt failed: {e}")
if int(response["code"]) == 0 and "data" in response:
return response["data"]
return None
async def _get_login_id(self) -> str | None:
data = self._make_general_data()
data.update({
@@ -115,27 +158,27 @@ class MideaCloud:
async def login(self) -> bool:
raise NotImplementedError()
async def get_keys(self, appliance_id: int):
result = {}
for method in [1, 2]:
udp_id = self._security.get_udp_id(appliance_id, method)
data = self._make_general_data()
data.update({
"udpid": udp_id
})
response = await self._api_request(
endpoint="/v1/iot/secure/getToken",
data=data
)
if response and "tokenlist" in response:
for token in response["tokenlist"]:
if token["udpId"] == udp_id:
result[method] = {
"token": token["token"].lower(),
"key": token["key"].lower()
}
result.update(default_keys)
return result
async def send_cloud(self, appliance_id: int, data: bytearray):
appliance_code = str(appliance_id)
params = {
'applianceCode': appliance_code,
'order': self._security.aes_encrypt(bytes_to_dec_string(data)).hex(),
'timestamp': 'true',
"isFull": "false"
}
if response := await self._api_request(
endpoint='/v1/appliance/transparent/send',
data=params,
):
if response and response.get('reply'):
_LOGGER.debug("[%s] Cloud command response: %s", appliance_code, response)
reply_data = self._security.aes_decrypt(bytes.fromhex(response['reply']))
return reply_data
else:
_LOGGER.warning("[%s] Cloud command failed: %s", appliance_code, response)
return None
async def list_home(self) -> dict | None:
return {1: "My home"}

View File

@@ -1,10 +1,14 @@
import threading
import socket
from enum import IntEnum
from .cloud import MideaCloud
from .security import LocalSecurity, MSGTYPE_HANDSHAKE_REQUEST, MSGTYPE_ENCRYPTED_REQUEST
from .packet_builder import PacketBuilder
from .message import MessageQuestCustom
from .logger import MideaLogger
from .lua_runtime import MideaCodec
from .util import dec_string_to_bytes
class AuthException(Exception):
@@ -39,7 +43,9 @@ class MiedaDevice(threading.Thread):
subtype: int | None,
connected: bool,
sn: str | None,
sn8: str | None):
sn8: str | None,
lua_file: str | None,
cloud: MideaCloud | None):
threading.Thread.__init__(self)
self._socket = None
self._ip_address = ip_address
@@ -71,6 +77,8 @@ class MiedaDevice(threading.Thread):
self._centralized = []
self._calculate_get = []
self._calculate_set = []
self._lua_runtime = MideaCodec(lua_file, sn=sn, subtype=subtype) if lua_file is not None else None
self._cloud = cloud
@property
def device_name(self):
@@ -126,14 +134,16 @@ class MiedaDevice(threading.Thread):
def get_attribute(self, attribute):
return self._attributes.get(attribute)
def set_attribute(self, attribute, value):
async def set_attribute(self, attribute, value):
if attribute in self._attributes.keys():
new_status = {}
for attr in self._centralized:
new_status[attr] = self._attributes.get(attr)
new_status[attribute] = value
if set_cmd := self._lua_runtime.build_control(new_status):
await self._build_send(set_cmd)
def set_attributes(self, attributes):
async def set_attributes(self, attributes):
new_status = {}
for attr in self._centralized:
new_status[attr] = self._attributes.get(attr)
@@ -142,6 +152,9 @@ class MiedaDevice(threading.Thread):
if attribute in self._attributes.keys():
has_new = True
new_status[attribute] = value
if has_new:
if set_cmd := self._lua_runtime.build_control(new_status):
await self._build_send(set_cmd)
def set_ip_address(self, ip_address):
MideaLogger.debug(f"Update IP address to {ip_address}")
@@ -188,12 +201,6 @@ class MiedaDevice(threading.Thread):
response = response[8: 72]
self._security.tcp_key(response, self._key)
def _send_message(self, data):
if self._protocol == 3:
self._send_message_v3(data, msg_type=MSGTYPE_ENCRYPTED_REQUEST)
else:
self._send_message_v2(data)
def _send_message_v2(self, data):
if self._socket is not None:
self._socket.send(data)
@@ -204,11 +211,128 @@ class MiedaDevice(threading.Thread):
data = self._security.encode_8370(data, msg_type)
self._send_message_v2(data)
def _build_send(self, cmd: str):
async def _build_send(self, cmd: str):
MideaLogger.debug(f"Sending: {cmd.lower()}")
bytes_cmd = bytes.fromhex(cmd)
msg = PacketBuilder(self._device_id, bytes_cmd).finalize()
self._send_message(msg)
await self._send_message(bytes_cmd)
async def refresh_status(self):
for query in self._queries:
if query_cmd := self._lua_runtime.build_query(query):
await self._build_send(query_cmd)
def _parse_cloud_message(self, decrypted):
# MideaLogger.debug(f"Received: {decrypted}")
if status := self._lua_runtime.decode_status(dec_string_to_bytes(decrypted).hex()):
MideaLogger.debug(f"Decoded: {status}")
new_status = {}
for single in status.keys():
value = status.get(single)
if single not in self._attributes or self._attributes[single] != value:
self._attributes[single] = value
new_status[single] = value
if len(new_status) > 0:
for c in self._calculate_get:
lvalue = c.get("lvalue")
rvalue = c.get("rvalue")
if lvalue and rvalue:
calculate = False
for s, v in new_status.items():
if rvalue.find(f"[{s}]") >= 0:
calculate = True
break
if calculate:
calculate_str1 = \
(f"{lvalue.replace('[', 'self._attributes[')} = "
f"{rvalue.replace('[', 'self._attributes[')}") \
.replace("[", "[\"").replace("]", "\"]")
calculate_str2 = \
(f"{lvalue.replace('[', 'new_status[')} = "
f"{rvalue.replace('[', 'self._attributes[')}") \
.replace("[", "[\"").replace("]", "\"]")
try:
exec(calculate_str1)
exec(calculate_str2)
except Exception:
MideaLogger.warning(
f"Calculation Error: {lvalue} = {rvalue}", self._device_id
)
self._update_all(new_status)
return ParseMessageResult.SUCCESS
def _parse_message(self, msg):
if self._protocol == 3:
messages, self._buffer = self._security.decode_8370(self._buffer + msg)
else:
messages, self._buffer = self.fetch_v2_message(self._buffer + msg)
if len(messages) == 0:
return ParseMessageResult.PADDING
for message in messages:
if message == b"ERROR":
return ParseMessageResult.ERROR
payload_len = message[4] + (message[5] << 8) - 56
payload_type = message[2] + (message[3] << 8)
if payload_type in [0x1001, 0x0001]:
# Heartbeat detected
pass
elif len(message) > 56:
cryptographic = message[40:-16]
if payload_len % 16 == 0:
decrypted = self._security.aes_decrypt(cryptographic)
MideaLogger.debug(f"Received: {decrypted.hex().lower()}")
if status := self._lua_runtime.decode_status(decrypted.hex()):
MideaLogger.debug(f"Decoded: {status}")
new_status = {}
for single in status.keys():
value = status.get(single)
if single not in self._attributes or self._attributes[single] != value:
self._attributes[single] = value
new_status[single] = value
if len(new_status) > 0:
for c in self._calculate_get:
lvalue = c.get("lvalue")
rvalue = c.get("rvalue")
if lvalue and rvalue:
calculate = False
for s, v in new_status.items():
if rvalue.find(f"[{s}]") >= 0:
calculate = True
break
if calculate:
calculate_str1 = \
(f"{lvalue.replace('[', 'self._attributes[')} = "
f"{rvalue.replace('[', 'self._attributes[')}") \
.replace("[", "[\"").replace("]", "\"]")
calculate_str2 = \
(f"{lvalue.replace('[', 'new_status[')} = "
f"{rvalue.replace('[', 'self._attributes[')}") \
.replace("[", "[\"").replace("]", "\"]")
try:
exec(calculate_str1)
exec(calculate_str2)
except Exception:
MideaLogger.warning(
f"Calculation Error: {lvalue} = {rvalue}", self._device_id
)
self._update_all(new_status)
return ParseMessageResult.SUCCESS
async def _send_message(self, data):
if reply := await self._cloud.send_cloud(self._device_id, data):
result = self._parse_cloud_message(reply)
if result == ParseMessageResult.ERROR:
MideaLogger.debug(f"Message 'ERROR' received")
elif result == ParseMessageResult.SUCCESS:
timeout_counter = 0
# if self._protocol == 3:
# self._send_message_v3(data, msg_type=MSGTYPE_ENCRYPTED_REQUEST)
# else:
# self._send_message_v2(data)
async def _send_heartbeat(self):
msg = PacketBuilder(self._device_id, bytearray([0x00])).finalize(msg_type=0)
await self._send_message(msg)
def _device_connected(self, connected=True):
self._connected = connected

View File

@@ -0,0 +1,91 @@
import lupa
import threading
import json
from .logger import MideaLogger
class LuaRuntime:
def __init__(self, file):
self._runtimes = lupa.LuaRuntime()
string = f'dofile("{file}")'
self._runtimes.execute(string)
self._lock = threading.Lock()
self._json_to_data = self._runtimes.eval("function(param) return jsonToData(param) end")
self._data_to_json = self._runtimes.eval("function(param) return dataToJson(param) end")
def json_to_data(self, json_value):
with self._lock:
result = self._json_to_data(json_value)
return result
def data_to_json(self, data_value):
with self._lock:
result = self._data_to_json(data_value)
return result
class MideaCodec(LuaRuntime):
def __init__(self, file, sn=None, subtype=None):
super().__init__(file)
self._sn = sn
self._subtype = subtype
def _build_base_dict(self):
device_info ={}
if self._sn is not None:
device_info["deviceSN"] = self._sn
if self._subtype is not None:
device_info["deviceSubType"] = self._subtype
base_dict = {
"deviceinfo": device_info
}
return base_dict
def build_query(self, append=None):
query_dict = self._build_base_dict()
query_dict["query"] = {} if append is None else append
json_str = json.dumps(query_dict)
try:
result = self.json_to_data(json_str)
return result
except lupa.LuaError as e:
MideaLogger.error(f"LuaRuntimeError in build_query {json_str}: {repr(e)}")
return None
def build_control(self, append=None):
query_dict = self._build_base_dict()
query_dict["control"] = {} if append is None else append
json_str = json.dumps(query_dict)
try:
result = self.json_to_data(json_str)
return result
except lupa.LuaError as e:
MideaLogger.error(f"LuaRuntimeError in build_control {json_str}: {repr(e)}")
return None
def build_status(self, append=None):
query_dict = self._build_base_dict()
query_dict["status"] = {} if append is None else append
json_str = json.dumps(query_dict)
try:
result = self.json_to_data(json_str)
return result
except lupa.LuaError as e:
MideaLogger.error(f"LuaRuntimeError in build_status {json_str}: {repr(e)}")
return None
def decode_status(self, data: str):
data_dict = self._build_base_dict()
data_dict["msg"] = {
"data": data
}
json_str = json.dumps(data_dict)
try:
result = self.data_to_json(json_str)
status = json.loads(result)
return status.get("status")
except lupa.LuaError as e:
MideaLogger.error(f"LuaRuntimeError in decode_status {data}: {repr(e)}")
return None

View File

@@ -0,0 +1,50 @@
def bytes_to_dec_string(data: bytearray) -> bytearray:
"""
将 bytearray 转换为逗号分隔的十进制字符串格式,然后返回 bytearray
对应 Java 的 bytesToDecString 方法
"""
# 处理有符号字节(模拟 Java 的 byte 类型 -128 到 127
result = []
for b in data:
# 将无符号字节转换为有符号字节
signed_byte = b if b < 128 else b - 256
result.append(str(signed_byte))
decimal_string = ','.join(result)
return bytearray(decimal_string, 'utf-8')
def dec_string_to_bytes(dec_string: str) -> bytearray:
"""
将逗号分隔的十进制字符串转换为字节数组
对应 Java 的 decStringToBytes 方法
Args:
dec_string: 逗号分隔的十进制字符串,如 "1,2,-3,127"
Returns:
bytearray: 转换后的字节数组
"""
if dec_string is None:
return bytearray()
# 按逗号分割字符串
split_values = dec_string.split(',')
result = bytearray(len(split_values))
for i, value_str in enumerate(split_values):
try:
# 解析十进制字符串为整数,然后转换为字节
int_value = int(value_str.strip())
# 确保值在字节范围内 (-128 到 127)
if int_value < -128:
int_value = -128
elif int_value > 127:
int_value = 127
result[i] = int_value & 0xFF # 转换为无符号字节
except (ValueError, IndexError) as e:
# 如果解析失败,记录错误并跳过该值
print(f"dec_string_to_bytes() error: {e}")
result[i] = 0 # 默认值
return result