WebRTC 语音激活检测(VAD)算法

语音激活检测最早应用于电话传输和检测系统当中,用于通信信道的时间分配,提高传输线路的利用效率。激活检测属于语音处理系统的前端操作,在语音检测领域意义重大。

但是目前的语音激活检测,尤其是检测人声开始和结束的端点始终是属于技术难点,各家公司始终处于能判断,但是不敢保证判别准确性的阶段。

常搭建机器人聊天系统主要包括以下三个方面:

  1. 语音转文字(ASR/STT)
  2. 语义内容(NLU/NLP)
  3. 文字转语音(TTS)

语音转文字主要包括如下几个方面:

  1. 麦克风降噪
  2. 声源定位
  3. 回声消除
  4. 唤醒词/语音激活检测
  5. 音频格式压缩

语音激活检测主要功能可以有:

  1. 自动打断
  2. 去掉语音中的静音成分
  3. 获取输入语音中有效语音
  4. 去除噪声,对语音进行增强

检测原理

WebRTC的VAD模型采用了高斯模型,这一模型应用极其广泛。

检测原理是根据人声的频谱范围,把输入的频谱分成六个子带(80Hz~250Hz,250Hz~500Hz,500Hz~1K,1K~2K,2K~3K,3K~4K), 分别计算这六个子带的能量。然后使用高斯模型的概率密度函数做运算,得出一个对数似然比函数。对数似然比分为全局和局部,全局是六个子带之加权之和,而局部是指每一个子带则是局部,所以语音判决会先判断子带,子带判断没有时会判断全局,只要有一方过了,就算有语音。

这种算法的优点是它是无监督的,不需要严格的训练。GMM的噪声和语音模型如下:

 p(xk|z,rk)={1/sqrt(2*pi*sita^2)} * exp{ - (xk-uz) ^2/(2 * sita ^2 )} 

xk是选取的特征量,在webrtc的VAD中具体是指子带能量,rk是包括均值uz和方差sita的参数集合。z=0,代表噪声,z=1代表语音。

python 激活检测

由于实际应用中,单纯依靠能量检测特征检测等方法很难判断人声说话的起始点,所以市面上大多数的语音产品都是使用唤醒词判断语音起始,另外加上声音回路,还可以做语音打断。这样的交互方式可能有些傻,每次必须喊一下唤醒词才能继续聊天。现在github上有snowboy唤醒词的开源库,大家可以登录snowboy官网训练自己的唤醒词模型。

  1. Kitt-AI : Snowboy
  2. Sensory : Sensory

考虑到用唤醒词嘴巴会累,使用VAD自动唤醒。这种方式容易受强噪声干扰,在远场语音交互场景中,VAD面临着两个难题:

1. 可以成功检测到最低能量的语音(灵敏度)。

2. 如何在多噪环境下成功检测(漏检率和虚检率)。

漏检反应的是原本是语音但是没有检测出来,而虚检率反应的是不是语音信号而被检测成语音信号的概率。相对而言漏检是不可接受的,而虚检可以通过后端的ASR和NLP算法进一步过滤,但是虚检会带来系统资源利用率上升,随之系统的功耗和发热会进一步增加,而这会上升为可移动和随声携带设备的一个难题。

适合一个人在家玩玩的套件:

  1. pyaudio: pip install pyaudio 可以从设备节点读取原始音频流数据,音频编码是PCM格式;
  2. webrtcvad: pip install webrtcvad 检测判断一组语音数据是否为空语音;

当检测到持续时间长度 T1 vad检测都有语音活动,可以判定为语音起始。

当检测到持续时间长度 T2 vad检测都没有有语音活动,可以判定为语音结束。

程序很简单,相信看一会儿就明白了。

'''

Requirements:

+ pyaudio - `pip install pyaudio`

+ py-webrtcvad - `pip install webrtcvad`

'''

import webrtcvad

import collections

import sys

import signal

import pyaudio

from array import array

from struct import pack

import wave

import time

FORMAT = pyaudio.paInt16

CHANNELS = 1

RATE = 16000

CHUNK_DURATION_MS = 30 # supports 10, 20 and 30 (ms)

PADDING_DURATION_MS = 1500 # 1 sec jugement

CHUNK_SIZE = int(RATE CHUNK_DURATION_MS / 1000) # chunk to read

CHUNK_BYTES = CHUNK_SIZE 2 # 16bit = 2 bytes, PCM

NUM_PADDING_CHUNKS = int(PADDING_DURATION_MS / CHUNK_DURATION_MS)

# NUM_WINDOW_CHUNKS = int(240 / CHUNK_DURATION_MS)

NUM_WINDOW_CHUNKS = int(400 / CHUNK_DURATION_MS) # 400 ms/ 30ms ge

NUM_WINDOW_CHUNKS_END = NUM_WINDOW_CHUNKS 2

START_OFFSET = int(NUM_WINDOW_CHUNKS CHUNK_DURATION_MS 0.5 RATE)

vad = webrtcvad.Vad(1)

pa = pyaudio.PyAudio()

stream = pa.open(format=FORMAT,

channels=CHANNELS,

rate=RATE,

input=True,

start=False,

# input_device_index=2,

frames_per_buffer=CHUNK_SIZE)

got_a_sentence = False

leave = False

def handle_int(sig, chunk):

global leave, got_a_sentence

leave = True

got_a_sentence = True

def record_to_file(path, data, sample_width):

"Records from the microphone and outputs the resulting data to 'path'"

# sample_width, data = record()

data = pack('<' + ('h' len(data)), data)

wf = wave.open(path, 'wb')

wf.setnchannels(1)

wf.setsampwidth(sample_width)

wf.setframerate(RATE)

wf.writeframes(data)

wf.close()

def normalize(snd_data):

"Average the volume out"

MAXIMUM = 32767 # 16384

times = float(MAXIMUM) / max(abs(i) for i in snd_data)

r = array('h')

for i in snd_data:

r.append(int(i times))

return r

signal.signal(signal.SIGINT, handle_int)

while not leave:

ring_buffer = collections.deque(maxlen=NUM_PADDING_CHUNKS)

triggered = False

voiced_frames = []

ring_buffer_flags = [0] NUM_WINDOW_CHUNKS

ring_buffer_index = 0

ring_buffer_flags_end = [0] NUM_WINDOW_CHUNKS_END

ring_buffer_index_end = 0

buffer_in = ''

# WangS

raw_data = array('h')

index = 0

start_point = 0

StartTime = time.time()

print(" recording: ")

stream.start_stream()

while not got_a_sentence and not leave:

chunk = stream.read(CHUNK_SIZE)

# add WangS

raw_data.extend(array('h', chunk))

index += CHUNK_SIZE

TimeUse = time.time() - StartTime

active = vad.is_speech(chunk, RATE)

sys.stdout.write('1' if active else '_')

ring_buffer_flags[ring_buffer_index] = 1 if active else 0

ring_buffer_index += 1

ring_buffer_index %= NUM_WINDOW_CHUNKS

ring_buffer_flags_end[ring_buffer_index_end] = 1 if active else 0

ring_buffer_index_end += 1

ring_buffer_index_end %= NUM_WINDOW_CHUNKS_END

# start point detection

if not triggered:

ring_buffer.append(chunk)

num_voiced = sum(ring_buffer_flags)

if num_voiced > 0.8 NUM_WINDOW_CHUNKS:

sys.stdout.write(' Open ')

triggered = True

start_point = index - CHUNK_SIZE 20 # start point

# voiced_frames.extend(ring_buffer)

ring_buffer.clear()

# end point detection

else:

# voiced_frames.append(chunk)

ring_buffer.append(chunk)

num_unvoiced = NUM_WINDOW_CHUNKS_END - sum(ring_buffer_flags_end)

if num_unvoiced > 0.90 NUM_WINDOW_CHUNKS_END or TimeUse > 10:

sys.stdout.write(' Close ')

triggered = False

got_a_sentence = True

sys.stdout.flush()

sys.stdout.write('\n')

# data = b''.join(voiced_frames)

stream.stop_stream()

print(" done recording")

got_a_sentence = False

# write to file

raw_data.reverse()

for index in range(start_point):

raw_data.pop()

raw_data.reverse()

raw_data = normalize(raw_data)

record_to_file("recording.wav", raw_data, 2)

leave = True

stream.close()

程序运行方式sudo python vad.py

版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。

(2)

发表回复

登录后才能评论