forked from xiaozhi/xiaozhi-esp32
add: GUI版P3文件简易播放器 (#334)
This commit is contained in:
245
scripts/p3_tools/p3_gui_player.py
Normal file
245
scripts/p3_tools/p3_gui_player.py
Normal file
@@ -0,0 +1,245 @@
|
||||
import tkinter as tk
|
||||
from tkinter import filedialog, messagebox
|
||||
import threading
|
||||
import time
|
||||
import opuslib
|
||||
import struct
|
||||
import numpy as np
|
||||
import sounddevice as sd
|
||||
import os
|
||||
|
||||
|
||||
def play_p3_file(input_file, stop_event=None, pause_event=None):
|
||||
"""
|
||||
播放p3格式的音频文件
|
||||
p3格式: [1字节类型, 1字节保留, 2字节长度, Opus数据]
|
||||
"""
|
||||
# 初始化Opus解码器
|
||||
sample_rate = 16000 # 采样率固定为16000Hz
|
||||
channels = 1 # 单声道
|
||||
decoder = opuslib.Decoder(sample_rate, channels)
|
||||
|
||||
# 帧大小 (60ms)
|
||||
frame_size = int(sample_rate * 60 / 1000)
|
||||
|
||||
# 打开音频流
|
||||
stream = sd.OutputStream(
|
||||
samplerate=sample_rate,
|
||||
channels=channels,
|
||||
dtype='int16'
|
||||
)
|
||||
stream.start()
|
||||
|
||||
try:
|
||||
with open(input_file, 'rb') as f:
|
||||
print(f"正在播放: {input_file}")
|
||||
|
||||
while True:
|
||||
if stop_event and stop_event.is_set():
|
||||
break
|
||||
|
||||
if pause_event and pause_event.is_set():
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
# 读取头部 (4字节)
|
||||
header = f.read(4)
|
||||
if not header or len(header) < 4:
|
||||
break
|
||||
|
||||
# 解析头部
|
||||
packet_type, reserved, data_len = struct.unpack('>BBH', header)
|
||||
|
||||
# 读取Opus数据
|
||||
opus_data = f.read(data_len)
|
||||
if not opus_data or len(opus_data) < data_len:
|
||||
break
|
||||
|
||||
# 解码Opus数据
|
||||
pcm_data = decoder.decode(opus_data, frame_size)
|
||||
|
||||
# 将字节转换为numpy数组
|
||||
audio_array = np.frombuffer(pcm_data, dtype=np.int16)
|
||||
|
||||
# 播放音频
|
||||
stream.write(audio_array)
|
||||
|
||||
# 等待一帧的时间
|
||||
time.sleep(60 / 1000) # 60ms
|
||||
|
||||
# 播放结束后添加0.5秒静音,避免破音
|
||||
silence = np.zeros(int(sample_rate / 2), dtype=np.int16)
|
||||
stream.write(silence)
|
||||
time.sleep(0.5) # 等待1秒
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n播放已停止")
|
||||
finally:
|
||||
stream.stop()
|
||||
stream.close()
|
||||
print("播放完成")
|
||||
|
||||
|
||||
class P3PlayerApp:
|
||||
def __init__(self, root):
|
||||
self.root = root
|
||||
self.root.title("P3 文件简易播放器")
|
||||
self.root.geometry("500x400")
|
||||
|
||||
self.playlist = []
|
||||
self.current_index = 0
|
||||
self.is_playing = False
|
||||
self.is_paused = False
|
||||
self.stop_event = threading.Event()
|
||||
self.pause_event = threading.Event()
|
||||
|
||||
# 创建界面组件
|
||||
self.create_widgets()
|
||||
|
||||
def create_widgets(self):
|
||||
# 播放列表
|
||||
self.playlist_label = tk.Label(self.root, text="播放列表:")
|
||||
self.playlist_label.pack(pady=5)
|
||||
|
||||
self.playlist_frame = tk.Frame(self.root)
|
||||
self.playlist_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
|
||||
|
||||
self.playlist_listbox = tk.Listbox(self.playlist_frame, selectmode=tk.SINGLE)
|
||||
self.playlist_listbox.pack(fill=tk.BOTH, expand=True)
|
||||
|
||||
# 复选框和移除按钮
|
||||
self.checkbox_frame = tk.Frame(self.root)
|
||||
self.checkbox_frame.pack(pady=5)
|
||||
|
||||
self.remove_button = tk.Button(self.checkbox_frame, text="移除文件", command=self.remove_files)
|
||||
self.remove_button.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
# 控制按钮
|
||||
self.control_frame = tk.Frame(self.root)
|
||||
self.control_frame.pack(pady=10)
|
||||
|
||||
self.add_button = tk.Button(self.control_frame, text="添加文件", command=self.add_file)
|
||||
self.add_button.grid(row=0, column=0, padx=5)
|
||||
|
||||
self.play_button = tk.Button(self.control_frame, text="播放", command=self.play)
|
||||
self.play_button.grid(row=0, column=1, padx=5)
|
||||
|
||||
self.pause_button = tk.Button(self.control_frame, text="暂停", command=self.pause)
|
||||
self.pause_button.grid(row=0, column=2, padx=5)
|
||||
|
||||
self.stop_button = tk.Button(self.control_frame, text="停止", command=self.stop)
|
||||
self.stop_button.grid(row=0, column=3, padx=5)
|
||||
|
||||
self.next_button = tk.Button(self.control_frame, text="下一首", command=self.next)
|
||||
self.next_button.grid(row=0, column=4, padx=5)
|
||||
|
||||
# 状态标签
|
||||
self.status_label = tk.Label(self.root, text="未在播放", fg="blue")
|
||||
self.status_label.pack(pady=10)
|
||||
|
||||
def add_file(self):
|
||||
files = filedialog.askopenfilenames(filetypes=[("P3 文件", "*.p3")])
|
||||
if files:
|
||||
self.playlist.extend(files)
|
||||
self.update_playlist()
|
||||
|
||||
def update_playlist(self):
|
||||
self.playlist_listbox.delete(0, tk.END)
|
||||
for file in self.playlist:
|
||||
self.playlist_listbox.insert(tk.END, os.path.basename(file)) # 仅显示文件名
|
||||
|
||||
def update_status(self, status_text, color="blue"):
|
||||
"""更新状态标签的内容"""
|
||||
self.status_label.config(text=status_text, fg=color)
|
||||
|
||||
def play(self):
|
||||
if not self.playlist:
|
||||
messagebox.showwarning("警告", "播放列表为空!")
|
||||
return
|
||||
|
||||
if self.is_paused:
|
||||
self.is_paused = False
|
||||
self.pause_event.clear()
|
||||
self.update_status(f"正在播放:{os.path.basename(self.playlist[self.current_index])}", "green")
|
||||
return
|
||||
|
||||
if self.is_playing:
|
||||
return
|
||||
|
||||
self.is_playing = True
|
||||
self.stop_event.clear()
|
||||
self.pause_event.clear()
|
||||
self.current_index = self.playlist_listbox.curselection()[0] if self.playlist_listbox.curselection() else 0
|
||||
self.play_thread = threading.Thread(target=self.play_audio, daemon=True)
|
||||
self.play_thread.start()
|
||||
self.update_status(f"正在播放:{os.path.basename(self.playlist[self.current_index])}", "green")
|
||||
|
||||
def play_audio(self):
|
||||
while self.current_index < len(self.playlist):
|
||||
if self.stop_event.is_set():
|
||||
break
|
||||
|
||||
if self.pause_event.is_set():
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
file = self.playlist[self.current_index]
|
||||
self.playlist_listbox.selection_clear(0, tk.END)
|
||||
self.playlist_listbox.selection_set(self.current_index)
|
||||
self.playlist_listbox.activate(self.current_index)
|
||||
|
||||
play_p3_file(file, self.stop_event, self.pause_event)
|
||||
|
||||
if self.stop_event.is_set():
|
||||
break
|
||||
|
||||
self.current_index += 1
|
||||
|
||||
self.is_playing = False
|
||||
self.is_paused = False
|
||||
self.update_status("播放已停止", "red")
|
||||
|
||||
def pause(self):
|
||||
if self.is_playing:
|
||||
self.is_paused = not self.is_paused
|
||||
if self.is_paused:
|
||||
self.pause_event.set()
|
||||
self.update_status("播放已暂停", "orange")
|
||||
else:
|
||||
self.pause_event.clear()
|
||||
self.update_status(f"正在播放:{os.path.basename(self.playlist[self.current_index])}", "green")
|
||||
|
||||
def stop(self):
|
||||
if self.is_playing or self.is_paused:
|
||||
self.is_playing = False
|
||||
self.is_paused = False
|
||||
self.stop_event.set()
|
||||
self.pause_event.clear()
|
||||
self.update_status("播放已停止", "red")
|
||||
|
||||
def next(self):
|
||||
if self.is_playing or self.is_paused:
|
||||
self.stop()
|
||||
time.sleep(0.1) # 等待停止
|
||||
|
||||
self.current_index += 1
|
||||
if self.current_index >= len(self.playlist):
|
||||
self.current_index = 0
|
||||
|
||||
self.play()
|
||||
|
||||
def remove_files(self):
|
||||
selected_indices = self.playlist_listbox.curselection()
|
||||
if not selected_indices:
|
||||
messagebox.showwarning("警告", "请先选择要移除的文件!")
|
||||
return
|
||||
|
||||
for index in reversed(selected_indices):
|
||||
self.playlist.pop(index)
|
||||
self.update_playlist()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
root = tk.Tk()
|
||||
app = P3PlayerApp(root)
|
||||
root.mainloop()
|
||||
Reference in New Issue
Block a user