mirror of https://github.com/AIGC-Audio/AudioGPT.git
import os
import traceback
from multiprocessing import Queue, Process

from tqdm import tqdm


def chunked_worker(worker_id, map_func, args, results_queue=None, init_ctx_func=None):
    # Build a per-worker context once (e.g. a loaded model) if a factory is given.
    ctx = init_ctx_func(worker_id) if init_ctx_func is not None else None
    for job_idx, arg in args:
        try:
            if ctx is not None:
                res = map_func(*arg, ctx=ctx)
            else:
                res = map_func(*arg)
            results_queue.put((job_idx, res))
        except Exception:
            # Report the failure but keep the worker alive; the caller receives None.
            traceback.print_exc()
            results_queue.put((job_idx, None))


def chunked_multiprocess_run(map_func, args, num_workers=None, ordered=True, init_ctx_func=None, q_max_size=1000):
    # Pair each argument tuple with its job index so results can be matched up.
    args = list(zip(range(len(args)), args))
    n_jobs = len(args)
    if num_workers is None:
        num_workers = int(os.getenv('N_PROC', os.cpu_count()))
    results_queues = []
    if ordered:
        # One queue per worker, drained round-robin, preserves submission order.
        for i in range(num_workers):
            results_queues.append(Queue(maxsize=q_max_size // num_workers))
    else:
        # A single shared queue: results are yielded as soon as they are ready.
        results_queue = Queue(maxsize=q_max_size)
        for i in range(num_workers):
            results_queues.append(results_queue)
    workers = []
    for i in range(num_workers):
        # Worker i handles jobs i, i + num_workers, i + 2 * num_workers, ...
        args_worker = args[i::num_workers]
        p = Process(target=chunked_worker,
                    args=(i, map_func, args_worker, results_queues[i], init_ctx_func),
                    daemon=True)
        workers.append(p)
        p.start()
    for n_finished in range(n_jobs):
        results_queue = results_queues[n_finished % num_workers]
        job_idx, res = results_queue.get()
        assert job_idx == n_finished or not ordered, (job_idx, n_finished)
        yield res
    for w in workers:
        w.join()
        w.close()

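# A hypothetical usage sketch (illustrative names only, not from the original
# module): each job is a tuple of positional arguments, and results are yielded
# lazily. With ordered=True (the default) they come back in submission order;
# with ordered=False they come back as soon as any worker finishes.
#
#     def add(a, b):
#         return a + b
#
#     pairs = [(i, i + 1) for i in range(100)]
#     for total in chunked_multiprocess_run(add, pairs, num_workers=4):
#         print(total)
#
# Note that map_func must be a picklable, module-level function when the
# "spawn" start method is used.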


def multiprocess_run_tqdm(map_func, args, num_workers=None, ordered=True, init_ctx_func=None,
                          multithread=False, desc=None):
    # NOTE: multiprocess_run is expected to be provided elsewhere in the project;
    # it is not defined in this file.
    for i, res in tqdm(enumerate(
            multiprocess_run(map_func, args, num_workers, ordered, init_ctx_func, multithread)),
            total=len(args), desc=desc):
        yield i, res
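

# --- Hypothetical, self-contained demo (not part of the original AudioGPT file). ---
# A minimal sketch showing chunked_multiprocess_run with a per-worker context;
# _demo_square and _demo_init_ctx are illustrative names, not project APIs.
def _demo_square(x, ctx=None):
    # ctx is whatever _demo_init_ctx returned for this worker.
    return x * x, ctx


def _demo_init_ctx(worker_id):
    # In real use this might load a model or open a resource once per worker.
    return f'worker-{worker_id}'


if __name__ == '__main__':
    jobs = [(i,) for i in range(8)]  # each job is a tuple of positional args
    for result in chunked_multiprocess_run(_demo_square, jobs, num_workers=2,
                                           init_ctx_func=_demo_init_ctx):
        print(result)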