59 changes: 55 additions & 4 deletions kokoro/pipeline.py
@@ -3,7 +3,7 @@
from huggingface_hub import hf_hub_download
from loguru import logger
from misaki import en, espeak
from typing import Callable, Generator, List, Optional, Tuple, Union
from typing import Callable, Dict, Generator, List, Optional, Tuple, Union
import re
import torch
import os
@@ -290,6 +290,44 @@ def generate_from_tokens(
if output is not None and output.pred_dur is not None:
KPipeline.join_timestamps(tks, output.pred_dur)
yield self.Result(graphemes=gs, phonemes=ps, tokens=tks, output=output)

@staticmethod
def get_phonemes_level_annot(
phonemes: str,
pred_dur: torch.LongTensor,
vocab: Dict[str, int]
) -> List[Tuple[str, float, float]]:
frames_per_second = 40  # 24000 Hz sample rate / 600-sample hop length
seq = list(filter(lambda p: p in vocab, phonemes))
seq = ['<bos>'] + seq + ['<eos>']
if len(seq) != len(pred_dur):
logger.warning(f"len(seq) != len(pred_dur): {len(seq)} != {len(pred_dur)}")
return []
pred_dur = pred_dur.to(dtype=torch.long).cpu()
cum = torch.cumsum(pred_dur, dim=0)
starts = torch.zeros_like(cum)
starts[1:] = cum[:-1]
results: List[Tuple[str, float, float]] = []
start_idx = 1
end_idx = len(seq) - 1
stresses = {'ˈ', 'ˌ'}
next_stress: Optional[Tuple[str, float]] = None
for k in range(start_idx, end_idx):
ph = seq[k]
stress_start: Optional[float] = None
if next_stress:
stress, stress_start = next_stress
ph = stress + ph
next_stress = None
s_frames = int(starts[k].item())
e_frames = int(cum[k].item())
# A phoneme merged with a preceding stress mark inherits the stress mark's start time,
# so the stress mark's frames are not silently dropped from the timeline.
s_sec = stress_start if stress_start is not None else s_frames / frames_per_second
e_sec = e_frames / frames_per_second
if ph in stresses and k + 1 < end_idx:
next_stress = (ph, s_sec)
continue
results.append((ph, s_sec, e_sec))
return results


@staticmethod
def join_timestamps(tokens: List[en.MToken], pred_dur: torch.LongTensor):
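
The new helper turns per-token frame durations into seconds: at 24 kHz audio with a 600-sample hop there are 40 frames per second, and the cumulative sum of pred_dur gives each token's start and end frame, with the <bos>/<eos> padding only shifting offsets. A quick sanity sketch of that arithmetic (the duration values below are made up for illustration):

import torch

pred_dur = torch.tensor([3, 10, 8, 12, 2])  # frames for <bos>, 'h', 'ə', 'l', <eos>
cum = torch.cumsum(pred_dur, dim=0)         # tensor([ 3, 13, 21, 33, 35])
starts = torch.zeros_like(cum)
starts[1:] = cum[:-1]                       # tensor([ 0,  3, 13, 21, 33])
# 'h' (index 1) spans frames 3..13, i.e. 0.075 s to 0.325 s at 40 frames per second.
print(starts[1].item() / 40, cum[1].item() / 40)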
@@ -336,6 +374,7 @@ class Result:
tokens: Optional[List[en.MToken]] = None
output: Optional[KModel.Output] = None
text_index: Optional[int] = None
phonemes_annot: Optional[List[Tuple[str, float, float]]] = None

@property
def audio(self) -> Optional[torch.FloatTensor]:
@@ -350,12 +389,16 @@ def __iter__(self):
yield self.graphemes
yield self.phonemes
yield self.audio
if self.phonemes_annot is not None:
yield self.phonemes_annot

def __getitem__(self, index):
if self.phonemes_annot is not None:
return [self.graphemes, self.phonemes, self.audio, self.phonemes_annot][index]
return [self.graphemes, self.phonemes, self.audio][index]

def __len__(self):
return 3
return 4 if self.phonemes_annot is not None else 3
#### MARK: END BACKWARD COMPAT ####

def __call__(
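
For callers that still unpack Result as a tuple, the backward-compat hooks above only grow the tuple when annotations are actually attached, so existing three-element unpacking keeps working. A minimal sketch (the result object here is hypothetical):

# Default behaviour, phonemes_annot is None:
graphemes, phonemes, audio = result            # len(result) == 3
# When annotations were requested and produced, a fourth element appears:
graphemes, phonemes, audio, annot = result     # len(result) == 4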
@@ -364,7 +407,8 @@ def __call__(
voice: Optional[str] = None,
speed: Union[float, Callable[[int], float]] = 1,
split_pattern: Optional[str] = r'\n+',
model: Optional[KModel] = None
model: Optional[KModel] = None,
return_phonemes_annotations: bool = False
) -> Generator['KPipeline.Result', None, None]:
model = model or self.model
if model and voice is None:
@@ -391,9 +435,16 @@
logger.warning(f"Unexpected len(ps) == {len(ps)} > 510 and ps == '{ps}'")
ps = ps[:510]
output = KPipeline.infer(model, ps, pack, speed) if model else None
phonemes_annot = None
if output is not None and output.pred_dur is not None:
KPipeline.join_timestamps(tks, output.pred_dur)
yield self.Result(graphemes=gs, phonemes=ps, tokens=tks, output=output, text_index=graphemes_index)
phonemes_annot = KPipeline.get_phonemes_level_annot(ps, output.pred_dur, model.vocab) if return_phonemes_annotations else None
elif return_phonemes_annotations:
phonemes_annot = []
logger.warning("No output.pred_dur available for phoneme-level annotations")
yield self.Result(
graphemes=gs, phonemes=ps, tokens=tks, output=output, text_index=graphemes_index, phonemes_annot=phonemes_annot
)

# Non-English processing with chunking
else:
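
An end-to-end sketch of the new flag, assuming the usual KPipeline constructor; the text, language code, and voice name below are illustrative only:

from kokoro import KPipeline

pipeline = KPipeline(lang_code='a')  # assumed American English setup
for result in pipeline("Hello world.", voice='af_heart', return_phonemes_annotations=True):
    # phonemes_annot is a list of (phoneme, start_sec, end_sec) tuples,
    # or [] when no predicted durations were available.
    for ph, start, end in (result.phonemes_annot or []):
        print(f"{ph}: {start:.3f}s -> {end:.3f}s")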