simplebigrammodel.py
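"""A minimal character-level bigram language model built with plain Python lists:
it counts character-to-character transitions in ci.txt, then samples new text
from those counts one character at a time, starting from each prompt."""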
import random
from typing import List

random.seed(42)  # remove this line to get non-deterministic results

prompts = ["春江", "往事"]
max_new_token = 100
max_iters = 8000
batch_size = 32
block_size = 8

with open('ci.txt', 'r', encoding='utf-8') as f:
    text = f.read()
# Character-level tokenizer: maps each unique character in the corpus to an integer id.
class Tokenizer:
    def __init__(self, text: str):
        self.chars = sorted(list(set(text)))
        self.vocab_size = len(self.chars)
        self.stoi = {ch: i for i, ch in enumerate(self.chars)}
        self.itos = {i: ch for i, ch in enumerate(self.chars)}

    def encode(self, s: str) -> List[int]:
        return [self.stoi[c] for c in s]

    def decode(self, l: List[int]) -> str:
        return ''.join([self.itos[i] for i in l])
class BigramLanguageModel:
    def __init__(self, vocab_size: int):
        self.vocab_size = vocab_size
        # transition[i][j] counts how often token j follows token i
        self.transition = [[0 for _ in range(vocab_size)]
                           for _ in range(vocab_size)]

    def __call__(self, x):
        # for convenience, so the model can be called as model(x)
        return self.forward(x)

    def forward(self, idx: List[List[int]]) -> List[List[List[float]]]:
        B = len(idx)     # batch size
        T = len(idx[0])  # sequence length of each batch
        logits = [
            [[0.0 for _ in range(self.vocab_size)]
             for _ in range(T)]
            for _ in range(B)
        ]
        for b in range(B):
            for t in range(T):
                current_token = idx[b][t]
                # unnormalized next-token scores: the row of transition counts for this token
                logits[b][t] = self.transition[current_token]
        return logits

    def generate(self, idx: List[List[int]], max_new_tokens: int) -> List[List[int]]:
        for _ in range(max_new_tokens):
            logits_batch = self(idx)
            for batch_idx, logits in enumerate(logits_batch):
                # sample the next token from the counts of the last token in the sequence
                logits = logits[-1]
                total = sum(logits)
                if total == 0:
                    # the last token was never seen as a prefix during training;
                    # fall back to uniform weights so random.choices does not fail
                    logits = [1] * self.vocab_size
                    total = self.vocab_size
                weights = [logit / total for logit in logits]
                next_token = random.choices(
                    range(self.vocab_size),
                    weights=weights,
                    k=1
                )[0]
                idx[batch_idx].append(next_token)
        return idx

def get_batch(tokens, batch_size, block_size):
    # sample batch_size random windows of block_size tokens;
    # y is x shifted by one position, i.e. the next-token targets
    ix = random.choices(range(len(tokens) - block_size), k=batch_size)
    x, y = [], []
    for i in ix:
        x.append(tokens[i:i+block_size])
        y.append(tokens[i+1:i+block_size+1])
    return x, y

tokenizer = Tokenizer(text)
vocab_size = tokenizer.vocab_size
tokens = tokenizer.encode(text)

model = BigramLanguageModel(vocab_size)

# training: count how often each token is followed by each other token
for _ in range(max_iters):
    x_batch, y_batch = get_batch(tokens, batch_size, block_size)
    for i in range(len(x_batch)):
        for j in range(len(x_batch[i])):
            x = x_batch[i][j]
            y = y_batch[i][j]
            model.transition[x][y] += 1

prompt_tokens = [tokenizer.encode(prompt) for prompt in prompts]
# inference
result = model.generate(prompt_tokens, max_new_token)
# decode
for generated in result:
    print(tokenizer.decode(generated))
    print('-' * 10)
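# Each printed sample starts with its prompt ("春江" or "往事") followed by
# max_new_token sampled characters; a dashed line separates the samples.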