Files

272 lines
8.7 KiB
Python

import logging
import os
import io
from datetime import datetime
import traceback
import shutil
import zipfile
import librosa
from flask import send_file
import resampy # noqa
from main import socketio
from dataset import CHARACTER_ENCODING
from dataset.audio_processing import convert_audio
from dataset.analysis import save_dataset_info, get_text
class SocketIOHandler(logging.Handler):
"""
Sends logger messages to the frontend using flask-socketio.
These are handled in application.js
"""
def emit(self, record):
text = record.getMessage()
if text.startswith("Progress"):
text = text.split("-")[1]
current, total = text.split("/")
socketio.emit("progress", {"number": current, "total": total}, namespace="/voice")
elif text.startswith("Status"):
socketio.emit("status", {"text": text.replace("Status -", "")}, namespace="/voice")
elif text.startswith("Alignment"):
text = text.split("- ")[1]
iteration, image = text.split(", ")
socketio.emit("alignment", {"iteration": iteration, "image": image}, namespace="/voice")
else:
socketio.emit("logs", {"text": text}, namespace="/voice")
# Data
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("voice")
logger.addHandler(SocketIOHandler())
thread = None
def background_task(func, **kwargs):
"""
Runs a background task.
If function errors out it will send an error log to the error logging server and page.
Sends 'done' message to frontend when complete.
Parameters
----------
func : function
Function to run in background
kwargs : kwargs
Kwargs to pass to function
"""
try:
socketio.sleep(5)
func(logging=logger, **kwargs)
except Exception as e:
error = {"type": e.__class__.__name__, "text": str(e), "stacktrace": traceback.format_exc()}
socketio.emit("error", error, namespace="/voice")
raise e
socketio.emit("done", {"text": None}, namespace="/voice")
def start_progress_thread(func, **kwargs):
"""
Starts a background task using socketio.
Parameters
----------
func : function
Function to run in background
kwargs : kwargs
Kwargs to pass to function
"""
global thread
print("Starting Thread")
thread = socketio.start_background_task(background_task, func=func, **kwargs)
def serve_file(path, filename, mimetype, as_attachment=True):
"""
Serves a file as a response
Parameters
----------
path : str
Path to file
filename : str
Filename of generated attachment
mimetype : str
Mimetype of file
as_attachment : bool (optional)
Whether to respond as an attachment for download (default is True)
"""
with open(path, "rb") as f:
return send_file(
io.BytesIO(f.read()), attachment_filename=filename, mimetype=mimetype, as_attachment=as_attachment
)
def get_next_url(urls, path):
"""
Returns the URL of the next step in the voice cloning process.
Parameters
----------
urls : dict
Frontend url paths and names
path : str
Current URL
Returns
-------
str
URL of next step or '' if not found
"""
urls = list(urls.keys())
next_url_index = urls.index(path) + 1
return urls[next_url_index] if next_url_index < len(urls) else ""
def get_suffix():
"""
Generates a filename suffix using the currrent datetime.
Returns
-------
str
String suffix
"""
return datetime.now().strftime("%d-%m-%Y_%H-%M-%S")
def delete_folder(path):
"""
Deletes a folder.
Parameters
----------
path : str
Path to folder
Raises
-------
AssertionError
If folder is not found
"""
assert os.path.isdir(path), f"{path} does not exist"
shutil.rmtree(path)
def import_dataset(dataset, dataset_directory, audio_folder, logging):
"""
Imports a dataset zip into the app.
Checks required files are present, saves the files,
converts the audio to the required format and generates the info file.
Deletes given zip regardless of success.
Parameters
----------
dataset : str
Path to dataset zip
dataset_directory : str
Destination path for the dataset
audio_folder : str
Destination path for the dataset audio
logging : logging
Logging object to write logs to
Raises
-------
AssertionError
If files are missing or invalid
"""
try:
with zipfile.ZipFile(dataset, mode="r") as z:
files_list = z.namelist()
assert ("metadata.csv" in files_list) or (
"trainlist.txt" in files_list and "vallist.txt" in files_list
), "Dataset doesn't include metadata.csv or trainlist.txt/vallist.txt. Make sure this is in the root of the zip file"
folders = [x.split("/")[0] for x in files_list if "/" in x]
assert (
"wavs" in folders
), "Dataset missing wavs folder. Make sure this folder is in the root of the zip file"
wavs = [x for x in files_list if x.startswith("wavs/") and x.endswith(".wav")]
assert wavs, "No wavs found in wavs folder"
logging.info("Creating directory")
os.makedirs(dataset_directory, exist_ok=False)
os.makedirs(audio_folder, exist_ok=False)
if "metadata.csv" in files_list:
metadata = z.read("metadata.csv").decode(CHARACTER_ENCODING, "ignore").replace("\r\n", "\n")
num_metadata_rows = len([row for row in metadata.split("\n") if row])
assert (
len(wavs) == num_metadata_rows
), f"Number of wavs and labels do not match. metadata: {num_metadata_rows}, wavs: {len(wavs)}"
# Save metadata
logging.info("Saving files")
with open(os.path.join(dataset_directory, "metadata.csv"), "w", encoding=CHARACTER_ENCODING) as f:
f.write(metadata)
else:
trainlist = z.read("trainlist.txt").decode(CHARACTER_ENCODING, "ignore").replace("\r\n", "\n")
vallist = z.read("vallist.txt").decode(CHARACTER_ENCODING, "ignore").replace("\r\n", "\n")
num_rows = len([row for row in trainlist.split("\n") if row]) + len(
[row for row in vallist.split("\n") if row]
)
assert (
len(wavs) == num_rows
), f"Number of wavs and labels do not match. trainlist+vallist: {num_rows}, wavs: {len(wavs)}"
# Save trainlist & vallist
logging.info("Saving files")
with open(os.path.join(dataset_directory, "trainlist.txt"), "w", encoding=CHARACTER_ENCODING) as f:
f.write(trainlist)
with open(os.path.join(dataset_directory, "vallist.txt"), "w", encoding=CHARACTER_ENCODING) as f:
f.write(vallist)
# Save wavs
total_wavs = len(wavs)
clip_lengths = []
filenames = {}
for i in range(total_wavs):
wav = wavs[i]
data = z.read(wav)
path = os.path.join(dataset_directory, "wavs", wav.split("/")[1])
with open(path, "wb") as f:
f.write(data)
new_path = convert_audio(path)
duration = librosa.get_duration(filename=new_path)
clip_lengths.append(duration)
filenames[path] = new_path
logging.info(f"Progress - {i+1}/{total_wavs}")
logging.info(f"Longest clip: {max(clip_lengths)}s, Shortest clip: {min(clip_lengths)}s")
# Get around "file in use" by using delay
logging.info("Deleting temp files")
for old_path, new_path in filenames.items():
os.remove(old_path)
os.rename(new_path, old_path)
# Create info file
logging.info("Creating info file")
words = (
get_text(os.path.join(dataset_directory, "metadata.csv"))
if "metadata.csv" in files_list
else get_text(os.path.join(dataset_directory, "trainlist.txt"))
+ get_text(os.path.join(dataset_directory, "vallist.txt"))
)
save_dataset_info(
words,
os.path.join(dataset_directory, "wavs"),
os.path.join(dataset_directory, "info.json"),
clip_lengths=clip_lengths,
)
except Exception as e:
os.remove(dataset)
raise e
os.remove(dataset)