[to #42322933] feat: add dingding denoise model

Link: https://code.alibaba-inc.com/Ali-MaaS/MaaS-lib/codereview/11731594

* feat: add dingding denoise

* refactor: delete duplicated unidfsmn class

* refactor: delete empty lines

* refactor: make some methods inline

* style: add license and optimize imports

* style: comments style
This commit is contained in:
bin.xue
2023-03-07 21:55:10 +08:00
committed by wenmeng.zwm
parent 0503c40919
commit b4c90d8160
17 changed files with 668 additions and 52 deletions

Binary file not shown.

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:a9e76c8448e93934ed9c8827b76f702d07fccc3e586900903617971471235800
size 475278

View File

@@ -161,6 +161,7 @@ class Models(object):
# audio models
sambert_hifigan = 'sambert-hifigan'
speech_frcrn_ans_cirm_16k = 'speech_frcrn_ans_cirm_16k'
speech_dfsmn_ans = 'speech_dfsmn_ans'
speech_dfsmn_kws_char_farfield = 'speech_dfsmn_kws_char_farfield'
speech_kws_fsmn_char_ctc_nearfield = 'speech_kws_fsmn_char_ctc_nearfield'
speech_mossformer_separation_temporal_8k = 'speech_mossformer_separation_temporal_8k'
@@ -441,6 +442,7 @@ class Pipelines(object):
sambert_hifigan_tts = 'sambert-hifigan-tts'
speech_dfsmn_aec_psm_16k = 'speech-dfsmn-aec-psm-16k'
speech_frcrn_ans_cirm_16k = 'speech_frcrn_ans_cirm_16k'
speech_dfsmn_ans_psm_48k_causal = 'speech_dfsmn_ans_psm_48k_causal'
speech_dfsmn_kws_char_farfield = 'speech_dfsmn_kws_char_farfield'
speech_separation = 'speech-separation'
kws_kwsbp = 'kws-kwsbp'

View File

@@ -9,6 +9,7 @@ if TYPE_CHECKING:
else:
_import_structure = {
'frcrn': ['FRCRNDecorator'],
'dnoise_net': ['DenoiseNet'],
}
import sys

View File

@@ -7,57 +7,8 @@
import torch
import torch.nn as nn
import torch.nn.functional as F
class UniDeepFsmn(nn.Module):
    """Unidirectional deep FSMN layer (causal memory via depthwise conv).

    Args:
        input_dim (int): input feature dimension.
        output_dim (int): output feature dimension.
        lorder (int): left (look-back) order of the memory block; when None
            the layer is left unconfigured (no sublayers are created).
        hidden_size (int): hidden dimension of the bottleneck linear layer.
    """

    def __init__(self, input_dim, output_dim, lorder=None, hidden_size=None):
        super(UniDeepFsmn, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        if lorder is None:
            return
        self.lorder = lorder
        self.hidden_size = hidden_size
        self.linear = nn.Linear(input_dim, hidden_size)
        self.project = nn.Linear(hidden_size, output_dim, bias=False)
        # Depthwise (groups=output_dim) conv implements the FSMN memory.
        self.conv1 = nn.Conv2d(
            output_dim,
            output_dim, [lorder, 1], [1, 1],
            groups=output_dim,
            bias=False)

    def forward(self, input):
        r"""
        Args:
            input: torch with shape: batch (b) x sequence(T) x feature (h)

        Returns:
            batch (b) x sequence(T) x feature (h)
        """
        f1 = F.relu(self.linear(input))
        p1 = self.project(f1)
        x = torch.unsqueeze(p1, 1)
        # x: batch (b) x channel (c) x sequence(T) x feature (h)
        x_per = x.permute(0, 3, 2, 1)
        # x_per: batch (b) x feature (h) x sequence(T) x channel (c)
        # Left-pad the time axis so the convolution stays causal.
        y = F.pad(x_per, [0, 0, self.lorder - 1, 0])
        out = x_per + self.conv1(y)
        out1 = out.permute(0, 3, 2, 1)
        # out1: batch (b) x channel (c=1) x sequence(T) x feature (h)
        # Fix: squeeze only the channel dim; a bare squeeze() also dropped
        # batch/time dims of size 1 and broke the residual addition shape.
        return input + out1.squeeze(1)
from modelscope.models.audio.ans.layers.uni_deep_fsmn import UniDeepFsmn
class ComplexUniDeepFsmn(nn.Module):

View File

@@ -0,0 +1,73 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
# Related papers:
# Shengkui Zhao, Trung Hieu Nguyen, Bin Ma, “Monaural Speech Enhancement with Complex Convolutional
# Block Attention Module and Joint Time Frequency Losses”, ICASSP 2021.
# Shiliang Zhang, Ming Lei, Zhijie Yan, Lirong Dai, “Deep-FSMN for Large Vocabulary Continuous Speech
# Recognition “, arXiv:1803.05030, 2018.
from torch import nn
from modelscope.metainfo import Models
from modelscope.models import MODELS, TorchModel
from modelscope.models.audio.ans.layers.activations import (RectifiedLinear,
Sigmoid)
from modelscope.models.audio.ans.layers.affine_transform import AffineTransform
from modelscope.models.audio.ans.layers.uni_deep_fsmn import UniDeepFsmn
from modelscope.utils.constant import Tasks
@MODELS.register_module(
    Tasks.acoustic_noise_suppression, module_name=Models.speech_dfsmn_ans)
class DfsmnAns(TorchModel):
    """DFSMN-based acoustic noise suppression model.

    Predicts a spectral mask from fbank features.

    Args:
        model_dir (str): the model path.
        fsmn_depth (int): number of stacked UniDeepFsmn layers.
        lorder (int): left context order of each FSMN layer.
    """

    def __init__(self,
                 model_dir: str,
                 fsmn_depth=9,
                 lorder=20,
                 *args,
                 **kwargs):
        super().__init__(model_dir, *args, **kwargs)
        self.lorder = lorder
        self.linear1 = AffineTransform(120, 256)
        self.relu = RectifiedLinear(256, 256)
        self.deepfsmn = nn.Sequential(
            *(UniDeepFsmn(256, 256, lorder, 256) for _ in range(fsmn_depth)))
        self.linear2 = AffineTransform(256, 961)
        self.sig = Sigmoid(961, 961)

    def forward(self, input):
        """Compute the denoising mask for the given features.

        Args:
            input: fbank feature [batch_size,number_of_frame,feature_dimension]

        Returns:
            mask value [batch_size, number_of_frame, FFT_size/2+1]
        """
        hidden = self.relu(self.linear1(input))
        hidden = self.deepfsmn(hidden)
        return self.sig(self.linear2(hidden))

    def to_kaldi_nnet(self):
        """Serialize the whole network in kaldi nnet1 text format."""
        parts = ['<Nnet>\n']
        parts.append(self.linear1.to_kaldi_nnet())
        parts.append(self.relu.to_kaldi_nnet())
        parts.extend(layer.to_kaldi_nnet() for layer in self.deepfsmn)
        parts.append(self.linear2.to_kaldi_nnet())
        parts.append(self.sig.to_kaldi_nnet())
        parts.append('</Nnet>\n')
        return ''.join(parts)

View File

@@ -78,7 +78,7 @@ class FRCRN(nn.Module):
win_len=400,
win_inc=100,
fft_len=512,
win_type='hanning',
win_type='hann',
**kwargs):
r"""
Args:

View File

@@ -0,0 +1,62 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import torch.nn as nn
from modelscope.models.audio.ans.layers.layer_base import LayerBase
class RectifiedLinear(LayerBase):
    """ReLU activation layer that can serialize itself to kaldi nnet1 text."""

    def __init__(self, input_dim, output_dim):
        super(RectifiedLinear, self).__init__()
        # kaldi serialization only records the (identical) in/out dims
        self.dim = input_dim
        self.relu = nn.ReLU()

    def forward(self, input):
        """Apply element-wise ReLU."""
        return self.relu(input)

    def to_kaldi_nnet(self):
        """Return the kaldi nnet1 text header for this layer."""
        return '<RectifiedLinear> %d %d\n' % (self.dim, self.dim)

    def load_kaldi_nnet(self, instr):
        """Parameter-free layer: nothing to parse; return stream unchanged."""
        return instr
class LogSoftmax(LayerBase):
    """Log-softmax activation layer with kaldi nnet1 serialization support."""

    def __init__(self, input_dim, output_dim):
        super(LogSoftmax, self).__init__()
        # Only one dim is kept: input and output sizes are identical.
        self.dim = input_dim
        # NOTE(review): nn.LogSoftmax() without an explicit `dim` relies on
        # deprecated implicit-dimension selection — confirm intended axis.
        self.ls = nn.LogSoftmax()

    def forward(self, input):
        # Apply log-softmax over the implicit dimension (see note above).
        return self.ls(input)

    def to_kaldi_nnet(self):
        # NOTE(review): emits a '<Softmax>' header (not '<LogSoftmax>');
        # presumably intentional for kaldi nnet1 compatibility — verify.
        re_str = ''
        re_str += '<Softmax> %d %d\n' % (self.dim, self.dim)
        return re_str

    def load_kaldi_nnet(self, instr):
        # Parameter-free layer: nothing to consume from the stream.
        return instr
class Sigmoid(LayerBase):
    """Sigmoid activation layer with kaldi nnet1 serialization support."""

    def __init__(self, input_dim, output_dim):
        super(Sigmoid, self).__init__()
        # kaldi serialization only records the (identical) in/out dims
        self.dim = input_dim
        self.sig = nn.Sigmoid()

    def forward(self, input):
        """Apply element-wise sigmoid."""
        return self.sig(input)

    def to_kaldi_nnet(self):
        """Return the kaldi nnet1 text header for this layer."""
        return '<Sigmoid> %d %d\n' % (self.dim, self.dim)

    def load_kaldi_nnet(self, instr):
        """Parameter-free layer: nothing to parse; return stream unchanged."""
        return instr

View File

@@ -0,0 +1,86 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import torch as th
import torch.nn as nn
from modelscope.models.audio.ans.layers.layer_base import (LayerBase,
to_kaldi_matrix)
from modelscope.utils.audio.audio_utils import (expect_kaldi_matrix,
expect_token_number)
class AffineTransform(LayerBase):
    """Fully-connected (linear) layer with kaldi nnet1 (de)serialization.

    Args:
        input_dim (int): input feature dimension.
        output_dim (int): output feature dimension.
    """

    def __init__(self, input_dim, output_dim):
        super(AffineTransform, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, input):
        """Apply the affine transform y = Wx + b."""
        return self.linear(input)

    def to_kaldi_nnet(self):
        """Serialize weight and bias in kaldi nnet1 text format."""
        re_str = '<AffineTransform> %d %d\n' % (self.output_dim,
                                                self.input_dim)
        re_str += '<LearnRateCoef> 1 <BiasLearnRateCoef> 1 <MaxNorm> 0\n'
        # Fetch the state dict once instead of once per tensor.
        state = self.state_dict()
        re_str += to_kaldi_matrix(state['linear.weight'].squeeze().numpy())
        re_str += to_kaldi_matrix(state['linear.bias'].squeeze().numpy())
        return re_str

    def _expect_number(self, instr, token):
        """Consume `token` plus its numeric value, or raise on bad format."""
        output = expect_token_number(instr, token)
        if output is None:
            raise Exception('AffineTransform format error')
        return output

    def _expect_matrix(self, instr):
        """Consume one kaldi text matrix, or raise on bad format."""
        output = expect_kaldi_matrix(instr)
        if output is None:
            raise Exception('AffineTransform format error')
        return output

    def load_kaldi_nnet(self, instr):
        """Load weight and bias from a kaldi nnet1 text stream.

        Args:
            instr: the remaining nnet text to parse.

        Returns:
            The unconsumed tail of `instr`.

        Raises:
            Exception: when an expected token or matrix is missing.
        """
        instr, _ = self._expect_number(instr, '<LearnRateCoef>')
        instr, _ = self._expect_number(instr, '<BiasLearnRateCoef>')
        instr, _ = self._expect_number(instr, '<MaxNorm>')
        instr, mat = self._expect_matrix(instr)
        self.linear.weight = th.nn.Parameter(
            th.from_numpy(mat).type(th.FloatTensor))
        instr, mat = self._expect_matrix(instr)
        self.linear.bias = th.nn.Parameter(
            th.from_numpy(mat).type(th.FloatTensor))
        return instr

View File

@@ -0,0 +1,31 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import abc
import numpy as np
import six
import torch.nn as nn
def to_kaldi_matrix(np_mat):
    """Transform a numpy matrix into a standard kaldi text matrix string.

    Args:
        np_mat: numpy matrix (or vector).

    Returns:
        str: the matrix rendered as '[ ... ]\\n' in kaldi text format.
    """
    # Scope the print options so global numpy formatting state is not
    # permanently clobbered (np.set_printoptions leaked the settings to
    # every later numpy print in the process).
    with np.printoptions(threshold=np.inf, linewidth=np.nan):
        out_str = str(np_mat)
    out_str = out_str.replace('[', '').replace(']', '')
    return '[ %s ]\n' % out_str
class LayerBase(nn.Module, metaclass=abc.ABCMeta):
    """Abstract base for kaldi-serializable torch layers.

    Uses the native py3 `metaclass=` keyword instead of the legacy
    `six.add_metaclass` shim (the codebase is python3-only).
    """

    def __init__(self):
        super(LayerBase, self).__init__()

    @abc.abstractmethod
    def to_kaldi_nnet(self):
        """Return this layer serialized as kaldi nnet1 text."""
        pass

View File

@@ -0,0 +1,156 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from modelscope.models.audio.ans.layers.layer_base import (LayerBase,
to_kaldi_matrix)
from modelscope.utils.audio.audio_utils import (expect_kaldi_matrix,
expect_token_number)
class UniDeepFsmn(LayerBase):
    """Unidirectional deep FSMN layer with kaldi nnet1 (de)serialization.

    The FSMN "memory" is a causal depthwise Conv2d over the time axis,
    added back to its input as a residual.

    Args:
        input_dim (int): input feature dimension.
        output_dim (int): output feature dimension.
        lorder (int): left (look-back) order of the memory block.
        hidden_size (int): hidden dimension of the bottleneck linear layer.
    """

    def __init__(self, input_dim, output_dim, lorder=1, hidden_size=None):
        super(UniDeepFsmn, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.lorder = lorder
        self.hidden_size = hidden_size
        self.linear = nn.Linear(input_dim, hidden_size)
        self.project = nn.Linear(hidden_size, output_dim, bias=False)
        # Depthwise (groups=output_dim) conv implements the FSMN memory.
        self.conv1 = nn.Conv2d(
            output_dim,
            output_dim, (lorder, 1), (1, 1),
            groups=output_dim,
            bias=False)

    def forward(self, input):
        """
        Args:
            input: torch with shape: batch (b) x sequence(T) x feature (h)

        Returns:
            batch (b) x sequence(T) x feature (h)
        """
        f1 = F.relu(self.linear(input))
        p1 = self.project(f1)
        x = torch.unsqueeze(p1, 1)
        # x: batch (b) x channel (c) x sequence(T) x feature (h)
        x_per = x.permute(0, 3, 2, 1)
        # x_per: batch (b) x feature (h) x sequence(T) x channel (c)
        # Left-pad the time axis so the convolution stays causal.
        y = F.pad(x_per, [0, 0, self.lorder - 1, 0])
        out = x_per + self.conv1(y)
        out1 = out.permute(0, 3, 2, 1)
        # out1: batch (b) x channel (c=1) x sequence(T) x feature (h)
        # Fix: squeeze only the channel dim; a bare squeeze() also dropped
        # batch/time dims of size 1 and broke the residual addition shape.
        return input + out1.squeeze(1)

    def to_kaldi_nnet(self):
        """Serialize the layer in kaldi nnet1 text format."""
        re_str = ''
        re_str += '<UniDeepFsmn> %d %d\n'\
                  % (self.output_dim, self.input_dim)
        re_str += '<LearnRateCoef> %d <HidSize> %d <LOrder> %d <LStride> %d <MaxNorm> 0\n'\
                  % (1, self.hidden_size, self.lorder, 1)
        # The memory filters are written time-reversed and transposed
        # (mirrored by np.fliplr in load_kaldi_nnet below).
        lfiters = self.state_dict()['conv1.weight']
        x = np.flipud(lfiters.squeeze().numpy().T)
        re_str += to_kaldi_matrix(x)
        proj_weights = self.state_dict()['project.weight']
        x = proj_weights.squeeze().numpy()
        re_str += to_kaldi_matrix(x)
        linear_weights = self.state_dict()['linear.weight']
        x = linear_weights.squeeze().numpy()
        re_str += to_kaldi_matrix(x)
        linear_bias = self.state_dict()['linear.bias']
        x = linear_bias.squeeze().numpy()
        re_str += to_kaldi_matrix(x)
        return re_str

    def load_kaldi_nnet(self, instr):
        """Load the layer parameters from a kaldi nnet1 text stream.

        Args:
            instr: the remaining nnet text to parse.

        Returns:
            The unconsumed tail of `instr`.

        Raises:
            Exception: when an expected token or matrix is missing.
        """
        output = expect_token_number(instr, '<LearnRateCoef>')
        if output is None:
            raise Exception('UniDeepFsmn format error')
        instr, lr = output
        output = expect_token_number(instr, '<HidSize>')
        if output is None:
            raise Exception('UniDeepFsmn format error')
        instr, hiddensize = output
        self.hidden_size = int(hiddensize)
        output = expect_token_number(instr, '<LOrder>')
        if output is None:
            raise Exception('UniDeepFsmn format error')
        instr, lorder = output
        self.lorder = int(lorder)
        output = expect_token_number(instr, '<LStride>')
        if output is None:
            raise Exception('UniDeepFsmn format error')
        instr, lstride = output
        # Fix: cast like the sibling fields (was stored as the raw regex
        # string); assumes the value is an integer literal like <LOrder>.
        self.lstride = int(lstride)
        output = expect_token_number(instr, '<MaxNorm>')
        if output is None:
            raise Exception('UniDeepFsmn format error')
        # Fix: advance past the consumed '<MaxNorm>' value; it was
        # previously left in `instr` and parsing only worked because the
        # matrix search below skips ahead to the next '['.
        instr, _ = output
        output = expect_kaldi_matrix(instr)
        if output is None:
            raise Exception('Fsmn format error')
        instr, mat = output
        # Undo the time reversal/transpose applied in to_kaldi_nnet.
        mat1 = np.fliplr(mat.T).copy()
        self.conv1 = nn.Conv2d(
            self.output_dim,
            self.output_dim, (self.lorder, 1), (1, 1),
            groups=self.output_dim,
            bias=False)
        mat_th = torch.from_numpy(mat1).type(torch.FloatTensor)
        mat_th = mat_th.unsqueeze(1)
        mat_th = mat_th.unsqueeze(3)
        self.conv1.weight = torch.nn.Parameter(mat_th)
        output = expect_kaldi_matrix(instr)
        if output is None:
            raise Exception('UniDeepFsmn format error')
        instr, mat = output
        # Rebuild the linear layers with the (possibly updated) sizes
        # parsed from the stream before assigning the weights.
        self.project = nn.Linear(self.hidden_size, self.output_dim, bias=False)
        self.linear = nn.Linear(self.input_dim, self.hidden_size)
        self.project.weight = torch.nn.Parameter(
            torch.from_numpy(mat).type(torch.FloatTensor))
        output = expect_kaldi_matrix(instr)
        if output is None:
            raise Exception('UniDeepFsmn format error')
        instr, mat = output
        self.linear.weight = torch.nn.Parameter(
            torch.from_numpy(mat).type(torch.FloatTensor))
        output = expect_kaldi_matrix(instr)
        if output is None:
            raise Exception('UniDeepFsmn format error')
        instr, mat = output
        self.linear.bias = torch.nn.Parameter(
            torch.from_numpy(mat).type(torch.FloatTensor))
        return instr

View File

@@ -14,6 +14,7 @@ if TYPE_CHECKING:
from .speaker_verification_pipeline import SpeakerVerificationPipeline
else:
_import_structure = {
'ans_dfsmn_pipeline': ['ANSDFSMNPipeline'],
'ans_pipeline': ['ANSPipeline'],
'asr_inference_pipeline': ['AutomaticSpeechRecognitionPipeline'],
'kws_farfield_pipeline': ['KWSFarfieldPipeline'],

View File

@@ -0,0 +1,187 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import collections
import io
import os
import sys
from typing import Any, Dict
import librosa
import numpy as np
import soundfile as sf
import torch
from modelscope.fileio import File
from modelscope.metainfo import Pipelines
from modelscope.outputs import OutputKeys
from modelscope.pipelines.base import Input, Pipeline
from modelscope.pipelines.builder import PIPELINES
from modelscope.utils.constant import ModelFile, Tasks
# STFT parameters for 48 kHz audio (values in samples).
HOP_LENGTH = 960
N_FFT = 1920
WINDOW_NAME_HAM = 'hamming'
STFT_WIN_LEN = 1920
# Stream-mode chunking: the model consumes WINLEN 16-bit frames per step
# and advances by STRIDE frames (see the buffer math in __init__).
WINLEN = 3840
STRIDE = 1920


@PIPELINES.register_module(
    Tasks.acoustic_noise_suppression,
    module_name=Pipelines.speech_dfsmn_ans_psm_48k_causal)
class ANSDFSMNPipeline(Pipeline):
    """ANS (Acoustic Noise Suppression) inference pipeline based on DFSMN model.

    Args:
        stream_mode: set its work mode, default False.
            In stream mode, it accepts bytes as pipeline input that should be
            the audio data in PCM format.
            In normal mode, it accepts str and treats it as the path of a local
            wav file or the http link of a remote wav file.
    """
    # Expected input sample rate; other rates are resampled in bytes2tensor.
    SAMPLE_RATE = 48000

    def __init__(self, model, **kwargs):
        """Load model weights (if present) and build STFT/iSTFT helpers."""
        super().__init__(model=model, **kwargs)
        model_bin_file = os.path.join(self.model.model_dir,
                                      ModelFile.TORCH_MODEL_BIN_FILE)
        if os.path.exists(model_bin_file):
            checkpoint = torch.load(model_bin_file, map_location=self.device)
            self.model.load_state_dict(checkpoint)
        self.model.eval()
        self.stream_mode = kwargs.get('stream_mode', False)
        if self.stream_mode:
            # the unit of WINLEN and STRIDE is frame, 1 frame of 16bit = 2 bytes
            byte_buffer_length = \
                (WINLEN + STRIDE * (self.model.lorder - 1)) * 2
            # ring buffer of single-byte items; maxlen keeps only the newest
            # context needed by the model
            self.buffer = collections.deque(maxlen=byte_buffer_length)
            # padding head
            for i in range(STRIDE * 2):
                self.buffer.append(b'\0')
            # it processes WINLEN frames at the first time, then STRIDE frames
            self.byte_length_remain = (STRIDE * 2 - WINLEN) * 2
            self.first_forward = True
            # samples trimmed from a window edge when assembling the output
            self.tensor_give_up_length = (WINLEN - STRIDE) // 2
        window = torch.hamming_window(
            STFT_WIN_LEN, periodic=False, device=self.device)

        def stft(x):
            # NOTE(review): torch.stft without return_complex relies on
            # behavior deprecated in newer torch versions — confirm the
            # pinned torch version.
            return torch.stft(
                x,
                N_FFT,
                HOP_LENGTH,
                STFT_WIN_LEN,
                center=False,
                window=window)

        def istft(x, slen):
            # librosa inverse STFT; `slen` clips the output to the original
            # signal length.
            return librosa.istft(
                x,
                hop_length=HOP_LENGTH,
                win_length=STFT_WIN_LEN,
                window=WINDOW_NAME_HAM,
                center=False,
                length=slen)

        self.stft = stft
        self.istft = istft

    def preprocess(self, inputs: Input, **preprocess_params) -> Dict[str, Any]:
        """Convert raw input into tensors.

        In stream mode `inputs` must be bytes of 16-bit PCM; the ring buffer
        accumulates them and one tensor is emitted per full STRIDE of new
        data. In normal mode `inputs` is a wav path/url (str) or wav bytes.
        """
        if self.stream_mode:
            if not isinstance(inputs, bytes):
                raise TypeError('Only support bytes in stream mode.')
            if len(inputs) > self.buffer.maxlen:
                raise ValueError(
                    f'inputs length too large: {len(inputs)} > {self.buffer.maxlen}'
                )
            tensor_list = []
            current_index = 0
            # Emit one window each time a full STRIDE of new bytes arrives.
            while self.byte_length_remain + len(
                    inputs) - current_index >= STRIDE * 2:
                byte_length_to_add = STRIDE * 2 - self.byte_length_remain
                for i in range(current_index,
                               current_index + byte_length_to_add):
                    self.buffer.append(inputs[i].to_bytes(
                        1, byteorder=sys.byteorder, signed=False))
                bytes_io = io.BytesIO()
                for b in self.buffer:
                    bytes_io.write(b)
                # reinterpret the buffered bytes as 16-bit PCM samples
                data = np.frombuffer(bytes_io.getbuffer(), dtype=np.int16)
                data_tensor = torch.from_numpy(data).type(torch.FloatTensor)
                tensor_list.append(data_tensor)
                self.byte_length_remain = 0
                current_index += byte_length_to_add
            # Stash any leftover bytes for the next call.
            for i in range(current_index, len(inputs)):
                self.buffer.append(inputs[i].to_bytes(
                    1, byteorder=sys.byteorder, signed=False))
                self.byte_length_remain += 1
            return {'audio': tensor_list}
        else:
            if isinstance(inputs, str):
                # local path or http url of a wav file
                data_bytes = File.read(inputs)
            elif isinstance(inputs, bytes):
                data_bytes = inputs
            else:
                raise TypeError(f'Unsupported type {type(inputs)}.')
            data_tensor = self.bytes2tensor(data_bytes)
            return {'audio': data_tensor}

    def bytes2tensor(self, file_bytes):
        """Decode wav bytes to a mono float tensor at SAMPLE_RATE.

        The signal is scaled by 32768 to the int16 amplitude range.
        """
        data1, fs = sf.read(io.BytesIO(file_bytes))
        data1 = data1.astype(np.float32)
        if len(data1.shape) > 1:
            # multi-channel input: keep only the first channel
            data1 = data1[:, 0]
        if fs != self.SAMPLE_RATE:
            # NOTE(review): positional librosa.resample arguments were
            # removed in librosa>=0.10 (needs orig_sr=/target_sr=) —
            # confirm the pinned librosa version.
            data1 = librosa.resample(data1, fs, self.SAMPLE_RATE)
        data = data1 * 32768
        data_tensor = torch.from_numpy(data).type(torch.FloatTensor)
        return data_tensor

    def forward(self, inputs: Dict[str, Any],
                **forward_params) -> Dict[str, Any]:
        """Run denoising; returns {OutputKeys.OUTPUT_PCM: int16 PCM bytes}."""
        if self.stream_mode:
            bytes_io = io.BytesIO()
            for origin_audio in inputs['audio']:
                masked_sig = self._forward(origin_audio)
                if self.first_forward:
                    # first window: only the tail edge is discarded
                    masked_sig = masked_sig[:-self.tensor_give_up_length]
                    self.first_forward = False
                else:
                    # later windows: keep the newest WINLEN samples and
                    # trim both unreliable edges
                    masked_sig = masked_sig[-WINLEN:]
                    masked_sig = masked_sig[self.tensor_give_up_length:-self.
                                            tensor_give_up_length]
                bytes_io.write(masked_sig.astype(np.int16).tobytes())
            outputs = bytes_io.getvalue()
        else:
            origin_audio = inputs['audio']
            masked_sig = self._forward(origin_audio)
            outputs = masked_sig.astype(np.int16).tobytes()
        return {OutputKeys.OUTPUT_PCM: outputs}

    def _forward(self, origin_audio):
        """Apply the DFSMN mask in the STFT domain.

        Args:
            origin_audio: 1-D float tensor of audio samples.

        Returns:
            numpy float signal of the same length as `origin_audio`.
        """
        with torch.no_grad():
            audio_in = origin_audio.unsqueeze(0)
            # local import keeps torchaudio optional until actually needed
            import torchaudio
            fbanks = torchaudio.compliance.kaldi.fbank(
                audio_in,
                dither=1.0,
                frame_length=40.0,
                frame_shift=20.0,
                num_mel_bins=120,
                sample_frequency=self.SAMPLE_RATE,
                window_type=WINDOW_NAME_HAM)
            fbanks = fbanks.unsqueeze(0)
            masks = self.model(fbanks)
            spectrum = self.stft(origin_audio)
            # align the mask layout with the spectrum layout before the
            # element-wise (broadcast) multiply
            masks = masks.permute(2, 1, 0)
            masked_spec = (spectrum * masks).cpu()
            masked_spec = masked_spec.detach().numpy()
            # combine the real/imag planes into a complex spectrogram
            masked_spec_complex = masked_spec[:, :, 0] + 1j * masked_spec[:, :, 1]
            masked_sig = self.istft(masked_spec_complex, len(origin_audio))
            return masked_sig

    def postprocess(self, inputs: Dict[str, Any], **kwargs) -> Dict[str, Any]:
        """Optionally write the PCM result to `output_path` as a wav file."""
        if not self.stream_mode and 'output_path' in kwargs.keys():
            sf.write(
                kwargs['output_path'],
                np.frombuffer(inputs[OutputKeys.OUTPUT_PCM], dtype=np.int16),
                self.SAMPLE_RATE)
        return inputs

View File

@@ -36,8 +36,11 @@ class ANSPipeline(Pipeline):
"""
super().__init__(model=model, **kwargs)
self.model.eval()
self.stream_mode = kwargs.get('stream_mode', False)
def preprocess(self, inputs: Input, **preprocess_params) -> Dict[str, Any]:
if self.stream_mode:
raise TypeError('This model does not support stream mode!')
if isinstance(inputs, bytes):
data1, fs = sf.read(io.BytesIO(inputs))
elif isinstance(inputs, str):

View File

@@ -105,6 +105,28 @@ def extract_pcm_from_wav(wav: bytes) -> bytes:
return data, sample_rate
def expect_token_number(instr, token):
    """Consume `token` followed by a number from the head of `instr`.

    Args:
        instr: input string to parse.
        token: literal token (used as a regex fragment) expected at the head.

    Returns:
        (remaining_string, number_string) on success, None otherwise.
    """
    first_token = re.match(r'^\s*' + token, instr)
    if first_token is None:
        return None
    instr = instr[first_token.end():]
    # Proper float literal: optional sign, digits, optional fraction and
    # exponent.  The previous pattern (r'-?\d+\.?\d*e?-?\d*?') could stop
    # mid-way through scientific notation (e.g. matching '1e-' in '1e-5').
    lr = re.match(r'^\s*([-+]?\d+\.?\d*(?:[eE][-+]?\d+)?)', instr)
    if lr is None:
        return None
    return instr[lr.end():], lr.groups()[0]
def expect_kaldi_matrix(instr):
    """Parse one kaldi text matrix '[ ... ]' from `instr`.

    Args:
        instr: input string that should contain a bracketed matrix; rows
            are separated by newlines, values by spaces.

    Returns:
        (remaining_string, matrix) on success, None when no complete
        '[ ... ]' section is found.
    """
    pos2 = instr.find('[', 0)
    pos3 = instr.find(']', pos2)
    # Previously a missing bracket silently produced a bogus slice; callers
    # already test the result against None, so fail explicitly instead.
    if pos2 < 0 or pos3 < 0:
        return None
    mat = []
    for stt in instr[pos2 + 1:pos3].split('\n'):
        # np.fromstring text mode is deprecated; build each row from the
        # whitespace-split tokens instead (raises on malformed numbers).
        tmp_mat = np.array(stt.split(), dtype=np.float32)
        if tmp_mat.size > 0:
            mat.append(tmp_mat)
    return instr[pos3 + 1:], np.array(mat)
# This implementation is adopted from scipy.io.wavfile.write,
# made publicly available under the BSD-3-Clause license at
# https://github.com/scipy/scipy/blob/v1.9.3/scipy/io/wavfile.py

View File

@@ -4,6 +4,7 @@ import os.path
import unittest
from modelscope.metainfo import Pipelines
from modelscope.outputs import OutputKeys
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.demo_utils import DemoCompatibilityCheck
@@ -17,6 +18,8 @@ FAREND_SPEECH_URL = 'https://modelscope.oss-cn-beijing.aliyuncs.com/' \
'test/audios/farend_speech.wav'
NOISE_SPEECH_FILE = 'data/test/audios/speech_with_noise.wav'
NOISE_SPEECH_FILE_48K = 'data/test/audios/speech_with_noise_48k.wav'
NOISE_SPEECH_FILE_48K_PCM = 'data/test/audios/speech_with_noise_48k.PCM'
NOISE_SPEECH_URL = 'https://modelscope.oss-cn-beijing.aliyuncs.com/' \
'test/audios/speech_with_noise.wav'
@@ -83,7 +86,7 @@ class SpeechSignalProcessTest(unittest.TestCase, DemoCompatibilityCheck):
print(f'Processed audio saved to {output_path}')
@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
def test_ans(self):
def test_frcrn_ans(self):
model_id = 'damo/speech_frcrn_ans_cirm_16k'
ans = pipeline(Tasks.acoustic_noise_suppression, model=model_id)
output_path = os.path.abspath('output.wav')
@@ -112,6 +115,41 @@ class SpeechSignalProcessTest(unittest.TestCase, DemoCompatibilityCheck):
ans(data, output_path=output_path)
print(f'Processed audio saved to {output_path}')
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_dfsmn_ans(self):
    """Run DFSMN denoise on a local 48k wav file and save the result."""
    ans = pipeline(
        Tasks.acoustic_noise_suppression,
        model='damo/speech_dfsmn_ans_psm_48k_causal')
    output_path = os.path.abspath('output.wav')
    wav_path = os.path.join(os.getcwd(), NOISE_SPEECH_FILE_48K)
    ans(wav_path, output_path=output_path)
    print(f'Processed audio saved to {output_path}')
@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
def test_dfsmn_ans_bytes(self):
    """Run DFSMN denoise on in-memory wav bytes and save the result."""
    ans = pipeline(
        Tasks.acoustic_noise_suppression,
        model='damo/speech_dfsmn_ans_psm_48k_causal')
    output_path = os.path.abspath('output.wav')
    wav_path = os.path.join(os.getcwd(), NOISE_SPEECH_FILE_48K)
    with open(wav_path, 'rb') as f:
        data = f.read()
    ans(data, output_path=output_path)
    print(f'Processed audio saved to {output_path}')
@unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
def test_dfsmn_ans_stream(self):
    """Feed fixed-size PCM blocks through the pipeline in stream mode."""
    ans = pipeline(
        Tasks.acoustic_noise_suppression,
        model='damo/speech_dfsmn_ans_psm_48k_causal',
        stream_mode=True)
    block_size = 3840
    pcm_path = os.path.join(os.getcwd(), NOISE_SPEECH_FILE_48K_PCM)
    with open(pcm_path, 'rb') as f, open('output.pcm', 'wb') as w:
        audio = f.read(block_size)
        while len(audio) >= block_size:
            result = ans(audio)
            w.write(result[OutputKeys.OUTPUT_PCM])
            audio = f.read(block_size)
@unittest.skip('demo compatibility test is only enabled on a needed-basis')
def test_demo_compatibility(self):
    # Delegates to the DemoCompatibilityCheck mixin's shared checker.
    self.compatibility_check()