1
1
import asyncio
2
2
import getpass
3
-
3
+ import os
4
+ import json
4
5
import pyaudio
5
6
import oracledb
7
+ from datetime import datetime
6
8
import oci
7
9
from oci .config import from_file
8
10
from oci .auth .signers .security_token_signer import SecurityTokenSigner
11
13
RealtimeClientListener ,
12
14
RealtimeParameters ,
13
15
)
14
-
16
+ from aiohttp import web
17
+
18
+ # Global variables to store the latest data
19
+ latest_thetime = None
20
+ latest_question = None
21
+ latest_answer = None
22
+ compartment_id = os .getenv ('COMPARTMENT_ID' )
23
+ print (f"compartment_id: { compartment_id } " )
15
24
pw = getpass .getpass ("Enter database user password:" )
16
25
17
26
# Use this when making a connection with a wallet
18
27
connection = oracledb .connect (
19
28
user = "moviestream" ,
20
29
password = pw ,
21
30
dsn = "selectaidb_high" ,
22
- config_dir = "/ Users/pparkins/ Downloads/ Wallet_SelectAIDB" ,
23
- wallet_location = "/ Users/pparkins/ Downloads/ Wallet_SelectAIDB"
31
+ config_dir = r"C:\ Users\paulp\ Downloads\ Wallet_SelectAIDB" ,
32
+ wallet_location = r"C:\ Users\paulp\ Downloads\ Wallet_SelectAIDB"
24
33
)
25
- print ("Successfully connected to Oracle Database" )
26
- print (f"Connection details: { connection } " )
34
+ print (f"Successfully connected to Oracle Database Connection: { connection } " )
27
35
28
36
# Create a FIFO queue
29
37
queue = asyncio .Queue ()
43
51
last_result_time = None
44
52
45
53
def authenticator ():
46
- config = from_file ("~/.oci/config" , "paulspeechai " )
54
+ config = from_file ("~/.oci/config" , "MYSPEECHAIPROFILE " )
47
55
with open (config ["security_token_file" ], "r" ) as f :
48
56
token = f .readline ()
49
57
private_key = oci .signer .load_private_key_from_file (config ["key_file" ])
@@ -69,6 +77,7 @@ def audio_callback(in_data, frame_count, time_info, status):
69
77
70
78
stream .start_stream ()
71
79
config = from_file ()
80
+ isInsertResults = True
72
81
73
82
async def send_audio (client ):
74
83
while True :
@@ -85,11 +94,18 @@ def on_result(self, result):
85
94
print (f"Received final results: { transcription } " )
86
95
print (f"Current cummulative result: { cummulativeResult } " )
87
96
if cummulativeResult .lower ().startswith ("select ai" ):
97
+ cummulativeResult = cummulativeResult [len ("select ai" ):].strip ()
98
+ isSelect = True
99
+ elif cummulativeResult .lower ().startswith ("select the eye" ):
100
+ cummulativeResult = cummulativeResult [len ("select the eye" ):].strip ()
88
101
isSelect = True
102
+ else :
103
+ cummulativeResult = ""
89
104
last_result_time = asyncio .get_event_loop ().time ()
90
105
else :
91
106
print (f"Received partial results: { result ['transcriptions' ][0 ]['transcription' ]} " )
92
107
108
+
93
109
def on_ack_message (self , ackmessage ):
94
110
return super ().on_ack_message (ackmessage )
95
111
@@ -113,54 +129,72 @@ async def check_idle():
113
129
isSelect = False
114
130
await asyncio .sleep (1 )
115
131
132
+ # Function to execute AI query and optionally insert results into the table
133
+ # For example Select AI I am looking for the top five selling movies for the latest month please
116
134
def executeSelectAI ():
117
- global cummulativeResult
135
+ global cummulativeResult , isInsertResults , latest_thetime , latest_question , latest_answer
118
136
print (f"executeSelectAI called cummulative result: { cummulativeResult } " )
119
- # for example prompt => 'select ai I am looking for the top 5 selling movies for the latest month please',
137
+
138
+ # AI query - todo use openai_gpt4o
120
139
query = """SELECT DBMS_CLOUD_AI.GENERATE(
121
140
prompt => :prompt,
122
- profile_name => 'openai_gpt35',
141
+ profile_name => 'openai_gpt35',
123
142
action => 'narrate')
124
143
FROM dual"""
125
- with connection .cursor () as cursor :
126
- cursor .execute (query , prompt = cummulativeResult )
127
- result = cursor .fetchone ()
128
- if result and isinstance (result [0 ], oracledb .LOB ):
129
- text_result = result [0 ].read ()
130
- print (text_result )
131
- else :
132
- print (result )
144
+
145
+ try :
146
+ with connection .cursor () as cursor :
147
+ # Execute AI query
148
+ cursor .execute (query , prompt = cummulativeResult )
149
+ result = cursor .fetchone ()
150
+
151
+ if result and isinstance (result [0 ], oracledb .LOB ):
152
+ text_result = result [0 ].read ()
153
+ print (text_result )
154
+
155
+ # Update the global variables
156
+ latest_thetime = datetime .now ()
157
+ latest_question = cummulativeResult
158
+ latest_answer = text_result [:3000 ] # Truncate if necessary
159
+ cummulativeResult = ""
160
+
161
+ # Insert the prompt and result into the table if isInsertResults is True
162
+ if isInsertResults :
163
+ insert_query = """
164
+ INSERT INTO selectai_data (thetime, question, answer)
165
+ VALUES (:thetime, :question, :answer)
166
+ """
167
+ cursor .execute (insert_query , {
168
+ 'thetime' : latest_thetime ,
169
+ 'question' : latest_question ,
170
+ 'answer' : latest_answer
171
+ })
172
+ connection .commit ()
173
+ print ("Insert successful." )
174
+ else :
175
+ print (result )
176
+ except Exception as e :
177
+ print (f"An error occurred: { e } " )
178
+
133
179
# Reset cumulativeResult after execution
134
180
cummulativeResult = ""
135
181
182
+ async def handle_request (request ):
183
+ global latest_thetime , latest_question , latest_answer
184
+ data = {
185
+ "thetime" : latest_thetime .isoformat () if latest_thetime else None , # Convert datetime to ISO format
186
+ "question" : latest_question ,
187
+ "answer" : latest_answer
188
+ }
189
+ return web .json_response (data )
136
190
137
- # logic such as the following could be added to make the app further dynamic as far as action type...
138
- # actionValue = 'narrate'
139
- # if cummulativeResult.lower().startswith("select ai narrate"):
140
- # actionValue = "narrate"
141
- # elif cummulativeResult.lower().startswith("select ai chat"):
142
- # actionValue = "chat"
143
- # elif cummulativeResult.lower().startswith("select ai showsql"):
144
- # actionValue = "showsql"
145
- # elif cummulativeResult.lower().startswith("select ai show sql"):
146
- # actionValue = "showsql"
147
- # elif cummulativeResult.lower().startswith("select ai runsql"):
148
- # actionValue = "runsql"
149
- # elif cummulativeResult.lower().startswith("select ai run sql"):
150
- # actionValue = "runsql"
151
- # # Note that "runsql" is not currently supported as action value
152
- # query = """SELECT DBMS_CLOUD_AI.GENERATE(
153
- # prompt => :prompt,
154
- # profile_name => 'openai_gpt35',
155
- # action => :actionValue)
156
- # FROM dual"""
157
191
158
192
if __name__ == "__main__" :
159
193
# Run the event loop
160
194
def message_callback (message ):
161
195
print (f"Received message: { message } " )
162
196
163
- realtime_speech_parameters : RealtimeParameters = RealtimeParameters ()
197
+ realtime_speech_parameters = RealtimeParameters ()
164
198
realtime_speech_parameters .language_code = "en-US"
165
199
realtime_speech_parameters .model_domain = (
166
200
realtime_speech_parameters .MODEL_DOMAIN_GENERIC
@@ -179,15 +213,24 @@ def message_callback(message):
179
213
listener = SpeechListener (),
180
214
service_endpoint = realtime_speech_url ,
181
215
signer = authenticator (),
182
- compartment_id = "ocid1.compartment.oc1..MYCOMPARMENTID" ,
216
+ compartment_id = compartment_id ,
183
217
)
184
218
185
219
loop = asyncio .get_event_loop ()
186
220
loop .create_task (send_audio (client ))
187
221
loop .create_task (check_idle ())
222
+
223
+ # Set up the HTTP server
224
+ app = web .Application ()
225
+ app .router .add_get ('/selectai_data' , handle_request )
226
+ runner = web .AppRunner (app )
227
+ loop .run_until_complete (runner .setup ())
228
+ site = web .TCPSite (runner , 'localhost' , 8080 )
229
+ loop .run_until_complete (site .start ())
230
+
188
231
loop .run_until_complete (client .connect ())
189
232
190
233
if stream .is_active ():
191
234
stream .close ()
192
235
193
- print ("Closed now" )
236
+ print ("Closed now" )
0 commit comments