forked from xiaozhi/xiaozhi-esp32
Add audio_send_queue to synchronize timestamp
This commit is contained in:
@@ -543,16 +543,6 @@ void Application::Start() {
|
|||||||
|
|
||||||
audio_processor_->Initialize(codec);
|
audio_processor_->Initialize(codec);
|
||||||
audio_processor_->OnOutput([this](std::vector<int16_t>&& data) {
|
audio_processor_->OnOutput([this](std::vector<int16_t>&& data) {
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(mutex_);
|
|
||||||
// We do not have a send queue yet, but all packets are sent by the main task
|
|
||||||
// so we use the main task queue to limit the number of packets
|
|
||||||
if (main_tasks_.size() > MAX_AUDIO_PACKETS_IN_QUEUE) {
|
|
||||||
ESP_LOGW(TAG, "Too many main tasks = %u, skip sending audio...", main_tasks_.size());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
background_task_->Schedule([this, data = std::move(data)]() mutable {
|
background_task_->Schedule([this, data = std::move(data)]() mutable {
|
||||||
opus_encoder_->Encode(std::move(data), [this](std::vector<uint8_t>&& opus) {
|
opus_encoder_->Encode(std::move(data), [this](std::vector<uint8_t>&& opus) {
|
||||||
AudioStreamPacket packet;
|
AudioStreamPacket packet;
|
||||||
@@ -573,9 +563,13 @@ void Application::Start() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
Schedule([this, packet = std::move(packet)]() {
|
std::lock_guard<std::mutex> lock(mutex_);
|
||||||
protocol_->SendAudio(packet);
|
if (audio_send_queue_.size() >= MAX_AUDIO_PACKETS_IN_QUEUE) {
|
||||||
});
|
ESP_LOGW(TAG, "Too many audio packets in queue, drop the oldest packet");
|
||||||
|
audio_send_queue_.pop_front();
|
||||||
|
}
|
||||||
|
audio_send_queue_.emplace_back(std::move(packet));
|
||||||
|
xEventGroupSetBits(event_group_, SEND_AUDIO_EVENT);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@@ -686,11 +680,22 @@ void Application::Schedule(std::function<void()> callback) {
|
|||||||
// they should use Schedule to call this function
|
// they should use Schedule to call this function
|
||||||
void Application::MainEventLoop() {
|
void Application::MainEventLoop() {
|
||||||
while (true) {
|
while (true) {
|
||||||
auto bits = xEventGroupWaitBits(event_group_, SCHEDULE_EVENT, pdTRUE, pdFALSE, portMAX_DELAY);
|
auto bits = xEventGroupWaitBits(event_group_, SCHEDULE_EVENT | SEND_AUDIO_EVENT, pdTRUE, pdFALSE, portMAX_DELAY);
|
||||||
|
|
||||||
|
if (bits & SEND_AUDIO_EVENT) {
|
||||||
|
std::unique_lock<std::mutex> lock(mutex_);
|
||||||
|
auto packets = std::move(audio_send_queue_);
|
||||||
|
lock.unlock();
|
||||||
|
for (auto& packet : packets) {
|
||||||
|
if (!protocol_->SendAudio(packet)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (bits & SCHEDULE_EVENT) {
|
if (bits & SCHEDULE_EVENT) {
|
||||||
std::unique_lock<std::mutex> lock(mutex_);
|
std::unique_lock<std::mutex> lock(mutex_);
|
||||||
std::list<std::function<void()>> tasks = std::move(main_tasks_);
|
auto tasks = std::move(main_tasks_);
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
for (auto& task : tasks) {
|
for (auto& task : tasks) {
|
||||||
task();
|
task();
|
||||||
@@ -792,7 +797,7 @@ void Application::OnAudioInput() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
vTaskDelay(pdMS_TO_TICKS(30));
|
vTaskDelay(pdMS_TO_TICKS(OPUS_FRAME_DURATION_MS / 2));
|
||||||
}
|
}
|
||||||
|
|
||||||
void Application::ReadAudio(std::vector<int16_t>& data, int sample_rate, int samples) {
|
void Application::ReadAudio(std::vector<int16_t>& data, int sample_rate, int samples) {
|
||||||
|
|||||||
@@ -27,6 +27,7 @@
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define SCHEDULE_EVENT (1 << 0)
|
#define SCHEDULE_EVENT (1 << 0)
|
||||||
|
#define SEND_AUDIO_EVENT (1 << 1)
|
||||||
#define CHECK_NEW_VERSION_DONE_EVENT (1 << 2)
|
#define CHECK_NEW_VERSION_DONE_EVENT (1 << 2)
|
||||||
|
|
||||||
enum DeviceState {
|
enum DeviceState {
|
||||||
@@ -104,6 +105,7 @@ private:
|
|||||||
TaskHandle_t audio_loop_task_handle_ = nullptr;
|
TaskHandle_t audio_loop_task_handle_ = nullptr;
|
||||||
BackgroundTask* background_task_ = nullptr;
|
BackgroundTask* background_task_ = nullptr;
|
||||||
std::chrono::steady_clock::time_point last_output_time_;
|
std::chrono::steady_clock::time_point last_output_time_;
|
||||||
|
std::list<AudioStreamPacket> audio_send_queue_;
|
||||||
std::list<AudioStreamPacket> audio_decode_queue_;
|
std::list<AudioStreamPacket> audio_decode_queue_;
|
||||||
std::condition_variable audio_decode_cv_;
|
std::condition_variable audio_decode_cv_;
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,8 @@
|
|||||||
{
|
{
|
||||||
"name": "lichuang-dev",
|
"name": "lichuang-dev",
|
||||||
"sdkconfig_append": [
|
"sdkconfig_append": [
|
||||||
"CONFIG_USE_DEVICE_AEC=y"
|
"CONFIG_USE_DEVICE_AEC=y",
|
||||||
|
"CONFIG_IOT_PROTOCOL_MCP=y"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -121,10 +121,10 @@ bool MqttProtocol::SendText(const std::string& text) {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void MqttProtocol::SendAudio(const AudioStreamPacket& packet) {
|
bool MqttProtocol::SendAudio(const AudioStreamPacket& packet) {
|
||||||
std::lock_guard<std::mutex> lock(channel_mutex_);
|
std::lock_guard<std::mutex> lock(channel_mutex_);
|
||||||
if (udp_ == nullptr) {
|
if (udp_ == nullptr) {
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string nonce(aes_nonce_);
|
std::string nonce(aes_nonce_);
|
||||||
@@ -141,10 +141,10 @@ void MqttProtocol::SendAudio(const AudioStreamPacket& packet) {
|
|||||||
if (mbedtls_aes_crypt_ctr(&aes_ctx_, packet.payload.size(), &nc_off, (uint8_t*)nonce.c_str(), stream_block,
|
if (mbedtls_aes_crypt_ctr(&aes_ctx_, packet.payload.size(), &nc_off, (uint8_t*)nonce.c_str(), stream_block,
|
||||||
(uint8_t*)packet.payload.data(), (uint8_t*)&encrypted[nonce.size()]) != 0) {
|
(uint8_t*)packet.payload.data(), (uint8_t*)&encrypted[nonce.size()]) != 0) {
|
||||||
ESP_LOGE(TAG, "Failed to encrypt audio data");
|
ESP_LOGE(TAG, "Failed to encrypt audio data");
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
udp_->Send(encrypted);
|
return udp_->Send(encrypted) > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void MqttProtocol::CloseAudioChannel() {
|
void MqttProtocol::CloseAudioChannel() {
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ public:
|
|||||||
~MqttProtocol();
|
~MqttProtocol();
|
||||||
|
|
||||||
bool Start() override;
|
bool Start() override;
|
||||||
void SendAudio(const AudioStreamPacket& packet) override;
|
bool SendAudio(const AudioStreamPacket& packet) override;
|
||||||
bool OpenAudioChannel() override;
|
bool OpenAudioChannel() override;
|
||||||
void CloseAudioChannel() override;
|
void CloseAudioChannel() override;
|
||||||
bool IsAudioChannelOpened() const override;
|
bool IsAudioChannelOpened() const override;
|
||||||
|
|||||||
@@ -63,7 +63,7 @@ public:
|
|||||||
virtual bool OpenAudioChannel() = 0;
|
virtual bool OpenAudioChannel() = 0;
|
||||||
virtual void CloseAudioChannel() = 0;
|
virtual void CloseAudioChannel() = 0;
|
||||||
virtual bool IsAudioChannelOpened() const = 0;
|
virtual bool IsAudioChannelOpened() const = 0;
|
||||||
virtual void SendAudio(const AudioStreamPacket& packet) = 0;
|
virtual bool SendAudio(const AudioStreamPacket& packet) = 0;
|
||||||
virtual void SendWakeWordDetected(const std::string& wake_word);
|
virtual void SendWakeWordDetected(const std::string& wake_word);
|
||||||
virtual void SendStartListening(ListeningMode mode);
|
virtual void SendStartListening(ListeningMode mode);
|
||||||
virtual void SendStopListening();
|
virtual void SendStopListening();
|
||||||
|
|||||||
@@ -28,9 +28,9 @@ bool WebsocketProtocol::Start() {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void WebsocketProtocol::SendAudio(const AudioStreamPacket& packet) {
|
bool WebsocketProtocol::SendAudio(const AudioStreamPacket& packet) {
|
||||||
if (websocket_ == nullptr) {
|
if (websocket_ == nullptr) {
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (version_ == 2) {
|
if (version_ == 2) {
|
||||||
@@ -44,7 +44,7 @@ void WebsocketProtocol::SendAudio(const AudioStreamPacket& packet) {
|
|||||||
bp2->payload_size = htonl(packet.payload.size());
|
bp2->payload_size = htonl(packet.payload.size());
|
||||||
memcpy(bp2->payload, packet.payload.data(), packet.payload.size());
|
memcpy(bp2->payload, packet.payload.data(), packet.payload.size());
|
||||||
|
|
||||||
websocket_->Send(serialized.data(), serialized.size(), true);
|
return websocket_->Send(serialized.data(), serialized.size(), true);
|
||||||
} else if (version_ == 3) {
|
} else if (version_ == 3) {
|
||||||
std::string serialized;
|
std::string serialized;
|
||||||
serialized.resize(sizeof(BinaryProtocol3) + packet.payload.size());
|
serialized.resize(sizeof(BinaryProtocol3) + packet.payload.size());
|
||||||
@@ -54,9 +54,9 @@ void WebsocketProtocol::SendAudio(const AudioStreamPacket& packet) {
|
|||||||
bp3->payload_size = htons(packet.payload.size());
|
bp3->payload_size = htons(packet.payload.size());
|
||||||
memcpy(bp3->payload, packet.payload.data(), packet.payload.size());
|
memcpy(bp3->payload, packet.payload.data(), packet.payload.size());
|
||||||
|
|
||||||
websocket_->Send(serialized.data(), serialized.size(), true);
|
return websocket_->Send(serialized.data(), serialized.size(), true);
|
||||||
} else {
|
} else {
|
||||||
websocket_->Send(packet.payload.data(), packet.payload.size(), true);
|
return websocket_->Send(packet.payload.data(), packet.payload.size(), true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ public:
|
|||||||
~WebsocketProtocol();
|
~WebsocketProtocol();
|
||||||
|
|
||||||
bool Start() override;
|
bool Start() override;
|
||||||
void SendAudio(const AudioStreamPacket& packet) override;
|
bool SendAudio(const AudioStreamPacket& packet) override;
|
||||||
bool OpenAudioChannel() override;
|
bool OpenAudioChannel() override;
|
||||||
void CloseAudioChannel() override;
|
void CloseAudioChannel() override;
|
||||||
bool IsAudioChannelOpened() const override;
|
bool IsAudioChannelOpened() const override;
|
||||||
|
|||||||
Reference in New Issue
Block a user