* vc ssr

* Add more patches for hf (#1160)

* clone and lint #1205 (#1209)

* 更新格式

* fix cache path (#1211)

Co-authored-by: Yingda Chen <yingda.chen@alibaba-inc.com>

* "update"

* fix create_commit login (#1210)

* support multiple include/exclude filter patterns in command line (#1214)

Co-authored-by: Yingda Chen <yingda.chen@alibaba-inc.com>

* Use legacy cache (#1215)

* fix name (#1216)

Co-authored-by: Yingda Chen <yingda.chen@alibaba-inc.com>

* fix path name for log accuracy (#1217)


* change log msg

---------

Co-authored-by: Yingda Chen <yingda.chen@alibaba-inc.com>

* fix visibility (#1222)

Co-authored-by: Yingda Chen <yingda.chen@alibaba-inc.com>

* Merge 1.23 hotfix to master (#1227)

* 修复格式问题

* fix 路径问题

* Update test_speech_super_resolution.py

* Update test_voice_conversion.py

---------

Co-authored-by: tastelikefeet <58414341+tastelikefeet@users.noreply.github.com>
Co-authored-by: Yingda Chen <yingdachen@apache.org>
Co-authored-by: Yingda Chen <yingda.chen@alibaba-inc.com>
Co-authored-by: zhongyuqi <zhongyuqi@microbt.com>
This commit is contained in:
Z-yq
2025-06-04 16:08:20 +08:00
committed by GitHub
parent eb93fef68b
commit f9b8d4b9d2
24 changed files with 3938 additions and 3 deletions

View File

@@ -225,7 +225,8 @@ class Models(object):
audio_quantization = 'audio-quantization'
laura_codec = 'laura-codec'
funasr = 'funasr'
hifissr = 'hifissr'
unetvc_16k = 'unetvc_16k'
# multi-modal models
ofa = 'ofa'
clip = 'clip-multi-modal-embedding'
@@ -581,6 +582,8 @@ class Pipelines(object):
audio_quantization = 'audio-quantization'
audio_quantization_inference = 'audio-quantization-inference'
laura_codec_tts_inference = 'laura-codec-tts-inference'
speech_super_resolution_inference = 'speech-super-resolution-inference'
voice_conversion = 'voice-conversion'
# multi-modal tasks
image_captioning = 'image-captioning'

View File

@@ -1,3 +1,3 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
from . import ans, asr, itn, kws, separation, sv, tts
from . import ans, asr, itn, kws, separation, ssr, sv, tts, vc

View File

@@ -0,0 +1,20 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
from typing import TYPE_CHECKING
from modelscope.utils.import_utils import LazyImportModule
if TYPE_CHECKING:
    from .ssr_infer import HifiSSR
else:
    # Lazy-import table: submodule name -> public names that submodule
    # provides. The key must agree with the module named in the
    # TYPE_CHECKING import above (``ssr_infer``); the previous key
    # ``'hifissr'`` did not match it.
    # NOTE(review): assumes LazyImportModule resolves each key as a
    # submodule of this package -- confirm against its implementation.
    _import_structure = {
        'ssr_infer': ['HifiSSR'],
    }
    import sys

    # Replace this package's entry in sys.modules with the lazy proxy.
    sys.modules[__name__] = LazyImportModule(
        __name__,
        globals()['__file__'],
        _import_structure,
        module_spec=__spec__,
        extra_objects={},
    )

View File

@@ -0,0 +1,700 @@
"""
StarGAN v2
Copyright (c) 2020-present NAVER Corp.
This work is licensed under the Creative Commons Attribution-NonCommercial
4.0 International License. To view a copy of this license, visit
http://creativecommons.org/licenses/by-nc/4.0/ or send a letter to
Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
"""
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
class DownSample(nn.Module):
    """Average-pooling downsampler selected by ``layer_type``.

    Supported ``layer_type`` values:
        * ``'none'``: return the input unchanged.
        * ``'timepreserve'``: ``F.avg_pool2d`` with a ``(2, 1)`` kernel, so
          only the second-to-last axis is halved.
        * ``'half'``: ``F.avg_pool2d`` with kernel 2, halving the last two
          axes.
    """

    def __init__(self, layer_type):
        super().__init__()
        self.layer_type = layer_type

    def forward(self, x):
        """Pool ``x`` according to ``self.layer_type``.

        Raises:
            RuntimeError: if ``layer_type`` is not one of the supported
                values.
        """
        if self.layer_type == 'none':
            return x
        if self.layer_type == 'timepreserve':
            return F.avg_pool2d(x, (2, 1))
        if self.layer_type == 'half':
            return F.avg_pool2d(x, 2)
        # A bare ``raise`` with no active exception also ends up as a
        # RuntimeError, but with an unrelated message; raise it explicitly
        # so the offending value is visible.
        raise RuntimeError(
            f'unknown downsample type: {self.layer_type!r}, expected one of '
            "['none', 'timepreserve', 'half']")
class UpSample(nn.Module):
    """Nearest-neighbour upsampler selected by ``layer_type``.

    Supported ``layer_type`` values:
        * ``'none'``: return the input unchanged.
        * ``'timepreserve'``: ``F.interpolate`` with ``scale_factor=(2, 1)``,
          doubling only the second-to-last axis.
        * ``'half'``: ``F.interpolate`` with ``scale_factor=2``, doubling the
          last two axes.
    """

    def __init__(self, layer_type):
        super().__init__()
        self.layer_type = layer_type

    def forward(self, x):
        """Upsample ``x`` according to ``self.layer_type``.

        Raises:
            RuntimeError: if ``layer_type`` is not one of the supported
                values.
        """
        if self.layer_type == 'none':
            return x
        if self.layer_type == 'timepreserve':
            return F.interpolate(x, scale_factor=(2, 1), mode='nearest')
        if self.layer_type == 'half':
            return F.interpolate(x, scale_factor=2, mode='nearest')
        # Raising a plain string is itself a TypeError in Python 3; raise a
        # real exception that carries the intended message.
        raise RuntimeError(
            f'unknown upsample type: {self.layer_type!r}, expected one of '
            "['none', 'timepreserve', 'half']")
class ResBlk(nn.Module):
    """Pre-activation 2-D residual block with optional AdaIN conditioning.

    The residual path is ``[norm1] -> actv -> conv1 -> downsample ->
    [norm2] -> actv -> conv2``; the shortcut path is an optional 1x1
    convolution (only when ``dim_in != dim_out``) followed by the same
    downsampling. The two paths are summed and divided by ``sqrt(2)``.

    Args:
        dim_in: number of input channels.
        dim_out: number of output channels.
        actv: activation module applied before each 3x3 convolution.
        normalize: when true, ``AdaIN`` layers driven by the style vector
            ``s`` are applied before each activation.
        style_dim: style vector size handed to ``AdaIN``.
        downsample: ``layer_type`` forwarded to ``DownSample``.
    """

    def __init__(self,
                 dim_in,
                 dim_out,
                 actv=nn.LeakyReLU(0.2),
                 normalize=False,
                 style_dim=256,
                 downsample='none'):
        super().__init__()
        self.actv = actv
        self.normalize = normalize
        self.downsample = DownSample(downsample)
        self.learned_sc = dim_in != dim_out
        self.conv1 = nn.Conv2d(
            dim_in, dim_in, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(
            dim_in, dim_out, kernel_size=3, stride=1, padding=1)
        if self.normalize:
            # Both norms act on ``dim_in`` channels: conv1 keeps the width.
            self.norm1 = AdaIN(style_dim, dim_in)
            self.norm2 = AdaIN(style_dim, dim_in)
        if self.learned_sc:
            self.conv1x1 = nn.Conv2d(
                dim_in, dim_out, kernel_size=1, stride=1, padding=0,
                bias=False)

    def _shortcut(self, x):
        """Project (if the channel count changes) and downsample ``x``."""
        skip = self.conv1x1(x) if self.learned_sc else x
        if self.downsample:
            skip = self.downsample(skip)
        return skip

    def _residual(self, x, s=None):
        """Run the convolutional branch; ``s`` is used only if normalizing."""
        hidden = self.norm1(x, s) if self.normalize else x
        hidden = self.conv1(self.actv(hidden))
        hidden = self.downsample(hidden)
        if self.normalize:
            hidden = self.norm2(hidden, s)
        return self.conv2(self.actv(hidden))

    def forward(self, x, s=None):
        """Return ``(shortcut(x) + residual(x, s)) / sqrt(2)``."""
        combined = self._shortcut(x) + self._residual(x, s)
        # Dividing by sqrt(2) keeps the sum of two branches at unit variance.
        return combined / math.sqrt(2)
class ResBlk1D(nn.Module):
    """Pre-activation 1-D residual block with optional instance norm.

    Same layout as a two-convolution residual block: the residual path is
    ``[norm1] -> actv -> conv1 -> downsample -> [norm2] -> actv -> conv2``
    and the shortcut is an optional 1x1 convolution followed by the same
    downsampling. The sum of both paths is divided by ``sqrt(2)``.

    Args:
        dim_in: number of input channels.
        dim_out: number of output channels.
        actv: activation module applied before each convolution.
        normalize: when true, ``nn.InstanceNorm1d`` precedes each activation.
        out_for_onnx: accepted but not used by this class.
        downsample: ``layer_type`` forwarded to ``DownSample``.
            NOTE(review): ``DownSample`` pools with 2-D average pooling;
            only ``'none'`` is used for this 1-D block in the visible code.
    """

    def __init__(self,
                 dim_in,
                 dim_out,
                 actv=nn.LeakyReLU(0.2),
                 normalize=False,
                 out_for_onnx=False,
                 downsample='none'):
        super().__init__()
        self.actv = actv
        self.normalize = normalize
        self.downsample = DownSample(downsample)
        self.learned_sc = dim_in != dim_out
        self.conv1 = nn.Conv1d(
            dim_in, dim_in, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(
            dim_in, dim_out, kernel_size=3, stride=1, padding=1)
        if self.normalize:
            self.norm1 = nn.InstanceNorm1d(dim_in)
            self.norm2 = nn.InstanceNorm1d(dim_in)
        if self.learned_sc:
            self.conv1x1 = nn.Conv1d(
                dim_in, dim_out, kernel_size=1, stride=1, padding=0,
                bias=False)

    def _shortcut(self, x):
        """Project (if the channel count changes) and downsample ``x``."""
        skip = self.conv1x1(x) if self.learned_sc else x
        if self.downsample:
            skip = self.downsample(skip)
        return skip

    def _residual(self, x):
        """Run the convolutional branch."""
        hidden = self.norm1(x) if self.normalize else x
        hidden = self.conv1(self.actv(hidden))
        hidden = self.downsample(hidden)
        if self.normalize:
            hidden = self.norm2(hidden)
        return self.conv2(self.actv(hidden))

    def forward(self, x):
        """Return ``(shortcut(x) + residual(x)) / sqrt(2)``."""
        combined = self._shortcut(x) + self._residual(x)
        # Dividing by sqrt(2) keeps the sum of two branches at unit variance.
        return combined / math.sqrt(2)
class AdaIN(nn.Module):
    """Adaptive instance normalization driven by an attended style vector.

    The style vector is used as a single-token query for multi-head
    attention over a learned bank of 1000 embeddings (``spk_emb``). The
    attention output is projected to ``2 * num_features`` values that are
    split into a per-channel scale ``gamma`` and shift ``beta`` applied to
    the instance-normalized input.

    Args:
        style_dim: size of the style vector (and of each bank embedding).
        num_features: channel count of the tensor being normalized.
    """

    def __init__(self, style_dim, num_features):
        super().__init__()
        self.norm = nn.InstanceNorm2d(num_features)
        self.fc = nn.Linear(style_dim, num_features * 2)
        # Learned embedding bank used as attention keys and values.
        self.spk_emb = torch.nn.Parameter(torch.randn([1, 1000, style_dim]))
        self.mha = torch.nn.MultiheadAttention(
            style_dim, 4, bias=False, batch_first=True)

    def forward(self, x, s: torch.Tensor):
        """Return ``(1 + gamma) * InstanceNorm(x) + beta``.

        ``s`` gets a length-1 sequence axis inserted at dim 1 before being
        used as the attention query.
        """
        query = s.unsqueeze(1)
        batch_size = query.size(0)
        bank = self.spk_emb.repeat(batch_size, 1, 1)
        attended, _ = self.mha(query, bank, bank)
        affine = self.fc(attended).squeeze(dim=1)
        # Reshape to (batch, 2 * num_features, 1, 1) for broadcasting.
        affine = affine.view(affine.size(0), affine.size(1), 1, 1)
        gamma, beta = torch.chunk(affine, chunks=2, dim=1)
        return (1 + gamma) * self.norm(x) + beta
class AdainResBlk(nn.Module):
    """Style-conditioned residual block with optional upsampling.

    The residual path is ``norm1 -> actv -> upsample -> conv1 -> norm2 ->
    actv -> conv2`` where both norms are ``AdaIN`` layers driven by the
    style vector. The shortcut path upsamples and, when the channel count
    changes, applies a 1x1 convolution.

    Args:
        dim_in: number of input channels.
        dim_out: number of output channels.
        style_dim: style vector size handed to ``AdaIN``.
        w_hpf: when 0 the shortcut is added and the sum is divided by
            ``sqrt(2)``; otherwise only the residual branch is returned.
        actv: activation module.
        upsample: ``layer_type`` forwarded to ``UpSample``.
    """

    def __init__(self,
                 dim_in,
                 dim_out,
                 style_dim=256,
                 w_hpf=0,
                 actv=nn.LeakyReLU(0.2),
                 upsample='none'):
        super().__init__()
        self.w_hpf = w_hpf
        self.actv = actv
        self.upsample = UpSample(upsample)
        self.learned_sc = dim_in != dim_out
        self.conv1 = nn.Conv2d(
            dim_in, dim_out, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(
            dim_out, dim_out, kernel_size=3, stride=1, padding=1)
        self.norm1 = AdaIN(style_dim, dim_in)
        self.norm2 = AdaIN(style_dim, dim_out)
        if self.learned_sc:
            self.conv1x1 = nn.Conv2d(
                dim_in, dim_out, kernel_size=1, stride=1, padding=0,
                bias=False)

    def _shortcut(self, x):
        """Upsample ``x`` and project it if the channel count changes."""
        skip = self.upsample(x)
        return self.conv1x1(skip) if self.learned_sc else skip

    def _residual(self, x, s):
        """Run the style-conditioned convolutional branch."""
        hidden = self.actv(self.norm1(x, s))
        hidden = self.conv1(self.upsample(hidden))
        hidden = self.actv(self.norm2(hidden, s))
        return self.conv2(hidden)

    def forward(self, x, s):
        """Return the block output for input ``x`` and style ``s``."""
        residual = self._residual(x, s)
        if self.w_hpf != 0:
            # No shortcut is added when a high-pass weight is configured.
            return residual
        return (residual + self._shortcut(x)) / math.sqrt(2)
class HighPass(nn.Module):
    """Depthwise 3x3 high-pass filter.

    The kernel is ``[[-1, -1, -1], [-1, 8, -1], [-1, -1, -1]] / w_hpf`` and
    is applied to every channel independently (``groups == channels``) with
    padding 1, so the spatial size is preserved.

    Args:
        w_hpf: divisor applied to the kernel.
    """

    def __init__(self, w_hpf):
        super(HighPass, self).__init__()
        # Kept as a plain tensor attribute (not a parameter or buffer), so
        # it does not appear in the state dict.
        self.filter = torch.tensor([[-1, -1, -1], [-1, 8., -1], [-1, -1, -1]
                                    ]) / w_hpf

    def forward(self, x):
        """Filter ``x`` channel by channel.

        Because ``self.filter`` is a plain attribute it is not moved by
        ``module.to(...)``; it is therefore cast to the device and dtype of
        ``x`` on every call to avoid device/dtype mismatches.
        """
        kernel = self.filter.to(device=x.device, dtype=x.dtype)
        kernel = kernel.unsqueeze(0).unsqueeze(1).repeat(x.size(1), 1, 1, 1)
        return F.conv2d(x, kernel, padding=1, groups=x.size(1))
class UnetMapping(nn.Module):
    """Style-conditioned encoder/decoder followed by an inverse flow.

    The network is a stem convolution, a stack of ``ResBlk`` encoder blocks,
    a mirrored stack of ``AdainResBlk`` decoder blocks, an output head that
    maps back to one channel, and a ``FlowBlocks`` module run in reverse.
    A ``StyleEncoder`` turns the conditioning input into the style vector
    given to every block.

    Args:
        dim_in: channel count after the stem convolution.
        style_dim: size of the style vector.
        max_conv_dim: upper bound on the channel count.
        repeat_num: number of resampling stages; the same number of
            bottleneck blocks is added to the encoder and to the decoder.
    """

    def __init__(self,
                 dim_in=48,
                 style_dim=48,
                 max_conv_dim=48 * 8,
                 repeat_num=4):
        super().__init__()
        self.stem = nn.Conv2d(1, dim_in, 3, 1, 1)
        self.encode = nn.ModuleList()
        self.decode = nn.ModuleList()
        self.to_out = nn.Sequential(
            nn.InstanceNorm2d(dim_in, affine=True),
            nn.LeakyReLU(0.2),
            nn.Conv2d(dim_in, 1, 1, 1, 0),
        )

        width = dim_in
        for stage in range(repeat_num):
            # Stages 1 and 3 resample only one axis; the others resample both.
            resample = 'timepreserve' if stage in (1, 3) else 'half'
            wider = min(width * 2, max_conv_dim)
            self.encode.append(
                ResBlk(
                    width,
                    wider,
                    style_dim=style_dim,
                    normalize=True,
                    downsample=resample))
            # Inserting at the front makes the decoder mirror the encoder.
            self.decode.insert(
                0,
                AdainResBlk(
                    wider, width, style_dim, w_hpf=0, upsample=resample))
            width = wider

        # Bottleneck blocks: all encoder ones first, then all decoder ones.
        for _ in range(repeat_num):
            self.encode.append(
                ResBlk(width, width, style_dim=style_dim, normalize=True))
        for _ in range(repeat_num):
            self.decode.insert(0, AdainResBlk(width, width, style_dim))

        # The style encoder is built with the final (widest) channel count.
        self.style_extractor = StyleEncoder(width, style_dim, num_domains=8)
        self.flow = FlowBlocks(256, style_dim, 5, 1, 4)

    def forward(self, x: torch.Tensor, c: torch.Tensor):
        """Map ``x`` conditioned on the style extracted from ``c``.

        The single output channel is squeezed away before the flow is
        applied with ``reverse=True``.
        """
        style = self.style_extractor(c)
        hidden = self.stem(x)
        for block in [*self.encode, *self.decode]:
            hidden = block(hidden, style)
        projected = self.to_out(hidden).squeeze(dim=1)
        return self.flow(projected, reverse=True)
class MaskMapping(nn.Module):
    """Encoder/decoder mapping that sees the reference as a prefix.

    Architecture is a stem convolution, ``ResBlk`` encoder blocks, mirrored
    ``AdainResBlk`` decoder blocks, a one-channel output head and a
    ``FlowBlocks`` module run in reverse. In ``forward`` the conditioning
    input is concatenated in front of the source along the last axis and the
    corresponding prefix is cut off the result.

    Args:
        dim_in: channel count after the stem convolution.
        style_dim: size of the style vector.
        max_conv_dim: upper bound on the channel count.
        repeat_num: number of resampling stages; the same number of
            bottleneck blocks is added to the encoder and to the decoder.
    """

    def __init__(self,
                 dim_in=48,
                 style_dim=48,
                 max_conv_dim=48 * 8,
                 repeat_num=4):
        super().__init__()
        self.stem = nn.Conv2d(1, dim_in, 3, 1, 1)
        self.encode = nn.ModuleList()
        self.decode = nn.ModuleList()
        self.to_out = nn.Sequential(
            nn.InstanceNorm2d(dim_in, affine=True),
            nn.LeakyReLU(0.2),
            nn.Conv2d(dim_in, 1, 1, 1, 0),
        )

        width = dim_in
        for stage in range(repeat_num):
            # Stages 1 and 3 resample only one axis; the others resample both.
            resample = 'timepreserve' if stage in (1, 3) else 'half'
            wider = min(width * 2, max_conv_dim)
            self.encode.append(
                ResBlk(
                    width,
                    wider,
                    style_dim=style_dim,
                    normalize=True,
                    downsample=resample))
            # Inserting at the front makes the decoder mirror the encoder.
            self.decode.insert(
                0,
                AdainResBlk(
                    wider, width, style_dim, w_hpf=0, upsample=resample))
            width = wider

        # Bottleneck blocks: all encoder ones first, then all decoder ones.
        for _ in range(repeat_num):
            self.encode.append(
                ResBlk(width, width, style_dim=style_dim, normalize=True))
        for _ in range(repeat_num):
            self.decode.insert(0, AdainResBlk(width, width, style_dim))

        # The style encoder is built with the final (widest) channel count.
        self.style_extractor = StyleEncoder(width, style_dim, num_domains=8)
        self.flow = FlowBlocks(256, style_dim, 5, 1, 4)

    def forward(self, x: torch.Tensor, c: torch.Tensor):
        """Map ``x`` conditioned on ``c`` and drop the prefix that ``c`` added.

        ``c`` gets a channel axis inserted at dim 1 and is concatenated in
        front of ``x`` along the last axis; after the flow, the first
        ``c.size(-1)`` positions of the last axis are discarded.
        """
        style = self.style_extractor(c)
        prefix_len = c.size(-1)
        hidden = self.stem(torch.cat((c.unsqueeze(1), x), dim=-1))
        for block in [*self.encode, *self.decode]:
            hidden = block(hidden, style)
        projected = self.to_out(hidden).squeeze(dim=1)
        restored = self.flow(projected, reverse=True)
        return restored[:, :, prefix_len:]
class StyleEncoder(nn.Module):
    """1-D convolutional encoder producing a concatenated style vector.

    A shared trunk (an input convolution expecting 256 channels, four
    ``ResBlk1D`` blocks, a kernel-5 convolution and global average pooling)
    feeds ``num_domains`` linear heads, each emitting
    ``style_dim // num_domains`` values. The head outputs are concatenated
    along the last axis.

    Args:
        dim_in: channel count after the first convolution.
        style_dim: nominal style size; the actual output size is
            ``num_domains * (style_dim // num_domains)``.
        num_domains: number of linear heads.
        max_conv_dim: upper bound on the trunk channel count.
    """

    def __init__(self,
                 dim_in=48,
                 style_dim=48,
                 num_domains=4,
                 max_conv_dim=384):
        super().__init__()
        layers = [nn.Conv1d(256, dim_in, 3, 1, 1)]

        width = dim_in
        for _ in range(4):
            wider = min(width * 2, max_conv_dim)
            layers.append(ResBlk1D(width, wider, downsample='none'))
            width = wider

        layers.extend([
            nn.LeakyReLU(0.2),
            nn.Conv1d(width, width, 5, 1, 0),
            nn.AdaptiveAvgPool1d(1),
            nn.LeakyReLU(0.2),
        ])
        self.shared = nn.Sequential(*layers)

        self.unshared = nn.ModuleList(
            [nn.Linear(width, style_dim // num_domains)
             for _ in range(num_domains)])

    def forward(self, x):
        """Return the concatenated head outputs for input ``x``."""
        features = self.shared(x)
        features = features.view(features.size(0), -1)
        per_head = [head(features) for head in self.unshared]
        # (batch, num_domains * (style_dim // num_domains))
        return torch.cat(per_head, dim=-1)
class ResidualCouplingLayer(nn.Module):
    """Affine coupling layer over the channel axis.

    The input is split into two halves along dim 1. The first half passes
    through ``pre -> WN -> post`` to predict a shift ``m`` (and, unless
    ``mean_only``, a log-scale ``logs``) that transforms the second half.
    ``post`` is zero-initialised, so the layer starts as the identity.

    Args:
        channels: total channel count; must be even.
        hidden_channels: width of the ``WN`` encoder.
        kernel_size: kernel size passed to ``WN``.
        dilation_rate: dilation base passed to ``WN``.
        n_layers: number of ``WN`` layers.
        p_dropout: dropout probability passed to ``WN``.
        gin_channels: forwarded to ``WN``.
        mean_only: predict only the shift and use a zero log-scale.
    """

    def __init__(
        self,
        channels,
        hidden_channels,
        kernel_size,
        dilation_rate,
        n_layers,
        p_dropout=0,
        gin_channels=0,
        mean_only=False,
    ):
        assert channels % 2 == 0, 'channels should be divisible by 2'
        super().__init__()
        self.channels = channels
        self.hidden_channels = hidden_channels
        self.kernel_size = kernel_size
        self.dilation_rate = dilation_rate
        self.n_layers = n_layers
        self.half_channels = channels // 2
        self.mean_only = mean_only

        self.pre = nn.Conv1d(self.half_channels, hidden_channels, 1)
        self.enc = WN(
            hidden_channels,
            kernel_size,
            dilation_rate,
            n_layers,
            p_dropout=p_dropout,
            gin_channels=gin_channels,
        )
        # One output group (shift) when mean_only, two (shift, log-scale)
        # otherwise.
        self.post = nn.Conv1d(hidden_channels,
                              self.half_channels * (2 - mean_only), 1)
        self.post.weight.data.zero_()
        self.post.bias.data.zero_()

    def forward(self, x, reverse=False):
        """Apply the coupling transform or its inverse.

        Returns:
            ``(y, logdet)`` when ``reverse`` is false, where ``logdet`` is
            ``logs`` summed over dims 1 and 2; only ``y`` when ``reverse``
            is true.
        """
        halves = [self.half_channels] * 2
        x0, x1 = torch.split(x, halves, 1)
        stats = self.post(self.enc(self.pre(x0)))

        if self.mean_only:
            m = stats
            logs = torch.zeros_like(m)
        else:
            m, logs = torch.split(stats, halves, 1)

        if reverse:
            x1 = (x1 - m) * torch.exp(-logs)
            return torch.cat([x0, x1], 1)

        x1 = m + x1 * torch.exp(logs)
        out = torch.cat([x0, x1], 1)
        logdet = torch.sum(logs, [1, 2])
        return out, logdet
def fused_add_tanh_sigmoid_multiply(input_a, n_channels):
    """Gated activation: ``tanh`` of the first channels times ``sigmoid`` of the rest.

    Args:
        input_a: tensor indexed as ``[:, channels, :]``.
        n_channels: indexable whose first element is the channel index at
            which ``input_a`` is split.

    Returns:
        ``tanh(input_a[:, :k]) * sigmoid(input_a[:, k:])`` with
        ``k = n_channels[0]``. Despite the name, no addition is performed.
    """
    split_at = n_channels[0]
    tanh_part = torch.tanh(input_a[:, :split_at, :])
    gate_part = torch.sigmoid(input_a[:, split_at:, :])
    return tanh_part * gate_part
class WN(nn.Module):
    """Stack of gated dilated 1-D convolutions with residual and skip paths.

    Layer ``i`` uses dilation ``dilation_rate ** i`` and padding that keeps
    the sequence length. Every layer but the last emits
    ``2 * hidden_channels`` channels (residual half plus skip half); the
    last emits only the skip half.

    Args:
        hidden_channels: channel count of input, output and hidden states.
        kernel_size: odd kernel size of the dilated convolutions.
        dilation_rate: base of the per-layer dilation.
        n_layers: number of layers.
        gin_channels: stored on the module; not used by ``forward``.
        p_dropout: dropout probability applied to the gated activations.
    """

    def __init__(
        self,
        hidden_channels,
        kernel_size,
        dilation_rate,
        n_layers,
        gin_channels=0,
        p_dropout=0,
    ):
        super(WN, self).__init__()
        assert kernel_size % 2 == 1
        self.hidden_channels = hidden_channels
        self.kernel_size = (kernel_size, )
        self.dilation_rate = dilation_rate
        self.n_layers = n_layers
        self.gin_channels = gin_channels
        self.p_dropout = p_dropout

        self.in_layers = nn.ModuleList()
        self.res_skip_layers = nn.ModuleList()
        self.drop = nn.Dropout(p_dropout)

        # Created unconditionally; ``forward`` below never calls it.
        self.cond_layer = nn.Conv1d(hidden_channels,
                                    2 * hidden_channels * n_layers, 1)

        final_idx = n_layers - 1
        for layer_idx in range(n_layers):
            rate = dilation_rate**layer_idx
            same_pad = int((kernel_size * rate - rate) / 2)
            self.in_layers.append(
                nn.Conv1d(
                    hidden_channels,
                    2 * hidden_channels,
                    kernel_size,
                    dilation=rate,
                    padding=same_pad,
                ))
            # The last layer has no residual half to produce.
            if layer_idx == final_idx:
                out_channels = hidden_channels
            else:
                out_channels = 2 * hidden_channels
            self.res_skip_layers.append(
                nn.Conv1d(hidden_channels, out_channels, 1))

    def forward(self, x, **kwargs):
        """Return the sum of all skip outputs; extra keyword args are ignored."""
        skip_sum = torch.zeros_like(x)
        n_channels_tensor = torch.IntTensor([self.hidden_channels])
        final_idx = self.n_layers - 1

        for layer_idx, (gate_conv, mix_conv) in enumerate(
                zip(self.in_layers, self.res_skip_layers)):
            gated = fused_add_tanh_sigmoid_multiply(
                gate_conv(x), n_channels_tensor)
            mixed = mix_conv(self.drop(gated))
            if layer_idx < final_idx:
                x = x + mixed[:, :self.hidden_channels, :]
                skip_sum = skip_sum + mixed[:, self.hidden_channels:, :]
            else:
                skip_sum = skip_sum + mixed
        return skip_sum
class Discriminator(nn.Module):
    """Pair of ``Discriminator2d`` networks with identical hyper-parameters.

    ``dis`` serves ``forward`` (commented in the source as the real/fake
    discriminator) and ``cls`` serves ``classifier`` (commented as the
    adversarial classifier).

    Args:
        dim_in: channel count after the first convolution of each network.
        num_domains: number of output logits per network.
        max_conv_dim: upper bound on the channel count.
        repeat_num: number of downsampling residual blocks.
    """

    def __init__(self,
                 dim_in=48,
                 num_domains=2,
                 max_conv_dim=384,
                 repeat_num=4):
        super().__init__()
        shared_config = dict(
            dim_in=dim_in,
            num_domains=num_domains,
            max_conv_dim=max_conv_dim,
            repeat_num=repeat_num)
        # real/fake discriminator
        self.dis = Discriminator2d(**shared_config)
        # adversarial classifier
        self.cls = Discriminator2d(**shared_config)
        self.num_domains = num_domains

    def forward(self, x, y):
        """Forward ``x`` and ``y`` to ``self.dis``.

        NOTE(review): this passes two positional arguments, so
        ``self.dis.forward`` must accept ``(x, y)`` -- confirm against
        ``Discriminator2d``.
        """
        return self.dis(x, y)

    def classifier(self, x):
        """Return the per-domain logits of ``self.cls`` for ``x``."""
        return self.cls.get_feature(x)
class LinearNorm(torch.nn.Module):
    """Linear layer whose weight is Xavier-uniform initialised.

    Args:
        in_dim: input feature size.
        out_dim: output feature size.
        bias: whether the linear layer has a bias term.
        w_init_gain: nonlinearity name passed to
            ``torch.nn.init.calculate_gain`` to pick the Xavier gain.
    """

    def __init__(self, in_dim, out_dim, bias=True, w_init_gain='linear'):
        super(LinearNorm, self).__init__()
        self.linear_layer = torch.nn.Linear(in_dim, out_dim, bias=bias)
        gain = torch.nn.init.calculate_gain(w_init_gain)
        torch.nn.init.xavier_uniform_(self.linear_layer.weight, gain=gain)

    def forward(self, x):
        """Apply the linear layer to ``x``."""
        projected = self.linear_layer(x)
        return projected
class Discriminator2d(nn.Module):
    """2-D convolutional network producing one logit per domain.

    Layout: a 3x3 input convolution on a single-channel input,
    ``repeat_num`` ``ResBlk`` blocks that each halve both spatial axes, a
    5x5 convolution without padding, global average pooling and a 1x1
    convolution to ``num_domains`` channels.

    Args:
        dim_in: channel count after the first convolution.
        num_domains: number of output logits.
        max_conv_dim: upper bound on the channel count.
        repeat_num: number of downsampling residual blocks.
    """

    def __init__(self,
                 dim_in=48,
                 num_domains=2,
                 max_conv_dim=384,
                 repeat_num=4):
        super().__init__()
        blocks = [nn.Conv2d(1, dim_in, 3, 1, 1)]
        # Defined up front so ``repeat_num == 0`` does not leave it unbound.
        dim_out = dim_in
        for _ in range(repeat_num):
            dim_out = min(dim_in * 2, max_conv_dim)
            blocks.append(ResBlk(dim_in, dim_out, downsample='half'))
            dim_in = dim_out
        blocks += [
            nn.LeakyReLU(0.2),
            nn.Conv2d(dim_out, dim_out, 5, 1, 0),
            nn.LeakyReLU(0.2),
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(dim_out, num_domains, 1, 1, 0),
        ]
        self.main = nn.Sequential(*blocks)

    def get_feature(self, x):
        """Return the logits for ``x`` flattened to ``(batch, num_domains)``."""
        out = self.main(x)
        out = out.view(out.size(0), -1)  # (batch, num_domains)
        return out

    def forward(self, x, y=None):
        """Return domain logits, optionally selecting one logit per sample.

        ``Discriminator.forward`` in this file calls this module with two
        arguments, which a one-argument ``forward`` rejects with a
        ``TypeError``; ``y`` is therefore accepted as an optional argument.

        Args:
            x: input passed to ``get_feature``.
            y: optional integer tensor with one domain index per sample.
                NOTE(review): the selection below follows the upstream
                StarGAN v2 discriminator -- confirm this is the intended
                meaning of ``y`` for this model.

        Returns:
            ``(batch, num_domains)`` logits when ``y`` is ``None``, otherwise
            the ``(batch,)`` logits picked at ``y``.
        """
        out = self.get_feature(x)
        if y is None:
            return out
        row_index = torch.arange(out.size(0), device=out.device)
        return out[row_index, y]
class FlowBlocks(nn.Module):
    """Sequence of ``ResidualCouplingLayer`` and ``Flip`` pairs.

    Args:
        channels: channel count of the tensor being transformed.
        hidden_channels: hidden width of each coupling layer.
        kernel_size: kernel size of each coupling layer.
        dilation_rate: dilation base of each coupling layer.
        n_layers: number of ``WN`` layers inside each coupling layer.
        n_flows: number of (coupling, flip) pairs.
        gin_channels: forwarded to each coupling layer.
    """

    def __init__(
        self,
        channels,
        hidden_channels,
        kernel_size,
        dilation_rate,
        n_layers,
        n_flows=4,
        gin_channels=0,
    ):
        super().__init__()
        self.channels = channels
        self.hidden_channels = hidden_channels
        self.kernel_size = kernel_size
        self.dilation_rate = dilation_rate
        self.n_layers = n_layers
        self.n_flows = n_flows
        self.gin_channels = gin_channels

        self.flows = nn.ModuleList()
        for _ in range(n_flows):
            coupling = ResidualCouplingLayer(
                channels,
                hidden_channels,
                kernel_size,
                dilation_rate,
                n_layers,
                gin_channels=gin_channels,
                mean_only=False,
            )
            self.flows.append(coupling)
            self.flows.append(Flip())

    def forward(self, x, reverse=False):
        """Run the flows in order, or in reverse order when inverting.

        Returns:
            ``x`` alone when ``reverse`` is true. Otherwise ``(x, log)``
            where ``log`` is the value returned by the *last* flow only.
            NOTE(review): per-layer log-determinants are not accumulated --
            confirm whether a sum over all flows was intended.
        """
        if reverse:
            for layer in reversed(self.flows):
                x = layer(x, reverse=reverse)
            return x

        for layer in self.flows:
            x, last_logdet = layer(x, reverse=reverse)
        return x, last_logdet
class Flip(nn.Module):
    """Flow step that reverses the order of dim 1."""

    def forward(self, x, *args, reverse=False, **kwargs):
        """Flip ``x`` along dim 1.

        Returns:
            The flipped tensor alone when ``reverse`` is true; otherwise
            ``(flipped, logdet)`` where ``logdet`` is a zero vector with one
            entry per batch element, matching ``x`` in dtype and device.
        """
        flipped = torch.flip(x, [1])
        if reverse:
            return flipped
        zero_logdet = torch.zeros(flipped.size(0)).to(
            dtype=flipped.dtype, device=flipped.device)
        return flipped, zero_logdet
def print_network(model):
    """Print the total number of elements across ``model.parameters()``."""
    total = sum(param.numel() for param in model.parameters())
    print('The number of parameters: {}'.format(total))
if __name__ == '__main__':
    # Smoke test: build the generator, push random tensors through it and
    # report the output shape and the parameter count.
    generator = UnetMapping(48, 256)
    source = torch.randn([1, 1, 256, 224])
    condition = torch.randn([1, 256, 1000])
    mapped = generator(source, condition)
    print(mapped.shape)
    print_network(generator)

View File

@@ -0,0 +1,592 @@
# from https://github.com/jik876/hifi-gan
import logging
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import Conv1d, ConvTranspose1d
LRELU_SLOPE = 0.1
def get_sinusoid_encoding_table(n_position, d_hid, padding_idx=None):
    """Build a sinusoidal position-encoding table.

    Entry ``(pos, j)`` starts as ``pos / 10000 ** (2 * (j // 2) / d_hid)``;
    even columns are then replaced by their sine and odd columns by their
    cosine.

    Args:
        n_position: number of positions (rows).
        d_hid: encoding size (columns).
        padding_idx: optional row index that is zeroed out.

    Returns:
        A ``torch.FloatTensor`` of shape ``(n_position, d_hid)``.
    """

    def _angle(position, column):
        return position / np.power(10000, 2 * (column // 2) / d_hid)

    table = np.array([[_angle(position, column) for column in range(d_hid)]
                      for position in range(n_position)])

    table[:, 0::2] = np.sin(table[:, 0::2])  # dim 2i
    table[:, 1::2] = np.cos(table[:, 1::2])  # dim 2i+1

    if padding_idx is not None:
        # The padding position gets an all-zero encoding.
        table[padding_idx] = 0.0

    return torch.FloatTensor(table)
def overlap_and_add(signal, frame_step):
    """Reconstruct a signal from overlapping frames by summing the overlaps.

    Frames are cut into sub-frames whose length is the greatest common
    divisor of the frame length and ``frame_step``; each sub-frame is then
    scatter-added into its position in the output.

    Args:
        signal: tensor of shape ``(..., frames, frame_length)``.
        frame_step: hop between the starts of consecutive frames.

    Returns:
        Tensor of shape ``(..., frame_step * (frames - 1) + frame_length)``
        with the dtype and device of ``signal``.
    """
    outer_dimensions = signal.size()[:-2]
    frames, frame_length = signal.size()[-2:]

    # gcd = greatest common divisor
    subframe_length = math.gcd(frame_length, frame_step)
    subframe_step = frame_step // subframe_length
    subframes_per_frame = frame_length // subframe_length
    output_size = frame_step * (frames - 1) + frame_length
    output_subframes = output_size // subframe_length

    # ``reshape`` (rather than ``view``) also accepts non-contiguous input.
    subframe_signal = signal.reshape(*outer_dimensions, -1, subframe_length)

    # Build the int64 target indices directly on the signal's device.
    # Routing them through ``signal.new_tensor`` would cast them to the
    # signal's floating dtype first, which loses precision for
    # half-precision signals.
    frame_index = torch.arange(
        output_subframes, device=signal.device).unfold(
            0, subframes_per_frame, subframe_step).reshape(-1)

    result = signal.new_zeros(*outer_dimensions, output_subframes,
                              subframe_length)
    result.index_add_(-2, frame_index, subframe_signal)
    return result.view(*outer_dimensions, -1)
class LastLayer(nn.Module):
    """Output head: activation, padding, then a 1-D convolution.

    Args:
        in_channels: input channel count of the convolution.
        out_channels: output channel count of the convolution.
        nonlinear_activation: name of a ``torch.nn`` activation class.
        nonlinear_activation_params: keyword arguments for that class.
        pad: name of a ``torch.nn`` padding class.
        kernel_size: convolution kernel size; the padding amount is
            ``(kernel_size - 1) // 2``.
        pad_params: extra keyword arguments for the padding class.
        bias: whether the convolution has a bias term.
    """

    def __init__(self, in_channels, out_channels, nonlinear_activation,
                 nonlinear_activation_params, pad, kernel_size, pad_params,
                 bias):
        super(LastLayer, self).__init__()
        activation_cls = getattr(torch.nn, nonlinear_activation)
        self.activation = activation_cls(**nonlinear_activation_params)
        pad_cls = getattr(torch.nn, pad)
        self.pad = pad_cls((kernel_size - 1) // 2, **pad_params)
        self.conv = torch.nn.Conv1d(
            in_channels, out_channels, kernel_size, bias=bias)

    def forward(self, x):
        """Return ``conv(pad(activation(x)))``."""
        return self.conv(self.pad(self.activation(x)))
class Conv1d1x1(Conv1d):
    """``Conv1d`` fixed to kernel size 1, no padding and dilation 1."""

    def __init__(self, in_channels, out_channels, bias):
        """Build the pointwise convolution.

        Args:
            in_channels: input channel count.
            out_channels: output channel count.
            bias: whether to add a bias term.
        """
        fixed_config = dict(kernel_size=1, padding=0, dilation=1)
        super(Conv1d1x1, self).__init__(
            in_channels, out_channels, bias=bias, **fixed_config)
class LastLinear(nn.Module):
    """Two-stage pointwise head: (activation, batch norm, 1x1 conv) twice.

    Args:
        hidden_channel: channel count of the input and of the first stage.
        out_channel: channel count produced by the second stage.
        bias: whether the 1x1 convolutions have a bias term.
    """

    def __init__(self, hidden_channel, out_channel, bias=True):
        super(LastLinear, self).__init__()
        self.activation = nn.LeakyReLU(negative_slope=0.2)
        self.bn_1 = nn.BatchNorm1d(hidden_channel)
        self.linear_1 = Conv1d1x1(hidden_channel, hidden_channel, bias=bias)
        self.bn_2 = nn.BatchNorm1d(hidden_channel)
        self.linear_2 = Conv1d1x1(hidden_channel, out_channel, bias=bias)

    def forward(self, x):
        """Apply both stages to ``x``; the activation module is shared."""
        stages = ((self.bn_1, self.linear_1), (self.bn_2, self.linear_2))
        for norm, pointwise in stages:
            x = pointwise(norm(self.activation(x)))
        return x
class Stretch2d(torch.nn.Module):
    """Rescale the last two axes of a 4-D tensor by fixed factors."""

    def __init__(self, x_scale, y_scale, mode='nearest'):
        """Store the scaling configuration.

        Args:
            x_scale (int): scaling factor for the last axis (documented in
                the source as the time axis of a spectrogram).
            y_scale (int): scaling factor for the second-to-last axis
                (documented in the source as the frequency axis).
            mode (str): interpolation mode passed to ``F.interpolate``.
        """
        super(Stretch2d, self).__init__()
        self.x_scale = x_scale
        self.y_scale = y_scale
        self.mode = mode

    def forward(self, x):
        """Interpolate ``x``.

        Args:
            x (Tensor): input tensor (B, C, F, T).

        Returns:
            Tensor: interpolated tensor (B, C, F * y_scale, T * x_scale).
        """
        factors = (self.y_scale, self.x_scale)
        return F.interpolate(x, scale_factor=factors, mode=self.mode)
class UpsampleLayer(nn.Module):
    """Nearest-neighbour stretch along the last axis followed by a ``Conv1d``.

    Args:
        in_channel: input channel count of the convolution.
        out_channel: output channel count of the convolution.
        upsample_rate: stretch factor for the last axis.
        kernel_size: convolution kernel size.
        stride: convolution stride.
        padding: convolution padding.
        dilation: convolution dilation.
        bias: whether the convolution has a bias term.
    """

    def __init__(self,
                 in_channel,
                 out_channel,
                 upsample_rate,
                 kernel_size,
                 stride,
                 padding,
                 dilation=1,
                 bias=True):
        super(UpsampleLayer, self).__init__()
        self.upsample = Stretch2d(upsample_rate, 1, mode='nearest')
        self.conv = nn.Conv1d(
            in_channel,
            out_channel,
            kernel_size,
            stride,
            padding,
            dilation=dilation,
            bias=bias)

    def forward(self, x):
        """Stretch ``x`` and convolve the result.

        A singleton axis is inserted at dim 1 so the 2-D stretch can be
        applied, and removed again before the 1-D convolution.
        """
        stretched = self.upsample(x.unsqueeze(1)).squeeze(1)
        return self.conv(stretched)
def init_weights(m, mean=0.0, std=0.01):
    """Re-draw ``m.weight`` from ``N(mean, std)`` for convolution modules.

    A module is treated as a convolution when its class name contains
    ``'Conv'``; every other module is left untouched.
    """
    if 'Conv' in m.__class__.__name__:
        m.weight.data.normal_(mean, std)
def get_padding(kernel_size, dilation=1):
    """Return ``int((kernel_size * dilation - dilation) / 2)``.

    For an odd ``kernel_size`` this is the padding that keeps the length of
    a stride-1 dilated convolution unchanged.
    """
    dilated_span = kernel_size * dilation - dilation
    return int(dilated_span / 2)
class ResBlock1(torch.nn.Module):
    """Residual block made of three (dilated conv, plain conv) pairs.

    Args:
        channels: channel count kept by every convolution.
        kernel_size: kernel size of every convolution.
        dilation: sequence whose first three entries are the dilations of
            the first convolution in each pair.
        bias: whether the convolutions have a bias term.
    """

    def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5), bias=True):
        super(ResBlock1, self).__init__()

        def _same_conv(rate):
            # Stride-1 convolution padded to preserve the sequence length.
            return Conv1d(
                channels,
                channels,
                kernel_size,
                1,
                dilation=rate,
                padding=get_padding(kernel_size, rate),
                bias=bias)

        self.convs1 = nn.ModuleList(
            [_same_conv(dilation[idx]) for idx in range(3)])
        self.convs2 = nn.ModuleList([_same_conv(1) for _ in range(3)])

    def forward(self, x):
        """Apply the three residual pairs in sequence."""
        for dilated_conv, plain_conv in zip(self.convs1, self.convs2):
            branch = dilated_conv(F.leaky_relu(x, LRELU_SLOPE))
            branch = plain_conv(F.leaky_relu(branch, LRELU_SLOPE))
            x = branch + x
        return x
class ResBlock2(torch.nn.Module):
    """Residual block made of two dilated convolutions.

    Args:
        channels: channel count kept by every convolution.
        kernel_size: kernel size of every convolution.
        dilation: sequence whose first two entries are the dilations.
        bias: whether the convolutions have a bias term.
    """

    def __init__(self, channels, kernel_size=3, dilation=(1, 3), bias=True):
        super(ResBlock2, self).__init__()

        def _same_conv(rate):
            # Stride-1 convolution padded to preserve the sequence length.
            return Conv1d(
                channels,
                channels,
                kernel_size,
                1,
                dilation=rate,
                padding=get_padding(kernel_size, rate),
                bias=bias)

        self.convs = nn.ModuleList(
            [_same_conv(dilation[idx]) for idx in range(2)])

    def forward(self, x):
        """Apply each convolution as a residual update of ``x``."""
        for conv in self.convs:
            branch = conv(F.leaky_relu(x, LRELU_SLOPE))
            x = branch + x
        return x
class BasisSignalLayer(nn.Module):
    """Project coefficients onto basis signals and overlap-add the frames.

    Args:
        basis_signal_weight: 2-D tensor that becomes the weight of the
            bias-free linear layer.
            NOTE(review): the layer is constructed with
            ``(size(0), size(1))`` as ``(in, out)`` features and its weight
            is then replaced by this tensor as-is -- confirm the intended
            orientation.
        L: frame length; frames are overlap-added with a hop of ``L // 2``.
    """

    def __init__(self, basis_signal_weight, L=64):
        super(BasisSignalLayer, self).__init__()
        n_in = basis_signal_weight.size(0)
        n_out = basis_signal_weight.size(1)
        self.layer = nn.Linear(n_in, n_out, bias=False)
        self.layer.weight = nn.Parameter(basis_signal_weight)
        self.L = L

    def forward(self, weight):
        """Return the overlap-added output of the linear projection."""
        frames = self.layer(weight)
        return overlap_and_add(frames, self.L // 2)
"""Residual stack module in MelGAN."""
class CausalConv1d(torch.nn.Module):
    """1-D convolution whose output is truncated to the input length."""

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 dilation=1,
                 bias=True,
                 pad='ConstantPad1d',
                 pad_params={'value': 0.0}):
        """Build the padding and convolution layers.

        Args:
            in_channels: input channel count.
            out_channels: output channel count.
            kernel_size: convolution kernel size.
            dilation: convolution dilation.
            bias: whether the convolution has a bias term.
            pad: name of a ``torch.nn`` padding class, constructed with the
                amount ``(kernel_size - 1) * dilation``.
            pad_params: extra keyword arguments for the padding class.
        """
        super(CausalConv1d, self).__init__()
        pad_amount = (kernel_size - 1) * dilation
        self.pad = getattr(torch.nn, pad)(pad_amount, **pad_params)
        self.conv = torch.nn.Conv1d(
            in_channels,
            out_channels,
            kernel_size,
            dilation=dilation,
            bias=bias)

    def forward(self, x):
        """Pad, convolve and keep the first ``T`` output steps.

        Args:
            x (Tensor): input tensor (B, in_channels, T).

        Returns:
            Tensor: output tensor (B, out_channels, T).
        """
        n_steps = x.size(2)
        convolved = self.conv(self.pad(x))
        return convolved[:, :, :n_steps]
class CausalConvTranspose1d(torch.nn.Module):
    """Transposed 1-D convolution with the last ``stride`` steps removed."""

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 stride,
                 bias=True):
        """Build the transposed convolution.

        Args:
            in_channels: input channel count.
            out_channels: output channel count.
            kernel_size: kernel size of the transposed convolution.
            stride: upsampling stride; also the number of trailing output
                steps dropped in ``forward``.
            bias: whether the layer has a bias term.
        """
        super(CausalConvTranspose1d, self).__init__()
        self.deconv = torch.nn.ConvTranspose1d(
            in_channels, out_channels, kernel_size, stride, bias=bias)
        self.stride = stride

    def forward(self, x):
        """Upsample ``x`` and trim the tail.

        Args:
            x (Tensor): input tensor (B, in_channels, T_in).

        Returns:
            Tensor: output tensor (B, out_channels, T_out).
        """
        upsampled = self.deconv(x)
        return upsampled[:, :, :-self.stride]
class ResidualStack(torch.nn.Module):
    """Residual stack module introduced in MelGAN."""

    def __init__(
        self,
        kernel_size=3,
        channels=32,
        dilation=1,
        bias=True,
        nonlinear_activation='LeakyReLU',
        nonlinear_activation_params={'negative_slope': 0.2},
        pad='ReflectionPad1d',
        pad_params={},
        use_causal_conv=False,
    ):
        """Initialize ResidualStack module.

        Args:
            kernel_size (int): Kernel size of the dilated convolution layer.
            channels (int): Number of channels of the convolution layers.
            dilation (int): Dilation factor.
            bias (bool): Whether to add a bias parameter in the convolution
                layers.
            nonlinear_activation (str): Name of a ``torch.nn`` activation
                class.
            nonlinear_activation_params (dict): Keyword arguments for the
                activation class.
            pad (str): Name of a ``torch.nn`` padding class applied before
                the dilated convolution.
            pad_params (dict): Keyword arguments for the padding class.
            use_causal_conv (bool): Whether to use causal convolution.
        """
        super(ResidualStack, self).__init__()

        def _activation():
            return getattr(
                torch.nn, nonlinear_activation)(**nonlinear_activation_params)

        # Define the residual stack part.
        if use_causal_conv:
            layers = [
                _activation(),
                CausalConv1d(
                    channels,
                    channels,
                    kernel_size,
                    dilation=dilation,
                    bias=bias,
                    pad=pad,
                    pad_params=pad_params),
                _activation(),
                torch.nn.Conv1d(channels, channels, 1, bias=bias),
            ]
        else:
            assert (kernel_size
                    - 1) % 2 == 0, 'Not support even number kernel size.'
            layers = [
                _activation(),
                getattr(torch.nn, pad)((kernel_size - 1) // 2 * dilation,
                                       **pad_params),
                torch.nn.Conv1d(
                    channels,
                    channels,
                    kernel_size,
                    dilation=dilation,
                    bias=bias),
                _activation(),
                torch.nn.Conv1d(channels, channels, 1, bias=bias),
            ]
        self.stack = torch.nn.Sequential(*layers)

        # Define the extra layer for the skip connection.
        self.skip_layer = torch.nn.Conv1d(channels, channels, 1, bias=bias)

    def forward(self, c):
        """Calculate forward propagation.

        Args:
            c (Tensor): Input tensor (B, channels, T).

        Returns:
            Tensor: Output tensor (B, channels, T).
        """
        stacked = self.stack(c)
        return stacked + self.skip_layer(c)
class HiFiGANGenerator(torch.nn.Module):
    """HiFi-GAN style generator: upsampling stages with residual blocks.

    An input convolution is followed by one upsampling layer per entry of
    ``upsample_rates``; after each of them the outputs of
    ``len(resblock_kernel_sizes)`` residual blocks are averaged. A final
    convolution maps to a single channel. No ``tanh`` is applied to the
    output.

    Args:
        input_channels: channel count of the input features.
        resblock_kernel_sizes: kernel size of each residual block per stage.
        upsample_rates: upsampling factor of each stage.
        upsample_initial_channel: channel count after the input
            convolution; halved at every stage.
        resblock_type: ``'1'`` selects ``ResBlock1``, anything else
            ``ResBlock2``.
        upsample_kernel_sizes: kernel size of each upsampling layer.
        resblock_dilation_sizes: dilations of each residual block.
        transposedconv: exactly ``False`` selects ``UpsampleLayer``
            (stretch + conv); anything else selects ``ConvTranspose1d``.
        weight_norm: whether to apply weight normalization to all
            convolutions.
        bias: whether the convolutions have a bias term.
    """

    def __init__(
        self,
        input_channels=80,
        resblock_kernel_sizes=[3, 7, 11],
        upsample_rates=[5, 4, 4, 2],
        upsample_initial_channel=256,
        resblock_type='1',
        upsample_kernel_sizes=[10, 8, 8, 4],
        resblock_dilation_sizes=[[1, 3, 5], [1, 3, 5], [1, 3, 5]],
        transposedconv=True,
        weight_norm=True,
        bias=True,
    ):
        super(HiFiGANGenerator, self).__init__()
        self.num_kernels = len(resblock_kernel_sizes)
        self.num_upsamples = len(upsample_rates)
        self.conv_pre = Conv1d(
            input_channels,
            upsample_initial_channel,
            7,
            1,
            padding=3,
            bias=bias)
        resblock = ResBlock1 if resblock_type == '1' else ResBlock2

        self.ups = nn.ModuleList()
        for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)):
            stage_in = upsample_initial_channel // (2**i)
            stage_out = upsample_initial_channel // (2**(i + 1))
            if transposedconv is False:
                up_layer = UpsampleLayer(
                    stage_in,
                    stage_out,
                    upsample_rate=u,
                    kernel_size=k,
                    stride=1,
                    padding=k // 2,
                    bias=bias)
            else:
                up_layer = ConvTranspose1d(
                    stage_in,
                    stage_out,
                    k,
                    u,
                    padding=(u // 2 + u % 2),
                    output_padding=u % 2,
                    bias=bias)
            self.ups.append(up_layer)

        self.resblocks = nn.ModuleList()
        # Defined up front so an empty ``upsample_rates`` still yields a
        # valid channel count for ``conv_post``.
        ch = upsample_initial_channel
        for i in range(len(self.ups)):
            ch = upsample_initial_channel // (2**(i + 1))
            for k, d in zip(resblock_kernel_sizes, resblock_dilation_sizes):
                self.resblocks.append(resblock(ch, k, d, bias=bias))

        self.conv_post = Conv1d(ch, 1, 7, 1, padding=3, bias=bias)

        # apply weight norm
        if weight_norm:
            self.apply_weight_norm()

        # reset parameters
        self.reset_parameters()

    def remove_weight_norm(self):
        """Remove weight normalization module from all of the layers."""

        def _remove_weight_norm(m):
            try:
                torch.nn.utils.remove_weight_norm(m)
            except ValueError:  # this module didn't have weight norm
                return
            # Logged only after the removal actually succeeded.
            logging.debug(f'Weight norm is removed from {m}.')

        self.apply(_remove_weight_norm)

    def apply_weight_norm(self):
        """Apply weight normalization module from all of the layers."""

        def _apply_weight_norm(m):
            if isinstance(m, (torch.nn.Conv1d, torch.nn.ConvTranspose1d)):
                torch.nn.utils.weight_norm(m)
                logging.debug(f'Weight norm is applied to {m}.')

        self.apply(_apply_weight_norm)

    def reset_parameters(self):
        """Reset parameters.

        This initialization follows official implementation manner.
        https://github.com/descriptinc/melgan-neurips/blob/master/mel2wav/modules.py
        """

        def _reset_parameters(m):
            if isinstance(m, (torch.nn.Conv1d, torch.nn.ConvTranspose1d)):
                m.weight.data.normal_(0.0, 0.01)
                logging.debug(f'Reset parameters in {m}.')

        self.apply(_reset_parameters)

    def _synthesize(self, x):
        """Shared body of ``forward`` and ``inference``.

        Runs the input convolution, every upsampling stage with its averaged
        residual blocks, and the output convolution.
        """
        x = self.conv_pre(x)
        for i in range(self.num_upsamples):
            x = F.leaky_relu(x, LRELU_SLOPE)
            x = self.ups[i](x)
            xs = None
            for j in range(self.num_kernels):
                block_out = self.resblocks[i * self.num_kernels + j](x)
                if xs is None:
                    xs = block_out
                else:
                    xs += block_out
            x = xs / self.num_kernels
        # The final activation uses ``F.leaky_relu``'s default slope.
        x = F.leaky_relu(x)
        x = self.conv_post(x)
        return x

    def forward(self, x):
        """Generate the output for a batched, channels-first input ``x``."""
        return self._synthesize(x)

    def inference(self, x):
        """Generate the output for a single unbatched, frames-first input.

        Non-tensor input is converted to a float tensor on the module's
        device; the two axes are then swapped and a batch axis is added
        before synthesis.
        """
        if not isinstance(x, torch.Tensor):
            x = torch.tensor(
                x, dtype=torch.float).to(next(self.parameters()).device)
        x = x.transpose(1, 0).unsqueeze(0)
        return self._synthesize(x)
if __name__ == '__main__':
    # Profiling script: build a generator, run a random input through it and
    # print the output shape, the thop statistics and the parameter count.
    import thop

    layer = HiFiGANGenerator(
        input_channels=256,
        upsample_initial_channel=256,
        upsample_rates=[4, 4, 4, 5],
        upsample_kernel_sizes=[8, 8, 8, 10])
    features = torch.randn([1, 256, 50])
    waveform = layer(features)
    flops, params = thop.profile(layer, [features])
    print(waveform.shape)
    print(flops / 1024 / 1024 / 1024)
    print(params / 1024)
    total_elements = sum(
        parameter.numel() for parameter in layer.parameters())
    print(total_elements)

View File

@@ -0,0 +1,68 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import os
from typing import Dict
import librosa
import soundfile as sf
import torch
from torchaudio.transforms import Spectrogram
from modelscope.metainfo import Models
from modelscope.models import TorchModel
from modelscope.models.audio.ssr.models.hifigan import HiFiGANGenerator
from modelscope.models.audio.ssr.models.Unet import MaskMapping
from modelscope.models.base import Tensor
from modelscope.models.builder import MODELS
from modelscope.utils.constant import Tasks
@MODELS.register_module(
    Tasks.speech_super_resolution, module_name=Models.hifissr)
class HifiSSR(TorchModel):
    r"""Speech super-resolution model integrated into the modelscope framework.

    The model combines a spectrogram front end, a ``MaskMapping`` network
    that maps the source spectrogram conditioned on a reference
    spectrogram, and a ``HiFiGANGenerator`` vocoder that turns the mapped
    features into a waveform.
    """

    def __init__(self, model_dir: str, *args, **kwargs):
        """Build the sub-networks and load weights from ``model_dir``.

        Args:
            model_dir (str): the model path; weights are read from
                ``checkpoint.pt`` inside it when that file exists.
            **kwargs: ``device`` selects the torch device (default
                ``'cpu'``).
        """
        super().__init__(model_dir, *args, **kwargs)
        self.device = kwargs.get('device', 'cpu')
        # n_fft=512, win_length=512, hop_length=480.
        self.front = Spectrogram(512, 512, int(48000 * 0.01)).to(self.device)
        self.vocoder = HiFiGANGenerator(
            input_channels=256,
            upsample_rates=[5, 4, 4, 3, 2],
            upsample_kernel_sizes=[10, 8, 8, 6, 4],
            weight_norm=False,
            upsample_initial_channel=1024).to(self.device)
        self.mapping = MaskMapping(32, 256).to(self.device)
        model_bin_file = os.path.join(model_dir, 'checkpoint.pt')
        if os.path.exists(model_bin_file):
            checkpoint = torch.load(model_bin_file, map_location=self.device)
            self.vocoder.load_state_dict(checkpoint['voc_state_dict'])
            self.vocoder.eval()
            self.mapping.load_state_dict(checkpoint['unet_state_dict'])
            self.mapping.eval()
        else:
            # Without a checkpoint the networks keep their random
            # initialisation; make that visible instead of failing silently.
            import logging
            logging.getLogger(__name__).warning(
                'checkpoint not found at %s; HifiSSR is running with '
                'randomly initialised weights', model_bin_file)

    def _log_spectrogram(self, wav_path, sr):
        """Load ``wav_path`` at ``sr`` Hz and return its log10 spectrogram.

        The last frequency bin is dropped and ``1e-6`` is added before the
        logarithm. The result keeps the leading batch axis of size 1.
        """
        wav = librosa.load(wav_path, sr=sr)[0]
        spec = self.front(
            torch.FloatTensor(wav).unsqueeze(0).to(self.device))[:, :-1]
        return torch.log10(spec + 1e-6)

    def forward(self, inputs: Dict[str, Tensor]) -> Dict[str, Tensor]:
        """Enhance the source audio using the reference audio.

        Args:
            inputs: mapping with the keys
                ``'source_wav'`` (path of the audio to process),
                ``'ref_wav'`` (path of the reference audio) and, optionally,
                ``'out_wav'`` (path the result is written to; skipped when
                missing or empty).

        Returns:
            The generated waveform as a flat numpy array at 48 kHz.
            NOTE(review): the return annotation says ``Dict`` but an array
            is returned; kept as-is so callers are unaffected.
        """
        ref_fp = inputs['ref_wav']
        source_fp = inputs['source_wav']
        out_fp = inputs.get('out_wav')
        sr = 48000

        # The mapping network expects an extra channel axis on the source.
        source_mel = self._log_spectrogram(source_fp, sr).unsqueeze(0)
        ref_mel = self._log_spectrogram(ref_fp, sr)

        with torch.no_grad():
            g_out = self.mapping(source_mel, ref_mel)
            g_out_wav = self.vocoder(g_out)
            g_out_wav = g_out_wav.flatten()
        waveform = g_out_wav.cpu().data.numpy()

        # Write whenever an output path is given. The previous check,
        # ``os.path.exists(out_fp)``, only overwrote files that already
        # existed and silently skipped new paths.
        if out_fp:
            sf.write(out_fp, waveform, sr)
        return waveform

View File

@@ -0,0 +1,20 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
from typing import TYPE_CHECKING
from modelscope.utils.import_utils import LazyImportModule
if TYPE_CHECKING:
    from .converter import UnetVC
else:
    # Map each submodule of this package to the names it exports. The key
    # must match the module imported in the TYPE_CHECKING branch above.
    # NOTE(review): the original key was 'unetvc_16k', which does not match
    # the `.converter` module -- confirm the file name.
    _import_structure = {
        'converter': ['UnetVC'],
    }

    import sys

    sys.modules[__name__] = LazyImportModule(
        __name__,
        globals()['__file__'],
        _import_structure,
        module_spec=__spec__,
        extra_objects={},
    )

View File

@@ -0,0 +1,74 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import os
from typing import Dict
import soundfile as sf
import torch
from modelscope.metainfo import Models
from modelscope.models import TorchModel
from modelscope.models.audio.vc.src.encoder import Encoder
from modelscope.models.audio.vc.src.sv_models.DTDNN import \
SpeakerVerificationCamplus
from modelscope.models.audio.vc.src.vocoder import (ConditionGenerator,
HiFiGANGenerator)
from modelscope.models.base import Tensor
from modelscope.models.builder import MODELS
from modelscope.utils.constant import Tasks
@MODELS.register_module(Tasks.voice_conversion, module_name=Models.unetvc_16k)
class UnetVC(TorchModel):
    r"""Voice-conversion model wrapped for the modelscope framework.

    Chains an ONNX content encoder, a CAM++ speaker-embedding model, a
    ``ConditionGenerator`` converter and a ``HiFiGANGenerator`` vocoder.
    """

    def __init__(self, model_dir: str, *args, **kwargs):
        """Load every sub-model from ``<model_dir>/static``.

        Args:
            model_dir (str): the model path. Its ``static`` sub-directory
                must contain ``encoder_am.mvn``, ``encoder.onnx``,
                ``campplus_cn_common.bin``, ``converter.pth`` and
                ``vocoder.pth``.
            **kwargs: ``device`` selects the torch device (default 'cpu').
        """
        super().__init__(model_dir, *args, **kwargs)
        device = kwargs.get('device', 'cpu')
        self.device = device
        static_path = os.path.join(model_dir, 'static')
        self.encoder = Encoder(
            os.path.join(static_path, 'encoder_am.mvn'),
            os.path.join(static_path, 'encoder.onnx'))
        self.spk_emb = SpeakerVerificationCamplus(
            os.path.join(static_path, 'campplus_cn_common.bin'), device)
        self.converter = ConditionGenerator(
            unet=True, extra_info=True).to(device)
        G_path = os.path.join(static_path, 'converter.pth')
        # NOTE: torch.load unpickles the files; only load trusted models.
        # Weights are deserialized on CPU and copied into the module.
        self.converter.load_state_dict(
            torch.load(G_path, map_location=lambda storage, loc: storage))
        self.converter.eval()
        self.vocoder = HiFiGANGenerator().to(device)
        self.vocoder.load_state_dict(
            torch.load(
                os.path.join(static_path, 'vocoder.pth'),
                map_location=self.device)['state_dict'])
        self.vocoder.eval()
        self.vocoder.remove_weight_norm()

    def forward(self, inputs: Dict[str, Tensor]) -> Dict[str, Tensor]:
        """Convert ``source_wav`` using ``target_wav`` as the reference.

        Args:
            inputs: dict with the file paths ``target_wav`` and
                ``source_wav`` and, optionally, ``save_path``.

        Returns:
            The converted waveform as a flat numpy array (not a dict,
            despite the inherited annotation).
        """
        target_wav_path = inputs['target_wav']
        source_wav_path = inputs['source_wav']
        # ``save_path`` is optional: a missing or empty value skips writing.
        save_wav_path = inputs.get('save_path')
        with torch.no_grad():
            source_enc = self.encoder.inference(source_wav_path).to(
                self.device)
            spk_emb = self.spk_emb.forward(target_wav_path).to(self.device)
            style_mc = self.encoder.get_feats(target_wav_path).to(self.device)
            coded_sp_converted_norm = self.converter(source_enc, spk_emb,
                                                     style_mc)
            wav = self.vocoder(coded_sp_converted_norm.permute([0, 2, 1]))
        result = wav.flatten().detach().cpu().numpy()
        # NOTE(review): the file is only (over)written when ``save_path``
        # already exists; this may be unintended -- confirm whether a new
        # file should be created instead.
        if save_wav_path and os.path.exists(save_wav_path):
            sf.write(save_wav_path, result, 16000)
        return result

View File

@@ -0,0 +1,581 @@
"""
StarGAN v2
Copyright (c) 2020-present NAVER Corp.
This work is licensed under the Creative Commons Attribution-NonCommercial
4.0 International License. To view a copy of this license, visit
http://creativecommons.org/licenses/by-nc/4.0/ or send a letter to
Creative Commons, PO Box 1866, Mountain View, CA 94042, USA.
"""
import copy
import math
import os
import os.path as osp
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
class DownSample(nn.Module):
    """Average-pool down-sampler whose behaviour is chosen by a mode string.

    'none' returns the input unchanged, 'timepreserve' pools with a (2, 1)
    kernel and 'half' pools with a 2x2 kernel.
    """

    def __init__(self, layer_type):
        super().__init__()
        self.layer_type = layer_type

    def forward(self, x):
        mode = self.layer_type
        if mode == 'none':
            return x
        if mode == 'timepreserve':
            return F.avg_pool2d(x, (2, 1))
        if mode == 'half':
            return F.avg_pool2d(x, 2)
        raise RuntimeError(
            'Got unexpected donwsampletype %s, expected is [none, timepreserve, half]'
            % mode)
class UpSample(nn.Module):
    """Nearest-neighbour up-sampler whose behaviour is chosen by a mode string.

    'none' returns the input unchanged, 'timepreserve' scales by (2, 1) and
    'half' scales both spatial axes by 2.
    """

    def __init__(self, layer_type):
        super().__init__()
        self.layer_type = layer_type

    def forward(self, x):
        mode = self.layer_type
        if mode == 'none':
            return x
        if mode == 'timepreserve':
            return F.interpolate(x, scale_factor=(2, 1), mode='nearest')
        if mode == 'half':
            return F.interpolate(x, scale_factor=2, mode='nearest')
        raise RuntimeError(
            'Got unexpected upsampletype %s, expected is [none, timepreserve, half]'
            % mode)
class ResBlk(nn.Module):
    """Pre-activation residual block with optional instance norm.

    The residual path is norm -> act -> conv -> downsample -> norm -> act ->
    conv; the shortcut is an optional 1x1 conv followed by the same
    down-sampler. Their sum is divided by sqrt(2).
    """

    def __init__(self,
                 dim_in,
                 dim_out,
                 actv=nn.LeakyReLU(0.2),
                 normalize=False,
                 out_for_onnx=False,
                 downsample='none'):
        super().__init__()
        self.actv = actv
        self.normalize = normalize
        self.downsample = DownSample(downsample)
        # A learned 1x1 shortcut is only needed when the width changes.
        self.learned_sc = dim_in != dim_out
        self.conv1 = nn.Conv2d(dim_in, dim_in, 3, 1, 1)
        self.conv2 = nn.Conv2d(dim_in, dim_out, 3, 1, 1)
        if normalize:
            self.norm1 = nn.InstanceNorm2d(dim_in)
            self.norm2 = nn.InstanceNorm2d(dim_in)
            if out_for_onnx:
                # Mark the norms as non-training for ONNX export.
                for norm in (self.norm1, self.norm2):
                    norm.training = False
        if self.learned_sc:
            self.conv1x1 = nn.Conv2d(dim_in, dim_out, 1, 1, 0, bias=False)

    def _shortcut(self, x):
        skip = self.conv1x1(x) if self.learned_sc else x
        # The original guarded this with the module's truthiness, which is
        # always true, so the down-sampler is applied unconditionally.
        return self.downsample(skip)

    def _residual(self, x):
        if self.normalize:
            x = self.norm1(x)
        x = self.downsample(self.conv1(self.actv(x)))
        if self.normalize:
            x = self.norm2(x)
        return self.conv2(self.actv(x))

    def forward(self, x):
        skip = self._shortcut(x)
        residual = self._residual(x)
        return (skip + residual) / math.sqrt(2)  # unit variance
class AdaIN(nn.Module):
    """Adaptive instance norm driven by a learned bank of style vectors.

    The conditioning vector (192 features) is projected to ``style_dim``,
    used to attend over 1000 learned style vectors, and the attended style
    is mapped to a per-channel scale and shift for the normalised input.
    The ``device`` argument is accepted but not used.
    """

    def __init__(self,
                 style_dim,
                 num_features,
                 out_for_onnx=False,
                 device=None):
        super().__init__()
        self.norm = nn.InstanceNorm2d(num_features)
        if out_for_onnx:
            self.norm.training = False
        self.fc = nn.Linear(style_dim, num_features * 2)
        self.emb = torch.nn.Linear(192, style_dim)
        self.spk_emb = torch.nn.Parameter(torch.randn([1, 1000, style_dim]))

    def forward(self, x, s: torch.Tensor):
        query = self.emb(s).unsqueeze(1)
        # Dot-product attention weights over the style bank.
        weights = torch.sum(query * self.spk_emb, dim=-1)
        weights = torch.softmax(weights, dim=-1).unsqueeze(-1)
        style = torch.sum(self.spk_emb * weights, dim=1)
        affine = self.fc(style)
        affine = affine.view(affine.size(0), affine.size(1), 1, 1)
        gamma, beta = torch.chunk(affine, chunks=2, dim=1)
        return (1 + gamma) * self.norm(x) + beta
class AdainResBlk(nn.Module):
    """Residual block whose normalisation layers are style-conditioned AdaIN.

    When ``w_hpf`` is 0 the up-sampled shortcut is added and the sum is
    divided by sqrt(2); otherwise only the residual path is returned.
    """

    def __init__(self,
                 dim_in,
                 dim_out,
                 style_dim=64,
                 w_hpf=0,
                 actv=nn.LeakyReLU(0.2),
                 upsample='none',
                 out_for_onnx=False):
        super().__init__()
        self.w_hpf = w_hpf
        self.actv = actv
        self.upsample = UpSample(upsample)
        # A learned 1x1 shortcut is only needed when the width changes.
        self.learned_sc = dim_in != dim_out
        self.conv1 = nn.Conv2d(dim_in, dim_out, 3, 1, 1)
        self.conv2 = nn.Conv2d(dim_out, dim_out, 3, 1, 1)
        self.norm1 = AdaIN(style_dim, dim_in, out_for_onnx)
        self.norm2 = AdaIN(style_dim, dim_out, out_for_onnx)
        if self.learned_sc:
            self.conv1x1 = nn.Conv2d(dim_in, dim_out, 1, 1, 0, bias=False)

    def _shortcut(self, x):
        skip = self.upsample(x)
        return self.conv1x1(skip) if self.learned_sc else skip

    def _residual(self, x, s):
        hidden = self.actv(self.norm1(x, s))
        hidden = self.conv1(self.upsample(hidden))
        hidden = self.actv(self.norm2(hidden, s))
        return self.conv2(hidden)

    def forward(self, x, s):
        residual = self._residual(x, s)
        if self.w_hpf != 0:
            return residual
        return (residual + self._shortcut(x)) / math.sqrt(2)
class HighPass(nn.Module):
    """Depth-wise 3x3 high-pass filter.

    The kernel is [[-1, -1, -1], [-1, 8, -1], [-1, -1, -1]] / ``w_hpf`` and
    is applied to every channel independently with padding 1.

    Args:
        w_hpf: divisor applied to the kernel.
        device: optional device on which to create the kernel.
    """

    def __init__(self, w_hpf, device=None):
        super(HighPass, self).__init__()
        kernel = torch.tensor(
            [[-1, -1, -1], [-1, 8.0, -1], [-1, -1, -1]],
            device=device) / w_hpf
        # A buffer follows the module through .to()/.cuda()/.double();
        # non-persistent so existing state dicts keep their key set.
        self.register_buffer('filter', kernel, persistent=False)

    def forward(self, x):
        channels = x.size(1)
        # Match the input so mixed device/dtype inputs do not fail in conv2d.
        kernel = self.filter.to(device=x.device, dtype=x.dtype)
        kernel = kernel.unsqueeze(0).unsqueeze(1).repeat(channels, 1, 1, 1)
        return F.conv2d(x, kernel, padding=1, groups=channels)
class Generator(nn.Module):
    """Encoder/decoder generator with AdaIN-conditioned decoder blocks.

    Four down-sampling ``ResBlk`` stages (alternating 'half' and
    'timepreserve') plus two bottleneck blocks form the encoder; the decoder
    mirrors them with ``AdainResBlk`` blocks that all receive the
    conditioning input ``c``.
    """

    def __init__(self,
                 dim_in=48,
                 style_dim=48,
                 max_conv_dim=48 * 8,
                 out_for_onnx=False):
        super().__init__()
        self.out_for_onnx = out_for_onnx
        self.stem = nn.Conv2d(1, dim_in, 3, 1, 1)
        self.encode = nn.ModuleList()
        self.decode = nn.ModuleList()
        self.to_out = nn.Sequential(
            nn.InstanceNorm2d(dim_in, affine=True), nn.LeakyReLU(0.2),
            nn.Conv2d(dim_in, 1, 1, 1, 0))
        if out_for_onnx:
            for module in self.to_out.modules():
                if isinstance(module, torch.nn.InstanceNorm2d):
                    module.eval()

        # down/up-sampling blocks: stages 1 and 3 keep the second axis.
        stage_types = ('half', 'timepreserve', 'half', 'timepreserve')
        for sample_type in stage_types:
            dim_out = min(dim_in * 2, max_conv_dim)
            self.encode.append(
                ResBlk(
                    dim_in,
                    dim_out,
                    normalize=True,
                    downsample=sample_type,
                    out_for_onnx=out_for_onnx))
            # Prepending keeps the decoder in reverse (stack-like) order.
            self.decode.insert(
                0,
                AdainResBlk(
                    dim_out,
                    dim_in,
                    style_dim,
                    w_hpf=1,
                    upsample=sample_type,
                    out_for_onnx=out_for_onnx))
            dim_in = dim_out

        # bottleneck blocks (encoder)
        for _ in range(2):
            self.encode.append(
                ResBlk(
                    dim_out,
                    dim_out,
                    normalize=True,
                    out_for_onnx=out_for_onnx))

        # bottleneck blocks (decoder)
        for _ in range(2):
            self.decode.insert(
                0,
                AdainResBlk(
                    dim_out,
                    dim_out,
                    style_dim,
                    w_hpf=1,
                    out_for_onnx=out_for_onnx))

    def forward(self, x: torch.Tensor, c):
        hidden = self.stem(x)
        for enc_block in self.encode:
            hidden = enc_block(hidden)
        for dec_block in self.decode:
            hidden = dec_block(hidden, c)
        return self.to_out(hidden)
class Generator2(nn.Module):
    """Generator variant conditioned on a speaker id via an embedding table.

    NOTE(review): ``forward`` feeds ``spk_embedding(c)`` (``style_dim``
    features) to the ``AdainResBlk`` blocks, whose AdaIN layers in this file
    project from 192 features -- so this presumably only runs when
    ``style_dim == 192``; confirm whether this class is still in use.

    Args:
        dim_in: width of the stem convolution.
        style_dim: size of the speaker embedding.
        max_conv_dim: upper bound on block width.
        num_spk: number of rows in the speaker embedding table.
        w_hpf: forwarded to the decoder blocks; a positive value adds a
            fifth down/up-sampling stage.
        F0_channel: extra width (halved) added to the decoder bottleneck.
        out_for_onnx: if set, ``forward`` permutes the input and squeezes
            the output channel axis.
    """

    def __init__(self,
                 dim_in=48,
                 style_dim=48,
                 max_conv_dim=48 * 8,
                 num_spk=1883,
                 w_hpf=1,
                 F0_channel=0,
                 out_for_onnx=False):
        super().__init__()
        self.out_for_onnx = out_for_onnx
        self.stem = nn.Conv2d(1, dim_in, 3, 1, 1)
        self.encode = nn.ModuleList()
        self.decode = nn.ModuleList()
        self.to_out = nn.Sequential(
            nn.InstanceNorm2d(dim_in, affine=True), nn.LeakyReLU(0.2),
            nn.Conv2d(dim_in, 1, 1, 1, 0))
        self.F0_channel = F0_channel
        # down/up-sampling blocks
        self.spk_embedding = torch.nn.Embedding(num_spk, style_dim)
        repeat_num = 4
        if w_hpf > 0:
            repeat_num += 1
        for lid in range(repeat_num):
            _downtype = 'timepreserve' if lid in (1, 3) else 'half'
            dim_out = min(dim_in * 2, max_conv_dim)
            self.encode.append(
                ResBlk(dim_in, dim_out, normalize=False, downsample=_downtype))
            # AdainResBlk has no `norm` parameter; passing it raised
            # TypeError, so the argument is dropped.
            self.decode.insert(
                0,
                AdainResBlk(
                    dim_out,
                    dim_in,
                    style_dim,
                    w_hpf=w_hpf,
                    upsample=_downtype))  # stack-like
            dim_in = dim_out

        # bottleneck blocks (encoder)
        for _ in range(2):
            self.encode.append(ResBlk(dim_out, dim_out, normalize=True))

        # bottleneck blocks (decoder)
        bottleneck_dim = dim_out + int(F0_channel / 2)
        for _ in range(2):
            self.decode.insert(
                0,
                AdainResBlk(
                    bottleneck_dim,
                    bottleneck_dim,
                    style_dim,
                    w_hpf=w_hpf))

        # HighPass takes only the divisor; the extra `device` positional
        # argument raised TypeError. The filter is not used by forward().
        self.hpf = HighPass(w_hpf)

    def forward(self, x, c):
        """Map input ``x`` and speaker ids ``c`` to the generated output."""
        if self.out_for_onnx:
            x = x.permute(0, 3, 1, 2)
        x = self.stem(x)
        for block in self.encode:
            x = block(x)
        s = self.spk_embedding(c)
        for block in self.decode:
            x = block(x, s)
        out = self.to_out(x)
        if self.out_for_onnx:
            out = out.squeeze(dim=1)
        return out
class MappingNetwork(nn.Module):
    """MLP that maps a latent code to one style vector per domain.

    A shared trunk of four Linear+ReLU layers feeds ``num_domains``
    separate heads; ``forward`` returns, for each sample, the output of the
    head selected by its domain index ``y``.
    """

    def __init__(self,
                 latent_dim=16,
                 style_dim=48,
                 num_domains=2,
                 hidden_dim=384):
        super().__init__()
        trunk = [nn.Linear(latent_dim, hidden_dim), nn.ReLU()]
        for _ in range(3):
            trunk.extend([nn.Linear(hidden_dim, hidden_dim), nn.ReLU()])
        self.shared = nn.Sequential(*trunk)

        self.unshared = nn.ModuleList()
        for _ in range(num_domains):
            head = nn.Sequential(
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, style_dim),
            )
            self.unshared.append(head)

    def forward(self, z, y):
        hidden = self.shared(z)
        # (batch, num_domains, style_dim)
        per_domain = torch.stack([head(hidden) for head in self.unshared],
                                 dim=1)
        rows = torch.arange(y.size(0), device=y.device)
        return per_domain[rows, y]  # (batch, style_dim)
class StyleEncoder(nn.Module):
    """Convolutional encoder producing one style vector per domain.

    A shared stack (stem conv, four 'half' down-sampling ``ResBlk`` stages,
    5x5 conv and global average pooling) feeds ``num_domains`` linear
    heads; ``forward`` returns the head output selected by ``y``.
    """

    def __init__(self,
                 dim_in=48,
                 style_dim=48,
                 num_domains=2,
                 max_conv_dim=384):
        super().__init__()
        layers = [nn.Conv2d(1, dim_in, 3, 1, 1)]

        repeat_num = 4
        for _ in range(repeat_num):
            dim_out = min(dim_in * 2, max_conv_dim)
            layers.append(ResBlk(dim_in, dim_out, downsample='half'))
            dim_in = dim_out

        layers.extend([
            nn.LeakyReLU(0.2),
            nn.Conv2d(dim_out, dim_out, 5, 1, 0),
            nn.AdaptiveAvgPool2d(1),
            nn.LeakyReLU(0.2),
        ])
        self.shared = nn.Sequential(*layers)

        self.unshared = nn.ModuleList()
        for _ in range(num_domains):
            self.unshared.append(nn.Linear(dim_out, style_dim))

    def forward(self, x, y):
        pooled = self.shared(x)
        pooled = pooled.view(pooled.size(0), -1)
        # (batch, num_domains, style_dim)
        per_domain = torch.stack([head(pooled) for head in self.unshared],
                                 dim=1)
        rows = torch.arange(y.size(0), device=y.device)
        return per_domain[rows, y]  # (batch, style_dim)
class Discriminator(nn.Module):
    """Pair of ``Discriminator2d`` networks with identical configuration.

    ``dis`` is the real/fake discriminator used by ``forward``; ``cls`` is
    the adversarial classifier exposed through ``classifier``.
    """

    def __init__(self,
                 dim_in=48,
                 num_domains=2,
                 max_conv_dim=384,
                 repeat_num=4):
        super().__init__()
        shared_cfg = dict(
            dim_in=dim_in,
            num_domains=num_domains,
            max_conv_dim=max_conv_dim,
            repeat_num=repeat_num)
        # real/fake discriminator
        self.dis = Discriminator2d(**shared_cfg)
        # adversarial classifier
        self.cls = Discriminator2d(**shared_cfg)
        self.num_domains = num_domains

    def forward(self, x, y):
        return self.dis(x, y)

    def classifier(self, x):
        return self.cls.get_feature(x)
class LinearNorm(torch.nn.Module):
    """Linear layer whose weight is Xavier-uniform initialised.

    The gain is the one ``torch.nn.init.calculate_gain`` returns for
    ``w_init_gain``.
    """

    def __init__(self, in_dim, out_dim, bias=True, w_init_gain='linear'):
        super(LinearNorm, self).__init__()
        self.linear_layer = torch.nn.Linear(in_dim, out_dim, bias=bias)
        gain = torch.nn.init.calculate_gain(w_init_gain)
        torch.nn.init.xavier_uniform_(self.linear_layer.weight, gain=gain)

    def forward(self, x):
        return self.linear_layer(x)
class Discriminator2d(nn.Module):
    """Convolutional network emitting one score per domain.

    ``get_feature`` returns all domain scores; ``forward`` picks, for each
    sample, the score of the domain given in ``y``.
    """

    def __init__(self,
                 dim_in=48,
                 num_domains=2,
                 max_conv_dim=384,
                 repeat_num=4):
        super().__init__()
        layers = [nn.Conv2d(1, dim_in, 3, 1, 1)]

        for _ in range(repeat_num):
            dim_out = min(dim_in * 2, max_conv_dim)
            layers.append(ResBlk(dim_in, dim_out, downsample='half'))
            dim_in = dim_out

        layers.extend([
            nn.LeakyReLU(0.2),
            nn.Conv2d(dim_out, dim_out, 5, 1, 0),
            nn.LeakyReLU(0.2),
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(dim_out, num_domains, 1, 1, 0),
        ])
        self.main = nn.Sequential(*layers)

    def get_feature(self, x):
        scores = self.main(x)
        return scores.view(scores.size(0), -1)  # (batch, num_domains)

    def forward(self, x, y):
        scores = self.get_feature(x)
        rows = torch.arange(y.size(0), device=y.device)
        return scores[rows, y]  # (batch)
def print_network(model, name):
    """Print the model, its name and its total number of parameters."""
    total = sum(param.numel() for param in model.parameters())
    print(model)
    print(name)
    print('The number of parameters: {}'.format(total))
class _AttrDict(dict):
    """Dict with attribute access to its keys (stdlib stand-in for Munch)."""

    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError as exc:
            raise AttributeError(key) from exc

    __setattr__ = dict.__setitem__


def build_model(args, F0_model, ASR_model):
    """Build the training networks and EMA copies of the trainable ones.

    Args:
        args: namespace providing ``dim_in``, ``style_dim``,
            ``max_conv_dim``, ``latent_dim``, ``num_domains`` and
            ``n_repeat``.
        F0_model: stored unchanged under ``f0_model``.
        ASR_model: stored unchanged under ``asr_model``.

    Returns:
        ``(nets, nets_ema)``: two dict-like containers that also allow
        attribute access. ``nets_ema`` holds deep copies of the generator,
        mapping network and style encoder.

    NOTE(review): the original passed ``w_hpf`` / ``F0_channel`` to
    ``Generator`` (which accepts neither) and used ``Munch`` without
    importing it, so it could never run. Confirm whether ``Generator2`` was
    the intended class.
    """
    generator = Generator(args.dim_in, args.style_dim, args.max_conv_dim)
    mapping_network = MappingNetwork(
        args.latent_dim,
        args.style_dim,
        args.num_domains,
        hidden_dim=args.max_conv_dim)
    style_encoder = StyleEncoder(args.dim_in, args.style_dim, args.num_domains,
                                 args.max_conv_dim)
    discriminator = Discriminator(args.dim_in, args.num_domains,
                                  args.max_conv_dim, args.n_repeat)
    generator_ema = copy.deepcopy(generator)
    mapping_network_ema = copy.deepcopy(mapping_network)
    style_encoder_ema = copy.deepcopy(style_encoder)

    print(generator, 'generator')
    print(mapping_network, 'mapping_network')
    print(style_encoder, 'style_encoder')

    nets = _AttrDict(
        generator=generator,
        mapping_network=mapping_network,
        style_encoder=style_encoder,
        discriminator=discriminator,
        f0_model=F0_model,
        asr_model=ASR_model)

    nets_ema = _AttrDict(
        generator=generator_ema,
        mapping_network=mapping_network_ema,
        style_encoder=style_encoder_ema)

    return nets, nets_ema
if __name__ == '__main__':
    # Manual smoke test. Generator takes no `w_hpf` / `F0_channel`
    # arguments, and its AdaIN layers project the condition from 192
    # features, so a random 192-dim vector is used instead of a speaker id.
    generator = Generator(48, 48, 256)
    a = torch.randn([1, 1, 256 + 32, 80])
    c = torch.randn([1, 192])
    b = generator(a, c)
    print(b.shape)

View File

@@ -0,0 +1,280 @@
import librosa
import numpy as np
import onnxruntime
import torch
import torchaudio.compliance.kaldi as kaldi
from torch.nn.utils.rnn import pad_sequence
def load_cmvn(cmvn_file):
    """Read shift and scale vectors from a text CMVN file.

    The values are taken from the ``<LearnRateCoef>`` line that directly
    follows an ``<AddShift>`` line (shift) or a ``<Rescale>`` line (scale):
    tokens from index 3 up to, but excluding, the last token. Blank lines
    are ignored instead of raising IndexError.

    Args:
        cmvn_file: path to the CMVN file (UTF-8 text).

    Returns:
        A float32 tensor with two rows: the shift row and the scale row.
    """
    with open(cmvn_file, 'r', encoding='utf-8') as f:
        rows = [line.split() for line in f]

    means_list = []
    vars_list = []
    # Walk over (line, next line) pairs; the last line has no successor.
    for tokens, following in zip(rows, rows[1:]):
        if not tokens or not following:
            continue
        if following[0] != '<LearnRateCoef>':
            continue
        if tokens[0] == '<AddShift>':
            means_list = following[3:-1]
        elif tokens[0] == '<Rescale>':
            vars_list = following[3:-1]

    means = np.array(means_list).astype(np.float32)
    scales = np.array(vars_list).astype(np.float32)
    cmvn = np.array([means, scales])
    return torch.as_tensor(cmvn, dtype=torch.float32)
def apply_cmvn(inputs, cmvn):  # noqa
    """Apply CMVN with mvn data.

    Computes ``(inputs + shift) * scale`` where shift and scale are the
    first and second row of ``cmvn``, truncated to the feature width of
    ``inputs``. The input tensor is left untouched (the previous version
    modified it in place, which also altered views of caller data).

    Args:
        inputs: 2-D feature tensor (frames x features).
        cmvn: tensor with the shift row and the scale row.

    Returns:
        A new float32 tensor with the normalised features.
    """
    device = inputs.device
    _, dim = inputs.shape

    shift = cmvn[0:1, :dim].to(device)
    scale = cmvn[1:2, :dim].to(device)
    return ((inputs + shift) * scale).type(torch.float32)
def apply_lfr(inputs, lfr_m, lfr_n):
    """Stack ``lfr_m`` consecutive frames every ``lfr_n`` frames.

    The first frame is replicated ``(lfr_m - 1) // 2`` times on the left.
    A final window that runs past the end is completed by repeating the
    last frame. Returns a float32 tensor with one stacked row per window.
    """
    num_frames = inputs.shape[0]
    num_windows = int(np.ceil(num_frames / lfr_n))
    left_context = (lfr_m - 1) // 2
    inputs = torch.vstack((inputs[0].repeat(left_context, 1), inputs))
    num_frames += left_context

    stacked = []
    for idx in range(num_windows):
        start = idx * lfr_n
        remaining = num_frames - start
        if remaining >= lfr_m:
            stacked.append(inputs[start:start + lfr_m].view(1, -1))
            continue
        # process last LFR frame: pad with copies of the final frame
        tail = inputs[start:].view(-1)
        for _ in range(lfr_m - remaining):
            tail = torch.hstack((tail, inputs[-1]))
        stacked.append(tail)
    return torch.vstack(stacked).type(torch.float32)
class WavFrontend(torch.nn.Module):
    """Kaldi-fbank front-end with optional LFR stacking and CMVN.

    ``forward`` runs the full chain, ``forward_fbank`` only the filter-bank
    step and ``forward_lfr_cmvn`` only LFR + CMVN on existing features.
    """

    def __init__(
        self,
        cmvn_file: str = None,
        fs: int = 16000,
        window: str = 'hamming',
        n_mels: int = 80,
        frame_length: int = 25,
        frame_shift: int = 10,
        filter_length_min: int = -1,
        filter_length_max: int = -1,
        lfr_m: int = 1,
        lfr_n: int = 1,
        dither: float = 1.0,
        snip_edges: bool = True,
        upsacle_samples: bool = False,
        **kwargs,
    ):
        super().__init__()
        self.fs = fs
        self.window = window
        self.n_mels = n_mels
        self.frame_length = frame_length
        self.frame_shift = frame_shift
        self.filter_length_min = filter_length_min
        self.filter_length_max = filter_length_max
        self.lfr_m = lfr_m
        self.lfr_n = lfr_n
        self.cmvn_file = cmvn_file
        self.dither = dither
        self.snip_edges = snip_edges
        self.upsacle_samples = upsacle_samples
        if self.cmvn_file is None:
            self.cmvn = None
        else:
            self.cmvn = load_cmvn(self.cmvn_file)

    def output_size(self) -> int:
        return self.n_mels * self.lfr_m

    def _fbank(self, waveform, **extra):
        """Compute kaldi fbank features for one 1-D waveform."""
        if self.upsacle_samples:
            # Scale samples by 2**15.
            waveform = waveform * (1 << 15)
        return kaldi.fbank(
            waveform.unsqueeze(0),
            num_mel_bins=self.n_mels,
            frame_length=self.frame_length,
            frame_shift=self.frame_shift,
            dither=self.dither,
            energy_floor=0.0,
            window_type=self.window,
            sample_frequency=self.fs,
            **extra)

    def _lfr_cmvn(self, mat):
        """Apply LFR stacking and CMVN when they are configured."""
        if self.lfr_m != 1 or self.lfr_n != 1:
            mat = apply_lfr(mat, self.lfr_m, self.lfr_n)
        if self.cmvn is not None:
            mat = apply_cmvn(mat, self.cmvn)
        return mat

    def forward(
        self,
        input: torch.Tensor,
        input_lengths,
        **kwargs,
    ):
        batch_size = input.size(0)
        feats = []
        for idx in range(batch_size):
            waveform = input[idx][:input_lengths[idx]]
            mat = self._fbank(waveform, snip_edges=self.snip_edges)
            feats.append(self._lfr_cmvn(mat))
        feats_lens = torch.as_tensor([mat.size(0) for mat in feats])
        if batch_size == 1:
            # A single item needs no padding; just add the batch axis.
            feats_pad = feats[0][None, :, :]
        else:
            feats_pad = pad_sequence(
                feats, batch_first=True, padding_value=0.0)
        return feats_pad, feats_lens

    def forward_fbank(self, input: torch.Tensor, input_lengths: torch.Tensor):
        # snip_edges is not forwarded here, so kaldi.fbank's default applies.
        feats = [
            self._fbank(input[idx][:input_lengths[idx]])
            for idx in range(input.size(0))
        ]
        feats_lens = torch.as_tensor([mat.size(0) for mat in feats])
        feats_pad = pad_sequence(feats, batch_first=True, padding_value=0.0)
        return feats_pad, feats_lens

    def forward_lfr_cmvn(self, input: torch.Tensor,
                         input_lengths: torch.Tensor):
        feats = [
            self._lfr_cmvn(input[idx, :input_lengths[idx], :])
            for idx in range(input.size(0))
        ]
        feats_lens = torch.as_tensor([mat.size(0) for mat in feats])
        feats_pad = pad_sequence(feats, batch_first=True, padding_value=0.0)
        return feats_pad, feats_lens
def make_pad_mask(lengths, xs=None, length_dim=-1, maxlen=None):
    """Build a boolean mask that is True at padded positions.

    Args:
        lengths: list (or tensor) with the valid length of each sequence.
        xs: optional reference tensor; the mask is expanded to its shape and
            moved to its device.
        length_dim: axis of ``xs`` that holds the sequence length.
        maxlen: optional explicit mask length (only without ``xs``).

    Raises:
        ValueError: if ``length_dim`` is 0.
    """
    if length_dim == 0:
        raise ValueError('length_dim cannot be 0: {}'.format(length_dim))

    if not isinstance(lengths, list):
        lengths = lengths.tolist()
    bs = int(len(lengths))

    if maxlen is not None:
        assert xs is None
        assert maxlen >= int(max(lengths))
    elif xs is not None:
        maxlen = xs.size(length_dim)
    else:
        maxlen = int(max(lengths))

    positions = torch.arange(0, maxlen, dtype=torch.int64)
    positions = positions.unsqueeze(0).expand(bs, maxlen)
    limits = positions.new(lengths).unsqueeze(-1)
    mask = positions >= limits

    if xs is None:
        return mask

    assert xs.size(0) == bs, (xs.size(0), bs)
    if length_dim < 0:
        length_dim = xs.dim() + length_dim
    # ind = (:, None, ..., None, :, , None, ..., None)
    ind = tuple(
        slice(None) if axis in (0, length_dim) else None
        for axis in range(xs.dim()))
    return mask[ind].expand_as(xs).to(xs.device)
class Encoder:
    """Content encoder: fbank/LFR/CMVN front-end plus an ONNX session.

    Args:
        encoder_front_path: CMVN file for the ``WavFrontend``.
        encoder_onnx_path: path of the ONNX encoder model.
    """

    def __init__(self, encoder_front_path, encoder_onnx_path):
        self.front = WavFrontend(
            encoder_front_path, lfr_m=7, lfr_n=6, dither=0.0)
        # The list of execution providers belongs in `providers`;
        # `provider_options` expects per-provider option dicts.
        self.asr_session = onnxruntime.InferenceSession(
            encoder_onnx_path,
            providers=onnxruntime.get_available_providers())

    def _extract(self, wav_path):
        """Load ``wav_path`` at 16 kHz and return (features, lengths)."""
        wav = librosa.load(wav_path, sr=16000)[0]
        wav_len = torch.IntTensor(np.array([len(wav)]))
        wav = torch.FloatTensor(wav.reshape([1, -1]))
        return self.front(wav, wav_len)

    def inference(self, wav_path):
        """Run the ONNX encoder on a wav file.

        Returns:
            The ``ys_pad`` output of the session as a float tensor.
        """
        feats, feats_len = self._extract(wav_path)
        feats = feats.detach().cpu().numpy()
        # The session takes 1.0 at valid frames, so invert the pad mask.
        masks = ~make_pad_mask(feats_len)[:, None, :]
        outs = self.asr_session.run(
            ['ys_pad', 'olens'],
            input_feed={
                'xs_pad': feats,
                'masks': masks.cpu().detach().numpy().astype('float32')
            })
        return torch.FloatTensor(outs[0])

    def get_feats(self, wav_path):
        """Return only the front-end features of a wav file."""
        feats, _ = self._extract(wav_path)
        return feats

View File

@@ -0,0 +1,196 @@
from collections import OrderedDict
import librosa
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio.compliance.kaldi as Kaldi
from .layers import (BasicResBlock, CAMDenseTDNNBlock, DenseLayer, StatsPool,
TDNNLayer, TransitLayer, get_nonlinear)
class FCM(nn.Module):
    """2-D convolutional front-end placed before the TDNN stack.

    A 3x3 conv, two stride-2 residual stages and a (2, 1)-strided conv are
    applied; channel and first spatial axes are then merged so the result
    has ``m_channels * (feat_dim // 8)`` channels.

    NOTE(review): both residual stages are built with ``num_blocks[0]``;
    ``num_blocks[1]`` is never read -- confirm this is intended.
    """

    def __init__(self,
                 block=BasicResBlock,
                 num_blocks=[2, 2],
                 m_channels=32,
                 feat_dim=80):
        super(FCM, self).__init__()
        self.in_planes = m_channels
        self.conv1 = nn.Conv2d(
            1, m_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(m_channels)

        depth = num_blocks[0]
        self.layer1 = self._make_layer(block, m_channels, depth, stride=2)
        self.layer2 = self._make_layer(block, m_channels, depth, stride=2)

        self.conv2 = nn.Conv2d(
            m_channels,
            m_channels,
            kernel_size=3,
            stride=(2, 1),
            padding=1,
            bias=False)
        self.bn2 = nn.BatchNorm2d(m_channels)
        self.out_channels = m_channels * (feat_dim // 8)

    def _make_layer(self, block, planes, num_blocks, stride):
        # Only the first block of a stage uses the requested stride.
        stage = []
        for block_stride in [stride] + [1] * (num_blocks - 1):
            stage.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*stage)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x.unsqueeze(1))))
        out = self.layer2(self.layer1(out))
        out = F.relu(self.bn2(self.conv2(out)))

        batch, channels, height, width = out.shape
        return out.reshape(batch, channels * height, width)
class CAMPPlus(nn.Module):
    """CAM++ speaker-embedding network.

    An ``FCM`` head feeds a sequential ``xvector`` stack: a TDNN layer,
    three dense CAM blocks (12, 24 and 16 layers) each followed by a
    channel-halving transit layer, a non-linearity, statistics pooling and
    a dense layer producing ``embedding_size`` features.
    """

    def __init__(self,
                 feat_dim=80,
                 embedding_size=512,
                 growth_rate=32,
                 bn_size=4,
                 init_channels=128,
                 config_str='batchnorm-relu',
                 memory_efficient=True):
        super(CAMPPlus, self).__init__()

        self.head = FCM(feat_dim=feat_dim)
        width = self.head.out_channels

        stem = TDNNLayer(
            width,
            init_channels,
            5,
            stride=2,
            dilation=1,
            padding=-1,
            config_str=config_str)
        self.xvector = nn.Sequential(OrderedDict([('tdnn', stem)]))
        width = init_channels

        # (num_layers, kernel_size, dilation) of each dense block.
        block_specs = ((12, 3, 1), (24, 3, 2), (16, 3, 2))
        for idx, (num_layers, kernel_size,
                  dilation) in enumerate(block_specs, start=1):
            dense_block = CAMDenseTDNNBlock(
                num_layers=num_layers,
                in_channels=width,
                out_channels=growth_rate,
                bn_channels=bn_size * growth_rate,
                kernel_size=kernel_size,
                dilation=dilation,
                config_str=config_str,
                memory_efficient=memory_efficient,
            )
            self.xvector.add_module('block%d' % idx, dense_block)
            width += num_layers * growth_rate
            self.xvector.add_module(
                'transit%d' % idx,
                TransitLayer(
                    width, width // 2, bias=False, config_str=config_str))
            width //= 2

        self.xvector.add_module('out_nonlinear',
                                get_nonlinear(config_str, width))

        self.xvector.add_module('stats', StatsPool())
        self.xvector.add_module(
            'dense',
            DenseLayer(width * 2, embedding_size, config_str='batchnorm_'))

        # Re-initialise every Conv1d / Linear layer.
        for module in self.modules():
            if not isinstance(module, (nn.Conv1d, nn.Linear)):
                continue
            nn.init.kaiming_normal_(module.weight.data)
            if module.bias is not None:
                nn.init.zeros_(module.bias)

    def forward(self, x):
        x = x.permute(0, 2, 1)  # (B,T,F) => (B,F,T)
        return self.xvector(self.head(x))
class SpeakerVerificationCamplus:
    r"""Speaker-embedding extractor built on a ``CAMPPlus`` network.

    Loads a pretrained ``CAMPPlus`` model (192-dim embeddings) and exposes
    ``forward`` for raw audio and ``inference`` for precomputed features.

    Args:
        pretrained_model_name: path of the checkpoint with the CAMPPlus
            state dict.
        device: torch device on which the embedding model runs.
    """

    def __init__(self, pretrained_model_name, device='cpu', *args, **kwargs):
        super().__init__()
        self.feature_dim = 80
        self.device = torch.device(device)
        self.embedding_model = CAMPPlus(embedding_size=192)
        self.__load_check_point(pretrained_model_name)
        self.embedding_model.to(self.device)
        self.embedding_model.eval()

    def forward(self, audio):
        """Compute the embedding of ``audio``.

        Args:
            audio: a file path (loaded at 16 kHz), a numpy array or a
                tensor. 1-D input gets a batch axis; 3-D input has axis 1
                squeezed.

        Returns:
            The output of the embedding model.
        """
        if isinstance(audio, np.ndarray):
            audio = torch.from_numpy(audio)
        elif isinstance(audio, str):
            audio = librosa.load(audio, sr=16000)[0]
            audio = torch.from_numpy(audio)
        if len(audio.shape) == 1:
            audio = audio.unsqueeze(0)
        elif len(audio.shape) == 3:
            audio = audio.squeeze(1)
        # The original message was cut off after "needs to".
        assert len(audio.shape) == 2, (
            'modelscope error: the shape of input audio to model needs to '
            'be [N, T]')
        # audio shape: [N, T]
        feature = self.__extract_feature(audio)
        embedding = self.embedding_model(feature.to(self.device))
        return embedding

    def inference(self, feature):
        """Embed precomputed features after subtracting their mean (dim 1)."""
        feature = feature - feature.mean(dim=1, keepdim=True)
        embedding = self.embedding_model(feature.to(self.device))
        return embedding

    def __extract_feature(self, audio):
        """Compute mean-normalised fbank features with two zero frames added.

        NOTE(review): the batch is flattened into one waveform before the
        fbank call, so this presumably only behaves as intended for a
        batch of one -- confirm.
        """
        B = audio.size(0)
        feature = Kaldi.fbank(
            audio.flatten().unsqueeze(0), num_mel_bins=self.feature_dim)
        feature = feature - feature.mean(dim=0, keepdim=True)
        pad = torch.zeros([2, self.feature_dim], device=feature.device)
        feature = torch.cat([feature, pad], dim=0)
        feature = feature.reshape([B, -1, self.feature_dim])
        return feature

    def __load_check_point(self, pretrained_model_name, device=None):
        """Load the checkpoint (on CPU unless ``device`` is given)."""
        if not device:
            device = torch.device('cpu')
        # NOTE: torch.load unpickles the file; only load trusted models.
        self.embedding_model.load_state_dict(
            torch.load(pretrained_model_name, map_location=device),
            strict=True)

View File

@@ -0,0 +1,32 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import torch
import torch.nn as nn
class AFF(nn.Module):
    """Attention-based fusion of two feature maps of equal shape.

    A small conv/BN/SiLU/conv/BN stack on the concatenated inputs yields a
    gate ``1 + tanh(.)``; the output is ``x * gate + ds_y * (2 - gate)``.
    """

    def __init__(self, channels=64, r=4):
        super(AFF, self).__init__()
        inter_channels = int(channels // r)

        squeeze = nn.Conv2d(
            channels * 2,
            inter_channels,
            kernel_size=1,
            stride=1,
            padding=0)
        excite = nn.Conv2d(
            inter_channels, channels, kernel_size=1, stride=1, padding=0)
        self.local_att = nn.Sequential(
            squeeze,
            nn.BatchNorm2d(inter_channels),
            nn.SiLU(inplace=True),
            excite,
            nn.BatchNorm2d(channels),
        )

    def forward(self, x, ds_y):
        gate = self.local_att(torch.cat((x, ds_y), dim=1))
        gate = 1.0 + torch.tanh(gate)
        return x * gate + ds_y * (2.0 - gate)

View File

@@ -0,0 +1,266 @@
# Copyright 3D-Speaker (https://github.com/alibaba-damo-academy/3D-Speaker). All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
import torch
import torch.nn.functional as F
import torch.utils.checkpoint as cp
from torch import nn
def get_nonlinear(config_str, channels):
    """Build a Sequential from a '-'-separated list of layer names.

    Supported names are 'relu', 'prelu', 'batchnorm' and 'batchnorm_' (batch
    norm without affine parameters, registered under the name 'batchnorm').

    Raises:
        ValueError: if a name is not supported.
    """
    builders = {
        'relu': lambda: ('relu', nn.ReLU(inplace=True)),
        'prelu': lambda: ('prelu', nn.PReLU(channels)),
        'batchnorm': lambda: ('batchnorm', nn.BatchNorm1d(channels)),
        'batchnorm_':
        lambda: ('batchnorm', nn.BatchNorm1d(channels, affine=False)),
    }
    nonlinear = nn.Sequential()
    for name in config_str.split('-'):
        if name not in builders:
            raise ValueError('Unexpected module ({}).'.format(name))
        module_name, module = builders[name]()
        nonlinear.add_module(module_name, module)
    return nonlinear
def statistics_pooling(x, dim=-1, keepdim=False, unbiased=True, eps=1e-2):
    """Concatenate the mean and standard deviation of ``x`` along ``dim``.

    The two statistics are joined on the last axis; with ``keepdim`` an axis
    of size one is inserted at ``dim``. ``eps`` is accepted but not used.
    """
    pooled = torch.cat(
        [x.mean(dim=dim), x.std(dim=dim, unbiased=unbiased)], dim=-1)
    return pooled.unsqueeze(dim=dim) if keepdim else pooled
class StatsPool(nn.Module):
    """Module wrapper around ``statistics_pooling`` with default arguments."""

    def forward(self, x):
        pooled = statistics_pooling(x)
        return pooled
class TDNNLayer(nn.Module):
    """1-D convolution followed by the configured non-linearity.

    A negative ``padding`` selects 'same'-style padding derived from the
    kernel size and dilation, which requires an odd kernel size.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 stride=1,
                 padding=0,
                 dilation=1,
                 bias=False,
                 config_str='batchnorm-relu'):
        super(TDNNLayer, self).__init__()
        if padding < 0:
            assert kernel_size % 2 == 1, 'Expect equal paddings, but got even kernel size ({})'.format(
                kernel_size)
            padding = (kernel_size - 1) // 2 * dilation
        conv_kwargs = dict(
            stride=stride, padding=padding, dilation=dilation, bias=bias)
        self.linear = nn.Conv1d(in_channels, out_channels, kernel_size,
                                **conv_kwargs)
        self.nonlinear = get_nonlinear(config_str, out_channels)

    def forward(self, x):
        return self.nonlinear(self.linear(x))
class CAMLayer(nn.Module):
    """Convolution whose output is gated by a context-derived mask.

    The context is the global mean of the input plus its segment-wise
    pooled version; two 1x1 convolutions with ReLU and sigmoid turn it into
    a mask that multiplies the local convolution output.
    """

    def __init__(self,
                 bn_channels,
                 out_channels,
                 kernel_size,
                 stride,
                 padding,
                 dilation,
                 bias,
                 reduction=2):
        super(CAMLayer, self).__init__()
        self.linear_local = nn.Conv1d(
            bn_channels,
            out_channels,
            kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            bias=bias)
        squeezed = bn_channels // reduction
        self.linear1 = nn.Conv1d(bn_channels, squeezed, 1)
        self.relu = nn.ReLU(inplace=True)
        self.linear2 = nn.Conv1d(squeezed, out_channels, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        local = self.linear_local(x)
        context = x.mean(-1, keepdim=True) + self.seg_pooling(x)
        hidden = self.relu(self.linear1(context))
        mask = self.sigmoid(self.linear2(hidden))
        return local * mask

    def seg_pooling(self, x, seg_len=100, stype='avg'):
        """Pool ``x`` per segment and repeat each value over its segment.

        Raises:
            ValueError: if ``stype`` is neither 'avg' nor 'max'.
        """
        if stype == 'avg':
            pool = F.avg_pool1d
        elif stype == 'max':
            pool = F.max_pool1d
        else:
            raise ValueError('Wrong segment pooling type.')
        seg = pool(x, kernel_size=seg_len, stride=seg_len, ceil_mode=True)
        shape = seg.shape
        # Repeat every pooled value seg_len times, then cut to input length.
        seg = seg.unsqueeze(-1).expand(*shape, seg_len)
        seg = seg.reshape(*shape[:-1], -1)
        return seg[..., :x.shape[-1]]
class CAMDenseTDNNLayer(nn.Module):
    """Bottleneck (non-linearity + 1x1 conv) followed by a ``CAMLayer``.

    Args:
        in_channels: Input channels.
        out_channels: Output channels of the ``CAMLayer``.
        bn_channels: Channels of the bottleneck between the two stages.
        kernel_size: Kernel size of the ``CAMLayer`` conv; must be odd.
        stride, dilation, bias: Forwarded to the ``CAMLayer``.
        config_str: Spec for both non-linearity stacks (``get_nonlinear``).
        memory_efficient: When true, the bottleneck is run through
            ``torch.utils.checkpoint`` while the module is in training mode.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 bn_channels,
                 kernel_size,
                 stride=1,
                 dilation=1,
                 bias=False,
                 config_str='batchnorm-relu',
                 memory_efficient=False):
        super().__init__()
        assert kernel_size % 2 == 1, \
            'Expect equal paddings, but got even kernel size ({})'.format(
                kernel_size)
        self.memory_efficient = memory_efficient
        self.nonlinear1 = get_nonlinear(config_str, in_channels)
        self.linear1 = nn.Conv1d(in_channels, bn_channels, 1, bias=False)
        self.nonlinear2 = get_nonlinear(config_str, bn_channels)
        # Symmetric padding for the (odd) kernel.
        same_padding = (kernel_size - 1) // 2 * dilation
        self.cam_layer = CAMLayer(
            bn_channels,
            out_channels,
            kernel_size,
            stride=stride,
            padding=same_padding,
            dilation=dilation,
            bias=bias)

    def bn_function(self, x):
        """Bottleneck: first non-linearity stack, then the 1x1 convolution."""
        activated = self.nonlinear1(x)
        return self.linear1(activated)

    def forward(self, x):
        """Run the bottleneck (optionally checkpointed) and the CAM layer."""
        use_checkpoint = self.training and self.memory_efficient
        if use_checkpoint:
            bottleneck = cp.checkpoint(self.bn_function, x)
        else:
            bottleneck = self.bn_function(x)
        return self.cam_layer(self.nonlinear2(bottleneck))
class CAMDenseTDNNBlock(nn.ModuleList):
    """Densely connected stack of ``CAMDenseTDNNLayer`` modules.

    Layer ``i`` (0-based) receives ``in_channels + i * out_channels`` input
    channels, because each layer's output is concatenated onto its input.
    Layers are registered as ``tdnnd1``, ``tdnnd2``, ...

    Args:
        num_layers: Number of layers in the block.
        in_channels: Channels of the block input.
        out_channels: Channels added by every layer.
        bn_channels, kernel_size, stride, dilation, bias, config_str,
            memory_efficient: Forwarded to each ``CAMDenseTDNNLayer``.
    """

    def __init__(self,
                 num_layers,
                 in_channels,
                 out_channels,
                 bn_channels,
                 kernel_size,
                 stride=1,
                 dilation=1,
                 bias=False,
                 config_str='batchnorm-relu',
                 memory_efficient=False):
        super().__init__()
        for index in range(1, num_layers + 1):
            grown_channels = in_channels + (index - 1) * out_channels
            self.add_module(
                'tdnnd%d' % index,
                CAMDenseTDNNLayer(
                    in_channels=grown_channels,
                    out_channels=out_channels,
                    bn_channels=bn_channels,
                    kernel_size=kernel_size,
                    stride=stride,
                    dilation=dilation,
                    bias=bias,
                    config_str=config_str,
                    memory_efficient=memory_efficient,
                ))

    def forward(self, x):
        """Concatenate every layer's output onto the running feature map."""
        features = x
        for layer in self:
            features = torch.cat([features, layer(features)], dim=1)
        return features
class TransitLayer(nn.Module):
    """Non-linearity stack followed by a 1x1 convolution.

    Args:
        in_channels: Input channels (also used for the non-linearity stack).
        out_channels: Output channels of the 1x1 convolution.
        bias: Whether the convolution has a bias term.
        config_str: Spec understood by ``get_nonlinear``.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 bias=True,
                 config_str='batchnorm-relu'):
        super().__init__()
        self.nonlinear = get_nonlinear(config_str, in_channels)
        self.linear = nn.Conv1d(in_channels, out_channels, 1, bias=bias)

    def forward(self, x):
        """Apply the non-linearity first, then the 1x1 convolution."""
        return self.linear(self.nonlinear(x))
class DenseLayer(nn.Module):
    """1x1 convolution followed by a non-linearity stack.

    Args:
        in_channels: Input channels.
        out_channels: Output channels (also used for the non-linearity).
        bias: Whether the convolution has a bias term.
        config_str: Spec understood by ``get_nonlinear``.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 bias=False,
                 config_str='batchnorm-relu'):
        super().__init__()
        self.linear = nn.Conv1d(in_channels, out_channels, 1, bias=bias)
        self.nonlinear = get_nonlinear(config_str, out_channels)

    def forward(self, x):
        """Project ``x`` and apply the non-linearity.

        A 2-D input gets a temporary trailing dimension so that it can go
        through the ``Conv1d``; that dimension is removed again afterwards.
        """
        if x.dim() == 2:
            projected = self.linear(x.unsqueeze(dim=-1)).squeeze(dim=-1)
        else:
            projected = self.linear(x)
        return self.nonlinear(projected)
class BasicResBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with a residual shortcut.

    The stride is applied to the first spatial dimension only
    (``stride=(stride, 1)``).

    Args:
        in_planes: Input channels.
        planes: Channels of both convolutions.
        stride: Stride along the first spatial dimension of ``conv1``.
    """
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes
        self.conv1 = nn.Conv2d(
            in_planes,
            planes,
            kernel_size=3,
            stride=(stride, 1),
            padding=1,
            bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(
            planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        # A projection is needed whenever the residual branch changes the
        # resolution or the channel count; otherwise the shortcut is identity.
        if stride != 1 or in_planes != out_planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(
                    in_planes,
                    out_planes,
                    kernel_size=1,
                    stride=(stride, 1),
                    bias=False),
                nn.BatchNorm2d(out_planes))
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(residual(x) + shortcut(x))``."""
        residual = F.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))
        return F.relu(residual + self.shortcut(x))

View File

@@ -0,0 +1,107 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
""" This implementation is adapted from https://github.com/wenet-e2e/wespeaker.
"""
import torch
import torch.nn as nn
class TAP(nn.Module):
    """Temporal average pooling: only the first-order mean is computed."""

    def __init__(self, **kwargs):
        # Keyword arguments are accepted and ignored.
        super().__init__()

    def forward(self, x):
        """Average over the last dimension and flatten everything after batch.

        Flattening from dim 1 makes inputs with extra leading feature
        dimensions produce a 2-D ``(batch, features)`` result as well.
        """
        return torch.flatten(x.mean(dim=-1), start_dim=1)
class TSDP(nn.Module):
    """Temporal standard-deviation pooling: only the second-order statistic."""

    def __init__(self, **kwargs):
        # Keyword arguments are accepted and ignored.
        super().__init__()

    def forward(self, x):
        """Return ``sqrt(var + 1e-8)`` over the last (temporal) dimension.

        The result is flattened from dim 1 onward.
        """
        variance = x.var(dim=-1)
        return (variance + 1e-8).sqrt().flatten(start_dim=1)
class TSTP(nn.Module):
    """Temporal statistics pooling as used in x-vector.

    Concatenates the temporal mean and standard deviation. Plain
    concatenation does not make full use of both statistics.
    """

    def __init__(self, **kwargs):
        # Keyword arguments are accepted and ignored.
        super().__init__()

    def forward(self, x):
        """Return ``[mean, std]`` over the last (temporal) dimension.

        Both statistics are flattened from dim 1 onward before being joined
        along dim 1; the std is ``sqrt(var + 1e-8)``.
        """
        mean = x.mean(dim=-1).flatten(start_dim=1)
        std = (x.var(dim=-1) + 1e-8).sqrt().flatten(start_dim=1)
        return torch.cat((mean, std), 1)
class ASTP(nn.Module):
    """Attentive statistics pooling.

    Channel- and context-dependent statistics pooling, first used in
    ECAPA_TDNN.

    Args:
        in_dim: Feature dimension of the (flattened) input.
        bottleneck_dim: Width of the attention bottleneck.
        global_context_att: When true, the attention input is the
            concatenation of the features with their temporal mean and std,
            so the first projection takes ``3 * in_dim`` channels.
    """

    def __init__(self, in_dim, bottleneck_dim=128, global_context_att=False):
        super().__init__()
        self.global_context_att = global_context_att

        # Conv1d with kernel size 1 is used instead of Linear so the inputs
        # do not have to be transposed.
        attention_in_dim = in_dim * 3 if global_context_att else in_dim
        # linear1 corresponds to W and b in the paper.
        self.linear1 = nn.Conv1d(
            attention_in_dim, bottleneck_dim, kernel_size=1)
        # linear2 corresponds to V and k in the paper.
        self.linear2 = nn.Conv1d(bottleneck_dim, in_dim, kernel_size=1)

    def forward(self, x):
        """Return the attention-weighted mean and std, joined along dim 1.

        Args:
            x: A 3-D tensor ``(B, F, T)`` (tdnn-based architectures) or a
                4-D tensor ``(B, C, F, T)`` (resnet architectures). Dim 0 is
                the batch dimension, the last dim is time (frames). 4-D
                inputs are reshaped to ``(B, C * F, T)``.
        """
        if x.dim() == 4:
            batch, chans, freq, frames = x.shape
            x = x.reshape(batch, chans * freq, frames)
        assert len(x.shape) == 3

        attention_in = x
        if self.global_context_att:
            ctx_mean = x.mean(dim=-1, keepdim=True).expand_as(x)
            ctx_std = (x.var(dim=-1, keepdim=True) + 1e-10).sqrt()
            attention_in = torch.cat(
                (x, ctx_mean, ctx_std.expand_as(x)), dim=1)

        # tanh rather than ReLU: ReLU may be hard to converge here.
        hidden = torch.tanh(self.linear1(attention_in))
        weights = torch.softmax(self.linear2(hidden), dim=2)

        mean = (weights * x).sum(dim=2)
        variance = (weights * x.pow(2)).sum(dim=2) - mean.pow(2)
        # Clamp before the square root to guard against tiny negatives.
        std = variance.clamp(min=1e-10).sqrt()
        return torch.cat([mean, std], dim=1)

View File

@@ -0,0 +1,829 @@
# from https://github.com/jik876/hifi-gan
import logging
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import Conv1d, ConvTranspose1d
from .Starganv3 import Generator
# Negative slope passed to F.leaky_relu by the blocks in this module.
LRELU_SLOPE = 0.1
def get_sinusoid_encoding_table(n_position, d_hid, padding_idx=None):
    """Build a sinusoid position encoding table.

    Entry ``[pos, j]`` is based on the angle
    ``pos / 10000 ** (2 * (j // 2) / d_hid)``; even columns hold its sine
    and odd columns its cosine.

    Args:
        n_position: Number of positions (rows).
        d_hid: Encoding dimension (columns).
        padding_idx: Optional row index that is zeroed out.

    Returns:
        A float32 tensor of shape ``(n_position, d_hid)``.
    """
    # Vectorised over positions and hidden indices instead of nested Python
    # loops; this also yields a well-formed empty table for n_position == 0.
    positions = np.arange(n_position, dtype=np.float64)[:, np.newaxis]
    hidden_idx = np.arange(d_hid)
    angle_rates = np.power(10000, 2 * (hidden_idx // 2) / d_hid)
    sinusoid_table = positions / angle_rates

    sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2])  # dim 2i
    sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2])  # dim 2i+1
    if padding_idx is not None:
        # zero vector for padding dimension
        sinusoid_table[padding_idx] = 0.0
    return torch.from_numpy(sinusoid_table).float()
def overlap_and_add(signal, frame_step):
    """Reconstruct a signal from overlapping frames by overlap-add.

    Args:
        signal: Tensor whose last two dimensions are
            ``(frames, frame_length)``; any leading dimensions are kept.
        frame_step: Hop between the starts of consecutive frames.

    Returns:
        Tensor with the leading dimensions of ``signal`` and a last
        dimension of ``frame_step * (frames - 1) + frame_length``.
    """
    outer_dimensions = signal.size()[:-2]
    frames, frame_length = signal.size()[-2:]

    # Work in sub-frames of gcd(frame_length, frame_step) samples so that
    # every frame covers a whole number of output slots.
    subframe_length = math.gcd(frame_length, frame_step)
    subframe_step = frame_step // subframe_length
    subframes_per_frame = frame_length // subframe_length
    output_size = frame_step * (frames - 1) + frame_length
    output_subframes = output_size // subframe_length

    # reshape (not view) so non-contiguous inputs are accepted as well.
    subframe_signal = signal.reshape(*outer_dimensions, -1, subframe_length)

    # Build the target-slot indices directly as int64 on the signal's device.
    # Casting them through the signal's floating dtype could round large
    # indices for low-precision signals.
    frame = torch.arange(
        output_subframes, device=signal.device).unfold(
            0, subframes_per_frame, subframe_step).reshape(-1)

    result = signal.new_zeros(*outer_dimensions, output_subframes,
                              subframe_length)
    result.index_add_(-2, frame, subframe_signal)
    return result.view(*outer_dimensions, -1)
class LastLayer(nn.Module):
    """Activation, padding and a final ``Conv1d``.

    Args:
        in_channels: Input channels of the convolution.
        out_channels: Output channels of the convolution.
        nonlinear_activation: Name of a ``torch.nn`` activation class.
        nonlinear_activation_params: Keyword arguments for that class.
        pad: Name of a ``torch.nn`` padding class.
        kernel_size: Convolution kernel size; the padding size is
            ``(kernel_size - 1) // 2``.
        pad_params: Extra keyword arguments for the padding class.
        bias: Whether the convolution has a bias term.
    """

    def __init__(self, in_channels, out_channels, nonlinear_activation,
                 nonlinear_activation_params, pad, kernel_size, pad_params,
                 bias):
        super().__init__()
        activation_cls = getattr(torch.nn, nonlinear_activation)
        self.activation = activation_cls(**nonlinear_activation_params)
        pad_cls = getattr(torch.nn, pad)
        self.pad = pad_cls((kernel_size - 1) // 2, **pad_params)
        self.conv = torch.nn.Conv1d(
            in_channels, out_channels, kernel_size, bias=bias)

    def forward(self, x):
        """Apply activation, padding and convolution in that order."""
        return self.conv(self.pad(self.activation(x)))
class Conv1d1x1(Conv1d):
    """``Conv1d`` fixed to kernel size 1, no padding and dilation 1."""

    def __init__(self, in_channels, out_channels, bias):
        """Initialize the 1x1 ``Conv1d`` module.

        Args:
            in_channels: Input channels.
            out_channels: Output channels.
            bias: Whether the convolution has a bias term.
        """
        fixed_kwargs = dict(kernel_size=1, padding=0, dilation=1)
        super().__init__(in_channels, out_channels, bias=bias, **fixed_kwargs)
class LastLinear(nn.Module):
    """Two rounds of LeakyReLU(0.2) -> BatchNorm1d -> 1x1 convolution.

    Args:
        hidden_channel: Channels of the input and of the first projection.
        out_channel: Channels of the final projection.
        bias: Whether both 1x1 convolutions have a bias term.
    """

    def __init__(self, hidden_channel, out_channel, bias=True):
        super().__init__()
        self.activation = nn.LeakyReLU(negative_slope=0.2)
        self.bn_1 = nn.BatchNorm1d(hidden_channel)
        self.linear_1 = Conv1d1x1(hidden_channel, hidden_channel, bias=bias)
        self.bn_2 = nn.BatchNorm1d(hidden_channel)
        self.linear_2 = Conv1d1x1(hidden_channel, out_channel, bias=bias)

    def forward(self, x):
        """Run both activation/norm/projection rounds on ``x``."""
        hidden = self.linear_1(self.bn_1(self.activation(x)))
        return self.linear_2(self.bn_2(self.activation(hidden)))
class Stretch2d(torch.nn.Module):
    """Rescale the last two dimensions of a 4-D tensor by interpolation."""

    def __init__(self, x_scale, y_scale, mode='nearest'):
        """Initialize Stretch2d module.

        Args:
            x_scale (int): X scaling factor (Time axis in spectrogram).
            y_scale (int): Y scaling factor (Frequency axis in spectrogram).
            mode (str): Interpolation mode.
        """
        super().__init__()
        self.x_scale = x_scale
        self.y_scale = y_scale
        self.mode = mode

    def forward(self, x):
        """Calculate forward propagation.

        Args:
            x (Tensor): Input tensor (B, C, F, T).

        Returns:
            Tensor: Interpolated tensor (B, C, F * y_scale, T * x_scale).
        """
        # scale_factor is ordered like the tensor dims: (F, T) == (y, x).
        scale = (self.y_scale, self.x_scale)
        return F.interpolate(x, scale_factor=scale, mode=self.mode)
class UpsampleLayer(nn.Module):
    """Nearest-neighbour stretch of the last dimension followed by a ``Conv1d``.

    Args:
        in_channel: Input channels of the convolution.
        out_channel: Output channels of the convolution.
        upsample_rate: Stretch factor applied to the last dimension.
        kernel_size, stride, padding, dilation, bias: Forwarded to the
            ``Conv1d``.
    """

    def __init__(self,
                 in_channel,
                 out_channel,
                 upsample_rate,
                 kernel_size,
                 stride,
                 padding,
                 dilation=1,
                 bias=True):
        super().__init__()
        # y_scale of 1 leaves the channel dimension untouched.
        self.upsample = Stretch2d(upsample_rate, 1, mode='nearest')
        self.conv = nn.Conv1d(
            in_channel,
            out_channel,
            kernel_size,
            stride,
            padding,
            dilation=dilation,
            bias=bias)

    def forward(self, x):
        """Stretch ``x`` along its last dimension, then convolve."""
        # Stretch2d works on 4-D input, so add and later drop a dummy dim.
        stretched = self.upsample(x.unsqueeze(1)).squeeze(1)
        return self.conv(stretched)
def init_weights(m, mean=0.0, std=0.01):
    """Re-draw ``m.weight`` from ``N(mean, std)`` for convolution modules.

    A module is treated as a convolution when its class name contains
    ``'Conv'``; every other module is left untouched.
    """
    if 'Conv' in type(m).__name__:
        m.weight.data.normal_(mean, std)
def get_padding(kernel_size, dilation=1):
    """Return ``dilation * (kernel_size - 1) / 2`` truncated to an int."""
    dilated_span = dilation * (kernel_size - 1)
    return int(dilated_span / 2)
class ResBlock1(torch.nn.Module):
    """Residual block of three (dilated conv, plain conv) pairs.

    Args:
        channels: Channels of every convolution.
        kernel_size: Kernel size of every convolution.
        dilation: Sequence whose first three entries are the dilations of
            the three convolutions in ``convs1``; ``convs2`` always uses
            dilation 1.
        bias: Whether the convolutions have a bias term.
    """

    def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5), bias=True):
        super().__init__()

        def make_conv(rate):
            # Stride 1 with padding chosen so the length is preserved.
            return Conv1d(
                channels,
                channels,
                kernel_size,
                1,
                dilation=rate,
                padding=get_padding(kernel_size, rate),
                bias=bias)

        self.convs1 = nn.ModuleList(
            [make_conv(rate)
             for rate in (dilation[0], dilation[1], dilation[2])])
        self.convs2 = nn.ModuleList([make_conv(1) for _ in range(3)])

    def forward(self, x):
        """Add each (leaky-ReLU, conv, leaky-ReLU, conv) branch onto ``x``."""
        for dilated_conv, plain_conv in zip(self.convs1, self.convs2):
            branch = dilated_conv(F.leaky_relu(x, LRELU_SLOPE))
            branch = plain_conv(F.leaky_relu(branch, LRELU_SLOPE))
            x = branch + x
        return x
class ResBlock2(torch.nn.Module):
    """Residual block of two dilated convolutions.

    Args:
        channels: Channels of both convolutions.
        kernel_size: Kernel size of both convolutions.
        dilation: Sequence whose first two entries are the dilations.
        bias: Whether the convolutions have a bias term.
    """

    def __init__(self, channels, kernel_size=3, dilation=(1, 3), bias=True):
        super().__init__()
        self.convs = nn.ModuleList([
            Conv1d(
                channels,
                channels,
                kernel_size,
                1,
                dilation=rate,
                padding=get_padding(kernel_size, rate),
                bias=bias) for rate in (dilation[0], dilation[1])
        ])

    def forward(self, x):
        """Add each (leaky-ReLU, conv) branch onto ``x``."""
        for conv in self.convs:
            branch = conv(F.leaky_relu(x, LRELU_SLOPE))
            x = branch + x
        return x
class BasisSignalLayer(nn.Module):
    """Project weights through a supplied basis and overlap-add the frames.

    Args:
        basis_signal_weight: 2-D tensor installed as the weight of the
            bias-free linear layer.
        L: Frame length parameter; frames are overlap-added with a hop of
            ``L // 2``.
    """

    def __init__(self, basis_signal_weight, L=64):
        super().__init__()
        rows = basis_signal_weight.size(0)
        cols = basis_signal_weight.size(1)
        # NOTE(review): the Linear is declared as rows -> cols, but the weight
        # assigned below has shape (rows, cols), which ``nn.Linear`` interprets
        # as (out_features, in_features) - confirm this is intended.
        self.layer = nn.Linear(rows, cols, bias=False)
        self.layer.weight = nn.Parameter(basis_signal_weight)
        self.L = L

    def forward(self, weight):
        """Return the overlap-added projection of ``weight``."""
        frames = self.layer(weight)
        return overlap_and_add(frames, self.L // 2)
"""Residual stack module in MelGAN."""
class CausalConv1d(torch.nn.Module):
    """``Conv1d`` whose output at time ``t`` only depends on inputs up to ``t``.

    The input is padded by ``(kernel_size - 1) * dilation`` and the
    convolution output is cut back to the input length.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 dilation=1,
                 bias=True,
                 pad='ConstantPad1d',
                 pad_params=None):
        """Initialize CausalConv1d module.

        Args:
            in_channels (int): Input channels.
            out_channels (int): Output channels.
            kernel_size (int): Convolution kernel size.
            dilation (int): Convolution dilation.
            bias (bool): Whether the convolution has a bias term.
            pad (str): Name of a ``torch.nn`` padding class.
            pad_params (dict): Keyword arguments for the padding class;
                ``None`` means ``{'value': 0.0}``.
        """
        super(CausalConv1d, self).__init__()
        # Build the default per call instead of sharing one mutable dict
        # between all instances through the signature.
        if pad_params is None:
            pad_params = {'value': 0.0}
        self.pad = getattr(torch.nn, pad)((kernel_size - 1) * dilation,
                                          **pad_params)
        self.conv = torch.nn.Conv1d(
            in_channels,
            out_channels,
            kernel_size,
            dilation=dilation,
            bias=bias)

    def forward(self, x):
        """Calculate forward propagation.

        Args:
            x (Tensor): Input tensor (B, in_channels, T).

        Returns:
            Tensor: Output tensor (B, out_channels, T).
        """
        # Keep only the first T outputs; the rest were produced from the
        # padding on the right-hand side.
        return self.conv(self.pad(x))[:, :, :x.size(2)]
class CausalConvTranspose1d(torch.nn.Module):
    """``ConvTranspose1d`` whose last ``stride`` output samples are dropped."""

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 stride,
                 bias=True):
        """Initialize CausalConvTranspose1d module.

        Args:
            in_channels (int): Input channels.
            out_channels (int): Output channels.
            kernel_size (int): Kernel size of the transposed convolution.
            stride (int): Stride of the transposed convolution; also the
                number of trailing samples removed in ``forward``.
            bias (bool): Whether the transposed convolution has a bias term.
        """
        super().__init__()
        self.deconv = torch.nn.ConvTranspose1d(
            in_channels, out_channels, kernel_size, stride, bias=bias)
        self.stride = stride

    def forward(self, x):
        """Calculate forward propagation.

        Args:
            x (Tensor): Input tensor (B, in_channels, T_in).

        Returns:
            Tensor: Output tensor (B, out_channels, T_out).
        """
        upsampled = self.deconv(x)
        return upsampled[:, :, :-self.stride]
class ResidualStack(torch.nn.Module):
    """Residual stack module introduced in MelGAN."""

    def __init__(
        self,
        kernel_size=3,
        channels=32,
        dilation=1,
        bias=True,
        nonlinear_activation='LeakyReLU',
        nonlinear_activation_params=None,
        pad='ReflectionPad1d',
        pad_params=None,
        use_causal_conv=False,
    ):
        """Initialize ResidualStack module.

        Args:
            kernel_size (int): Kernel size of dilation convolution layer.
            channels (int): Number of channels of convolution layers.
            dilation (int): Dilation factor.
            bias (bool): Whether to add bias parameter in convolution layers.
            nonlinear_activation (str): Activation function module name.
            nonlinear_activation_params (dict): Hyperparameters for activation
                function; ``None`` means ``{'negative_slope': 0.2}``.
            pad (str): Padding function module name before dilated
                convolution layer.
            pad_params (dict): Hyperparameters for padding function; ``None``
                means no extra parameters.
            use_causal_conv (bool): Whether to use causal convolution.
        """
        super(ResidualStack, self).__init__()
        # Build the dict defaults per call rather than sharing mutable
        # objects through the signature.
        if nonlinear_activation_params is None:
            nonlinear_activation_params = {'negative_slope': 0.2}
        if pad_params is None:
            pad_params = {}
        activation_cls = getattr(torch.nn, nonlinear_activation)

        # define residual stack part
        if not use_causal_conv:
            assert (kernel_size
                    - 1) % 2 == 0, 'Not support even number kernel size.'
            self.stack = torch.nn.Sequential(
                activation_cls(**nonlinear_activation_params),
                getattr(torch.nn, pad)((kernel_size - 1) // 2 * dilation,
                                       **pad_params),
                torch.nn.Conv1d(
                    channels,
                    channels,
                    kernel_size,
                    dilation=dilation,
                    bias=bias),
                activation_cls(**nonlinear_activation_params),
                torch.nn.Conv1d(channels, channels, 1, bias=bias),
            )
        else:
            self.stack = torch.nn.Sequential(
                activation_cls(**nonlinear_activation_params),
                CausalConv1d(
                    channels,
                    channels,
                    kernel_size,
                    dilation=dilation,
                    bias=bias,
                    pad=pad,
                    pad_params=pad_params),
                activation_cls(**nonlinear_activation_params),
                torch.nn.Conv1d(channels, channels, 1, bias=bias),
            )

        # define extra layer for skip connection
        self.skip_layer = torch.nn.Conv1d(channels, channels, 1, bias=bias)

    def forward(self, c):
        """Calculate forward propagation.

        Args:
            c (Tensor): Input tensor (B, channels, T).

        Returns:
            Tensor: Output tensor (B, channels, T).
        """
        return self.stack(c) + self.skip_layer(c)
class HiFiGANGenerator(torch.nn.Module):
    """HiFi-GAN style generator.

    A pre-convolution is followed by upsampling stages. After each stage the
    outputs of ``len(resblock_kernel_sizes)`` residual blocks are averaged.
    A final convolution maps to a single output channel.

    Args:
        input_channels: Channels of the input feature sequence.
        resblock_kernel_sizes: Kernel size of each parallel residual block.
        upsample_rates: Upsampling factor of each stage.
        upsample_initial_channel: Channels after ``conv_pre``; halved at
            every upsampling stage.
        resblock_type: ``'1'`` selects ``ResBlock1``, anything else
            ``ResBlock2``.
        upsample_kernel_sizes: Kernel size of each upsampling layer.
        resblock_dilation_sizes: Dilations of each residual block, paired
            with ``resblock_kernel_sizes``.
        transposedconv: ``False`` selects ``UpsampleLayer``; any other value
            selects ``ConvTranspose1d``.
        bias: Whether the convolutions have a bias term.
    """

    def __init__(
        self,
        input_channels=80,
        resblock_kernel_sizes=(3, 7, 11),
        upsample_rates=(5, 4, 4, 2),
        upsample_initial_channel=256,
        resblock_type='1',
        upsample_kernel_sizes=(10, 8, 8, 4),
        resblock_dilation_sizes=((1, 3, 5), (1, 3, 5), (1, 3, 5)),
        transposedconv=True,
        bias=True,
    ):
        super(HiFiGANGenerator, self).__init__()
        self.num_kernels = len(resblock_kernel_sizes)
        self.num_upsamples = len(upsample_rates)
        self.conv_pre = Conv1d(
            input_channels,
            upsample_initial_channel,
            7,
            1,
            padding=3,
            bias=bias)
        resblock = ResBlock1 if resblock_type == '1' else ResBlock2

        self.ups = nn.ModuleList()
        for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)):
            in_ch = upsample_initial_channel // (2**i)
            out_ch = upsample_initial_channel // (2**(i + 1))
            if transposedconv is False:
                up = UpsampleLayer(
                    in_ch,
                    out_ch,
                    upsample_rate=u,
                    kernel_size=k,
                    stride=1,
                    padding=k // 2,
                    bias=bias)
            else:
                up = ConvTranspose1d(
                    in_ch,
                    out_ch,
                    k,
                    u,
                    padding=(u // 2 + u % 2),
                    output_padding=u % 2,
                    bias=bias)
            self.ups.append(up)

        self.resblocks = nn.ModuleList()
        # Start from the pre-conv width so conv_post is well defined even
        # when no upsampling stage is configured.
        ch = upsample_initial_channel
        for i in range(len(self.ups)):
            ch = upsample_initial_channel // (2**(i + 1))
            for k, d in zip(resblock_kernel_sizes, resblock_dilation_sizes):
                self.resblocks.append(resblock(ch, k, d, bias=bias))

        self.conv_post = Conv1d(ch, 1, 7, 1, padding=3, bias=bias)

        # apply weight norm
        self.apply_weight_norm()
        # reset parameters
        # NOTE(review): this runs after weight norm has been applied, so it
        # writes to the derived ``weight`` tensor; confirm it has the intended
        # effect on the effective initialisation.
        self.reset_parameters()

    def remove_weight_norm(self):
        """Remove weight normalization module from all of the layers."""

        def _remove_weight_norm(m):
            try:
                torch.nn.utils.remove_weight_norm(m)
            except ValueError:  # this module didn't have weight norm
                return
            # Log only after the removal actually succeeded.
            logging.debug(f'Weight norm is removed from {m}.')

        self.apply(_remove_weight_norm)

    def apply_weight_norm(self):
        """Apply weight normalization module from all of the layers."""

        def _apply_weight_norm(m):
            if isinstance(m, (torch.nn.Conv1d, torch.nn.ConvTranspose1d)):
                torch.nn.utils.weight_norm(m)
                logging.debug(f'Weight norm is applied to {m}.')

        self.apply(_apply_weight_norm)

    def reset_parameters(self):
        """Reset parameters.

        This initialization follows official implementation manner.
        https://github.com/descriptinc/melgan-neurips/blob/master/mel2wav/modules.py
        """

        def _reset_parameters(m):
            if isinstance(m, (torch.nn.Conv1d, torch.nn.ConvTranspose1d)):
                m.weight.data.normal_(0.0, 0.01)
                logging.debug(f'Reset parameters in {m}.')

        self.apply(_reset_parameters)

    def _generate(self, x):
        """Run pre-conv, all upsampling stages and the output conv on ``x``."""
        x = self.conv_pre(x)
        for i in range(self.num_upsamples):
            x = F.leaky_relu(x, LRELU_SLOPE)
            x = self.ups[i](x)
            # Average the parallel residual blocks attached to this stage.
            xs = None
            for j in range(self.num_kernels):
                branch = self.resblocks[i * self.num_kernels + j](x)
                xs = branch if xs is None else xs + branch
            x = xs / self.num_kernels
        # The final activation uses F.leaky_relu's default slope.
        x = F.leaky_relu(x)
        x = self.conv_post(x)
        return x

    def forward(self, x):
        """Generate from a batched, channel-first feature tensor ``x``."""
        return self._generate(x)

    def inference(self, x):
        """Generate from a single unbatched feature matrix.

        Args:
            x: Tensor or array-like with two dimensions. Non-tensors are
                converted to a float tensor on the model's device. The two
                dimensions are swapped and a batch dimension is added before
                the generator is run.
        """
        if not isinstance(x, torch.Tensor):
            x = torch.tensor(
                x, dtype=torch.float).to(next(self.parameters()).device)
        x = x.transpose(1, 0).unsqueeze(0)
        return self._generate(x)
class ConditionGenerator(torch.nn.Module):
    """Speaker-conditioned generator producing 80-channel features.

    The input features go through a pre-convolution and are summed with a
    projected speaker vector. The speaker vector mixes the supplied
    embedding with an attention read-out over the learned ``spk_info``
    table. Upsampling stages with averaged residual blocks and a final
    80-channel convolution follow, with an optional U-Net refinement.

    Args:
        input_channels: Channels of the input feature sequence.
        resblock_kernel_sizes: Kernel size of each parallel residual block.
        upsample_rates: Upsampling factor of each stage.
        upsample_initial_channel: Channels after ``conv_pre``; halved at
            every upsampling stage.
        resblock_type: ``'1'`` selects ``ResBlock1``, anything else
            ``ResBlock2``.
        upsample_kernel_sizes: Kernel size of each upsampling layer.
        resblock_dilation_sizes: Dilations of each residual block.
        transposedconv: ``False`` selects ``UpsampleLayer``; any other value
            selects ``ConvTranspose1d``.
        unet: Whether to create the ``Generator`` refinement network.
        extra_info: Whether to create the ``FsmnEncoderV2`` used for the
            ``extra_mc`` input of ``forward``.
        bias: Whether the convolutions have a bias term.
    """

    def __init__(
        self,
        input_channels=512,
        resblock_kernel_sizes=(3, 7, 11),
        upsample_rates=(3, 2),
        upsample_initial_channel=512,
        resblock_type='1',
        upsample_kernel_sizes=(6, 4),
        resblock_dilation_sizes=((1, 3, 5), (1, 3, 5), (1, 3, 5)),
        transposedconv=True,
        unet=False,
        extra_info=False,
        bias=True,
    ):
        super(ConditionGenerator, self).__init__()
        self.num_kernels = len(resblock_kernel_sizes)
        self.num_upsamples = len(upsample_rates)
        self.conv_pre = Conv1d(
            input_channels,
            upsample_initial_channel,
            7,
            1,
            padding=3,
            bias=bias)
        # Projects the 192-dim speaker vector to the pre-conv width.
        self.spk_fc = Conv1d(192, upsample_initial_channel, 1, 1)
        resblock = ResBlock1 if resblock_type == '1' else ResBlock2
        # Learned table of 10000 speaker vectors attended over in forward().
        self.spk_info = torch.nn.Parameter(torch.randn([1, 10000, 192]))

        self.ups = nn.ModuleList()
        for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)):
            in_ch = upsample_initial_channel // (2**i)
            out_ch = upsample_initial_channel // (2**(i + 1))
            if transposedconv is False:
                up = UpsampleLayer(
                    in_ch,
                    out_ch,
                    upsample_rate=u,
                    kernel_size=k,
                    stride=1,
                    padding=k // 2,
                    bias=bias)
            else:
                up = ConvTranspose1d(
                    in_ch,
                    out_ch,
                    k,
                    u,
                    padding=(u // 2 + u % 2),
                    output_padding=u % 2,
                    bias=bias)
            self.ups.append(up)

        self.resblocks = nn.ModuleList()
        # Start from the pre-conv width so conv_post is well defined even
        # when no upsampling stage is configured.
        ch = upsample_initial_channel
        for i in range(len(self.ups)):
            ch = upsample_initial_channel // (2**(i + 1))
            for k, d in zip(resblock_kernel_sizes, resblock_dilation_sizes):
                self.resblocks.append(resblock(ch, k, d, bias=bias))

        self.conv_post = Conv1d(ch, 80, 7, 1, padding=3, bias=bias)

        if unet:
            self.unet = Generator(dim_in=64, style_dim=192, max_conv_dim=256)
        else:
            self.unet = None
        if extra_info:
            self.extra_layer = FsmnEncoderV2()
        else:
            self.extra_layer = None

    def _upsample(self, x):
        """Run all upsampling stages and the output convolution on ``x``."""
        for i in range(self.num_upsamples):
            x = F.leaky_relu(x, LRELU_SLOPE)
            x = self.ups[i](x)
            # Average the parallel residual blocks attached to this stage.
            xs = None
            for j in range(self.num_kernels):
                branch = self.resblocks[i * self.num_kernels + j](x)
                xs = branch if xs is None else xs + branch
            x = xs / self.num_kernels
        # The final activation uses F.leaky_relu's default slope.
        x = F.leaky_relu(x)
        return self.conv_post(x)

    def forward(self, inp, s, extra_mc=None, a=0.5, b=0.5):
        """Generate features from ``inp`` conditioned on speaker vector ``s``.

        Args:
            inp: Input features; dims 1 and 2 are swapped before the
                pre-convolution (assumes ``(B, T, C)`` - TODO confirm).
            s: Speaker embedding (assumes ``(B, 192)`` - TODO confirm).
            extra_mc: Optional extra features passed through
                ``self.extra_layer`` and added to the speaker vector. Only
                usable when the module was built with ``extra_info=True``.
            a: Weight of ``s`` in the mixed speaker vector.
            b: Weight of the ``spk_info`` attention read-out.

        Returns:
            The generated features with dims 1 and 2 swapped back.
        """
        inp = inp.permute([0, 2, 1])

        # Dot-product attention of s over the learned speaker table.
        score = torch.sum(s.unsqueeze(1) * self.spk_info, dim=-1, keepdim=True)
        score = torch.softmax(score, dim=1)
        value = torch.sum(score * self.spk_info, dim=1)
        spk_inp = s * a + value * b
        if extra_mc is not None:
            spk_inp = spk_inp + self.extra_layer(extra_mc)

        # The speaker projection has length 1 and broadcasts over time.
        x = self.conv_pre(inp) + self.spk_fc(spk_inp.unsqueeze(-1))
        x = self._upsample(x)
        if self.unet is not None:
            x = self.unet(x.unsqueeze(1), spk_inp)
            x = x.squeeze(1)
        x = x.permute([0, 2, 1])
        return x

    def inference(self, x):
        """Run the generator on one unbatched feature matrix, without speaker input.

        Args:
            x: Tensor or array-like with two dimensions. Non-tensors are
                converted to a float tensor on the model's device. The two
                dimensions are swapped and a batch dimension is added before
                the pre-convolution.
        """
        if not isinstance(x, torch.Tensor):
            x = torch.tensor(
                x, dtype=torch.float).to(next(self.parameters()).device)
        x = x.transpose(1, 0).unsqueeze(0)
        return self._upsample(self.conv_pre(x))
class FeedForwardNet(nn.Module):
    """A two-feed-forward-layer module built from ``Conv1d`` layers.

    Args:
        d_in: Input feature dimension.
        d_hid: Hidden feature dimension.
        d_out: Output feature dimension.
        kernel_size: Pair of kernel sizes for the two convolutions; each is
            padded with ``(k - 1) // 2``.
        dropout: Dropout probability applied after the first activation.
    """

    def __init__(self, d_in, d_hid, d_out, kernel_size=(1, 1), dropout=0.1):
        super().__init__()
        # The default is an immutable tuple; any indexable pair is accepted.
        first_kernel, second_kernel = kernel_size[0], kernel_size[1]
        # position-wise
        self.w_1 = nn.Conv1d(
            d_in,
            d_hid,
            kernel_size=first_kernel,
            padding=(first_kernel - 1) // 2,
        )
        # position-wise
        self.w_2 = nn.Conv1d(
            d_hid,
            d_out,
            kernel_size=second_kernel,
            padding=(second_kernel - 1) // 2,
            bias=False,
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply conv -> ReLU -> dropout -> conv along dim 1 of ``x``.

        Dims 1 and 2 are swapped around the convolutions so that the
        feature dimension (last dim of ``x``) is used as the conv channel
        dimension.
        """
        output = x.transpose(1, 2)
        output = F.relu(self.w_1(output))
        output = self.dropout(output)
        output = self.w_2(output)
        return output.transpose(1, 2)
class MemoryBlockV2(nn.Module):
    """Depthwise temporal convolution with a residual connection.

    Args:
        d: Feature dimension (number of depthwise channels).
        filter_size: Length of the temporal filter.
        shift: When positive, moves that many padding frames from the right
            side to the left side.
        dropout: Dropout probability applied to the output.
    """

    def __init__(self, d, filter_size, shift, dropout=0.0):
        super(MemoryBlockV2, self).__init__()
        # Split the filter_size - 1 padding frames between both sides, giving
        # the extra frame to the left for even filter sizes. Integer
        # arithmetic is used because round() rounds halves to the nearest
        # even number, which left some even filter sizes one frame short and
        # broke the residual add in forward().
        left_padding = filter_size // 2
        right_padding = (filter_size - 1) // 2
        if shift > 0:
            left_padding += shift
            right_padding -= shift
        self.lp, self.rp = left_padding, right_padding
        self.conv_dw = nn.Conv1d(d, d, filter_size, 1, 0, groups=d, bias=False)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, mask=None):
        """Filter ``input`` along dim 1 and add the (masked) input back.

        Args:
            input: Tensor of shape ``(batch, time, d)``.
            mask: Optional boolean tensor of shape ``(batch, time)``; masked
                positions are zeroed in the input and in the output.
        """
        if mask is not None:
            input = input.masked_fill(mask.unsqueeze(-1), 0)
        # Pad only the time dimension (second to last).
        x = F.pad(
            input, (0, 0, self.lp, self.rp, 0, 0), mode='constant', value=0.0)
        output = self.conv_dw(x.contiguous().transpose(
            1, 2)).contiguous().transpose(1, 2)
        output += input
        output = self.dropout(output)
        if mask is not None:
            output = output.masked_fill(mask.unsqueeze(-1), 0)
        return output
class FsmnEncoderV2(nn.Module):
    """Encoder that maps a feature sequence to one ``spk_dim`` vector.

    Args:
        filter_size: Temporal filter length of every memory block.
        fsmn_num_layers: Number of (feed-forward, memory block) layers.
        input_dim: Feature dimension of the input.
        num_memory_units: Width of the projected features.
        ffn_inner_dim: Hidden width of every feed-forward net.
        dropout: Dropout probability used throughout.
        spk_dim: Dimension of the returned vector.
        shift: Shift of the memory blocks; a scalar is repeated for every
            layer, a list is used as given.
    """

    def __init__(
        self,
        filter_size=11,
        fsmn_num_layers=8,
        input_dim=560,
        num_memory_units=256,
        ffn_inner_dim=1024,
        dropout=0.1,
        spk_dim=192,
        shift=0,
    ):
        super().__init__()
        self.filter_size = filter_size
        self.fsmn_num_layers = fsmn_num_layers
        self.num_memory_units = num_memory_units
        self.ffn_inner_dim = ffn_inner_dim
        self.dropout = dropout
        if isinstance(shift, list):
            self.shift = shift
        else:
            self.shift = [shift for _ in range(fsmn_num_layers)]

        self.adapter = nn.ModuleList()
        self.ffn_lst = nn.ModuleList()
        self.proj = nn.Linear(input_dim, num_memory_units)

        # At least one feed-forward net is always created.
        for _ in range(max(fsmn_num_layers, 1)):
            self.ffn_lst.append(
                FeedForwardNet(
                    num_memory_units,
                    ffn_inner_dim,
                    num_memory_units,
                    dropout=dropout))

        self.memory_block_lst = nn.ModuleList()
        for layer_idx in range(fsmn_num_layers):
            self.memory_block_lst.append(
                MemoryBlockV2(num_memory_units, filter_size,
                              self.shift[layer_idx], dropout))

        self.fc = torch.nn.Linear(num_memory_units, spk_dim, bias=False)

    def forward(self, input, mask=None):
        """Project ``input``, run the FSMN layers and average over dim 1.

        Args:
            input: Feature tensor whose last dimension is ``input_dim``.
            mask: Optional mask forwarded to every memory block.

        Returns:
            ``fc(x)`` averaged over dim 1.
        """
        x = self.proj(F.dropout(input, self.dropout, self.training))
        # NOTE(review): the result of each layer (``memory``) is never assigned
        # back to ``x``, so the value returned below depends only on ``proj``
        # and ``fc``. This looks like a missing ``x = memory``, but it is kept
        # as-is because changing it would alter the outputs of existing
        # checkpoints - confirm with the model authors.
        for ffn, memory_block in zip(self.ffn_lst, self.memory_block_lst):
            memory = memory_block(ffn(x), mask)
            memory = F.dropout(memory, self.dropout, self.training)
            if memory.size(-1) == x.size(-1):
                memory += x
        return torch.mean(self.fc(x), dim=1)

View File

@@ -13,6 +13,8 @@ if TYPE_CHECKING:
from .inverse_text_processing_pipeline import InverseTextProcessingPipeline
from .separation_pipeline import SeparationPipeline
from .speaker_verification_pipeline import SpeakerVerificationPipeline
from .ssr_pipeline import SSRPipeline
from .voice_conversion_pipeline import VCPipeline
else:
_import_structure = {
'ans_dfsmn_pipeline': ['ANSDFSMNPipeline'],
@@ -25,7 +27,9 @@ else:
'itn_inference_pipeline': ['InverseTextProcessingPipeline'],
'inverse_text_processing_pipeline': ['InverseTextProcessingPipeline'],
'separation_pipeline': ['SeparationPipeline'],
'speaker_verification_pipeline': ['SpeakerVerificationPipeline']
'speaker_verification_pipeline': ['SpeakerVerificationPipeline'],
'speech-super-resolution-inference': ['SSRPipeline'],
'voice_conversion': ['VCPipeline']
}
import sys

View File

@@ -0,0 +1,49 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
from typing import Any, Dict
import numpy as np
import torch
from modelscope.metainfo import Pipelines
from modelscope.outputs import OutputKeys
from modelscope.pipelines.base import Input, Pipeline
from modelscope.pipelines.builder import PIPELINES
from modelscope.utils.constant import Tasks
@PIPELINES.register_module(
    Tasks.speech_super_resolution,
    module_name=Pipelines.speech_super_resolution_inference)
class SSRPipeline(Pipeline):
    r"""Speech super-resolution inference pipeline.

    The object passed to ``pipeline.__call__()`` is handed to the model
    unchanged (``preprocess`` is the identity). The model output is scaled
    to the 16-bit range and returned as raw PCM bytes.
    """
    SAMPLE_RATE = 48000

    def __init__(self, model, **kwargs):
        """Create the pipeline and switch the model to evaluation mode.

        Args:
            model: model id on modelscope hub.
            **kwargs: Forwarded to ``Pipeline``; ``stream_mode`` (default
                ``False``) is additionally stored on the instance.
        """
        super().__init__(model=model, **kwargs)
        self.model.eval()
        self.stream_mode = kwargs.get('stream_mode', False)

    def preprocess(self, inputs: Input, **preprocess_params) -> Dict[str, Any]:
        """Return ``inputs`` unchanged."""
        return inputs

    def forward(self, inputs: Dict[str, Any],
                **forward_params) -> Dict[str, Any]:
        """Run the model and convert its output to int16 PCM bytes.

        Returns:
            ``{OutputKeys.OUTPUT_PCM: <bytes>}``.
        """
        with torch.no_grad():
            outputs = self.model(inputs)
        if isinstance(outputs, torch.Tensor):
            # numpy cannot read tensors that live on an accelerator.
            outputs = outputs.detach().cpu().numpy()
        # Scale into a new array instead of modifying the model output in
        # place, and clip so that full-scale samples saturate instead of
        # overflowing the int16 range.
        samples = np.asarray(outputs, dtype=np.float64) * 32768.
        pcm = np.clip(samples, -32768, 32767).astype(np.int16)
        return {OutputKeys.OUTPUT_PCM: pcm.tobytes()}

    def postprocess(self, inputs: Dict[str, Any], **kwargs) -> Dict[str, Any]:
        """Return the forward result unchanged."""
        return inputs

View File

@@ -0,0 +1,48 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
from typing import Any, Dict
import numpy as np
import torch
from modelscope.metainfo import Pipelines
from modelscope.outputs import OutputKeys
from modelscope.pipelines.base import Input, Pipeline
from modelscope.pipelines.builder import PIPELINES
from modelscope.utils.constant import Tasks
@PIPELINES.register_module(
    Tasks.voice_conversion, module_name=Pipelines.voice_conversion)
class VCPipeline(Pipeline):
    r"""Voice conversion inference pipeline.

    The object passed to ``pipeline.__call__()`` is handed to the model
    unchanged (``preprocess`` is the identity). The model output is scaled
    to the 16-bit range and returned as raw PCM bytes.
    """
    SAMPLE_RATE = 16000

    def __init__(self, model, **kwargs):
        """Create the pipeline and switch the model to evaluation mode.

        Args:
            model: model id on modelscope hub.
            **kwargs: Forwarded to ``Pipeline``; ``stream_mode`` (default
                ``False``) is additionally stored on the instance.
        """
        super().__init__(model=model, **kwargs)
        self.model.eval()
        self.stream_mode = kwargs.get('stream_mode', False)

    def preprocess(self, inputs: Input, **preprocess_params) -> Dict[str, Any]:
        """Return ``inputs`` unchanged."""
        return inputs

    def forward(self, inputs: Dict[str, Any],
                **forward_params) -> Dict[str, Any]:
        """Run the model and convert its output to int16 PCM bytes.

        Returns:
            ``{OutputKeys.OUTPUT_PCM: <bytes>}``.
        """
        with torch.no_grad():
            outputs = self.model(inputs)
        if isinstance(outputs, torch.Tensor):
            # numpy cannot read tensors that live on an accelerator.
            outputs = outputs.detach().cpu().numpy()
        # Scale into a new array instead of modifying the model output in
        # place, and clip so that full-scale samples saturate instead of
        # overflowing the int16 range.
        samples = np.asarray(outputs, dtype=np.float64) * 32768.
        pcm = np.clip(samples, -32768, 32767).astype(np.int16)
        return {OutputKeys.OUTPUT_PCM: pcm.tobytes()}

    def postprocess(self, inputs: Dict[str, Any], **kwargs) -> Dict[str, Any]:
        """Return the forward result unchanged."""
        return inputs

View File

@@ -264,6 +264,8 @@ class AudioTasks(object):
speaker_diarization_dialogue_detection = 'speaker-diarization-dialogue-detection'
speaker_diarization_semantic_speaker_turn_detection = 'speaker-diarization-semantic-speaker-turn-detection'
emotion_recognition = 'emotion-recognition'
speech_super_resolution = 'speech-super-resolution'
voice_conversion = 'voice-conversion'
class MultiModalTasks(object):

View File

@@ -0,0 +1,31 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import unittest
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.test_utils import test_level
class HifiSSRTestTask(unittest.TestCase):
    """Smoke test for the speech super-resolution pipeline."""

    def setUp(self) -> None:
        self.task = Tasks.speech_super_resolution
        self.model_id = 'ACoderPassBy/HifiSSR'

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_face_compare(self):
        # The same test clip serves as both reference and source.
        wav_path = 'data/test/audios/speaker1_a_en_16k.wav'
        inp_data = dict(ref_wav=wav_path, source_wav=wav_path, out_wav='')
        pipe = pipeline(Tasks.speech_super_resolution, model=self.model_id)
        pipe(inp_data)  # the output will be saved as "out.wav"
        print('ssr success!')
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()

View File

@@ -0,0 +1,33 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import unittest
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.test_utils import test_level
class UnetVCTestTask(unittest.TestCase):
    """Smoke test for the voice conversion pipeline."""

    def setUp(self) -> None:
        self.task = Tasks.voice_conversion
        self.model_id = 'ACoderPassBy/UnetVC'

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_face_compare(self):
        # The same test clip serves as both source and target.
        wav_path = 'data/test/audios/speaker1_a_en_16k.wav'
        inp_data = dict(source_wav=wav_path, target_wav=wav_path, save_path='')
        pipe = pipeline(
            Tasks.voice_conversion,
            model=self.model_id,
            model_revision='v1.0.0')
        pipe(inp_data)  # the output will be saved as "out.wav"
        print('speech vc success!')
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()