Add better voice clones and prepare for finetuning

This commit is contained in:
Francis LaBounty
2023-05-25 16:24:41 -06:00
parent 0b16a49fe2
commit 40afeec9c0
12 changed files with 1050 additions and 37 deletions

0
hubert/__init__.py Normal file
View File

184
hubert/customtokenizer.py Normal file
View File

@@ -0,0 +1,184 @@
# From https://github.com/gitmylo/bark-voice-cloning-HuBERT-quantizer
import json
import os.path
from zipfile import ZipFile
import numpy
import torch
from torch import nn, optim
from torch.serialization import MAP_LOCATION
class CustomTokenizer(nn.Module):
def __init__(self, hidden_size=1024, input_size=768, output_size=10000, version=0):
super(CustomTokenizer, self).__init__()
next_size = input_size
if version == 0:
self.lstm = nn.LSTM(input_size, hidden_size, 2, batch_first=True)
next_size = hidden_size
if version == 1:
self.lstm = nn.LSTM(input_size, hidden_size, 2, batch_first=True)
self.intermediate = nn.Linear(hidden_size, 4096)
next_size = 4096
self.fc = nn.Linear(next_size, output_size)
self.softmax = nn.LogSoftmax(dim=1)
self.optimizer: optim.Optimizer = None
self.lossfunc = nn.CrossEntropyLoss()
self.input_size = input_size
self.hidden_size = hidden_size
self.output_size = output_size
self.version = version
def forward(self, x):
x, _ = self.lstm(x)
if self.version == 1:
x = self.intermediate(x)
x = self.fc(x)
x = self.softmax(x)
return x
@torch.no_grad()
def get_token(self, x):
"""
Used to get the token for the first
:param x: An array with shape (N, input_size) where N is a whole number greater or equal to 1, and input_size is the input size used when creating the model.
:return: An array with shape (N,) where N is the same as N from the input. Every number in the array is a whole number in range 0...output_size - 1 where output_size is the output size used when creating the model.
"""
return torch.argmax(self(x), dim=1)
def prepare_training(self):
self.optimizer = optim.Adam(self.parameters(), 0.001)
def train_step(self, x_train, y_train, log_loss=False):
# y_train = y_train[:-1]
# y_train = y_train[1:]
optimizer = self.optimizer
lossfunc = self.lossfunc
# Zero the gradients
self.zero_grad()
# Forward pass
y_pred = self(x_train)
y_train_len = len(y_train)
y_pred_len = y_pred.shape[0]
if y_train_len > y_pred_len:
diff = y_train_len - y_pred_len
y_train = y_train[diff:]
elif y_train_len < y_pred_len:
diff = y_pred_len - y_train_len
y_pred = y_pred[:-diff, :]
y_train_hot = torch.zeros(len(y_train), self.output_size)
y_train_hot[range(len(y_train)), y_train] = 1
y_train_hot = y_train_hot.to('cuda')
# Calculate the loss
loss = lossfunc(y_pred, y_train_hot)
# Print loss
if log_loss:
print('Loss', loss.item())
# Backward pass
loss.backward()
# Update the weights
optimizer.step()
def save(self, path):
info_path = os.path.basename(path) + '/.info'
torch.save(self.state_dict(), path)
data_from_model = Data(self.input_size, self.hidden_size, self.output_size, self.version)
with ZipFile(path, 'a') as model_zip:
model_zip.writestr(info_path, data_from_model.save())
model_zip.close()
@staticmethod
def load_from_checkpoint(path, map_location: MAP_LOCATION = None):
old = True
with ZipFile(path) as model_zip:
filesMatch = [file for file in model_zip.namelist() if file.endswith('/.info')]
file = filesMatch[0] if filesMatch else None
if file:
old = False
data_from_model = Data.load(model_zip.read(file).decode('utf-8'))
model_zip.close()
if old:
model = CustomTokenizer()
else:
model = CustomTokenizer(data_from_model.hidden_size, data_from_model.input_size, data_from_model.output_size, data_from_model.version)
model.load_state_dict(torch.load(path, map_location))
return model
class Data:
input_size: int
hidden_size: int
output_size: int
version: int
def __init__(self, input_size=768, hidden_size=1024, output_size=10000, version=0):
self.input_size = input_size
self.hidden_size = hidden_size
self.output_size = output_size
self.version = version
@staticmethod
def load(string):
data = json.loads(string)
return Data(data['input_size'], data['hidden_size'], data['output_size'], data['version'])
def save(self):
data = {
'input_size': self.input_size,
'hidden_size': self.hidden_size,
'output_size': self.output_size,
'version': self.version,
}
return json.dumps(data)
def auto_train(data_path, save_path='model.pth', load_model: str | None = None, save_epochs=1):
data_x, data_y = [], []
if load_model and os.path.isfile(load_model):
print('Loading model from', load_model)
model_training = CustomTokenizer.load_from_checkpoint(load_model, 'cuda')
else:
print('Creating new model.')
model_training = CustomTokenizer(version=1).to('cuda') # Settings for the model to run without lstm
save_path = os.path.join(data_path, save_path)
base_save_path = '.'.join(save_path.split('.')[:-1])
sem_string = '_semantic.npy'
feat_string = '_semantic_features.npy'
ready = os.path.join(data_path, 'ready')
for input_file in os.listdir(ready):
full_path = os.path.join(ready, input_file)
if input_file.endswith(sem_string):
data_y.append(numpy.load(full_path))
elif input_file.endswith(feat_string):
data_x.append(numpy.load(full_path))
model_training.prepare_training()
epoch = 1
while 1:
for i in range(save_epochs):
j = 0
for x, y in zip(data_x, data_y):
model_training.train_step(torch.tensor(x).to('cuda'), torch.tensor(y).to('cuda'), j % 50 == 0) # Print loss every 50 steps
j += 1
save_p = save_path
save_p_2 = f'{base_save_path}_epoch_{epoch}.pth'
model_training.save(save_p)
model_training.save(save_p_2)
print(f'Epoch {epoch} completed')
epoch += 1

35
hubert/hubert_manager.py Normal file
View File

@@ -0,0 +1,35 @@
# From https://github.com/gitmylo/bark-voice-cloning-HuBERT-quantizer
import os.path
import shutil
import urllib.request
import huggingface_hub
class HuBERTManager:
@staticmethod
def make_sure_hubert_installed(download_url: str = 'https://dl.fbaipublicfiles.com/hubert/hubert_base_ls960.pt', file_name: str = 'hubert.pt'):
install_dir = os.path.join('data', 'models', 'hubert')
if not os.path.isdir(install_dir):
os.makedirs(install_dir, exist_ok=True)
install_file = os.path.join(install_dir, file_name)
if not os.path.isfile(install_file):
print('Downloading HuBERT base model')
urllib.request.urlretrieve(download_url, install_file)
print('Downloaded HuBERT')
return install_file
@staticmethod
def make_sure_tokenizer_installed(model: str = 'quantifier_hubert_base_ls960_14.pth', repo: str = 'GitMylo/bark-voice-cloning', local_file: str = 'tokenizer.pth'):
install_dir = os.path.join('data', 'models', 'hubert')
if not os.path.isdir(install_dir):
os.makedirs(install_dir, exist_ok=True)
install_file = os.path.join(install_dir, local_file)
if not os.path.isfile(install_file):
print('Downloading HuBERT custom tokenizer')
huggingface_hub.hf_hub_download(repo, model, local_dir=install_dir, local_dir_use_symlinks=False)
shutil.move(os.path.join(install_dir, model), install_file)
print('Downloaded tokenizer')
return install_file

View File

@@ -0,0 +1,94 @@
# From https://github.com/gitmylo/bark-voice-cloning-HuBERT-quantizer
from pathlib import Path
import torch
from torch import nn
from einops import pack, unpack
import joblib
import fairseq
from torchaudio.functional import resample
from audiolm_pytorch.utils import curtail_to_multiple
import logging
logging.root.setLevel(logging.ERROR)
def exists(val):
return val is not None
def default(val, d):
return val if exists(val) else d
class CustomHubert(nn.Module):
"""
checkpoint and kmeans can be downloaded at https://github.com/facebookresearch/fairseq/tree/main/examples/hubert
or you can train your own
"""
def __init__(
self,
checkpoint_path,
target_sample_hz=16000,
seq_len_multiple_of=None,
output_layer=9
):
super().__init__()
self.target_sample_hz = target_sample_hz
self.seq_len_multiple_of = seq_len_multiple_of
self.output_layer = output_layer
model_path = Path(checkpoint_path)
assert model_path.exists(), f'path {checkpoint_path} does not exist'
checkpoint = torch.load(checkpoint_path)
load_model_input = {checkpoint_path: checkpoint}
model, *_ = fairseq.checkpoint_utils.load_model_ensemble_and_task(load_model_input)
self.model = model[0]
self.model.eval()
@property
def groups(self):
return 1
@torch.no_grad()
def forward(
self,
wav_input,
flatten=True,
input_sample_hz=None
):
device = wav_input.device
if exists(input_sample_hz):
wav_input = resample(wav_input, input_sample_hz, self.target_sample_hz)
if exists(self.seq_len_multiple_of):
wav_input = curtail_to_multiple(wav_input, self.seq_len_multiple_of)
embed = self.model(
wav_input,
features_only=True,
mask=False, # thanks to @maitycyrus for noticing that mask is defaulted to True in the fairseq code
output_layer=self.output_layer
)
embed, packed_shape = pack([embed['x']], '* d')
# codebook_indices = self.kmeans.predict(embed.cpu().detach().numpy())
codebook_indices = torch.from_numpy(embed.cpu().detach().numpy()).to(device) # .long()
if flatten:
return codebook_indices
codebook_indices, = unpack(codebook_indices, packed_shape, '*')
return codebook_indices