|
| 1 | +from concurrent.futures import thread |
1 | 2 | import gc |
2 | 3 | import multiprocessing as mp |
| 4 | +import os |
| 5 | +import queue |
3 | 6 | import threading |
| 7 | +import time |
| 8 | +import numpy as np |
| 9 | +from scipy.io import wavfile |
4 | 10 |
|
5 | | -from torch import mul |
6 | | - |
| 11 | +from src.api.api import ServiceNames, TaskStatus, VoiceCloneProgress |
| 12 | +from src.service.task import TaskService |
7 | 13 | from ..utils.response import EaseVoiceResponse, ResponseStatus |
8 | 14 |
|
9 | 15 |
|
10 | 16 | from ..easevoice.inference import InferenceResult, InferenceTask, InferenceTaskData, Runner |
11 | 17 | from ..logger import logger |
| 18 | +from src.easevoice import inference |
| 19 | + |
| 20 | + |
| 21 | +class VoiceCloneService: |
| 22 | + """ |
| 23 | + VoiceService is a long run service that listens for voice clone tasks and processes them. |
| 24 | + """ |
12 | 25 |
|
| 26 | + def __init__(self, task_service: TaskService): |
| 27 | + self.task_service = task_service |
13 | 28 |
|
14 | | -class VoiceService: |
15 | | - def __init__(self): |
16 | 29 | self.queue = mp.Queue() |
17 | | - self.runner_process = mp.Process(target=VoiceService.init_runner, args=(self.queue,)) |
| 30 | + self.runner_process = mp.Process(target=VoiceCloneService._init_runner, args=(self.queue,)) |
18 | 31 | self.runner_process.start() |
19 | | - self.locker = threading.Lock() |
| 32 | + |
| 33 | + self._run_tasks = threading.Thread(target=self._run) |
| 34 | + self._run_tasks.start() |
20 | 35 |
|
21 | 36 | @staticmethod |
22 | | - def init_runner(queue: mp.Queue): |
| 37 | + def _init_runner(queue: mp.Queue): |
23 | 38 | """ |
24 | 39 | Call this method to start the runner process |
25 | 40 | """ |
26 | 41 | runner = Runner(queue) |
27 | 42 | runner.run() |
28 | 43 | gc.collect() |
29 | 44 |
|
30 | | - def clone(self, input: dict): |
31 | | - ok = self.locker.acquire(timeout=5) |
32 | | - if not ok: |
33 | | - return EaseVoiceResponse(ResponseStatus.FAILED, "There is another task running, please try again later") |
34 | | - |
35 | | - try: |
36 | | - data = InferenceTaskData(**input) |
37 | | - queue = mp.Queue() |
38 | | - task = InferenceTask(result_queue=queue, data=data) |
39 | | - self.queue.put(task) |
40 | | - result: InferenceResult = task.result_queue.get(timeout=600) |
41 | | - except Exception as e: |
42 | | - logger.error(f"failed to clone voice for {input}, error: {e}", exc_info=True) |
43 | | - result = InferenceResult(error=str(e)) |
44 | | - |
45 | | - finally: |
46 | | - self.locker.release() |
47 | | - |
48 | | - if result.error: |
49 | | - return EaseVoiceResponse(ResponseStatus.FAILED, result.error) |
50 | | - |
51 | | - return EaseVoiceResponse( |
52 | | - ResponseStatus.SUCCESS, |
53 | | - "Cloned voice successfully", |
54 | | - { |
55 | | - "items": result.items, |
56 | | - "seed": result.seed |
57 | | - }) |
| 45 | + def _run(self): |
| 46 | + while True: |
| 47 | + tasks = self.task_service.filter_tasks(lambda t: t.service_name == ServiceNames.VOICE_CLONE and t.progress.status == TaskStatus.PENDING) |
| 48 | + if len(tasks) == 0: |
| 49 | + logger.debug("No pending tasks found for voice clone") |
| 50 | + else: |
| 51 | + task = tasks[0] |
| 52 | + logger.info(f"Processing task {task.taskID}, args: {task.args}") |
| 53 | + |
| 54 | + task.progress.status = TaskStatus.IN_PROGRESS |
| 55 | + self.task_service.submit_task(task) |
| 56 | + |
| 57 | + try: |
| 58 | + data = InferenceTaskData(**task.args) |
| 59 | + queue = mp.Queue() |
| 60 | + infer_task = InferenceTask(result_queue=queue, data=data) |
| 61 | + self.queue.put(infer_task) |
| 62 | + result: InferenceResult = infer_task.result_queue.get(timeout=600) |
| 63 | + except Exception as e: |
| 64 | + logger.error(f"failed to clone voice for {task.args}, error: {e}", exc_info=True) |
| 65 | + result = InferenceResult(error=str(e)) |
| 66 | + |
| 67 | + if result.error: |
| 68 | + progress = VoiceCloneProgress(status=TaskStatus.FAILED, current_step="Failed", total_steps=1, completed_steps=1, current_step_progress=100, message=result.error) |
| 69 | + task.progress = progress |
| 70 | + self.task_service.submit_task(task) |
| 71 | + logger.error(f"failed to clone voice for {task.args}, error: {result.error}") |
| 72 | + else: |
| 73 | + try: |
| 74 | + sampling_rate = result.items[0][0] |
| 75 | + audio = np.concatenate([item[1] for item in result.items]) |
| 76 | + output_file = os.path.join(task.homePath, "output.wav") |
| 77 | + wavfile.write(output_file, sampling_rate, audio) |
| 78 | + |
| 79 | + progress = VoiceCloneProgress(status=TaskStatus.COMPLETED, current_step="Completed", total_steps=1, completed_steps=1, current_step_progress=100) |
| 80 | + task.progress = progress |
| 81 | + self.task_service.submit_task(task) |
| 82 | + logger.info(f"Successfully cloned voice for {task.args}") |
| 83 | + |
| 84 | + except Exception as e: |
| 85 | + logger.error(f"failed to clone voice for {task.args}, error: {e}", exc_info=True) |
| 86 | + progress = VoiceCloneProgress(status=TaskStatus.FAILED, current_step="Failed", total_steps=1, completed_steps=1, current_step_progress=100, message=str(e)) |
| 87 | + task.progress = progress |
| 88 | + self.task_service.submit_task(task) |
| 89 | + logger.error(f"failed to clone voice for {task.args}, error: {e}") |
| 90 | + |
| 91 | + time.sleep(1) |
0 commit comments