diff --git a/main/application.cc b/main/application.cc index aff6dd19..61ab7f20 100644 --- a/main/application.cc +++ b/main/application.cc @@ -543,16 +543,6 @@ void Application::Start() { audio_processor_->Initialize(codec); audio_processor_->OnOutput([this](std::vector&& data) { - { - std::lock_guard lock(mutex_); - // We do not have a send queue yet, but all packets are sent by the main task - // so we use the main task queue to limit the number of packets - if (main_tasks_.size() > MAX_AUDIO_PACKETS_IN_QUEUE) { - ESP_LOGW(TAG, "Too many main tasks = %u, skip sending audio...", main_tasks_.size()); - return; - } - } - background_task_->Schedule([this, data = std::move(data)]() mutable { opus_encoder_->Encode(std::move(data), [this](std::vector&& opus) { AudioStreamPacket packet; @@ -573,9 +563,13 @@ void Application::Start() { } } #endif - Schedule([this, packet = std::move(packet)]() { - protocol_->SendAudio(packet); - }); + std::lock_guard lock(mutex_); + if (audio_send_queue_.size() >= MAX_AUDIO_PACKETS_IN_QUEUE) { + ESP_LOGW(TAG, "Too many audio packets in queue, drop the oldest packet"); + audio_send_queue_.pop_front(); + } + audio_send_queue_.emplace_back(std::move(packet)); + xEventGroupSetBits(event_group_, SEND_AUDIO_EVENT); }); }); }); @@ -686,11 +680,22 @@ void Application::Schedule(std::function callback) { // they should use Schedule to call this function void Application::MainEventLoop() { while (true) { - auto bits = xEventGroupWaitBits(event_group_, SCHEDULE_EVENT, pdTRUE, pdFALSE, portMAX_DELAY); + auto bits = xEventGroupWaitBits(event_group_, SCHEDULE_EVENT | SEND_AUDIO_EVENT, pdTRUE, pdFALSE, portMAX_DELAY); + + if (bits & SEND_AUDIO_EVENT) { + std::unique_lock lock(mutex_); + auto packets = std::move(audio_send_queue_); + lock.unlock(); + for (auto& packet : packets) { + if (!protocol_->SendAudio(packet)) { + break; + } + } + } if (bits & SCHEDULE_EVENT) { std::unique_lock lock(mutex_); - std::list> tasks = std::move(main_tasks_); + auto tasks = std::move(main_tasks_); lock.unlock(); for (auto& task : tasks) { task(); @@ -792,7 +797,7 @@ void Application::OnAudioInput() { } } - vTaskDelay(pdMS_TO_TICKS(30)); + vTaskDelay(pdMS_TO_TICKS(OPUS_FRAME_DURATION_MS / 2)); } void Application::ReadAudio(std::vector& data, int sample_rate, int samples) { diff --git a/main/application.h b/main/application.h index e10ae8a7..66a6f930 100644 --- a/main/application.h +++ b/main/application.h @@ -27,6 +27,7 @@ #endif #define SCHEDULE_EVENT (1 << 0) +#define SEND_AUDIO_EVENT (1 << 1) #define CHECK_NEW_VERSION_DONE_EVENT (1 << 2) enum DeviceState { @@ -104,6 +105,7 @@ private: TaskHandle_t audio_loop_task_handle_ = nullptr; BackgroundTask* background_task_ = nullptr; std::chrono::steady_clock::time_point last_output_time_; + std::list audio_send_queue_; std::list audio_decode_queue_; std::condition_variable audio_decode_cv_; diff --git a/main/boards/lichuang-dev/config.json b/main/boards/lichuang-dev/config.json index e2a7090e..7a756ee4 100644 --- a/main/boards/lichuang-dev/config.json +++ b/main/boards/lichuang-dev/config.json @@ -4,7 +4,8 @@ { "name": "lichuang-dev", "sdkconfig_append": [ - "CONFIG_USE_DEVICE_AEC=y" + "CONFIG_USE_DEVICE_AEC=y", + "CONFIG_IOT_PROTOCOL_MCP=y" ] } ] diff --git a/main/protocols/mqtt_protocol.cc b/main/protocols/mqtt_protocol.cc index d8b12b59..bf96b12d 100644 --- a/main/protocols/mqtt_protocol.cc +++ b/main/protocols/mqtt_protocol.cc @@ -121,10 +121,10 @@ bool MqttProtocol::SendText(const std::string& text) { return true; } -void MqttProtocol::SendAudio(const AudioStreamPacket& packet) { +bool MqttProtocol::SendAudio(const AudioStreamPacket& packet) { std::lock_guard lock(channel_mutex_); if (udp_ == nullptr) { - return; + return false; } std::string nonce(aes_nonce_); @@ -141,10 +141,10 @@ void MqttProtocol::SendAudio(const AudioStreamPacket& packet) { if (mbedtls_aes_crypt_ctr(&aes_ctx_, packet.payload.size(), &nc_off, (uint8_t*)nonce.c_str(), stream_block, (uint8_t*)packet.payload.data(), (uint8_t*)&encrypted[nonce.size()]) != 0) { ESP_LOGE(TAG, "Failed to encrypt audio data"); - return; + return false; } - udp_->Send(encrypted); + return udp_->Send(encrypted) > 0; } void MqttProtocol::CloseAudioChannel() { diff --git a/main/protocols/mqtt_protocol.h b/main/protocols/mqtt_protocol.h index d7d712de..d85bf684 100644 --- a/main/protocols/mqtt_protocol.h +++ b/main/protocols/mqtt_protocol.h @@ -26,7 +26,7 @@ public: ~MqttProtocol(); bool Start() override; - void SendAudio(const AudioStreamPacket& packet) override; + bool SendAudio(const AudioStreamPacket& packet) override; bool OpenAudioChannel() override; void CloseAudioChannel() override; bool IsAudioChannelOpened() const override; diff --git a/main/protocols/protocol.h b/main/protocols/protocol.h index 210d6567..e23561ee 100644 --- a/main/protocols/protocol.h +++ b/main/protocols/protocol.h @@ -63,7 +63,7 @@ public: virtual bool OpenAudioChannel() = 0; virtual void CloseAudioChannel() = 0; virtual bool IsAudioChannelOpened() const = 0; - virtual void SendAudio(const AudioStreamPacket& packet) = 0; + virtual bool SendAudio(const AudioStreamPacket& packet) = 0; virtual void SendWakeWordDetected(const std::string& wake_word); virtual void SendStartListening(ListeningMode mode); virtual void SendStopListening(); diff --git a/main/protocols/websocket_protocol.cc b/main/protocols/websocket_protocol.cc index 10aa4e9c..45c00957 100644 --- a/main/protocols/websocket_protocol.cc +++ b/main/protocols/websocket_protocol.cc @@ -28,9 +28,9 @@ bool WebsocketProtocol::Start() { return true; } -void WebsocketProtocol::SendAudio(const AudioStreamPacket& packet) { +bool WebsocketProtocol::SendAudio(const AudioStreamPacket& packet) { if (websocket_ == nullptr) { - return; + return false; } if (version_ == 2) { @@ -44,7 +44,7 @@ void WebsocketProtocol::SendAudio(const AudioStreamPacket& packet) { bp2->payload_size = htonl(packet.payload.size()); memcpy(bp2->payload, packet.payload.data(), packet.payload.size()); - websocket_->Send(serialized.data(), serialized.size(), true); + return websocket_->Send(serialized.data(), serialized.size(), true); } else if (version_ == 3) { std::string serialized; serialized.resize(sizeof(BinaryProtocol3) + packet.payload.size()); @@ -54,9 +54,9 @@ void WebsocketProtocol::SendAudio(const AudioStreamPacket& packet) { bp3->payload_size = htons(packet.payload.size()); memcpy(bp3->payload, packet.payload.data(), packet.payload.size()); - websocket_->Send(serialized.data(), serialized.size(), true); + return websocket_->Send(serialized.data(), serialized.size(), true); } else { - websocket_->Send(packet.payload.data(), packet.payload.size(), true); + return websocket_->Send(packet.payload.data(), packet.payload.size(), true); } } diff --git a/main/protocols/websocket_protocol.h b/main/protocols/websocket_protocol.h index 9d16fe18..3b79ef88 100644 --- a/main/protocols/websocket_protocol.h +++ b/main/protocols/websocket_protocol.h @@ -16,7 +16,7 @@ public: ~WebsocketProtocol(); bool Start() override; - void SendAudio(const AudioStreamPacket& packet) override; + bool SendAudio(const AudioStreamPacket& packet) override; bool OpenAudioChannel() override; void CloseAudioChannel() override; bool IsAudioChannelOpened() const override;