5
5
6
6
from azure .cosmos import exceptions
7
7
from azure .cosmos .aio import ContainerProxy , CosmosClient
8
- from azure .cosmos .partition_key import PartitionKey
9
8
from azure .identity .aio import AzureDeveloperCliCredential , ManagedIdentityCredential
10
- from quart import Blueprint , current_app , jsonify , request
9
+ from quart import Blueprint , current_app , jsonify , make_response , request
11
10
12
11
from config import (
13
12
CONFIG_CHAT_HISTORY_COSMOS_ENABLED ,
23
22
chat_history_cosmosdb_bp = Blueprint ("chat_history_cosmos" , __name__ , static_folder = "static" )
24
23
25
24
26
- def make_partition_key (entra_id , session_id = None ):
27
- if entra_id and session_id :
28
- # Need multihash for hierachical partitioning
29
- return PartitionKey (path = ["/entra_id" , "/session_id" ], kind = "MultiHash" )
30
- else :
31
- return PartitionKey (path = "/entra_id" )
32
-
33
-
34
25
@chat_history_cosmosdb_bp .post ("/chat_history" )
35
26
@authenticated
36
27
async def post_chat_history (auth_claims : Dict [str , Any ]):
@@ -48,56 +39,40 @@ async def post_chat_history(auth_claims: Dict[str, Any]):
48
39
try :
49
40
request_json = await request .get_json ()
50
41
session_id = request_json .get ("id" )
51
- answers = request_json .get ("answers" )
52
- title = answers [0 ][0 ][:50 ] + "..." if len (answers [0 ][0 ]) > 50 else answers [0 ][0 ]
42
+ message_pairs = request_json .get ("answers" )
43
+ first_message_question = message_pairs [0 ][0 ]
44
+ title = first_message_question + "..." if len (first_message_question ) > 50 else first_message_question
53
45
timestamp = int (time .time () * 1000 )
54
46
55
47
# Insert the session item:
56
- session_item = {
57
- "id" : id ,
48
+ session = {
49
+ "id" : session_id ,
58
50
"session_id" : session_id ,
59
51
"entra_oid" : entra_oid ,
60
52
"type" : "session" ,
61
53
"title" : title ,
62
54
"timestamp" : timestamp ,
63
55
}
64
56
65
- message_items = []
57
+ messages = []
66
58
# Now insert a message item for each question/response pair:
67
- for ind , message_pair in enumerate (zip (answers [::2 ], answers [1 ::2 ])):
68
- # The id: what if you delete a message and then add a new one? The id will be the same.
69
- # If we had delete mechanism, and you deleted item 5 in a history, then item 6 would still hang around
70
- # and youd have two of item 6.
71
- # abc-0
72
- # abc-1
73
- # abc-2 <-- DELETE
74
- # abc-3
75
- # One approach would be to delete EVERYTHING, then upsert everything.
76
- # Another approach would be to delete item plus everything after, then upsert everything after.
77
- # Or: Change the frontend?
78
- # We can do this first, and change the frontend after
79
- message_items .append (
59
+ for ind , message_pair in enumerate (message_pairs ):
60
+ messages .append (
80
61
{
81
62
"id" : f"{ session_id } -{ ind } " ,
82
- "session_id" : id ,
63
+ "session_id" : session_id ,
83
64
"entra_oid" : entra_oid ,
84
65
"type" : "message" ,
85
66
"question" : message_pair [0 ],
86
67
"response" : message_pair [1 ],
87
- "timestamp" : timestamp , # <-- This is the timestamp of the session, not the message
68
+ "timestamp" : None ,
88
69
}
89
70
)
90
71
91
- batch_operations = [("upsert" , tuple ([ session_item ] + message_items ), {}) ]
72
+ batch_operations = [("upsert" , ( session ,)) ] + [( "upsert" , ( message ,)) for message in messages ]
92
73
93
74
try :
94
- # Run that list of operations
95
- batch_results = container .execute_item_batch (
96
- batch_operations = batch_operations , partition_key = make_partition_key (entra_oid , session_id )
97
- )
98
- # Batch results are returned as a list of item operation results - or raise a CosmosBatchOperationError if
99
- # one of the operations failed within your batch request.
100
- print (f"\n Results for the batch operations: { batch_results } \n " )
75
+ await container .execute_item_batch (batch_operations = batch_operations , partition_key = [entra_oid , session_id ])
101
76
except exceptions .CosmosBatchOperationError as e :
102
77
error_operation_index = e .error_index
103
78
error_operation_response = e .operation_responses [error_operation_index ]
@@ -109,9 +84,9 @@ async def post_chat_history(auth_claims: Dict[str, Any]):
109
84
return error_response (error , "/chat_history" )
110
85
111
86
112
- @chat_history_cosmosdb_bp .get ("/chat_history/items " )
87
+ @chat_history_cosmosdb_bp .get ("/chat_history/sessions " )
113
88
@authenticated
114
- async def get_chat_history (auth_claims : Dict [str , Any ]):
89
+ async def get_chat_history_sessions (auth_claims : Dict [str , Any ]):
115
90
if not current_app .config [CONFIG_CHAT_HISTORY_COSMOS_ENABLED ]:
116
91
return jsonify ({"error" : "Chat history not enabled" }), 400
117
92
@@ -125,28 +100,27 @@ async def get_chat_history(auth_claims: Dict[str, Any]):
125
100
126
101
try :
127
102
# get the count and continuation token from the request URL
128
- count = request .args .get ("count" , 10 )
103
+ count = int ( request .args .get ("count" , 10 ) )
129
104
continuation_token = request .args .get ("continuation_token" )
130
105
131
106
res = container .query_items (
132
- # TODO: do we need distinct? per Mark's code - Mark says no!
133
107
query = "SELECT c.id, c.entra_oid, c.title, c.timestamp FROM c WHERE c.entra_oid = @entra_oid AND c.type = @type ORDER BY c.timestamp DESC" ,
134
108
parameters = [dict (name = "@entra_oid" , value = entra_oid ), dict (name = "@type" , value = "session" )],
135
- partition_key = make_partition_key ( entra_oid ) ,
109
+ partition_key = [ entra_oid ] ,
136
110
max_item_count = count ,
137
111
)
138
112
139
113
# set the continuation token for the next page
140
114
pager = res .by_page (continuation_token )
141
115
142
116
# Get the first page, and the continuation token
117
+ sessions = []
143
118
try :
144
119
page = await pager .__anext__ ()
145
120
continuation_token = pager .continuation_token # type: ignore
146
121
147
- items = []
148
122
async for item in page :
149
- items .append (
123
+ sessions .append (
150
124
{
151
125
"id" : item .get ("id" ),
152
126
"entra_oid" : item .get ("entra_oid" ),
@@ -157,18 +131,17 @@ async def get_chat_history(auth_claims: Dict[str, Any]):
157
131
158
132
# If there are no more pages, StopAsyncIteration is raised
159
133
except StopAsyncIteration :
160
- items = []
161
134
continuation_token = None
162
135
163
- return jsonify ({"items " : items , "continuation_token" : continuation_token }), 200
136
+ return jsonify ({"sessions " : sessions , "continuation_token" : continuation_token }), 200
164
137
165
138
except Exception as error :
166
- return error_response (error , "/chat_history/items " )
139
+ return error_response (error , "/chat_history/sessions " )
167
140
168
141
169
- @chat_history_cosmosdb_bp .get ("/chat_history/items/<item_id >" )
142
+ @chat_history_cosmosdb_bp .get ("/chat_history/sessions/<session_id >" )
170
143
@authenticated
171
- async def get_chat_history_session (auth_claims : Dict [str , Any ], item_id : str ):
144
+ async def get_chat_history_session (auth_claims : Dict [str , Any ], session_id : str ):
172
145
if not current_app .config [CONFIG_CHAT_HISTORY_COSMOS_ENABLED ]:
173
146
return jsonify ({"error" : "Chat history not enabled" }), 400
174
147
@@ -182,33 +155,39 @@ async def get_chat_history_session(auth_claims: Dict[str, Any], item_id: str):
182
155
183
156
try :
184
157
res = container .query_items (
185
- # TODO: do we need distinct? per Mark's code
186
- query = "SELECT c.id, c.entra_oid, c.title, c.timestamp FROM c WHERE c.session_id = @session_id ORDER BY c.timestamp DESC" ,
187
- parameters = [dict (name = "@entra_oid" , value = entra_oid ), dict (name = "@session_id" , value = item_id )],
188
- partition_key = make_partition_key (entra_oid , item_id ),
189
- # max_item_count=?
158
+ query = "SELECT * FROM c WHERE c.session_id = @session_id" ,
159
+ parameters = [dict (name = "@entra_oid" , value = entra_oid ), dict (name = "@session_id" , value = session_id )],
160
+ partition_key = [entra_oid , session_id ],
190
161
)
191
162
192
- res = await container .read_item (item = item_id , partition_key = entra_oid )
163
+ message_pairs = []
164
+ session = None
165
+ async for page in res .by_page ():
166
+ async for item in page :
167
+ if item .get ("type" ) == "session" :
168
+ session = item
169
+ else :
170
+ message_pairs .append ([item ["question" ], item ["response" ]])
171
+
193
172
return (
194
173
jsonify (
195
174
{
196
- "id" : res .get ("id" ),
197
- "entra_oid" : res . get ( " entra_oid" ) ,
198
- "title" : res .get ("title" , "untitled " ),
199
- "timestamp" : res .get ("timestamp" ),
200
- "answers" : res . get ( "answers" , []) ,
175
+ "id" : session .get ("id" ),
176
+ "entra_oid" : entra_oid ,
177
+ "title" : session .get ("title" ),
178
+ "timestamp" : session .get ("timestamp" ),
179
+ "answers" : message_pairs ,
201
180
}
202
181
),
203
182
200 ,
204
183
)
205
184
except Exception as error :
206
- return error_response (error , f"/chat_history/items/ { item_id } " )
185
+ return error_response (error , f"/chat_history/sessions/ { session_id } " )
207
186
208
187
209
- @chat_history_cosmosdb_bp .delete ("/chat_history/items/<item_id >" )
188
+ @chat_history_cosmosdb_bp .delete ("/chat_history/sessions/<session_id >" )
210
189
@authenticated
211
- async def delete_chat_history_session (auth_claims : Dict [str , Any ], item_id : str ):
190
+ async def delete_chat_history_session (auth_claims : Dict [str , Any ], session_id : str ):
212
191
if not current_app .config [CONFIG_CHAT_HISTORY_COSMOS_ENABLED ]:
213
192
return jsonify ({"error" : "Chat history not enabled" }), 400
214
193
@@ -221,12 +200,22 @@ async def delete_chat_history_session(auth_claims: Dict[str, Any], item_id: str)
221
200
return jsonify ({"error" : "User OID not found" }), 401
222
201
223
202
try :
224
- await container .delete_item (item = item_id , partition_key = entra_oid )
225
- # Delete session, and all the message items associated with it
226
- # TODO: Delete all the message items as well
227
- return jsonify ({}), 204
203
+ res = container .query_items (
204
+ query = "SELECT * FROM c WHERE c.session_id = @session_id" ,
205
+ parameters = [dict (name = "@entra_oid" , value = entra_oid ), dict (name = "@session_id" , value = session_id )],
206
+ partition_key = [entra_oid , session_id ],
207
+ )
208
+
209
+ ids_to_delete = []
210
+ async for page in res .by_page ():
211
+ async for item in page :
212
+ ids_to_delete .append (item ["id" ])
213
+
214
+ batch_operations = [("delete" , (id ,)) for id in ids_to_delete ]
215
+ await container .execute_item_batch (batch_operations = batch_operations , partition_key = [entra_oid , session_id ])
216
+ return make_response ("" , 204 )
228
217
except Exception as error :
229
- return error_response (error , f"/chat_history/items/ { item_id } " )
218
+ return error_response (error , f"/chat_history/sessions/ { session_id } " )
230
219
231
220
232
221
@chat_history_cosmosdb_bp .before_app_serving
0 commit comments