Audio stream packet with timestamp

This commit is contained in:
Xiaoxia
2025-04-28 23:10:24 +08:00
parent 7fdf78408e
commit b804343d30
8 changed files with 70 additions and 51 deletions

View File

@@ -245,13 +245,13 @@ void Application::PlaySound(const std::string_view& sound) {
p += sizeof(BinaryProtocol3); p += sizeof(BinaryProtocol3);
auto payload_size = ntohs(p3->payload_size); auto payload_size = ntohs(p3->payload_size);
std::vector<uint8_t> opus; AudioStreamPacket packet;
opus.resize(payload_size); packet.payload.resize(payload_size);
memcpy(opus.data(), p3->payload, payload_size); memcpy(packet.payload.data(), p3->payload, payload_size);
p += payload_size; p += payload_size;
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
audio_decode_queue_.emplace_back(std::move(opus)); audio_decode_queue_.emplace_back(std::move(packet));
} }
} }
@@ -391,11 +391,11 @@ void Application::Start() {
SetDeviceState(kDeviceStateIdle); SetDeviceState(kDeviceStateIdle);
Alert(Lang::Strings::ERROR, message.c_str(), "sad", Lang::Sounds::P3_EXCLAMATION); Alert(Lang::Strings::ERROR, message.c_str(), "sad", Lang::Sounds::P3_EXCLAMATION);
}); });
protocol_->OnIncomingAudio([this](std::vector<uint8_t>&& data) { protocol_->OnIncomingAudio([this](AudioStreamPacket&& packet) {
const int max_packets_in_queue = 600 / OPUS_FRAME_DURATION_MS; const int max_packets_in_queue = 600 / OPUS_FRAME_DURATION_MS;
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
if (audio_decode_queue_.size() < max_packets_in_queue) { if (audio_decode_queue_.size() < max_packets_in_queue) {
audio_decode_queue_.emplace_back(std::move(data)); audio_decode_queue_.emplace_back(std::move(packet));
} }
}); });
protocol_->OnAudioChannelOpened([this, codec, &board]() { protocol_->OnAudioChannelOpened([this, codec, &board]() {
@@ -510,8 +510,12 @@ void Application::Start() {
return; return;
} }
opus_encoder_->Encode(std::move(data), [this](std::vector<uint8_t>&& opus) { opus_encoder_->Encode(std::move(data), [this](std::vector<uint8_t>&& opus) {
Schedule([this, opus = std::move(opus)]() { AudioStreamPacket packet;
protocol_->SendAudio(opus); packet.payload = std::move(opus);
packet.timestamp = last_output_timestamp_;
last_output_timestamp_ = 0;
Schedule([this, packet = std::move(packet)]() {
protocol_->SendAudio(packet);
}); });
}); });
}); });
@@ -544,10 +548,10 @@ void Application::Start() {
return; return;
} }
std::vector<uint8_t> opus; AudioStreamPacket packet;
// Encode and send the wake word data to the server // Encode and send the wake word data to the server
while (wake_word_detect_.GetWakeWordOpus(opus)) { while (wake_word_detect_.GetWakeWordOpus(packet.payload)) {
protocol_->SendAudio(opus); protocol_->SendAudio(packet);
} }
// Set the chat state to wake word detected // Set the chat state to wake word detected
protocol_->SendWakeWordDetected(wake_word); protocol_->SendWakeWordDetected(wake_word);
@@ -671,20 +675,20 @@ void Application::OnAudioOutput() {
return; return;
} }
auto opus = std::move(audio_decode_queue_.front()); auto packet = std::move(audio_decode_queue_.front());
audio_decode_queue_.pop_front(); audio_decode_queue_.pop_front();
lock.unlock(); lock.unlock();
audio_decode_cv_.notify_all(); audio_decode_cv_.notify_all();
busy_decoding_audio_ = true; busy_decoding_audio_ = true;
background_task_->Schedule([this, codec, opus = std::move(opus)]() mutable { background_task_->Schedule([this, codec, packet = std::move(packet)]() mutable {
busy_decoding_audio_ = false; busy_decoding_audio_ = false;
if (aborted_) { if (aborted_) {
return; return;
} }
std::vector<int16_t> pcm; std::vector<int16_t> pcm;
if (!opus_decoder_->Decode(std::move(opus), pcm)) { if (!opus_decoder_->Decode(std::move(packet.payload), pcm)) {
return; return;
} }
// Resample if the sample rate is different // Resample if the sample rate is different
@@ -695,6 +699,7 @@ void Application::OnAudioOutput() {
pcm = std::move(resampled); pcm = std::move(resampled);
} }
codec->OutputData(pcm); codec->OutputData(pcm);
last_output_timestamp_ = packet.timestamp;
last_output_time_ = std::chrono::steady_clock::now(); last_output_time_ = std::chrono::steady_clock::now();
}); });
} }
@@ -730,8 +735,12 @@ void Application::OnAudioInput() {
return; return;
} }
opus_encoder_->Encode(std::move(data), [this](std::vector<uint8_t>&& opus) { opus_encoder_->Encode(std::move(data), [this](std::vector<uint8_t>&& opus) {
Schedule([this, opus = std::move(opus)]() { AudioStreamPacket packet;
protocol_->SendAudio(opus); packet.payload = std::move(opus);
packet.timestamp = last_output_timestamp_;
last_output_timestamp_ = 0;
Schedule([this, packet = std::move(packet)]() {
protocol_->SendAudio(packet);
}); });
}); });
}); });

View File

@@ -107,7 +107,8 @@ private:
TaskHandle_t audio_loop_task_handle_ = nullptr; TaskHandle_t audio_loop_task_handle_ = nullptr;
BackgroundTask* background_task_ = nullptr; BackgroundTask* background_task_ = nullptr;
std::chrono::steady_clock::time_point last_output_time_; std::chrono::steady_clock::time_point last_output_time_;
std::list<std::vector<uint8_t>> audio_decode_queue_; std::atomic<uint32_t> last_output_timestamp_ = 0;
std::list<AudioStreamPacket> audio_decode_queue_;
std::condition_variable audio_decode_cv_; std::condition_variable audio_decode_cv_;
std::unique_ptr<OpusEncoderWrapper> opus_encoder_; std::unique_ptr<OpusEncoderWrapper> opus_encoder_;

View File

@@ -121,24 +121,24 @@ bool MqttProtocol::SendText(const std::string& text) {
return true; return true;
} }
void MqttProtocol::SendAudio(const std::vector<uint8_t>& data) { void MqttProtocol::SendAudio(const AudioStreamPacket& packet) {
std::lock_guard<std::mutex> lock(channel_mutex_); std::lock_guard<std::mutex> lock(channel_mutex_);
if (udp_ == nullptr) { if (udp_ == nullptr) {
return; return;
} }
std::string nonce(aes_nonce_); std::string nonce(aes_nonce_);
*(uint16_t*)&nonce[2] = htons(data.size()); *(uint16_t*)&nonce[2] = htons(packet.payload.size());
*(uint32_t*)&nonce[12] = htonl(++local_sequence_); *(uint32_t*)&nonce[12] = htonl(++local_sequence_);
std::string encrypted; std::string encrypted;
encrypted.resize(aes_nonce_.size() + data.size()); encrypted.resize(aes_nonce_.size() + packet.payload.size());
memcpy(encrypted.data(), nonce.data(), nonce.size()); memcpy(encrypted.data(), nonce.data(), nonce.size());
size_t nc_off = 0; size_t nc_off = 0;
uint8_t stream_block[16] = {0}; uint8_t stream_block[16] = {0};
if (mbedtls_aes_crypt_ctr(&aes_ctx_, data.size(), &nc_off, (uint8_t*)nonce.c_str(), stream_block, if (mbedtls_aes_crypt_ctr(&aes_ctx_, packet.payload.size(), &nc_off, (uint8_t*)nonce.c_str(), stream_block,
(uint8_t*)data.data(), (uint8_t*)&encrypted[nonce.size()]) != 0) { (uint8_t*)packet.payload.data(), (uint8_t*)&encrypted[nonce.size()]) != 0) {
ESP_LOGE(TAG, "Failed to encrypt audio data"); ESP_LOGE(TAG, "Failed to encrypt audio data");
return; return;
} }
@@ -229,20 +229,20 @@ bool MqttProtocol::OpenAudioChannel() {
ESP_LOGW(TAG, "Received audio packet with wrong sequence: %lu, expected: %lu", sequence, remote_sequence_ + 1); ESP_LOGW(TAG, "Received audio packet with wrong sequence: %lu, expected: %lu", sequence, remote_sequence_ + 1);
} }
std::vector<uint8_t> decrypted;
size_t decrypted_size = data.size() - aes_nonce_.size(); size_t decrypted_size = data.size() - aes_nonce_.size();
size_t nc_off = 0; size_t nc_off = 0;
uint8_t stream_block[16] = {0}; uint8_t stream_block[16] = {0};
decrypted.resize(decrypted_size);
auto nonce = (uint8_t*)data.data(); auto nonce = (uint8_t*)data.data();
auto encrypted = (uint8_t*)data.data() + aes_nonce_.size(); auto encrypted = (uint8_t*)data.data() + aes_nonce_.size();
int ret = mbedtls_aes_crypt_ctr(&aes_ctx_, decrypted_size, &nc_off, nonce, stream_block, encrypted, (uint8_t*)decrypted.data()); AudioStreamPacket packet;
packet.payload.resize(decrypted_size);
int ret = mbedtls_aes_crypt_ctr(&aes_ctx_, decrypted_size, &nc_off, nonce, stream_block, encrypted, (uint8_t*)packet.payload.data());
if (ret != 0) { if (ret != 0) {
ESP_LOGE(TAG, "Failed to decrypt audio data, ret: %d", ret); ESP_LOGE(TAG, "Failed to decrypt audio data, ret: %d", ret);
return; return;
} }
if (on_incoming_audio_ != nullptr) { if (on_incoming_audio_ != nullptr) {
on_incoming_audio_(std::move(decrypted)); on_incoming_audio_(std::move(packet));
} }
remote_sequence_ = sequence; remote_sequence_ = sequence;
last_incoming_time_ = std::chrono::steady_clock::now(); last_incoming_time_ = std::chrono::steady_clock::now();

View File

@@ -26,7 +26,7 @@ public:
~MqttProtocol(); ~MqttProtocol();
bool Start() override; bool Start() override;
void SendAudio(const std::vector<uint8_t>& data) override; void SendAudio(const AudioStreamPacket& packet) override;
bool OpenAudioChannel() override; bool OpenAudioChannel() override;
void CloseAudioChannel() override; void CloseAudioChannel() override;
bool IsAudioChannelOpened() const override; bool IsAudioChannelOpened() const override;

View File

@@ -8,7 +8,7 @@ void Protocol::OnIncomingJson(std::function<void(const cJSON* root)> callback) {
on_incoming_json_ = callback; on_incoming_json_ = callback;
} }
void Protocol::OnIncomingAudio(std::function<void(std::vector<uint8_t>&& data)> callback) { void Protocol::OnIncomingAudio(std::function<void(AudioStreamPacket&& packet)> callback) {
on_incoming_audio_ = callback; on_incoming_audio_ = callback;
} }

View File

@@ -8,7 +8,7 @@
#include <vector> #include <vector>
struct AudioStreamPacket { struct AudioStreamPacket {
uint32_t timestamp; uint32_t timestamp = 0;
std::vector<uint8_t> payload; std::vector<uint8_t> payload;
}; };
@@ -53,7 +53,7 @@ public:
return session_id_; return session_id_;
} }
void OnIncomingAudio(std::function<void(std::vector<uint8_t>&& data)> callback); void OnIncomingAudio(std::function<void(AudioStreamPacket&& packet)> callback);
void OnIncomingJson(std::function<void(const cJSON* root)> callback); void OnIncomingJson(std::function<void(const cJSON* root)> callback);
void OnAudioChannelOpened(std::function<void()> callback); void OnAudioChannelOpened(std::function<void()> callback);
void OnAudioChannelClosed(std::function<void()> callback); void OnAudioChannelClosed(std::function<void()> callback);
@@ -64,7 +64,7 @@ public:
virtual void CloseAudioChannel() = 0; virtual void CloseAudioChannel() = 0;
virtual bool IsAudioChannelOpened() const = 0; virtual bool IsAudioChannelOpened() const = 0;
virtual bool IsAudioChannelBusy() const; virtual bool IsAudioChannelBusy() const;
virtual void SendAudio(const std::vector<uint8_t>& data) = 0; virtual void SendAudio(const AudioStreamPacket& packet) = 0;
virtual void SendWakeWordDetected(const std::string& wake_word); virtual void SendWakeWordDetected(const std::string& wake_word);
virtual void SendStartListening(ListeningMode mode); virtual void SendStartListening(ListeningMode mode);
virtual void SendStopListening(); virtual void SendStopListening();
@@ -74,7 +74,7 @@ public:
protected: protected:
std::function<void(const cJSON* root)> on_incoming_json_; std::function<void(const cJSON* root)> on_incoming_json_;
std::function<void(std::vector<uint8_t>&& data)> on_incoming_audio_; std::function<void(AudioStreamPacket&& packet)> on_incoming_audio_;
std::function<void()> on_audio_channel_opened_; std::function<void()> on_audio_channel_opened_;
std::function<void()> on_audio_channel_closed_; std::function<void()> on_audio_channel_closed_;
std::function<void(const std::string& message)> on_network_error_; std::function<void(const std::string& message)> on_network_error_;

View File

@@ -28,40 +28,40 @@ bool WebsocketProtocol::Start() {
return true; return true;
} }
void WebsocketProtocol::SendAudio(const std::vector<uint8_t>& data) { void WebsocketProtocol::SendAudio(const AudioStreamPacket& packet) {
if (websocket_ == nullptr) { if (websocket_ == nullptr) {
return; return;
} }
if (version_ == 2) { if (version_ == 2) {
std::string packet; std::string serialized;
packet.resize(sizeof(BinaryProtocol2) + data.size()); serialized.resize(sizeof(BinaryProtocol2) + packet.payload.size());
auto bp2 = (BinaryProtocol2*)packet.data(); auto bp2 = (BinaryProtocol2*)serialized.data();
bp2->version = htons(version_); bp2->version = htons(version_);
bp2->type = 0; bp2->type = 0;
bp2->reserved = 0; bp2->reserved = 0;
bp2->timestamp = htonl(0); bp2->timestamp = htonl(packet.timestamp);
bp2->payload_size = htonl(data.size()); bp2->payload_size = htonl(packet.payload.size());
memcpy(bp2->payload, data.data(), data.size()); memcpy(bp2->payload, packet.payload.data(), packet.payload.size());
busy_sending_audio_ = true; busy_sending_audio_ = true;
websocket_->Send(packet.data(), packet.size(), true); websocket_->Send(serialized.data(), serialized.size(), true);
busy_sending_audio_ = false; busy_sending_audio_ = false;
} else if (version_ == 3) { } else if (version_ == 3) {
std::string packet; std::string serialized;
packet.resize(sizeof(BinaryProtocol3) + data.size()); serialized.resize(sizeof(BinaryProtocol3) + packet.payload.size());
auto bp3 = (BinaryProtocol3*)packet.data(); auto bp3 = (BinaryProtocol3*)serialized.data();
bp3->type = 0; bp3->type = 0;
bp3->reserved = 0; bp3->reserved = 0;
bp3->payload_size = htons(data.size()); bp3->payload_size = htons(packet.payload.size());
memcpy(bp3->payload, data.data(), data.size()); memcpy(bp3->payload, packet.payload.data(), packet.payload.size());
busy_sending_audio_ = true; busy_sending_audio_ = true;
websocket_->Send(packet.data(), packet.size(), true); websocket_->Send(serialized.data(), serialized.size(), true);
busy_sending_audio_ = false; busy_sending_audio_ = false;
} else { } else {
busy_sending_audio_ = true; busy_sending_audio_ = true;
websocket_->Send(data.data(), data.size(), true); websocket_->Send(packet.payload.data(), packet.payload.size(), true);
busy_sending_audio_ = false; busy_sending_audio_ = false;
} }
} }
@@ -130,15 +130,24 @@ bool WebsocketProtocol::OpenAudioChannel() {
bp2->timestamp = ntohl(bp2->timestamp); bp2->timestamp = ntohl(bp2->timestamp);
bp2->payload_size = ntohl(bp2->payload_size); bp2->payload_size = ntohl(bp2->payload_size);
auto payload = (uint8_t*)bp2->payload; auto payload = (uint8_t*)bp2->payload;
on_incoming_audio_(std::vector<uint8_t>(payload, payload + bp2->payload_size)); on_incoming_audio_(AudioStreamPacket{
.timestamp = bp2->timestamp,
.payload = std::vector<uint8_t>(payload, payload + bp2->payload_size)
});
} else if (version_ == 3) { } else if (version_ == 3) {
BinaryProtocol3* bp3 = (BinaryProtocol3*)data; BinaryProtocol3* bp3 = (BinaryProtocol3*)data;
bp3->type = bp3->type; bp3->type = bp3->type;
bp3->payload_size = ntohs(bp3->payload_size); bp3->payload_size = ntohs(bp3->payload_size);
auto payload = (uint8_t*)bp3->payload; auto payload = (uint8_t*)bp3->payload;
on_incoming_audio_(std::vector<uint8_t>(payload, payload + bp3->payload_size)); on_incoming_audio_(AudioStreamPacket{
.timestamp = 0,
.payload = std::vector<uint8_t>(payload, payload + bp3->payload_size)
});
} else { } else {
on_incoming_audio_(std::vector<uint8_t>((uint8_t*)data, (uint8_t*)data + len)); on_incoming_audio_(AudioStreamPacket{
.timestamp = 0,
.payload = std::vector<uint8_t>((uint8_t*)data, (uint8_t*)data + len)
});
} }
} }
} else { } else {

View File

@@ -16,7 +16,7 @@ public:
~WebsocketProtocol(); ~WebsocketProtocol();
bool Start() override; bool Start() override;
void SendAudio(const std::vector<uint8_t>& data) override; void SendAudio(const AudioStreamPacket& packet) override;
bool OpenAudioChannel() override; bool OpenAudioChannel() override;
void CloseAudioChannel() override; void CloseAudioChannel() override;
bool IsAudioChannelOpened() const override; bool IsAudioChannelOpened() const override;