From efc6f238e73b864db71939fc7ca93a67558c696f Mon Sep 17 00:00:00 2001 From: Terrence Date: Sun, 20 Jul 2025 03:57:36 +0800 Subject: [PATCH] Fix Server AEC --- main/audio/audio_processor.h | 4 +- main/audio/audio_service.cc | 72 +++++++++++++------- main/audio/audio_service.h | 2 + main/audio/processors/afe_audio_processor.cc | 29 +++++++- main/audio/processors/afe_audio_processor.h | 7 +- main/audio/processors/no_audio_processor.cc | 29 ++++++-- main/audio/processors/no_audio_processor.h | 6 +- 7 files changed, 109 insertions(+), 40 deletions(-) diff --git a/main/audio/audio_processor.h b/main/audio/audio_processor.h index 3c729e06..266bf99a 100644 --- a/main/audio/audio_processor.h +++ b/main/audio/audio_processor.h @@ -11,8 +11,8 @@ class AudioProcessor { public: virtual ~AudioProcessor() = default; - virtual void Initialize(AudioCodec* codec) = 0; - virtual void Feed(const std::vector& data) = 0; + virtual void Initialize(AudioCodec* codec, int frame_duration_ms) = 0; + virtual void Feed(std::vector&& data) = 0; virtual void Start() = 0; virtual void Stop() = 0; virtual bool IsRunning() = 0; diff --git a/main/audio/audio_service.cc b/main/audio/audio_service.cc index e43cef32..2a22259d 100644 --- a/main/audio/audio_service.cc +++ b/main/audio/audio_service.cc @@ -43,7 +43,6 @@ void AudioService::Initialize(AudioCodec* codec) { reference_resampler_.Configure(codec->input_sample_rate(), 16000); } - audio_debugger_ = std::make_unique(); #if CONFIG_USE_AUDIO_PROCESSOR audio_processor_ = std::make_unique(); #else @@ -118,7 +117,7 @@ void AudioService::Start() { AudioService* audio_service = (AudioService*)arg; audio_service->AudioOutputTask(); vTaskDelete(NULL); - }, "audio_output", 2048, this, 3, &audio_output_task_handle_); + }, "audio_output", 4096, this, 3, &audio_output_task_handle_); /* Start the opus codec task */ xTaskCreate([](void* arg) { @@ -185,12 +184,15 @@ bool AudioService::ReadAudioData(std::vector& data, int sample_rate, in /* Update the last input time */ last_input_time_ = std::chrono::steady_clock::now(); debug_statistics_.input_count++; - + +#if CONFIG_USE_AUDIO_DEBUGGER // 音频调试:发送原始音频数据 - if (audio_debugger_) { - audio_debugger_->Feed(data); + if (audio_debugger_ == nullptr) { + audio_debugger_ = std::make_unique(); } - + audio_debugger_->Feed(data); +#endif + return true; } @@ -250,7 +252,7 @@ void AudioService::AudioInputTask() { int samples = audio_processor_->GetFeedSize(); if (samples > 0) { if (ReadAudioData(data, 16000, samples)) { - audio_processor_->Feed(data); + audio_processor_->Feed(std::move(data)); continue; } } @@ -285,6 +287,12 @@ void AudioService::AudioOutputTask() { /* Update the last output time */ last_output_time_ = std::chrono::steady_clock::now(); debug_statistics_.playback_count++; + + /* Record the timestamp */ + if (task->timestamp > 0) { + lock.lock(); + timestamp_queue_.push_back(task->timestamp); + } } ESP_LOGW(TAG, "Audio output task stopped"); @@ -311,6 +319,7 @@ void AudioService::OpusCodecTask() { auto task = std::make_unique(); task->type = kAudioTaskTypeDecodeToPlaybackQueue; + task->timestamp = packet->timestamp; SetDecodeSampleRate(packet->sample_rate, packet->frame_duration); if (opus_decoder_->Decode(std::move(packet->payload), task->pcm)) { @@ -338,25 +347,28 @@ void AudioService::OpusCodecTask() { audio_encode_queue_.pop_front(); audio_queue_cv_.notify_all(); lock.unlock(); - opus_encoder_->Encode(std::move(task->pcm), [this, &task](std::vector&& opus) { - auto packet = std::make_unique(); - packet->payload = std::move(opus); - packet->frame_duration = OPUS_FRAME_DURATION_MS; - packet->sample_rate = 16000; - if (task->type == kAudioTaskTypeEncodeToSendQueue) { - { - std::lock_guard lock(audio_queue_mutex_); - audio_send_queue_.push_back(std::move(packet)); - } - if (callbacks_.on_send_queue_available) { - callbacks_.on_send_queue_available(); - } - } else if (task->type == kAudioTaskTypeEncodeToTestingQueue) { + auto packet = std::make_unique(); + packet->frame_duration = OPUS_FRAME_DURATION_MS; + packet->sample_rate = 16000; + packet->timestamp = task->timestamp; + if (!opus_encoder_->Encode(std::move(task->pcm), packet->payload)) { + ESP_LOGE(TAG, "Failed to encode audio"); + continue; + } + + if (task->type == kAudioTaskTypeEncodeToSendQueue) { + { std::lock_guard lock(audio_queue_mutex_); - audio_testing_queue_.push_back(std::move(packet)); + audio_send_queue_.push_back(std::move(packet)); } - }); + if (callbacks_.on_send_queue_available) { + callbacks_.on_send_queue_available(); + } + } else if (task->type == kAudioTaskTypeEncodeToTestingQueue) { + std::lock_guard lock(audio_queue_mutex_); + audio_testing_queue_.push_back(std::move(packet)); + } debug_statistics_.encode_count++; lock.lock(); } @@ -387,6 +399,17 @@ void AudioService::PushTaskToEncodeQueue(AudioTaskType type, std::vector lock(audio_queue_mutex_); + + /* If the task is to send queue, we need to set the timestamp */ + if (type == kAudioTaskTypeEncodeToSendQueue && !timestamp_queue_.empty()) { + if (timestamp_queue_.size() <= MAX_TIMESTAMPS_IN_QUEUE) { + task->timestamp = timestamp_queue_.front(); + } else { + ESP_LOGW(TAG, "Timestamp queue is full, dropping timestamp"); + } + timestamp_queue_.pop_front(); + } + audio_queue_cv_.wait(lock, [this]() { return audio_encode_queue_.size() < MAX_ENCODE_TASKS_IN_QUEUE; }); audio_encode_queue_.push_back(std::move(task)); audio_queue_cv_.notify_all(); @@ -458,7 +481,7 @@ void AudioService::EnableVoiceProcessing(bool enable) { ESP_LOGD(TAG, "%s voice processing", enable ? "Enabling" : "Disabling"); if (enable) { if (!audio_processor_initialized_) { - audio_processor_->Initialize(codec_); + audio_processor_->Initialize(codec_, OPUS_FRAME_DURATION_MS); audio_processor_initialized_ = true; } @@ -522,6 +545,7 @@ bool AudioService::IsIdle() { void AudioService::ResetDecoder() { std::lock_guard lock(audio_queue_mutex_); opus_decoder_->ResetState(); + timestamp_queue_.clear(); audio_decode_queue_.clear(); audio_playback_queue_.clear(); audio_testing_queue_.clear(); diff --git a/main/audio/audio_service.h b/main/audio/audio_service.h index 95cabacf..10d1c670 100644 --- a/main/audio/audio_service.h +++ b/main/audio/audio_service.h @@ -40,6 +40,7 @@ #define MAX_DECODE_PACKETS_IN_QUEUE (2400 / OPUS_FRAME_DURATION_MS) #define MAX_SEND_PACKETS_IN_QUEUE (2400 / OPUS_FRAME_DURATION_MS) #define AUDIO_TESTING_MAX_DURATION_MS 10000 +#define MAX_TIMESTAMPS_IN_QUEUE 3 #define AUDIO_POWER_TIMEOUT_MS 15000 #define AUDIO_POWER_CHECK_INTERVAL_MS 1000 @@ -67,6 +68,7 @@ enum AudioTaskType { struct AudioTask { AudioTaskType type; std::vector pcm; + uint32_t timestamp; }; struct DebugStatistics { diff --git a/main/audio/processors/afe_audio_processor.cc b/main/audio/processors/afe_audio_processor.cc index d853130c..4dcd63fe 100644 --- a/main/audio/processors/afe_audio_processor.cc +++ b/main/audio/processors/afe_audio_processor.cc @@ -10,8 +10,14 @@ AfeAudioProcessor::AfeAudioProcessor() event_group_ = xEventGroupCreate(); } -void AfeAudioProcessor::Initialize(AudioCodec* codec) { +void AfeAudioProcessor::Initialize(AudioCodec* codec, int frame_duration_ms) { codec_ = codec; + frame_duration_ms_ = frame_duration_ms; + frame_samples_ = frame_duration_ms_ * codec_->input_sample_rate() / 1000; + + // Pre-allocate output buffer capacity + output_buffer_.reserve(frame_samples_); + int ref_num = codec_->input_reference() ? 1 : 0; std::string input_format; @@ -79,7 +85,7 @@ size_t AfeAudioProcessor::GetFeedSize() { return afe_iface_->get_feed_chunksize(afe_data_) * codec_->input_channels(); } -void AfeAudioProcessor::Feed(const std::vector& data) { +void AfeAudioProcessor::Feed(std::vector&& data) { if (afe_data_ == nullptr) { return; } @@ -141,7 +147,24 @@ void AfeAudioProcessor::AudioProcessorTask() { } if (output_callback_) { - output_callback_(std::vector(res->data, res->data + res->data_size / sizeof(int16_t))); + size_t samples = res->data_size / sizeof(int16_t); + + // Add data to buffer + output_buffer_.insert(output_buffer_.end(), res->data, res->data + samples); + + // Output complete frames when buffer has enough data + while (output_buffer_.size() >= frame_samples_) { + if (output_buffer_.size() == frame_samples_) { + // If buffer size equals frame size, move the entire buffer + output_callback_(std::move(output_buffer_)); + output_buffer_.clear(); + output_buffer_.reserve(frame_samples_); + } else { + // If buffer size exceeds frame size, copy one frame and remove it + output_callback_(std::vector(output_buffer_.begin(), output_buffer_.begin() + frame_samples_)); + output_buffer_.erase(output_buffer_.begin(), output_buffer_.begin() + frame_samples_); + } + } } } } diff --git a/main/audio/processors/afe_audio_processor.h b/main/audio/processors/afe_audio_processor.h index 04bfcbe7..345fd5fe 100644 --- a/main/audio/processors/afe_audio_processor.h +++ b/main/audio/processors/afe_audio_processor.h @@ -18,8 +18,8 @@ public: AfeAudioProcessor(); ~AfeAudioProcessor(); - void Initialize(AudioCodec* codec) override; - void Feed(const std::vector& data) override; + void Initialize(AudioCodec* codec, int frame_duration_ms) override; + void Feed(std::vector&& data) override; void Start() override; void Stop() override; bool IsRunning() override; @@ -35,7 +35,10 @@ private: std::function&& data)> output_callback_; std::function vad_state_change_callback_; AudioCodec* codec_ = nullptr; + int frame_duration_ms_ = 0; + int frame_samples_ = 0; bool is_speaking_ = false; + std::vector output_buffer_; void AudioProcessorTask(); }; diff --git a/main/audio/processors/no_audio_processor.cc b/main/audio/processors/no_audio_processor.cc index a84f8af1..4134bc75 100644 --- a/main/audio/processors/no_audio_processor.cc +++ b/main/audio/processors/no_audio_processor.cc @@ -3,16 +3,32 @@ #define TAG "NoAudioProcessor" -void NoAudioProcessor::Initialize(AudioCodec* codec) { - codec_ = codec; +void NoAudioProcessor::Initialize(AudioCodec* codec, int frame_duration_ms) : + codec_(codec), + frame_duration_ms_(frame_duration_ms) { + frame_samples_ = frame_duration_ms_ * codec_->input_sample_rate() / 1000; } -void NoAudioProcessor::Feed(const std::vector& data) { +void NoAudioProcessor::Feed(std::vector&& data) { if (!is_running_ || !output_callback_) { return; } - // 直接将输入数据传递给输出回调 - output_callback_(std::vector(data)); + + if (data.size() != frame_samples_) { + ESP_LOGE(TAG, "Feed data size is not equal to frame size, feed size: %u, frame size: %u", data.size(), frame_samples_); + return; + } + + if (codec_->input_channels() == 2) { + // If input channels is 2, we need to fetch the left channel data + auto mono_data = std::vector(data.size() / 2); + for (size_t i = 0, j = 0; i < mono_data.size(); ++i, j += 2) { + mono_data[i] = data[j]; + } + output_callback_(std::move(mono_data)); + } else { + output_callback_(std::move(data)); + } } void NoAudioProcessor::Start() { @@ -39,8 +55,7 @@ size_t NoAudioProcessor::GetFeedSize() { if (!codec_) { return 0; } - // 返回一个固定的帧大小,比如 30ms 的数据 - return 30 * codec_->input_sample_rate() / 1000; + return frame_samples_; } void NoAudioProcessor::EnableDeviceAec(bool enable) { diff --git a/main/audio/processors/no_audio_processor.h b/main/audio/processors/no_audio_processor.h index ed54741a..3b20e92e 100644 --- a/main/audio/processors/no_audio_processor.h +++ b/main/audio/processors/no_audio_processor.h @@ -12,8 +12,8 @@ public: NoAudioProcessor() = default; ~NoAudioProcessor() = default; - void Initialize(AudioCodec* codec) override; - void Feed(const std::vector& data) override; + void Initialize(AudioCodec* codec, int frame_duration_ms) override; + void Feed(std::vector&& data) override; void Start() override; void Stop() override; bool IsRunning() override; @@ -24,6 +24,8 @@ public: private: AudioCodec* codec_ = nullptr; + int frame_duration_ms_ = 0; + int frame_samples_ = 0; std::function&& data)> output_callback_; std::function vad_state_change_callback_; bool is_running_ = false;