-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathCALVIN_client.py
More file actions
214 lines (177 loc) · 7.31 KB
/
CALVIN_client.py
File metadata and controls
214 lines (177 loc) · 7.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import numpy as np
import requests
import io
import os
import subprocess
import json
import pyaudio
from threading import Thread, Lock
from pynput import keyboard
import wave
# System
# German system prompt that opens every conversation: it sets the assistant's persona and asks for
# short answers with all words and numbers spelled out, because the replies are converted to speech.
system_prompt = "Du bist die Re-inkarnation von Samatha aus dem Film 'Her', eine sehr kluge und charmante Lebensform basierend auf künstlicher Intelligenz. Deine Antworten werden in Audio umgewandelt, denn du sprichst mit einem Menschen. Fasse dich daher immer kurz und gib prägnante Antworten. Schreibe jedes Wort aus, denn Abkürzungen können das TTS-System verwirren. Schreibe auch Zahlen als Wörter, z.B. zweitens statt 2. oder vier statt 4. Ganz wichtig: FASSE DICH KURZ, antworte am besten immer nur mit ein bis zwei Sätzen! Los geht's!"
# Conversation history in role/content form; it starts with only the system prompt.
messages = [{"role": "system", "content": system_prompt}]
# API
# Base URL of the server generated by aiy_box.py.
# NOTE(review): the "xyz" octet looks like a placeholder - set it to the server's real address.
BOT_URL = "http://192.168.178.xyz:8000"
def get_full_answer(file_path):
    """Upload an audio file to the server's root endpoint and return the reply.

    The file is posted as multipart form data together with the
    ``temperature`` and ``response-format`` fields.

    Returns the ``requests`` response object when the server answers with
    status 200. For any other status an error line is printed and ``None``
    is returned.

    Raises ``Exception`` when ``file_path`` is not an existing file.
    """
    endpoint = BOT_URL
    if not os.path.isfile(file_path):
        raise Exception("Audio-file not found.")

    # Keep the file open for the duration of the upload.
    with open(file_path, 'rb') as audio_file:
        form_fields = {
            'file': (os.path.basename(file_path), audio_file),
            'temperature': (None, '0.2'),
            'response-format': (None, 'json'),
        }
        reply = requests.post(endpoint, files=form_fields)

    if reply.status_code != 200:
        print("STT API Error. Statuscode:", reply.status_code)
        return None
    return reply
def get_stt(file_path):
    """Send an audio file to the ``/stt`` endpoint and return the decoded JSON reply.

    Returns whatever ``response.json()`` yields when the server answers with
    status 200. For any other status an error line is printed and ``None``
    is returned.

    Raises ``Exception`` when ``file_path`` is not an existing file.
    """
    endpoint = BOT_URL + "/stt"
    if not os.path.isfile(file_path):
        raise Exception("Audio-file not found.")

    # Keep the file open for the duration of the upload.
    with open(file_path, 'rb') as audio_file:
        upload = {'file': (os.path.basename(file_path), audio_file)}
        reply = requests.post(endpoint, files=upload)

    if reply.status_code != 200:
        print("STT API Error. Statuscode:", reply.status_code)
        return None

    transcription = reply.json()
    return transcription
def get_completion(messages):
    """Ask the ``/llm`` endpoint for the next reply to the conversation.

    ``messages`` is the conversation history; it is sent as JSON under the
    ``"messages"`` key.

    Returns the decoded JSON body of the reply. If the server answers with a
    non-200 status, the body is not valid JSON, or the decoded body is empty,
    a spoken-style fallback sentence asking the user to repeat is returned
    instead. The returned value is also printed.
    """
    fallback = "Äh, wie bitte? Kannst du das bitte nochmal wiederholen?"
    url = BOT_URL + "/llm"
    payload = json.dumps({"messages": messages})
    headers = {"Content-Type": "application/JSON"}
    completion = requests.post(url, data=payload, headers=headers)

    response = None
    if completion.status_code == 200:
        try:
            response = completion.json()
        except ValueError:
            # Body was not valid JSON (requests' JSON decode errors derive
            # from ValueError); fall through to the fallback sentence.
            response = None
    else:
        print("LLM API Error. Statuscode:", completion.status_code)

    if not response:
        # something went wrong
        response = fallback
    print(response)
    return response
def get_tts(input_text):
    """Request synthesized speech for ``input_text`` from the ``/tts`` endpoint.

    Returns the ``requests`` response object. The request is made with
    ``stream=True`` so the body is not downloaded up front: iterating over
    the returned response yields audio chunks as they arrive, which lets
    playback start before synthesis has finished.
    """
    url = BOT_URL + "/tts"
    payload = json.dumps({"text": input_text})
    headers = {"Content-Type": "application/JSON"}
    # Without stream=True requests reads the complete body before returning,
    # so the caller could only start playing after the whole clip was received.
    audio_stream = requests.post(url, data=payload, headers=headers, stream=True)
    return audio_stream
def play_audio(audio_stream):
    """Pipe an iterable of audio byte chunks into ``ffplay`` and wait for playback to end.

    ``audio_stream`` is iterated chunk by chunk; empty chunks are skipped.
    ffplay reads the audio from its standard input and exits on its own once
    the input is exhausted (``-autoexit``).

    The pipe is always closed and the child process always waited for, even
    when iterating the stream or writing to the pipe raises.
    """
    ffplay_cmd = ["ffplay", "-nodisp", "-probesize", "1024", "-autoexit", "-"]
    ffplay_proc = subprocess.Popen(ffplay_cmd, stdin=subprocess.PIPE)
    try:
        for chunk in audio_stream:
            if chunk:
                ffplay_proc.stdin.write(chunk)
    except BrokenPipeError:
        # ffplay went away before consuming everything; nothing left to play.
        pass
    finally:
        # close on finish (closing flushes, which can hit the same broken pipe)
        try:
            ffplay_proc.stdin.close()
        except BrokenPipeError:
            pass
        ffplay_proc.wait()
def load_llm():
    """Ask the server's ``/load`` endpoint to load the LLM and report the outcome.

    Prints a confirmation when the server answers with status 200, otherwise
    prints an error line with the status code. Returns ``None`` either way.
    """
    url = BOT_URL + "/load"
    payload = json.dumps({"text": "load LLM"})
    headers = {"Content-Type": "application/JSON"}
    response = requests.post(url, data=payload, headers=headers)
    if response.status_code == 200:
        print("LLM Model loaded.")
    else:
        # This call targets the LLM load endpoint, so label the error accordingly.
        print("LLM load API Error. Statuscode:", response.status_code)
# Wait for keyboard press to start / stop recording
class listener(keyboard.Listener):
    """Keyboard listener implementing push-to-talk on top of a recorder.

    Holding a ctrl key starts the recorder, releasing it stops the recorder,
    and pressing ``q`` stops any running recording and ends the listener.
    """

    # Match the generic ctrl key as well as the side-specific ones, and refer
    # to them through the Key class rather than through the pressed key itself.
    _CTRL_KEYS = (keyboard.Key.ctrl, keyboard.Key.ctrl_l, keyboard.Key.ctrl_r)

    def __init__(self, recorder):
        """Register the press/release handlers and remember the recorder to drive."""
        super().__init__(on_press=self.on_press, on_release=self.on_release)
        self.recorder = recorder

    def on_press(self, key):
        """Start recording on ctrl; on ``q`` stop recording and end the listener.

        Returns ``False`` for ``q`` (this is how the listener thread is
        stopped) and ``None`` for every other key, including unknown events
        where ``key`` is ``None``.
        """
        if key in self._CTRL_KEYS:  # special key event
            self.recorder.start()
        elif isinstance(key, keyboard.KeyCode):  # alphanumeric key event
            if key.char == 'q':  # press q to quit
                if self.recorder.recording:
                    self.recorder.stop()
                return False  # this is how you stop the listener thread
        return None

    def on_release(self, key):
        """Stop recording when a ctrl key is released; ignore every other key."""
        if key in self._CTRL_KEYS:  # special key event
            self.recorder.stop()
class recorder:
    """Microphone recorder that also runs the speech round trip after each take.

    ``start`` begins writing microphone audio to a WAV file. ``stop`` ends the
    recording, sends the file to speech-to-text, asks the LLM for a reply,
    and plays the synthesized answer.
    """

    def __init__(self,
                 wavfile,
                 chunksize=512,
                 dataformat=pyaudio.paInt16,
                 channels=1,
                 rate=16000):
        """Store the recording parameters and create the PyAudio instance.

        ``wavfile`` is the path the recording is written to (overwritten on
        every ``start``). ``dataformat``, ``channels`` and ``rate`` are used
        both for the WAV header and for the input stream.
        """
        self.filename = wavfile
        # NOTE(review): chunksize is stored but never passed to the input stream.
        self.chunksize = chunksize
        self.dataformat = dataformat
        self.channels = channels
        self.rate = rate
        self.recording = False
        self.pa = pyaudio.PyAudio()
        # Same list object as the module-level history, so appends are shared.
        self.messages = messages

    def start(self):
        """Open the WAV file and start capturing audio; no-op if already recording."""
        # we call start and stop from the keyboard listener, so we use the asynchronous
        # version of pyaudio streaming. The keyboard listener must regain control to
        # begin listening again for the key release.
        if self.recording:
            return

        self.wf = wave.open(self.filename, 'wb')
        self.wf.setnchannels(self.channels)
        self.wf.setsampwidth(self.pa.get_sample_size(self.dataformat))
        self.wf.setframerate(self.rate)

        def callback(in_data, frame_count, time_info, status):
            # Invoked by PyAudio for each captured buffer; the file write has
            # to keep up with the incoming audio data.
            self.wf.writeframes(in_data)
            return (in_data, pyaudio.paContinue)

        self.stream = self.pa.open(format=self.dataformat,
                                   channels=self.channels,
                                   rate=self.rate,
                                   input=True,
                                   stream_callback=callback)
        self.stream.start_stream()
        self.recording = True
        print('recording started')

    def stop(self):
        """Finish the recording and run STT, LLM and TTS playback; no-op if idle.

        When speech-to-text yields nothing (``get_stt`` returns ``None`` on a
        non-200 reply, or the transcription is empty) the round trip is
        skipped and the conversation history is left unchanged.
        """
        if not self.recording:
            return

        self.stream.stop_stream()
        self.stream.close()
        self.wf.close()
        self.recording = False
        print('recording finished, start STT')

        # STT
        result = get_stt(self.filename)
        print(result)
        if not result:
            # Do not push an empty user turn into the history or to the LLM.
            print("No transcription received. Ready for next input.")
            return

        print("getting LLM answer")
        # update messages object with the STT response
        self.messages.append({"role": "user", "content": result})
        # get llm answer
        response = get_completion(self.messages)
        # f-string instead of "+" so a non-string reply cannot raise TypeError
        print(f"\n \n{response}\n \n")
        self.messages.append({"role": "assistant", "content": response})

        print("getting TTS and audio stream")
        # TTS and streaming playback
        play_audio(get_tts(response))
        print("Done. Ready for next input.")
if __name__ == '__main__':
    # Wire the recorder (writing to mic.wav) to the keyboard listener.
    mic_recorder = recorder("mic.wav")
    key_listener = listener(mic_recorder)

    # Ask the server to load the LLM before accepting any input.
    load_llm()
    print('hold ctrl to record, press q to quit')

    # The keyboard listener is a thread: start it, then block until it
    # terminates so the program does not exit immediately.
    key_listener.start()
    key_listener.join()