# Mirror of https://github.com/voice-cloning-app/Voice-Cloning-App.git
# Synced 2025-12-16 19:58:00 +01:00
import logging
|
|
import os
|
|
import io
|
|
from datetime import datetime
|
|
import traceback
|
|
import shutil
|
|
import zipfile
|
|
import librosa
|
|
from flask import send_file
|
|
import resampy # noqa
|
|
|
|
from main import socketio
|
|
from dataset import CHARACTER_ENCODING
|
|
from dataset.audio_processing import convert_audio
|
|
from dataset.analysis import save_dataset_info, get_text
|
|
|
|
|
|
class SocketIOHandler(logging.Handler):
    """
    Logging handler that forwards log records to the frontend via
    flask-socketio. The emitted events are consumed in application.js.

    Records whose message starts with "Progress", "Status" or "Alignment"
    are parsed into structured events; anything else is sent as raw log text.
    """

    def emit(self, record):
        message = record.getMessage()
        if message.startswith("Progress"):
            # Expected shape: "Progress - <current>/<total>"
            current, total = message.split("-")[1].split("/")
            socketio.emit("progress", {"number": current, "total": total}, namespace="/voice")
            return
        if message.startswith("Status"):
            # Expected shape: "Status - <text>"
            socketio.emit("status", {"text": message.replace("Status -", "")}, namespace="/voice")
            return
        if message.startswith("Alignment"):
            # Expected shape: "Alignment - <iteration>, <image>"
            iteration, image = message.split("- ")[1].split(", ")
            socketio.emit("alignment", {"iteration": iteration, "image": image}, namespace="/voice")
            return
        socketio.emit("logs", {"text": message}, namespace="/voice")
|
|
|
|
|
|
# Data
# Root logging config plus a dedicated "voice" logger whose records are
# mirrored to the browser through SocketIOHandler.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("voice")
logger.addHandler(SocketIOHandler())
# Handle of the most recently started background task (set by start_progress_thread).
thread = None
|
|
|
|
|
|
def background_task(func, **kwargs):
    """
    Runs a background task.
    If the function errors out it will send an error event to the frontend
    error page and re-raise. Sends a 'done' message to the frontend when
    complete.

    Parameters
    ----------
    func : function
        Function to run in background
    kwargs : kwargs
        Kwargs to pass to function
    """
    try:
        # NOTE(review): fixed delay before starting — presumably gives the
        # client time to attach its socket listeners; confirm intent.
        socketio.sleep(5)
        func(logging=logger, **kwargs)
    except Exception as e:
        error = {"type": e.__class__.__name__, "text": str(e), "stacktrace": traceback.format_exc()}
        socketio.emit("error", error, namespace="/voice")
        # Bare raise preserves the original traceback; `raise e` would
        # rewrite it to point here.
        raise
    socketio.emit("done", {"text": None}, namespace="/voice")
|
|
|
|
|
|
def start_progress_thread(func, **kwargs):
    """
    Launches *func* as a socketio background task and stores the task
    handle in the module-level ``thread`` variable.

    Parameters
    ----------
    func : function
        Function to run in background
    kwargs : kwargs
        Kwargs to pass to function
    """
    global thread
    print("Starting Thread")
    task_kwargs = dict(kwargs, func=func)
    thread = socketio.start_background_task(background_task, **task_kwargs)
|
|
|
|
|
|
def serve_file(path, filename, mimetype, as_attachment=True):
    """
    Serves a file as a response.

    The file is read fully into memory and served from a BytesIO buffer so
    the response does not hold an open handle on the file on disk.

    Parameters
    ----------
    path : str
        Path to file
    filename : str
        Filename of generated attachment
    mimetype : str
        Mimetype of file
    as_attachment : bool (optional)
        Whether to respond as an attachment for download (default is True)

    Returns
    -------
    flask.Response
        Response containing the file contents
    """
    with open(path, "rb") as f:
        buffer = io.BytesIO(f.read())
    try:
        # Flask >= 2.0 renamed attachment_filename to download_name and
        # removed the old name entirely in Flask 2.2.
        return send_file(buffer, download_name=filename, mimetype=mimetype, as_attachment=as_attachment)
    except TypeError:
        # Older Flask versions only accept attachment_filename.
        buffer.seek(0)
        return send_file(buffer, attachment_filename=filename, mimetype=mimetype, as_attachment=as_attachment)
|
|
|
|
|
|
def get_next_url(urls, path):
    """
    Returns the URL of the next step in the voice cloning process.

    Parameters
    ----------
    urls : dict
        Frontend url paths and names (insertion order defines step order)
    path : str
        Current URL

    Returns
    -------
    str
        URL of next step, or '' if the current URL is the last step or is
        not present in urls
    """
    ordered_urls = list(urls.keys())
    try:
        next_url_index = ordered_urls.index(path) + 1
    except ValueError:
        # Unknown current path: the contract promises '' rather than an
        # exception (previously this raised ValueError).
        return ""
    return ordered_urls[next_url_index] if next_url_index < len(ordered_urls) else ""
|
|
|
|
|
|
def get_suffix():
    """
    Generates a filename suffix from the current datetime.

    Returns
    -------
    str
        Timestamp formatted as day-month-year_hour-minute-second
    """
    now = datetime.now()
    return f"{now:%d-%m-%Y_%H-%M-%S}"
|
|
|
|
|
|
def delete_folder(path):
    """
    Deletes a folder and all of its contents.

    Parameters
    ----------
    path : str
        Path to folder

    Raises
    -------
    AssertionError
        If folder is not found
    """
    # Raise explicitly rather than using an assert statement so the check
    # still runs under `python -O` (asserts are stripped). AssertionError is
    # kept so existing callers catching it keep working.
    if not os.path.isdir(path):
        raise AssertionError(f"{path} does not exist")
    shutil.rmtree(path)
|
|
|
|
|
|
def import_dataset(dataset, dataset_directory, audio_folder, logging):
    """
    Imports a dataset zip into the app.
    Checks required files are present, saves the files,
    converts the audio to the required format and generates the info file.
    Deletes given zip regardless of success.

    Parameters
    ----------
    dataset : str
        Path to dataset zip
    dataset_directory : str
        Destination path for the dataset
    audio_folder : str
        Destination path for the dataset audio
    logging : logging
        Logging object to write logs to

    Raises
    -------
    AssertionError
        If files are missing or invalid
    """
    try:
        with zipfile.ZipFile(dataset, mode="r") as z:
            files_list = z.namelist()

            # The zip root must contain either metadata.csv or the
            # trainlist.txt / vallist.txt pair.
            assert ("metadata.csv" in files_list) or (
                "trainlist.txt" in files_list and "vallist.txt" in files_list
            ), "Dataset doesn't include metadata.csv or trainlist.txt/vallist.txt. Make sure this is in the root of the zip file"

            # Top-level folder names present inside the archive.
            folders = [x.split("/")[0] for x in files_list if "/" in x]
            assert (
                "wavs" in folders
            ), "Dataset missing wavs folder. Make sure this folder is in the root of the zip file"

            wavs = [x for x in files_list if x.startswith("wavs/") and x.endswith(".wav")]
            assert wavs, "No wavs found in wavs folder"

            logging.info("Creating directory")
            # exist_ok=False: fail loudly instead of overwriting an existing dataset.
            os.makedirs(dataset_directory, exist_ok=False)
            os.makedirs(audio_folder, exist_ok=False)

            if "metadata.csv" in files_list:
                # Normalise Windows line endings before counting label rows.
                metadata = z.read("metadata.csv").decode(CHARACTER_ENCODING, "ignore").replace("\r\n", "\n")
                num_metadata_rows = len([row for row in metadata.split("\n") if row])
                assert (
                    len(wavs) == num_metadata_rows
                ), f"Number of wavs and labels do not match. metadata: {num_metadata_rows}, wavs: {len(wavs)}"

                # Save metadata
                logging.info("Saving files")
                with open(os.path.join(dataset_directory, "metadata.csv"), "w", encoding=CHARACTER_ENCODING) as f:
                    f.write(metadata)
            else:
                # Labels are split across train and validation list files.
                trainlist = z.read("trainlist.txt").decode(CHARACTER_ENCODING, "ignore").replace("\r\n", "\n")
                vallist = z.read("vallist.txt").decode(CHARACTER_ENCODING, "ignore").replace("\r\n", "\n")
                num_rows = len([row for row in trainlist.split("\n") if row]) + len(
                    [row for row in vallist.split("\n") if row]
                )
                assert (
                    len(wavs) == num_rows
                ), f"Number of wavs and labels do not match. trainlist+vallist: {num_rows}, wavs: {len(wavs)}"

                # Save trainlist & vallist
                logging.info("Saving files")
                with open(os.path.join(dataset_directory, "trainlist.txt"), "w", encoding=CHARACTER_ENCODING) as f:
                    f.write(trainlist)
                with open(os.path.join(dataset_directory, "vallist.txt"), "w", encoding=CHARACTER_ENCODING) as f:
                    f.write(vallist)

            # Save wavs
            total_wavs = len(wavs)
            clip_lengths = []
            filenames = {}
            for i in range(total_wavs):
                wav = wavs[i]
                data = z.read(wav)
                # NOTE(review): wavs are written under dataset_directory/wavs —
                # presumably audio_folder (created above) points at the same
                # location; confirm against callers.
                path = os.path.join(dataset_directory, "wavs", wav.split("/")[1])
                with open(path, "wb") as f:
                    f.write(data)
                # convert_audio writes a converted copy and returns its path;
                # remember the mapping for the cleanup pass below.
                new_path = convert_audio(path)
                duration = librosa.get_duration(filename=new_path)
                clip_lengths.append(duration)
                filenames[path] = new_path
                # "Progress - i/total" messages are parsed by SocketIOHandler.
                logging.info(f"Progress - {i+1}/{total_wavs}")

            logging.info(f"Longest clip: {max(clip_lengths)}s, Shortest clip: {min(clip_lengths)}s")
            # Get around "file in use" by using delay
            # Replace each original wav with its converted version in a second
            # pass, after the conversion loop has released its file handles.
            logging.info("Deleting temp files")
            for old_path, new_path in filenames.items():
                os.remove(old_path)
                os.rename(new_path, old_path)

            # Create info file
            logging.info("Creating info file")
            words = (
                get_text(os.path.join(dataset_directory, "metadata.csv"))
                if "metadata.csv" in files_list
                else get_text(os.path.join(dataset_directory, "trainlist.txt"))
                + get_text(os.path.join(dataset_directory, "vallist.txt"))
            )
            save_dataset_info(
                words,
                os.path.join(dataset_directory, "wavs"),
                os.path.join(dataset_directory, "info.json"),
                clip_lengths=clip_lengths,
            )
    except Exception as e:
        # The uploaded zip is deleted on failure as well as on success.
        os.remove(dataset)
        raise e

    os.remove(dataset)
|