|
| 1 | +# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/01_KnowledgeEmbedding.ipynb. |
| 2 | + |
| 3 | +# %% auto 0 |
| 4 | +__all__ = ['remove_newlines', 'text2data', 'tokenize_and_split', 'tokenize_data', 'embed_data', 'mean_pooling', 'run_embeddings', |
| 5 | + 'cosine_similarity', 'top_scores', 'create_context', 'answer_question'] |
| 6 | + |
| 7 | +# %% ../nbs/01_KnowledgeEmbedding.ipynb 3 |
| 8 | +import os |
| 9 | +import glob |
| 10 | +import pandas as pd |
| 11 | +from tqdm.auto import tqdm |
| 12 | +from typing import List, Dict, Set, Union, Callable |
| 13 | +import torch |
| 14 | +import numpy as np |
| 15 | +from transformers import AutoTokenizer, AutoModel, AutoModelForCausalLM |
| 16 | +import torch.nn.functional as F |
| 17 | +from functools import partial |
| 18 | +import transformers |
| 19 | + |
| 20 | + |
| 21 | +# %% ../nbs/01_KnowledgeEmbedding.ipynb 4 |
def remove_newlines(text: str) -> str:
    """Collapse every run of whitespace (newlines, tabs, spaces) into a single space."""
    words = text.split()
    return " ".join(words)
| 24 | + |
| 25 | + |
| 26 | +# %% ../nbs/01_KnowledgeEmbedding.ipynb 5 |
def text2data(path: str, extensions: Union[str, Set[str]] = {"txt"}, recursive: bool = False) -> List[Dict[str, str]]:
    """
    Go over all the text files in a folder and create a list of dicts with the file name and the cleaned text.

    Parameters:
        path: root folder to scan.
        extensions: a single extension (e.g. "txt") or a set of extensions,
            without the leading dot; a falsy value keeps every glob match.
        recursive: forwarded to glob.glob; controls whether "**" matches
            nested directories.

    Returns:
        One dict per readable file: {'source': cleaned name,
        'data': cleaned name + '.' + whitespace-collapsed file contents}.
    """
    texts = []

    # Allow a bare string like "txt" as shorthand for {"txt"}.
    if isinstance(extensions, str) and extensions:
        extensions = {extensions}

    # NOTE(review): with recursive=False (the default) the "**" segment behaves
    # like a plain "*", so files directly inside `path` are never matched --
    # only files at least one directory level down. Confirm this is intended.
    path = os.path.join(path, "**", "*")
    # Keep files whose suffix after the last '.' is in `extensions`; when
    # `extensions` is falsy, every glob match (including directories, since the
    # isfile check is inside the other branch) is kept.
    files = [f for f in glob.glob(path, recursive=recursive) if (not extensions) or (f.split('.')[-1] in extensions and os.path.isfile(f))]

    for file in tqdm(files):
        try:
            with open(file, "r", encoding="UTF-8") as f:
                # NOTE(review): `path` was reassigned above, so dirname(path) is
                # "<root>/**" and relpath therefore yields names starting with
                # "../" -- verify the resulting 'source' values look right.
                name_value = os.path.relpath(file, start=os.path.dirname(path))
                # Turn separators into spaces so the name reads like text, and
                # strip the "#update" marker used in some file names.
                name_value = name_value.replace("-", " ").replace("_", " ").replace("#update", "")
                # Prepend the cleaned name so it is embedded along with the body.
                texts.append({'source': name_value, 'data': name_value + '.' + remove_newlines(f.read())})
        except Exception as e:
            # Best-effort scan: report unreadable files and keep going.
            print('error:', e)

    return texts
| 49 | + |
| 50 | + |
| 51 | + |
| 52 | +# %% ../nbs/01_KnowledgeEmbedding.ipynb 7 |
def tokenize_and_split(text: str, tokenizer, max_tokens: int = 500, sentence_sep: str = '. ') -> List[list]:
    '''
    Encode `text` sentence by sentence and pack the tokens into chunks of at
    most `max_tokens` tokens each.

    Parameters:
        text: the text to encode.
        tokenizer: any object exposing `encode(str) -> list[int]`.
        max_tokens: maximum number of tokens per returned chunk.
        sentence_sep: separator used to split `text` into sentences.

    Returns:
        A list of token lists, each of length <= max_tokens, covering every
        token of every sentence in input order.
    '''
    sentences = text.split(sentence_sep)
    tokens_list = [tokenizer.encode(" " + sentence + '.') for sentence in sentences]

    chunks = []
    chunk = []

    for tokens in tokens_list:
        if len(tokens) > max_tokens:
            # A single sentence exceeds max_tokens: flush the chunk built so
            # far first (so output order follows the text), then hard-split the
            # long sentence into max_tokens-sized pieces.
            if chunk:
                chunks.append(chunk)
                chunk = []
            chunks.extend(tokens[i:i + max_tokens] for i in range(0, len(tokens), max_tokens))
        elif len(chunk) + len(tokens) > max_tokens:
            # Current chunk is full: emit it and start the next chunk with this
            # sentence. (Bug fix: the previous version reset `chunk` to [] here,
            # silently dropping this sentence's tokens.)
            chunks.append(chunk)
            chunk = list(tokens)
        else:
            chunk.extend(tokens)

    # Don't lose the trailing partial chunk.
    if chunk:
        chunks.append(chunk)

    return chunks
| 76 | + |
| 77 | + |
| 78 | + |
| 79 | +# %% ../nbs/01_KnowledgeEmbedding.ipynb 8 |
def tokenize_data(data: List[Dict[str, str]], tokenizer, max_tokens: int = 500, sentence_sep: str = '. ') -> List[Dict[str, List[int]]]:
    '''
    Split every entry's text into token chunks of size < max_tokens.

    Takes a list of dicts of the form {'source': file_name, 'data': clean_text}
    and returns one dict {'source': file_name, 'tokens': list of int} per chunk.
    '''
    result = []
    for entry in data:
        source = entry['source']
        for chunk in tokenize_and_split(entry['data'], tokenizer, max_tokens, sentence_sep):
            result.append({'source': source, 'tokens': chunk})
    return result
| 89 | + |
| 90 | + |
| 91 | +# %% ../nbs/01_KnowledgeEmbedding.ipynb 9 |
def embed_data(data: List[Dict[str, List[int]]], embedding_model: Callable[[List[int]], np.array]) -> List[dict]:
    '''
    Attach an embedding to every tokenized chunk.

    Takes a list of dicts {'source': file_name, 'tokens': list of int} and
    returns new dicts with an added 'embeddings' key, computed by calling
    `embedding_model` on the token list. Progress is shown with tqdm.
    '''
    embedded = []
    for entry in tqdm(data):
        embedded.append({
            'source': entry['source'],
            'tokens': entry['tokens'],
            'embeddings': embedding_model(entry['tokens']),
        })
    return embedded
| 99 | + |
| 100 | + |
| 101 | + |
| 102 | +# %% ../nbs/01_KnowledgeEmbedding.ipynb 10 |
def mean_pooling(model_output, attention_mask):
    '''
    Average the token embeddings of each sequence, counting only positions
    where the attention mask is set (masked positions contribute nothing).
    '''
    # model_output[0] holds the per-token embeddings: (batch, seq, dim).
    token_embeddings = model_output[0]
    # Broadcast the (batch, seq) mask across the embedding dimension.
    mask = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    summed = (token_embeddings * mask).sum(dim=1)
    # Clamp avoids division by zero for fully-masked sequences.
    counts = mask.sum(dim=1).clamp(min=1e-9)
    return summed / counts
| 110 | + |
| 111 | + |
| 112 | +# %% ../nbs/01_KnowledgeEmbedding.ipynb 11 |
def run_embeddings(tokenized : List[int], model : Callable[[torch.Tensor, torch.Tensor], torch.Tensor]) -> np.array:
    '''
    Embed a single tokenized sentence: run the model, mean-pool the token
    embeddings, L2-normalize, and return a 1D numpy vector on the CPU.
    '''
    device = model.device
    # Shape (1, seq): a batch of one sentence, with a full attention mask.
    input_ids = torch.tensor(tokenized).unsqueeze(0).to(device)
    mask = torch.ones_like(input_ids).to(device)

    # Inference only -- no gradients needed.
    with torch.no_grad():
        output = model(input_ids=input_ids, attention_mask=mask)

    pooled = mean_pooling(output, mask)
    normalized = F.normalize(pooled, p=2, dim=1)

    return normalized.squeeze().to('cpu').numpy()
| 128 | + |
| 129 | + |
| 130 | +# %% ../nbs/01_KnowledgeEmbedding.ipynb 13 |
def cosine_similarity(a: np.array, b: np.array) -> float:
    '''
    Compute the cosine similarity between two equal-shape numpy vectors.

    Raises:
        ValueError: if either input is empty, the shapes differ, or either
            vector has zero norm.
    '''
    a = np.asarray(a)
    b = np.asarray(b)

    # Validate inputs before touching the math.
    if min(a.size, b.size) == 0:
        raise ValueError("Input arrays should not be empty.")
    if a.shape != b.shape:
        raise ValueError("Input arrays should have the same shape.")

    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    # A zero vector has no direction, so similarity is undefined.
    if norm_a == 0 or norm_b == 0:
        raise ValueError("Input arrays should not be zero vectors.")

    return np.dot(a, b) / (norm_a * norm_b)
| 150 | + |
| 151 | +# %% ../nbs/01_KnowledgeEmbedding.ipynb 14 |
def top_scores(question: str, embedded: List[dict], model : Callable, tokenizer : Callable, n: int = 5, same_th: float = 0.2):
    '''
    Return the indices (into `embedded`) of up to n chunks, ordered by cosine
    similarity of their embeddings to the embedded question.

    Parameters:
        question: the question text.
        embedded: list of dicts each holding an 'embeddings' vector.
        model: embedding model passed to run_embeddings.
        tokenizer: tokenizer; called as tokenizer(question)['input_ids'].
        n: maximum number of indices to return.
        same_th: similarity threshold used when filtering candidates.

    Raises:
        ValueError: if n <= 0, same_th is outside [0, 1), or embedded is empty.
    '''
    # Ensure n is greater than 0
    if n <= 0:
        raise ValueError("The number of top answers should be greater than 0.")

    # Ensure same_th is within the appropriate range
    if not 0 <= same_th < 1:
        raise ValueError("The same_th parameter should be in the range [0, 1).")

    # Guard: an empty corpus would crash below when taking nearest[0].
    if not embedded:
        raise ValueError("The embedded data should not be empty.")

    tokenized_question = tokenizer(question)['input_ids']
    embedded_question = run_embeddings(tokenized_question, model)

    # Similarity of the question to every chunk, best first.
    scores = np.array([cosine_similarity(embedded_question, embed['embeddings']) for embed in embedded])
    nearest = scores.argsort()[::-1]

    # The best match is always kept.
    answers = [nearest[0]]

    for i in nearest[1:]:
        # Bug fix: stop as soon as we already hold n answers. The previous
        # version checked `len(answers) == n` only *after* appending, so for
        # n == 1 the condition could never fire and every candidate passing
        # the threshold was returned.
        if len(answers) >= n:
            break

        # Lowest similarity between this candidate and the answers chosen so far.
        min_sim = min(cosine_similarity(embedded[i]['embeddings'], embedded[a]['embeddings']) for a in answers)

        # NOTE(review): this keeps candidates that are at least same_th similar
        # to EVERY already-chosen answer. If the intent was to drop
        # near-duplicates, the comparison direction looks inverted
        # (max similarity < threshold) -- confirm with the notebook.
        if min_sim > same_th:
            answers.append(i)

    # Return the indices of the top answers
    return answers
| 188 | + |
| 189 | + |
| 190 | +# %% ../nbs/01_KnowledgeEmbedding.ipynb 15 |
def create_context(question: str, embedded: List[dict], model : Callable, tokenizer: Callable, max_len: int = 1700, **kwargs) -> str:
    """
    Build a context string for a question from the most similar embedded
    chunks, decoding and concatenating chunks until max_len tokens is reached.
    Extra keyword arguments are forwarded to top_scores.
    """
    # Rank the chunks by similarity to the question.
    best = top_scores(question, embedded, model=model, tokenizer=tokenizer, **kwargs)

    pieces = []
    used = 0

    for idx in best:
        tokens = embedded[idx]['tokens']
        # +4 accounts for the "###" separator and surrounding newlines.
        cost = len(tokens) + 4

        # Stop once the budget would be exceeded.
        if used + cost > max_len:
            break

        pieces.append(tokenizer.decode(tokens))
        used += cost

    # Chunks are joined with a visible separator so the model can tell them apart.
    return "\n\n###\n\n".join(pieces)
| 219 | + |
| 220 | + |
| 221 | +# %% ../nbs/01_KnowledgeEmbedding.ipynb 17 |
def answer_question(question: str,
                    embedded: List[dict],
                    context_model: Callable,
                    context_tokenizer: Callable,
                    model: Callable,
                    tokenizer: Callable,
                    max_len: int = 1700,
                    max_added_tokens: int = 150,
                    temperature: float = 0.3,
                    debug: bool = False) -> str:
    """
    Generate an answer to a question based on the most similar context found in the embedded data.

    Parameters:
        question (str): The question to answer.
        embedded (List[dict]): List of embedded data.
        context_model (Callable): Model used to create context.
        context_tokenizer (Callable): Tokenizer used with context model.
        model (Callable): Model used to generate answers.
        tokenizer (Callable): Tokenizer used with answer model.
        max_len (int): Maximum length for the context. Defaults to 1700.
        max_added_tokens (int): Maximum number of new tokens for the generated answer. Defaults to 150.
        temperature (float): Temperature for the answer generation. Defaults to 0.3.
        debug (bool): If True, print the generated context.

    Returns:
        str: Generated answer.
    """
    # Retrieve the most relevant chunks for this question.
    context = create_context(question, embedded, model=context_model, tokenizer=context_tokenizer, max_len=max_len)

    if debug:
        print("Generated Context:\n" + context + "\n\n")

    # Build the chat-style prompt consumed by the answer model.
    prompt = ("<human>: Answer the question based on the context below, and if the question can't be answered "
              f"based on the context, say \"I don't know\"\n\nContext: {context}\n\n---\n\nQuestion: {question}<bot>:")

    # Tokenize on the model's device.
    encoded = tokenizer(prompt, return_tensors='pt').to(model.device)
    prompt_len = encoded['input_ids'].shape[1]

    # Sample a continuation; the budget is prompt length plus max_added_tokens.
    generated = model.generate(**encoded,
                               max_length=prompt_len + max_added_tokens,
                               do_sample=True,
                               temperature=temperature,
                               top_p=0.7,
                               top_k=50)

    # Keep only the newly generated tokens (everything after the prompt).
    answer_tokens = generated[0, prompt_len:]
    return tokenizer.decode(answer_tokens, skip_special_tokens=True)
| 279 | + |
0 commit comments