Reformat and rewrite _get_name_params (#57)

* Reformat

* rewrite _get_name_params

* Add workflow for automatic formatting

* Revert "Add workflow for automatic formatting"

This reverts commit 9111c5dbc1830248305fb075587a88be07ad3115.

* revert Retrieval_based_Voice_Conversion_WebUI.ipynb

---------

Co-authored-by: 源文雨 <41315874+fumiama@users.noreply.github.com>
This commit is contained in:
Ftps
2023-04-15 20:44:24 +09:00
committed by GitHub
parent aaa893c4b1
commit c8261b2ccc
45 changed files with 4878 additions and 2456 deletions

View File

@@ -48,8 +48,10 @@ def slice_segments(x, ids_str, segment_size=4):
idx_end = idx_str + segment_size
ret[i] = x[i, :, idx_str:idx_end]
return ret
def slice_segments2(x, ids_str, segment_size=4):
ret = torch.zeros_like(x[:, :segment_size])
ret = torch.zeros_like(x[:, :segment_size])
for i in range(x.size(0)):
idx_str = ids_str[i]
idx_end = idx_str + segment_size

View File

@@ -1,4 +1,4 @@
import math,pdb,os
import math, pdb, os
from time import time as ttime
import torch
from torch import nn
@@ -12,9 +12,20 @@ from torch.nn.utils import weight_norm, remove_weight_norm, spectral_norm
from infer_pack.commons import init_weights
import numpy as np
from infer_pack import commons
class TextEncoder256(nn.Module):
def __init__(
self, out_channels, hidden_channels, filter_channels, n_heads, n_layers, kernel_size, p_dropout, f0=True ):
self,
out_channels,
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size,
p_dropout,
f0=True,
):
super().__init__()
self.out_channels = out_channels
self.hidden_channels = hidden_channels
@@ -24,8 +35,8 @@ class TextEncoder256(nn.Module):
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.emb_phone = nn.Linear(256, hidden_channels)
self.lrelu=nn.LeakyReLU(0.1,inplace=True)
if(f0==True):
self.lrelu = nn.LeakyReLU(0.1, inplace=True)
if f0 == True:
self.emb_pitch = nn.Embedding(256, hidden_channels) # pitch 256
self.encoder = attentions.Encoder(
hidden_channels, filter_channels, n_heads, n_layers, kernel_size, p_dropout
@@ -33,12 +44,12 @@ class TextEncoder256(nn.Module):
self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)
def forward(self, phone, pitch, lengths):
if(pitch==None):
if pitch == None:
x = self.emb_phone(phone)
else:
x = self.emb_phone(phone) + self.emb_pitch(pitch)
x = x * math.sqrt(self.hidden_channels) # [b, t, h]
x=self.lrelu(x)
x = self.lrelu(x)
x = torch.transpose(x, 1, -1) # [b, h, t]
x_mask = torch.unsqueeze(commons.sequence_mask(lengths, x.size(2)), 1).to(
x.dtype
@@ -48,8 +59,20 @@ class TextEncoder256(nn.Module):
m, logs = torch.split(stats, self.out_channels, dim=1)
return m, logs, x_mask
class TextEncoder256Sim(nn.Module):
def __init__( self, out_channels, hidden_channels, filter_channels, n_heads, n_layers, kernel_size, p_dropout, f0=True):
def __init__(
self,
out_channels,
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size,
p_dropout,
f0=True,
):
super().__init__()
self.out_channels = out_channels
self.hidden_channels = hidden_channels
@@ -59,8 +82,8 @@ class TextEncoder256Sim(nn.Module):
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.emb_phone = nn.Linear(256, hidden_channels)
self.lrelu=nn.LeakyReLU(0.1,inplace=True)
if(f0==True):
self.lrelu = nn.LeakyReLU(0.1, inplace=True)
if f0 == True:
self.emb_pitch = nn.Embedding(256, hidden_channels) # pitch 256
self.encoder = attentions.Encoder(
hidden_channels, filter_channels, n_heads, n_layers, kernel_size, p_dropout
@@ -68,17 +91,21 @@ class TextEncoder256Sim(nn.Module):
self.proj = nn.Conv1d(hidden_channels, out_channels, 1)
def forward(self, phone, pitch, lengths):
if(pitch==None):
if pitch == None:
x = self.emb_phone(phone)
else:
x = self.emb_phone(phone) + self.emb_pitch(pitch)
x = x * math.sqrt(self.hidden_channels) # [b, t, h]
x=self.lrelu(x)
x = self.lrelu(x)
x = torch.transpose(x, 1, -1) # [b, h, t]
x_mask = torch.unsqueeze(commons.sequence_mask(lengths, x.size(2)), 1).to(x.dtype)
x_mask = torch.unsqueeze(commons.sequence_mask(lengths, x.size(2)), 1).to(
x.dtype
)
x = self.encoder(x * x_mask, x_mask)
x = self.proj(x) * x_mask
return x,x_mask
return x, x_mask
class ResidualCouplingBlock(nn.Module):
def __init__(
self,
@@ -126,6 +153,8 @@ class ResidualCouplingBlock(nn.Module):
def remove_weight_norm(self):
for i in range(self.n_flows):
self.flows[i * 2].remove_weight_norm()
class PosteriorEncoder(nn.Module):
def __init__(
self,
@@ -169,6 +198,8 @@ class PosteriorEncoder(nn.Module):
def remove_weight_norm(self):
self.enc.remove_weight_norm()
class Generator(torch.nn.Module):
def __init__(
self,
@@ -243,8 +274,10 @@ class Generator(torch.nn.Module):
remove_weight_norm(l)
for l in self.resblocks:
l.remove_weight_norm()
class SineGen(torch.nn.Module):
""" Definition of sine generator
"""Definition of sine generator
SineGen(samp_rate, harmonic_num = 0,
sine_amp = 0.1, noise_std = 0.003,
voiced_threshold = 0,
@@ -259,10 +292,15 @@ class SineGen(torch.nn.Module):
segment is always sin(np.pi) or cos(0)
"""
def __init__(self, samp_rate, harmonic_num=0,
sine_amp=0.1, noise_std=0.003,
voiced_threshold=0,
flag_for_pulse=False):
def __init__(
self,
samp_rate,
harmonic_num=0,
sine_amp=0.1,
noise_std=0.003,
voiced_threshold=0,
flag_for_pulse=False,
):
super(SineGen, self).__init__()
self.sine_amp = sine_amp
self.noise_std = noise_std
@@ -277,8 +315,8 @@ class SineGen(torch.nn.Module):
uv = uv * (f0 > self.voiced_threshold)
return uv
def forward(self, f0,upp):
""" sine_tensor, uv = forward(f0)
def forward(self, f0, upp):
"""sine_tensor, uv = forward(f0)
input F0: tensor(batchsize=1, length, dim=1)
f0 for unvoiced steps should be 0
output sine_tensor: tensor(batchsize=1, length, dim)
@@ -286,32 +324,52 @@ class SineGen(torch.nn.Module):
"""
with torch.no_grad():
f0 = f0[:, None].transpose(1, 2)
f0_buf = torch.zeros(f0.shape[0], f0.shape[1], self.dim,device=f0.device)
f0_buf = torch.zeros(f0.shape[0], f0.shape[1], self.dim, device=f0.device)
# fundamental component
f0_buf[:, :, 0] = f0[:, :, 0]
for idx in np.arange(self.harmonic_num):f0_buf[:, :, idx + 1] = f0_buf[:, :, 0] * (idx + 2)# idx + 2: the (idx+1)-th overtone, (idx+2)-th harmonic
rad_values = (f0_buf / self.sampling_rate) % 1###%1意味着n_har的乘积无法后处理优化
rand_ini = torch.rand(f0_buf.shape[0], f0_buf.shape[2], device=f0_buf.device)
for idx in np.arange(self.harmonic_num):
f0_buf[:, :, idx + 1] = f0_buf[:, :, 0] * (
idx + 2
) # idx + 2: the (idx+1)-th overtone, (idx+2)-th harmonic
rad_values = (f0_buf / self.sampling_rate) % 1 ###%1意味着n_har的乘积无法后处理优化
rand_ini = torch.rand(
f0_buf.shape[0], f0_buf.shape[2], device=f0_buf.device
)
rand_ini[:, 0] = 0
rad_values[:, 0, :] = rad_values[:, 0, :] + rand_ini
tmp_over_one = torch.cumsum(rad_values, 1)# % 1 #####%1意味着后面的cumsum无法再优化
tmp_over_one*=upp
tmp_over_one=F.interpolate(tmp_over_one.transpose(2, 1), scale_factor=upp, mode='linear', align_corners=True).transpose(2, 1)
rad_values=F.interpolate(rad_values.transpose(2, 1), scale_factor=upp, mode='nearest').transpose(2, 1)#######
tmp_over_one%=1
tmp_over_one = torch.cumsum(rad_values, 1) # % 1 #####%1意味着后面的cumsum无法再优化
tmp_over_one *= upp
tmp_over_one = F.interpolate(
tmp_over_one.transpose(2, 1),
scale_factor=upp,
mode="linear",
align_corners=True,
).transpose(2, 1)
rad_values = F.interpolate(
rad_values.transpose(2, 1), scale_factor=upp, mode="nearest"
).transpose(
2, 1
) #######
tmp_over_one %= 1
tmp_over_one_idx = (tmp_over_one[:, 1:, :] - tmp_over_one[:, :-1, :]) < 0
cumsum_shift = torch.zeros_like(rad_values)
cumsum_shift[:, 1:, :] = tmp_over_one_idx * -1.0
sine_waves = torch.sin(torch.cumsum(rad_values + cumsum_shift, dim=1) * 2 * np.pi)
sine_waves = torch.sin(
torch.cumsum(rad_values + cumsum_shift, dim=1) * 2 * np.pi
)
sine_waves = sine_waves * self.sine_amp
uv = self._f02uv(f0)
uv = F.interpolate(uv.transpose(2, 1), scale_factor=upp, mode='nearest').transpose(2, 1)
uv = F.interpolate(
uv.transpose(2, 1), scale_factor=upp, mode="nearest"
).transpose(2, 1)
noise_amp = uv * self.noise_std + (1 - uv) * self.sine_amp / 3
noise = noise_amp * torch.randn_like(sine_waves)
sine_waves = sine_waves * uv + noise
return sine_waves, uv, noise
class SourceModuleHnNSF(torch.nn.Module):
""" SourceModule for hn-nsf
"""SourceModule for hn-nsf
SourceModule(sampling_rate, harmonic_num=0, sine_amp=0.1,
add_noise_std=0.003, voiced_threshod=0)
sampling_rate: sampling_rate in Hz
@@ -328,26 +386,37 @@ class SourceModuleHnNSF(torch.nn.Module):
uv (batchsize, length, 1)
"""
def __init__(self, sampling_rate, harmonic_num=0, sine_amp=0.1,
add_noise_std=0.003, voiced_threshod=0,is_half=True):
def __init__(
self,
sampling_rate,
harmonic_num=0,
sine_amp=0.1,
add_noise_std=0.003,
voiced_threshod=0,
is_half=True,
):
super(SourceModuleHnNSF, self).__init__()
self.sine_amp = sine_amp
self.noise_std = add_noise_std
self.is_half=is_half
self.is_half = is_half
# to produce sine waveforms
self.l_sin_gen = SineGen(sampling_rate, harmonic_num,
sine_amp, add_noise_std, voiced_threshod)
self.l_sin_gen = SineGen(
sampling_rate, harmonic_num, sine_amp, add_noise_std, voiced_threshod
)
# to merge source harmonics into a single excitation
self.l_linear = torch.nn.Linear(harmonic_num + 1, 1)
self.l_tanh = torch.nn.Tanh()
def forward(self, x,upp=None):
sine_wavs, uv, _ = self.l_sin_gen(x,upp)
if(self.is_half):sine_wavs=sine_wavs.half()
def forward(self, x, upp=None):
sine_wavs, uv, _ = self.l_sin_gen(x, upp)
if self.is_half:
sine_wavs = sine_wavs.half()
sine_merge = self.l_tanh(self.l_linear(sine_wavs))
return sine_merge,None,None# noise, uv
return sine_merge, None, None # noise, uv
class GeneratorNSF(torch.nn.Module):
def __init__(
self,
@@ -360,7 +429,7 @@ class GeneratorNSF(torch.nn.Module):
upsample_kernel_sizes,
gin_channels,
sr,
is_half=False
is_half=False,
):
super(GeneratorNSF, self).__init__()
self.num_kernels = len(resblock_kernel_sizes)
@@ -368,9 +437,7 @@ class GeneratorNSF(torch.nn.Module):
self.f0_upsamp = torch.nn.Upsample(scale_factor=np.prod(upsample_rates))
self.m_source = SourceModuleHnNSF(
sampling_rate=sr,
harmonic_num=0,
is_half=is_half
sampling_rate=sr, harmonic_num=0, is_half=is_half
)
self.noise_convs = nn.ModuleList()
self.conv_pre = Conv1d(
@@ -393,9 +460,16 @@ class GeneratorNSF(torch.nn.Module):
)
)
if i + 1 < len(upsample_rates):
stride_f0 = np.prod(upsample_rates[i + 1:])
self.noise_convs.append(Conv1d(
1, c_cur, kernel_size=stride_f0 * 2, stride=stride_f0, padding=stride_f0 // 2))
stride_f0 = np.prod(upsample_rates[i + 1 :])
self.noise_convs.append(
Conv1d(
1,
c_cur,
kernel_size=stride_f0 * 2,
stride=stride_f0,
padding=stride_f0 // 2,
)
)
else:
self.noise_convs.append(Conv1d(1, c_cur, kernel_size=1))
@@ -413,10 +487,10 @@ class GeneratorNSF(torch.nn.Module):
if gin_channels != 0:
self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1)
self.upp=np.prod(upsample_rates)
self.upp = np.prod(upsample_rates)
def forward(self, x, f0,g=None):
har_source, noi_source, uv = self.m_source(f0,self.upp)
def forward(self, x, f0, g=None):
har_source, noi_source, uv = self.m_source(f0, self.upp)
har_source = har_source.transpose(1, 2)
x = self.conv_pre(x)
if g is not None:
@@ -444,11 +518,15 @@ class GeneratorNSF(torch.nn.Module):
remove_weight_norm(l)
for l in self.resblocks:
l.remove_weight_norm()
sr2sr={
"32k":32000,
"40k":40000,
"48k":48000,
sr2sr = {
"32k": 32000,
"40k": 40000,
"48k": 48000,
}
class SynthesizerTrnMs256NSFsid(nn.Module):
def __init__(
self,
@@ -472,10 +550,9 @@ class SynthesizerTrnMs256NSFsid(nn.Module):
sr,
**kwargs
):
super().__init__()
if(type(sr)==type("strr")):
sr=sr2sr[sr]
if type(sr) == type("strr"):
sr = sr2sr[sr]
self.spec_channels = spec_channels
self.inter_channels = inter_channels
self.hidden_channels = hidden_channels
@@ -493,7 +570,7 @@ class SynthesizerTrnMs256NSFsid(nn.Module):
self.segment_size = segment_size
self.gin_channels = gin_channels
# self.hop_length = hop_length#
self.spk_embed_dim=spk_embed_dim
self.spk_embed_dim = spk_embed_dim
self.enc_p = TextEncoder256(
inter_channels,
hidden_channels,
@@ -511,7 +588,9 @@ class SynthesizerTrnMs256NSFsid(nn.Module):
upsample_rates,
upsample_initial_channel,
upsample_kernel_sizes,
gin_channels=gin_channels, sr=sr, is_half=kwargs["is_half"]
gin_channels=gin_channels,
sr=sr,
is_half=kwargs["is_half"],
)
self.enc_q = PosteriorEncoder(
spec_channels,
@@ -526,13 +605,16 @@ class SynthesizerTrnMs256NSFsid(nn.Module):
inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels
)
self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels)
print("gin_channels:",gin_channels,"self.spk_embed_dim:",self.spk_embed_dim)
print("gin_channels:", gin_channels, "self.spk_embed_dim:", self.spk_embed_dim)
def remove_weight_norm(self):
self.dec.remove_weight_norm()
self.flow.remove_weight_norm()
self.enc_q.remove_weight_norm()
def forward(self, phone, phone_lengths, pitch,pitchf, y, y_lengths,ds):#这里ds是id[bs,1]
def forward(
self, phone, phone_lengths, pitch, pitchf, y, y_lengths, ds
): # 这里ds是id[bs,1]
# print(1,pitch.shape)#[bs,t]
g = self.emb_g(ds).unsqueeze(-1) # [b, 256, 1]##1是t广播的
m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths)
@@ -542,20 +624,20 @@ class SynthesizerTrnMs256NSFsid(nn.Module):
z, y_lengths, self.segment_size
)
# print(-1,pitchf.shape,ids_slice,self.segment_size,self.hop_length,self.segment_size//self.hop_length)
pitchf = commons.slice_segments2(
pitchf, ids_slice, self.segment_size
)
pitchf = commons.slice_segments2(pitchf, ids_slice, self.segment_size)
# print(-2,pitchf.shape,z_slice.shape)
o = self.dec(z_slice,pitchf, g=g)
o = self.dec(z_slice, pitchf, g=g)
return o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q, logs_q)
def infer(self, phone, phone_lengths, pitch, nsff0,sid,max_len=None):
def infer(self, phone, phone_lengths, pitch, nsff0, sid, max_len=None):
g = self.emb_g(sid).unsqueeze(-1)
m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths)
z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask
z = self.flow(z_p, x_mask, g=g, reverse=True)
o = self.dec((z * x_mask)[:, :, :max_len], nsff0,g=g)
o = self.dec((z * x_mask)[:, :, :max_len], nsff0, g=g)
return o, x_mask, (z, z_p, m_p, logs_p)
class SynthesizerTrnMs256NSFsid_nono(nn.Module):
def __init__(
self,
@@ -579,7 +661,6 @@ class SynthesizerTrnMs256NSFsid_nono(nn.Module):
sr=None,
**kwargs
):
super().__init__()
self.spec_channels = spec_channels
self.inter_channels = inter_channels
@@ -598,7 +679,7 @@ class SynthesizerTrnMs256NSFsid_nono(nn.Module):
self.segment_size = segment_size
self.gin_channels = gin_channels
# self.hop_length = hop_length#
self.spk_embed_dim=spk_embed_dim
self.spk_embed_dim = spk_embed_dim
self.enc_p = TextEncoder256(
inter_channels,
hidden_channels,
@@ -606,7 +687,8 @@ class SynthesizerTrnMs256NSFsid_nono(nn.Module):
n_heads,
n_layers,
kernel_size,
p_dropout,f0=False
p_dropout,
f0=False,
)
self.dec = Generator(
inter_channels,
@@ -616,7 +698,7 @@ class SynthesizerTrnMs256NSFsid_nono(nn.Module):
upsample_rates,
upsample_initial_channel,
upsample_kernel_sizes,
gin_channels=gin_channels
gin_channels=gin_channels,
)
self.enc_q = PosteriorEncoder(
spec_channels,
@@ -631,14 +713,14 @@ class SynthesizerTrnMs256NSFsid_nono(nn.Module):
inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels
)
self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels)
print("gin_channels:",gin_channels,"self.spk_embed_dim:",self.spk_embed_dim)
print("gin_channels:", gin_channels, "self.spk_embed_dim:", self.spk_embed_dim)
def remove_weight_norm(self):
self.dec.remove_weight_norm()
self.flow.remove_weight_norm()
self.enc_q.remove_weight_norm()
def forward(self, phone, phone_lengths, y, y_lengths,ds):#这里ds是id[bs,1]
def forward(self, phone, phone_lengths, y, y_lengths, ds): # 这里ds是id[bs,1]
g = self.emb_g(ds).unsqueeze(-1) # [b, 256, 1]##1是t广播的
m_p, logs_p, x_mask = self.enc_p(phone, None, phone_lengths)
z, m_q, logs_q, y_mask = self.enc_q(y, y_lengths, g=g)
@@ -649,13 +731,15 @@ class SynthesizerTrnMs256NSFsid_nono(nn.Module):
o = self.dec(z_slice, g=g)
return o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q, logs_q)
def infer(self, phone, phone_lengths,sid,max_len=None):
def infer(self, phone, phone_lengths, sid, max_len=None):
g = self.emb_g(sid).unsqueeze(-1)
m_p, logs_p, x_mask = self.enc_p(phone, None, phone_lengths)
z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask
z = self.flow(z_p, x_mask, g=g, reverse=True)
o = self.dec((z * x_mask)[:, :, :max_len],g=g)
o = self.dec((z * x_mask)[:, :, :max_len], g=g)
return o, x_mask, (z, z_p, m_p, logs_p)
class SynthesizerTrnMs256NSFsid_sim(nn.Module):
"""
Synthesizer for Training
@@ -684,7 +768,6 @@ class SynthesizerTrnMs256NSFsid_sim(nn.Module):
use_sdp=True,
**kwargs
):
super().__init__()
self.spec_channels = spec_channels
self.inter_channels = inter_channels
@@ -703,7 +786,7 @@ class SynthesizerTrnMs256NSFsid_sim(nn.Module):
self.segment_size = segment_size
self.gin_channels = gin_channels
# self.hop_length = hop_length#
self.spk_embed_dim=spk_embed_dim
self.spk_embed_dim = spk_embed_dim
self.enc_p = TextEncoder256Sim(
inter_channels,
hidden_channels,
@@ -721,20 +804,24 @@ class SynthesizerTrnMs256NSFsid_sim(nn.Module):
upsample_rates,
upsample_initial_channel,
upsample_kernel_sizes,
gin_channels=gin_channels,is_half=kwargs["is_half"]
gin_channels=gin_channels,
is_half=kwargs["is_half"],
)
self.flow = ResidualCouplingBlock(
inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels
)
self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels)
print("gin_channels:",gin_channels,"self.spk_embed_dim:",self.spk_embed_dim)
print("gin_channels:", gin_channels, "self.spk_embed_dim:", self.spk_embed_dim)
def remove_weight_norm(self):
self.dec.remove_weight_norm()
self.flow.remove_weight_norm()
self.enc_q.remove_weight_norm()
def forward(self, phone, phone_lengths, pitch, pitchf, y_lengths,ds): # y是spec不需要了现在
def forward(
self, phone, phone_lengths, pitch, pitchf, y_lengths, ds
): # y是spec不需要了现在
g = self.emb_g(ds).unsqueeze(-1) # [b, 256, 1]##1是t广播的
x, x_mask = self.enc_p(phone, pitch, phone_lengths)
x = self.flow(x, x_mask, g=g, reverse=True)
@@ -742,22 +829,24 @@ class SynthesizerTrnMs256NSFsid_sim(nn.Module):
x, y_lengths, self.segment_size
)
pitchf = commons.slice_segments2(
pitchf, ids_slice, self.segment_size
)
pitchf = commons.slice_segments2(pitchf, ids_slice, self.segment_size)
o = self.dec(z_slice, pitchf, g=g)
return o, ids_slice
def infer(self, phone, phone_lengths, pitch, pitchf, ds,max_len=None): # y是spec不需要了现在
def infer(
self, phone, phone_lengths, pitch, pitchf, ds, max_len=None
): # y是spec不需要了现在
g = self.emb_g(ds).unsqueeze(-1) # [b, 256, 1]##1是t广播的
x, x_mask = self.enc_p(phone, pitch, phone_lengths)
x = self.flow(x, x_mask, g=g, reverse=True)
o = self.dec((x*x_mask)[:, :, :max_len], pitchf, g=g)
o = self.dec((x * x_mask)[:, :, :max_len], pitchf, g=g)
return o, o
class MultiPeriodDiscriminator(torch.nn.Module):
def __init__(self, use_spectral_norm=False):
super(MultiPeriodDiscriminator, self).__init__()
periods = [2, 3, 5, 7, 11,17]
periods = [2, 3, 5, 7, 11, 17]
# periods = [3, 5, 7, 11, 17, 23, 37]
discs = [DiscriminatorS(use_spectral_norm=use_spectral_norm)]
@@ -767,7 +856,7 @@ class MultiPeriodDiscriminator(torch.nn.Module):
self.discriminators = nn.ModuleList(discs)
def forward(self, y, y_hat):
y_d_rs = []#
y_d_rs = [] #
y_d_gs = []
fmap_rs = []
fmap_gs = []
@@ -783,6 +872,7 @@ class MultiPeriodDiscriminator(torch.nn.Module):
return y_d_rs, y_d_gs, fmap_rs, fmap_gs
class DiscriminatorS(torch.nn.Module):
def __init__(self, use_spectral_norm=False):
super(DiscriminatorS, self).__init__()
@@ -812,6 +902,7 @@ class DiscriminatorS(torch.nn.Module):
return x, fmap
class DiscriminatorP(torch.nn.Module):
def __init__(self, period, kernel_size=5, stride=3, use_spectral_norm=False):
super(DiscriminatorP, self).__init__()
@@ -889,4 +980,3 @@ class DiscriminatorP(torch.nn.Module):
x = torch.flatten(x, 1, -1)
return x, fmap

View File

@@ -1,4 +1,4 @@
import math,pdb,os
import math, pdb, os
from time import time as ttime
import torch
from torch import nn
@@ -12,9 +12,20 @@ from torch.nn.utils import weight_norm, remove_weight_norm, spectral_norm
from infer_pack.commons import init_weights
import numpy as np
from infer_pack import commons
class TextEncoder256(nn.Module):
def __init__(
self, out_channels, hidden_channels, filter_channels, n_heads, n_layers, kernel_size, p_dropout, f0=True ):
self,
out_channels,
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size,
p_dropout,
f0=True,
):
super().__init__()
self.out_channels = out_channels
self.hidden_channels = hidden_channels
@@ -24,8 +35,8 @@ class TextEncoder256(nn.Module):
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.emb_phone = nn.Linear(256, hidden_channels)
self.lrelu=nn.LeakyReLU(0.1,inplace=True)
if(f0==True):
self.lrelu = nn.LeakyReLU(0.1, inplace=True)
if f0 == True:
self.emb_pitch = nn.Embedding(256, hidden_channels) # pitch 256
self.encoder = attentions.Encoder(
hidden_channels, filter_channels, n_heads, n_layers, kernel_size, p_dropout
@@ -33,12 +44,12 @@ class TextEncoder256(nn.Module):
self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)
def forward(self, phone, pitch, lengths):
if(pitch==None):
if pitch == None:
x = self.emb_phone(phone)
else:
x = self.emb_phone(phone) + self.emb_pitch(pitch)
x = x * math.sqrt(self.hidden_channels) # [b, t, h]
x=self.lrelu(x)
x = self.lrelu(x)
x = torch.transpose(x, 1, -1) # [b, h, t]
x_mask = torch.unsqueeze(commons.sequence_mask(lengths, x.size(2)), 1).to(
x.dtype
@@ -48,8 +59,20 @@ class TextEncoder256(nn.Module):
m, logs = torch.split(stats, self.out_channels, dim=1)
return m, logs, x_mask
class TextEncoder256Sim(nn.Module):
def __init__( self, out_channels, hidden_channels, filter_channels, n_heads, n_layers, kernel_size, p_dropout, f0=True):
def __init__(
self,
out_channels,
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size,
p_dropout,
f0=True,
):
super().__init__()
self.out_channels = out_channels
self.hidden_channels = hidden_channels
@@ -59,8 +82,8 @@ class TextEncoder256Sim(nn.Module):
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.emb_phone = nn.Linear(256, hidden_channels)
self.lrelu=nn.LeakyReLU(0.1,inplace=True)
if(f0==True):
self.lrelu = nn.LeakyReLU(0.1, inplace=True)
if f0 == True:
self.emb_pitch = nn.Embedding(256, hidden_channels) # pitch 256
self.encoder = attentions.Encoder(
hidden_channels, filter_channels, n_heads, n_layers, kernel_size, p_dropout
@@ -68,17 +91,21 @@ class TextEncoder256Sim(nn.Module):
self.proj = nn.Conv1d(hidden_channels, out_channels, 1)
def forward(self, phone, pitch, lengths):
if(pitch==None):
if pitch == None:
x = self.emb_phone(phone)
else:
x = self.emb_phone(phone) + self.emb_pitch(pitch)
x = x * math.sqrt(self.hidden_channels) # [b, t, h]
x=self.lrelu(x)
x = self.lrelu(x)
x = torch.transpose(x, 1, -1) # [b, h, t]
x_mask = torch.unsqueeze(commons.sequence_mask(lengths, x.size(2)), 1).to(x.dtype)
x_mask = torch.unsqueeze(commons.sequence_mask(lengths, x.size(2)), 1).to(
x.dtype
)
x = self.encoder(x * x_mask, x_mask)
x = self.proj(x) * x_mask
return x,x_mask
return x, x_mask
class ResidualCouplingBlock(nn.Module):
def __init__(
self,
@@ -126,6 +153,8 @@ class ResidualCouplingBlock(nn.Module):
def remove_weight_norm(self):
for i in range(self.n_flows):
self.flows[i * 2].remove_weight_norm()
class PosteriorEncoder(nn.Module):
def __init__(
self,
@@ -169,6 +198,8 @@ class PosteriorEncoder(nn.Module):
def remove_weight_norm(self):
self.enc.remove_weight_norm()
class Generator(torch.nn.Module):
def __init__(
self,
@@ -243,8 +274,10 @@ class Generator(torch.nn.Module):
remove_weight_norm(l)
for l in self.resblocks:
l.remove_weight_norm()
class SineGen(torch.nn.Module):
""" Definition of sine generator
"""Definition of sine generator
SineGen(samp_rate, harmonic_num = 0,
sine_amp = 0.1, noise_std = 0.003,
voiced_threshold = 0,
@@ -259,10 +292,15 @@ class SineGen(torch.nn.Module):
segment is always sin(np.pi) or cos(0)
"""
def __init__(self, samp_rate, harmonic_num=0,
sine_amp=0.1, noise_std=0.003,
voiced_threshold=0,
flag_for_pulse=False):
def __init__(
self,
samp_rate,
harmonic_num=0,
sine_amp=0.1,
noise_std=0.003,
voiced_threshold=0,
flag_for_pulse=False,
):
super(SineGen, self).__init__()
self.sine_amp = sine_amp
self.noise_std = noise_std
@@ -277,8 +315,8 @@ class SineGen(torch.nn.Module):
uv = uv * (f0 > self.voiced_threshold)
return uv
def forward(self, f0,upp):
""" sine_tensor, uv = forward(f0)
def forward(self, f0, upp):
"""sine_tensor, uv = forward(f0)
input F0: tensor(batchsize=1, length, dim=1)
f0 for unvoiced steps should be 0
output sine_tensor: tensor(batchsize=1, length, dim)
@@ -286,32 +324,52 @@ class SineGen(torch.nn.Module):
"""
with torch.no_grad():
f0 = f0[:, None].transpose(1, 2)
f0_buf = torch.zeros(f0.shape[0], f0.shape[1], self.dim,device=f0.device)
f0_buf = torch.zeros(f0.shape[0], f0.shape[1], self.dim, device=f0.device)
# fundamental component
f0_buf[:, :, 0] = f0[:, :, 0]
for idx in np.arange(self.harmonic_num):f0_buf[:, :, idx + 1] = f0_buf[:, :, 0] * (idx + 2)# idx + 2: the (idx+1)-th overtone, (idx+2)-th harmonic
rad_values = (f0_buf / self.sampling_rate) % 1###%1意味着n_har的乘积无法后处理优化
rand_ini = torch.rand(f0_buf.shape[0], f0_buf.shape[2], device=f0_buf.device)
for idx in np.arange(self.harmonic_num):
f0_buf[:, :, idx + 1] = f0_buf[:, :, 0] * (
idx + 2
) # idx + 2: the (idx+1)-th overtone, (idx+2)-th harmonic
rad_values = (f0_buf / self.sampling_rate) % 1 ###%1意味着n_har的乘积无法后处理优化
rand_ini = torch.rand(
f0_buf.shape[0], f0_buf.shape[2], device=f0_buf.device
)
rand_ini[:, 0] = 0
rad_values[:, 0, :] = rad_values[:, 0, :] + rand_ini
tmp_over_one = torch.cumsum(rad_values, 1)# % 1 #####%1意味着后面的cumsum无法再优化
tmp_over_one*=upp
tmp_over_one=F.interpolate(tmp_over_one.transpose(2, 1), scale_factor=upp, mode='linear', align_corners=True).transpose(2, 1)
rad_values=F.interpolate(rad_values.transpose(2, 1), scale_factor=upp, mode='nearest').transpose(2, 1)#######
tmp_over_one%=1
tmp_over_one = torch.cumsum(rad_values, 1) # % 1 #####%1意味着后面的cumsum无法再优化
tmp_over_one *= upp
tmp_over_one = F.interpolate(
tmp_over_one.transpose(2, 1),
scale_factor=upp,
mode="linear",
align_corners=True,
).transpose(2, 1)
rad_values = F.interpolate(
rad_values.transpose(2, 1), scale_factor=upp, mode="nearest"
).transpose(
2, 1
) #######
tmp_over_one %= 1
tmp_over_one_idx = (tmp_over_one[:, 1:, :] - tmp_over_one[:, :-1, :]) < 0
cumsum_shift = torch.zeros_like(rad_values)
cumsum_shift[:, 1:, :] = tmp_over_one_idx * -1.0
sine_waves = torch.sin(torch.cumsum(rad_values + cumsum_shift, dim=1) * 2 * np.pi)
sine_waves = torch.sin(
torch.cumsum(rad_values + cumsum_shift, dim=1) * 2 * np.pi
)
sine_waves = sine_waves * self.sine_amp
uv = self._f02uv(f0)
uv = F.interpolate(uv.transpose(2, 1), scale_factor=upp, mode='nearest').transpose(2, 1)
uv = F.interpolate(
uv.transpose(2, 1), scale_factor=upp, mode="nearest"
).transpose(2, 1)
noise_amp = uv * self.noise_std + (1 - uv) * self.sine_amp / 3
noise = noise_amp * torch.randn_like(sine_waves)
sine_waves = sine_waves * uv + noise
return sine_waves, uv, noise
class SourceModuleHnNSF(torch.nn.Module):
""" SourceModule for hn-nsf
"""SourceModule for hn-nsf
SourceModule(sampling_rate, harmonic_num=0, sine_amp=0.1,
add_noise_std=0.003, voiced_threshod=0)
sampling_rate: sampling_rate in Hz
@@ -328,26 +386,37 @@ class SourceModuleHnNSF(torch.nn.Module):
uv (batchsize, length, 1)
"""
def __init__(self, sampling_rate, harmonic_num=0, sine_amp=0.1,
add_noise_std=0.003, voiced_threshod=0,is_half=True):
def __init__(
self,
sampling_rate,
harmonic_num=0,
sine_amp=0.1,
add_noise_std=0.003,
voiced_threshod=0,
is_half=True,
):
super(SourceModuleHnNSF, self).__init__()
self.sine_amp = sine_amp
self.noise_std = add_noise_std
self.is_half=is_half
self.is_half = is_half
# to produce sine waveforms
self.l_sin_gen = SineGen(sampling_rate, harmonic_num,
sine_amp, add_noise_std, voiced_threshod)
self.l_sin_gen = SineGen(
sampling_rate, harmonic_num, sine_amp, add_noise_std, voiced_threshod
)
# to merge source harmonics into a single excitation
self.l_linear = torch.nn.Linear(harmonic_num + 1, 1)
self.l_tanh = torch.nn.Tanh()
def forward(self, x,upp=None):
sine_wavs, uv, _ = self.l_sin_gen(x,upp)
if(self.is_half):sine_wavs=sine_wavs.half()
def forward(self, x, upp=None):
sine_wavs, uv, _ = self.l_sin_gen(x, upp)
if self.is_half:
sine_wavs = sine_wavs.half()
sine_merge = self.l_tanh(self.l_linear(sine_wavs))
return sine_merge,None,None# noise, uv
return sine_merge, None, None # noise, uv
class GeneratorNSF(torch.nn.Module):
def __init__(
self,
@@ -360,7 +429,7 @@ class GeneratorNSF(torch.nn.Module):
upsample_kernel_sizes,
gin_channels,
sr,
is_half=False
is_half=False,
):
super(GeneratorNSF, self).__init__()
self.num_kernels = len(resblock_kernel_sizes)
@@ -368,9 +437,7 @@ class GeneratorNSF(torch.nn.Module):
self.f0_upsamp = torch.nn.Upsample(scale_factor=np.prod(upsample_rates))
self.m_source = SourceModuleHnNSF(
sampling_rate=sr,
harmonic_num=0,
is_half=is_half
sampling_rate=sr, harmonic_num=0, is_half=is_half
)
self.noise_convs = nn.ModuleList()
self.conv_pre = Conv1d(
@@ -393,9 +460,16 @@ class GeneratorNSF(torch.nn.Module):
)
)
if i + 1 < len(upsample_rates):
stride_f0 = np.prod(upsample_rates[i + 1:])
self.noise_convs.append(Conv1d(
1, c_cur, kernel_size=stride_f0 * 2, stride=stride_f0, padding=stride_f0 // 2))
stride_f0 = np.prod(upsample_rates[i + 1 :])
self.noise_convs.append(
Conv1d(
1,
c_cur,
kernel_size=stride_f0 * 2,
stride=stride_f0,
padding=stride_f0 // 2,
)
)
else:
self.noise_convs.append(Conv1d(1, c_cur, kernel_size=1))
@@ -413,10 +487,10 @@ class GeneratorNSF(torch.nn.Module):
if gin_channels != 0:
self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1)
self.upp=np.prod(upsample_rates)
self.upp = np.prod(upsample_rates)
def forward(self, x, f0,g=None):
har_source, noi_source, uv = self.m_source(f0,self.upp)
def forward(self, x, f0, g=None):
har_source, noi_source, uv = self.m_source(f0, self.upp)
har_source = har_source.transpose(1, 2)
x = self.conv_pre(x)
if g is not None:
@@ -444,11 +518,15 @@ class GeneratorNSF(torch.nn.Module):
remove_weight_norm(l)
for l in self.resblocks:
l.remove_weight_norm()
sr2sr={
"32k":32000,
"40k":40000,
"48k":48000,
sr2sr = {
"32k": 32000,
"40k": 40000,
"48k": 48000,
}
class SynthesizerTrnMs256NSFsid(nn.Module):
def __init__(
self,
@@ -472,10 +550,9 @@ class SynthesizerTrnMs256NSFsid(nn.Module):
sr,
**kwargs
):
super().__init__()
if(type(sr)==type("strr")):
sr=sr2sr[sr]
if type(sr) == type("strr"):
sr = sr2sr[sr]
self.spec_channels = spec_channels
self.inter_channels = inter_channels
self.hidden_channels = hidden_channels
@@ -493,7 +570,7 @@ class SynthesizerTrnMs256NSFsid(nn.Module):
self.segment_size = segment_size
self.gin_channels = gin_channels
# self.hop_length = hop_length#
self.spk_embed_dim=spk_embed_dim
self.spk_embed_dim = spk_embed_dim
self.enc_p = TextEncoder256(
inter_channels,
hidden_channels,
@@ -511,7 +588,9 @@ class SynthesizerTrnMs256NSFsid(nn.Module):
upsample_rates,
upsample_initial_channel,
upsample_kernel_sizes,
gin_channels=gin_channels, sr=sr, is_half=kwargs["is_half"]
gin_channels=gin_channels,
sr=sr,
is_half=kwargs["is_half"],
)
self.enc_q = PosteriorEncoder(
spec_channels,
@@ -526,21 +605,22 @@ class SynthesizerTrnMs256NSFsid(nn.Module):
inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels
)
self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels)
print("gin_channels:",gin_channels,"self.spk_embed_dim:",self.spk_embed_dim)
print("gin_channels:", gin_channels, "self.spk_embed_dim:", self.spk_embed_dim)
def remove_weight_norm(self):
self.dec.remove_weight_norm()
self.flow.remove_weight_norm()
self.enc_q.remove_weight_norm()
def forward(self, phone, phone_lengths, pitch, nsff0 ,sid, rnd, max_len=None):
def forward(self, phone, phone_lengths, pitch, nsff0, sid, rnd, max_len=None):
g = self.emb_g(sid).unsqueeze(-1)
m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths)
z_p = (m_p + torch.exp(logs_p) * rnd) * x_mask
z = self.flow(z_p, x_mask, g=g, reverse=True)
o = self.dec((z * x_mask)[:, :, :max_len], nsff0,g=g)
o = self.dec((z * x_mask)[:, :, :max_len], nsff0, g=g)
return o
class SynthesizerTrnMs256NSFsid_sim(nn.Module):
"""
Synthesizer for Training
@@ -569,7 +649,6 @@ class SynthesizerTrnMs256NSFsid_sim(nn.Module):
use_sdp=True,
**kwargs
):
super().__init__()
self.spec_channels = spec_channels
self.inter_channels = inter_channels
@@ -588,7 +667,7 @@ class SynthesizerTrnMs256NSFsid_sim(nn.Module):
self.segment_size = segment_size
self.gin_channels = gin_channels
# self.hop_length = hop_length#
self.spk_embed_dim=spk_embed_dim
self.spk_embed_dim = spk_embed_dim
self.enc_p = TextEncoder256Sim(
inter_channels,
hidden_channels,
@@ -606,30 +685,35 @@ class SynthesizerTrnMs256NSFsid_sim(nn.Module):
upsample_rates,
upsample_initial_channel,
upsample_kernel_sizes,
gin_channels=gin_channels,is_half=kwargs["is_half"]
gin_channels=gin_channels,
is_half=kwargs["is_half"],
)
self.flow = ResidualCouplingBlock(
inter_channels, hidden_channels, 5, 1, 3, gin_channels=gin_channels
)
self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels)
print("gin_channels:",gin_channels,"self.spk_embed_dim:",self.spk_embed_dim)
print("gin_channels:", gin_channels, "self.spk_embed_dim:", self.spk_embed_dim)
def remove_weight_norm(self):
self.dec.remove_weight_norm()
self.flow.remove_weight_norm()
self.enc_q.remove_weight_norm()
def forward(self, phone, phone_lengths, pitch, pitchf, ds,max_len=None): # y是spec不需要了现在
def forward(
self, phone, phone_lengths, pitch, pitchf, ds, max_len=None
): # y是spec不需要了现在
g = self.emb_g(ds.unsqueeze(0)).unsqueeze(-1) # [b, 256, 1]##1是t广播的
x, x_mask = self.enc_p(phone, pitch, phone_lengths)
x = self.flow(x, x_mask, g=g, reverse=True)
o = self.dec((x*x_mask)[:, :, :max_len], pitchf, g=g)
o = self.dec((x * x_mask)[:, :, :max_len], pitchf, g=g)
return o
class MultiPeriodDiscriminator(torch.nn.Module):
def __init__(self, use_spectral_norm=False):
super(MultiPeriodDiscriminator, self).__init__()
periods = [2, 3, 5, 7, 11,17]
periods = [2, 3, 5, 7, 11, 17]
# periods = [3, 5, 7, 11, 17, 23, 37]
discs = [DiscriminatorS(use_spectral_norm=use_spectral_norm)]
@@ -639,7 +723,7 @@ class MultiPeriodDiscriminator(torch.nn.Module):
self.discriminators = nn.ModuleList(discs)
def forward(self, y, y_hat):
y_d_rs = []#
y_d_rs = [] #
y_d_gs = []
fmap_rs = []
fmap_gs = []
@@ -655,6 +739,7 @@ class MultiPeriodDiscriminator(torch.nn.Module):
return y_d_rs, y_d_gs, fmap_rs, fmap_gs
class DiscriminatorS(torch.nn.Module):
def __init__(self, use_spectral_norm=False):
super(DiscriminatorS, self).__init__()
@@ -684,6 +769,7 @@ class DiscriminatorS(torch.nn.Module):
return x, fmap
class DiscriminatorP(torch.nn.Module):
def __init__(self, period, kernel_size=5, stride=3, use_spectral_norm=False):
super(DiscriminatorP, self).__init__()
@@ -761,4 +847,3 @@ class DiscriminatorP(torch.nn.Module):
x = torch.flatten(x, 1, -1)
return x, fmap

View File

@@ -9,66 +9,63 @@ DEFAULT_MIN_BIN_HEIGHT = 1e-3
DEFAULT_MIN_DERIVATIVE = 1e-3
def piecewise_rational_quadratic_transform(inputs,
unnormalized_widths,
unnormalized_heights,
unnormalized_derivatives,
inverse=False,
tails=None,
tail_bound=1.,
min_bin_width=DEFAULT_MIN_BIN_WIDTH,
min_bin_height=DEFAULT_MIN_BIN_HEIGHT,
min_derivative=DEFAULT_MIN_DERIVATIVE):
def piecewise_rational_quadratic_transform(
inputs,
unnormalized_widths,
unnormalized_heights,
unnormalized_derivatives,
inverse=False,
tails=None,
tail_bound=1.0,
min_bin_width=DEFAULT_MIN_BIN_WIDTH,
min_bin_height=DEFAULT_MIN_BIN_HEIGHT,
min_derivative=DEFAULT_MIN_DERIVATIVE,
):
if tails is None:
spline_fn = rational_quadratic_spline
spline_kwargs = {}
else:
spline_fn = unconstrained_rational_quadratic_spline
spline_kwargs = {
'tails': tails,
'tail_bound': tail_bound
}
spline_kwargs = {"tails": tails, "tail_bound": tail_bound}
outputs, logabsdet = spline_fn(
inputs=inputs,
unnormalized_widths=unnormalized_widths,
unnormalized_heights=unnormalized_heights,
unnormalized_derivatives=unnormalized_derivatives,
inverse=inverse,
min_bin_width=min_bin_width,
min_bin_height=min_bin_height,
min_derivative=min_derivative,
**spline_kwargs
inputs=inputs,
unnormalized_widths=unnormalized_widths,
unnormalized_heights=unnormalized_heights,
unnormalized_derivatives=unnormalized_derivatives,
inverse=inverse,
min_bin_width=min_bin_width,
min_bin_height=min_bin_height,
min_derivative=min_derivative,
**spline_kwargs
)
return outputs, logabsdet
def searchsorted(bin_locations, inputs, eps=1e-6):
bin_locations[..., -1] += eps
return torch.sum(
inputs[..., None] >= bin_locations,
dim=-1
) - 1
return torch.sum(inputs[..., None] >= bin_locations, dim=-1) - 1
def unconstrained_rational_quadratic_spline(inputs,
unnormalized_widths,
unnormalized_heights,
unnormalized_derivatives,
inverse=False,
tails='linear',
tail_bound=1.,
min_bin_width=DEFAULT_MIN_BIN_WIDTH,
min_bin_height=DEFAULT_MIN_BIN_HEIGHT,
min_derivative=DEFAULT_MIN_DERIVATIVE):
def unconstrained_rational_quadratic_spline(
inputs,
unnormalized_widths,
unnormalized_heights,
unnormalized_derivatives,
inverse=False,
tails="linear",
tail_bound=1.0,
min_bin_width=DEFAULT_MIN_BIN_WIDTH,
min_bin_height=DEFAULT_MIN_BIN_HEIGHT,
min_derivative=DEFAULT_MIN_DERIVATIVE,
):
inside_interval_mask = (inputs >= -tail_bound) & (inputs <= tail_bound)
outside_interval_mask = ~inside_interval_mask
outputs = torch.zeros_like(inputs)
logabsdet = torch.zeros_like(inputs)
if tails == 'linear':
if tails == "linear":
unnormalized_derivatives = F.pad(unnormalized_derivatives, pad=(1, 1))
constant = np.log(np.exp(1 - min_derivative) - 1)
unnormalized_derivatives[..., 0] = constant
@@ -77,45 +74,57 @@ def unconstrained_rational_quadratic_spline(inputs,
outputs[outside_interval_mask] = inputs[outside_interval_mask]
logabsdet[outside_interval_mask] = 0
else:
raise RuntimeError('{} tails are not implemented.'.format(tails))
raise RuntimeError("{} tails are not implemented.".format(tails))
outputs[inside_interval_mask], logabsdet[inside_interval_mask] = rational_quadratic_spline(
(
outputs[inside_interval_mask],
logabsdet[inside_interval_mask],
) = rational_quadratic_spline(
inputs=inputs[inside_interval_mask],
unnormalized_widths=unnormalized_widths[inside_interval_mask, :],
unnormalized_heights=unnormalized_heights[inside_interval_mask, :],
unnormalized_derivatives=unnormalized_derivatives[inside_interval_mask, :],
inverse=inverse,
left=-tail_bound, right=tail_bound, bottom=-tail_bound, top=tail_bound,
left=-tail_bound,
right=tail_bound,
bottom=-tail_bound,
top=tail_bound,
min_bin_width=min_bin_width,
min_bin_height=min_bin_height,
min_derivative=min_derivative
min_derivative=min_derivative,
)
return outputs, logabsdet
def rational_quadratic_spline(inputs,
unnormalized_widths,
unnormalized_heights,
unnormalized_derivatives,
inverse=False,
left=0., right=1., bottom=0., top=1.,
min_bin_width=DEFAULT_MIN_BIN_WIDTH,
min_bin_height=DEFAULT_MIN_BIN_HEIGHT,
min_derivative=DEFAULT_MIN_DERIVATIVE):
def rational_quadratic_spline(
inputs,
unnormalized_widths,
unnormalized_heights,
unnormalized_derivatives,
inverse=False,
left=0.0,
right=1.0,
bottom=0.0,
top=1.0,
min_bin_width=DEFAULT_MIN_BIN_WIDTH,
min_bin_height=DEFAULT_MIN_BIN_HEIGHT,
min_derivative=DEFAULT_MIN_DERIVATIVE,
):
if torch.min(inputs) < left or torch.max(inputs) > right:
raise ValueError('Input to a transform is not within its domain')
raise ValueError("Input to a transform is not within its domain")
num_bins = unnormalized_widths.shape[-1]
if min_bin_width * num_bins > 1.0:
raise ValueError('Minimal bin width too large for the number of bins')
raise ValueError("Minimal bin width too large for the number of bins")
if min_bin_height * num_bins > 1.0:
raise ValueError('Minimal bin height too large for the number of bins')
raise ValueError("Minimal bin height too large for the number of bins")
widths = F.softmax(unnormalized_widths, dim=-1)
widths = min_bin_width + (1 - min_bin_width * num_bins) * widths
cumwidths = torch.cumsum(widths, dim=-1)
cumwidths = F.pad(cumwidths, pad=(1, 0), mode='constant', value=0.0)
cumwidths = F.pad(cumwidths, pad=(1, 0), mode="constant", value=0.0)
cumwidths = (right - left) * cumwidths + left
cumwidths[..., 0] = left
cumwidths[..., -1] = right
@@ -126,7 +135,7 @@ def rational_quadratic_spline(inputs,
heights = F.softmax(unnormalized_heights, dim=-1)
heights = min_bin_height + (1 - min_bin_height * num_bins) * heights
cumheights = torch.cumsum(heights, dim=-1)
cumheights = F.pad(cumheights, pad=(1, 0), mode='constant', value=0.0)
cumheights = F.pad(cumheights, pad=(1, 0), mode="constant", value=0.0)
cumheights = (top - bottom) * cumheights + bottom
cumheights[..., 0] = bottom
cumheights[..., -1] = top
@@ -150,15 +159,13 @@ def rational_quadratic_spline(inputs,
input_heights = heights.gather(-1, bin_idx)[..., 0]
if inverse:
a = (((inputs - input_cumheights) * (input_derivatives
+ input_derivatives_plus_one
- 2 * input_delta)
+ input_heights * (input_delta - input_derivatives)))
b = (input_heights * input_derivatives
- (inputs - input_cumheights) * (input_derivatives
+ input_derivatives_plus_one
- 2 * input_delta))
c = - input_delta * (inputs - input_cumheights)
a = (inputs - input_cumheights) * (
input_derivatives + input_derivatives_plus_one - 2 * input_delta
) + input_heights * (input_delta - input_derivatives)
b = input_heights * input_derivatives - (inputs - input_cumheights) * (
input_derivatives + input_derivatives_plus_one - 2 * input_delta
)
c = -input_delta * (inputs - input_cumheights)
discriminant = b.pow(2) - 4 * a * c
assert (discriminant >= 0).all()
@@ -167,11 +174,15 @@ def rational_quadratic_spline(inputs,
outputs = root * input_bin_widths + input_cumwidths
theta_one_minus_theta = root * (1 - root)
denominator = input_delta + ((input_derivatives + input_derivatives_plus_one - 2 * input_delta)
* theta_one_minus_theta)
derivative_numerator = input_delta.pow(2) * (input_derivatives_plus_one * root.pow(2)
+ 2 * input_delta * theta_one_minus_theta
+ input_derivatives * (1 - root).pow(2))
denominator = input_delta + (
(input_derivatives + input_derivatives_plus_one - 2 * input_delta)
* theta_one_minus_theta
)
derivative_numerator = input_delta.pow(2) * (
input_derivatives_plus_one * root.pow(2)
+ 2 * input_delta * theta_one_minus_theta
+ input_derivatives * (1 - root).pow(2)
)
logabsdet = torch.log(derivative_numerator) - 2 * torch.log(denominator)
return outputs, -logabsdet
@@ -179,15 +190,20 @@ def rational_quadratic_spline(inputs,
theta = (inputs - input_cumwidths) / input_bin_widths
theta_one_minus_theta = theta * (1 - theta)
numerator = input_heights * (input_delta * theta.pow(2)
+ input_derivatives * theta_one_minus_theta)
denominator = input_delta + ((input_derivatives + input_derivatives_plus_one - 2 * input_delta)
* theta_one_minus_theta)
numerator = input_heights * (
input_delta * theta.pow(2) + input_derivatives * theta_one_minus_theta
)
denominator = input_delta + (
(input_derivatives + input_derivatives_plus_one - 2 * input_delta)
* theta_one_minus_theta
)
outputs = input_cumheights + numerator / denominator
derivative_numerator = input_delta.pow(2) * (input_derivatives_plus_one * theta.pow(2)
+ 2 * input_delta * theta_one_minus_theta
+ input_derivatives * (1 - theta).pow(2))
derivative_numerator = input_delta.pow(2) * (
input_derivatives_plus_one * theta.pow(2)
+ 2 * input_delta * theta_one_minus_theta
+ input_derivatives * (1 - theta).pow(2)
)
logabsdet = torch.log(derivative_numerator) - 2 * torch.log(denominator)
return outputs, logabsdet