From cf4afde88e0308f25a4bf89cd55b98b76b788d37 Mon Sep 17 00:00:00 2001 From: Ky1eYang Date: Thu, 14 Aug 2025 22:11:56 +0800 Subject: [PATCH] =?UTF-8?q?add:=20=E6=B7=BB=E5=8A=A0=E5=A3=B0=E9=9F=B3?= =?UTF-8?q?=E6=A3=80=E6=B5=8B=E7=9A=84=E5=8F=AF=E8=A7=86=E5=8C=96=E4=BB=A5?= =?UTF-8?q?=E5=8F=8A=E5=A3=B0=E6=B3=A2demod=E7=9A=84=E5=87=86=E7=A1=AE?= =?UTF-8?q?=E5=BA=A6=20(#1077)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: yangkaiyue --- scripts/acoustic_check/demod.py | 280 +++++++++++++++ scripts/acoustic_check/graphic.py | 444 ++++++++++++++++++++++++ scripts/acoustic_check/main.py | 18 + scripts/acoustic_check/readme.md | 23 ++ scripts/acoustic_check/requirements.txt | 4 + 5 files changed, 769 insertions(+) create mode 100644 scripts/acoustic_check/demod.py create mode 100644 scripts/acoustic_check/graphic.py create mode 100644 scripts/acoustic_check/main.py create mode 100644 scripts/acoustic_check/readme.md create mode 100644 scripts/acoustic_check/requirements.txt diff --git a/scripts/acoustic_check/demod.py b/scripts/acoustic_check/demod.py new file mode 100644 index 00000000..89c8196a --- /dev/null +++ b/scripts/acoustic_check/demod.py @@ -0,0 +1,280 @@ +""" +实时AFSK解调器 - 基于Goertzel算法 +""" + +import numpy as np +from collections import deque + + +class TraceGoertzel: + """实时Goertzel算法实现""" + + def __init__(self, freq: float, n: int): + """ + 初始化Goertzel算法 + + Args: + freq: 归一化频率 (目标频率/采样频率) + n: 窗口大小 + """ + self.freq = freq + self.n = n + + # 预计算系数 - 与参考代码一致 + self.k = int(freq * n) + self.w = 2.0 * np.pi * freq + self.cw = np.cos(self.w) + self.sw = np.sin(self.w) + self.c = 2.0 * self.cw + + # 初始化状态变量 - 使用deque存储最近两个值 + self.zs = deque([0.0, 0.0], maxlen=2) + + def reset(self): + """重置算法状态""" + self.zs.clear() + self.zs.extend([0.0, 0.0]) + + def __call__(self, xs): + """ + 处理一组采样点 - 与参考代码一致的接口 + + Args: + xs: 采样点序列 + + Returns: + 计算出的振幅 + """ + self.reset() + for x in xs: + z1, z2 = self.zs[-1], self.zs[-2] # Z[-1], Z[-2] + z0 = x + self.c * z1 - z2 # S[n] = x[n] + C * S[n-1] - S[n-2] + self.zs.append(float(z0)) # 更新序列 + return self.amp + + @property + def amp(self) -> float: + """计算当前振幅 - 与参考代码一致""" + z1, z2 = self.zs[-1], self.zs[-2] + ip = self.cw * z1 - z2 + qp = self.sw * z1 + return np.sqrt(ip**2 + qp**2) / (self.n / 2.0) + + +class PairGoertzel: + """双频Goertzel解调器""" + + def __init__(self, f_sample: int, f_space: int, f_mark: int, + bit_rate: int, win_size: int): + """ + 初始化双频解调器 + + Args: + f_sample: 采样频率 + f_space: Space频率 (通常对应0) + f_mark: Mark频率 (通常对应1) + bit_rate: 比特率 + win_size: Goertzel窗口大小 + """ + assert f_sample % bit_rate == 0, "采样频率必须是比特率的整数倍" + + self.Fs = f_sample + self.F0 = f_space + self.F1 = f_mark + self.bit_rate = bit_rate + self.n_per_bit = int(f_sample // bit_rate) # 每个比特的采样点数 + + # 计算归一化频率 + f1 = f_mark / f_sample + f0 = f_space / f_sample + + # 初始化Goertzel算法 + self.g0 = TraceGoertzel(freq=f0, n=win_size) + self.g1 = TraceGoertzel(freq=f1, n=win_size) + + # 输入缓冲区 + self.in_buffer = deque(maxlen=win_size) + self.out_count = 0 + + print(f"PairGoertzel initialized: f0={f0:.6f}, f1={f1:.6f}, win_size={win_size}, n_per_bit={self.n_per_bit}") + + def __call__(self, s: float): + """ + 处理单个采样点 - 与参考代码一致的接口 + + Args: + s: 采样点值 + + Returns: + (amp0, amp1, p1_prob) - 空间频率振幅,标记频率振幅,标记概率 + """ + self.in_buffer.append(s) + self.out_count += 1 + + amp0, amp1, p1_prob = 0, 0, None + + # 每个比特周期输出一次结果 + if self.out_count >= self.n_per_bit: + amp0 = self.g0(self.in_buffer) # 计算space频率振幅 + amp1 = self.g1(self.in_buffer) # 计算mark频率振幅 + p1_prob = amp1 / (amp0 + amp1 + 1e-8) # 计算mark概率 + self.out_count = 0 + + return amp0, amp1, p1_prob + + +class RealTimeAFSKDecoder: + """实时AFSK解码器 - 基于起始帧触发""" + + def __init__(self, f_sample: int = 16000, mark_freq: int = 1800, + space_freq: int = 1500, bitrate: int = 100, + s_goertzel: int = 9, threshold: float = 0.5): + """ + 初始化实时AFSK解码器 + + Args: + f_sample: 采样频率 + mark_freq: Mark频率 + space_freq: Space频率 + bitrate: 比特率 + s_goertzel: Goertzel窗口大小系数 (win_size = f_sample // mark_freq * s_goertzel) + threshold: 判决门限 + """ + self.f_sample = f_sample + self.mark_freq = mark_freq + self.space_freq = space_freq + self.bitrate = bitrate + self.threshold = threshold + + # 计算窗口大小 - 与参考代码一致 + win_size = int(f_sample / mark_freq * s_goertzel) + + # 初始化解调器 + self.demodulator = PairGoertzel(f_sample, space_freq, mark_freq, + bitrate, win_size) + + # 帧定义 - 与参考代码一致 + self.start_bytes = b'\x01\x02' + self.end_bytes = b'\x03\x04' + self.start_bits = "".join(format(int(x), '08b') for x in self.start_bytes) + self.end_bits = "".join(format(int(x), '08b') for x in self.end_bytes) + + # 状态机 + self.state = "idle" # idle / entering + + # 存储解调结果 + self.buffer_prelude:deque = deque(maxlen=len(self.start_bits)) # 判断是否启动 + self.indicators = [] # 存储概率序列 + self.signal_bits = "" # 存储比特序列 + self.text_cache = "" + + # 解码结果 + self.decoded_messages = [] + self.total_bits_received = 0 + + print(f"Decoder initialized: win_size={win_size}") + print(f"Start frame: {self.start_bits} (from {self.start_bytes.hex()})") + print(f"End frame: {self.end_bits} (from {self.end_bytes.hex()})") + + def process_audio(self, samples: np.array) -> str: + """ + 处理音频数据并返回解码文本 + + Args: + audio_data: 音频字节数据 (16-bit PCM) + + Returns: + 新解码的文本 + """ + new_text = "" + # 逐个处理采样点 + for sample in samples: + amp0, amp1, p1_prob = self.demodulator(sample) + # 如果有概率输出,记录并判决 + if p1_prob is not None: + bit = '1' if p1_prob > self.threshold else '0' + match self.state: + case "idle": + self.buffer_prelude.append(bit) + pass + case "entering": + self.buffer_prelude.append(bit) + self.signal_bits += bit + self.total_bits_received += 1 + case _: + pass + self.indicators.append(p1_prob) + + # 检查状态机 + if self.state == "idle" and "".join(self.buffer_prelude) == self.start_bits: + self.state = "entering" + self.text_cache = "" + self.signal_bits = "" # 清空比特序列 + self.buffer_prelude.clear() + elif self.state == "entering" and ("".join(self.buffer_prelude) == self.end_bits or len(self.signal_bits) >= 256): + self.state = "idle" + self.buffer_prelude.clear() + + # 每收集一定数量的比特后尝试解码 + if len(self.signal_bits) >= 8: + text = self._decode_bits_to_text(self.signal_bits) + if len(text) > len(self.text_cache): + new_text = text[len(self.text_cache) - len(text):] + self.text_cache = text + return new_text + + def _decode_bits_to_text(self, bits: str) -> str: + """ + 将比特串解码为文本 + + Args: + bits: 比特串 + + Returns: + 解码出的文本 + """ + if len(bits) < 8: + return "" + + decoded_text = "" + byte_count = len(bits) // 8 + + for i in range(byte_count): + # 提取8位 + byte_bits = bits[i*8:(i+1)*8] + + # 位转字节 + byte_val = int(byte_bits, 2) + + # 尝试解码为ASCII字符 + if 32 <= byte_val <= 126: # 可打印ASCII字符 + decoded_text += chr(byte_val) + elif byte_val == 0: # NULL字符,忽略 + continue + else: + # 非可打印字符pass,以十六进制显示 + pass + # decoded_text += f"\\x{byte_val:02X}" + + return decoded_text + + def clear(self): + """清空解码状态""" + self.indicators = [] + self.signal_bits = "" + self.decoded_messages = [] + self.total_bits_received = 0 + print("解码器状态已清空") + + def get_stats(self) -> dict: + """获取解码统计信息""" + return { + 'prelude_bits': "".join(self.buffer_prelude), + "state": self.state, + 'total_chars': sum(len(msg) for msg in self.text_cache), + 'buffer_bits': len(self.signal_bits), + 'mark_freq': self.mark_freq, + 'space_freq': self.space_freq, + 'bitrate': self.bitrate, + 'threshold': self.threshold, + } diff --git a/scripts/acoustic_check/graphic.py b/scripts/acoustic_check/graphic.py new file mode 100644 index 00000000..e60cf063 --- /dev/null +++ b/scripts/acoustic_check/graphic.py @@ -0,0 +1,444 @@ +import sys +import numpy as np +import asyncio +import wave +from collections import deque +import qasync + +import matplotlib +matplotlib.use('qtagg') + +from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg as FigureCanvas +from matplotlib.backends.backend_qtagg import NavigationToolbar2QT as NavigationToolbar # noqa: F401 +from matplotlib.figure import Figure + +from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QWidget, + QHBoxLayout, QLineEdit, QPushButton, QLabel, QTextEdit) +from PyQt6.QtCore import QTimer + +# 导入解码器 +from demod import RealTimeAFSKDecoder + + +class UDPServerProtocol(asyncio.DatagramProtocol): + """UDP服务器协议类""" + def __init__(self, data_queue): + self.client_address = None + self.data_queue: deque = data_queue + + def connection_made(self, transport): + self.transport = transport + + def datagram_received(self, data, addr): + # 如果还没有客户端地址,记录第一个连接的客户端 + if self.client_address is None: + self.client_address = addr + print(f"接受来自 {addr} 的连接") + + # 只处理来自已记录客户端的数据 + if addr == self.client_address: + # 将接收到的音频数据添加到队列 + self.data_queue.extend(data) + else: + print(f"忽略来自未知地址 {addr} 的数据") + + +class MatplotlibWidget(QWidget): + def __init__(self, parent=None): + super().__init__(parent) + + # 创建 Matplotlib 的 Figure 对象 + self.figure = Figure() + + # 创建 FigureCanvas 对象,它是 Figure 的 QWidget 容器 + self.canvas = FigureCanvas(self.figure) + + # 创建 Matplotlib 的导航工具栏 + # self.toolbar = NavigationToolbar(self.canvas, self) + self.toolbar = None + + # 创建布局 + layout = QVBoxLayout() + layout.addWidget(self.toolbar) + layout.addWidget(self.canvas) + self.setLayout(layout) + + # 初始化音频数据参数 + self.freq = 16000 # 采样频率 + self.time_window = 20 # 显示时间窗口 + self.wave_data = deque(maxlen=self.freq * self.time_window * 2) # 缓冲队列, 用于分发计算/绘图 + self.signals = deque(maxlen=self.freq * self.time_window) # 双端队列存储信号数据 + + # 创建包含两个子图的画布 + self.ax1 = self.figure.add_subplot(2, 1, 1) + self.ax2 = self.figure.add_subplot(2, 1, 2) + + # 时域子图 + self.ax1.set_title('Real-time Audio Waveform') + self.ax1.set_xlabel('Sample Index') + self.ax1.set_ylabel('Amplitude') + self.line_time, = self.ax1.plot([], []) + self.ax1.grid(True, alpha=0.3) + + # 频域子图 + self.ax2.set_title('Real-time Frequency Spectrum') + self.ax2.set_xlabel('Frequency (Hz)') + self.ax2.set_ylabel('Magnitude') + self.line_freq, = self.ax2.plot([], []) + self.ax2.grid(True, alpha=0.3) + + self.figure.tight_layout() + + # 定时器用于更新图表 + self.timer = QTimer(self) + self.timer.setInterval(100) # 100毫秒更新一次 + self.timer.timeout.connect(self.update_plot) + + # 初始化AFSK解码器 + self.decoder = RealTimeAFSKDecoder( + f_sample=self.freq, + mark_freq=1800, + space_freq=1500, + bitrate=100, + s_goertzel=9, + threshold=0.5 + ) + + # 解码结果回调 + self.decode_callback = None + + def start_plotting(self): + """开始绘图""" + self.timer.start() + + def stop_plotting(self): + """停止绘图""" + self.timer.stop() + + def update_plot(self): + """更新绘图数据""" + if len(self.wave_data) >= 2: + # 进行实时解码 + # 获取最新的音频数据进行解码 + even = len(self.wave_data) // 2 * 2 + print(f"length of wave_data: {len(self.wave_data)}") + drained = [self.wave_data.popleft() for _ in range(even)] + signal = np.frombuffer(bytearray(drained), dtype=' 0: + # 只显示最近的一段数据,避免图表过于密集 + signal = np.array(self.signals) + max_samples = min(len(signal), self.freq * self.time_window) + if len(signal) > max_samples: + signal = signal[-max_samples:] + + # 更新时域图 + x = np.arange(len(signal)) + self.line_time.set_data(x, signal) + + # 自动调整时域坐标轴范围 + if len(signal) > 0: + self.ax1.set_xlim(0, len(signal)) + y_min, y_max = np.min(signal), np.max(signal) + if y_min != y_max: + margin = (y_max - y_min) * 0.1 + self.ax1.set_ylim(y_min - margin, y_max + margin) + else: + self.ax1.set_ylim(-1, 1) + + # 计算频谱(短时离散傅立叶变换) + if len(signal) > 1: + # 计算FFT + fft_signal = np.abs(np.fft.fft(signal)) + frequencies = np.fft.fftfreq(len(signal), 1/self.freq) + + # 只取正频率部分 + positive_freq_idx = frequencies >= 0 + freq_positive = frequencies[positive_freq_idx] + fft_positive = fft_signal[positive_freq_idx] + + # 更新频域图 + self.line_freq.set_data(freq_positive, fft_positive) + + # 自动调整频域坐标轴范围 + if len(fft_positive) > 0: + # 限制频率显示范围到0-4000Hz,避免过于密集 + max_freq_show = min(4000, self.freq // 2) + freq_mask = freq_positive <= max_freq_show + if np.any(freq_mask): + self.ax2.set_xlim(0, max_freq_show) + fft_masked = fft_positive[freq_mask] + if len(fft_masked) > 0: + fft_max = np.max(fft_masked) + if fft_max > 0: + self.ax2.set_ylim(0, fft_max * 1.1) + else: + self.ax2.set_ylim(0, 1) + + self.canvas.draw() + + +class MainWindow(QMainWindow): + def __init__(self): + super().__init__() + self.setWindowTitle("Acoustic Check") + self.setGeometry(100, 100, 1000, 800) + + # 主窗口部件 + main_widget = QWidget() + self.setCentralWidget(main_widget) + + # 主布局 + main_layout = QVBoxLayout(main_widget) + + # 绘图区域 + self.matplotlib_widget = MatplotlibWidget() + main_layout.addWidget(self.matplotlib_widget) + + # 控制面板 + control_panel = QWidget() + control_layout = QHBoxLayout(control_panel) + + # 监听地址和端口输入 + control_layout.addWidget(QLabel("监听地址:")) + self.address_input = QLineEdit("0.0.0.0") + self.address_input.setFixedWidth(120) + control_layout.addWidget(self.address_input) + + control_layout.addWidget(QLabel("端口:")) + self.port_input = QLineEdit("8000") + self.port_input.setFixedWidth(80) + control_layout.addWidget(self.port_input) + + # 监听按钮 + self.listen_button = QPushButton("开始监听") + self.listen_button.clicked.connect(self.toggle_listening) + control_layout.addWidget(self.listen_button) + + # 状态标签 + self.status_label = QLabel("状态: 未连接") + control_layout.addWidget(self.status_label) + + # 数据统计标签 + self.data_label = QLabel("接收数据: 0 bytes") + control_layout.addWidget(self.data_label) + + # 保存按钮 + self.save_button = QPushButton("保存音频") + self.save_button.clicked.connect(self.save_audio) + self.save_button.setEnabled(False) + control_layout.addWidget(self.save_button) + + control_layout.addStretch() # 添加弹性空间 + + main_layout.addWidget(control_panel) + + # 解码显示区域 + decode_panel = QWidget() + decode_layout = QVBoxLayout(decode_panel) + + # 解码标题 + decode_title = QLabel("实时AFSK解码结果:") + decode_title.setStyleSheet("font-weight: bold; font-size: 14px;") + decode_layout.addWidget(decode_title) + + # 解码文本显示 + self.decode_text = QTextEdit() + self.decode_text.setMaximumHeight(150) + self.decode_text.setReadOnly(True) + self.decode_text.setStyleSheet("font-family: 'Courier New', monospace; font-size: 12px;") + decode_layout.addWidget(self.decode_text) + + # 解码控制按钮 + decode_control_layout = QHBoxLayout() + + # 清空按钮 + self.clear_decode_button = QPushButton("清空解码") + self.clear_decode_button.clicked.connect(self.clear_decode_text) + decode_control_layout.addWidget(self.clear_decode_button) + + # 解码统计标签 + self.decode_stats_label = QLabel("解码统计: 0 bits, 0 chars") + decode_control_layout.addWidget(self.decode_stats_label) + + decode_control_layout.addStretch() + decode_layout.addLayout(decode_control_layout) + + main_layout.addWidget(decode_panel) + + # 设置解码回调 + self.matplotlib_widget.decode_callback = self.on_decode_text + + # UDP相关属性 + self.udp_transport = None + self.is_listening = False + + # 数据统计定时器 + self.stats_timer = QTimer(self) + self.stats_timer.setInterval(1000) # 每秒更新一次统计 + self.stats_timer.timeout.connect(self.update_stats) + + def on_decode_text(self, new_text: str): + """解码文本回调""" + if new_text: + # 添加新解码的文本 + current_text = self.decode_text.toPlainText() + updated_text = current_text + new_text + + # 限制文本长度,保留最新的1000个字符 + if len(updated_text) > 1000: + updated_text = updated_text[-1000:] + + self.decode_text.setPlainText(updated_text) + + # 滚动到底部 + cursor = self.decode_text.textCursor() + cursor.movePosition(cursor.MoveOperation.End) + self.decode_text.setTextCursor(cursor) + + def clear_decode_text(self): + """清空解码文本""" + self.decode_text.clear() + if hasattr(self.matplotlib_widget, 'decoder'): + self.matplotlib_widget.decoder.clear() + self.decode_stats_label.setText("解码统计: 0 bits, 0 chars") + + def update_decode_stats(self): + """更新解码统计""" + if hasattr(self.matplotlib_widget, 'decoder'): + stats = self.matplotlib_widget.decoder.get_stats() + stats_text = ( + f"前置: {stats['prelude_bits']} , 已接收{stats['total_chars']} chars, " + f"缓冲: {stats['buffer_bits']} bits, 状态: {stats['state']}" + ) + self.decode_stats_label.setText(stats_text) + + def toggle_listening(self): + """切换监听状态""" + if not self.is_listening: + self.start_listening() + else: + self.stop_listening() + + async def start_listening_async(self): + """异步启动UDP监听""" + try: + address = self.address_input.text().strip() + port = int(self.port_input.text().strip()) + + loop = asyncio.get_running_loop() + self.udp_transport, protocol = await loop.create_datagram_endpoint( + lambda: UDPServerProtocol(self.matplotlib_widget.wave_data), + local_addr=(address, port) + ) + + self.status_label.setText(f"状态: 监听中 ({address}:{port})") + print(f"UDP服务器启动, 监听 {address}:{port}") + + except Exception as e: + self.status_label.setText(f"状态: 启动失败 - {str(e)}") + print(f"UDP服务器启动失败: {e}") + self.is_listening = False + self.listen_button.setText("开始监听") + self.address_input.setEnabled(True) + self.port_input.setEnabled(True) + + def start_listening(self): + """开始监听""" + try: + int(self.port_input.text().strip()) # 验证端口号格式 + except ValueError: + self.status_label.setText("状态: 端口号必须是数字") + return + + self.is_listening = True + self.listen_button.setText("停止监听") + self.address_input.setEnabled(False) + self.port_input.setEnabled(False) + self.save_button.setEnabled(True) + + # 清空数据队列 + self.matplotlib_widget.wave_data.clear() + + # 启动绘图和统计更新 + self.matplotlib_widget.start_plotting() + self.stats_timer.start() + + # 异步启动UDP服务器 + loop = asyncio.get_event_loop() + loop.create_task(self.start_listening_async()) + + def stop_listening(self): + """停止监听""" + self.is_listening = False + self.listen_button.setText("开始监听") + self.address_input.setEnabled(True) + self.port_input.setEnabled(True) + + # 停止UDP服务器 + if self.udp_transport: + self.udp_transport.close() + self.udp_transport = None + + # 停止绘图和统计更新 + self.matplotlib_widget.stop_plotting() + self.matplotlib_widget.wave_data.clear() + self.stats_timer.stop() + + self.status_label.setText("状态: 已停止") + + def update_stats(self): + """更新数据统计""" + data_size = len(self.matplotlib_widget.signals) + self.data_label.setText(f"接收数据: {data_size} 采样") + + # 更新解码统计 + self.update_decode_stats() + + def save_audio(self): + """保存音频数据""" + if len(self.matplotlib_widget.signals) > 0: + try: + signal_data = np.array(self.matplotlib_widget.signals) + + # 保存为WAV文件 + with wave.open("received_audio.wav", "wb") as wf: + wf.setnchannels(1) # 单声道 + wf.setsampwidth(2) # 采样宽度为2字节 + wf.setframerate(self.matplotlib_widget.freq) # 设置采样率 + wf.writeframes(signal_data.tobytes()) # 写入数据 + + self.status_label.setText("状态: 音频已保存为 received_audio.wav") + print("音频已保存为 received_audio.wav") + + except Exception as e: + self.status_label.setText(f"状态: 保存失败 - {str(e)}") + print(f"保存音频失败: {e}") + else: + self.status_label.setText("状态: 没有足够的数据可保存") + + +async def main(): + """异步主函数""" + app = QApplication(sys.argv) + + # 设置异步事件循环 + loop = qasync.QEventLoop(app) + asyncio.set_event_loop(loop) + + window = MainWindow() + window.show() + + try: + with loop: + await loop.run_forever() + except KeyboardInterrupt: + print("程序被用户中断") + finally: + # 确保清理资源 + if window.udp_transport: + window.udp_transport.close() \ No newline at end of file diff --git a/scripts/acoustic_check/main.py b/scripts/acoustic_check/main.py new file mode 100644 index 00000000..63ec8497 --- /dev/null +++ b/scripts/acoustic_check/main.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python3 +""" +音频实时监听与绘图系统主程序 +基于Qt GUI + Matplotlib + UDP接收 + AFSK解码字符串 +""" + +import sys +import asyncio +from graphic import main + +if __name__ == '__main__': + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("程序被用户中断") + except Exception as e: + print(f"程序执行出错: {e}") + sys.exit(1) diff --git a/scripts/acoustic_check/readme.md b/scripts/acoustic_check/readme.md new file mode 100644 index 00000000..9eb9c7c6 --- /dev/null +++ b/scripts/acoustic_check/readme.md @@ -0,0 +1,23 @@ +# 声波测试 +该gui用于测试接受小智设备通过`udp`回传的`pcm`转时域/频域, 可以保存窗口长度的声音, 用于判断噪音频率分布和测试声波传输ascii的准确度, + +固件测试需要打开`USE_AUDIO_DEBUGGER`, 并设置好`AUDIO_DEBUG_UDP_SERVER`是本机地址. +声波`demod`可以通过`sonic_wifi_config.html`或者上传至`PinMe`的[小智声波配网](https://iqf7jnhi.pinit.eth.limo)来输出声波测试 + +# 声波解码测试记录 + +> `✓`代表在I2S DIN接收原始PCM信号时就能成功解码, `△`代表需要降噪或额外操作可稳定解码, `X`代表降噪后效果也不好(可能能解部分但非常不稳定)。 +> 个别ADC需要I2C配置阶段做更精细的降噪调整, 由于设备不通用暂只按照boards内提供的config测试 + +| 设备 | ADC | MIC | 效果 | 备注 | +| ---- | ---- | --- | --- | ---- | +| bread-compact | INMP441 | 集成MEMEMIC | ✓ | +| atk-dnesp32s3-box | ES8311 | | ✓ | +| magiclick-2p5 | ES8311 | | ✓ | +| lichuang-dev | ES7210 | | △ | 测试时需要关掉INPUT_REFERENCE +| kevin-box-2 | ES7210 | | △ | 测试时需要关掉INPUT_REFERENCE +| m5stack-core-s3 | ES7210 | | △ | 测试时需要关掉INPUT_REFERENCE +| xmini-c3 | ES8311 | | △ | 需降噪 +| atoms3r-echo-base | ES8311 | | △ | 需降噪 +| atk-dnesp32s3-box0 | ES8311 | | X | 能接收且解码, 但是丢包率很高 +| movecall-moji-esp32s3 | ES8311 | | X | 能接收且解码, 但是丢包率很高 \ No newline at end of file diff --git a/scripts/acoustic_check/requirements.txt b/scripts/acoustic_check/requirements.txt new file mode 100644 index 00000000..91bc5eca --- /dev/null +++ b/scripts/acoustic_check/requirements.txt @@ -0,0 +1,4 @@ +matplotlib==3.10.5 +numpy==2.3.2 +PyQt6==6.9.1 +qasync==0.27.1